U
    /eN                     @  s  d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
Z
d dlZd dlmZmZ d dlmZmZ d dlmZ d dlmZ d dlmZmZmZmZmZmZmZ d dlZd dlm Z  d d	l!m"Z" d dl#Z#d d
l$m%Z% d dl&m'Z'm(Z( d dl)m*Z*m+Z+m,Z,m-Z-m.Z.m/Z/m0Z0 d dl1m2Z2 d dl3m4Z4 d dl5m6Z6 d dl7m8Z8m9Z9m:Z:m;Z;m<Z<m=Z= erd dl>m?Z? e?dZ@edZAedZBeeeeBf ZCG dd deZDdd eDD eD_EG dd deFZGeHeIZJdd ZKe%e#jLMdddZNe#jLMd ZOd!d"d#d$d%ZPG d&d' d'ZQG d(d) d)eRZSd*d+d*d,d-d.ZTG d/d0 d0eQZUG d1d2 d2ZVd3d4 ZWd5ddd6d7d"d8d9d:ZXdXd;d<ZYG d=d> d>ZZG d?d@ d@Z[G dAdB dBZ\dCdD Z]dEdFdGdHdIZ^G dJdK dKeZ_dYdEdMdKdNdOdPZ`dZdQdRdSdTdUdVdWZadS )[    )annotationsN)defaultdictdeque)	Container	Coroutine)Enum)partial)TYPE_CHECKINGAnyCallableClassVar	TypedDictTypeVarfinal)merge)IOLoop)parse_timedelta)profileprotocol)CommCommClosedErrorconnectget_address_host_portlistennormalize_addressunparse_host_port)PeriodicCallback)time)SystemMonitor)NoOpAwaitableget_tracebackhas_keywordiscoroutinefunctionrecursive_to_dicttruncate_exception)	ParamSpecPRTc                   @  sD   e Zd ZdZdZdZdZdZdZdZ	dZ
d	Zd
ZdZdZdZdZdS )Statusa
  
    This Enum contains the various states a cluster, worker, scheduler and nanny can be
    in. Some of the status can only be observed in one of cluster, nanny, scheduler or
    worker but we put them in the same Enum as they are compared with each
    other.
    	undefinedcreatedinitstartingrunningpausedstoppingstoppedclosingclosing_gracefullyclosedfailed
dont_replyN)__name__
__module____qualname____doc__r*   r+   r,   r-   r.   r/   r0   r1   r2   r3   r4   r5   r6    r;   r;   4/tmp/pip-unpacked-wheel-g426oqom/distributed/core.pyr)   9   s   r)   c                 C  s   i | ]}|j |qS r;   )name).0sr;   r;   r<   
<dictcomp>P   s      r@   c                   @  s   e Zd ZdS )	RPCClosedNr7   r8   r9   r;   r;   r;   r<   rA   S   s   rA   c                   s    fdd}|S )Nc                    s    d S Nr;   argskwargsexcr;   r<   _raise[   s    zraise_later.<locals>._raiser;   )rH   rI   r;   rG   r<   raise_laterZ   s    rJ   zdistributed.admin.tick.limitmsdefaultzdistributed.admin.pdb-on-errr   bool)funcreturnc                 C  sT   t | }t|j}|r(|d dkr(dS |rP|d dkrPtd|  dt dS dS )Nr   commTstreamzCalling the first argument of a RPC handler `stream` is deprecated. Defining this argument is optional. Either remove the argument or rename it to `comm` in .F)inspect	signaturelist
parameterswarningswarnFutureWarning)rO   sigparamsr;   r;   r<   _expects_commh   s    


r]   c                   @  s$   e Zd ZdZe ZdZdd ZdS )_LoopBoundMixinz@Backport of the private asyncio.mixins._LoopBoundMixin from 3.11Nc              	   C  sP   t  }| jd kr4| j | jd kr*|| _W 5 Q R X || jk	rLt| d|S )N# is bound to a different event loop)asyncioget_running_loop_loop_global_lockRuntimeError)selfloopr;   r;   r<   	_get_loop   s    


