U
    /e2>                  &   @  s  d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	m
Z
 d dlmZ d dlmZ d dlZd dlmZ d dlmZ d dlZd dlmZ d d	lmZ d d
lmZ d dlmZ d dlmZ d dlm Z  d dl!m"Z"m#Z# d dl$m%Z%m&Z& e'dZ(ej)dddZ*ej+de,dddej-de.ddej/de*dddej/de*dddej/de*dddej/d dd!d"ej/d#dd$d"ej/d%e.d&d'dej/d(d)ddd*d+ej/d,e.dd-dej/d.e.dd/dej/d0e.dd1dej/d2e.dd3dej/d4e.dd5dej/d6e0d d7dej/d8d9e.ddd:d;ej/d<e.dd=dej/d>d?dd@dAej/dBddCd"ej/dDe.dEdFdej/dGde.dHdIej/dJe.ddKdej/dLe.ddMdej/dNe.ddOdej/dPe.dEdQdej/dRe.ddSdej/dTe.ddUdej/dVe.dWddXd;ej/dYdZddd[d+ej/d\e.ddd]d^ej-d_d`ej1e daej/dbe.dddcd^ej/dde.ddede2 dfdfdgdhdiZ3dfdfdjdkdldmdndoZ4e5dpkre3  dS )q    )annotationsN)Iterator)suppress)Any)valmap)TimeoutError)	CPU_COUNT)Nanny)wait_for_signals)get_address_host_port)nprocesses_nthreads)validate_preload_argv)enable_proctitle_on_childrenenable_proctitle_on_current)import_termparse_portszdistributed.dask_workerT)existsZresolve_pathZworker)Zignore_unknown_options)nameZcontext_settings	schedulerF)typerequiredz--tls-ca-filez'CA cert(s) file for TLS (in PEM format))r   defaulthelpz
--tls-certz(certificate file for TLS (in PEM format)z	--tls-keyz(private key file for TLS (in PEM format)z--worker-porta-  Serving computation port, defaults to random. When creating multiple workers with --nworkers, a sequential range of worker ports may be used by specifying the first and last available ports like <first-port>:<last-port>. For example, --worker-port=3000:3026 will use ports 3000, 3001, ..., 3025, 3026.)r   r   z--nanny-porta%  Serving nanny port, defaults to random. When creating multiple nannies with --nworkers, a sequential range of nanny ports may be used by specifying the first and last available ports like <first-port>:<last-port>. For example, --nanny-port=3000:3026 will use ports 3000, 3001, ..., 3025, 3026.z--dashboard-addressz:0z4Address on which to listen for diagnostics dashboardz--dashboard/--no-dashboard	dashboardz+Launch the Dashboard [default: --dashboard])r   r   r   z--listen-addressz_The address to which the worker binds. Example: tcp://0.0.0.0:9000 or tcp://:9000 for IPv4+IPv6z--contact-addressz}The address the worker advertises to the scheduler for communication with it and other workers. Example: tcp://127.0.0.1:9000z--hostzServing host. Should be an ip address that is visible to the scheduler and other workers. See --listen-address and --contact-address if you need different listen and contact addresses. See --interface.z--interfacez&Network interface like 'eth0' or 'ib0'z
--protocolzProtocol like tcp, tls, or ucxz
--nthreadszNumber of threads per process.z
--nworkers	n_workerszNumber of worker processes to launch. If negative, then (CPU_COUNT + 1 + nworkers) is used. Set to 'auto' to set nworkers and nthreads dynamically based on CPU_COUNT)r   r   show_defaultr   z--namezA unique name for this worker like 'worker-1'. If used with --nworkers then the process number will be appended like name-0, name-1, name-2, ...z--memory-limitautoa-  
    Bytes of memory per process that the worker can use.
    This can be:
    - an integer (bytes), note 0 is a special case for no memory management.
    - a float (fraction of total system memory).
    - a string (like 5GB or 5000M).
    - 'auto' for automatically computing the memory limit.
    )r   r   r   z--nanny/--no-nannyz@Start workers in nanny process for management [default: --nanny]z
