U
    /e(                     @  s  d dl mZ d dlZd dlZd dlZd dlZd dlZd dlmZ d dl	m
Z
 d dlmZmZmZmZ d dlmZ d dlmZ d dlmZ d d	lmZmZmZ d d
lmZ d dlmZmZ edZej dkrd dl!m"Z"m#Z# nd dlm"Z"m#Z# ej dkrd dlm$Z% nG dd dZ%G dd deZ&G dd dZ'G dd dej(e"e Z)edddG dd de)e Z*edddG dd dee Z+edddG d d! d!Z,G d"d# d#e)e Z-dS )$    )annotationsN)	dataclass)	timedelta)GenericLiteralNoReturnTypeVar)IOLoop)Futureto_serialize)iscoroutinefunctionsyncthread_state)
WrappedKey)
get_client
get_worker_T)   	   )	Awaitable	Generator)r   
   )Eventc                   @  sD   e Zd ZddddZddddZdddd	Zddd
dZdS )_LateLoopEventNonereturnc                 C  s
   d | _ d S N)_eventself r"   5/tmp/pip-unpacked-wheel-g426oqom/distributed/actor.py__init__$   s    z_LateLoopEvent.__init__c                 C  s"   | j d krt | _ | j   d S r   )r   asyncior   setr    r"   r"   r#   r&   '   s    

z_LateLoopEvent.setboolc                 C  s   | j d k	o| j  S r   r   is_setr    r"   r"   r#   r)   -   s    z_LateLoopEvent.is_setc                   s$   | j d krt | _ | j  I d H S r   )r   r%   r   waitr    r"   r"   r#   r*   0   s    

z_LateLoopEvent.waitN)__name__
__module____qualname__r$   r&   r)   r*   r"   r"   r"   r#   r   #   s   r   c                      s   e Zd ZdZd fdd	Zdd Zdd Zed	d
 Zedd Z	edd Z
edd Zdd Zdd Zdd Zedd Z  ZS )ActoraM  Controls an object on a remote worker

    An actor allows remote control of a stateful object living on a remote
    worker.  Method calls on this object trigger operations on the remote
    object and return BaseActorFutures on which we can block to get results.

    Examples
    --------
    >>> class Counter:
    ...    def __init__(self):
    ...        self.n = 0
    ...    def increment(self):
    ...        self.n += 1
    ...        return self.n

    >>> from dask.distributed import Client
    >>> client = Client()

    You can create an actor by submitting a class with the keyword
    ``actor=True``.

    >>> future = client.submit(Counter, actor=True)
    >>> counter = future.result()
    >>> counter
    <Actor: Counter, key=Counter-1234abcd>

    Calling methods on this object immediately returns deferred ``BaseActorFuture``
    objects.  You can call ``.result()`` on these objects to block and get the
    result of the function call.

    >>> future = counter.increment()
    >>> future.result()
    1
    >>> future = counter.increment()
    >>> future.result()
    2
    Nc                   s   t  | || _|| _d | _|r0|| _d | _ndzt | _W n tk
rV   d | _Y nX z t	 | _t
|| jd kd| _W n tk
r   d | _Y nX d S )N)Zinform)superr$   _cls_address_future_worker_clientr   
ValueErrorr   r
   )r!   clsaddresskeyworker	__class__r"   r#   r$   ^   s     zActor.__init__c                 C  s   d| j j d| j dS )Nz<Actor: z, key=>)r0   r+   r8   r    r"   r"   r#   __repr__t   s    zActor.__repr__c                 C  s   t | j| j| jffS r   )r.   r0   r1   r8   r    r"   r"   r#   
__reduce__w   s    zActor.__reduce__c                 C  s   | j r| j jS | jjS d S r   )r3   loopr4   r    r"   r"   r#   _io_loopz   s    zActor._io_loopc                 C  s   | j r| j jS | jjS d S r   )r3   	schedulerr4   r    r"   r"   r#   _scheduler_rpc   s    zActor._scheduler_rpcc                 C  s>   | j r| j | jS | jjr*| j| jS t| jj| jS d S r   )r3   rpcr1   r4   Zdirect_to_workersProxyRPCrA   r    r"   r"   r#   _worker_rpc   s
    zActor._worker_rpcc                 C  s"   | j r| j jS t | jjkS d S r   )r4   Zasynchronous	threading	get_identr3   Z	thread_idr    r"   r"   r#   _asynchronous   s    zActor._asynchronousc                 O  sD   | j r| j j|f||S | jr*|||S t| jj|f||S d S r   )r4   r   rH   r3   r?   )r!   funcargskwargsr"   r"   r#   _sync   s
    