z_LoopBoundMixin._get_loop)	r7   r8   r9   r:   	threadingLockrc   rb   rg   r;   r;   r;   r<   r^   x   s   r^   c                   @  s   e Zd ZdS )AsyncTaskGroupClosedErrorNrB   r;   r;   r;   r<   rj      s   rj   zCallable[P, Coro[T]]float)corofuncdelayrP   c                   s   dddd fdd}|S )zXDecorator to delay the evaluation of a coroutine function by the given delay in seconds.P.argsP.kwargsr(   )rE   rF   rP   c                    s    t I d H   | |I d H S rC   )r`   sleeprD   rl   rm   r;   r<   wrapper   s    z_delayed.<locals>.wrapperr;   )rl   rm   rr   r;   rq   r<   _delayed   s    rs   c                   @  sv   e Zd ZU dZded< ddddZdd	d
ddddZddd	d
ddddZddddZddddZ	dd Z
dS )AsyncTaskGroupzKCollection tracking all currently running asynchronous tasks within a grouprN   r4   NonerP   c                 C  s   d| _ t | _d S )NF)r4   set_ongoing_tasksre   r;   r;   r<   __init__   s    zAsyncTaskGroup.__init__zCallable[P, Coro[None]]rn   ro   )afuncrE   rF   rP   c                O  s@   | j rtd|  |||}|| jj | j| dS )aT  Schedule a coroutine function to be executed as an `asyncio.Task`.

        The coroutine function `afunc` is scheduled with `args` arguments and `kwargs` keyword arguments
        as an `asyncio.Task`.

        Parameters
        ----------
        afunc
            Coroutine function to schedule.
        *args
            Arguments to be passed to `afunc`.
        **kwargs
            Keyword arguments to be passed to `afunc`

        Returns
        -------
            None

        Raises
        ------
        AsyncTaskGroupClosedError
            If the task group is closed.
        zHCannot schedule a new coroutine function as the group is already closed.N)r4   rj   rg   create_taskadd_done_callbackrx   removeadd)re   r{   rE   rF   taskr;   r;   r<   	call_soon   s    zAsyncTaskGroup.call_soonrk   )rm   r{   rE   rF   rP   c                O  s   | j t||f|| dS )a  Schedule a coroutine function to be executed after `delay` seconds as an `asyncio.Task`.

        The coroutine function `afunc` is scheduled with `args` arguments and `kwargs` keyword arguments
        as an `asyncio.Task` that is executed after `delay` seconds.

        Parameters
        ----------
        delay
            Delay in seconds.
        afunc
            Coroutine function to schedule.
        *args
            Arguments to be passed to `afunc`.
        **kwargs
            Keyword arguments to be passed to `afunc`

        Returns
        -------
            The None

        Raises
        ------
        AsyncTaskGroupClosedError
            If the task group is closed.
        N)r   rs   )re   rm   r{   rE   rF   r;   r;   r<   
call_later   s    !zAsyncTaskGroup.call_laterc                 C  s
   d| _ dS )znCloses the task group so that no new tasks can be scheduled.

        Existing tasks continue to run.
        TNr4   ry   r;   r;   r<   close   s    zAsyncTaskGroup.closec              
     s   |    t|  }d}| j|h  }r||D ]}|  q.zt|I dH  W q tjk
rx } z|}W 5 d}~X Y qX q|dk	r|dS )zClose the group and stop all currently running tasks.

        Closes the task group and cancels all tasks. All tasks are cancelled
        an additional time for each time this task is cancelled.
        N)r   r`   current_taskrg   rx   cancelwaitCancelledError)re   r   errZtasks_to_stopr   er;   r;   r<   stop   s    
zAsyncTaskGroup.stopc                 C  s
   t | jS rC   )lenrx   ry   r;   r;   r<   __len__  s    zAsyncTaskGroup.__len__N)r7   r8   r9   r:   __annotations__rz   r   r   r   r   r   r;   r;   r;   r<   rt      s   
##rt   c                	   @  s  e Zd ZdZdZdZdWddZed	d
ddZej	d	ddddZedd
ddZ
edd
ddZedd
ddZedd
ddZdd
ddZdd Zdd Zd d! Zed"d# Zd$d% Zd&d' Zd(d) Zd*d+ Zed,d- Zd.d/ Zd0d1 Zed2d
d3d4Zed2d
d5d6Zed7d8 Zed9d: Zed;d< Zd=d
d>d?Z d@dAdBdCdDdEdFZ!dXdGdHZ"dYdIdJZ#dKdL Z$dMdN Z%dZdOdPZ&d[dQdRZ'd2dSddTdUdVZ(dS )\ServeraO  Dask Distributed Server

    Superclass for endpoints in a distributed cluster, such as Worker
    and Scheduler objects.

    **Handlers**

    Servers define operations with a ``handlers`` dict mapping operation names
    to functions.  The first argument of a handler function will be a ``Comm``
    for the communication established with the client.  Other arguments
    will receive inputs from the keys of the incoming message which will
    always be a dictionary.

    >>> def pingpong(comm):
    ...     return b'pong'

    >>> def add(comm, x, y):
    ...     return x + y

    >>> handlers = {'ping': pingpong, 'add': add}
    >>> server = Server(handlers)  # doctest: +SKIP
    >>> server.listen('tcp://0.0.0.0:8000')  # doctest: +SKIP

    **Message Format**

    The server expects messages to be dictionaries with a special key, `'op'`
    that corresponds to the name of the operation, and other key-value pairs as
    required by the function.

    So in the example above the following would be good messages.

    *  ``{'op': 'ping'}``
    *  ``{'op': 'add', 'x': 10, 'y': 20}``

     r   N   Tc              	     s  |
d k	rt jdtdd tj_jjjj	d_
j
| |d krhtjdtj  g }|_i _j|pi  tjd tt  _d _d _d _d _i _|_t _d _ t! _"t#$ _%g _&t'(  _)_*t+j)dsftjdr\t,-j) d	d
 fdd}t.j/dtjdtjd|dj)_.n
