U
    ÷Õ/e(  ã                   @  sN  d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlm	Z	m
Z
 d dlmZ d dlmZ d dlmZmZmZmZ d dlmZmZ d dlmZ d d	lmZ e e¡Ze
d
dƒZG dd„ dƒZeƒ Z dd„ Z!G dd„ de"ƒZ#dd„ Z$G dd„ dƒZ%e&ƒ Z'G dd„ deƒZ(G dd„ deƒZ)G dd„ deƒZ*G dd„ deƒZ+e+ƒ ed< dS )é    )ÚannotationsN)ÚdequeÚ
namedtuple)ÚFuture)ÚIOLoop)ÚCommÚCommClosedErrorÚ	ConnectorÚListener)ÚBackendÚbackends)Únested_deserialize)Úget_ipÚConnectionRequest©Úc2s_qÚs2c_qÚc_loopÚc_addrÚ
conn_eventc                   @  sL   e Zd ZdZdd„ Zedd„ ƒZdd„ Zdd	„ Zd
d„ Z	dd„ Z
dd„ ZdS )ÚManagerz?
    An object coordinating listeners and their addresses.
    c                 C  s*   t  ¡ | _t d¡| _d | _t ¡ | _	d S ©Né   )
ÚweakrefÚWeakValueDictionaryÚ	listenersÚ	itertoolsÚcountÚaddr_suffixesÚ_ipÚ	threadingÚLockÚlock©Úself© r%   ú;/tmp/pip-unpacked-wheel-g426oqom/distributed/comm/inproc.pyÚ__init__   s    
zManager.__init__c                 C  s4   | j s.ztƒ | _ W n tk
r,   d| _ Y nX | j S )Nz	127.0.0.1)r   r   ÚOSErrorr#   r%   r%   r&   Úip%   s    z
Manager.ipc              	   C  s8   | j ( || jkr td|›ƒ‚|| j|< W 5 Q R X d S )Nzalready listening on )r"   r   ÚRuntimeError)r$   ÚaddrÚlistenerr%   r%   r&   Úadd_listener.   s    
zManager.add_listenerc              	   C  s8   | j ( z| j|= W n tk
r(   Y nX W 5 Q R X d S ©N)r"   r   ÚKeyError©r$   r+   r%   r%   r&   Úremove_listener4   s
    zManager.remove_listenerc              
   C  s4   | j $ |  |¡ | j |¡W  5 Q R £ S Q R X d S r.   )r"   Úvalidate_addressr   Úgetr0   r%   r%   r&   Úget_listener_for;   s    
zManager.get_listener_forc                 C  s   d| j t ¡ t| jƒf S )Nz%s/%d/%s)r)   ÚosÚgetpidÚnextr   r#   r%   r%   r&   Únew_address@   s    zManager.new_addressc                 C  sF   |  d¡\}}}|| jks*t|ƒt ¡ krBtd|| jt ¡ f ƒ‚dS )z3
        Validate the address' IP and pid.
        ú/z6inproc address %r does not match host (%r) or pid (%r)N)Úsplitr)   Úintr5   r6   Ú
ValueError)r$   r+   r)   ÚpidÚsuffixr%   r%   r&   r2   C   s    ÿÿzManager.validate_addressN)Ú__name__Ú
__module__Ú__qualname__Ú__doc__r'   Úpropertyr)   r-   r1   r4   r8   r2   r%   r%   r%   r&   r      s   
r   c                   C  s   dt  ¡  S )z!
    Generate a new address.
    ú	inproc://)Úglobal_managerr8   r%   r%   r%   r&   r8   R   s    r8   c                   @  s   e Zd ZdS )Ú
QueueEmptyN)r?   r@   rA   r%   r%   r%   r&   rF   Y   s   rF   c                 C  s   |   ¡ rdS |  |¡ dS )z?Helper setting the result only if the future was not cancelled.N)Z	cancelledÚ
set_result)ÚfutÚresultr%   r%   r&   Ú_set_result_unless_cancelled]   s    rJ   c                   @  sF   e Zd ZdZdd„ Zdd„ Zdd„ Zdd	„ ZeZe	ƒ Z
e
fd
d„ZdS )ÚQueuezI
    A single-reader, single-writer, non-threadsafe, peekable queue.
    c                 C  s   t ƒ | _d | _d S r.   )r   Ú_qÚ_read_futurer#   r%   r%   r&   r'   i   s    zQueue.__init__c                 C  s   | j }|st‚| ¡ S r.   )rL   rF   Úpopleft)r$   Úqr%   r%   r&   Ú
