from __future__ import annotations

import asyncio
import copy
import json
import logging
import math
from typing import Any

import psutil
import toolz

from dask.system import CPU_COUNT

from distributed.compatibility import WINDOWS
from distributed.deploy.spec import ProcessInterface, SpecCluster
from distributed.deploy.utils import nprocesses_nthreads
from distributed.scheduler import Scheduler
from distributed.worker_memory import parse_memory_limit

logger = logging.getLogger(__name__)


class SubprocessWorker(ProcessInterface):
    """A local Dask worker running in a dedicated subprocess

    Parameters
    ----------
    scheduler:
        Address of the scheduler
    worker_class:
        Python class to use to create the worker, defaults to 'distributed.Nanny'
    name:
        Name of the worker
    worker_kwargs:
        Keywords to pass on to the ``Worker`` class constructor
    """

    scheduler: str
    worker_class: str
    worker_kwargs: dict
    name: str | None
    process: asyncio.subprocess.Process | None

    def __init__(
        self,
        scheduler: str,
        worker_class: str = "distributed.Nanny",
        name: str | None = None,
        worker_kwargs: dict | None = None,
    ) -> None:
        if WINDOWS:
            raise RuntimeError("SubprocessWorker does not support Windows.")
        self.scheduler = scheduler
        self.worker_class = worker_class
        self.name = name
        self.worker_kwargs = copy.copy(worker_kwargs or {})
        self.process = None

        super().__init__()

    async def start(self) -> None:
        # Launch the worker in a dedicated subprocess through the ``dask spec``
        # CLI, serializing the worker class and its kwargs as a JSON spec.
        self.process = await asyncio.create_subprocess_exec(
            "dask",
            "spec",
            self.scheduler,
            "--spec",
            json.dumps({"cls": self.worker_class, "opts": {**self.worker_kwargs}}),
        )
        await super().start()

    async def close(self) -> None:
        # Kill any children of the subprocess (e.g. the worker process spawned
        # by a Nanny) before killing the subprocess itself and waiting on it.
        if self.process and self.process.returncode is None:
            for child in psutil.Process(self.process.pid).children(recursive=True):
                child.kill()
            self.process.kill()
            await self.process.wait()
        self.process = None
        await super().close()


def SubprocessCluster(
    host: str | None = "127.0.0.1",
    scheduler_port: int = 0,
    scheduler_kwargs: dict | None = None,
    dashboard_address: str | None = ":8787",
    worker_class: str = "distributed.Nanny",
    n_workers: int | None = None,
    threads_per_worker: int | None = None,
    worker_kwargs: dict | None = None,
    silence_logs: int = logging.WARN,
    **kwargs: Any,
) -> SpecCluster:
    """Create in-process scheduler and workers running in dedicated subprocesses

    This creates a "cluster" of a scheduler running in the current process and
    workers running in dedicated subprocesses.

    .. warning::

       This function is experimental

    Parameters
    ----------
    host:
        Host address on which the scheduler will listen, defaults to localhost
    scheduler_port:
        Port of the scheduler, defaults to 0 to choose a random port
    scheduler_kwargs:
        Keywords to pass on to the scheduler
    dashboard_address:
        Address on which to listen for the Bokeh diagnostics server like
        'localhost:8787' or '0.0.0.0:8787', defaults to ':8787'

        Set to ``None`` to disable the dashboard.
        Use ':0' for a random port.
    worker_class:
        Worker class to instantiate workers from, defaults to 'distributed.Nanny'
    n_workers:
        Number of workers to start
    threads_per_worker:
        Number of threads per worker
    worker_kwargs:
        Keywords to pass on to the ``Worker`` class constructor
    silence_logs:
        Level of logs to print out to stdout, defaults to ``logging.WARN``

        Use a falsy value like False or None to disable log silencing.

    Examples
    --------
    >>> cluster = SubprocessCluster()  # Create a subprocess cluster  # doctest: +SKIP
    >>> cluster  # doctest: +SKIP
    SubprocessCluster(SubprocessCluster, 'tcp://127.0.0.1:61207', workers=5, threads=10, memory=16.00 GiB)

    >>> c = Client(cluster)  # connect to subprocess cluster  # doctest: +SKIP

    Scale the cluster to three workers

    >>> cluster.scale(3)  # doctest: +SKIP
    """
    if WINDOWS:
        raise RuntimeError("SubprocessCluster does not support Windows.")
    if not host:
        host = "127.0.0.1"
    scheduler_kwargs = scheduler_kwargs or {}
    worker_kwargs = worker_kwargs or {}

    if n_workers is None and threads_per_worker is None:
        n_workers, threads_per_worker = nprocesses_nthreads()
    if n_workers is None and threads_per_worker is not None:
        n_workers = max(1, CPU_COUNT // threads_per_worker)
    if n_workers and threads_per_worker is None:
        # Overcommit threads per worker rather than undercommit
        threads_per_worker = max(1, int(math.ceil(CPU_COUNT / n_workers)))
    if n_workers and "memory_limit" not in worker_kwargs:
        # Split the total system memory evenly between the workers
        worker_kwargs["memory_limit"] = parse_memory_limit(
            "auto", 1, n_workers, logger=logger
        )
    assert n_workers is not None

    # Build the scheduler and worker specs consumed by SpecCluster
    scheduler_kwargs = toolz.merge(
        {
            "host": host,
            "port": scheduler_port,
            "dashboard": dashboard_address is not None,
            "dashboard_address": dashboard_address,
        },
        scheduler_kwargs,
    )
    scheduler = {"cls": Scheduler, "options": scheduler_kwargs}

    worker_kwargs = toolz.merge(
        {
            "nthreads": threads_per_worker,
            "silence_logs": silence_logs,
        },
        worker_kwargs,
    )
    worker = {
        "cls": SubprocessWorker,
        "options": {"worker_class": worker_class, "worker_kwargs": worker_kwargs},
    }
    workers = {i: worker for i in range(n_workers)}
    return SpecCluster(
        workers=workers,
        scheduler=scheduler,
        worker=worker,
        name="SubprocessCluster",
        silence_logs=silence_logs,
        **kwargs,
    )