U
    /e".                     @  s*  d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlm	Z	m
Z
 d dlmZ d dlmZmZ d dlZd dlmZ d dlmZ d dlmZ d d	lmZ d d
lmZ d dlmZ eeZG dd de Z!G dd de!Z"G dd de	Z#G dd de	Z$G dd de	Z%dddZ&dddZ'dS )    )annotationsN)ABCabstractmethod)suppress)AnyClassVar)parse_timedelta)registry)parse_address)time)get_default_compression)HIGHEST_PROTOCOLc                   @  s   e Zd ZdS )CommClosedErrorN__name__
__module____qualname__ r   r   9/tmp/pip-unpacked-wheel-g426oqom/distributed/comm/core.pyr      s   r   c                   @  s   e Zd ZdS )FatalCommClosedErrorNr   r   r   r   r   r      s   r   c                   @  s   e Zd ZU dZe Zded< ded< ded< ded< ded	< d
ed< d)d
dddZe	d*ddZ
e	d+ddZe	dd Ze	dd Ze	dd Zee	ddddZee	dddd Zed!d" Zed#d$ Zed%d& Zd'd( ZdS ),Comma  
    A message-oriented communication object, representing an established
    communication channel.  There should be only one reader and one
    writer at a time: to manage current communications, even with a
    single peer, you must create distinct ``Comm`` objects.

    Messages are arbitrary Python objects.  Concrete implementations
    of this class can implement different serialization mechanisms
    depending on the underlying transport's characteristics.
    zClassVar[weakref.WeakSet[Comm]]
_instancesz
str | Nonenamedict
local_inforemote_infohandshake_optionsbooldeserializeT)r   c                 C  s4   | j |  d| _d | _i | _i | _i | _|| _d S )NT)r   addZallow_offloadr   r   r   r   r   )selfr   r   r   r   __init__4   s    zComm.__init__Nc                   s   dS )aW  
        Read and return a message (a Python object).

        This method returns a coroutine.

        Parameters
        ----------
        deserializers : dict[str, tuple[Callable, Callable, bool]] | None
            An optional dict appropriate for distributed.protocol.deserialize.
            See :ref:`serialization` for more.
        Nr   )r    Zdeserializersr   r   r   read?   s    z	Comm.readc                   s   dS )a(  
        Write a message (a Python object).

        This method returns a coroutine.

        Parameters
        ----------
        msg
        on_error : str | None
            The behavior when serialization fails. See
            ``distributed.protocol.core.dumps`` for valid values.
        Nr   )r    msgZserializersZon_errorr   r   r   writeM   s    z
Comm.writec                   s   dS )z
        Close the communication cleanly.  This will attempt to flush
        outgoing buffers before actually closing the underlying transport.

        This method returns a coroutine.
        Nr   r    r   r   r   close\   s    z
Comm.closec                 C  s   dS )z
        Close the communication immediately and abruptly.
        Useful in destructors or generators' ``finally`` blocks.
        Nr   r%   r   r   r   aborte   s    z
Comm.abortc                 C  s   dS )z$Return whether the stream is closed.Nr   r%   r   r   r   closedl   s    zComm.closedstr)returnc                 C  s   dS )z;The local address. For logging and debugging purposes only.Nr   r%   r   r   r   local_addressp   s    zComm.local_addressc                 C  s   dS )z<The peer's address. For logging and debugging purposes only.Nr   r%   r   r   r   peer_addressu   s    zComm.peer_addressc                 C  s   i S )z
        Return backend-specific information about the communication,
        as a dict.  Typically, this is information which is initialized
        when the communication is established and doesn't vary afterwards.
        r   r%   r   r   r   
extra_infoz   s    zComm.extra_infoc                   C  s   t  ttjd d tdS )N   )compressionpythonpickle-protocol)r   tuplesysversion_infor   r   r   r   r   handshake_info   s    zComm.handshake_infoc              
   C  sr   zdt | d |d i}W n, tk
rF } ztd|W 5 d }~X Y nX | d |d krf| d |d< nd |d< |S )Nr1   zYour Dask versions may not be in sync. Please ensure that you have the same version of dask and distributed on your client, scheduler, and worker machinesr/   )minKeyError
ValueError)localremoteouter   r   r   handshake_configuration   s"      zComm.handshake_configurationc                 C  s,   d | jj|  rdnd| jp d| j| jS )Nz<{}{} {} local={} remote={}>z	 (closed) )format	__class__r   r(   r   r+   r,   r%   r   r   r   __repr__   s    zComm.__repr__)T)N)NN)r   r   r   __doc__weakrefWeakSetr   __annotations__r!   r   r"   r$   r&   r'   r(   propertyr+   r,   r-   staticmethodr5   r=   rA   r   r   r   r   r   !   s>   






r   c                   @  sp   e Zd Zedd Zedd Zeedd Zeedd Zd	d
 Z	dd Z
