U
    /eT                     @  s   d dl mZ d dlZd dlZd dlZd dlmZmZ d dlm	Z	 d dl
mZ d dlmZmZ d dlmZ eeZG dd	 d	ZG d
d dZdS )    )annotationsN)defaultdictdeque)parse_timedelta)Client)TimeoutError
log_errors)
get_workerc                   @  s4   e Zd ZdZdd Zed	ddZed
ddZdS )LockExtensionzAn extension for the scheduler to manage Locks

    This adds the following routes to the scheduler

    *  lock_acquire
    *  lock_release
    c                 C  s4   || _ tt| _t | _| j j| j| j	d d S )N)lock_acquirelock_release)
	schedulerr   r   eventsdictidshandlersupdateacquirerelease)selfr    r   4/tmp/pip-unpacked-wheel-g426oqom/distributed/lock.py__init__   s    
zLockExtension.__init__Nc                   s   t |trt|}|| jkr"d}n|| jkrt }| j| | | }|d k	r`t	||}z8z|I d H  W n  tk
r   d}Y W qY nX d}W 5 | j| 
 }||kstX q"|r|| jkst|| j|< |S )NTF)
isinstancelisttupler   asyncioEventr   appendwaitwait_forpopleftAssertionErrorr   )r   nameidtimeoutresulteventfutureZevent2r   r   r   r   #   s,    



zLockExtension.acquirec                 C  sd   t |trt|}| j||kr*td| j|= | j| rX| jj	| j| d j
 n| j|= d S )Nz#This lock has not yet been acquiredr   )r   r   r   r   get
ValueErrorr   r   ZloopZadd_callbackset)r   r#   r$   r   r   r   r   ?   s    

zLockExtension.release)NNN)NN)__name__
__module____qualname____doc__r   r   r   r   r   r   r   r   r
      s   	r
   c                   @  s\   e Zd ZdZdddZdddZdd	 Zd
d Zdd Zdd Z	dd Z
dd Zdd ZdS )Lockaw  Distributed Centralized Lock

    Parameters
    ----------
    name: string (optional)
        Name of the lock to acquire.  Choosing the same name allows two
        disconnected processes to coordinate a lock.  If not given, a random
        name will be generated.
    client: Client (optional)
        Client to use for communication with the scheduler.  If not given, the
        default global client will be used.

    Examples
    --------
    >>> lock = Lock('x')  # doctest: +SKIP
    >>> lock.acquire(timeout=1)  # doctest: +SKIP
    >>> # do things with protected resource
    >>> lock.release()  # doctest: +SKIP
    Nc                 C  s\   z|pt  | _W n tk
r0   t j| _Y nX |pBdt j | _t j| _	d| _
d S )Nzlock-F)r   currentclientr*   r	   uuidZuuid4hexr#   r$   _locked)r   r#   r2   r   r   r   r   a   s    zLock.__init__Tc                 C  sH   t |}|s |dk	rtdd}| jj| jjj| j| j|d}d| _|S )a/  Acquire the lock

        Parameters
        ----------
        blocking : bool, optional
            If false, don't wait on the lock in the scheduler at all.
        timeout : string or number or timedelta, optional
            Seconds to wait on the lock in the scheduler.  This does not
            include local coroutine time, network transfer time, etc..
            It is forbidden to specify a timeout when blocking is false.
            Instead of number of seconds, it is also possible to specify
            a timedelta in string format, e.g. "200ms".

        Examples
        --------
        >>> lock = Lock('x')  # doctest: +SKIP
        >>> lock.acquire(timeout="1s")  # doctest: +SKIP

        Returns
        -------
        True or False whether or not it successfully acquired the lock
        Nz/can't specify a timeout for a non-blocking callr   )r#   r$   r%   T)	r   r*   r2   syncr   r   r#   r$   r5   )r   blockingr%   r&   r   r   r   r   k   s    zLock.acquirec                 C  s6   |   std| jj| jjj| j| jd}d| _|S )z$Release the lock if already acquiredzLock is not yet acquired)r#   r$   F)	lockedr*   r2   r6   r   r   r#   r$   r5   )r   r&   r   r   r   r      s      zLock.releasec                 C  s   | j S N)r5   r   r   r   r   r8      s    zLock.lockedc                 C  s   |    | S r9   r   r:   r   r   r   	__enter__   s    zLock.__enter__c                 C  s   |    d S r9   r   r   exc_type	exc_value	tracebackr   r   r   __exit__   s    zLock.__exit__c                   s   |   I d H  | S r9   r;   r:   r   r   r   
__aenter__   s    zLock.__aenter__c                   s   |   I d H  d S r9   r=   r>   r   r   r   	__aexit__   s    zLock.__aexit__c                 C  s   t | jffS r9   )r0   r#   r:   r   r   r   
__reduce__   s    zLock.__reduce__)NN)TN)r,   r-   r.   r/   r   r   r   r8   r<   rB   rC   rD   rE   r   r   r   r   r0   L   s   


'
r0   )
__future__r   r   loggingr3   collectionsr   r   Z
dask.utilsr   Zdistributed.clientr   Zdistributed.utilsr   r   Zdistributed.workerr	   	getLoggerr,   loggerr
   r0   r   r   r   r   <module>   s   
;