t0 j)_.z$ddl1m2} t3t4|j)d_5W n t6k
r   d _5Y nX t3t7_8t3t7_9ddl1m:} t3t4|j)d_ i _;t<jjt=tjdd }|j;d< t> _?d_@d_At> _Bt=tjddd_CjC_Dt<jEjCd j;d< t<jFt=tjdd j;d< d_Gfdd}j)H| t#I _Jd _KtL||||||	d _Md!_Nd S )"Nz=The io_loop kwarg to Server is ignored and will be deprecated   
stacklevel)identityechoZconnection_streamZ
dump_statezdistributed.%s.blocked-handlers-r   z"distributed.worker.profile.enabledrN   rv   c                    s     } | d kp| j  S rC   )asyncio_loopZ	is_closedrf   )refr;   r<   r   h  s    zServer.__init__.<locals>.stop)z
profile.pyzselectors.pyz#distributed.worker.profile.intervalz distributed.worker.profile.cycle)Zomitintervalcycler   r   )Digestr   )Counterz)distributed.admin.system-monitor.intervali  monitorzdistributed.admin.tick.intervalrK   rL   Ztickzdistributed.admin.tick.cycleZticksc                     s   t   _d S rC   )rh   	get_ident	thread_idr;   ry   r;   r<   set_thread_ident  s    z)Server.__init__.<locals>.set_thread_ident)limitdeserializeserializersdeserializersconnection_argstimeoutserverF)OrX   rY   DeprecationWarningr)   r,   _statusr   r   handle_stream_to_dicthandlersupdatedaskconfiggettyper7   lowerblocked_handlersstream_handlersstruuidZuuid4id_address_listen_address_port_host_commsr   r   r   countersrt   _ongoing_background_tasksr`   Event_event_finished	listenersr   currentio_looprf   hasattrweakrefr   r   Zwatchr   Zdistributed.counterr   r   r   digestsImportErrorrk   digests_totaldigests_maxr   periodic_callbacksr   r   r   
_last_tick_tick_counter_last_tick_counter_last_tick_cycleZ_tick_interval_tick_interval_observed_measure_tick_cycle_ticksr   add_callbackri   _startup_lock_Server__startup_excConnectionPoolrpc_Server__stopped)re   r   r   r   Zconnection_limitr   r   r   r   r   r   r   r   r   pcr   r;   )r   re   r<   rz   1  s     








  



zServer.__init__r)   rv   c                 C  s(   z| j W S  tk
r"   tj Y S X d S rC   )r   AttributeErrorr)   r*   ry   r;   r;   r<   status  s    zServer.statusru   )valuerP   c                 C  s"   t |tstd||| _d S )NzExpected Status; got )
isinstancer)   	TypeErrorr   )re   r   r;   r;   r<   r     s    
intc                 C  s
   t | jS )zAThe number of total incoming connections listening to remote RPCs)r   r   ry   r;   r;   r<   incoming_comms_open  s    zServer.incoming_comms_openc                 C  s   t dd | j D S )z9The number of connections currently handling a remote RPCc                 S  s   g | ]\}}|d k	r|qS rC   r;   )r>   copr;   r;   r<   
<listcomp>  s      z0Server.incoming_comms_active.<locals>.<listcomp>)r   r   itemsry   r;   r;   r<   incoming_comms_active  s    zServer.incoming_comms_activec                 C  s   | j jS )zEThe number of connections currently open and waiting for a remote RPC)r   openry   r;   r;   r<   outgoing_comms_open  s    zServer.outgoing_comms_openc                 C  s   | j jS )zSThe number of outgoing connections that are currently used to
        execute a RPC)r   activery   r;   r;   r<   outgoing_comms_active  s    zServer.outgoing_comms_activezdict[str, int]c                   s    fdddD S )zA dict with various connection counters

        See also
        --------
        Server.incoming_comms_open
        Server.incoming_comms_active
        Server.outgoing_comms_open
        Server.outgoing_comms_active
        c                   s   i | ]}|t  |qS r;   )getattr)r>   attrry   r;   r<   r@     s    z2Server.get_connection_counters.<locals>.<dictcomp>)r   r   r   r   r;   ry   r;   ry   r<   get_connection_counters  s    

zServer.get_connection_countersc                   s   | j  I dH  dS )z"Wait until the server has finishedN)r   r   ry   r;   r;   r<   finished  s    zServer.finishedc                 C  s   |    S rC   )start	__await__ry   r;   r;   r<   r     s    zServer.__await__c                   s   | j  I dH  | S )a  Attempt to start the server. This is not idempotent and not protected against concurrent startup attempts.

        This is intended to be overwritten or called by subclasses. For a safe
        startup, please use ``Server.start`` instead.

        If ``death_timeout`` is configured, we will require this coroutine to
        finish before this timeout is reached. If the timeout is reached we will
        close the instance and raise an ``asyncio.TimeoutError``
        N)r   r   ry   r;   r;   r<   start_unsafe  s    
zServer.start_unsafec                   sF   j 4 I d H &  jtjkr4 jd k	s,t jn" jtjkrV W  5 Q I d H R  S t dd }ddd fdd}ztj	 
 |dI d H  W n tjk
r } z0||I d H  tt j d| d	|W 5 d }~X Y nH tk
r( } z(||I d H  tt j d
|W 5 d }~X Y nX tj _W 5 Q I d H R X  S )NZdeath_timeout	Exceptionru   )rH   rP   c                   s       I d H  tj _|  _d S rC   )r   r)   r5   r   r   rG   ry   r;   r<   _close_on_failure
  s    z'Server.start.<locals>._close_on_failure)r   z start timed out after zs.z failed to start.)r   r   r)   r5   r   AssertionErrorr,   r   r`   wait_forr   TimeoutErrorr   r7   r   rd   r.   )re   r   r   rH   r;   ry   r<   r      s,    (zServer.startc                   s   | I d H  | S rC   r;   ry   r;   r;   r<   
