U
    /e"p                     @  s`  U d Z ddlmZ ddlZddlZddlmZ ddlmZ ddl	m
Z
mZmZmZ ddlZddlmZ ddlmZ dd	lmZ dd
lmZ ddlmZmZ e
rddlmZ ddlmZmZmZ e e!Z"e e!d Z#G dd dZ$G dd deZ%e
rddl&m'Z' ee%ddf Z(de)d< G dd dej*Z+G dd dZ,G dd de+Z-G dd de+Z.dS )a  Implementation of the Active Memory Manager. This is a scheduler extension which
sends drop/replicate suggestions to the worker.

See also :mod:`distributed.worker_memory` and :mod:`distributed.spill`, which implement
spill/pause/terminate mechanics on the Worker side.
    )annotationsN)defaultdict)	Generator)TYPE_CHECKINGAnyLiteral
NamedTuple)parse_timedelta)PeriodicCallback)Status)time)import_term
log_errors)Client)	Scheduler	TaskStateWorkerStatez.tasksc                	   @  s  e Zd ZU dZded< ded< ded< ded	< d
ed< ded< d9ddddddddddddddZdddddZdddd Zddd!d"Ze	ddd#d$Z
d%dd&d'd(Zeddd)d*Zddd+d,Zd-d.d/d0d1d2d3Zd-d.d/d0d4d5d6Zddd7d8ZdS ):ActiveMemoryManagerExtensiona}  Scheduler extension that optimizes memory usage across the cluster.
    It can be either triggered by hand or automatically every few seconds; at every
    iteration it performs one or both of the following:

    - create new replicas of in-memory tasks
    - destroy replicas of in-memory tasks; this never destroys the last available copy.

    There are no 'move' operations. A move is performed in two passes: first you create
    a copy and, in the next iteration, you delete the original (if the copy succeeded).

    This extension is configured by the dask config section
    ``distributed.scheduler.active-memory-manager``.
    r   	schedulerzset[ActiveMemoryManagerPolicy]policiesstrmeasurefloatintervalzdict[WorkerState, int]workers_memoryz:dict[TaskState, tuple[set[WorkerState], set[WorkerState]]]pendingNT)r   registerstartr   z%set[ActiveMemoryManagerPolicy] | Nonez
str | Noneboolzbool | Nonezfloat | None)r   r   r   r   r   r   c                C  s  || _ t | _|d krTt }tjdD ]*}| }t|d}|	|f | q(|D ]}	| 
|	 qX|sxtjd}|j}
dd t|
D }t|tr||krtddt| || _|r| |jd< | j|jd	< |d krttjd
}|| _|d krtjd}|r|   d S )Nz4distributed.scheduler.active-memory-manager.policiesclassz3distributed.scheduler.active-memory-manager.measurec                 S  s"   h | ]}| d s|dkr|qS )_sum)
startswith).0name r%   E/tmp/pip-unpacked-wheel-g426oqom/distributed/active_memory_manager.py	<setcomp>a   s    
  z8ActiveMemoryManagerExtension.__init__.<locals>.<setcomp>zCdistributed.scheduler.active-memory-manager.measure must be one of z, Zammamm_handlerz4distributed.scheduler.active-memory-manager.intervalz1distributed.scheduler.active-memory-manager.start)r   setr   daskconfiggetcopyr   popadd
add_policymemorydir
isinstancer   
ValueErrorjoinsortedr   
extensionsr(   handlersr	   r   r   )selfr   r   r   r   r   r   kwargsclspolicyZmemZmeasure_domainr%   r%   r&   __init__@   sN    