get_nowaitm   s    zQueue.get_nowaitc                 C  s8   | j rtdƒ‚tƒ }| j}|r.| | ¡ ¡ n|| _ |S )NzOnly one reader allowed)rM   ÚAssertionErrorr   rL   rG   rN   )r$   rH   rO   r%   r%   r&   r3   s   s    z	Queue.getc                 C  sD   | j }| j}|d k	r6t|ƒdks$t‚d | _t||ƒ n
| |¡ d S )Nr   )rL   rM   ÚlenrQ   rJ   Úappend)r$   ÚvaluerO   rH   r%   r%   r&   Ú
put_nowait}   s    zQueue.put_nowaitc                 C  s(   | j }|r|d S || jk	r |S t‚dS )zV
        Get the next object in the queue without removing it from the queue.
        r   N)rL   Ú_omittedrF   )r$   ÚdefaultrO   r%   r%   r&   Úpeek‹   s    
z
Queue.peekN)r?   r@   rA   rB   r'   rP   r3   rU   ÚputÚobjectrV   rX   r%   r%   r%   r&   rK   d   s   

rK   c                      sˆ   e Zd ZdZdZdddddœ‡ fdd„Zd	d
„ Zeddœdd„ƒZeddœdd„ƒZ	ddd„Z
ddd„Zdd„ Zdd„ Zdd„ Z‡  ZS )ÚInProcz¼
    An established communication based on a pair of in-process queues.

    Reminder: a Comm must always be used from a single thread.
    Its peer Comm can be running in any thread.
    FTÚstrÚbool)Ú
local_addrÚ	peer_addrÚdeserializec                   sV   t ƒ j|d || _|| _|| _|| _|| _d| _t 	| |  
¡ ¡| _d| j_d| _d S )N)r`   FT)Úsuperr'   Ú_local_addrÚ
_peer_addrÚ_read_qÚ_write_qÚ_write_loopÚ_closedr   ÚfinalizeÚ_get_finalizerÚ
_finalizerÚatexitÚ_initialized)r$   r^   r_   Úread_qÚwrite_qÚ
write_loopr`   ©Ú	__class__r%   r&   r'   ¥   s    	zInProc.__init__c                 C  s    t | ƒ}| j| j|fdd„}|S )Nc                 S  s"   t  d|› ¡ | | jt¡ d S )NzClosing dangling queue in )ÚloggerÚwarningÚadd_callbackrU   Ú_EOF)rn   ro   Úrr%   r%   r&   rh   ½   s    z'InProc._get_finalizer.<locals>.finalize)Úreprre   rf   )r$   rv   rh   r%   r%   r&   ri   º   s    zInProc._get_finalizer)Úreturnc                 C  s   | j S r.   )rb   r#   r%   r%   r&   Úlocal_addressÃ   s    zInProc.local_addressc                 C  s   | j S r.   )rc   r#   r%   r%   r&   Úpeer_addressÇ   s    zInProc.peer_addressÚignoredc                 Ã  sL   | j rtƒ ‚| j ¡ I d H }|tkr:d| _ | j ¡  tƒ ‚| jrHt|ƒ}|S ©NT)	rg   r   rd   r3   ru   rj   Údetachr`   r   )r$   ZdeserializersÚmsgr%   r%   r&   ÚreadË   s    
zInProc.readNc                 Ã  s$   |   ¡ rtƒ ‚| j | jj|¡ dS r   )Úclosedr   rf   rt   re   rU   )r$   r~   ZserializersZon_errorr%   r%   r&   ÚwriteÙ   s    zInProc.writec                 Ã  s   |   ¡  d S r.   )Úabortr#   r%   r%   r&   Úcloseâ   s    zInProc.closec                 C  sF   |   ¡ sB| j | jjt¡ | j t¡ d  | _| _d| _| j 	¡  d S r|   )
r€   rf   rt   re   rU   ru   rd   rg   rj   r}   r#   r%   r%   r&   r‚   å   s    zInProc.abortc                 C  s<   | j r
dS | jr4| j d¡tkr4d| _ | j ¡  dS dS dS )zï
        Whether this comm is closed.  An InProc comm is closed if:
            1) close() or abort() was called on this comm
            2) close() or abort() was called on the other end and the
               read queue is empty
        TNF)rg   rl   rd   rX   ru   rj   r}   r#   r%   r%   r&   r€   î   s    
zInProc.closed)T)r{   )NN)r?   r@   rA   rB   rl   r'   ri   rC   ry   rz   r   r   rƒ   r‚   r€   Ú__classcell__r%   r%   rp   r&   r[   ›   s   	 ù	

		r[   c                   @  sZ   e Zd ZdZddd„Zdd„ Zdd„ Zd	d
„ Zdd„ Zdd„ Z	e
dd„ ƒZe
dd„ ƒZdS )ÚInProcListenerÚinprocTc                 C  s.   t | _|p| j ¡ | _|| _|| _tƒ | _d S r.   )rE   Úmanagerr8   ÚaddressÚcomm_handlerr`   rK   Úlisten_q)r$   rˆ   r‰   r`   r%   r%   r&   r'     s
    zInProcListener.__init__c                 Ã  sJ   z|   |¡I d H  W n  tk