zActor._syncc                 C  s2   t tt| }|dd t| jD  t|S )Nc                 s  s   | ]}| d s|V  qdS )_N)
startswith).0attrr"   r"   r#   	<genexpr>   s     
 z Actor.__dir__.<locals>.<genexpr>)r&   dirtypeupdater0   sorted)r!   or"   r"   r#   __dir__   s    zActor.__dir__c                   s   j r"j jdkr"tdj j jr~jjjkr~ttddr~jjj	 }t| t
 rf S t rz fddS  S tj t rt fdd}|S fd	d
}|S d S )N)finishedpendingz(Worker holding Actor was lost.  Status: actorFc                    s   t  | |S r   )EagerActorFuture)rJ   rK   )rP   r"   r#   <lambda>       z#Actor.__getattr__.<locals>.<lambda>c                    s>   fddt jd  fdd}j|  S )Nc                    s   z8j jjdd  D dd  D dI d H } W nT tk
r   jrtj stjI d H   I d H  Y S td}t| Y S Y nX | d dkrt| d	 S t| d
 S )Nc                 S  s   g | ]}t |qS r"   r   )rO   argr"   r"   r#   
<listcomp>   s     zYActor.__getattr__.<locals>.func.<locals>.run_actor_function_on_worker.<locals>.<listcomp>c                 S  s   i | ]\}}|t |qS r"   r   )rO   kvr"   r"   r#   
<dictcomp>   s      zYActor.__getattr__.<locals>.func.<locals>.run_actor_function_on_worker.<locals>.<dictcomp>)functionrZ   rJ   rK   z Unable to contact Actor's workerstatusOKresult	exception)	rE   Zactor_executer8   itemsOSErrorr2   done_Error_OK)rf   exc)rJ   r8   rK   run_actor_function_on_workerr!   r"   r#   rn      s     zEActor.__getattr__.<locals>.func.<locals>.run_actor_function_on_workerio_loopc                     s      I d H  d S r   )_set_resultr"   )actor_futurern   r"   r#   wait_then_set_result   s    z=Actor.__getattr__.<locals>.func.<locals>.wait_then_set_result)ActorFuturer@   Zadd_callback)rJ   rK   rs   r8   r!   )rr   rJ   rK   rn   r#   rI      s
    zActor.__getattr__.<locals>.funcc                    s8   j j jdI d H } | d dkr,| d S | d d S )N)	attributerZ   rd   re   rf   rg   )rE   Zactor_attributer8   )xru   r"   r#   get_actor_attribute_from_worker   s     z:Actor.__getattr__.<locals>.get_actor_attribute_from_worker)r2   rd   r5   r3   r7   r1   getattrr   Zactorsr8   r   callabler0   	functoolswrapsrL   )r!   r8   rZ   rI   rx   r"   )rP   r8   r!   r#   __getattr__   s0    


	zActor.__getattr__c                 C  s   | j jS r   )r2   clientr    r"   r"   r#   r~      s    zActor.client)N)r+   r,   r-   __doc__r$   r=   r>   propertyr@   rB   rE   rH   rL   rW   r}   r~   __classcell__r"   r"   r:   r#   r.   7   s"   &


	
Hr.   c                   @  s    e Zd ZdZdd Zdd ZdS )rD   zQ
    An rpc-like object that uses the scheduler's rpc to connect to a worker
    c                 C  s   || _ || _d S r   )rC   r1   )r!   rC   r7   r"   r"   r#   r$      s    zProxyRPC.__init__c                   s    fdd}|S )Nc                    s$    | d< j jj| dI d H }|S )Nop)r9   msg)rC   proxyr1   )r   rf   ru   r"   r#   rI      s    z"ProxyRPC.__getattr__.<locals>.funcr"   )r!   r8   rI   r"   ru   r#   r}      s    zProxyRPC.__getattr__N)r+   r,   r-   r   r$   r}   r"   r"   r"   r#   rD      s   rD   c                   @  sJ   e Zd ZdZejddddddZejdd	d