z%ActiveMemoryManagerExtension.__init__r   methodreturnc                 C  s(   |dkst t| |}t|r$| S |S )zyScheduler handler, invoked from the Client by
        :class:`~distributed.active_memory_manager.AMMClientProxy`
        >   r   stoprun_oncerunning)AssertionErrorgetattrcallable)r9   r?   outr%   r%   r&   r(   z   s    
z(ActiveMemoryManagerExtension.amm_handlerNoner@   c                 C  s>   | j r
dS t| j| jd }|| jjdt|  < |  dS )zHStart executing every ``self.interval`` seconds until scheduler shutdownNg     @@amm-)rC   r
   rB   r   r   periodic_callbacksidr   r9   Zpcr%   r%   r&   r      s
    z"ActiveMemoryManagerExtension.startc                 C  s*   | j jdt|  d}|r&|  dS )zStop periodic executionrJ   N)r   rK   r.   rL   rA   rM   r%   r%   r&   rA      s    z!ActiveMemoryManagerExtension.stopc                 C  s   dt |  | jjkS )zGReturn True if the AMM is being triggered periodically; False otherwiserJ   )rL   r   rK   r9   r%   r%   r&   rC      s    z$ActiveMemoryManagerExtension.runningActiveMemoryManagerPolicy)r<   r@   c                 C  s.   t |tstd|| j| | |_d S )Nz(Expected ActiveMemoryManagerPolicy; got )r3   rO   	TypeErrorr   r/   manager)r9   r<   r%   r%   r&   r0      s    
z'ActiveMemoryManagerExtension.add_policyc                   s   t  }t| drti | _| j  fdd| jj D | _z| 	  | jrT| 
  W 5 | `| `X t  }td|| d  dS )zRun all policies once and asynchronously (fire and forget) enact their
        recommendations to replicate/drop tasks
        r   c                   s   i | ]}|t |j qS r%   )rE   r1   r#   wsr   r%   r&   
<dictcomp>   s     z9ActiveMemoryManagerExtension.run_once.<locals>.<dictcomp>z#Active Memory Manager run in %.0fmsi  N)r   hasattrrD   r   r   r   workersvaluesr   _run_policies_enact_suggestionsloggerdebug)r9   Zts_startZts_stopr%   rT   r&   rB      s    

z%ActiveMemoryManagerExtension.run_oncec              	   C  sP  t | jD ]>}td| | }d}z||}W n tk
rN   Y q
Y nX t|tsbt| }z| j	|j
 \}}W n0 tk