--pid-file zFile to write the process PIDz--local-directoryzDirectory to place worker files)r   r   r   z--resourceszResources for task constraints like "GPU=2 MEM=10e9". Resources are applied separately to each worker process (only relevant when starting multiple worker processes with '--nworkers').z--scheduler-filezXFilename to JSON encoded scheduler information. Use with dask scheduler --scheduler-filez--death-timeoutz.Seconds to wait for a scheduler before closingz--dashboard-prefixzPrefix for the dashboardz
--lifetimez6If provided, shut down the worker after this duration.z--lifetime-staggerz1Random amount by which to stagger lifetime valuesz--worker-classzdask.distributed.Workerz.Worker class used to instantiate workers from.z(--lifetime-restart/--no-lifetime-restartZlifetime_restartzWhether or not to restart the worker after the lifetime lapses. This assumes that you are using the --lifetime and --nanny keywordsz	--preloadzWModule that should be loaded by each worker process like "foo.bar" or "/path/to/foo.py")r   multipleZis_eagerr   Zpreload_argv)nargsr   callbackz--preload-nannyzNModule that should be loaded by each nanny like "foo.bar" or "/path/to/foo.py"z--scheduler-sniz4Scheduler SNI (if different from scheduler hostname)
str | None)worker_port
nanny_portc                   s  dt jd krtdt t \}}}t|d |d |d  t  t	  dd d|fd|fd	|ffD d
krt
 \ndkrdntdk rtd  dkrtd t d dkrstd t d  r|std t d dkr,|r,td t d |s8rR|rRtd t d zT|rt|dd\}t|}dkrd d rt dd\}}n| W n@ tk
r } z tdt|  t d W 5 d}~X Y nX st 	r:t	d}|tt  W 5 Q R X 	fdd}t| rndd tdd D ttndt|}t||
t 
kst!r|d< |d < |d!< t"n|sst#j$%d"ddkrtd#t&t't tW 5 Q R X d$ 
fd%d&}zLzt)*|  W n6 t+t)j+fk
r~   spt(d( t d Y nX W 5 t(d' X dS ))z6Launch a Dask worker attached to an existing schedulerzdask-workerr   z\dask-worker is deprecated and will be removed in a future release; use `dask worker` instead   c                 S  s   i | ]\}}|d k	r||qS N ).0kvr'   r'   ?/tmp/pip-unpacked-wheel-g426oqom/distributed/cli/dask_worker.py
<dictcomp>  s    zmain.<locals>.<dictcomp>tls_ca_fileZtls_worker_certZtls_worker_keyr   N   zVFailed to launch worker. Must specify --nworkers so that there's at least one process.zTFailed to launch worker.  You cannot use the --no-nanny argument when n_workers > 1.zVFailed to launch worker. Must specify --listen-address when --contact-address is givenzPFailed to launch worker. You cannot specify --listen-address when n_workers > 1.zcFailed to launch worker. You cannot specify --listen-address when --worker-port or --host is given.T)strict:[]zFailed to launch worker. wc                     s   t j rt   d S r&   )ospathr   remover'   )pid_filer'   r+   del_pid_filea  s    zmain.<locals>.del_pid_file, c                 s  s   | ]}| d V  qdS )=N)split)r(   pairr'   r'   r+   	<genexpr>i  s     zmain.<locals>.<genexpr>worker_classpreload_nannylisten_addresszscheduler-addresszINeed to provide scheduler address like
dask worker SCHEDULER_ADDRESS:8786Fc                    s   	fddt 
D   fdd}  fdd}t| }t|  }tj||gtjdI d H \}}dd |D  d S )	Nc                   sb   g | ]Z\}}	f
 d ks:dks:dkr>nt d t | d	|qS )r.   Nr   -)	scheduler_filenthreads	resourcessecuritycontact_addresshostr   dashboard_addressr   )str)r(   iZport_kwargs_i)rG   r   rI   rH   kwargsr   r   rD   rE   r   rC   sectr'   r+   
