U
    /e<                     @  s   d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlm	Z	 d dl
mZ d dlmZ d dlmZ d dlmZ d dlmZ eeZG d	d
 d
Zdd ZdddZdddZG dd dZdS )    )annotationsN)Queue)Thread)sleep)merge)gen)timec                   @  s,   e Zd ZdZdZdZdZdZdZdZ	dZ
d	S )
bcolorsz[95mz[94mz[92mz[93mz[91mz[0mz[1mz[4mN)__name__
__module____qualname__HEADERZOKBLUEZOKGREENWARNINGFAILENDCBOLDZ	UNDERLINE r   r   >/tmp/pip-unpacked-wheel-g426oqom/distributed/deploy/old_ssh.pyr	      s   r	   c              
     s\  dd l }ddlm  ddlm}m} | }||  d}zDt	
dt	j |jd d d d d	d
d
d W q^W q> ||fk
rZ } ztdtj djd d d d tj  ttjd t| tj  t  |d7 }|dkr"tdtj d tj  td tdtj d| d tj  td W 5 d }~X Y q>X q>tdjd d d |jdd  d d	d\}jd  fdd fd d!fd"d#}| }	d$  rtd% |	  | rqqt }
t |
d& k rH d' | r<qHtd% q!  |!  d S )(Nr   )PipeTimeout)PasswordRequiredExceptionSSHExceptionparamikoaddressssh_usernamessh_portssh_private_keyT   )hostnameusernameportZkey_filenamecompresstimeoutZbanner_timeoutz[ dask ssh ] : zDSSH connection error when connecting to {addr}:{port} to run '{cmd}'cmd)addrr   r"   z,               SSH reported this exception:       z/SSH connection failed after 3 retries. Exiting.z               zRetrying... (attempt z/3)z[ {label} ] : {cmd}label)r&   r"   z$SHELL -i -c '')Zget_pty皙?c               	     sr   zR  } t| dkrP|  } td|  d djd | d   } q
W n  tjfk
rl   Y nX dS )z<
        Read stdout stream, time out if necessary.
        r   zstdout from ssh channel: %soutput_queuez[ {label} ] : {output}r&   )r&   outputN)	readlinelenrstriploggerdebugputformatsocketr!   line)r   cmd_dictstdoutr   r   read_from_stdout}   s     z#async_ssh.<locals>.read_from_stdoutc               	     s   z`  } t| dkr^|  } td|  d djd dtj |  tj	    } q
W n  t
jfk
rz   Y nX dS )z<
        Read stderr stream, time out if necessary.
        r   zstderr from ssh channel: %sr)   [ {label} ] : r&   r&   N)r+   r,   r-   r.   r/   r0   r1   r	   r   r   r2   r!   r3   )r   r5   stderrr   r   read_from_stderr   s"    z#async_ssh.<locals>.read_from_stderrc                    sV          rR  } d djd dtj d t|  tj  dS dS )zp
        Communicate a little bit, without blocking too long.
        Return True if the command ended.
        r)   r8   r&   r9   z'remote process exited with exit status TN)Zexit_status_readyZrecv_exit_statusr0   r1   r	   r   strr   )exit_status)channelr5   r;   r7   r   r   communicate   s     zasync_ssh.<locals>.communicateinput_queueg      ?g      @   )"r   Zparamiko.buffered_piper   Zparamiko.ssh_exceptionr   r   Z	SSHClientZset_missing_host_key_policyZAutoAddPolicylogging	getLoggersetLevelWARNconnectprintr	   r   r1   r   r<   	traceback	print_excos_exitr   Zexec_commandr>   
settimeoutZget_transportemptyZsend_ignorer   sendclose)r5   r   r   r   sshretriesestdinr?   	transportstartr   )r   r>   r5   r;   r7   r:   r6   r   	async_ssh"   s    




	 

rV   c              
   C  s   dj |ptj|d}| d k	r@d|  d| }|dj ||| d7 }tj d| d| tj }t }	t }
|||||	|
|||d		}tt|gd
}d|_	|
  t|d|iS )Nz8{python} -m distributed.cli.dask_scheduler --port {port})pythonr   	mkdir -p  && z,&> {logdir}/dask_scheduler_{addr}:{port}.log)r#   r   logdirz
