U
    /ebL                     @  s   d dl mZ d dlZd dlZd dlZd dlZd dlmZ d dlmZm	Z	 d dl
Z
d dlmZ d dlmZ d dlmZ d dlmZmZmZ d d	lmZ d d
lmZmZ eeZG dd dZG dd deZdS )    )annotationsN)TimeoutError)defaultdictdeque)parse_timedelta)PeriodicCallback)time)DeadlineSyncMethodMixin
log_errors)retry_operation)
get_client
get_workerc                   @  s   e Zd ZdZdd ZdddZdddZedd	d
Zdd Z	dd Z
edddZedddZdd Zdd ZedddZdS )SemaphoreExtensionzAn extension for the scheduler to manage Semaphores

    This adds the following routes to the scheduler

    * semaphore_acquire
    * semaphore_release
    * semaphore_close
    * semaphore_refresh_leases
    * semaphore_register
    c              	   C  s   || _ ttj| _t | _tt| _| j j	| j
| j| j| j| j| jd ttttttttd| _ttjddd}t| j|d  | j jd< }|  ttjddd| _d S )	N)semaphore_registersemaphore_acquiresemaphore_releasesemaphore_closesemaphore_refresh_leasessemaphore_value)acquire_totalrelease_totalaverage_pending_lease_timependingz5distributed.scheduler.locks.lease-validation-intervalsdefault  zsemaphore-lease-timeout)distributed.scheduler.locks.lease-timeout)	schedulerr   asyncioEventeventsdict
max_leasesleaseshandlersupdatecreateacquirereleasecloserefresh_leases	get_valueintfloatmetricsr   daskconfiggetr   _check_lease_timeoutZperiodic_callbacksstartlease_timeout)selfr   Zvalidation_callback_timepc r9   9/tmp/pip-unpacked-wheel-g426oqom/distributed/semaphore.py__init__"   sF    

 
 zSemaphoreExtension.__init__Nc                 C  s   t | j| S N)lenr%   r7   namer9   r9   r:   r-   M   s    zSemaphoreExtension.get_valuec                 C  sP   || j kr(t|tst||| j |< n$|| j | krLtd|| j | f d S )Nz)Inconsistent max leases: %s, expected: %s)r$   
isinstancer.   AssertionError
ValueError)r7   r?   r$   r9   r9   r:   r(   Q   s    
zSemaphoreExtension.createc                 C  sX   t  }td||| |D ]8}|| j| krDtd| d| d || j| |< qd S )Nz'Refresh leases for %s with ids %s at %szRefreshing an unknown lease ID z for z. This might be due to leases timing out and may cause overbooking of the semaphore!This is often caused by long-running GIL-holding in the task which acquired the lease.)r   loggerdebugr%   critical)r7   r?   	lease_idsnowZid_r9   r9   r:   r,   ^   s    z!SemaphoreExtension.refresh_leasesc                 C  sn   d}|| j | ks*t| j | | j| k rft }td||| || j | |< | jd |  d7  < nd}|S )NTzAcquire lease %s for %s at %sr      F)r%   r=   r$   r   rC   rD   r0   )r7   r?   lease_idresultrG   r9   r9   r:   
_get_leasek   s    zSemaphoreExtension._get_leasec                 C  s   || j krdS dS )NFT)r$   r>   r9   r9   r:   _semaphore_exists{   s    
z$SemaphoreExtension._semaphore_existsc                   s  |  |std| dt|tr,t|}t|}| jd |  d7  < t	d|||j
 | j|   | ||}|stj| j|  |j
