U
    /e{                     @  s   d dl mZ d dlZd dlZd dlmZmZ d dlmZ d dl	m
Z
 d dlmZmZ d dlZd dlmZ d dlmZ d d	lmZ d d
lmZ erd dlmZ eeZG dd dZdS )    )annotationsN)defaultdictdeque)Iterable)	timedelta)TYPE_CHECKINGcast)IOLoop)parse_timedelta)PeriodicCallback)time)WorkerStatec                   @  s  e Zd ZU dZded< ded< ded< ded< ded	< d
ed< d
ed< d
ed< ded< ded< ded< dejddfdddddddZddddZdddd Z	dd!d"d#d$Z
ddd%d&Zddd'd(d)Zd*dd+d,d-Zdd.d"d/d0Zddd1d2Zd3d4 Zed5dd6d7Zd8S )9AdaptiveCoread  
    The core logic for adaptive deployments, with none of the cluster details

    This class controls our adaptive scaling behavior.  It is intended to be
    used as a super-class or mixin.  It expects the following state and methods:

    **State**

    plan: set
        A set of workers that we think should exist.
        Here and below worker is just a token, often an address or name string

    requested: set
        A set of workers that the cluster class has successfully requested from
        the resource manager.  We expect that resource manager to work to make
        these exist.

    observed: set
        A set of workers that have successfully checked in with the scheduler

    These sets are not necessarily equivalent.  Often plan and requested will
    be very similar (requesting is usually fast) but there may be a large delay
    between requested and observed (often resource managers don't give us what
    we want).

    **Functions**

    target : -> int
        Returns the target number of workers that should exist.
        This is often obtained by querying the scheduler

    workers_to_close : int -> Set[worker]
        Given a target number of workers,
        returns a set of workers that we should close when we're scaling down

    scale_up : int -> None
        Scales the cluster up to a target number of workers, presumably
        changing at least ``plan`` and hopefully eventually also ``requested``

    scale_down : Set[worker] -> None
        Closes the provided set of workers

    Parameters
    ----------
    minimum: int
        The minimum number of allowed workers
    maximum: int | inf
        The maximum number of allowed workers
    wait_count: int
        The number of scale-down requests we should receive before actually
        scaling down
    interval: str
        The amount of time, like ``"1s"`` between checks
    intminimumzint | floatmaximum
wait_countintervalzPeriodicCallback | Noneperiodic_callbackzset[WorkerState]plan	requestedobservedzdefaultdict[WorkerState, int]close_countsbool	_adaptingzdeque[tuple[float, dict]]logr      Z1szstr | int | float | timedelta)r   r   r   r   c                   s   t |ts"t|s"td| | _| _| _t|d _	d  _
 fdd} j	rdd l}| fdd}t| j	d  _
 j| zt  _t  _t  _W n tk
r   Y nX tt _d	 _td
d _d S )Nz maximum must be int or inf; got secondsc                     s(   z j   W n tk
r"   Y nX d S N)r   startAttributeError selfr!   D/tmp/pip-unpacked-wheel-g426oqom/distributed/deploy/adaptive_core.pyfm   s    z AdaptiveCore.__init__.<locals>.fr   c                    s     } | r|   I d H  d S r   )adapt)core)self_refr!   r$   _adaptx   s    z%AdaptiveCore.__init__.<locals>._adapti  Fi'  )maxlen)
isinstancer   mathisinf	TypeErrorr   r   r   r
   r   r   weakrefrefr   loopZadd_callbacksetr   r   r   	Exceptionr   r   r   r   r   )r#   r   r   r   r   r%   r/   r)   r!   )r#   r(   r$   __init__]   s.    

zAdaptiveCore.__init__None)returnc                 C  s$   t d | jr | j  d | _d S )NzAdaptive stop)loggerinfor   stopr"   r!   r!   r$   r9      s    

zAdaptiveCore.stopc                   s
   t  dS )z.The target number of workers that should existNNotImplementedErrorr"   r!   r!   r$   target   s    zAdaptiveCore.targetlist)r<   r6   c                   s   t | j|d S )zW
        Give a list of workers to close that brings us down to target workers
        N)r=   r   )r#   r<   r!   r!   r$   workers_to_close   s    zAdaptiveCore.workers_to_closec                   s8   |   I dH }|| jkr$tt| j}|| jk r4| j}|S )z:Used internally, like target, but respects minimum/maximumN)r<   r   r   r   r   r#   nr!   r!   r$   safe_target   s    

zAdaptiveCore.safe_target)r@   r6   c                   s
   t  d S r   r:   r?   r!   r!   r$   
scale_down   s    zAdaptiveCore.scale_downr   )workersr6   c                   s
   t  d S r   r:   )r#   rC   r!   r!   r$   scale_up   s    zAdaptiveCore.scale_updictc                   s2  | j }| j}| j}|t|kr0| j  ddiS |t|krP| j  d|dS || }t }|r||t	t|| | |t|t| k r| j
|dI dH }|| t }|D ]0}	| j|	  d7  < | j|	 | jkr||	 qt| jD ]}
|
|ks|
|kr| j|
= q|r&dt|d	S ddiS dS )
zV
        Make scale up/down recommendations based on current state and target
        statussameup)rF   r@   )r<   N   down)rF   rC   )r   r   r   lenr   clearr2   updatetoolzZtaker>   r   addr=   )r#   r<   r   r   r   Znot_yet_arrivedto_closeLZfirmly_closewkr!   r!   r$   recommendations   s6    




zAdaptiveCore.recommendationsc                   s   | j r
dS d| _ d}zz|  I dH }| |I dH }|d dkrX| jt t|f |d}|dkrtW W dS |dkr| jf |I dH  |dkr| j	f |I dH  W nB t
k
r   |dkrtjddd	 |   ntjd
dd	 Y nX W 5 d| _ X dS )zy
        Check the current state, make recommendations, call scale

        This is the main event of the system
        NTFrF   rG   rH   rJ   zAdaptive stopping due to error)exc_infoz,Error during adaptive downscaling. Ignoring.)r   rA   rT   r   appendr   rE   poprD   rB   OSErrorr7   errorr9   )r#   rF   r<   rT   r!   r!   r$   r&      s2    


 zAdaptiveCore.adaptc                 C  s   |    d S r   )r9   r"   r!   r!   r$   __del__   s    zAdaptiveCore.__del__r	   c                 C  s   t  S r   )r	   currentr"   r!   r!   r$   r1      s    zAdaptiveCore.loopN)__name__
__module____qualname____doc____annotations__r,   infr4   r9   r<   r>   rA   rB   rD   rT   r&   rZ   propertyr1   r!   r!   r!   r$   r      s8   
7/)%r   )
__future__r   loggingr,   collectionsr   r   collections.abcr   datetimer   typingr   r   ZtlzrN   Ztornado.ioloopr	   Z
dask.utilsr
   Zdistributed.compatibilityr   Zdistributed.metricsr   Zdistributed.schedulerr   	getLoggerr\   r7   r   r!   r!   r!   r$   <module>   s   