<listcomp>  s.   z%main.<locals>.run.<locals>.<listcomp>c                     s.   t j  I dH  t jdd  D  I dH  dS )z-Wait for all nannies to initialize and finishNc                 s  s   | ]}|  V  qd S r&   )finishedr(   nr'   r'   r+   r>     s     zHmain.<locals>.run.<locals>.wait_for_nannies_to_finish.<locals>.<genexpr>)asynciogatherr'   nanniesr'   r+   wait_for_nannies_to_finish  s    z5main.<locals>.run.<locals>.wait_for_nannies_to_finishc                     s2   t  I dH  dr.tjdd  D  I dH  dS )zTWait for SIGINT or SIGTERM and close all nannies upon receiving one of those signalsNTc                 s  s   | ]}|j d dV  qdS )
   )timeoutN)closerQ   r'   r'   r+   r>     s     zHmain.<locals>.run.<locals>.wait_for_signals_and_close.<locals>.<genexpr>)r
   rS   rT   r'   )rV   nannysignal_firedr'   r+   wait_for_signals_and_close  s    z5main.<locals>.run.<locals>.wait_for_signals_and_close)Zreturn_whenc                 S  s   g | ]}|  qS r'   )result)r(   Ztaskr'   r'   r+   rO     s     )	enumeraterS   Zcreate_taskwaitZFIRST_COMPLETED)rW   r]   Zwait_for_signals_and_close_taskZwait_for_nannies_to_finish_taskdone_)rG   r   rI   rH   rL   r   r   r[   rD   port_kwargsrE   r   rC   rM   r\   rN   rU   r+   run  s     "
zmain.<locals>.runz
End workerzTimed out starting worker),sysargvwarningswarnFutureWarninggcZget_thresholdset_thresholdr   r   r   intr   loggererrorexitr   rJ   
ValueErroropenwriter4   getpidatexitregisterreplacer<   dictr   floatr   _apportion_portslenAssertionErrorr	   daskconfiggetr   	TypeErrorinforS   rd   r   )r   rH   r#   rA   rG   r$   rD   r   r[   r   r7   rE   r   rC   Zdashboard_prefixr-   Ztls_certZtls_keyrI   r?   r@   rL   Zg0Zg1Zg2rb   efr8   rd   r'   )rG   r   rI   rH   rL   r   r   r[   rD   r7   rc   rE   r   rC   rM   r\   rN   r+   main&   s     b







*2
r   rl   boolzlist[dict[str, Any]])r#   r$   r   r[   returnc              	     s  t  ddd fdd}|| }||}dd t D }t|}d}	d}
|	sV|
rzt|\}}W n  tk
r   t|}Y qNY nX z|t| W n tk
r   d	}	Y nX z|t| W qN tk
r   d	}
Y qNX qNg }|D ]\}}|r|s6|r td
|  d| d  dntd
|  d  dt|}t|dkrT|d }|rt|}t|dkrx|d }||d}nd|i}|	| q|S )zSpread out evenly --worker-port and/or --nanny-port ranges to the workers and
    nannies, avoiding overlap.

    Returns
    =======
    List of kwargs to pass to the Worker or Nanny constructors
    r"   zIterator[int | None])sr   c                 3  sX   t | }|dgd gfkr2t D ]}|d V  q n"|D ]}|kr6| |V  q6d S )Nr   )r   rangeadd)r   portsrb   portr   seenr'   r+   parse_unique  s    
z&_apportion_ports.<locals>.parse_uniquec                 S  s   g | ]}t  t  fqS r'   )set)r(   rb   r'   r'   r+   rO     s    z$_apportion_ports.<locals>.<listcomp>TFz(Not enough ports in range --worker_port z --nanny_port z for z workersr.   r   )r   r#   r   )
r   r   iternextStopIterationr   rp   sortedrz   append)r#   r$   r   r[   r   Zworker_ports_iterZnanny_ports_iterr   Z
ports_iterZmore_wpsZmore_npsZworker_ports_iZnanny_ports_irL   ZwpnpZkwargs_ir'   r   r+   ry     sZ    



ry   __main__)6
__future__r   rS   rt   rj   loggingr4   re   rg   collections.abcr   
contextlibr   typingr   ZclickZtlzr   Ztornado.ioloopr   r|   Zdask.systemr   Zdistributedr	   Zdistributed._signalsr
   Zdistributed.commr   Zdistributed.deploy.utilsr   Zdistributed.preloadingr   Zdistributed.proctitler   r   Zdistributed.utilsr   r   	getLoggerrm   PathZpem_file_option_typecommandrw   argumentrJ   optionrl   ZUNPROCESSEDZversion_optionr   ry   __name__r'   r'   r'   r+   <module>   s  
		
      
         T [S