r4   t d¡ Y d S X |  |¡I d H  d S )Nz,Connection closed before handshake completed)Zon_connectionr   rr   Údebugr‰   )r$   Úcommr%   r%   r&   Ú_handle_stream
  s    
zInProcListener._handle_streamc                 Ã  sj   | j  ¡ I d H }|d krqftd| j d|j |j|j|j| jd}|j 	|j
j¡ t ¡  	| j|¡ q d S )NrD   ©r^   r_   rm   rn   ro   r`   )rŠ   r3   r[   rˆ   r   r   r   r   r`   rt   r   Úsetr   Úcurrentr   )r$   Úconn_reqrŒ   r%   r%   r&   Ú_listen  s    ú	zInProcListener._listenc                 C  s   | j  | jj|¡ d S r.   )Úlooprt   rŠ   rU   )r$   r‘   r%   r%   r&   Úconnect_threadsafe#  s    z!InProcListener.connect_threadsafec                 Ã  s.   t  ¡ | _t |  ¡ ¡| _| j | j	| ¡ d S r.   )
r   r   r“   ÚasyncioZensure_futurer’   Z_listen_futurer‡   r-   rˆ   r#   r%   r%   r&   Ústart&  s    
zInProcListener.startc                 C  s   | j  d ¡ | j | j¡ d S r.   )rŠ   rU   r‡   r1   rˆ   r#   r%   r%   r&   Ústop+  s    zInProcListener.stopc                 C  s
   d| j  S ©NrD   ©rˆ   r#   r%   r%   r&   Úlisten_address/  s    zInProcListener.listen_addressc                 C  s
   d| j  S r˜   r™   r#   r%   r%   r&   Úcontact_address3  s    zInProcListener.contact_addressN)T)r?   r@   rA   Úprefixr'   r   r’   r”   r–   r—   rC   rš   r›   r%   r%   r%   r&   r…      s   

r…   c                   @  s   e Zd Zdd„ Zddd„ZdS )ÚInProcConnectorc                 C  s
   || _ d S r.   )r‡   )r$   r‡   r%   r%   r&   r'   9  s    zInProcConnector.__init__Tc                 Ë  sˆ   | j  |¡}|d kr"td|›ƒ‚ttƒ tƒ t ¡ | j  ¡ t 	¡ d}| 
|¡ |j ¡ I d H  td|j d| |j|j|j|d}|S )Nzno endpoint for inproc address r   rD   rŽ   )r‡   r4   r(   r   rK   r   r   r8   r•   ÚEventr”   r   Úwaitr[   r   r   r   r“   )r$   rˆ   r`   Úconnection_argsr,   r‘   rŒ   r%   r%   r&   Úconnect<  s*    û
úzInProcConnector.connectN)T)r?   r@   rA   r'   r¡   r%   r%   r%   r&   r   8  s   r   c                   @  s8   e Zd ZeZdd„ Zdd„ Zdd„ Zdd„ Zd	d
„ Z	dS )ÚInProcBackendc                 C  s
   t | jƒS r.   )r   r‡   r#   r%   r%   r&   Úget_connector^  s    zInProcBackend.get_connectorc                 K  s   t |||ƒS r.   )r…   )r$   ÚlocZhandle_commr`   r    r%   r%   r&   Úget_listenera  s    zInProcBackend.get_listenerc                 C  s   | j  |¡ | j jS r.   )r‡   r2   r)   ©r$   r¤   r%   r%   r&   Úget_address_hostf  s    zInProcBackend.get_address_hostc                 C  s   |S r.   r%   r¦   r%   r%   r&   Úresolve_addressj  s    zInProcBackend.resolve_addressc                 C  s   | j  |¡ | j  ¡ S r.   )r‡   r2   r8   r¦   r%   r%   r&   Úget_local_address_form  s    z#InProcBackend.get_local_address_forN)
r?   r@   rA   rE   r‡   r£   r¥   r§   r¨   r©   r%   r%   r%   r&   r¢   Y  s   r¢   r†   ),Ú
__future__r   r•   r   Úloggingr5   r    r   Úcollectionsr   r   Ztornado.concurrentr   Ztornado.ioloopr   Zdistributed.comm.corer   r   r	   r
   Zdistributed.comm.registryr   r   Zdistributed.protocolr   Zdistributed.utilsr   Ú	getLoggerr?   rr   r   r   rE   r8   Ú	ExceptionrF   rJ   rK   rZ   ru   r[   r…   r   r¢   r%   r%   r%   r&   Ú<module>   s<   
 ÿ54e8!