U
    /e0                     @  s   d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
mZ d dlmZ d dlmZ d dlmZ d dlmZ d dlZd dlmZ eeZd	d
 Zdd ZG dd dZedddZG dd dZdd Z dS )    )annotationsN)Callable)Queue)TypeVar)Future)IOLoop)get_mp_contextc              
   G  sN   z| j |f|  W n4 tk
rH } ztdt|s8 W 5 d}~X Y nX dS )zQ
    Helper to silence "IOLoop is closing" exception on IOLoop.add_callback.
    zIOLoop is clos(ed|ing)N)Zadd_callbackRuntimeErrorresearchstr)loopfuncargsexc r   7/tmp/pip-unpacked-wheel-g426oqom/distributed/process.py_loop_add_callback   s
    r   c              
   O  sR   z|||}W n0 t k
r> } zt| |j| W 5 d }~X Y nX t| |j| d S N)	Exceptionr   Zset_exception
set_result)r   futurer   r   kwargsresr   r   r   r   _call_and_set_future$   s
     r   c                   @  s   e Zd ZdZdZdZdS )_ProcessStateFN)__name__
__module____qualname__is_alivepidexitcoder   r   r   r   r   /   s   r   _T_async_processAsyncProcess)boundc                   @  s  e Zd ZU dZded< d8ddZdd	 Zd
d Zdd ZdddddZ	e
dd Ze
dd Ze
ddddZe
dd Zdd Zddd d!Zddd"d#Zd9d$d%Zd&d' Zd(d)dd*d+d,Zd-d. Zed/d0 Zed1d2 Zed3d4 Zed5d6 Zejd7d6 ZdS ):r#   z
    A coroutine-compatible multiprocessing.Process-alike.
    All normally blocking methods are wrapped in Tornado coroutines.
    zmultiprocessing.Process_processNr   c              	   C  s   |pi }t |s"tdt|t | _|p8tjdd| _t j	dd\}| _
t j| j|||||| j
tjjfd| _| jj| _t| t| j| _t | _t | _d | _d| _|   d S )Nz#`target` needs to be callable, not F)instance)Zduplextargetnamer   )callable	TypeErrortyper   _stater   current_loopr   ZPipe_keep_child_aliveProcess_rundaskconfigglobal_configr%   r)   _nameweakreffinalize_asyncprocess_finalizerZ_proc_finalizerPyQueue_watch_qr   _exit_future_exit_callback_closed_start_threads)selfr   r(   r)   r   r   parent_alive_piper   r   r   __init__@   s8    
  zAsyncProcess.__init__c                 C  s   d| j j d| j dS )N< >)	__class__r   r6   r@   r   r   r   __repr__j   s    zAsyncProcess.__repr__c                 C  s   | j rtdd S )Nz(invalid operation on closed AsyncProcess)r>   
ValueErrorrG   r   r   r   _check_closedm   s    zAsyncProcess._check_closedc              	   C  sp   t j| jd| j t| | j| j| j| j	| j
fd| _d| j_| j  dd }tj| || j	d| _d| j_d S )Nz#AsyncProcess %s watch message queuer'   Tc                 S  s   |  ddi d S )Nopstop)
put_nowaitqr   r   r   stop_thread   s    z0AsyncProcess._start_threads.<locals>.stop_threadrN   F)	threadingThread_watch_message_queuer)   r7   refr%   r/   r-   r;   r<   Z_watch_message_threaddaemonstartr8   _thread_finalizeratexit)r@   rP   r   r   r   r?   q   s     
zAsyncProcess._start_threadsintNone)r!   returnc                 C  s*   d | _ | jd k	r| |  | j| d S r   )r%   r=   r<   r   )r@   r!   r   r   r   _on_exit   s    

zAsyncProcess._on_exitc                   s*    fdd}t j|d}d|_|  dS )zP
        Immediately exit the process when parent_alive_pipe is closed.
        c                     s8   z    W n tk
r*   td Y n
X tdd S )Nz'unexpected state: should be unreachable)recvEOFErroros_exitr	   r   rA   r   r   monitor_parent   s
    
