U
    /e!                     @  s   d dl mZ d dlZd dlZd dlZd dlmZ d dlmZ d dl	m
Z
 d dlmZmZ d dlmZmZ d dlmZ d d	lmZmZ d d
lmZmZ eeZG dd dZG dd dZdS )    )annotationsN)defaultdict)suppress)merge)parse_timedelta	stringify)ClientFuture)time)TimeoutError
log_errors)
get_client
get_workerc                   @  sL   e Zd ZdZdd ZdddZdd Zdd	d
ZdddZe	dddZ
dS )VariableExtensionzAn extension for the scheduler to manage Variables

    This adds the following routes to the scheduler

    *  variable-set
    *  variable-get
    *  variable-delete
    c                 C  sf   || _ t | _tt| _ttj| _t | _	| j j
| j| jd | j| j jd< | j| j jd< d S )N)variable_setvariable_getvariable-future-releasevariable_delete)	schedulerdict	variablesr   setwaitingasyncio	Conditionwaiting_conditionsstartedhandlersupdategetfuture_releaseZstream_handlersdelete)selfr    r#   8/tmp/pip-unpacked-wheel-g426oqom/distributed/variable.py__init__   s    

zVariableExtension.__init__Nc              
     s   |d k	r*d|d}| j j|gd| d n
d|d}z| j| }W n tk
rV   Y n0X |d dkr|d |krt| |d | || jkr| j4 I d H  | j  W 5 Q I d H R X || j|< d S )Nr	   )typevaluevariable-%skeysclientmsgpackr&   r'   )	r   Zclient_desires_keysr   KeyErrorr   Zensure_futurereleaser   
notify_all)r"   namekeydatar+   recordoldr#   r#   r$   r   -   s    


zVariableExtension.setc              
     sn   | j ||f rH| j| 4 I d H  | j|  I d H  W 5 Q I d H R X q | jj|gd| d | j ||f= d S )Nr(   r)   )r   r   waitr   Zclient_releases_keys)r"   r1   r0   r#   r#   r$   r.   ?   s
    &zVariableExtension.releasec              
     sX   | j ||f | | j ||f sT| j| 4 I d H  | j|   W 5 Q I d H R X d S N)r   remover   r/   )r"   r0   r1   tokenr+   r#   r#   r$   r    G   s    z VariableExtension.future_releasec                   s
  t  }| jkrt|d k	r(|t  |  }nd }|r>|dk r>t z& fdd}tj| |dI d H  W 5  j  X q j| }|d dkr|d }t j	}	 j
j|}
|
d k	r|
jnd}|	|d	}|d
kr|
jj|d< |
jj|d< t||} j||f |	 |S )Nr   c                     s$    j  I d H   j  I d H  d S r6   )r   acquirer5   r#   r"   r#   r$   _X   s    z VariableExtension.get.<locals>._)timeoutr&   r	   r'   Zlost)r8   stateerred	exception	traceback)r
   r   r   r   r.   r   wait_foruuiduuid4hexr   Ztasksr   r=   Zexception_blamer?   r@   r   r   add)r"   r0   r+   r<   startleftr;   r3   r1   r8   tsr=   msgr#   r:   r$   r   M   s0    




zVariableExtension.getc              	     s   z| j | }W n tk
r"   Y n$X |d dkrF| |d |I d H  tt | j|= W 5 Q R X tt | j |= W 5 Q R X | jd|  d S )Nr&   r	   r'   r(   )r   r-   r.   r   r   r   Zremove_client)r"   r0   r+   r4   r#   r#   r$   r!   n   s    

zVariableExtension.delete)NNNN)NNNN)NNN)NN)__name__
__module____qualname____doc__r%   r   r.   r    r   r   r!   r#   r#   r#   r$   r      s   	