d}z|I d H  W qLW n tk
r   d}Y nX t	d||||j | jd	 | |j d
 | jd	 |< | jd |  d8  < |S )NzSemaphore `z` not known or already closed.r   rH   z1Trying to acquire %s for %s with %s seconds left.timeoutFz;Acquisition of lease %s for %s is %s after waiting for %ss.r      )rL   RuntimeErrorr@   listtupler	   afterr0   rC   rD   	remainingr"   clearrK   r    wait_forwaitr   elapsed)r7   r?   rN   rI   deadlinerJ   futurer9   r9   r:   r)      sH    


 

	zSemaphoreExtension.acquirec                 C  st   |  |s td| d d S t|tr2t|}|| jkrX|| j| krX| || ntd|d|d d S )NzTried to release semaphore `z(` but it is not known or already closed.z=Tried to release semaphore but it was already released: name=z, lease_id=z4. This can happen if the semaphore timed out before.)rL   rC   warningr@   rQ   rR   r%   _release_valuer7   r?   rI   r9   r9   r:   r*      s    


zSemaphoreExtension.releasec                 C  sB   t d|| | j| |= | j|   | jd |  d7  < d S )NzReleasing %s for %sr   rH   )rC   rD   r%   r"   setr0   r]   r9   r9   r:   r\      s    z!SemaphoreExtension._release_valuec                 C  s   t  }t| j }|D ]l}t| j| }td||| j|  |D ]>}|| j| |  }|| jkrDtd||| | j||d qDqd S )Nz7Validating leases for %s at time %s. Currently known %sz$Lease %s for %s timed out after %ss.)r?   rI   )r   rQ   r%   keysrC   rD   r6   r\   )r7   rG   Zsemaphore_namesr?   idsZ_idZtime_since_refreshr9   r9   r:   r4      s(    
z'SemaphoreExtension._check_lease_timeoutc                 C  s   |  |sdS | j|= || jkr(| j|= || jkrf| j| r^td| dt| j|  t | j|= || jd kr| jd | rtd| dt | j	 D ]\}}||kr||= qdS )zJHard close the semaphore without warning clients which still hold a lease.NzClosing semaphore z$ but there remain unreleased leases r   z  but there remain pending leases)
rL   r$   r"   r%   warningswarnsortedRuntimeWarningr0   items)r7   r?   _Zmetric_dictr9   r9   r:   r+      s*    




zSemaphoreExtension.close)N)NN)NN)NNN)NN)N)__name__
__module____qualname____doc__r;   r-   r(   r   r,   rK   rL   r)   r*   r\   r4   r+   r9   r9   r9   r:   r      s    +

0r   c                   @  s   e Zd ZdZd)ddZdd Zd	d
 Zdd Zdd Zd*ddZ	d+ddZ