scheduler :)	r"   r&   r   r   r@   r)   r   r   r   targetargsTthread)r1   sys
executabler	   r   r   r   r   rV   daemonrU   r   )rZ   r#   r   r   r   r   remote_pythonr"   r&   r@   r)   r5   r_   r   r   r   start_scheduler   s8       rd   distributed.cli.dask_workerc                 C  s   d|dkrdnd }|	s |d7 }|
r,|d7 }|r8|d7 }|rD|d7 }|j |pPtj|||||||
||d	
}|d k	r|d
j |d7 }| d k	rd|  d| }|dj || d7 }d| }t }t }||||||||d}tt|gd}d|_|  t|d|iS )NzX{python} -m {remote_dask_worker} {scheduler_addr}:{scheduler_port} --nthreads {nthreads}r$   z --nworkers {n_workers} z --host {worker_addr}z --memory-limit {memory_limit}z --worker-port {worker_port}z --nanny-port {nanny_port})
rW   remote_dask_workerscheduler_addrscheduler_portworker_addrnthreads	n_workersmemory_limitworker_port
nanny_portz$ --local-directory {local_directory})local_directoryrX   rY   z%&> {logdir}/dask_scheduler_{addr}.log)r#   rZ   zworker )r"   r&   r   r@   r)   r   r   r   r\   Tr_   )	r1   r`   ra   r   r   rV   rb   rU   r   )rZ   rh   ri   rj   rk   rl   r   r   r   nohostrm   rn   ro   rc   rg   rp   r"   r&   r@   r)   r5   r_   r   r   r   start_worker   sd     
rr   c                   @  sr   e Zd ZdddZejdd	 Zed
d Zej	dd Zedd Z
dd Zdd Zdd Zdd Zdd ZdS )
SSHClusterr   N   Fre   c                 K  sJ  || _ || _|| _|dd }|r:tdd|  |d k	rT|d k	rTtdn&|d k	rnt	dt
 |}n|d krzd}|| _|| _|| _|| _|	| _|| _|| _|| _|| _|| _|| _dd l}|
d k	rtj|
d|j d	 }
ttjd
j|
d tj  |
| _ g | _!t"|
||||||| _#g | _$|D ]}| %| q4d S )Nnprocsz.__init__() got an unexpected keyword argument z, z=Both nprocs and n_workers were specified. Use n_workers only.zZThe nprocs argument will be removed in a future release. It has been renamed to n_workers.r$   r   z	dask-ssh_z%Y-%m-%d_%H:%M:%SzaOutput will be redirected to logfiles stored locally on individual worker nodes under "{logdir}".)rZ   )&rh   ri   rk   pop	TypeErrorjoinkeys
ValueErrorwarningswarnFutureWarningrl   r   r   r   rq   rc   rm   rn   ro   rg   rp   datetimerJ   pathnowstrftimerG   r	   r   r1   r   rZ   threadsrd   	schedulerworkers
add_worker)selfrh   ri   Zworker_addrsrk   rl   r   r   r   rq   rZ   rc   rm   rn   ro   rg   rp   kwargsru   r~   r#   r   r   r   __init__S  st    
zSSHCluster.__init__c                 C  s   d S Nr   r   r   r   r   _start  s    zSSHCluster._startc                 C  s   t dt | jS Nz[The nprocs attribute will be removed in a future release. It has been renamed to n_workers.r{   r|   r}   rl   r   r   r   r   ru     s
    zSSHCluster.nprocsc                 C  s   t dt || _d S r   r   )r   valuer   r   r   ru     s
    c                 C  s   d| j | jf S )Nz%s:%d)rh   ri   r   r   r   r   scheduler_address  s    zSSHCluster.scheduler_addressc                 C  s^   | j g| j }z6|D ]"}|d  st|d   qqtd qW n tk
rX   Y nX d S )Nr)   r(   )r   r   rM   rG   getr   KeyboardInterruptr   Zall_processesprocessr   r   r   monitor_remote_processes  s    z#SSHCluster.monitor_remote_processesc                 C  sP   | j t| j| j| j|| j| j| j| j	| j
| j| j| j| j| j| j| j d S r   )r   appendrr   rZ   rh   ri   rk   rl   r   r   r   rq   rm   rn   ro   rc   rg   rp   )r   r   r   r   r   r     s(    zSSHCluster.add_workerc                 C  s6   | j g| j }|D ]}|d d |d   qd S )Nr@   shutdownr_   )r   r   r0   rx   r   r   r   r   r     s    zSSHCluster.shutdownc                 C  s   | S r   r   r   r   r   r   	__enter__  s    zSSHCluster.__enter__c                 C  s   |    d S r   )r   )r   exc_type	exc_valuerH   r   r   r   __exit__   s    zSSHCluster.__exit__)r   NNrt   NFNNNNNre   N)r
   r   r   r   r   	coroutiner   propertyru   setterr   r   r   r   r   r   r   r   r   r   rs   R  s6                
_



rs   )N)Nre   N)
__future__r   rB   rJ   r2   r`   rH   r{   queuer   	threadingr   r   r   Ztlzr   Ztornador   Zdistributed.metricsrC   r
   r.   r	   rV   rd   rr   rs   r   r   r   r   <module>   s.   
 1 
8   
W