!r   c                   @  sV   e Zd ZdZdddZdd Zdd	 Zdd
dZdddZdd Z	dd Z
dd ZdS )Variablea  Distributed Global Variable

    This allows multiple clients to share futures and data between each other
    with a single mutable variable.  All metadata is sequentialized through the
    scheduler.  Race conditions can occur.

    Values must be either Futures or msgpack-encodable data (ints, lists,
    strings, etc..)  All data will be kept and sent through the scheduler, so
    it is wise not to send too much.  If you want to share a large amount of
    data then ``scatter`` it and share the future instead.

    .. warning::

       This object is experimental and has known issues in Python 2

    Parameters
    ----------
    name: string (optional)
        Name used by other clients and the scheduler to identify the variable.
        If not given, a random name will be generated.
    client: Client (optional)
        Client used for communication with the scheduler.
        If not given, the default global client will be used.

    Examples
    --------
    >>> from dask.distributed import Client, Variable # doctest: +SKIP
    >>> client = Client()  # doctest: +SKIP
    >>> x = Variable('x')  # doctest: +SKIP
    >>> x.set(123)  # docttest: +SKIP
    >>> x.get()  # docttest: +SKIP
    123
    >>> future = client.submit(f, x)  # doctest: +SKIP
    >>> x.set(future)  # doctest: +SKIP

    See Also
    --------
    Queue: shared multi-producer/multi-consumer queue between clients
    Nr   c                 C  sJ   z|pt  | _W n tk
r0   t j| _Y nX |pBdt j | _d S )Nz	variable-)	r   currentr+   
ValueErrorr   rB   rC   rD   r0   )r"   r0   r+   maxsizer#   r#   r$   r%      s
    zVariable.__init__c                   sJ   t |tr,| jjjt|j| jdI d H  n| jjj|| jdI d H  d S )N)r1   r0   )r2   r0   )
isinstancer	   r+   r   r   r   r1   r0   )r"   r'   r#   r#   r$   _set   s    
 zVariable._setc                 K  s   | j j| j|f|S )zSet the value of this variable

        Parameters
        ----------
        value : Future or object
            Must be either a Future or a msgpack-encodable value
        )r+   syncrS   )r"   r'   kwargsr#   r#   r$   r      s    zVariable.setc                   s   | j jj|| j| j jdI d H }|d dkrt|d | j d|d d}|d dkrh|j|d	 |d
  | j d| j|d |d d n|d }|S )N)r<   r0   r+   r&   r	   r'   Tr=   )Zinformr=   r>   r?   r@   r   r8   )opr0   r1   r8   )	r+   r   r   r0   idr	   _stateZ	set_error_send_to_scheduler)r"   r<   dr'   r#   r#   r$   _get   s$      	zVariable._getc                 K  s"   t |}| jj| jfd|i|S )a@  Get the value of this variable

        Parameters
        ----------
        timeout : number or string or timedelta, optional
            Time in seconds to wait before timing out.
            Instead of number of seconds, it is also possible to specify
            a timedelta in string format, e.g. "200ms".
        r<   )r   r+   rT   r[   )r"   r<   rU   r#   r#   r$   r      s    
zVariable.getc                 C  s$   | j jdkr | j d| jd dS )zmDelete this variable

        Caution, this affects all clients currently pointing to this variable.
        Zrunningr   )rV   r0   N)r+   statusrY   r0   r:   r#   r#   r$   r!      s    zVariable.deletec                 C  s   | j | jjjfS r6   )r0   r+   r   addressr:   r#   r#   r$   __getstate__   s    zVariable.__getstate__c              	   C  s\   |\}}zt |}|jj|ks"tW n$ ttfk
rH   t|dd}Y nX | j||d d S )NF)Zset_as_default)r0   r+   )r   r   r]   AssertionErrorAttributeErrorr   r%   )r"   r=   r0   r]   r+   r#   r#   r$   __setstate__   s    zVariable.__setstate__)NNr   )N)N)rJ   rK   rL   rM   r%   rS   r   r[   r   r!   r^   ra   r#   r#   r#   r$   rN      s   (



rN   )
__future__r   r   loggingrB   collectionsr   
contextlibr   Ztlzr   Z
dask.utilsr   r   Zdistributed.clientr   r	   Zdistributed.metricsr
   Zdistributed.utilsr   r   Zdistributed.workerr   r   	getLoggerrJ   loggerr   rN   r#   r#   r#   r$   <module>   s   
j