U
    /e(                     @  s   d dl mZ d dlZd dlZd dlZd dlZd dlmZ d dlm	Z	 d dl
mZ d dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZ eeZG dd deZdS )    )annotationsN)	CPU_COUNT)get_template)SpecCluster)nprocesses_nthreads)Nanny)	Scheduler)Security)Worker)parse_memory_limitc                      sj   e Zd ZdZdddddddddejddddddddddddddf fdd	Zd	d
 Zd fdd	Z  Z	S )LocalClusterug  Create local Scheduler and Workers

    This creates a "cluster" of a scheduler and workers running on the local
    machine.

    Parameters
    ----------
    n_workers: int
        Number of workers to start
    memory_limit: str, float, int, or None, default "auto"
        Sets the memory limit *per worker*.

        Notes regarding argument data type:

        * If None or 0, no limit is applied.
        * If "auto", the total system memory is split evenly between the workers.
        * If a float, that fraction of the system memory is used *per worker*.
        * If a string giving a number of bytes (like ``"1GiB"``), that amount is used *per worker*.
        * If an int, that number of bytes is used *per worker*.

        Note that the limit will only be enforced when ``processes=True``, and the limit is only
        enforced on a best-effort basis — it's still possible for workers to exceed this limit.
    processes: bool
        Whether to use processes (True) or threads (False).  Defaults to True, unless
        worker_class=Worker, in which case it defaults to False.
    threads_per_worker: int
        Number of threads per each worker
    scheduler_port: int
        Port of the scheduler.  8786 by default, use 0 to choose a random port
    silence_logs: logging level
        Level of logs to print out to stdout.  ``logging.WARN`` by default.
        Use a falsey value like False or None for no change.
    host: string
        Host address on which the scheduler will listen, defaults to only localhost
    ip: string
        Deprecated.  See ``host`` above.
    dashboard_address: str
        Address on which to listen for the Bokeh diagnostics server like
        'localhost:8787' or '0.0.0.0:8787'.  Defaults to ':8787'.
        Set to ``None`` to disable the dashboard.
        Use ':0' for a random port.
    worker_dashboard_address: str
        Address on which to listen for the Bokeh worker diagnostics server like
        'localhost:8787' or '0.0.0.0:8787'.  Defaults to None which disables the dashboard.
        Use ':0' for a random port.
    diagnostics_port: int
        Deprecated.  See dashboard_address.
    asynchronous: bool (False by default)
        Set to True if using this cluster within async/await functions or within
        Tornado gen.coroutines.  This should remain False for normal use.
    blocked_handlers: List[str]
        A list of strings specifying a blocklist of handlers to disallow on the
        Scheduler, like ``['feed', 'run_function']``
    service_kwargs: Dict[str, Dict]
        Extra keywords to hand to the running services
    security : Security or bool, optional
        Configures communication security in this cluster. Can be a security
        object, or True. If True, temporary self-signed credentials will
        be created automatically.
    protocol: str (optional)
        Protocol to use like ``tcp://``, ``tls://``, ``inproc://``
        This defaults to sensible choice given other keyword arguments like
        ``processes`` and ``security``
    interface: str (optional)
        Network interface to use.  Defaults to lo/localhost
    worker_class: Worker
        Worker class used to instantiate workers from. Defaults to Worker if
        processes=False and Nanny if processes=True or omitted.
    **worker_kwargs:
        Extra worker arguments. Any additional keyword arguments will be passed
        to the ``Worker`` class constructor.

    Examples
    --------
    >>> cluster = LocalCluster()  # Create a local cluster  # doctest: +SKIP
    >>> cluster  # doctest: +SKIP
    LocalCluster("127.0.0.1:8786", workers=8, threads=8)

    >>> c = Client(cluster)  # connect to local cluster  # doctest: +SKIP

    Scale the cluster to three workers

    >>> cluster.scale(3)  # doctest: +SKIP

    Pass extra keyword arguments to Bokeh

    >>> LocalCluster(service_kwargs={'dashboard': {'prefix': '/foo'}})  # doctest: +SKIP
    Nr   z:8787F   c                   s  |d k	r|}|d k	r"t d |}|dkr8t d d }d|krJt d |d krd|d kpbt|t}|d krx|rttnt}d | _|| _|d krt }n$|dkrt }nt	|tst
d|d kr|rd|kr|dd }n$|r|jrd	}n| js|	sd
}nd}|ds|d }|d kr6|ds6|s6d}|p>i }|pHi }|d krx|d krx|rpt \}}nd}t}|d kr|d k	r|rtdt| nd}|r|d krtdttt| }|rd|krtdd|td|d< ||||||d k	||||
d	 ttt|||||	|||d k	||d
|p>i d}||d  fddt|D }t j||| |||
||d	 d S )NzMdiagnostics_port has been deprecated. Please use `dashboard_address=` insteadr   z_Setting `threads_per_worker` to 0 has been deprecated. Please set to None or to a specific int.	dashboardzSetting `dashboard` is discouraged. Please set `dashboard_address` to affect the scheduler (more common) and `worker_dashboard_address` for the worker (less common).Tz"security must be a Security objectz://ztls://z	inproc://ztcp://Zinprocz	127.0.0.1r   Zmemory_limitauto)logger)	hostZnthreadsservicesdashboard_addressr   	interfaceprotocolsecuritysilence_logs)
r   r   service_kwargsr   portr   r   r   r   blocked_handlers)clsoptionsc                   s   i | ]
}| qS  r   ).0iworkerr   </tmp/pip-unpacked-wheel-g426oqom/distributed/deploy/local.py
<dictcomp>   s      z)LocalCluster.__init__.<locals>.<dictcomp>)	name	schedulerworkersr!   loopasynchronousr   r   scheduler_sync_interval)warningswarn
issubclassr   r
   status	processesr	   	temporary
isinstance	TypeErrorsplitZrequire_encryptionendswith
startswithr   r   maxintmathceilr   r   updater   toolzmergedictrangesuper__init__)selfr$   Z	n_workersZthreads_per_workerr.   r'   startr   ipZscheduler_portr   r   Zworker_dashboard_addressZdiagnostics_portr   Zworker_servicesr   r(   r   r   r   r   Zworker_classZscheduler_kwargsr)   Zworker_kwargsr%   r&   	__class__r    r"   r?   q   s    






   

zLocalCluster.__init__c                 O  s   t dd S )NzdThe `cluster.start_worker` function has been removed. Please see the `cluster.scale` method instead.)NotImplementedError)r@   argskwargsr   r   r"   start_worker	  s    zLocalCluster.start_workerc                   s(   t dj| jj| j|d}t j|dS )Nzlocal_cluster.html.j2)r-   r.   cluster_status)rI   )r   renderr-   r$   r.   r>   _repr_html_)r@   rI   rC   r   r"   rK     s    zLocalCluster._repr_html_)N)
__name__
__module____qualname____doc__loggingWARNr?   rH   rK   __classcell__r   r   rC   r"   r      s:   [ r   )
__future__r   rP   r7   r*   r:   Zdask.systemr   Zdask.widgetsr   Zdistributed.deploy.specr   Zdistributed.deploy.utilsr   Zdistributed.nannyr   Zdistributed.schedulerr   Zdistributed.securityr	   Zdistributed.workerr
   Zdistributed.worker_memoryr   	getLoggerrL   r   r   r   r   r   r"   <module>   s   