__aenter__  s    
zServer.__aenter__c                   s   |   I d H  d S rC   r   re   exc_type	exc_value	tracebackr;   r;   r<   	__aexit__   s    zServer.__aexit__c                 C  sJ   | j jt k	rt| dt | _| j D ]}|	 s0|
  q0dS )zStart Periodic Callbacks consistently

        This starts all PeriodicCallbacks stored in self.periodic_callbacks if
        they are not yet running. It does this safely by checking that it is using the
        correct event loop.
        r_   N)r   r   r`   ra   rd   r   r   r   valuesZ
is_runningr   )re   r   r;   r;   r<   start_periodic_callbacks#  s    zServer.start_periodic_callbacksc                   s^   | j r
d S d| _ t  | jD ] }| }t|r | q rZ fdd}| j| d S )NTc                     s   t j  I d H  d S rC   )r`   gatherr;   _stopsr;   r<   background_stops?  s    z%Server.stop.<locals>.background_stops)	r   rw   r   r   rT   isawaitabler   r   r   )re   listenerfuturer  r;   r  r<   r   2  s    

zServer.stopc                 C  s   | j r| j d S d S d S Nr   )r   ry   r;   r;   r<   r  D  s    
zServer.listenerc                 C  sP   t  }|| j }|| _|  jd7  _|tkr@tdt| j| | d| d S )N   zEvent loop was unresponsive in %s for %.2fs.  This is often caused by long-running GIL-holding functions or moving large chunks of data. This can cause timeouts and instability.ztick-duration)	r   r   r   tick_maximum_delayloggerinfor   r7   digest_metric)re   nowZtick_durationr;   r;   r<   r   K  s    
zServer._measure_tickc                 C  sH   | j s
d S t }| j| }| _| j | j }| j | _|| |p>d | _d S )Nr  )r   r   r   r   r   )re   r  Zlast_tick_cyclecountr;   r;   r<   r   ]  s    zServer._cycle_ticksr   c                 C  s(   | j s"| jdkrtd| jj| _ | j S )z~
        The address this Server can be contacted on.
        If the server is not up, yet, this raises a ValueError.
        Nz(cannot get address of non-running Server)r   r  
ValueErrorZcontact_addressry   r;   r;   r<   addressf  s
    

zServer.addressc                 C  s$   z| j W S  tk
r   Y dS X dS )z
        The address this Server can be contacted on.
        If the server is not up, yet, this returns a ``"not-running"``.
        znot-runningN)r  r  ry   r;   r;   r<   address_safer  s    zServer.address_safec                 C  s(   | j s"| jdkrtd| jj| _ | j S )z
        The address this Server is listening on.  This may be a wildcard
        address such as `tcp://0.0.0.0:1234`.
        Nz/cannot get listen address of non-running Server)r   r  r  listen_addressry   r;   r;   r<   r  }  s
    

zServer.listen_addressc                 C  s   | j st| j\| _ | _| j S )z
        The host this Server is running on.

        This will raise ValueError if the Server is listening on a
        non-IP based protocol.
        )r   r   r  r   ry   r;   r;   r<   host  s    zServer.hostc                 C  s   | j st| j\| _| _ | j S )z
        The port number this Server is listening on.

        This will raise ValueError if the Server is listening on a
        non-IP based protocol.
        )r   r   r  r   ry   r;   r;   r<   port  s    zServer.portzdict[str, str]c                 C  s   t | j| jdS )N)r   r   )r   r7   r   ry   r;   r;   r<   r     s    zServer.identityr;   excludezContainer[str]dict)r  rP   c                  sH   |   }| j| jj| jd}||  fdd| D }t| dS )zDictionary representation for debugging purposes.
        Not type stable and not intended for roundtrips.

        See also
        --------
        Server.identity
        Client.dump_cluster_state
        distributed.utils.recursive_to_dict
        )r  r   r   c                   s   i | ]\}}| kr||qS r;   r;   )r>   kvr  r;   r<   r@     s       z#Server._to_dict.<locals>.<dictcomp>r  )r   r  r   r=   r   r   r   r#   )re   r  r  extrar;   r  r<   r     s    

zServer._to_dictc                 C  s   |S rC   r;   )re   datar;   r;   r<   r     s    zServer.echoc                   s~   |d kr| j }t|tr&t| j|}n&t|tr:t| }n|}t|tsLtt|| j	f| j
|d|I d H }| j| d S )N)r   allow_offload)default_portr   r   r   
default_iptupler   r   r   handle_commr   r   append)re   Zport_or_addrr  rF   addrr  r;   r;   r<   r     s$    


zServer.listenc                 C  s8   z| j | j| W n tk
r0   |  Y nX t S )zPStart a background task that dispatches new communications to coroutine-handlers)r   r   _handle_commrj   abortr   )re   rQ   r;   r;   r<   r     s
    zServer.handle_commc                   sj  | j r|  dS |j}d}td|t| j || j|< | I dH  z| j sz | I dH }td|| W n tk
r } z&t	 std||| W Y qW 5 d}~X Y n\ tk
r } z<td| |
 rނ n |t|ddI dH  W Y qHW 5 d}~X Y nX t|ts,td	t| z|d
}W n6 tk
rp } ztdt| |W 5 d}~X Y nX | jdk	r| jd
 | || j|< |dd}|dd}|dd}|dkr|r|dI dH  qd}	z@|| jkrd}