dd Zdd Zdd Zdd Zdd Zdd Zdd  Zd!d" Zd#d$ Zd%d& Zd'd( ZdS ),	Semaphorea  Semaphore

    This `semaphore <https://en.wikipedia.org/wiki/Semaphore_(programming)>`_
    will track leases on the scheduler which can be acquired and
    released by an instance of this class. If the maximum amount of leases are
    already acquired, it is not possible to acquire more and the caller waits
    until another lease has been released.

    The lifetime or leases are controlled using a timeout. This timeout is
    refreshed in regular intervals by the ``Client`` of this instance and
    provides protection from deadlocks or resource starvation in case of worker
    failure.
    The timeout can be controlled using the configuration option
    ``distributed.scheduler.locks.lease-timeout`` and the interval in which the
    scheduler verifies the timeout is set using the option
    ``distributed.scheduler.locks.lease-validation-interval``.

    A noticeable difference to the Semaphore of the python standard library is
    that this implementation does not allow to release more often than it was
    acquired. If this happens, a warning is emitted but the internal state is
    not modified.

    .. warning::

        This implementation is susceptible to lease overbooking in case of
        lease timeouts. It is advised to monitor log information and adjust
        above configuration options to suitable values for the user application.

    Parameters
    ----------
    max_leases: int (optional)
        The maximum amount of leases that may be granted at the same time. This
        effectively sets an upper limit to the amount of parallel access to a specific resource.
        Defaults to 1.
    name: string (optional)
        Name of the semaphore to acquire.  Choosing the same name allows two
        disconnected processes to coordinate.  If not given, a random
        name will be generated.
    register: bool
        If True, register the semaphore with the scheduler. This needs to be
        done before any leases can be acquired. If not done during
        initialization, this can also be done by calling the register method of
        this class.
        When registering, this needs to be awaited.
    scheduler_rpc: ConnectionPool
        The ConnectionPool to connect to the scheduler. If None is provided, it
        uses the worker or client pool. This parameter is mostly used for
        testing.
    loop: IOLoop
        The event loop this instance is using. If None is provided, reuse the
        loop of the active worker or client.

    Examples
    --------
    >>> from distributed import Semaphore
    ... sem = Semaphore(max_leases=2, name='my_database')
    ...
    ... def access_resource(s, sem):
    ...     # This automatically acquires a lease from the semaphore (if available) which will be
    ...     # released when leaving the context manager.
    ...     with sem:
    ...         pass
    ...
    ... futures = client.map(access_resource, range(10), sem=sem)
    ... client.gather(futures)
    ... # Once done, close the semaphore to clean up the state on scheduler side.
    ... sem.close()

    Notes
    -----
    If a client attempts to release the semaphore but doesn't have a lease acquired, this will raise an exception.


    When a semaphore is closed, if, for that closed semaphore, a client attempts to:

    - Acquire a lease: an exception will be raised.
    - Release: a warning will be logged.
    - Close: nothing will happen.


    dask executes functions by default assuming they are pure, when using semaphore acquire/releases inside
    such a function, it must be noted that there *are* in fact side-effects, thus, the function can no longer be
    considered pure. If this is not taken into account, this may lead to unexpected behavior.

    rH   NTc           
      C  s   z"t  }|p|j| _|p|j| _W n2 tk
rT   t }|p@|j| _|pL|j| _Y nX |pfdt j | _|| _	t j| _
t | _d| _d | _|r|  | _ttjdddd }t| j|d d}	|	| _| j|	j d S )	Nz
semaphore-Tr   r   r      r   )Zcallback_time)r   r   looprB   r   uuiduuid4hexr?   r$   idr   _leasesr,   _registeredregisterr   r1   r2   r3   r   _refresh_leasesrefresh_callbackZadd_callbackr5   )
r7   r$   r?   rt   Zscheduler_rpcrm   ZworkerclientZrefresh_leases_intervalr8   r9   r9   r:   r;   S  s:    	

 zSemaphore.__init__c                   s2   t | jj| j| jd| j d| j dI d H  d S )Nzsemaphore register id=z name=)r?   r$   	operation)r   r   r   r?   r$   rq   r7   r9   r9   r:   	_register  s    zSemaphore._registerc                 K  s   |  | jS r<   )syncrz   )r7   kwargsr9   r9   r:   rt     s    zSemaphore.registerc                   s    fdd}|   S )Nc                     s    j r j I d H   S r<   )rs   r9   ry   r9   r:   create_semaphore  s    z-Semaphore.__await__.<locals>.create_semaphore)	__await__)r7   r}   r9   ry   r:   r~     s    zSemaphore.__await__c                   s\   | j rX| jrXtd| j| j| j t| jjt	| j| jd| jt	| j| jf dI d H  d S )Nz'%s refreshing leases for %s with IDs %sz6semaphore refresh leases: id=%s, lease_ids=%s, name=%s)rF   r?   rx   )