dd ZdddddddZdS )Listenerc                   s   dS )z;
        Start listening for incoming connections.
        Nr   r%   r   r   r   start   s    zListener.startc                 C  s   dS )z
        Stop listening.  This does not shutdown already established
        communications, but prevents accepting new ones.
        Nr   r%   r   r   r   stop   s    zListener.stopc                 C  s   dS )z8
        The listening address as a URI string.
        Nr   r%   r   r   r   listen_address   s    zListener.listen_addressc                 C  s   dS )z
        An address this listener can be contacted on.  This can be
        different from `listen_address` if the latter is some wildcard
        address such as 'tcp://0.0.0.0:123'.
        Nr   r%   r   r   r   contact_address   s    zListener.contact_addressc                   s   |   I d H  | S NrI   r%   r   r   r   
__aenter__   s    zListener.__aenter__c                   s    |   }t|r|I d H  d S rM   )rJ   inspectisawaitable)r    exc_type	exc_value	tracebackfuturer   r   r   	__aexit__   s    
zListener.__aexit__c                   s    fdd}|   S )Nc                     s      I d H   S rM   rN   r   r%   r   r   _   s    zListener.__await__.<locals>._)	__await__)r    rW   r   r%   r   rX      s    zListener.__await__Nr   zdict[str, Any] | NoneNone)commhandshake_overridesr*   c                   s   |  |pi }tjd}t|dd}z6tj|||dI d H  tj| |dI d H }W nV t	k
r } z8t
t	 | I d H  W 5 Q R X td|d|W 5 d }~X Y nX ||_|j|jd< ||_|j|jd< ||j|j|_d S )N!distributed.comm.timeouts.connectsecondsdefaulttimeoutzComm z closed.address)r5   daskconfiggetr   asynciowait_forr$   r"   	Exceptionr   r&   r   r   r,   r   r+   r=   r   )r    rZ   r[   r   ra   	handshaker<   r   r   r   on_connection   s$    
$ zListener.on_connection)N)r   r   r   r   rI   rJ   rF   rK   rL   rO   rV   rX   rj   r   r   r   r   rH      s   

 rH   c                   @  s   e Zd ZedddZdS )	ConnectorTc                   s   dS )a  
        Connect to the given address and return a Comm object.
        This function returns a coroutine. It may raise EnvironmentError
        if the other endpoint is unreachable or unavailable.  It
        may raise ValueError if the address is malformed.
        Nr   )r    rb   r   r   r   r   connect   s    zConnector.connectN)T)r   r   r   r   rl   r   r   r   r   rk      s   rk   Tc                   sD  dkrt jdtddt| \}}t|}| }d}	t   fdd}
d}d}t	
d	| d
 }d}|
 dkrJz8tj|j|fd|i|t||
 dI dH }	W qbW qv tk
r    Y qv tjtfk
rF } zR|}}t|
 |d|  }td|}|d7 }t	
d|| t|I dH  W 5 d}~X Y qvX qvtd|  d d||	 |ppi }z6t|	 |
 I dH }t|	||
 I dH  W n^ tk
r } z>tt |	 I dH  W 5 Q R X td|  d d|W 5 d}~X Y nX ||	_|	j|	jd< ||	_|	j|	jd< |	|	j|	j|	_ |	S )z
    Connect to the given address (a URI such as ``tcp://127.0.0.1:1234``)
    and yield a ``Comm`` object.  If the connection attempt fails, it is
    retried until the *timeout* is expired.
    Nr\   r]   r^   c                    s     } t d| t  S )Nr   )maxr   )deadlinerI   ra   r   r   	time_left  s    zconnect.<locals>.time_leftg{Gz?r   zEstablishing connection to %s   r   r`         z7Could not connect to %s, waiting for %s before retryingzTimed out trying to connect to z after z sz/Timed out during handshake while connecting to rb   )!rc   rd   re   r   r
   r	   get_backendZget_connectorr   loggerdebugrf   rg   rl   r6   r   TimeoutErrorOSErrorrandomuniformsleepr5   r"   r$   rh   r   r&   r   Z
_peer_addrr   Z_local_addrr=   r   )addrra   r   r[   Zconnection_argsschemelocbackendZ	connectorrZ   rp   Zbackoff_baseattemptZintermediate_capZactive_exceptionexcZ	upper_capbackoffr   ri   r   ro   r   rl     s~    


  $
 rl   c                 K  sr   zt | dd\}}W n@ tk
rT   |dr8d|  } nd|  } t | dd\}}Y nX t|}|j|||f|S )aJ  
    Create a listener object with the given parameters.  When its ``start()``
    method is called, the listener will listen on the given address
    (a URI such as ``tcp://0.0.0.0``) and call *handle_comm* with a
    ``Comm`` object for each incoming connection.

    *handle_comm* can be a regular function or a coroutine.
    T)strictssl_contextztls://ztcp://)r
   r8   re   r	   rt   Zget_listener)r|   Zhandle_commr   kwargsr}   r~   r   r   r   r   listen\  s    	


r   )NTN)T)(
__future__r   rf   rP   loggingry   r3   rC   abcr   r   
contextlibr   typingr   r   rc   Z
dask.utilsr   Zdistributed.commr	   Zdistributed.comm.addressingr
   Zdistributed.metricsr   Z distributed.protocol.compressionr   Zdistributed.protocol.pickler   	getLoggerr   ru   IOErrorr   r   r   rH   rk   rl   r   r   r   r   r   <module>   s8   
 L     
Z