t|
j|t| jd}t|}n
| j| }W n0 tk
rT   tjd|t| jdd Y nX |dk	rtt |drt||d< td|j z\t!|r||f|}	n
|f |}	t"#|	r|	I dH }	nt"$|	rt%dt|	 W n t&k
r   | j't(j)krtj*d|dd Y qY nJ tk
r` } z*td| |
 rD nt|dd}	W 5 d}~X Y nX |r|	t(j+krz|j|	|dI dH  W nB ttfk
r } ztd||| W Y qW 5 d}~X Y nX d| j|< d }}	|r|, I dH  |
 rHqqHW 5 | j|= t	 sd|
 sdz|  W n2 tk
rb } ztd|| W 5 d}~X Y nX X dS )a  Dispatch new communications to coroutine-handlers

        Handlers is a dictionary mapping operation names to functions or
        coroutines.

            {'get_data': get_data,
             'ping': pingpong}

        Coroutines should expect a single Comm object.
        NzConnection from %r to %sz)Failed while closing connection to %r: %szMessage from %r: %szCLost connection to %r while reading message: %s. Last operation: %szException while reading from %suncaught-error)r   z(Bad message type.  Expected dict, got
  r   z.Received unexpected message without 'op' key: r   r   FreplyTOKz^The '{op}' handler has been explicitly disallowed in {obj}, possibly due to security concerns.)r   objzNo handler %s found in %s)exc_infozCalling into handler %szIComm handler returned unknown awaitable. Expected coroutine, instead got zLost connection to %rzException while handling op %s)r   z8Lost connection to %r while sending result for op %r: %s)-r   r$  peer_addressr
  debugr   r7   r   sysis_finalizingr4   r   errorreadOSError	exceptionwriteerror_messager   r  r   r   popKeyErrorr  r   r   r   formatrJ   r   warningr!   r]   rT   iscoroutiner  rd   r   r   r)   r.   r  r6   r   )re   rQ   r  r   r   msgr   Zclose_desiredr&  result_msgrH   handlerr;   r;   r<   r#    s    












  zServer._handle_commc           	        s  |pi }t d|j d}zFz|s.z| I d H }W n, tk
rf   d}t d|j Y q.Y nX t|t	t
fs||f}|D ]}|dkr q|d}|r|dkrd}t d|j  q| j| }t|r| jj|ft|| td	I d H  n|f t|| qt d
| qtd	I d H  q"W n. tk
r^   trXd	d l}|   Y nX W 5 | I d H  | stX d S )Nz%Starting established connection to %sFTz!Connection to %s has been closed.r'  r   zclose-streamz)Received 'close-stream' from %s; closing.r   zodd message %s)r
  r  r*  r   r4   r   r/  r   r   r  rV   r4  r   r"   r   r   r   r`   rp   r.  r   LOG_PDBpdb	set_trace)	re   rQ   r  r4   Zmsgsr9  r   r<  r>  r;   r;   r<   r   a  s\    

zServer.handle_streamc                   s   z| j D ]}|  q| js~d| _t }| jD ]6}| }t|r2t	
t| dt || q2|r~tj| I d H  | j I d H  | j I d H  tjdd t| jD  I d H  W 5 | j   X d S )NTzu is using an asynchronous `stop` method. Support for asynchronous `Listener.stop` will be removed in a future versionc                 S  s   g | ]}|  qS r;   r   r>   rQ   r;   r;   r<   r     s     z Server.close.<locals>.<listcomp>)r   rw   r   r   r   r   r   rT   r  rX   rY   r   PendingDeprecationWarningr   r`   r   r   r   r   rV   r   )re   r   r   r  r  r  r;   r;   r<   r     s(    


$zServer.closerk   )r=   r   rP   c                 C  sF   | j d k	r| j | | | j|  |7  < t| j| || j|< d S rC   )r   r   r   maxr   )re   r=   r   r;   r;   r<   r    s    
zServer.digest_metric)	NNr   TNNNNN)N)NT)N)N))r7   r8   r9   r:   r  r  rz   propertyr   setterr   r   r   r   r   r   r   r   r   r   r   r   r   r   r  r   r   r  r  r  r  r  r   r   r   r   r   r#  r   r   r  r;   r;   r;   r<   r   	  sx   $         
 

	





 
5
r   c                 C  s   dS )Ns   pongr;   rQ   r;   r;   r<   pingpong  s    rF  T)r&  r   r   r   )rQ   r&  c             	     s,  |}||d< | dd}d}|dkr(|}|dk	r8||d< zvz6| j||ddI dH  |rj| j|dI dH }nd}W n: tjtfk
r   d	} Y n tjk
r   d	} Y nX W 5 |r|   n|r|  I dH  X t	|t
r(| d
dkr(| jrtf |\}	}
}|
st|
|nt|d |S )zSend and recv with a Comm.

    Keyword arguments turn into the message

    response = await send_recv(comm, op='ping', reply=True)
    r&  r   FNr   raise)r   Zon_error)r   Tr   r%  exception_text)r   r$  r   r2  r/  r`   r   r0  r   r   r  r   clean_exceptionr   with_tracebackr   )rQ   r&  r   r   rF   r9  Zplease_closeZforce_closeresponse_rH   tbr;   r;   r<   	send_recv  s<    

rN  c                 C  s@   | d kr||f} n|d kr"|d ks&t t| tr8t|  } t| S rC   )r   r   r  r   r   r"  ipr  r;   r;   r<   addr_from_args  s    