dZdd	ddZdS )BaseActorFuturea  Future to an actor's method call

    Whenever you call a method on an Actor you get a BaseActorFuture immediately
    while the computation happens in the background.  You can call ``.result``
    to block and collect the full result

    See Also
    --------
    Actor
    Nstr | timedelta | float | Noner   timeoutr   c                 C  s   d S r   r"   r!   r   r"   r"   r#   rf     s    zBaseActorFuture.resultr'   r   c                 C  s   d S r   r"   r    r"   r"   r#   rj     s    zBaseActorFuture.donezLiteral['<ActorFuture>']c                 C  s   dS )Nz<ActorFuture>r"   r    r"   r"   r#   r=     s    zBaseActorFuture.__repr__)N)	r+   r,   r-   r   abcabstractmethodrf   rj   r=   r"   r"   r"   r#   r     s   r   TF)frozeneqc                   @  sH   e Zd ZU dZded< ddddZdd	dd
ddZddddZdS )r[   zUFuture to an actor's method call when an actor calls another actor on the same workerr   _resultGenerator[object, None, _T]r   c                 c  s   | j S r   r   r    r"   r"   r#   	__await__#  s    zEagerActorFuture.__await__Nobjectr   c                 C  s   | j S r   r   r   r"   r"   r#   rf   '  s    zEagerActorFuture.resultzLiteral[True]c                 C  s   dS )NTr"   r    r"   r"   r#   rj   *  s    zEagerActorFuture.done)N)r+   r,   r-   r   __annotations__r   rf   rj   r"   r"   r"   r#   r[     s
   
r[   c                   @  s$   e Zd ZU ded< ddddZdS )rl   r   _vr   c                 C  s   | j S r   )r   r    r"   r"   r#   unwrap2  s    z
_OK.unwrapNr+   r,   r-   r   r   r"   r"   r"   r#   rl   .  s   
rl   c                   @  s$   e Zd ZU ded< ddddZdS )rk   	Exception_er   r   c                 C  s
   | j d S r   )r   r    r"   r"   r#   r   :  s    z_Error.unwrapNr   r"   r"   r"   r#   rk   6  s   
rk   c                   @  sf   e Zd ZddddZddddZd	dd
dZddddZdddddZddddddZdS )rt   r	   ro   c                 C  s   || _ t | _d | _d S r   )r@   r   r   _out)r!   rp   r"   r"   r#   r$   ?  s    zActorFuture.__init__r   r   c                 C  s   |    S r   )r   r   r    r"   r"   r#   r   D  s    zActorFuture.__await__r'   c                 C  s
   | j  S r   r(   r    r"   r"   r#   rj   G  s    zActorFuture.doner   c                   s*   | j  I d H  | j}|d k	s"t| S r   )r   r*   r   AssertionErrorr   r!   outr"   r"   r#   r   J  s    zActorFuture._resultz_Error | _OK[_T]r   )r   r   c                 C  s   || _ | j  d S r   )r   r   r&   r   r"   r"   r#   rq   P  s    zActorFuture._set_resultNr   r   c                 C  s   t | j| j|dS )N)Zcallback_timeout)r   r@   r   r   r"   r"   r#   rf   T  s    zActorFuture.result)N)	r+   r,   r-   r$   r   rj   r   rq   rf   r"   r"   r"   r#   rt   >  s   rt   ).
__future__r   r   r%   r{   sysrF   Zdataclassesr   datetimer   typingr   r   r   r   Ztornado.ioloopr	   Zdistributed.clientr
   Zdistributed.protocolr   Zdistributed.utilsr   r   r   Zdistributed.utils_commr   Zdistributed.workerr   r   r   version_infocollections.abcr   r   r   r   r.   rD   ABCr   r[   rl   rk   rt   r"   r"   r"   r#   <module>   s@   
 =