r   t }t }||f| j	|j
< Y nX |jdkr| |j
|j|}|r|| | j|  |j
j7  < q(|jdkr8| |j
|j|}|rH|| td| j| |j
j | j|< q(td|j q(q
dS )zSequentially run ActiveMemoryManagerPolicy.run() for all registered policies,
        obtain replicate/drop suggestions, and use them to populate self.pending.
        zRunning policy: %sN	replicatedropr   zUnknown op: )listr   r[   r\   runsendStopIterationr3   
Suggestionr   tsKeyErrorr)   op_find_recipient
candidatesr/   r   nbytes_find_droppermaxr4   )r9   r<   Z
policy_genrS   Z
suggestionpending_replpending_dropr%   r%   r&   rY      sL    


  
  
 z*ActiveMemoryManagerExtension._run_policiesr   set[WorkerState] | Nonezset[WorkerState]WorkerState | None)rd   rh   rl   r@   c                   s   | ddd fdd}j dkr8|dj   dS jrJ|d	 dS |dkr`| jj }n|| jj@ }|s||d
 dS |j8 }|s|d dS ||8 }|s|d dS t|| jjd}t	
d | |S )a  Choose a worker to acquire a new replica of an in-memory task among a set of
        candidates. If candidates is None, default to all workers in the cluster.
        Regardless, workers that either already hold a replica or are scheduled to
        receive one at the end of this AMM iteration are not considered.

        Returns
        -------
        The worker with the lowest memory usage (downstream of pending replications and
        drops), or None if no eligible candidates are available.
        r   rH   msgr@   c                   s   t d |  d S )Nz (replicate, %s, %s) rejected: %stask_loggerr\   rq   orig_candidatesrd   r%   r&   
log_reject   s       z@ActiveMemoryManagerExtension._find_recipient.<locals>.log_rejectr1   zts.state = Ntask is an actorzno running candidatesz$all candidates already own a replicaz-already pending replication on all candidateskeyz&(replicate, %s, %s): replicating to %s)stateactorr   rC   r-   who_hasminr   __getitem__rs   r\   )r9   rd   rh   rl   rw   choicer%   ru   r&   rg      s<    

   z,ActiveMemoryManagerExtension._find_recipient)rd   rh   rm   r@   c                   s*  | ddd fdd}t jt | dk r<|d dS jrN|d	 dS |dkrbj }n|j@ }|s||d
 dS ||8 }|s|d dS |dd jD @ }||8 }|s|d dS t|fddd}t |dkr|jtjkr|rt	dd |D r|d dS t
d | |S )aL  Choose a worker to drop its replica of an in-memory task among a set of
        candidates. If candidates is None, default to all workers in the cluster.
        Regardless, workers that either do not hold a replica or are already scheduled
        to drop theirs at the end of this AMM iteration are not considered.
        This method also ensures that a key will not lose its last replica.

        Returns
        -------
        The worker with the highest memory usage (downstream of pending replications and
        drops), or None if no eligible candidates are available.
        r   rH   rp   c                   s   t d |  d S )Nz(drop, %s, %s) rejected: %srr   rt   ru   r%   r&   rw   1  s    z>ActiveMemoryManagerExtension._find_dropper.<locals>.log_reject   zless than 2 replicas existNrx   z3no candidates suggested by the policy own a replicaz&already pending drop on all candidatesc                 S  s   h | ]
}|j qS r%   Zprocessing_on)r#   Z	waiter_tsr%   r%   r&   r'   L  s    z=ActiveMemoryManagerExtension._find_dropper.<locals>.<setcomp>z=all candidates have dependent tasks queued or running on themc                   s   | j tjk j|  fS N)statusr   rC   r   )rS   rN   r%   r&   <lambda>Y      z<ActiveMemoryManagerExtension._find_dropper.<locals>.<lambda>ry      c                 s  s   | ]}|j tjkV  qd S r   )r   r   rC   rR   r%   r%   r&   	<genexpr>n  s   z=ActiveMemoryManagerExtension._find_dropper.<locals>.<genexpr>zCthere is only one replica on workers that aren't paused or retiringz (drop, %s, %s): dropping from %s)lenr}   r|   r-   waitersrk   r   r   rC   allrs   r\   )r9   rd   rh   rm   rw   Z%candidates_with_dependents_processingr   r%   )rv   r9   rd   r&   rj     sd    



	   z*ActiveMemoryManagerExtension._find_dropperc           
      C  s6  t dt| j | jj}tt}tt}| j D ]~\}\}}|j	sHq4|rZ|j	| sZt
|D ]&}|rt||j	kstt
|| |j q^|D ]&}|r||j	kst
|| |j qq4dt  }| D ].\}}	t d|t|	 | jj|j|	|d q| D ]0\}}	t d|t|	 | jj|j|	|d q dS )zIterate through self.pending, which was filled by self._run_policies(), and
        push the suggestions to the workers through bulk comms. Return immediately.
        z"Enacting suggestions for %d tasks:zactive_memory_manager-z- %s to acquire %d replicas)stimulus_idz- %s to drop %d replicasN)r[   r\   r   r   r   validater   r_   itemsr}   rD   appendrz   r   Zrequest_acquire_replicasaddressZrequest_remove_replicas)
r9   r   Zdrop_by_workerZrepl_by_workerrd   rl   rm   rS   r   keysr%   r%   r&   rZ   }  s@        z/ActiveMemoryManagerExtension._enact_suggestions)N)__name__
__module____qualname____doc____annotations__r=   r(   r   rA   propertyrC   r0   r   rB   rY   rg   rj   rZ   r%   r%   r%   r&   r   !   s4   
 :09_r   c                   @  s*   e Zd ZU ded< ded< dZded< dS )rc   zLiteral[('replicate', 'drop')]rf   r   rd   Nrn   rh   )r   r   r   r   rh   r%   r%   r%   r&   rc     s   
rc   )	TypeAliasro   r   SuggestionGeneratorc                   @  s@   e Zd ZU dZdZded< ddddZejd	dd
dZ	dS )rO   zAbstract parent class)rQ   r   rQ   r   rI   c                 C  s   | j j dS )Nz())	__class__r   rN   r%   r%   r&   __repr__  s    z"ActiveMemoryManagerPolicy.__repr__r   c                 C  s   dS )ao  This method is invoked by the ActiveMemoryManager every few seconds, or
        whenever the user invokes ``client.amm.run_once``.

        It is an iterator that must emit
        :class:`~distributed.active_memory_manager.Suggestion` objects:

        - ``Suggestion("replicate", <TaskState>)``
        - ``Suggestion("replicate", <TaskState>, {subset of potential workers to replicate to})``
        - ``Suggeston("drop", <TaskState>)``
        - ``Suggestion("drop", <TaskState>, {subset of potential workers to drop from})``

        Each element yielded indicates the desire to create or destroy a single replica
        of a key. If a subset of workers is not provided, it defaults to all workers on
        the cluster. Either the ActiveMemoryManager or the Worker may later decide to
        disregard the request, e.g. because it would delete the last copy of a key or
        because the key is currently needed on that worker.

        You may optionally retrieve which worker it was decided the key will be
        replicated to or dropped from, as follows:

        .. code-block:: python

           choice = (yield Suggestion("replicate", ts))

        ``choice`` is either a WorkerState or None; the latter is returned if the
        ActiveMemoryManager chose to disregard the request.

        The current pending (accepted) suggestions can be inspected on
        ``self.manager.pending``; this includes the suggestions previously yielded by
        this same method.

        The current memory usage on each worker, *downstream of all pending
        suggestions*, can be inspected on ``self.manager.workers_memory``.
        Nr%   rN   r%   r%   r&   r`     s    zActiveMemoryManagerPolicy.runN)
