U
    /e7	                     @  s2   d dl mZ d dlZd dlmZ G dd dZdS )    )annotationsN)timec                   @  sx   e Zd ZdZdddddZddd	d
ZddddZddddZddddZdddddZ	dddddZ
dS )ResourceLimitera  Limit an abstract resource

    This allows us to track usage of an abstract resource. If the usage of this
    resources goes beyond a defined maxvalue, we can block further execution

    Example::

        limiter = ResourceLimiter(2)
        limiter.increase(1)
        limiter.increase(2)
        limiter.decrease(1)

        # This will block since we're still not below maxvalue
        await limiter.wait_for_available()
    intNone)maxvaluereturnc                 C  s,   || _ d| _t | _d| _d| _d| _d S )Nr   g        )	_maxvalue	_acquiredasyncio	Condition
_condition_waiterstime_blocked_totaltime_blocked_avg)selfr    r   @/tmp/pip-unpacked-wheel-g426oqom/distributed/shuffle/_limiter.py__init__   s    
zResourceLimiter.__init__str)r   c                 C  s   d| j  d|   dS )Nz<ResourceLimiter maxvalue: z available: >)r	   	availabler   r   r   r   __repr__!   s    zResourceLimiter.__repr__c                 C  s   t d| j| j S )z2How far can the value be increased before blockingr   )maxr	   r
   r   r   r   r   r   $   s    zResourceLimiter.availableboolc                 C  s
   | j dkS )zLReturn True if nothing has been acquired / the limiter is in a neutral stater   r
   r   r   r   r   free(   s    zResourceLimiter.freec              
     s   t  }d}zn|  rW bdS | j4 I dH @ |  jd7  _| j| jI dH  |  jd8  _t  | }W 5 Q I dH R X W 5 |  j|7  _| jd |d  | _X dS )z,Block until the counter drops below maxvaluer   g?g?N   )r   r   r   r   r   r   wait_for)r   startdurationr   r   r   wait_for_available,   s    z"ResourceLimiter.wait_for_available)valuer   c                 C  s   |  j |7  _ dS )z&Increase the internal counter by valueNr   r   r#   r   r   r   increase<   s    zResourceLimiter.increasec              
     s\   || j kr td| d| j  |  j |8  _ | j4 I dH  | j  W 5 Q I dH R X dS )z&Decrease the internal counter by valuez5Cannot release more than what was acquired! release: z acquired: N)r
   RuntimeErrorr   
notify_allr$   r   r   r   decrease@   s    
zResourceLimiter.decreaseN)__name__
__module____qualname____doc__r   r   r   r   r"   r%   r(   r   r   r   r   r      s   r   )
__future__r   r   Zdistributed.metricsr   r   r   r   r   r   <module>   s   