U
    /e.                     @  s   d dl mZ d dlZd dlmZ d dlmZ d dlZd dl	m
Z
 d dlmZ d dlmZ d dlmZ eeZG d	d
 d
eZdS )    )annotationsN)isawaitable)IOLoop)parse_timedelta)AdaptiveCore)pickle)
log_errorsc                      s   e Zd ZdZd fdd	Zedd Zedd Zed	d
 Zedd Z	dd Z
ddd fddZdddddZedd Zdd ZeddddZ  ZS ) Adaptivea	  
    Adaptively allocate workers based on scheduler load.  A superclass.

    Contains logic to dynamically resize a Dask cluster based on current use.
    This class needs to be paired with a system that can create and destroy
    Dask workers using a cluster resource manager.  Typically it is built into
    already existing solutions, rather than used directly by users.
    It is most commonly used from the ``.adapt(...)`` method of various Dask
    cluster classes.

    Parameters
    ----------
    cluster: object
        Must have scale and scale_down methods/coroutines
    interval : timedelta or str, default "1000 ms"
        Milliseconds between checks
    wait_count: int, default 3
        Number of consecutive times that a worker should be suggested for
        removal before we remove it.
    target_duration: timedelta or str, default "5s"
        Amount of time we want a computation to take.
        This affects how aggressively we scale up.
    worker_key: Callable[WorkerState]
        Function to group workers together when scaling down
        See Scheduler.workers_to_close for more information
    minimum: int
        Minimum number of workers to keep around
    maximum: int
        Maximum number of workers to keep around
    **kwargs:
        Extra parameters to pass to Scheduler.workers_to_close

    Examples
    --------

    This is commonly used from existing Dask classes, like KubeCluster

    >>> from dask_kubernetes import KubeCluster
    >>> cluster = KubeCluster()
    >>> cluster.adapt(minimum=10, maximum=100)

    Alternatively you can use it from your own Cluster class by subclassing
    from Dask's Cluster superclass

    >>> from distributed.deploy import Cluster
    >>> class MyCluster(Cluster):
    ...     def scale_up(self, n):
    ...         """ Bring worker count up to n """
    ...     def scale_down(self, workers):
    ...        """ Remove worker addresses from cluster """

    >>> cluster = MyCluster()
    >>> cluster.adapt(minimum=10, maximum=100)

    Notes
    -----
    Subclasses can override :meth:`Adaptive.target` and
    :meth:`Adaptive.workers_to_close` to control when the cluster should be
    resized. The default implementation checks if there are too many tasks
    per worker or too little memory available (see
    :meth:`distributed.Scheduler.adaptive_target`).
    The values for interval, min, max, wait_count and target_duration can be
    specified in the dask config under the distributed.adaptive key.
    Nc           	        s   || _ || _|| _|d kr&tjd}|d kr:tjd}|d krNtjd}|d krbtjd}|d krvtjd}t|| _t	d|| t
 j||||d d S )Nzdistributed.adaptive.intervalzdistributed.adaptive.minimumzdistributed.adaptive.maximumzdistributed.adaptive.wait-countz$distributed.adaptive.target-durationz/Adaptive scaling started: minimum=%s maximum=%s)minimummaximum
wait_countinterval)cluster
worker_key_workers_to_close_kwargsdaskconfiggetr   target_durationloggerinfosuper__init__)	selfr   r   r
   r   r   r   r   kwargs	__class__ ?/tmp/pip-unpacked-wheel-g426oqom/distributed/deploy/adaptive.pyr   T   s*    
   zAdaptive.__init__c                 C  s   | j jS N)r   Zscheduler_commr   r   r   r   	schedulerv   s    zAdaptive.schedulerc                 C  s   | j jS r   )r   planr    r   r   r   r"   z   s    zAdaptive.planc                 C  s   | j jS r   )r   	requestedr    r   r   r   r#   ~   s    zAdaptive.requestedc                 C  s   | j jS r   )r   observedr    r   r   r   r$      s    zAdaptive.observedc                   s   | j j| jdI dH S )a_  
        Determine target number of workers that should exist.

        Notes
        -----
        ``Adaptive.target`` dispatches to Scheduler.adaptive_target(),
        but may be overridden in subclasses.

        Returns
        -------
        Target number of workers

        See Also
        --------
        Scheduler.adaptive_target
        )r   N)r!   Zadaptive_targetr   r    r   r   r   target   s    zAdaptive.targetintdict)r%   returnc                   s2   t | jt | jkr | jI d H  t |I d H S r   )lenr"   r#   r   r   recommendationsr   r%   r   r   r   r*      s    zAdaptive.recommendationsz	list[str]c                   s4   | j jf || jrt| jnddd| jI dH S )a  
        Determine which, if any, workers should potentially be removed from
        the cluster.

        Notes
        -----
        ``Adaptive.workers_to_close`` dispatches to Scheduler.workers_to_close(),
        but may be overridden in subclasses.

        Returns
        -------
        List of worker names to close, if any

        See Also
        --------
        Scheduler.workers_to_close
        Nname)r%   key	attribute)r!   workers_to_closer   r   dumpsr   r+   r   r   r   r/      s    zAdaptive.workers_to_closec                   sN   |sd S t d| | jj|dddI d H  | j|}t|rJ|I d H  d S )NzRetiring workers %sT)namesremoveZclose_workers)r   r   r!   Zretire_workersr   
scale_downr   )r   workersfr   r   r   r3      s    zAdaptive.scale_downc                   s"   | j |}t|r|I d H  d S r   )r   Zscaler   )r   nr5   r   r   r   scale_up   s    zAdaptive.scale_upr   )r(   c                 C  s   | j r| j jS t S dS )zOverride Adaptive.loopN)r   loopr   currentr    r   r   r   r8      s    zAdaptive.loop)NNNNNNN)__name__
__module____qualname____doc__r   propertyr!   r"   r#   r$   r%   r*   r/   r   r3   r7   r8   __classcell__r   r   r   r   r	      s2   C       "




r	   )
__future__r   logginginspectr   Ztornado.ioloopr   Zdask.configr   Z
dask.utilsr   Z distributed.deploy.adaptive_corer   Zdistributed.protocolr   Zdistributed.utilsr   	getLoggerr:   r   r	   r   r   r   r   <module>   s   