rQ  c                   @  s   e Zd ZU dZe Zded< dZdZ	dddZ
d	d
 Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd Zdd ZdS )r   a  Conveniently interact with a remote server

    >>> remote = rpc(address)  # doctest: +SKIP
    >>> response = await remote.add(x=10, y=20)  # doctest: +SKIP

    One rpc object can be reused for several interactions.
    Additionally, this object creates and destroys many comms as necessary
    and so is safe to use in multiple overlapping communications.

    When done, close comms explicitly.

    >>> remote.close_comms()  # doctest: +SKIP
    zClassVar[weakref.WeakSet[rpc]]r   r;   NTc                 C  s`   i | _ t|| _|| _tj| _|| _|| _|d k	r6|n|| _	|pBi | _
t | _tj|  d S rC   )commscoerce_to_addressr  r   r)   r.   r   r   r   r   r   r   WeakSet_createdr   r   r   )re   argrQ   r   r   r   r   r   r;   r;   r<   rz     s    



zrpc.__init__c                   s   | j tjkrtdt }d}| j D ]"\}}| rB|| |r( qLq(|D ]}| j|= qP|rj| rt| j	| j
fd| ji| jI dH }d|_d| j|< |S )a?  Get an open communication

        Some comms to the ip/port target may be in current use by other
        coroutines.  We track this with the `comms` dict

            :: {comm: True/False if open and ready for use}

        This function produces an open communication, either by taking one
        that we've already made or making a new one if they are all taken.
        This also removes comms that have been closed.

        When the caller is done with the stream they should set

            self.comms[comm] = True

        As is done in __getattr__ below.
        z
RPC ClosedFr   Nr   )r   r)   r4   rA   rw   rR  r   r   r   r  r   r   r   r=   )re   Zto_clearr   rQ   r?   r;   r;   r<   	live_comm+  s.    


zrpc.live_commc                 C  s   dd }g }t | jD ](}|r| st||}|| qt | jD ](}|rJ| sJt||}|| qJ| j  |S )Nc                   sR   z0|   s.| dddI d H  |  I d H  W n tk
rL   |   Y nX d S )Nr   F)r   r&  )r4   r2  r   r0  r$  rE  r;   r;   r<   _close_commT  s    z$rpc.close_comms.<locals>._close_comm)rV   rR  r4   r`   ensure_futurer!  rU  clear)re   rX  ZtasksrQ   r   r;   r;   r<   close_commsS  s    	
zrpc.close_commsc                   s    fdd}|S )Nc               
     s   j d k	r"| dd kr"j | d< jd k	rD| dd krDj| d< d }z6 I d H }d  |_tf | d| I d H }W n^ ttfk
r } z<|rt|d d|nt|d d|d|W 5 d }~X Y nX d	j	|< |S )
Nr   r   zrpc.rQ   r   z-Exception while trying to call remote method z before comm was established.z using comm rS   T)
r   r   r   rW  r=   rN  rA   r   r   rR  )rF   rQ   r:  r   keyre   r;   r<   send_recv_from_rpck  s.    




z+rpc.__getattr__.<locals>.send_recv_from_rpcr;   re   r^  r_  r;   r]  r<   __getattr__j  s    zrpc.__getattr__c                   s4   | j tjkrtj|  tj| _ tj|   I d H S rC   )	r   r)   r4   r   r   discardr`   r   r[  ry   r;   r;   r<   	close_rpc  s    zrpc.close_rpcc                 C  s   t jdtdd | S Nz1the rpc synchronous context manager is deprecatedr   r   rX   rY   r   ry   r;   r;   r<   	__enter__  s    zrpc.__enter__c                 C  s   t |   d S rC   )r`   rY  rc  r   r;   r;   r<   __exit__  s    zrpc.__exit__c                   s   | S rC   r;   ry   r;   r;   r<   r     s    zrpc.__aenter__c                   s   |   I d H  d S rC   )rc  r   r;   r;   r<   r     s    zrpc.__aexit__c                 C  s\   | j tjkrXtj|  tj| _ dd | jD }|rXtd| t	| |D ]}|
  qJd S )Nc                 S  s   g | ]}|  s|qS r;   r   r@  r;   r;   r<   r     s      zrpc.__del__.<locals>.<listcomp>z(rpc object %s deleted with %d open comms)r   r)   r4   r   r   rb  rR  r
  r7  r   r$  )re   Z
still_openrQ   r;   r;   r<   __del__  s      zrpc.__del__c                 C  s   d| j t| jf S )Nz<rpc to %r, %d comms>)r  r   rR  ry   r;   r;   r<   __repr__  s    zrpc.__repr__)NNTNNNN)r7   r8   r9   r:   r   rT  r   r   rR  r  rz   rW  r[  ra  rc  rf  rg  r   r   rh  ri  r;   r;   r;   r<   r     s,   
       
(r   c                   @  s^   e Zd ZdZdddZedd Zdd Zd	d
 Zdd Z	dd Z
dd Zdd Zdd ZdS )PooledRPCCallzVThe result of ConnectionPool()('host:port')

    See Also:
        ConnectionPool
    Nc                 C  s(   || _ || _|| _|d k	r|n|| _d S rC   )r"  poolr   r   )re   r"  rk  r   r   r;   r;   r<   rz     s    zPooledRPCCall.__init__c                 C  s   | j S rC   r"  ry   r;   r;   r<   r    s    zPooledRPCCall.addressc                   s    fdd}|S )Nc               
     s   j d k	r"| dd kr"j | d< jd k	rD| dd krDj| d< jjI d H }|jd   }|_ztf | d| I d H W S jj| ||_X d S )Nr   r   zConnectionPool.r\  )	r   r   r   rk  r   r"  r=   reuserN  )rF   rQ   Z	prev_namer]  r;   r<   r_    s    