r   r   r   r   	__slots__r   r   abcabstractmethodr`   r%   r%   r%   r&   rO     s   
rO   c                   @  sp   e Zd ZU dZded< ddddZddd	d
dZddddZddddZddddZ	ddddZ
dS )AMMClientProxyzConvenience accessors to operate the AMM from the dask client

    Usage: ``client.amm.start()`` etc.

    All methods are asynchronous if the client is asynchronous and synchronous if the
    client is synchronous.
    r   _client)clientc                 C  s
   || _ d S r   )r   )r9   r   r%   r%   r&   r=     s    zAMMClientProxy.__init__r   r   r>   c                 C  s   | j j| j jj|dS )z8Remotely invoke ActiveMemoryManagerExtension.amm_handler)r?   )r   syncr   r(   )r9   r?   r%   r%   r&   _run  s    zAMMClientProxy._runrI   c                 C  s
   |  dS )Nr   r   rN   r%   r%   r&   r     s    zAMMClientProxy.startc                 C  s
   |  dS )NrA   r   rN   r%   r%   r&   rA     s    zAMMClientProxy.stopc                 C  s
   |  dS )NrB   r   rN   r%   r%   r&   rB     s    zAMMClientProxy.run_oncec                 C  s
   |  dS )NrC   r   rN   r%   r%   r&   rC     s    zAMMClientProxy.runningN)r   r   r   r   r   r=   r   r   rA   rB   rC   r%   r%   r%   r&   r     s   
r   c                   @  s   e Zd ZdZddddZdS )ReduceReplicaszrMake sure that in-memory tasks are not replicated on more workers than desired;
    drop the excess replicas.
    r   rI   c           
      c  s   d}d}| j jjD ]}d}tdd |jD }t|jt|| }|| j jkrr| j j| \}}|t|t| 7 }|dkr|d7 }||7 }t|D ]}	t	d|V  qq|rt
d|| d S )Nr   r   c                 S  s   h | ]}|j p|qS r%   r   )r#   waiterr%   r%   r&   r'     s     z%ReduceReplicas.run.<locals>.<setcomp>r^   z<ReduceReplicas: Dropping %d superfluous replicas of %d tasks)rQ   r   Zreplicated_tasksr   r   r}   rk   r   rangerc   r[   r\   )
r9   ZnkeysZndroprd   Zdesired_replicasZnwaitersZ	ndrop_keyrl   rm   r    r%   r%   r&   r`     s(    zReduceReplicas.runN)r   r   r   r   r`   r%   r%   r%   r&   r     s   r   c                   @  s^   e Zd ZU dZdZded< ded< dddd	Zdd
ddZdd
ddZdd
ddZ	dS )RetireWorkera(
  Replicate somewhere else all unique in-memory tasks on a worker, preparing for
    its shutdown.

    At any given time, the AMM may have registered multiple instances of this policy,
    one for each worker currently being retired - meaning that most of the time no
    instances will be registered at all. For this reason, this policy doesn't figure in
    the dask config (:file:`distributed.yaml`). Instances are added by
    :meth:`distributed.Scheduler.retire_workers` and automatically remove themselves
    once the worker has been retired. If the AMM is disabled in the dask config,
    :meth:`~distributed.Scheduler.retire_workers` will start a temporary ad-hoc one.

    **Failure condition**

    There may not be any suitable workers to receive the tasks from the retiring worker.
    This happens in two use cases:

    1. This is the only worker in the cluster, or
    2. All workers are either paused or being retired at the same time

    In either case, this policy will fail to move out all keys and set the
    ``no_recipients`` boolean to True. :meth:`~distributed.Scheduler.retire_workers`
    will abort the retirement.

    There is a third use case, where a task fails to be replicated away for whatever
    reason, e.g. because its recipient is unresponsive but the Scheduler doesn't know
    yet. In this case we'll just wait for the next AMM iteration and try again (possibly
    with a different receiving worker, e.g. if the receiving worker was hung but not yet
    declared dead).

    **Retiring a worker with spilled tasks**

    On its very first iteration, this policy suggests that other workers should fetch
    all unique in-memory tasks of the retiring worker. Frequently, this means that in
    the next few moments the retiring worker will be bombarded by
    :meth:`distributed.worker.Worker.get_data` calls from the rest of the cluster. This
    can be a problem if most of the managed memory of the worker has been spilled out,
    as it could send the worker above its terminate threshold. Two measures are in place
    in order to prevent this:

    - At every iteration, this policy drops all tasks on the retiring worker that have
      already been replicated somewhere else. This makes room for further tasks to be
      moved out of the spill file in order to be replicated onto another worker.
    - Once the worker passes the ``pause`` threshold,
      :meth:`~distributed.worker.Worker.get_data` throttles the number of outgoing
      connections to 1.

    Parameters
    ==========
    address: str
        URI of the worker to be retired
    r   no_recipientsr   r   r   r   r   c                 C  s   || _ d| _d S )NFr   )r9   r   r%   r%   r&   r=   b  s    zRetireWorker.__init__rI   c                 C  s   d| j dS )NzRetireWorker()r   rN   r%   r%   r&   r   f  s    zRetireWorker.__repr__r   c              	   c  s  | j jj| j}|dkr8td|  | j j|  dS |j	rvt
d| j dt|j	 d d| _| j j|  dS d}d}td| |jD ]}|jrqt|jd	krtd
||hV }|rq|j| j jj@ rq|d	7 }zt| j j| d }W n tk
r   d}Y nX |std|V }|s|d	7 }q|rbd| _t
d| j d| d | j j|  nD|rtd| j d| d n"td| j d | j j|  dS ) Nz/Removing policy %s: Worker no longer in clusterzTried retiring worker z, but it holds actor(s) z6, which can't be moved.The worker will not be retired.Tr   zRetiring %sr   r^   Fr]   z, but zk tasks could not be moved as there are no suitable workers to receive them. The worker will not be retired.zRetiring worker z; z keys are being moved away.z'; no unique keys need to be moved away.)rQ   r   rW   r,   r   r[   r\   r   removeZactorswarningr)   r   has_whatr|   r   r}   rc   rC   r   r   re   info)r9   rS   ZnreplZnno_recrd   Zdrop_wsZhas_pending_replZrec_wsr%   r%   r&   r`   i  s^    


zRetireWorker.runc                 C  sB   | | j jkrdS | j jj| j}|dkr.dS tdd |jD S )zCReturn True if it is safe to close the worker down; False otherwiseTNc                 s  s   | ]}t |jd kV  qdS )r   N)r   r}   )r#   rd   r%   r%   r&   r     s     z$RetireWorker.done.<locals>.<genexpr>)rQ   r   r   rW   r,   r   r   r   )r9   rS   r%   r%   r&   done  s    zRetireWorker.doneN)
r   r   r   r   r   r   r=   r   r`   r   r%   r%   r%   r&   r   (  s   
4\r   )/r   
__future__r   r   loggingcollectionsr   collections.abcr   typingr   r   r   r   r*   Z
dask.utilsr	   Zdistributed.compatibilityr
   Zdistributed.corer   Zdistributed.metricsr   Zdistributed.utilsr   r   Zdistributed.clientr   Zdistributed.schedulerr   r   r   	getLoggerr   r[   rs   r   rc   Ztyping_extensionsr   r   r   ABCrO   r   r   r   r%   r%   r%   r&   <module>   s:   
   2%