r,   rr   rC   rD   rq   r?   r   r   r   rQ   ry   r9   r9   r:   ru     s    zSemaphore._refresh_leasesc              	     s^   t  j}td| j| j| t| jj	| j||d| j|| jf dI d H }|rZ| j
| |S )Nz#%s requests lease for %s with ID %sz.semaphore acquire: id=%s, lease_id=%s, name=%s)r?   rN   rI   rx   )rn   ro   rp   rC   rD   rq   r?   r   r   r   rr   append)r7   rN   rI   rJ   r9   r9   r:   _acquire  s&    
   zSemaphore._acquirec                 C  s   t |}| j| j|dS )aG  
        Acquire a semaphore.

        If the internal counter is greater than zero, decrement it by one and return True immediately.
        If it is zero, wait until a release() is called and return True.

        Parameters
        ----------
        timeout : number or string or timedelta, optional
            Seconds to wait on acquiring the semaphore.  This does not
            include local coroutine time, network transfer time, etc..
            Instead of number of seconds, it is also possible to specify
            a timedelta in string format, e.g. "200ms".
        rM   )r   r{   r   )r7   rN   r9   r9   r:   r)     s    zSemaphore.acquirec                   sh   z0t | jj| j|d| j|| jf dI d H  W dS  tk
rb   tjd| j|| jf dd Y dS X d S )Nz.semaphore release: id=%s, lease_id=%s, name=%s)r?   rI   rx   TzRRelease failed for id=%s, lease_id=%s, name=%s. Cluster network might be unstable?)exc_infoF)r   r   r   r?   rq   	ExceptionrC   errorr7   rI   r9   r9   r:   _release  s$    zSemaphore._releasec                 C  s<   | j std| j  }td| j|| j | j| j|dS )a#  
        Release the semaphore.

        Returns
        -------
        bool
            This value indicates whether a lease was released immediately or not. Note that a user should  *not* retry
            this operation. Under certain circumstances (e.g. scheduler overload) the lease may not be released
            immediately, but it will always be automatically released after a specific interval configured using
            "distributed.scheduler.locks.lease-validation-interval" and "distributed.scheduler.locks.lease-timeout".
        zReleased too oftenz%s releases %s for %s)rI   )	rr   rP   popleftrC   rD   rq   r?   r{   r   r   r9   r9   r:   r*     s
    
zSemaphore.releasec                 C  s   | j | jj| jdS )zC
        Return the number of currently registered leases.
        r?   )r{   r   r   r?   ry   r9   r9   r:   r-     s    zSemaphore.get_valuec                 C  s   |    | S r<   r)   ry   r9   r9   r:   	__enter__  s    zSemaphore.__enter__c                 C  s   |    d S r<   r*   r7   exc_type	exc_value	tracebackr9   r9   r:   __exit__  s    zSemaphore.__exit__c                   s   |   I d H  | S r<   r   ry   r9   r9   r:   
__aenter__  s    zSemaphore.__aenter__c                   s   |   I d H  d S r<   r   r   r9   r9   r:   	__aexit__  s    zSemaphore.__aexit__c                 C  s   | j | jfS r<   )r?   r$   ry   r9   r9   r:   __getstate__  s    zSemaphore.__getstate__c                 C  s   |\}}| j ||dd d S )NF)r?   r$   rt   )r;   )r7   stater?   r$   r9   r9   r:   __setstate__  s    zSemaphore.__setstate__c                 C  s   | j   | j| jj| jdS )Nr   )rv   stopr{   r   r   r?   ry   r9   r9   r:   r+     s    
zSemaphore.closec                 C  s   | j   d S r<   )rv   r   ry   r9   r9   r:   __del__  s    zSemaphore.__del__)rH   NTNN)N)N)rg   rh   ri   rj   r;   rz   rt   r~   ru   r   r)   r   r*   r-   r   r   r   r   r   r   r+   r   r9   r9   r9   r:   rk      s0   X     
0

rk   )
__future__r   r    loggingrn   ra   r   collectionsr   r   r1   Z
dask.utilsr   Zdistributed.compatibilityr   Zdistributed.metricsr   Zdistributed.utilsr	   r
   r   Zdistributed.utils_commr   Zdistributed.workerr   r   	getLoggerrg   rC   r   rk   r9   r9   r9   r:   <module>   s"   
 g