z5PooledRPCCall.__getattr__.<locals>.send_recv_from_rpcr;   r`  r;   r]  r<   ra    s    zPooledRPCCall.__getattr__c                   s   d S rC   r;   ry   r;   r;   r<   rc    s    zPooledRPCCall.close_rpcc                 C  s   t jdtdd | S rd  re  ry   r;   r;   r<   rf    s    zPooledRPCCall.__enter__c                 C  s   d S rC   r;   r   r;   r;   r<   rg    s    zPooledRPCCall.__exit__c                   s   | S rC   r;   ry   r;   r;   r<   r     s    zPooledRPCCall.__aenter__c                   s   d S rC   r;   )re   rE   r;   r;   r<   r     s    zPooledRPCCall.__aexit__c                 C  s   d| j dS )Nz<pooled rpc to >rl  ry   r;   r;   r<   ri    s    zPooledRPCCall.__repr__)NN)r7   r8   r9   r:   rz   rC  r  ra  rc  rf  rg  r   r   ri  r;   r;   r;   r<   rj    s   

rj  c                   @  s   e Zd ZU dZe Zded< d'ddZd	d
 Z	e
dd Ze
dd Zdd Zd(ddZdd Zdd Ze
ddddZd)ddZd*ddZdd  Zd!d" Zd#d$ Zd%d& ZdS )+r   ap  A maximum sized pool of Comm objects.

    This provides a connect method that mirrors the normal distributed.connect
    method, but provides connection sharing and tracks connection limits.

    This object provides an ``rpc`` like interface::

        >>> rpc = ConnectionPool(limit=512)
        >>> scheduler = rpc('127.0.0.1:8786')
        >>> workers = [rpc(address) for address in ...]

        >>> info = await scheduler.identity()

    It creates enough comms to satisfy concurrent connections to any
    particular address::

        >>> a, b = await asyncio.gather(scheduler.who_has(), scheduler.has_what())

    It reuses existing comms so that we don't have to continuously reconnect.

    It also maintains a comm limit to avoid "too many open file handle"
    issues.  Whenever this maximum is reached we clear out all idling comms.
    If that doesn't do the trick then we wait until one of the occupied comms
    closes.

    Parameters
    ----------
    limit: int
        The number of open comms to maintain at once
    deserialize: bool
        Whether or not to deserialize data by default or pass it through
    z)ClassVar[weakref.WeakSet[ConnectionPool]]
_instancesr   TNc	           	      C  s   || _ tt| _tt| _|| _|| _|| _|d k	r8|n|| _|pDi | _	|| _
|r\t|nd | _t | _| j|  t | _d| _d| _tj| _d S r  )r   r   rw   	availableoccupiedr  r   r   r   r   r   r   r   r   rT  rU  ro  r   _connecting_pending_count_connecting_countr)   r,   r   )	re   r   r   r   r  r   r   r   r   r;   r;   r<   rz   	  s     



zConnectionPool.__init__c                 C  s"   | j j| j| j | j kstdS )zh
        Validate important invariants of this class

        Used only for testing / debugging
        N)	semaphore_valuer   r   _n_connectingr   ry   r;   r;   r<   	_validate+  s    zConnectionPool._validatec                 C  s   t tt| j S rC   )summapr   rq  r   ry   r;   r;   r<   r   3  s    zConnectionPool.activec                 C  s   | j ttt| j  S rC   )r   ry  rz  r   rp  r   ry   r;   r;   r<   r   7  s    zConnectionPool.openc                 C  s   d| j | jt| jf S )Nz3<ConnectionPool: open=%d, active=%d, connecting=%d>)r   r   r   rr  ry   r;   r;   r<   ri  ;  s
    zConnectionPool.__repr__c                 C  s"   t |||d}t|| | j| jdS )zCached rpc objectsrO  )r   r   )rQ  rj  r   r   )re   r"  rP  r  r;   r;   r<   __call__B  s       zConnectionPool.__call__c                   s    fdd}|   S )Nc                     s      I d H   S rC   )r   r;   ry   r;   r<   rL  J  s    z#ConnectionPool.__await__.<locals>._)r   )re   rL  r;   ry   r<   r   I  s    zConnectionPool.__await__c                   s   t | j| _tj| _d S rC   )r`   	Semaphorer   ru  r)   r.   r   ry   r;   r;   r<   r   P  s    zConnectionPool.startr   rv   c                 C  s   | j S rC   )rt  ry   r;   r;   r<   rw  U  s    zConnectionPool._n_connectingc              	     s  |  j d7  _ zz| j I d H  zzz|  jd7  _t|f|pB| j| jd| jI d H }d|_t	
| |_| j|_| j| | j| | |W W ,W W \S  tk
r   | j   Y nX W 5 |  jd8  _X W n tjk
r   tdY nX W 5 |  j d8  _ X d S )Nr  )r   r   r   zConnectionPool closing.)rs  ru  acquirert  r   r   r   r   r=   r   r   _poolr  rU  r   rq  BaseExceptionreleaser`   r   r   )re   r"  r   rQ   r;   r;   r<   _connectY  s4    
zConnectionPool._connectc                   s   | j | }| j| }|rD| }| r4| j  q|| |S q| j rV|   t	
| ||}t	  | j| | fdd || jj z  I dH  W nD t	jk
r   |  z|I dH  W n tk
r   Y nX  Y nX |I dH S )zE
        Get a Comm to the given address.  For internal use.
        c                   s      S rC   )rw   )rL  doner;   r<   <lambda>      z(ConnectionPool.connect.<locals>.<lambda>N)rp  rq  r4  r4   ru  r  r   lockedcollectr`   r|   r  r   rr  r}   rb  r   r   r   r   )re   r"  r   rp  rq  rQ   Zconnect_attemptr;   r  r<   r   w  s2    



	zConnectionPool.connectc                 C  sp   || j | kr t |j nL| j | | | rD| j  n(| j	| 
| | j rl| jrl|   dS )zV
        Reuse an open communication to the given address.  For internal use.
        N)rq  r   r   r   r   r~   r4   ru  r  rp  r   r  rs  r  )re   r"  rQ   r;   r;   r<   rm    s    zConnectionPool.reusec                 C  sZ   t d| j| jt| j | j D ]0}|D ]}t	 