z@AsyncProcess._immediate_exit_when_closed.<locals>.monitor_parent)r(   TN)rQ   rR   rU   rV   )clsrA   rc   tr   rb   r   _immediate_exit_when_closed   s    z(AsyncProcess._immediate_exit_when_closedc                 C  s@   |   | | dt _tjjtjj|dd ||| d S )N
MainThreadold)priority)	closerf   rQ   current_threadr)   r3   r4   updater5   )rd   r(   r   r   rA   r0   Zinherit_configr   r   r   r2      s
    

zAsyncProcess._run)processc           
        s   t   j  fdd} }td d| |d }	|	dkrht||d | q(|	dkrt||d j q(|	d	krt||d j q(|	d
krqq(ds(t|q(d S )Nc                    s^      tjtjd  fd} d| _|    d_j_t	d dj d S )Nz"AsyncProcess %s watch process joinr'   T[z] created process with pid )
rV   rQ   rR   r#   _watch_processrU   r   r    loggerdebug)threadr)   rm   rO   rselfrefstater   r   _start   s    
z1AsyncProcess._watch_message_queue.<locals>._startrn   z] got message rK   rV   r   	terminatekillrL   r   )	reprr)   getrp   rq   r   rx   ry   AssertionError)
rd   ru   rm   r   rv   rO   Zexit_futurerw   msgrK   r   rs   r   rS      s    
z!AsyncProcess._watch_message_queuec           	      C  s   t | }|  |j }}|d kr(d}d|_||_| }z|d k	rTt|j|j| W 5 d }X |d krxtd||j	 nt
d||j	| d S )N   FzE[%s] process %r exit status was already read will report exitcode 255z#[%s] process %r exited with code %r)rz   joinr!   r   r   r/   r\   rp   warningr    rq   )	rd   ru   rm   rv   rO   rt   r!   Zoriginal_exit_coder@   r   r   r   ro      s&    

zAsyncProcess._watch_processc                 C  s$   |    t }| jd|d |S )zQ
        Start the child process.

        This method returns a future.
        rV   rK   r   rJ   r   r;   rM   r@   Zfutr   r   r   rV     s    zAsyncProcess.startzasyncio.Future[None])r[   c                 C  s$   |    t }| jd|d |S )zTerminate the child process.

        This method returns a future.

        See also
        --------
        multiprocessing.Process.terminate
        rx   r   r   r   r   r   r   rx     s    	zAsyncProcess.terminatec                 C  s$   |    t }| jd|d |S )zSend SIGKILL to the child process.
        On Windows, this is the same as terminate().

        This method returns a future.

        See also
        --------
        multiprocessing.Process.kill
        ry   r   r   r   r   r   r   ry   !  s    
zAsyncProcess.killc                   sJ   |    | jjdk	std| jjdk	r,dS tt| j|I dH  dS )z_
        Wait for the child process to exit.

        This method returns a coroutine.
        Nzcan only join a started process)	rJ   r-   r    r|   r!   asynciowait_forZshieldr<   )r@   timeoutr   r   r   r   0  s
    zAsyncProcess.joinc                 C  s   | j s|   d| _d| _ dS )z
        Stop helper thread and release resources.  This method returns
        immediately and does not ensure the child process has exited.
        NT)r>   rW   r%   rG   r   r   r   rj   >  s    zAsyncProcess.closer"   z"Callable[[_T_async_process], None])r@   r   r[   c                 C  s@   t |rtdt|s"td| jjdks6td|| _dS )z
        Set a function to be called by the event loop when the process exits.
        The function is called with the AsyncProcess as sole argument.

        The function may not be a coroutine function.
        z-exit callback may not be a coroutine functionz exit callback should be callableNz5cannot set exit callback when process already started)inspectiscoroutinefunctionr|   r*   r-   r    r=   )r@   r   r   r   r   set_exit_callbackH  s    

zAsyncProcess.set_exit_callbackc                 C  s   | j jS r   )r-   r   rG   r   r   r   r   [  s    zAsyncProcess.is_alivec                 C  s   | j jS r   )r-   r    rG   r   r   r   r    ^  s    zAsyncProcess.pidc                 C  s   | j jS r   )r-   r!   rG   r   r   r   r!   b  s    zAsyncProcess.exitcodec                 C  s   | j S r   )r6   rG   r   r   r   r)   f  s    zAsyncProcess.namec                 C  s   | j jS r   r%   rU   rG   r   r   r   rU   j  s    zAsyncProcess.daemonc                 C  s   || j _d S r   r   )r@   valuer   r   r   rU   n  s    )NNNr   N)N)r   r   r   __doc____annotations__rB   rH   rJ   r?   r\   classmethodrf   r2   rS   ro   rV   rx   ry   r   rj   r   r   propertyr    r!   r)   rU   setterr   r   r   r   r#   8   s@   

*

*






c                 C  s>   |   r:ztd|   |   W n tk
r8   Y nX d S )Nzreaping stray process )r   rp   inforx   OSError)procr   r   r   r9   s  s    r9   )!
__future__r   r   r   loggingmultiprocessingr`   r
   rQ   r7   collections.abcr   queuer   r:   typingr   Ztornado.concurrentr   Ztornado.ioloopr   r3   Zdistributed.utilsr   	getLoggerr   rp   r   r   r   r"   r#   r9   r   r   r   r   <module>   s0   
  =