|j | j  q,|  q$dS )zV
        Collect open but unused communications, to allow opening other ones.
        z>Collecting unused comms.  open: %d, active: %d, connecting: %dN)r
  r  r   r   r   rr  rp  r   r   r   r   r   ru  r  rZ  )re   rR  rQ   r;   r;   r<   r    s    zConnectionPool.collectc                 C  s   t d| || jkrF| j|}|D ]}t |j | j	  q&|| j
kr| j
|}|D ]}t |j | j	  q`dS )z6
        Remove all Comms to a given address.
        zRemoving comms to %sN)r
  r  rp  r4  r   r   r   r   ru  r  rq  )re   r"  rR  rQ   r;   r;   r<   r~     s    

zConnectionPool.removec                   s   t j| _| jD ]}|  q| j| jfD ]V}t }|rJ||	 d  q2t
jdd |D ddiI dH  |D ]}| j  qnq(| jrt
dI dH  qdS )z*
        Close all communications
        r  c                 s  s   | ]}|  V  qd S rC   r   r@  r;   r;   r<   	<genexpr>  s     z'ConnectionPool.close.<locals>.<genexpr>Zreturn_exceptionsTNg{Gzt?)r)   r4   r   rr  r   rp  rq  rw   r   popitemr`   r   ru  r  rp   )re   Zconn_futdrR  rL  r;   r;   r<   r     s     

zConnectionPool.close)r   TNTNNNN)NNN)N)N)r7   r8   r9   r:   r   rT  ro  r   rz   rx  rC  r   r   ri  r{  r   r   rw  r  r   rm  r  r~   r   r;   r;   r;   r<   r     s8   
!        
"




*r   c                 C  s   t | ttfrt|  } t| S rC   )r   rV   r  r   r   )or;   r;   r<   rS    s    rS  r  zlist[BaseException])r   rP   c                 C  s&   g }| j d k	r"|| j  | j } q|S rC   )	__cause__r!  )r   Zcausesr;   r;   r<   collect_causes  s
    
r  c                   @  s6   e Zd ZU ded< ded< ded< ded< ded< d	S )
ErrorMessager   r   zprotocol.Serializer1  zprotocol.Serialize | Noner   rH  traceback_textN)r7   r8   r9   r   r;   r;   r;   r<   r    s
   
r  r.  r   )r   r   rP   c           	      C  s   t jd}tjj| ft|   t }dt	
|}t| |} ztj| }tj| W n( tk
r   tjtt| }Y nX t|}ztj|}tj| W n  tk
r   tj|}Y nX t||krd}n
t|}|||t| |dS )a  Produce message to send back given an exception has occurred

    This does the following:

    1.  Gets the traceback
    2.  Truncates the exception and the traceback
    3.  Serializes the exception and traceback or
    4.  If they can't be serialized send string versions
    5.  Format a message and return

    See Also
    --------
    clean_exception : deserialize and unpack message into exception/traceback
    z"distributed.admin.max-error-lengthr   N)r   r1  r   rH  r  )r   r   r   tblibZpickling_supportinstallr  r    joinr   	format_tbr$   r   pickledumpsloadsr   reprZto_serializer   )	r   r   ZMAX_ERROR_LENrM  Ztb_textZe_bytesZe_serializedZtb_bytesZtb_serializedr;   r;   r<   r3    s2    


r3  z.BaseException | bytes | bytearray | str | Nonez(types.TracebackType | bytes | str | Noner
   zStuple[type[BaseException | None], BaseException | None, types.TracebackType | None])r1  r   rF   rP   c              	   K  s   t | ttfr>ztj| } W qP tk
r:   t| } Y qPX nt | trPt| } t |trztj|}W q tt	fk
r   d}Y qX nt |trd}t | t
s| dkstt |tjs|dkstt| | |fS )zReraise exception and traceback. Deserialize if necessary

    See Also
    --------
    error_message : create and serialize errors into message
    N)r   bytes	bytearrayr   r  r  r   r   r   r   r  r   typesTracebackTyper   )r1  r   rF   r;   r;   r<   rI  1  s"    


rI  )NNN)r.  )N)b
__future__r   r`   rT   loggingr,  rh   r   r  r   rX   r   collectionsr   r   collections.abcr   r   enumr   	functoolsr   typingr	   r
   r   r   r   r   r   r  Ztlzr   Ztornado.ioloopr   r   Z
dask.utilsr   Zdistributedr   r   Zdistributed.commr   r   r   r   r   r   r   Zdistributed.compatibilityr   Zdistributed.metricsr   Zdistributed.system_monitorr   Zdistributed.utilsr   r    r!   r"   r#   r$   Ztyping_extensionsr%   r&   r'   r(   ZCoror)   lookupIOErrorrA   	getLoggerr7   r
  rJ   r   r   r	  r=  r]   r^   rd   rj   rs   rt   r   rF  rN  rQ  r   rj  r   rS  r  r  r3  rI  r;   r;   r;   r<   <module>   s   $$	 	

 
p     89

 ):  1 