from __future__ import annotations

import dataclasses
import logging
import math
import operator
import uuid
import warnings
import weakref
from collections import defaultdict, deque
from collections.abc import (
    Callable, Collection, Container, Hashable, Iterable, Iterator, Mapping, Sequence, Set,
)
from contextlib import suppress
from functools import partial
from numbers import Number
from typing import TYPE_CHECKING, Any, ClassVar, Literal, NamedTuple, cast, overload

import dask
from sortedcontainers import SortedDict, SortedSet
from tlz import (
    first, groupby, merge, merge_sorted, merge_with, partition, pluck, second, valmap,
)
from tornado.ioloop import IOLoop

from dask.highlevelgraph import HighLevelGraph
from dask.utils import (
    format_bytes, format_time, key_split, parse_bytes, parse_timedelta, tmpfile,
)
from dask.widgets import get_template

from distributed import cluster_dump, preloading, profile, versions
from distributed._stories import scheduler_story
from distributed.active_memory_manager import ActiveMemoryManagerExtension, RetireWorker
from distributed.batched import BatchedSend
from distributed.collections import HeapSet
from distributed.comm import (
    Comm, CommClosedError, get_address_host, normalize_address, resolve_address,
    unparse_host_port,
)
from distributed.comm.addressing import addresses_from_user_args
from distributed.compatibility import PeriodicCallback
from distributed.core import Status, clean_exception, rpc, send_recv
from distributed.diagnostics.memory_sampler import MemorySamplerExtension
from distributed.diagnostics.plugin import SchedulerPlugin, _get_plugin_name
from distributed.event import EventExtension
from distributed.http import get_handlers
from distributed.lock import LockExtension
from distributed.metrics import monotonic, time
from distributed.multi_lock import MultiLockExtension
from distributed.node import ServerNode
from distributed.proctitle import setproctitle
from distributed.protocol.pickle import dumps, loads
from distributed.protocol.serialize import Serialized, serialize
from distributed.publish import PublishExtension
from distributed.pubsub import PubSubSchedulerExtension
from distributed.queues import QueueExtension
from distributed.recreate_tasks import ReplayTaskScheduler
from distributed.security import Security
from distributed.semaphore import SemaphoreExtension
from distributed.shuffle import ShuffleSchedulerExtension
from distributed.stealing import WorkStealing
from distributed.utils import (
    All, TimeoutError, empty_context, get_fileno_limit, key_split_group, log_errors,
    no_default, recursive_to_dict, validate_key,
)
from distributed.utils_comm import gather_from_workers, retry_operation, scatter_to_workers
from distributed.utils_perf import disable_gc_diagnosis, enable_gc_diagnosis
from distributed.variable import VariableExtension

if TYPE_CHECKING:
    from typing_extensions import TypeAlias

    TaskStateState: TypeAlias = Literal[
        "released",
        "waiting",
        "no-worker",
        "queued",
        "processing",
        "memory",
        "erred",
        "forgotten",
    ]
    # {task key: new state}
    Recs: TypeAlias = "dict[str, TaskStateState]"
    # {client or worker address: [message, message, ...]}
    Msgs: TypeAlias = "dict[str, list[dict[str, Any]]]"
    RecsMsgs: TypeAlias = "tuple[Recs, Msgs, Msgs]"
else:
    TaskStateState = str
    Recs = dict
    Msgs = dict
    RecsMsgs = tuple

ALL_TASK_STATES: set[TaskStateState] = {
    "released",
    "waiting",
    "no-worker",
    "queued",
    "processing",
    "memory",
    "erred",
    "forgotten",
}

logger = logging.getLogger(__name__)
LOG_PDB = dask.config.get("distributed.admin.pdb-on-err")
DEFAULT_DATA_SIZE = parse_bytes(dask.config.get("distributed.scheduler.default-data-size"))
STIMULUS_ID_UNSET = "<stimulus_id unset>"

DEFAULT_EXTENSIONS = {
    "locks": LockExtension,
    "multi_locks": MultiLockExtension,
    "publish": PublishExtension,
    "replay-tasks": ReplayTaskScheduler,
    "queues": QueueExtension,
    "variables": VariableExtension,
    "pubsub": PubSubSchedulerExtension,
    "semaphores": SemaphoreExtension,
    "events": EventExtension,
    "amm": ActiveMemoryManagerExtension,
    "memory_sampler": MemorySamplerExtension,
    "shuffle": ShuffleSchedulerExtension,
    "stealing": WorkStealing,
}


class ClientState:
    """A simple object holding information about a client."""

    client_key: str
    _hash: int
    wants_what: set[TaskState]
    last_seen: float
    versions: dict[str, Any]

    __slots__ = tuple(__annotations__)

    def __init__(self, client: str, *, versions: dict[str, Any] | None = None):
        self.client_key = client
        self._hash = hash(client)
        self.wants_what = set()
        self.last_seen = time()
        self.versions = versions or {}

    def __hash__(self) -> int:
        return self._hash

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, ClientState):
            return False
        return self.client_key == other.client_key

    def __repr__(self) -> str:
        return f"<Client {self.client_key!r}>"

    def __str__(self) -> str:
        return self.client_key

    def _to_dict_no_nest(self, *, exclude: Container[str] = ()) -> dict[str, Any]:
        """Dictionary representation for debugging purposes.
        Not type stable and not intended for roundtrips.

        See also
        --------
        Client.dump_cluster_state
        distributed.utils.recursive_to_dict
        TaskState._to_dict
        """
        return recursive_to_dict(self, exclude=set(exclude) | {"versions"}, members=True)
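
# Illustrative sketch (not part of the original module): ClientState identity is driven
# purely by ``client_key``, and the potentially large ``versions`` payload is excluded
# from the debugging dump. The client key and versions used here are made up.
def _example_client_state() -> dict:
    cs = ClientState("Client-abc123", versions={"python": "3.10.8"})
    assert cs == ClientState("Client-abc123")  # equality and hashing use client_key only
    # _to_dict_no_nest() excludes the versions blob and renders members for debugging
    return cs._to_dict_no_nest()
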
class MemoryState:
    """Memory readings on a worker or on the whole cluster.

    See :doc:`worker-memory`.

    Attributes / properties:

    managed_total
        Sum of the output of sizeof() for all dask keys held by the worker in memory,
        plus number of bytes spilled to disk

    managed
        Sum of the output of sizeof() for the dask keys held in RAM. Note that this may
        be inaccurate, which may cause inaccurate unmanaged memory (see below).

    spilled
        Number of bytes for the dask keys spilled to the hard drive.
        Note that this is the size on disk; size in memory may be different due to
        compression and inaccuracies in sizeof(). In other words, given the same keys,
        'managed' will change depending on the keys being in memory or spilled.

    process
        Total RSS memory measured by the OS on the worker process.
        This is always exactly equal to managed + unmanaged.

    unmanaged
        process - managed. This is the sum of

        - Python interpreter and modules
        - global variables
        - memory temporarily allocated by the dask tasks that are currently running
        - memory fragmentation
        - memory leaks
        - memory not yet garbage collected
        - memory not yet free()'d by the Python memory manager to the OS

    unmanaged_old
        Minimum of the 'unmanaged' measures over the last
        ``distributed.worker.memory.recent-to-old-time`` seconds

    unmanaged_recent
        unmanaged - unmanaged_old; in other words process memory that has been recently
        allocated but is not accounted for by dask; hopefully it's mostly a temporary
        spike.

    optimistic
        managed + unmanaged_old; in other words the memory held long-term by
        the process under the hopeful assumption that all unmanaged_recent memory is a
        temporary spike
    """

    process: int
    unmanaged_old: int
    managed: int
    spilled: int

    __slots__ = tuple(__annotations__)

    def __init__(self, *, process: int, unmanaged_old: int, managed: int, spilled: int):
        # These readings arrive over different channels (heartbeat vs. batched stream)
        # at different times; clamp them so that derived measures never go negative.
        self.process = process
        self.unmanaged_old = min(unmanaged_old, process)
        self.spilled = spilled
        self.managed = min(managed, process - spilled)

    @staticmethod
    def sum(*infos: MemoryState) -> MemoryState:
        process = 0
        unmanaged_old = 0
        managed = 0
        spilled = 0
        for ms in infos:
            process += ms.process
            unmanaged_old += ms.unmanaged_old
            spilled += ms.spilled
            managed += ms.managed
        return MemoryState(
            process=process, unmanaged_old=unmanaged_old, managed=managed, spilled=spilled
        )

    @property
    def managed_total(self) -> int:
        return self.managed + self.spilled

    @property
    def unmanaged(self) -> int:
        return self.process - self.managed

    @property
    def unmanaged_recent(self) -> int:
        return self.process - self.managed - self.unmanaged_old

    @property
    def optimistic(self) -> int:
        return self.managed + self.unmanaged_old

    @property
    def managed_in_memory(self) -> int:
        warnings.warn("managed_in_memory has been renamed to managed", FutureWarning)
        return self.managed

    @property
    def managed_spilled(self) -> int:
        warnings.warn("managed_spilled has been renamed to spilled", FutureWarning)
        return self.spilled

    def __repr__(self) -> str:
        return (
            f"Process memory (RSS)  : {format_bytes(self.process)}\n"
            f"  - managed by Dask   : {format_bytes(self.managed)}\n"
            f"  - unmanaged (old)   : {format_bytes(self.unmanaged_old)}\n"
            f"  - unmanaged (recent): {format_bytes(self.unmanaged_recent)}\n"
            f"Spilled to disk       : {format_bytes(self.spilled)}\n"
        )

    def _to_dict(self, *, exclude: Container[str] = ()) -> dict:
        """Dictionary representation for debugging purposes.

        See also
        --------
        Client.dump_cluster_state
        distributed.utils.recursive_to_dict
        """
        return {
            k: getattr(self, k)
            for k in dir(self)
            if not k.startswith("_")
            and k not in {"sum", "managed_in_memory", "managed_spilled"}
        }
2	r   c                   @  s  e Zd ZU dZded< ded< ded< ded< ded	< ded
< ded< ded< ded< ded< ded< ded< ded< ded< ded< ded< ded< ded< ded< ded< ded < ded!< d"ed#< d$ed%< d$ed&< ded'< ded(< d)ed*< d+ed,< ded-< d.ed/< d0ed1< eeZd2d3d3d3d3d4dddd5dddddd6d7d7d8d9d:d;Zdd<d=d>Zd5d?d@dAdBZ	e
dCd<dDdEZe
dd<dFdGZe
dHd<dIdJZd d<dKdLZdd<dMdNZdd<dOdPZdd<dQdRZdSdTdUddVdWdXZe
dYd<dZd[Zd\d]d^d_d`Zd\d]d^dadbZd\d]d^dcddZd\d]d^dedfZd\d]d^dgdhZd\d]d^didjZd\d]d^dkdlZd\d]d^dmdnZe
dd<dodpZd3S )qWorkerStatezA simple object holding information about a worker.

    Not to be confused with :class:`distributed.worker_state_machine.WorkerState`.
    """

    #: This worker's unique key; usually its connected address, or an alias.
    address: str
    pid: int
    name: Hashable
    #: Number of CPU threads made available on this worker
    nthreads: int
    #: Memory available to the worker, in bytes
    memory_limit: int
    local_directory: str
    services: dict[str, int]
    versions: dict[str, Any]
    nanny: str
    status: Status
    _hash: int
    #: Total size, in bytes, of the dask keys this worker holds in memory
    nbytes: int
    _memory_unmanaged_old: int
    _memory_unmanaged_history: deque[tuple[float, int]]
    #: Raw metrics from the latest worker heartbeat
    metrics: dict[str, Any]
    last_seen: float
    time_delay: float
    bandwidth: float
    actors: set[TaskState]
    #: Underlying data of :meth:`WorkerState.has_what`
    _has_what: dict[TaskState, None]
    #: Tasks that have been sent to this worker to be computed
    processing: set[TaskState]
    long_running: set[TaskState]
    executing: dict[TaskState, float]
    resources: dict[str, float]
    used_resources: dict[str, float]
    extra: dict[str, Any]
    server_id: str
    scheduler_ref: weakref.ref[SchedulerState] | None
    task_prefix_count: defaultdict[str, int]
    _network_occ: float
    _occupancy_cache: float | None
    #: Tasks that may need to be fetched to this worker, and how many tasks need them
    needs_what: dict[TaskState, int]

    __slots__ = tuple(__annotations__)

    def __init__(
        self,
        *,
        address: str,
        status: Status,
        pid: int,
        name: object,
        nthreads: int = 0,
        memory_limit: int,
        local_directory: str,
        nanny: str,
        server_id: str,
        services: dict[str, int] | None = None,
        versions: dict[str, Any] | None = None,
        extra: dict[str, Any] | None = None,
        scheduler: SchedulerState | None = None,
    ):
        self.server_id = server_id
        self.address = address
        self.pid = pid
        self.name = name
        self.nthreads = nthreads
        self.memory_limit = memory_limit
        self.local_directory = local_directory
        self.services = services or {}
        self.versions = versions or {}
        self.nanny = nanny
        self.status = status
        self._hash = hash(self.server_id)
        self.nbytes = 0
        self._memory_unmanaged_old = 0
        self._memory_unmanaged_history = deque()
        self.metrics = {}
        self.last_seen = 0
        self.time_delay = 0
        self.bandwidth = parse_bytes(dask.config.get("distributed.scheduler.bandwidth"))
        self.actors = set()
        self._has_what = {}
        self.processing = set()
        self.long_running = set()
        self.executing = {}
        self.resources = {}
        self.used_resources = {}
        self.extra = extra or {}
        self.scheduler_ref = weakref.ref(scheduler) if scheduler else None
        self.task_prefix_count = defaultdict(int)
        self.needs_what = {}
        self._network_occ = 0
        self._occupancy_cache = None

    def __hash__(self) -> int:
        return self._hash

    def __eq__(self, other: object) -> bool:
        return isinstance(other, WorkerState) and other.server_id == self.server_id

    @property
    def has_what(self) -> Set[TaskState]:
        """An insertion-sorted set-like of tasks which currently reside on this worker.
        All the tasks here are in the "memory" state.
        This is the reverse mapping of :attr:`TaskState.who_has`.

        This is a read-only public accessor. The data is implemented as a dict without
        values, because rebalance() relies on dicts being insertion-sorted.
        """
        return self._has_what.keys()

    @property
    def host(self) -> str:
        return get_address_host(self.address)

    @property
    def memory(self) -> MemoryState:
        """Polished memory metrics for the worker.

        **Design note on managed memory**

        There are two measures available for managed memory:

        - ``self.nbytes``
        - ``self.metrics["managed_bytes"]``

        At rest, the two numbers must be identical. However, ``self.nbytes`` is
        immediately updated through the batched comms as soon as each task lands in
        memory on the worker; ``self.metrics["managed_bytes"]`` instead is updated by
        the heartbeat, which can lag several seconds behind.

        Below we are mixing likely newer managed memory info from ``self.nbytes`` with
        process and spilled memory from the heartbeat. This is deliberate, so that
        managed memory total is updated more frequently.

        Managed memory directly and immediately contributes to optimistic memory, which
        is in turn used in Active Memory Manager heuristics (at the moment of writing;
        more uses will likely be added in the future). So it's important to have it
        up to date; much more than it is for process memory.

        Having up-to-date managed memory info as soon as the scheduler learns about
        task completion also substantially simplifies unit tests.

        The flip side of this design is that it may cause some noise in the
        unmanaged_recent measure. e.g.:

        1. Delete 100MB of managed data
        2. The updated managed memory reaches the scheduler faster than the
           updated process memory
        3. There's a blip where the scheduler thinks that there's a sudden 100MB
           increase in unmanaged_recent, since process memory hasn't changed but managed
           memory has decreased by 100MB
        4. When the heartbeat arrives, process memory goes down and so does the
           unmanaged_recent.

        This is OK - one of the main reasons for the unmanaged_recent / unmanaged_old
        split is exactly to concentrate all the noise in unmanaged_recent and exclude it
        from optimistic memory, which is used for heuristics.

        Something that is less OK, but also less frequent, is that the sudden deletion
        of spilled keys will cause a negative blip in managed memory:

        1. Delete 100MB of spilled data
        2. The updated managed memory *total* reaches the scheduler faster than the
           updated spilled portion
        3. This causes the managed memory to temporarily plummet and be replaced by
           unmanaged_recent, while spilled memory remains unaltered
        4. When the heartbeat arrives, managed goes back up, unmanaged_recent
           goes back down, and spilled goes down by 100MB as it should have to
           begin with.

        https://github.com/dask/distributed/issues/6002 will let us solve this.
        """
        return MemoryState(
            process=self.metrics["memory"],
            managed=max(0, self.nbytes - self.metrics["spilled_bytes"]["memory"]),
            spilled=self.metrics["spilled_bytes"]["disk"],
            unmanaged_old=self._memory_unmanaged_old,
        )

    def clean(self) -> WorkerState:
        """Return a version of this object that is appropriate for serialization"""
        ws = WorkerState(
            address=self.address,
            status=self.status,
            pid=self.pid,
            name=self.name,
            nthreads=self.nthreads,
            memory_limit=self.memory_limit,
            local_directory=self.local_directory,
            services=self.services,
            nanny=self.nanny,
            extra=self.extra,
            server_id=self.server_id,
        )
        ws._occupancy_cache = self.occupancy
        ws.executing = {ts.key: duration for ts, duration in self.executing.items()}
        return ws

    def __repr__(self) -> str:
        name = f", name: {self.name}" if self.name != self.address else ""
        return (
            f"<WorkerState {self.address!r}{name}, "
            f"status: {self.status.name}, "
            f"memory: {len(self.has_what)}, "
            f"processing: {len(self.processing)}>"
        )

    def _repr_html_(self) -> str:
        return get_template("worker_state.html.j2").render(
            address=self.address,
            name=self.name,
            status=self.status.name,
            has_what=self.has_what,
            processing=self.processing,
        )

    def identity(self) -> dict[str, Any]:
        return {
            "type": "Worker",
            "id": self.name,
            "host": self.host,
            "resources": self.resources,
            "local_directory": self.local_directory,
            "name": self.name,
            "nthreads": self.nthreads,
            "memory_limit": self.memory_limit,
            "last_seen": self.last_seen,
            "services": self.services,
            "metrics": self.metrics,
            "status": self.status.name,
            "nanny": self.nanny,
            **self.extra,
        }

    def _to_dict_no_nest(self, *, exclude: Container[str] = ()) -> dict[str, Any]:
        """Dictionary representation for debugging purposes.
        Not type stable and not intended for roundtrips.

        See also
        --------
        Client.dump_cluster_state
        distributed.utils.recursive_to_dict
        TaskState._to_dict
        """
        return recursive_to_dict(self, exclude=set(exclude) | {"versions"}, members=True)

    @property
    def scheduler(self) -> SchedulerState:
        assert self.scheduler_ref
        s = self.scheduler_ref()
        assert s
        return s

    def add_to_processing(self, ts: TaskState) -> None:
        """Assign a task to this worker for compute."""
        if self.scheduler.validate:
            assert ts not in self.processing
        tp = ts.prefix
        self.task_prefix_count[tp.name] += 1
        self.scheduler._task_prefix_count_global[tp.name] += 1
        self.processing.add(ts)
        for dts in ts.dependencies:
            if self not in dts.who_has:
                self._inc_needs_replica(dts)

    def add_to_long_running(self, ts: TaskState) -> None:
        if self.scheduler.validate:
            assert ts in self.processing
            assert ts not in self.long_running
        self._remove_from_task_prefix_count(ts)
        self.long_running.add(ts)

    def remove_from_processing(self, ts: TaskState) -> None:
        """Remove a task from a workers processing"""
        if self.scheduler.validate:
            assert ts in self.processing
        if ts in self.long_running:
            self.long_running.discard(ts)
        else:
            self._remove_from_task_prefix_count(ts)
        self.processing.remove(ts)
        for dts in ts.dependencies:
            if dts in self.needs_what:
                self._dec_needs_replica(dts)

    def _remove_from_task_prefix_count(self, ts: TaskState) -> None:
        count = self.task_prefix_count[ts.prefix.name] - 1
        if count:
            self.task_prefix_count[ts.prefix.name] = count
        else:
            del self.task_prefix_count[ts.prefix.name]

        count = self.scheduler._task_prefix_count_global[ts.prefix.name] - 1
        if count:
            self.scheduler._task_prefix_count_global[ts.prefix.name] = count
        else:
            del self.scheduler._task_prefix_count_global[ts.prefix.name]

    def remove_replica(self, ts: TaskState) -> None:
        """The worker no longer has a task in memory"""
        if self.scheduler.validate:
            assert self in ts.who_has
            assert ts in self._has_what
            assert ts not in self.needs_what
        self.nbytes -= ts.get_nbytes()
        del self._has_what[ts]
        ts.who_has.remove(self)

    def _inc_needs_replica(self, ts: TaskState) -> None:
        """Assign a task fetch to this worker and update network occupancies"""
        if self.scheduler.validate:
            assert self not in ts.who_has
            assert ts not in self._has_what
        if ts not in self.needs_what:
            self.needs_what[ts] = 1
            nbytes = ts.get_nbytes()
            self._network_occ += nbytes
            self.scheduler._network_occ_global += nbytes
        else:
            self.needs_what[ts] += 1

    def _dec_needs_replica(self, ts: TaskState) -> None:
        if self.scheduler.validate:
            assert ts in self.needs_what
        self.needs_what[ts] -= 1
        if self.needs_what[ts] == 0:
            del self.needs_what[ts]
            nbytes = ts.get_nbytes()
            self._network_occ -= nbytes
            self.scheduler._network_occ_global -= nbytes

    def add_replica(self, ts: TaskState) -> None:
        """The worker acquired a replica of task"""
        if self.scheduler.validate:
            assert self not in ts.who_has
            assert ts not in self._has_what
        nbytes = ts.get_nbytes()
        if ts in self.needs_what:
            del self.needs_what[ts]
            self._network_occ -= nbytes
            self.scheduler._network_occ_global -= nbytes
        ts.who_has.add(self)
        self.nbytes += nbytes
        self._has_what[ts] = None

    @property
    def occupancy(self) -> float:
        return self._occupancy_cache or self.scheduler._calc_occupancy(
            self.task_prefix_count, self._network_occ
        )
< dS )	ErredTaskzLightweight representation of an erred task without any dependency information
    or runspec.

    See also
    --------
    TaskState
    r   r  r   	timestampset[str]erred_onr~   exception_texttraceback_textNr   r   r   r   r   r   r   r   r   r1  ?  s   
    """

    key: Hashable
    timestamp: float
    erred_on: set[str]
    exception_text: str
    traceback_text: str


class Computation:
    """Collection tracking a single compute or persist call
    See also
    --------
    TaskPrefix
    TaskGroup
    TaskState
    r   startset[TaskGroup]groupsr   codez	uuid.UUIDr  c                 C  s&   t  | _t | _t | _t | _d S r   )	rI   r9  r   r;  r   r<  uuidZuuid4r  r   r   r   r   r   a  s    zComputation.__init__r   c                 C  s"   | j rtdd | j D S dS d S )Nc                 s  s   | ]}|j V  qd S r   )stopr   tgr   r   r   	<genexpr>j  s     z#Computation.stop.<locals>.<genexpr>)r;  r   r   r   r   r   r>  g  s    zComputation.stopdict[TaskStateState, int]c                 C  s   t tdd | jD S )Nc                 s  s   | ]}|j V  qd S r   statesr?  r   r   r   rA  p  s     z%Computation.states.<locals>.<genexpr>r   r   r;  r   r   r   r   rE  n  s    zComputation.statesr~   c                 C  s4   d| j  dd ddd t| j D  d S )Nz<Computation : zTasks: , c                 s  s"   | ]\}}|rd ||f V  qdS z%s: %dNr   r   r   vr   r   r   rA  v  s     z'Computation.__repr__.<locals>.<genexpr>r   )r  joinsortedrE  r  r   r   r   r   r   r  s    
zComputation.__repr__c                 C  s&   t dj| j| j| j| j| j| jdS )Nzcomputation.html.j2)r  r9  r>  r;  rE  r<  )r+   r  r  r9  r>  r;  rE  r<  r   r   r   r   r  |  s    zComputation._repr_html_N)r   r   r   r   r   r   r   r   r   r>  rE  r   r  r   r   r   r   r8  P  s   
	
r8  c                   @  s  e Zd ZU dZded< ded< ded< ded	< ded
< ded< ded< eeZddddZdddddZdddddddZ	e
ddddZe
ddddZe
dddd Zddd!d"Ze
ddd#d$Zddd%d&Ze
ddd'd(Ze
d)dd*d+Zd,S )-
TaskPrefixzCollection tracking all tasks within a group

    Keys often have a structure like ``("x-123", 0)``
    A group takes the first section, like ``"x"``

    See Also
    --------
    TaskGroup
    r~   r   r   duration_averager   
suspiciousdefaultdict[str, float]all_durationsmax_exec_timezlist[TaskGroup]r;  r   state_countsr   c                 C  s^   || _ g | _tt| _tt| _tj	d}| j |krHt
|| j  | _nd| _d| _d| _d S )Nz,distributed.scheduler.default-task-durationsrB  r   )r   r;  r   r   rR  r   rT  r   r   r   r)   rO  rS  rP  )r   r   Ztask_durationsr   r   r   r     s    


zTaskPrefix.__init__r  )r  r   c                 C  s&   t || j| _|d| j kr"d| _d S )N   rB  )r   rS  rO  )r   r  r   r   r   add_exec_time  s    zTaskPrefix.add_exec_timeactionr9  r>  r   c                 C  sN   || }| j |  |7  < |dkrJ| j}|dk r8|| _nd| d|  | _d S )Ncomputer         ?)rR  rO  )r   rY  r9  r>  r  oldr   r   r   add_duration  s    zTaskPrefix.add_durationr   r   c                 C  s   t tdd | jD S )zpThe number of tasks in each state,
        like ``{"memory": 10, "processing": 3, "released": 4, ...}``
        c                 S  s   g | ]
}|j qS r   rD  r?  r   r   r   
<listcomp>  s     z%TaskPrefix.states.<locals>.<listcomp>rF  r   r   r   r   rE    s    zTaskPrefix.statesc                 C  s   dd | j D S )Nc                 S  s(   g | ] }t d d |j D r|qS )c                 s  s"   | ]\}}|d ko|dkV  qdS )rp   r   Nr   rJ  r   r   r   rA    s     z/TaskPrefix.active.<locals>.<listcomp>.<genexpr>)anyrE  r  r?  r   r   r   r^    s   z%TaskPrefix.active.<locals>.<listcomp>r;  r   r   r   r   active  s    zTaskPrefix.activec                 C  s   t tdd | jD S )Nc                 S  s   g | ]
}|j qS r   rD  r?  r   r   r   r^    s     z,TaskPrefix.active_states.<locals>.<listcomp>)r   r   ra  r   r   r   r   active_states  s    zTaskPrefix.active_statesc                 C  s0   d| j  d ddd t| j D  d S )N<rG  rH  c                 s  s"   | ]\}}|rd ||f V  qdS rI  r   rJ  r   r   r   rA    s     z&TaskPrefix.__repr__.<locals>.<genexpr>r   r   rL  rM  rE  r  r   r   r   r   r     s    
zTaskPrefix.__repr__c                 C  s   t dd | jD S )Nc                 s  s   | ]}|j V  qd S r   )nbytes_totalr?  r   r   r   rA    s     z*TaskPrefix.nbytes_total.<locals>.<genexpr>r   r;  r   r   r   r   re    s    zTaskPrefix.nbytes_totalc                 C  s   t tt| jS r   )r   mapr
  r;  r   r   r   r   __len__  s    zTaskPrefix.__len__c                 C  s   t dd | jD S )Nc                 s  s   | ]}|j V  qd S r   )r  r?  r   r   r   rA    s     z&TaskPrefix.duration.<locals>.<genexpr>rf  r   r   r   r   r    s    zTaskPrefix.durationr3  c                 C  s   dd | j D S )Nc                 S  s   h | ]}|j D ]}|qqS r   )types)r   r@  typr   r   r   	<setcomp>  s       z#TaskPrefix.types.<locals>.<setcomp>r`  r   r   r   r   ri    s    zTaskPrefix.typesN)r   r   r   r   r   r   r   r   rW  r]  r   rE  ra  rb  r   re  rh  r  ri  r   r   r   r   rN    s4   

rN  c                   @  s   e Zd ZU dZded< ded< ded< ded	< d
ed< ded< ded< ded< ded< d
ed< d
ed< ded< eeZddddZdd
d
ddddZdddd d!Z	dd"d#d$Z
dd"d%d&Zd'd(d)d*d+d,d-Zd.S )/	TaskGroupzCollection tracking all tasks within a group

    Keys often have a structure like ``("x-123", 0)``
    A group takes the first section, like ``"x-123"``

    See also
    --------
    TaskPrefix
    r~   r   rC  rE  r:  r  r   re  r   r  r3  ri  WorkerState | Nonelast_workerlast_worker_tasks_leftzTaskPrefix | Noner  r9  r>  rQ  rR  rU  c                 C  s\   || _ d | _ttd| _t | _d| _d| _	t | _
d| _d| _tt| _d | _d| _d S )Nr           )r   r  r   fromkeysru   rE  r   r  re  r  ri  r9  r>  r   r   rR  rn  ro  r  r   r   r   r   #  s    
zTaskGroup.__init__r  rX  c                 C  sn   || }| j |  |7  < |dkr>| j|k r2|| _| jp:|| _|  j|7  _| jd k	sZt| j||| d S )NrZ  )rR  r>  r9  r  r  r  r]  )r   rY  r9  r>  r  r   r   r   r]  1  s    
zTaskGroup.add_durationr  r   c                 C  s   | j |j  d7  < | |_d S r)  )rE  stategroupr   r   r   r   r  <  s    zTaskGroup.addr   c                 C  s4   d| j p
d d ddd t| j D  d S )Nrc  zno-grouprG  rH  c                 s  s"   | ]\}}|rd ||f V  qdS rI  r   rJ  r   r   r   rA  E  s     z%TaskGroup.__repr__.<locals>.<genexpr>r   rd  r   r   r   r   r   @  s    
zTaskGroup.__repr__c                 C  s   t | j S r   )r   rE  valuesr   r   r   r   rh  K  s    zTaskGroup.__len__r   r   r   r   r   c                C  s   t | |ddS )r   Tr   r`   r   r   r   r   r   N  s    
zTaskGroup._to_dict_no_nestN)r   r   r   r   r   r   r   r   r]  r  r   rh  r   r   r   r   r   rl    s(   
rl  c                   @  s  e Zd ZU dZded< ded< ded< ded	< d
ed< ded< ded< ded< ded< ded< ded< ded< ded< ded< ded< ded< ded< ded< ded < ded!< d"ed#< d$ed%< ded&< d$ed'< d$ed(< d)ed*< ded+< ded,< d-ed.< ded/< d0ed1< d0ed2< ded3< d4Zd5ed6< eeZe	 Z
d7ed8< ddd
d9d:d;Zdd<d=d>Zddd?d@dAZed
d<dBdCZejd
dDdEdFdCZd dDd?dGdHZdd<dIdJZddDdKdLdMZdd<dNdOZdd<dPdQZdDd<dRdSZdd<dTdUZdVdWdXd0dYdZd[Zd4S )\r  zA simple object holding information about a task.

    Not to be confused with :class:`distributed.worker_state_machine.TaskState`, which
    holds similar information on the Worker side.
    r~   r  rN  r  r   run_specztuple[int, ...]priorityrq   _stater   r  
dependentsr   has_lost_dependencies
waiting_onwaiterszset[ClientState]	who_wantsset[WorkerState]r  rm  processing_onr   retriesr   r  Serialized | None	exception	tracebackr5  r6  TaskState | Noneexception_blamer3  r4  rP  host_restrictionsworker_restrictionsr   resource_restrictionsloose_restrictionsactorrl  rs  	group_keyr   metadatar   r   Nr   __weakref__z$ClassVar[weakref.WeakSet[TaskState]]
_instances)r  rv  rr  c                 C  s   || _ t|| _|| _|| _d | _d | _d | _d| _d| _	d| _
d| _d| _d | _t | _t | _t | _t | _t | _t | _d | _d| _d | _d | _i | _d| _d| _d | _d | _t|| _d | _ i | _!i | _"t | _#t$j%&|  d S )Nr	  r   rB  F)'r  r   r   rv  rx  r  r  r  r5  r6  rP  r  r   rw  r   r}  r  ry  r{  r|  r  r  rz  r  r  r  r  r  r  r  r]   r  rs  r  r   r4  r  r  r  )r   r  rv  rr  r   r   r   r   +  sD    

zTaskState.__init__r   c                 C  s   | j S r   r   r   r   r   r   r   O  s    zTaskState.__hash__r   c                 C  s   t |to| j|jkS r   r   r  r  r   r   r   r   r   R  s    zTaskState.__eq__c                 C  s   | j S )a1  This task's current state.  Valid states are ``released``, ``waiting``,
        ``no-worker``, ``processing``, ``memory``, ``erred`` and ``forgotten``.  If it
        is ``forgotten``, the task isn't stored in the ``tasks`` dictionary anymore and
        will probably disappear soon from memory.
        )rx  r   r   r   r   rr  U  s    zTaskState.stater  )valuer   c                 C  sH   | j j| j  d8  < | j j|  d7  < || _| jj|  d7  < d S r)  )rs  rE  rx  r  rT  )r   r  r   r   r   rr  ^  s    c                 C  s,   | j | | jj |j |j|  dS )z-Add another task as a dependency of this taskN)r  r  rs  ry  r   r   r   r   add_dependencye  s    zTaskState.add_dependencyc                 C  s   | j dkr| j S tS Nr   )r   DEFAULT_DATA_SIZEr   r   r   r   r+  k  s    zTaskState.get_nbytes)r   r   c                 C  sN   |}| j }|dkr||8 }| j j|7  _| jD ]}| j |7  _ q0|| _ d S r  )r   rs  re  r  )r   r   ZdiffZ
old_nbytesr  r   r   r   
set_nbytesn  s    
zTaskState.set_nbytesc                 C  s   d| j d| j dS )Nz<TaskState  r   )r  rx  r   r   r   r   r   x  s    zTaskState.__repr__c                 C  s   t dj| j| j| jdS )Nztask_state.html.j2)rr  r   r  )r+   r  rr  r   r  r   r   r   r   r  {  s
    zTaskState._repr_html_c              
   C  s   z| j D ] }t|tstt|| j fq| jD ] }t|ts0tt|| jfq0| jD ] }t|tsXtt|| jfqX| j	D ] }t|tstt|| j	fqt
|  W n@ tk
r } z"t| trdd l}|  W 5 d }~X Y nX d S r  )r}  r   r}   r  reprr  r   r  r  ry  validate_task_state	Exceptionloggerr  LOG_PDBpdb	set_trace)r   csr  r  er  r   r   r   r    s    




zTaskState.validatec                 C  s   t dd | jD S )Nc                 s  s   | ]}|  V  qd S r   r+  r   r  r   r   r   rA    s     z,TaskState.get_nbytes_deps.<locals>.<genexpr>)r   r  r   r   r   r   get_nbytes_deps  s    zTaskState.get_nbytes_depsr   r   r   r   c                C  s   t | |ddS )a  Dictionary representation for debugging purposes.
        Not type stable and not intended for roundtrips.

        See also
        --------
        Client.dump_cluster_state
        distributed.utils.recursive_to_dict

        Notes
        -----
        This class uses ``_to_dict_no_nest`` instead of ``_to_dict``.
        When a task references another task, or when a WorkerState.tasks contains tasks,
        this method is not executed for the inner task, even if the inner task was never
        seen before; you get a repr instead. All tasks should neatly appear under
        Scheduler.tasks. This also prevents a RecursionError during particularly heavy
        loads, which have been observed to happen whenever there's an acyclic dependency
        chain of ~200+ tasks.
        Tr   ru  r   r   r   r   r     s    zTaskState._to_dict_no_nest)r   r   r   r   r   r  r   r   r   WeakSetr  r   r   r   r   rr  setterr  r+  r  r   r  r  r  r   r   r   r   r   r  [  sh   
	
			$
r  c                   @  sB   e Zd ZU dZded< ded< ded< ded< ded	< d
ed< dS )
Transitionz1An entry in :attr:`SchedulerState.transition_log`r~   r  rq   r9  finishrr   recommendationsstimulus_idr   r2  Nr7  r   r   r   r   r    s   
r  c                   @  s  e Zd ZU dZded< ded< ded< ded	< d
ed< ded< ded< ded< ded< ded< ded< ded< ded< ded< ded< ded< ded< ded < ded!< d"ed#< d$ed%< d&ed'< ded(< d)ed*< d+ed,< d-ed.< ded/< ded0< d1ed2< d3ed4< d5ed6< d5ed7< d5ed8< d9ed:< d5ed;< d5ed<< d5ed=< d5ed>< eeZddddAd
ddddddBd1dCdDdEdFZedGdHdIdJZ	eddHdKdLZ
dd9dNdOdPdQdRdSdTZdUdHdVdWZed5dHdXdYZdZd5d5d[d\d]Zd9dOd9dCd^d_d`daZdbdcdcd9dUdddedfZd9d9d^dgdhdiZd9d9d^dgdjdkZdQdldmdndoZdldHdpdqZdQdldmdrdsZd9d9d^dgdtduZdMdMdMdvd9d9dwdxdyd9dCd^dzd{d|ZdMdMdMdMd}d9d9dwdxdyd9d~dCd^d	ddZd@dd9d9dd^dddZd9d9d^dgddZd9d9d^dgddZd9d9d^dgddZd9d9d^dgddZdMdMdMdMdMdd9d9d9dydddydydCd^d
ddZd9d9d^dgddZd9d9d^dgddZ d9d9d^dgddZ!d9d9d^dgddZ"d9d9d^dgddZ#d9dUdddZ$d9d9d^dgddZ%d9d9d^dgddZ&eeee!e ee"e#eeeeee&e%eeedZ'ded< dddddZ(dQddmddZ)ddd5dUdddZ*dd5dddddZ+dQdd5dddZ,dQd5dmddZ-dQddmddZ.dQddUdddZ/dQddUdddZ0dd9dÜddńZ1dQdddddȄZ2dQddUdddʄZ3dQddUddd̄Z4dQdUdmdd΄Z5ddbdϜddфZ6dQdUdmddӄZ7dQddcdddՄZ8dQdldmddׄZ9ddQddbdcdxdydUd؜ddڄZ:dQdbdUdۜdd݄Z;dQdbdcd9dUdޜddZ<dddbdUdddZ=ddQd5ddddZ>dMS )r  a  Underlying task state of dynamic scheduler

    Tracks the current state of workers, data, and computations.

    Handles transitions between different task states. Notifies the
    Scheduler of changes by messaging passing through Queues, which the
    Scheduler listens to responds accordingly.

    All events are handled quickly, in linear time with respect to their
    input (which is often of constant size) and generally within a
    millisecond.

    Users typically do not interact with ``Transitions`` directly. Instead
    users interact with the ``Client``, which in turn engages the
    ``Scheduler`` affecting different transitions here under-the-hood. In
    the background ``Worker``s also engage with the ``Scheduler``
    affecting these state transitions as well.
    r   r   dict[str, ClientState]clientsr   
extensionszdict[str, SchedulerPlugin]pluginszdict[str, dict[str, Any]]	host_infor   r  dict[str, WorkerState]workerszdict[Hashable, str]aliasesr~  runningidleidle_task_count	saturatedtotal_nthreadszdict[str, dict[str, float]]r   n_tasksdict[str, TaskState]taskszHeapSet[TaskState]rl   r   
unrunnablereplicated_taskszdict[str, set[TaskState]]unknown_durationszdict[str, TaskGroup]task_groupszdict[str, TaskPrefix]task_prefixestask_metadatazdeque[Computation]computationszdeque[ErredTask]erred_taskszdeque[Transition]transition_logtransition_counter_idle_transition_counterzint | Literal[False]transition_counter_maxr   r  r   r-  UNKNOWN_TASK_DURATIONMEMORY_RECENT_TO_OLD_TIMEr~   MEMORY_REBALANCE_MEASUREMEMORY_REBALANCE_SENDER_MINMEMORY_REBALANCE_RECIPIENT_MAXMEMORY_REBALANCE_HALF_GAPWORKER_SATURATIONr   FzSortedDict[str, WorkerState]zIterable[SchedulerPlugin]r   )r  r  r  r  r   r  r  rl   r  r  r  kwargsc                 K  s  t d || _ttjd| _|| _t	d| jd< i | _
|| _t | _t | _d| _|| _t | _|| _dd | j D | _ttjdd| _ttjd	d| _i | _i | _i | _d| _i | _|| _|| _|	| _ || _!t"t#| _$d
| _%dd | j! D | _&|
si ndd |
D | _'ttjdd| _(d| _)d| _*|| _+t,tjd| _-t,tjd| _.tjd| _/tjd| _0tjd| _1tjdd | _2tjd| _3| j3dkrt4j5| _3t6| j3t#t7fr| j3dkrt8dt9| j3 d S )NzState startr   fire-and-forgetr   c                 S  s   h | ]}t |jd kr|qS r  )r
  r  r  r   r   r   rk  Z  s     z*SchedulerState.__init__.<locals>.<setcomp>z0distributed.diagnostics.computations.max-historymaxlenz/distributed.diagnostics.erred-tasks.max-historyrp  c                 S  s   h | ]}|j tjkr|qS r   r   r>   r  r   r  r   r   r   rk  n  s     c                 S  s   i | ]}t ||qS r   )rD   r   pr   r   r   r   q  s      z+SchedulerState.__init__.<locals>.<dictcomp>z+distributed.scheduler.transition-log-lengthz+distributed.scheduler.unknown-task-durationz,distributed.worker.memory.recent-to-old-timez+distributed.worker.memory.rebalance.measurez.distributed.worker.memory.rebalance.sender-minz1distributed.worker.memory.rebalance.recipient-maxz8distributed.worker.memory.rebalance.sender-recipient-gap       @z'distributed.scheduler.worker-saturationinfzC`distributed.scheduler.worker-saturation` must be a float > 0; got ):r  infor  r(   r   r   r   r   r  r}   r  r  r   r  r   r  r  r   r  r  rt  r  r   r  r  r  r  r  r  r  rl   r  r  r  r   r   r  r-  r  r  r  r  r  r  r)   r  r  r  r  r  r  r  mathr  r   r   
ValueErrorr  )r   r  r  r  r  r   r  r  rl   r  r  r  r  r   r   r   r   >  s    







zSchedulerState.__init__r   r   c                 C  s   t jdd | j D  S )Nc                 s  s   | ]}|j V  qd S r   )rn   r   wr   r   r   rA    s     z(SchedulerState.memory.<locals>.<genexpr>)r   r   r  rt  r   r   r   r   rn     s    zSchedulerState.memoryc                 C  sR   | j | j| j| j| j| j| j| j| j| j	| j
| j| j| j| j| j| j| j| jdS )Nr   r   r  r  rl   r  r  r  r  r  r  r  total_occupancyr  r  r  r  r  r  r  r   r   r   r   	__pdict__  s(    zSchedulerState.__pdict__Nr   rq   zComputation | Noner  )r  specrr  computationr   c           
      C  s   t |||}t|}| j|}|dkr:t| | j|< }||_|j}| j|}	|	dkrt| | j|< }	|r||j	
|	 ||	_|j	|	 |	
| || j|< |S )z(Create a new task, and associated statesN)r  r'   r  r   rN  r  r  r  rl  r;  r  appendr  )
r   r  r  rr  r  r  Z
prefix_keyr  r  r@  r   r   r   new_task  s"    

zSchedulerState.new_taskr  c                 C  s@   t d | j| j| j| j| j| j| j| j	fD ]}|
  q.d S )NzClear task state)r  debugr  r  r  r  r  r  r  r  clear)r   Z
collectionr   r   r   _clear_task_state  s    

z SchedulerState._clear_task_statec                 C  s   |  | j| jS r   )r0  r  r-  r   r   r   r   r    s    zSchedulerState.total_occupancyr   )r   network_occr   c           	      C  s   d}|  D ]T\}}| j| }|d k	s*t|j}|dk rT|jdkrNd|j }n| j}||| 7 }q||| j  }|dkst||S )Nrp  r   rV  )r  r  r  rO  rS  r  r   )	r   r   r  resZprefix_namer*  r  r  occr   r   r   r0    s    

zSchedulerState._calc_occupancyrt   r  r  r  r  r   c                 K  s  zJ| j |}|dkr$i i i fW S |j}||kr>i i i fW S |  jd7  _| jrb| j| jk sbti }i }i }	| jrt|j}
t|j	}| j
||f}|dk	r|| ||f|\}}	}nDd||fkr|rt|||f| |d|\}}}|||}| j
d|f }|| ||\}}}|| | D ]\}}|	|g | q,| D ]\}}||g | qR|| | D ]\}}|	|g | q| D ]\}}||g | qd}n2td| d| d|d|d|d	| | |st}|j}| jt|||||t  | jr\|tkrDtd
td||||t| | jr|jdkr|
|_||_	|| j |j< t| j D ]B}z|j|||f| W n$ tk
r   tj ddd Y nX q|j!dkr| j |j= |j"}|j!dkrB|j#| j$krBt%dd |j& D rB|j'j()| | j$|j#= ||	|fW S  tk
r   t*d||| t+rddl,}|-   Y nX dS )a#  Transition a key from its current state to the finish state

        Examples
        --------
        >>> self._transition('x', 'waiting')
        {'x': 'processing'}, {}, {}

        Returns
        -------
        Tuple of:

        - Dictionary of recommendations for future transitions {key: new state}
        - Messages to clients {client address: [msg, msg, ...]}
        - Messages to workers {worker address: [msg, msg, ...]}

        See Also
        --------
        Scheduler.transitions : transitive version of this function
        Nr  ri   zImpossible transition from z to z for z: stimulus_id=z	, kwargs=, story=z/stimulus_id not set during Scheduler transitionz5Transitioned %r %s->%s (actual: %s).  Consequence: %srp   Plugin failed with exceptionTexc_infoc                 s  s"   | ]\}}|d kp|dkV  qdS )r   rp   Nr   rJ  r   r   r   rA    s     z-SchedulerState._transition.<locals>.<genexpr>z$Error transitioning %r from %r to %rr   ).r  r   rx  r  r  r  r  r   ry  r  _TRANSITIONS_TABLE_transitionupdater  
setdefaultextendRuntimeErrorstorySTIMULUS_ID_UNSETr  r  r  rI   r  r  r  r   r  listrt  
transitionr  r  rr  rs  r   r  allrE  r  r;  r&  r  r  r  r  )r   r  r  r  r  r  r9  r  worker_msgsclient_msgsry  r  funcZa_recsZa_cmsgsZa_wmsgsrK  Zb_recsZb_cmsgsZb_wmsgscnew_msgsr  Zactual_finishpluginr@  r  r   r   r   r    s    

    


,     


zSchedulerState._transitionrr   rs   )r  r  r  r  r   c                 C  s   t  }| }|r| \}}|| | |||\}}	}
|| |	 D ]\}}||g | qN|
 D ]\}}||g | qrq| j	rt
t| }|D ]}|| qdS zProcess transitions until none are left

        This includes feedback from previous transitions and continues until we
        reach a steady state
        N)r   copypopitemr  r  r  r  r  r  r  r   	Schedulerra   )r   r  r  r  r  r   r  r  Znew_recsZ	new_cmsgsZ	new_wmsgsr  r  r  r   r   r   r   _transitions  s    


zSchedulerState._transitionsr  r  r   c                 C  s   | j | }| jrR|jst|jr$t|jr.t|jr8t|jD ]}|jdks>tq>|j	rf|dii i fS d|_i }|jD ]8}|js|j
| |jdkrd||j< qv|j
| qvdd |jD |_|jsd||< |i i fS )N>   rp   ro   rp   rj   ri   c                 S  s   h | ]}|j d kr|qS rj   )rr  r   r   r   r   r   rk    s     
 z=SchedulerState.transition_released_waiting.<locals>.<setcomp>rm   )r  r  rv  r  r{  r  r  r  rr  rz  r  r  r|  ry  )r   r  r  r  r   r  r   r   r   transition_released_waiting  s,    







z*SchedulerState.transition_released_waitingc                 C  sf   | j | }i }| jr6|jr(td| || jks6t| | }r\| j| | ||}i i |fS )Nz Actors can't be in `no-worker`: )r  r  r  r  r  decide_worker_non_rootishr%  _add_to_processing)r   r  r  r  r  r  r   r   r   transition_no_worker_processing  s    
z.SchedulerState.transition_no_worker_processingrm  r  c                 C  s  | j rt| jst| jr&| j n| j}|s4dS |j}|j	}|rn|j
rn|jtjkrn| j|j|krn|}n0t|t| j|d}tt|| j |j |_
|jd |jd  dkr|nd|_	| j
d8  _
| j r|dk	r| j|j|kst|| jkst|| jf|S )aV  Pick a worker for a runnable root-ish task, without queuing.

        This attempts to schedule sibling tasks on the same worker, reducing future data
        transfer. It does not consider the location of dependencies, since they'll end
        up on every worker anyway.

        It assumes it's being called on a batch of tasks in priority order, and
        maintains state in `SchedulerState.last_root_worker` and
        `SchedulerState.last_root_worker_tasks_left` to achieve this.

        This will send every runnable task to a worker, often causing root task
        overproduction.

        Returns
        -------
        ws: WorkerState | None
            The worker to assign the task to. If there are no workers in the cluster,
            returns None, in which case the task should be transitioned to
            ``no-worker``.
        Nr   ri   rj   r  )r  r  isinfr  r  r  rt  r  rs  rn  ro  r   r>   r  r   r   r   r   worker_objectivefloorr
  r  r   rE  )r   r  poolr@  Zlwsr  r   r   r   &decide_worker_rootish_queuing_disabled  s6    
z5SchedulerState.decide_worker_rootish_queuing_disabledc                 C  s   | j rt| jrt| js dS t| jdd d}| j rpt|| jrXt|t|| jf|| j	kspt|| j	f| j r|dk	r| j
|j|kst|| j	kst|| j	f|S )al  Pick a worker for a runnable root-ish task, if not all are busy.

        Picks the least-busy worker out of the ``idle`` workers (idle workers have fewer
        tasks running than threads, as set by ``distributed.scheduler.worker-saturation``).
        It does not consider the location of dependencies, since they'll end up on every
        worker anyway.

        If all workers are full, returns None, meaning the task should transition to
        ``queued``. The scheduler will wait to send it to a worker until a thread opens
        up. This ensures that downstream tasks always run before new root tasks are
        started.

        This does not try to schedule sibling tasks on the same worker; in fact, it
        usually does the opposite. Even though this increases subsequent data transfer,
        it typically reduces overall memory use by eliminating root task overproduction.

        Returns
        -------
        ws: WorkerState | None
            The worker to assign the task to. If there are no idle workers, returns
            None, in which case the task should be transitioned to ``queued``.

        Nc                 S  s   t | j| j S r   )r
  rm   r   r  r   r   r   <lambda>C      zFSchedulerState.decide_worker_rootish_queuing_enabled.<locals>.<lambda>r   )r  r  r
  r  r  r  r   _worker_full_task_slots_availabler  r  r   r   r  r   r   r   %decide_worker_rootish_queuing_enabled  s$    
z4SchedulerState.decide_worker_rootish_queuing_enabledc           
      C  sH  | j s
dS | |}|dkr@t| j t| jk r@| j s:dS | j }|jsN|dk	rht|| j |t| j|}n| jpr| j}t	d|
 }t|}|dk rt|tdd}|st|jdkr| j| }t|D ]&}||| |  }	|	jdkr|	} qqn|| j|  }| jrD|dk	rD| j|j|ks*t|| j ksDt|| j f|S )aa  Pick a worker for a runnable non-root task, considering dependencies and
        restrictions.

        Out of eligible workers holding dependencies of ``ts``, selects the worker
        where, considering worker backlong and data-transfer costs, the task is
        estimated to start running the soonest.

        Returns
        -------
        ws: WorkerState | None
            The worker to assign the task to. If no workers satisfy the restrictions of
            ``ts`` or there are no running workers, returns None, in which case the task
            should be transitioned to ``no-worker``.
        NzSequence[WorkerState]   r  r   r   )r  valid_workersr
  r  r  decide_workerr   r  r  r   rt  r   operator
attrgetterr  r  r  ranger  r   r   )
r   r  r  r  Zworker_poolZwp_vals	n_workersr9  iZwp_ir   r   r   r  R  s@    




z(SchedulerState.decide_worker_non_rootishc                 C  s   | j | }| |r^t| jr@| | }s\|jdii i fS q||   }s||jdii i fS n| | }s||jdii i fS | 	||}i i |fS )zPossibly schedule a ready task. This is the primary dispatch for ready tasks.

        If there's no appropriate worker for the task (but the task is otherwise
        runnable), it will be recommended to ``no-worker`` or ``queued``.
        rk   rl   )
r  
is_rootishr  r
  r  r  r  r  r  r  )r   r  r  r  r  r  r   r   r   transition_waiting_processing  s    

z,SchedulerState.transition_waiting_processing)r   r  typename
int | Nonezbytes | None
str | None)r  r  r   r  r  workerr  r   c          	      K  s<   | j | }| jr2|jrt|js$t|jdks2ti i i fS )aF  This transition exclusively happens in a race condition where the scheduler
        believes that the only copy of a dependency task has just been lost, so it
        transitions all dependents back to waiting, but actually a replica has already
        been acquired by a worker computing the dependency - the scheduler just doesn't
        know yet - and the execution finishes before the cancellation message from the
        scheduler has a chance to reach the worker. Shortly, the cancellation request
        will reach the worker, thus deleting the data from memory.
        rj   )r  r  r  r  r{  rr  )	r   r  r  r   r  r  r"  r  r  r   r   r   transition_waiting_memory  s    


z(SchedulerState.transition_waiting_memory)r   r  r  
startstopszlist[dict] | None)	r  r  r   r  r  r"  r$  r  r   c                K  s  | j | }	|stt|ts t| jr|	js0t|	j}
|
s>t|	|
jksLt~
|	jrXt|	jrlt|	|	jf|	j	rvt|	j
dkst| j|}|d kr|dii i fS ||	jkr|	jsttd|	jd| d|	j d|d| |	 
|r|D ]"}|	jj|d |d	 |d
 d q| j|	jjt }| jd}|r\|D ]}|jrB|| qB|d k	rp|	| | |	 i }i }| j|	|||||d | jr|	jrt|	jrt||i fS )Nrm   ri   zTask z2 transitioned from processing to memory on worker z, while it was expected from z). This should be impossible. stimulus_id=r  r>  r9  rY  )r>  r9  rY  r|   )r  r  )r  r  r   r~   r  r  rm   r{  r  r  rr  r  r   r  r  r  rs  r]  r  popr  r   r   r  Zrecalculate_costr  _exit_processing_common_add_to_memory)r   r  r  r   r  r  r"  r$  r  r  wssr  Z	startstopr  stealZttsr  r  r   r   r   transition_processing_memory  sf    





*	


     z+SchedulerState.transition_processing_memory)safe)r  r  r+  r   c                C  sx  | j | }| jr2|jrt|jr$t|r2|jr2t|jr~|jD ]}|j	| q>|j
r~||_tttd |_|jdii i fS i }i }i }d|g|d}	|jD ]}|	g||j< q| | d|_d|d}
|j
D ]}|
g||j< q|jsd||< n*|jrd||< n|j
s|jrd	||< |jD ]6}|jd
kr>d	||j< n|jd	kr"|j| q"| jrn|jrnt|||fS )NzWorker holding Actor was lostro   	free-keysopr   r  ri   z	lost-datar.  r  rp   rj   rk   rm   )r  r  r{  r  r  r|  r  r  r   r%  r}  r  rO   rP   r  r  r  r   remove_all_replicasrr  r   rv  rz  r  )r   r  r  r+  r  r  r  r  r  Z
worker_msg
report_msgr  r   r   r   r   transition_memory_released	  sV    












z)SchedulerState.transition_memory_releasedc           
   	   C  s   | j | }i }i }| jrVttd. |js.t|jr8t|jrBt|jrLtW 5 Q R X |j}|sdt|j	D ]}||_|jsjd||j
< qjd||j|jd}|jD ]}	|g||	j< qd|_||i fS )Nr  ro   
task-erredr.  r  r  r  )r  r  r^   r  r  r  r  r{  r|  ry  r  r  r  r}  r   rr  )
r   r  r  r  r  r  
failing_tsr   r2  r  r   r   r   transition_released_erredV	  s0    





z(SchedulerState.transition_released_erredc              	   C  s   | j | }i }i }i }| jrZttd. |js2t|jr<t|jrFt|jrPtW 5 Q R X d |_	d |_d |_
|jD ]}|jdkrrd||j< qrd|g|d}|jD ]}	|g||	< q|j  d|d}
|jD ]}|
g||j< qd|_|||fS )	Nr4  ro   rj   r,  r-  ztask-retriedr/  ri   )r  r  r^   r  r  r  r  r{  r|  r  r  ry  rr  r  r4  r  r}  r   )r   r  r  r  r  r  r  r   Zw_msgZws_addrr2  r  r   r   r   transition_erred_releasedx	  s8    









z(SchedulerState.transition_erred_releasedc                 C  s   | j | }i }| jr(|jrt|jr(t|jD ]0}||jkr.|j| |js.|js.d||j	< q.|j
  d|_|jrd||< n&|js|js|jrd||< n
|j  |i i fS Nri   rp   rj   )r  r  r  r  r  r  r|  r%  r}  r  r{  r  rr  rz  r  )r   r  r  r  r  r   r   r   r   transition_waiting_released	  s$    








z*SchedulerState.transition_waiting_releasedc                 C  s~   | j | }i }i }| jrD|js"t|jr,t|jr6t|jdksDt| |}|rhd|g|dg||j< | 	|| |i |fS )Nrm   r,  r-  )
r  r  r  r  r  r{  rr  r&  r   _propagate_releasedr   r  r  r  r  r  r  r   r   r   transition_processing_released	  s"    





z-SchedulerState.transition_processing_released)causer  r  r5  r6  r  )
r  r  r"  r?  r  r  r5  r6  r  r   c                K  s  | j | }
i }i }| jrD|s&|
js&t|
js0t|
jr:t|
jrDt|
jrd|
j}|sXt|j	|
 | 
|
 |
j| |dk	r||
_||
_|dk	r||
_||
_|dk	r| j | }||
_n|
j}| jt|
jt |
j |pd|pd |
jD ]}||_d||j< q|
jD ],}|j|
 |js|jsd||j< q|
j  d|
_d||j|jd}|
jD ]}|g||j< qd| jd }|
|j kr| j!||g|d | jr|
jrt||i fS )	a2  Processed a recommended transition processing -> erred.

        Parameters
        ----------
        key
           Key of the task to transition
        stimulus_id
            ID of the stimulus causing the transition
        worker
            Address of the worker where the task erred.
            Not necessarily ``ts.processing_on``.
        cause
            Address of the task that caused this task to be transitioned to erred
        exception
            Exception caused by the task
        traceback
            Traceback caused by the task
        exception_text
            String representation of the exception
        traceback_text
            String representation of the traceback

        Returns
        -------
        Recommendations, client messages and worker messages to process
        """
        # Body condensed; the garbled original records the exception and
        # traceback on ``ts`` (or blames ``cause``), decrements ``ts.retries``
        # and re-schedules while retries remain, recommends ``erred`` for
        # dependents that no longer have a path to their data, sends a
        # ``task-erred`` message to every client in ``ts.who_wants``, and
        # releases the keys of clients that only wanted this failed task.
        ...
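    # --- Illustration only --------------------------------------------------
    # A minimal, hypothetical sketch of how the (recommendations, client_msgs,
    # worker_msgs) triple returned by the transition methods above is consumed.
    # The real engine (not shown in this excerpt) pops recommendations one at a
    # time, runs the matching transition, and merges any new recommendations
    # and messages it produces; this static helper only documents the shapes.
    @staticmethod
    def _example_drain_transition_output(
        recommendations: Recs, client_msgs: Msgs, worker_msgs: Msgs
    ) -> list[tuple[str, TaskStateState]]:
        """Return the (key, requested_state) pairs that would be processed next."""
        todo: list[tuple[str, TaskStateState]] = []
        while recommendations:
            key, finish_state = recommendations.popitem()
            # Each entry would trigger another transition, which may in turn add
            # further recommendations; messages are merely accumulated and sent
            # in batches once the cascade settles.
            todo.append((key, finish_state))
        assert isinstance(client_msgs, dict) and isinstance(worker_msgs, dict)
        return todo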
    def transition_no_worker_released(self, key: str, stimulus_id: str) -> RecsMsgs:
        ts = self.tasks[key]

        if self.validate:
            assert ts.state == "no-worker"
            assert not ts.who_has
            assert not ts.waiting_on

        self.unrunnable.remove(ts)
        ts.state = "released"

        for dts in ts.dependents:
            dts.waiting_on.add(ts)
        ts.waiters.clear()

        return {}, {}, {}
    def transition_waiting_queued(self, key: str, stimulus_id: str) -> RecsMsgs:
        ts = self.tasks[key]

        if self.validate:
            assert not self.idle_task_count, (ts, self.idle_task_count)
            self._validate_ready(ts)

        ts.state = "queued"
        self.queued.add(ts)

        return {}, {}, {}
    def transition_waiting_no_worker(self, key: str, stimulus_id: str) -> RecsMsgs:
        ts = self.tasks[key]

        if self.validate:
            self._validate_ready(ts)

        ts.state = "no-worker"
        self.unrunnable.add(ts)

        return {}, {}, {}
    def transition_queued_released(self, key: str, stimulus_id: str) -> RecsMsgs:
        ts = self.tasks[key]

        if self.validate:
            assert ts in self.queued
            assert not ts.processing_on

        self.queued.remove(ts)

        recommendations: Recs = {}
        self._propagate_released(ts, recommendations)
        return recommendations, {}, {}
    def transition_queued_processing(self, key: str, stimulus_id: str) -> RecsMsgs:
        ts = self.tasks[key]
        worker_msgs: Msgs = {}

        if self.validate:
            assert not ts.actor, f"Actors can't be queued: {ts}"
            assert ts in self.queued

        # Worker selection is garbled in this dump: if an idle worker is
        # available the task leaves the queue and starts processing on it via
        # ``self._add_to_processing``; otherwise it simply stays queued.
        ...

        return {}, {}, worker_msgs
    def _remove_key(self, key: str) -> None:
        ts = self.tasks.pop(key)
        assert ts.state == "forgotten"
        self.unrunnable.discard(ts)
        for cs in ts.who_wants:
            cs.wants_what.remove(ts)
        ts.who_wants.clear()
        ts.processing_on = None
        ts.exception_blame = ts.exception = ts.traceback = None
        self.task_metadata.pop(key, None)
    def transition_memory_forgotten(self, key: str, stimulus_id: str) -> RecsMsgs:
        ts = self.tasks[key]

        if self.validate:
            assert ts.state == "memory"
            assert not ts.processing_on
            assert not ts.waiting_on
            # A task may only be forgotten if it is pure data, has lost its
            # dependencies, or nobody wants or waits on it any more.

        recommendations: Recs = {}
        worker_msgs: Msgs = {}

        if ts.actor:
            for ws in ts.who_has:
                ws.actors.discard(ts)

        self._propagate_forgotten(ts, recommendations, worker_msgs, stimulus_id)
        client_msgs = self._task_to_client_msgs(ts)
        self._remove_key(key)
        return recommendations, client_msgs, worker_msgs
    def transition_released_forgotten(self, key: str, stimulus_id: str) -> RecsMsgs:
        ts = self.tasks[key]

        if self.validate:
            assert ts.state in ("released", "erred")
            assert not ts.who_has
            assert not ts.processing_on
            assert ts not in self.queued
            assert not ts.waiting_on, (ts, ts.waiting_on)
            assert (
                not ts.run_spec
                or ts.has_lost_dependencies
                or not (ts.who_wants or ts.waiters or ts.dependents)
            ), ("Unreachable", str(ts))

        recommendations: Recs = {}
        worker_msgs: Msgs = {}
        self._propagate_forgotten(ts, recommendations, worker_msgs, stimulus_id)
        client_msgs = self._task_to_client_msgs(ts)
        self._remove_key(key)
        return recommendations, client_msgs, worker_msgs
    # Mapping of (start, finish) state pairs to their transition handlers.
    # The dump truncates the table; entries below are the ones recoverable here.
    _TRANSITIONS_TABLE: ClassVar[
        Mapping[tuple[TaskStateState, TaskStateState], Callable[..., RecsMsgs]]
    ] = {
        ("released", "waiting"): transition_released_waiting,
        ("waiting", "released"): transition_waiting_released,
        ("waiting", "processing"): transition_waiting_processing,
        ("waiting", "no-worker"): transition_waiting_no_worker,
        ("waiting", "queued"): transition_waiting_queued,
        ("waiting", "memory"): transition_waiting_memory,
        ("queued", "released"): transition_queued_released,
        ("queued", "processing"): transition_queued_processing,
        ("processing", "released"): transition_processing_released,
        ("processing", "memory"): transition_processing_memory,
        ("processing", "erred"): transition_processing_erred,
        ("no-worker", "released"): transition_no_worker_released,
        ("released", "forgotten"): transition_released_forgotten,
        ("memory", "forgotten"): transition_memory_forgotten,
        ("erred", "released"): transition_erred_released,
        ("memory", "released"): transition_memory_released,
        ("released", "erred"): transition_released_erred,
    }

    def story(self, *keys_or_tasks_or_stimuli: str | TaskState) -> list[Transition]:
        """Get all transitions that touch one of the input keys or stimulus_id's"""
        keys_or_stimuli = {
            key.key if isinstance(key, TaskState) else key
            for key in keys_or_tasks_or_stimuli
        }
        return scheduler_story(keys_or_stimuli, self.transition_log)
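    # --- Illustration only --------------------------------------------------
    # A typical way to pull a task's transition story out of a live scheduler
    # from the client side.  ``Client.run_on_scheduler`` is a public API; the
    # task key below is made up, and each returned entry is a transition record
    # (key, start state, finish state, recommendations, stimulus_id, timestamp).
    @staticmethod
    def _example_fetch_story() -> None:
        from distributed import Client

        client = Client("tcp://127.0.0.1:8786")  # hypothetical scheduler address

        def fetch(dask_scheduler=None, key=None):
            return dask_scheduler.story(key)

        for entry in client.run_on_scheduler(fetch, key="sum-123abc"):
            print(entry)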
    def is_rootish(self, ts: TaskState) -> bool:
        """
        Whether ``ts`` is a root or root-like task.

        Root-ish tasks are part of a group that's much larger than the cluster,
        and have few or no dependencies.
        """
        if ts.resource_restrictions or ts.worker_restrictions or ts.host_restrictions:
            return False
        tg = ts.group
        # Heuristic: the task's group is much larger than the cluster and has
        # few (and small) dependencies.
        return (
            len(tg) > self.total_nthreads * 2
            and len(tg.dependencies) < 5
            and sum(map(len, tg.dependencies)) < 5
        )

    def check_idle_saturated(self, ws: WorkerState, occ: float = -1.0) -> None:
        """Update the status of the idle and saturated state

        The scheduler keeps track of workers that are:

        -  Saturated: have enough work to stay busy
        -  Idle: do not have enough work to stay busy

        They are considered saturated if they both have enough tasks to occupy
        all of their threads, and if the expected runtime of those tasks is
        large enough.

        If ``distributed.scheduler.worker-saturation`` is not ``inf``
        (scheduler-side queuing is enabled), they are considered idle
        if they have fewer tasks processing than the ``worker-saturation``
        threshold dictates.

        Otherwise, they are considered idle if they have fewer tasks processing
        than threads, or if their tasks' total expected runtime is less than half
        the expected runtime of the same number of average tasks.

        This is useful for load balancing and adaptivity.
        """
        if self.total_nthreads == 0 or ws.status == Status.closed:
            return
        if occ < 0:
            occ = ws.occupancy

        p = len(ws.processing)

        self.idle.discard(ws)
        self.saturated.discard(ws)
        # Classification against the worker-saturation threshold, the thread
        # count, and the expected occupancy follows; the exact constants are
        # garbled in this dump.
        ...
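    # --- Illustration only --------------------------------------------------
    # A simplified, hypothetical version of the idle/saturated classification
    # described in the docstring above, using plain numbers instead of
    # ``WorkerState``.  The factors 0.5 ("less than half the expected runtime of
    # the same number of average tasks") and 2 are assumptions for this sketch,
    # not the scheduler's actual constants.
    @staticmethod
    def _example_classify_worker(
        nthreads: int,
        nprocessing: int,
        occupancy: float,
        avg_task_duration: float,
        worker_saturation: float = float("inf"),
    ) -> str:
        if worker_saturation != float("inf"):
            # Scheduler-side queuing enabled: idleness is a pure head-count check.
            return "idle" if nprocessing < nthreads * worker_saturation else "busy"
        if nprocessing < nthreads or occupancy < 0.5 * nprocessing * avg_task_duration:
            return "idle"
        if nprocessing > nthreads and occupancy > 2 * nprocessing * avg_task_duration:
            return "saturated"
        return "busy"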
    def is_unoccupied(
        self, ws: WorkerState, occupancy: float, nprocessing: int
    ) -> bool:
        nthreads = ws.nthreads
        return (
            nprocessing < nthreads
            or occupancy < nthreads * (self.total_occupancy / self.total_nthreads) / 2
        )

    def get_comm_cost(self, ts: TaskState, ws: WorkerState) -> float:
        """
        Get the estimated communication cost (in seconds) to compute the task
        on the given worker.
        """
        if 10 * len(ts.dependencies) < len(ws.has_what):
            # Common case: few dependencies relative to what the worker already
            # holds, so iterate over the dependencies rather than ``ws.has_what``.
            deps = {dep for dep in ts.dependencies if dep not in ws.has_what}
        else:
            deps = ts.dependencies.difference(ws.has_what)
        nbytes = sum(dts.get_nbytes() for dts in deps)
        return nbytes / self.bandwidth
    def get_task_duration(self, ts: TaskState) -> float:
        """Get the estimated computation cost of the given task (not including
        any communication cost).

        If no data has been observed, value of
        `distributed.scheduler.default-task-durations` are used. If none is set
        for this task, `distributed.scheduler.unknown-task-duration` is used
        instead.
        """
        duration: float = ts.prefix.duration_average
        if duration >= 0:
            return duration

        s = self.unknown_durations.get(ts.prefix.name)
        if s is None:
            self.unknown_durations[ts.prefix.name] = s = set()
        s.add(ts)
        return self.UNKNOWN_TASK_DURATION
    def valid_workers(self, ts: TaskState) -> set[WorkerState] | None:
        """Return set of currently valid workers for key

        If all workers are valid then this returns ``None``, in which case
        any *running* worker can be used.
        Otherwise, the subset of running workers valid for this task
        is returned.
        This check tracks the following state:

        *  worker_restrictions
        *  host_restrictions
        *  resource_restrictions
        """
        s: set[str] | None = None

        if ts.worker_restrictions:
            s = {addr for addr in ts.worker_restrictions if addr in self.workers}

        if ts.host_restrictions:
            # Resolve aliases late: a restricted host may only connect after the
            # restriction was recorded.
            hosts = [self.coerce_hostname(h) for h in ts.host_restrictions]
            addresses: set[str] = set()
            for h in hosts:
                dh = self.host_info.get(h)
                if dh is not None:
                    addresses.update(dh["addresses"])
            s = addresses if s is None else s & addresses

        if ts.resource_restrictions:
            # Intersect with workers able to supply every requested resource
            # (the resource bookkeeping is garbled in this dump).
            ...

        if s is None:
            return None
        if not s:
            return set()
        workers = {self.workers[addr] for addr in s}
        if len(self.running) < len(self.workers):
            workers &= self.running
        return workers
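    # --- Illustration only --------------------------------------------------
    # How the restrictions consulted by ``valid_workers`` typically get set from
    # the client side.  The addresses, host name and resource name below are
    # made up for the example; the ``Client`` calls themselves are public APIs.
    @staticmethod
    def _example_submit_with_restrictions() -> None:
        from distributed import Client

        client = Client("tcp://127.0.0.1:8786")  # hypothetical scheduler address
        # Populates TaskState.worker_restrictions:
        client.submit(sum, [1, 2], workers=["tcp://10.0.0.5:45001"])
        # Populates TaskState.host_restrictions (bare hostnames are allowed):
        client.submit(sum, [1, 2], workers=["node-17"])
        # Populates TaskState.resource_restrictions; requires workers started
        # with e.g. ``dask worker --resources "GPU=1"``:
        client.submit(sum, [1, 2], resources={"GPU": 1})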
    def acquire_resources(self, ts: TaskState, ws: WorkerState) -> None:
        for r, required in ts.resource_restrictions.items():
            ws.used_resources[r] += required

    def release_resources(self, ts: TaskState, ws: WorkerState) -> None:
        for r, required in ts.resource_restrictions.items():
            ws.used_resources[r] -= required

    def coerce_hostname(self, host: Hashable) -> str:
        """
        Coerce the hostname of a worker.
        """
        alias = self.aliases.get(host)
        if alias is not None:
            return self.workers[alias].host
        assert isinstance(host, str)
        return host

    def worker_objective(self, ts: TaskState, ws: WorkerState) -> tuple:
        """Objective function to determine which worker should get the task

        Minimize expected start time.  If a tie then break with data storage.
        """
        comm_bytes = sum(
            dts.get_nbytes() for dts in ts.dependencies if ws not in dts.who_has
        )
        stack_time = ws.occupancy / ws.nthreads
        start_time = stack_time + comm_bytes / self.bandwidth

        if ts.actor:
            return (len(ws.actors), start_time, ws.nbytes)
        else:
            return (start_time, ws.nbytes)
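    # --- Illustration only --------------------------------------------------
    # The estimated start time that ``worker_objective`` minimises, spelled out
    # with plain numbers.  The values are made up; ``bandwidth`` is in bytes/s.
    # e.g. _example_estimated_start_time(4.0, 4, 50_000_000, 1e8) == 1.5 seconds
    @staticmethod
    def _example_estimated_start_time(
        occupancy: float, nthreads: int, missing_dep_bytes: int, bandwidth: float
    ) -> float:
        stack_time = occupancy / nthreads          # time until a thread frees up
        comm_time = missing_dep_bytes / bandwidth  # time to fetch missing inputs
        return stack_time + comm_time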
  r  r  r  r   r  r  r   r   r   r/    s    
zSchedulerState.add_replicac                 C  s(   | | t|jdkr$| j| dS )z6Note that a worker no longer holds a replica of a taskr  N)r,  r
  r  r  r&  r  r   r   r   r,    s    
zSchedulerState.remove_replicac                 C  sR   |  }|jD ]}| j|8  _|j|= qt|jdkrD| j| |j  dS )z.Remove all replicas of a task from all workersr  N)r+  r  r   r   r
  r  r&  r  )r   r  r   r  r   r   r   r1    s    

z"SchedulerState.remove_all_replicasr  r   c                 C  sV   g }| j D ](}| |}|dks(||kr
|| q
|jtddd dd |D S )zSend ``no-worker`` tasks to ``processing`` that this worker can handle.

        Returns priority-ordered recommendations.
        Nrw  Tr  reversec                 S  s   i | ]}|j d qS rm   r   r  r   r   r   r     s      zOSchedulerState.bulk_schedule_unrunnable_after_adding_worker.<locals>.<dictcomp>)r  r  r  sortr  r  )r   r  Zrunnabler  Zvalidr   r   r   ,bulk_schedule_unrunnable_after_adding_worker  s    

z;SchedulerState.bulk_schedule_unrunnable_after_adding_workerc                 C  sj   |j r
t|jrt|jrt|jr(t|jr2t|| jks@t|| jksNttdd |j	D sftdS )z;Validation for ready states (processing, queued, no-worker)c                 s  s   | ]}|j V  qd S r   r  r  r   r   r   rA    s     z1SchedulerState._validate_ready.<locals>.<genexpr>N)
r{  r  r  r  r  rz  r  rl   r  r  r#  r   r   r   rE    s    




zSchedulerState._validate_readyc                 C  s   | j rF| | || jks$t| j| j|j }|ksFt||f|| ||_d|_	| 
|| | | |  jd7  _|jr|j| |j| |giS )zKSet a task as processing on a worker and return the worker messages to sendrm   r  )r  rE  r  r  r  r   r   r!  r  rr  r|  r[  r  r  r   r  _task_to_msg)r   r  r  or   r   r   r    s    
"

z!SchedulerState._add_to_processingc                 C  sN   |j }|std|_ || | j|j|k	r4dS | | | || |S )a  Remove *ts* from the set of processing tasks.

        Returns
        -------
        Worker state of the worker that processed *ts* if the worker is current,
        None if the worker is stale.

        See also
        --------
        Scheduler._set_duration_estimate
        N)r  r  r(  r  r   r   r[  r}  r  r   r   r   r&    s    

z&SchedulerState._exit_processing_common)r  r  r  r  r  r  r   c                 C  s>  | j r||jkst| || t|j}t|dkrJ|jt	ddd |D ]*}|j
}	||	krN|	| |	sNd||j< qN|jD ](}|j}	|	| |	s|jsd||j< q|js|jsd||j< n4d|jd}
|d	k	r||
d
< |jD ]}|
g||j< qd|_||_|jj| | jd }||jkr:| j||jg|d d	S )z$Add ts to the set of in-memory tasksr  rw  Tr  rm   ri   key-in-memoryr/  Nr  rn   r  r@  )r  r   r  r/  r  ry  r
  r  r  r  r{  r%  r  r  r|  r}  r   rr  r  rs  ri  r  r  r   rB  )r   r  r  r  r  r  r  rd  r   r  r2  r  r   r   r   r'  1  sD    







zSchedulerState._add_to_memory)r  r  r   c                 C  s   d|_ |j}|jrd||< n|js(|jr0d||< ||dkr|jD ]0}|j dkrD|j| |jsD|jsDd||j< qD|j  | j	r|j
rt|| jkstd S r:  )rr  r  rz  r|  r}  r   r  r%  r  r  r  r  rl   )r   r  r  r  r   r   r   r   r<  f  s     




z"SchedulerState._propagate_released)r  r  r  r  r   c                 C  s   d|_ |jD ]6}d|_|j| |j| |j dkrd||j< q|j  |j	  |jD ]>}|j| |j	| |js^|j
s^||k	std||j< q^|j  |j  |jD ](}|j| jkrd|jg|dg||j< q| | d S )Nrp   Trn   ro   r,  r-  )rr  ry  rz  r  r&  r{  r%  r  r  r|  r}  r  r  r   r  r1  )r   r  r  r  r  r   r  r   r   r   rM  {  s2    







z#SchedulerState._propagate_forgottenzCollection[str]r}   )r   r  r  r   c                 C  s   t d|j| |D ]l}| j|}|dk	r||jkr|j| |j| |js|jsfd||j	< q|j
dkr|jsd||j	< qdS )$Remove keys from client desired listzClient %s releases keys: %sNrp   ro   ri   )r  r  r   r  r   r   r&  r}  ry  r  rr  r|  )r   r   r  r  r  r  r   r   r   rB    s    z$SchedulerState._client_releases_keysrB  )r  r  r   c                 C  s   |dk r|  |}d|j|j|dt  dd |jD dd |jD dddd|j|j|jd}| jrxt	|d	 
 sxtt|jtr||j n
|j|d
< |S )z0Convert a single computational task to a messager   zcompute-taskzcompute-task-c                 S  s    i | ]}|j d d |jD qS )c                 S  s   g | ]
}|j qS r   r   r  r   r   r   r^    s     z:SchedulerState._task_to_msg.<locals>.<dictcomp>.<listcomp>)r  r  r  r   r   r   r     s     z/SchedulerState._task_to_msg.<locals>.<dictcomp>c                 S  s   i | ]}|j |jqS r   )r  r   r  r   r   r   r     s      N)r.  r  rw  r  r  r  r   rv  functionargsr  r  r  r   r  rv  )rf  r  rw  rI   r  r  r  r   r  r  rt  r  r   rv  r   r  )r   r  r  msgr   r   r   r    s2    


zSchedulerState._task_to_msg)r   F)N)rW  )NN)rB  )?r   r   r   r   r   r   r   r   r   rn   r  r  r  r  r0  r  r  r  r	  r  r  r  r  r#  r*  r3  r8  r9  r;  r>  rC  rD  rG  rH  rI  rJ  rK  rO  rQ  r  r  r  r[  rY  re  rf  r  r|  r}  rk  r  r/  r,  r1  r  rE  r  r&  r'  r<  rM  rB  r  r   r   r   r   r    s  
	  &a  !#<7E !"O;"&$r *
2	B
  5'r  c                      s  e Zd ZU dZdZe Zded< dsddZ	dd Z
dd Zdd Zd
dddd fddZdddddZddddd d!d"d#Zdtddd$d%d&d'd(Z fd)d*Zdu fd+d,	Zdvd-dddddd.d$d/d0d1dd0d1dd2d3d4Zed
ddd-dddddddddddddd5d6dddd7dd d8d9d:Zdd;d<d=Zdwd>d?Zdxd@dAZdd dBdCdDZdydEdFZdzdGdHZd{dIdJZdd dKdLdMZedd-dNddd$d$dOdPdQdRZd|dSdTZd}dUdVZ d~dWdXZ!ddYdZZ"dd[d\Z#d]d^ Z$d_d` Z%dadb Z&dcdd Z'dedf Z(dgdh Z)didj Z*ddkdldmdnZ+dd$d dodpdqZ,dddkdrdsdtduZ-d6ddd dvdwdxZ.dddrd dydzd{Z/dd}d~dddZ0dd Z1dddd dddZ2ddd dddZ3dddd dddZ4ddd/dd dddZ5dddd dddZ6dddd dddZ7d6dd dddZ8ddddd$drdddZ9ddrdd dddZ:dddZ;ddd dddZ<dd Z=ddd dddZ>dddZ?dddZ@edddZAddddddddddd$dddddZBdddZCddddddĄZDdddd dŜddǄZEeddddrddɜdd˄ZFddddϜddфZGddddӜddՄZHdddׄZIdddddddddݜdd߄ZJedddd-ddddd$d$drdddddZKdddd$d$dddddZLdddZMedddddddZNeOddddrd dddZPeOddd}drd dddZPddddddZPedddZQdddd dd dZRdddZSdddZTddȐdddd	ZUdd
dZVdddZWdddZXdddZYdd;ddZZedddZ[dddZ\ddܐdd dddZ]e^fdܐdddZ_d d!d"d#Z`ed$d% Zadd&d'Zbdd(d)Zcdd*d+Zddd,d-Zedd.d/Zfd0d1 Zgdd2d3Zhd4d5 Zidd6ddd7d8d9d:Zjd7dd d;d<d=Zkdd>d?d@dAZldddrdd ddBdCZmddd!dDdEZndFdG ZoddHdIZpdJdK ZqddLdMZrddNd~dOdPdQdRdSZsdd~dUdVdWdXZtddYdZZuddd d[d\d]Zvd^d_ Zwd`da Zxdbdc ZyddddeZzddfdgZ{dhdi Z|djdk Z}ddldmZ~dddd dndodpZdddd dndqdrZ  ZS (  r  a  Dynamic distributed task scheduler

    The scheduler tracks the current state of workers, data, and computations.
    The scheduler listens for events and responds by controlling workers
    appropriately.  It continuously tries to use the workers to execute an ever
    growing dask graph.

    All events are handled quickly, in linear time with respect to their input
    (which is often of constant size) and generally within a millisecond.  To
    accomplish this the scheduler tracks a lot of state.  Every operation
    maintains the consistency of this state.

    The scheduler communicates with the outside world through Comm objects.
    It maintains a consistent and valid view of the world even when listening
    to several clients at once.

    A Scheduler is typically started either with the ``dask scheduler``
    executable::

         $ dask scheduler
         Scheduler started at 127.0.0.1:8786

    Or, within a ``LocalCluster``, a ``Client`` starts one up without connection
    information::

        >>> c = Client()  # doctest: +SKIP
        >>> c.cluster.scheduler  # doctest: +SKIP
        Scheduler(...)

    Users typically do not interact with the scheduler directly but rather with
    the client object ``Client``.

    The ``contact_address`` parameter allows advertising a specific address to
    the workers for communication with the scheduler, which is different from
    the address the scheduler binds to. This is useful when the scheduler
    listens on a private address, which therefore cannot be used by the workers
    to contact it.

    **State**

    The scheduler contains the following state variables.  Each variable is
    listed along with what it stores and a brief description.

    * **tasks:** ``{task key: TaskState}``
        Tasks currently known to the scheduler
    * **unrunnable:** ``{TaskState}``
        Tasks in the "no-worker" state

    * **workers:** ``{worker key: WorkerState}``
        Workers currently connected to the scheduler
    * **idle:** ``{WorkerState}``:
        Set of workers that are not fully utilized
    * **saturated:** ``{WorkerState}``:
        Set of workers that are over-utilized

    * **host_info:** ``{hostname: dict}``:
        Information about each worker host

    * **clients:** ``{client key: ClientState}``
        Clients currently connected to the scheduler

    * **services:** ``{str: port}``:
        Other services running on this scheduler, like Bokeh
    * **loop:** ``IOLoop``:
        The running Tornado IOLoop
    * **client_comms:** ``{client key: Comm}``
        For each client, a Comm object used to receive task requests and
        report task status updates.
    * **stream_comms:** ``{worker key: Comm}``
        For each worker, a Comm object from which we both accept stimuli and
        report results
    * **task_duration:** ``{key-prefix: time}``
        Time we expect certain functions to take, e.g. ``{'sum': 0.25}``
    """

    default_port = 8786
    _instances: ClassVar[weakref.WeakSet[Scheduler]] = weakref.WeakSet()
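    # --- Illustration only --------------------------------------------------
    # Two common ways to peek at the state variables listed in the class
    # docstring from a running client.  ``Client.scheduler_info`` and
    # ``Client.run_on_scheduler`` are public APIs; the address and the callback
    # below are made up for the example.
    @staticmethod
    def _example_inspect_state() -> None:
        from distributed import Client

        client = Client("tcp://127.0.0.1:8786")  # hypothetical scheduler address
        info = client.scheduler_info()
        print(sorted(info["workers"]))  # worker addresses known to the scheduler

        def count_state(dask_scheduler=None):
            return {
                "tasks": len(dask_scheduler.tasks),
                "workers": len(dask_scheduler.workers),
                "clients": len(dask_scheduler.clients),
            }

        print(client.run_on_scheduler(count_state))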
d}|| _|d kr`t	j
d}|| _|d krzt	j
d}t | _t|dd| _t|dd| _|pi | _|pi | _i | _|	| _|pt	j
d	}|rt|nd | _|pt	j
d
}|rt|| _nd | _t | _| j| _t | _t t!| _"t t!| _#|sJt	j
d}|s\t	j
d}t$%| ||| _&t'|
t(rt)f |
}
|
pt) | _*t'| j*t)st+| j*,d| _-ddi| j-d< t.|||||
| j/d| _0t	j
d}|p|d ko|}|r.zdd l1}W n$ t2k
r,   d}|3d Y nX t4| ||d}| j5||dd |rl|j6j7j8| j9| j:| |d || _;| j;rzddl<m=} W n t2k
r   t2dY nX ddl>m?}  |j@| dddd d!id"}!|!jAdd# |!| _B| j9C|!jD i | _Ei | _Fi }"d| _Gd | _Hd| _ItJ }#tKtLMd$d%}$i | _Ni }%tO }&i }'i }(i })|&|'|(|)g| _Pt tQtRt	j
d&d'| _St tT| _Ut tJ| _Vi | _Wi | _X| jY| jZ| j[| j\| j]| j^d(d) | j_| j`| jad*
}*| jb| jc| jd| je| jf| jg| jh| ji| jj| jkd+
}+| jl| jm| jn| jo| jp| jq| jr| js| jt| ju| jv| jw| jx| jy| jz| j{| j|| j}| j~| j| j| j| j| j| j| j| j| j\| j| j| j| j| je| j| j| j| j| j| j| j| j| j| j| j| j| j| j| j| j| j| j| j| j| j| j| jd,8| _t d },tj| |)|%|&|'|(|"|#|$|||d- tj| f| jt|*|+|,d| j-d.| | jrLt| j| jd/ }-|-| jd0< | jrtt| j| jd/ d }-|-| jd1< |d krt }t	j
d2sd3|kr|d3= | D ]\}.}/|/| | j|.< qtd4 tj|  d| j_d S )5Nz)the loop kwarg to Scheduler is deprecatedrV  )
stacklevelz%distributed.scheduler.contact-addressz&distributed.scheduler.allowed-failureszdistributed.scheduler.validater   defaultz distributed.scheduler.worker-ttlz"distributed.scheduler.idle-timeoutzdistributed.scheduler.preloadz"distributed.scheduler.preload-argvr   pickle-protocol   handshake_overrides)r   port	interfaceprotocolsecuritydefault_portz!distributed.scheduler.http.routesr   Fz(distributed.http.scheduler.missing_bokeh)servermodulesr  iS"  )r  )r  )	ServerAppzMIn order to use the Dask jupyter option you need to have jupyterlab installed)Configr  jupyterr	  T)base_urltokenZallow_remote_access)r   )Znew_httpserverrw  r   z'distributed.scheduler.events-log-lengthr  c                  _  s   d S r   r   )r  r  r   r   r   r    r  z$Scheduler.__init__.<locals>.<lambda>)
ztask-finishedr5  zrelease-worker-datazadd-keyszlong-runningZ
reschedulez
keep-alivez	log-eventworker-status-changezrequest-refresh-who-has)
zupdate-graphzupdate-graph-hlgzclient-desires-keyszupdate-dataz
report-keyzclient-releases-keyszheartbeat-clientzclose-clientzsubscribe-topiczunsubscribe-topic)8zregister-clientscatterzregister-workerZregister_nanny
unregistergathercancelretryfeed	terminate	broadcastproxyncoresZncores_runningr   r  rm   
call_stackr.   performance_reportget_logslogsZworker_logs	log_eventry   r   r0   add_keys	rebalance	replicaterun_functionrestartupdate_dataZset_resourcesretire_workersget_metadataset_metadataset_restrictionsheartbeat_workerget_task_statusget_task_streamget_task_prefix_statesregister_scheduler_pluginregister_worker_pluginunregister_worker_pluginregister_nanny_pluginunregister_nanny_pluginadaptive_targetworkers_to_closesubscribe_worker_statusstart_task_metadatastop_task_metadataget_cluster_statedump_cluster_state_to_urlbenchmark_hardware	get_story)r  r  r  r  r   r  r  rl   r  r  r  )handlersZstream_handlersconnection_limitZdeserializeconnection_argsi  z
worker-ttlzidle-timeoutz#distributed.scheduler.work-stealingr|   zdask scheduler [not started])r   r   DeprecationWarningr#   currentloopio_loopZ_setup_loggingr  r   r   r   contact_addressallowed_failurespsutilProcessprocr)   delete_intervalsynchronize_worker_intervalZservice_specsservice_kwargsr   scheduler_file
worker_ttlidle_timeoutrI   
idle_sincetime_startedasyncioLock_lockr   r   bandwidth_workersbandwidth_typesr-   Zprocess_preloadspreloadsr   r   rU   r  r  Zget_connection_argsr  r<   r  _start_addressZdistributed.dashboard.schedulerImportErrorr  rF   Zstart_http_server	dashboardr   connectZhttp_applicationZhttp_serverr  Zjupyter_server.serverappr  Ztraitlets.configr  instanceZ
initialize_jupyter_server_applicationZadd_applicationZweb_appclient_commsstream_comms
generationZ_last_client
_last_timer   r5   r  r  Zdatasetsr   Z_worker_collectionsr   r   ry   r   event_countsevent_subscriberworker_pluginsnanny_pluginshandle_task_finishedhandle_task_erredrelease_worker_datar  handle_long_running_reschedulelog_worker_eventhandle_worker_status_changehandle_request_refresh_who_hasupdate_graphupdate_graph_hlgclient_desires_keysr  report_on_keyclient_releases_keysclient_heartbeatremove_clientsubscribe_topicunsubscribe_topic
add_clientr  
add_worker	add_nannyremove_workerr  stimulus_cancelstimulus_retryr  closer  r  
get_ncoresget_ncores_runningget_has_whatget_who_hasget_processingget_call_stackget_profiler  r  get_worker_logsr  
get_eventsr+  r0   r  r  r  r  add_resourcesr  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r\   r  r   rK   r   r=   check_worker_ttlperiodic_callbacks
check_idleDEFAULT_EXTENSIONSr  r  r  rL   r  r  r  r@   allow_offload)0r   r  r  r  r   r  r  r  r  r  r  r  r  r  r   r  r  Zdashboard_addressr  Zhttp_prefixpreloadZpreload_argvr  r  r  r  r  Zhttp_server_modulesZshow_dashboarddistributedZroutesr  r  jr  r  rl   r  r  r  r   r  Zworker_handlersZclient_handlersr  pcr   	extensionr   r   r   r   )  s   

 






 
	     
 


;





zScheduler.__init__c              	   C  s.   d| j dt| j d| j dt| j d	S )Nz<Scheduler z, workers: z	, cores: z	, tasks: r   )Zaddress_safer
  r  r  r  r   r   r   r   r   o  s    ,zScheduler.__repr__c                 C  s   t dj| j| j| j| jdS )Nzscheduler.html.j2)r   r  threadsr  )r+   r  r   r  r  r  r   r   r   r   r  w  s    zScheduler._repr_html_c                 C  sF   t | jt| j| jdd | j D | jdd | j	 D d}|S )z1Basic information about ourselves and our clusterc                 S  s   i | ]\}}||j qS r   )r  )r   r  rK  r   r   r   r     s      z&Scheduler.identity.<locals>.<dictcomp>c                 S  s   i | ]}|j | qS r   )r   r  r   r"  r   r   r   r     s     )r  r  r   r   startedr  )
r  r   r~   r  r   r   r  r  r  rt  )r   dr   r   r   r    s    
zScheduler.identityr   r   r   r   c             
     sd   t  j d}| j| j| j| j| j| j| j| j	| j
d	} fdd| D }|t| d |S )zDictionary representation for debugging purposes.
        Not type stable and not intended for roundtrips.

        See also
        --------
        Server.identity
        Client.dump_cluster_state
        distributed.utils.recursive_to_dict
        r   )	r  r  r  r  r  r  rn   ry   r  c                   s   i | ]\}}| kr||qS r   r   rJ  r   r   r   r     s       z&Scheduler._to_dict.<locals>.<dictcomp>)superr   r  r  r  r  r  r  rn   ry   r  r  r  r`   )r   r   r  r   	__class__r   r   r     s    
zScheduler._to_dictz'Collection[str]'c                   s|   t | jd|ddd| jddidd}z| j|d}|I d	H \}}W 5 |  X d
d | D }|||  |ddS )z3Produce the state dict used in a cluster state dumpZ
dump_state)r.  r   r   )r  on_errorr.  r0   ignorer   Nc                 S  s(   i | ] \}}|t |tr t|n|qS r   )r   r  r  rJ  r   r   r   r     s    z/Scheduler.get_cluster_state.<locals>.<dictcomp>)r   r  )r   r  r0   )r  r  r  r  r   r  r0   )r   r   Zworkers_futureZscheduler_stateZworker_statesZworker_versionsr   r   r   r    s*    

zScheduler.get_cluster_stater~   zLiteral[('msgpack', 'yaml')]r   r  )urlr   formatstorage_optionsr   c                   s$   t jt| j|||f|I dH  dS )z7Write a cluster state dump to an fsspec-compatible URL.N)r,   Zwrite_stater   r  )r   r/  r   r0  r1  r   r   r   r    s    
  z#Scheduler.dump_cluster_state_to_urlr   ztuple[str, int] | str | None)r"  service_namer  r   c                 C  sR   | j | }|j|}|dkr"dS |rDd|jdd |j|d S |j|fS dS )a  
        Get the (host, port) address of the named service on the *worker*.
        Returns None if the service doesn't exist.

        Parameters
        ----------
        worker : address
        service_name : str
            Common services include 'bokeh' and 'nanny'
        protocol : boolean
            Whether or not to include a full address with protocol (True)
            or just a (host, port) pair
        Nz %(protocol)s://%(host)s:%(port)dz://r   )r  r   r  )r  r   r   r   splitr   )r   r"  r2  r  r  r  r   r   r   get_worker_service_addr  s    
z!Scheduler.get_worker_service_addrc           	   	     s  t   I dH  t    jD ]N}j|fdddddjdI dH  tj	_
j
}|dkr$d}q$jd	rd
}| jD ]}td|j qj D ] \}}td|d||jf  qjr&tjd}tj |dd W 5 Q R X j  fdd}t| jD ]:}z| I dH  W n  tk
rb   td Y nX q,j rjdrdI dH  jd jt!j"d< t#j$fddt%j&' D  I dH  (  t)dj d S )z6Clear out old state and restart all running coroutinesNFr  )r  compression)r   r  r   z0.0.0.0r	  z	inproc://	localhostz  Scheduler at: %25sz%11s at: %25sz%s:%dr  rV  )indentc                     s   t j rt   d S r   )ospathexistsr&  r   )fnr   r   del_scheduler_file  s    z2Scheduler.start_unsafe.<locals>.del_scheduler_filezFailed to start preloadztls://ztcp://localhost:0rB  ZDASK_SCHEDULER_ADDRESSc                   s   g | ]}|  qS r   r9  r   r  r   r   r   r^  /  s     z*Scheduler.start_unsafe.<locals>.<listcomp>zdask scheduler [])*r*  start_unsaferf   r  r  listenr  Zget_listen_argsr8   Zlisten_addressipr   r   Zstart_servicesZ	listenersr  r  r  r   r  r  r  openjsondumpr  r   finalizer  r9  r  r  r  r8  environr  r  r  r  rt  Zstart_periodic_callbacksrL   )	r   rj  Z	listen_ipZlistenerr   rK  fr<  r!  r+  )r;  r   r   r@    sX    





zScheduler.start_unsafec           	   
     s>  |dk	s|dk	rt dt | jtjtjfkr@|  I dH  dS dd  tj	 fddt
| j D  I dH  tj| _td td | jD ]6}z| I dH  W q tk
r   td	 Y qX qtj	 fd
dt
| j D  I dH  | j D ]}|  q| j  |   | j D ]"}tt |  W 5 Q R X q$td g }t
| j D ]T\}}| s|ddd |ddi tt ||  W 5 Q R X qdtj	| I dH  | j r| j!" I dH  | j# D ]}|$  q| j% I dH  tj| _|   t&  I dH  td t'  dS )zSend cleanup signal to all coroutines then wait until finished

        See Also
        --------
        Scheduler.cleanup
        NzThe 'fast' and 'close_workers' parameters in Scheduler.close have no effect and will be removed in a future version of distributed.c                   s4   z|  I d H  W n t k
r.   td Y nX d S )Nz)Plugin call failed during scheduler.close)r  r  r  )r  r   r   r   r^   G  s    z#Scheduler.close.<locals>.log_errorsc                   s   g | ]} |j qS r   )Zbefore_closer>  r^   r   r   r^  N  s     z#Scheduler.close.<locals>.<listcomp>zScheduler closing...zdask scheduler [closing]zFailed to tear down preloadc                   s   g | ]} |j qS r   )r  r>  rI  r   r   r^  ]  s     zScheduler closing all commsr  zscheduler-closer.  reasonr.  zclose-streamzdask scheduler [closed])(r   r   r   r   r>   closingrX  finishedr  r  r  r  rt  r  r  rL   r  teardownr  r  r  r>  r  Zstop_servicesr  r   AttributeErrorr  r  sendr  r  r  r  _cleanupr  abortr@   r*  re   )	r   fastclose_workersr!  r$  extfuturesr   commr+  rI  r   r  7  sb    









zScheduler.closeT)r:   nowr   r  r   r  r   zdict[str, float] | Nonezdict | None)r:   rX  r   r  r   r   r  r   c                C  s  |  ||}t|}| j|}
|
d krBtd|d ddiS t|}t }|pVi }| j	|i }||d< dt
| j }| jd|  |d d |  | _|d d	  D ]b\}\}}||f| jkr|| | j||f< qd| | }| j||f | |d|   | j||f< q|d d
  D ]V\}\}}|| jkrH|| | j|< n,d| | }| j| | |d|   | j|< q ||
_|d k	ri |
_| D ]6\}}|| jkr| j| }||
j|< |j| q||
_|| j }|
j}|
jr$|
jd \}}||krq$|
j  ||krd}qtd|d |
j |d d  }|
j||f |snttt|
j|
_n||k r~||
_|r| j	|i }|| |r|| |
_ |r| j!||d |	r|	 D ]\}}| j"| #|
| qd|t$t
| jdS )Nz,Received heartbeat from unregistered worker .r   missingz	last-seenr  r   totalr  ri  r   rn   r   )r"  r   OK)r   rI   heartbeat-interval)%coerce_addressr9   r  r   r  warningr8   rI   r  r  r
  r   r  r  r  r   r   r  r  rW  r   r  r   r   popleftr   r   r  r   rg  r!   r  r   r  r  Z	heartbeatheartbeat_interval)r   rW  r   r:   rX  r   r  r   r   r  r  r   Z	local_nowrr  fracr   Zbwr*  alpharj  r  r  r  Z!max_memory_unmanaged_old_hist_ageZmemory_unmanaged_oldr2  sizer   datar   r   r   r    s    







 


zScheduler.heartbeat_worker)r   r   r   r:   r   ri  rX  r   r  r   r   r   r   r   r0   r   r   r6   r   )rW  r   r   r   r0   r  r   c          !        s  |  ||}t|}t|}|| jkr2td| |	r|d|dt|	 dt|	 }t| |	d|t
 dI dH  dS || jkrtd| dd	| t
 d}|	|I dH  dS | |d
di | dd|d t|tj| |||pd|||||||| d | j|< }|jtjkr*| j| | j|}|dkrNi  | j|< }|d}|dkrxt  |d< }d|d< || |d  |7  < |  j|7  _|| j|< | j||||||d | | td| jd| j|< t| j D ]h}z2|j| |d}|dk	r t !|r |I dH  W n. t"k
rP } zt#| W 5 d}~X Y nX q|jtjkr| $| %|| | j&|d t'd| dt
 t(t| j| j)d}t*j+t*, dd | j- D |t.|j/d} |0|  |	|I dH  | 1||I dH  dS )zAdd a new worker to the clusterzWorker already exists %sWorker z connected with z? key(s) in memory! Worker reconnection is not supported. Keys: error)r   messagerI   Nz1Worker tried to connect with a duplicate name: %szname taken, %srY  z
add-workerr  )rY  r"  r   )r   r   r   r   r   r   r   r   r0   r   r   r   r   rm  r   )r   r:   rX  r   r  r   5msintervalr  r   r"  r  zRegister worker %sr\  )r   rI   r]  zworker-pluginsc                 S  s   i | ]\}}||j qS r   r/   r   r  r  r   r   r   r   }  s      z(Scheduler.add_worker.<locals>.<dictcomp>)Zsource_name)2r^  r9   r8   r  r  r
  r  r  rg  writerI   r  r_  r  r   r>   lookupr   r  r  r  r   r   r  r  r[  r4   r  r  r  rt  r  inspectisawaitabler  r  transitionsr  !stimulus_queue_slots_maybe_openedr  ra  r  version_moduleerror_messageget_versionsr  r~   r   r  handle_worker)!r   rW  r   r   r   r   r   r   r:   r   ri  rX  r   r  r   r   r   r   r   r0   r   r   r  r   errr  r  rr  dh_addressesr  resultr  version_warningr   r   r   r    s    








  
zScheduler.add_workerr   c                   s   d| j d}|S )Nr\  )r   znanny-plugins)r  r   r  r   r   r   r    s    zScheduler.add_nannyc                   s   t |}|d }|d }|d }| D ]*\}}t|}||krL|| |||< q*|d krt|  fdd| D }tjj||d}| j|||||||||	|
|||||dt  dS )	Ndskrd  r   c                   s$   i | ]\}}| kr||  qS r   )ro  rJ  Zdsk_keysr   r   r     s    z.Scheduler.update_graph_hlg.<locals>.<dictcomp>)r  update-graph-)r<  r  )	r$   Z__dask_distributed_unpack__r  r   r&  r   orderr  rI   )r   r   Zhlgr   r  restrictionsrw  r  r   submitting_taskr  user_priorityr   fifo_timeoutr<  Zunpacked_graphr~  r   r   rK  rd  Zstripped_depsr   r  r   r    sB    




zScheduler.update_graph_hlgc           ,        s  |pdt   }t  }t|}t|}tdkrLd|gdtd tD ]}| |krT|= qT|pri }jdkrjrjd }nt }j	| |r||j
kr|j
| d}t|krXt}t| D ]r\}}tfd	d
|D rtd| |= ||= ||kr.|| jd|d|d j|g||d qqt }| D ]<\}}|rf|jkrfj| }|jdkrf|| qf|rtj|}t|}t| |r| }z|| }W n" tk
r   j| j}Y nX |D ]|}||kr || }n |jkr:j| j}nt }t fdd
|D r|jkr| kr | |	| qqƈ D ]}|d ||d qt|}t }g } |rJ| }||krܐqj|}|dkrj||d|d}n|js ||_|| | 	| |||d qj ||d | D ]L\}}j|}|dks`|jrq`|D ]}j| }!|!|! qq`t"t#ṙfddD |pi }|pi }|pg }|pi }|
pi }
|rd|kr$|d  d|kr4|$|d  d|krZ|dd
 |d  D  d|krr|
$|d  d|kr|$|d  | D ]>\}"}#|# D ]*\}}j|}|dk	r||j%|"< qq|dkrt|}|pg D ]}$j|$ }d|_&q|ptj''}|	rLj|	}|dk	rD|j(d d  }%nj)}%n2j*| |k rx j)d7  _)j)}%|_*nj)}%t||@ D ]6}j| }|j(dkr|d |%|| f|_(qd!d" | D }&|&D ]&}|j(dkr|jrԈj)df|_(q|r| D ]\}}|dkr q
j|}|dkr:q
t |_+t |_,t"|tt-tfsb|g}|D ]D}'z.|'}'W n" t/k
r   |j+|' Y nX |j,|' qfq
|r|D ]}j| }d|_0q|r*| D ]H\}}|dkrqt"|t1st2j|}|dkr q||_3q|
rt|
 D ]:\}}t"|t4sPt2j|}|dkrjq8||_5q8i }(t6|&t78ddd#D ]$}|jdkr|jrd$|(|j9< q| D ]4}|jD ]&}!|!j:r|!j:|_:d%|(|j9<  qqqtj;< D ]^})z(|)j=|||pi |||||d&
 W n. t>k
rT }* zt?|* W 5 d}*~*X Y nX q@|(| | D ] }|jdkrjjA||d' qjt  }+Bd(|+|  dS ))z
        Add new computations to the internal dask graph

        This happens whenever the Client calls submit, map, get, or compute.
        r  r  r  r  rY  r*  g&.>rB  r   c                 3  s    | ]}| j ko|kV  qd S r   r  r`  )r   r  r   r   rA    s    z)Scheduler.update_graph.<locals>.<genexpr>z+User asked for computation on lost data, %scancelled-keyr/  r   r   r   r  r  c                 3  s   | ]}| kV  qd S r   r   )r   r)  )doner   r   rA  $  s     Nri   )r  r   r   r   c                   s   i | ]
}| qS r   r   r   )r  r   r   r   M  s      z*Scheduler.update_graph.<locals>.<dictcomp>rw  r  Zallow_other_workersc                 s  s   | ]\}}|r|V  qd S r   r   rJ  r   r   r   rA  ^  s     r  r   T{Gz?c                 S  s   g | ]}|j r|qS r   )rv  r  r   r   r   r^    s      z*Scheduler.update_graph.<locals>.<listcomp>r  rj   ro   )	r   r  r   r  r  rw  r  r   r   r  r   zupdate-graph-duration)CrI   r)   r   r
  r  r  r  r  r8  r  r<  r  r  r_  r  r  r&  reportr  r  rr  r   coreZreverse_dictr%  KeyErrorr  r  r   r  rv  r  r  r  r   r   r  r   r  r  rw  r  r  r  r  r   r^  r  r  r   r  r  r   r  rM  r  r  r  r  r  rt  r  r  r  rs  r  Zdigest_metric),r   r   r  r   r  r  rw  r  r   r  r  r  r   r  r   r<  r  r9  r   r  nrd  Zalready_in_memoryrK  r  ry  stackr  ra  Z
child_depsr)  Ztouched_keysZtouched_tasksr   akvr  r  Z	runnablesr  r  r  r  endr   )r  r   r  r  r   r    s    

  







































 zScheduler.update_graph)r  r   c                  s    j s
dS t fdd jD }|dkr.dS i } j |D ]d} jr|jdks\t|j|jrpt||jf|jrt||jf|j	s|j
st|d||j< q> || dS )a  Respond to an event which may have opened spots on worker threadpools

        Selects the appropriate number of tasks from the front of the queue according to
        the total number of task slots available on workers (potentially 0), and
        transitions them to ``processing``.

        Notes
        -----
        Other transitions related to this stimulus should be fully processed beforehand,
        so any tasks that became runnable are already in ``processing``. Otherwise,
        overproduction can occur if queued tasks get scheduled before downstream tasks.

        Must be called after `check_idle_saturated`; i.e. `idle_task_count` must be up
        to date.
        Nc                 3  s   | ]}t | jV  qd S r   )r  r  r  r   r   r   rA    s   z>Scheduler.stimulus_queue_slots_maybe_opened.<locals>.<genexpr>r   rl   rm   )rl   r   r  Zpeeknr  rr  r  r  r{  r}  r|  r  rs  )r   r  Zslots_availabler  Zqtsr   r   r   rt    s     z+Scheduler.stimulus_queue_slots_maybe_openedc                 K  s   t d|| i }i }i }| j| }| j|}	|	dksB|	jdkr~t d||	rT|	jnd||	rb|	jni  d|g|dg||< nh|	jdkr| j||gd	 nL|	j	|d
  | j
|d|fd|i|}
|
\}}}|	jdkr||	jkst|||fS )z>Mark that a task has finished execution on a particular workerzStimulus task finished %s, %sN)ri   rl   zKReceived already computed task, worker: %s, state: %s, key: %s, who_has: %srp   r,  r-  rn   )r"  r   r  r"  )r  r  r  r  r   rr  r  r  r  r  r  r  )r   r  r"  r  r  r  r  r  r  r  r{  r   r   r   stimulus_task_finished  sF    



  

z Scheduler.stimulus_task_finishedc                 K  s   t d|| | j|}|dks,|jdkr6i i i fS |jdkr\| jd8  _| |d|S | j|d|f||||d|S dS )	z1Mark that a task has erred on a particular workerzStimulus task erred %s, %sNrm   r   r  rj   ro   )r?  r  r  r"  )r  r  r  r   rr  r  r  )r   r  r"  r  r  r  r  r  r   r   r   stimulus_task_erred6  s&    


zScheduler.stimulus_task_erredc           
      C  s   t d|t| |r,| |dt|d t|}t }g }|r| }|| | j| }dd |j	D }|r~|
| q>|| q>dd |D }	| |	dt   | jr|D ]}| j| jrtqt|S )	Nz#Client %s requests to retry %d keysr  r  c                 S  s   g | ]}|j d kr|jqS )ro   )rr  r  r  r   r   r   r^  c  s     
 z,Scheduler.stimulus_retry.<locals>.<listcomp>c                 S  s   i | ]
}|d qS r  r   rT  r   r   r   r   i  s      z,Scheduler.stimulus_retry.<locals>.<dictcomp>zstimulus-retry-)r  r  r
  r  r  r   r%  r  r  r  r  r  rs  rI   r  r  r  r   )
r   r   r   r  seenrootsr  r  Z
erred_depsr  r   r   r   r  U  s(    

    def close_worker(self, worker: str) -> None:
        """Ask a worker to shut itself down. Do not wait for it to take effect.
        Note that there is no guarantee that the worker will actually accept the
        command.

        Note that :meth:`remove_worker` sends the same command internally if close=True.

        See also
        --------
        retire_workers
        remove_worker
        """
        if worker not in self.workers:
            return
        logger.info("Closing worker %s", worker)
        self.log_event(worker, {"action": "close-worker"})
        self.worker_send(worker, {"op": "close", "reason": "scheduler-close-worker"})
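    # --- Illustration only --------------------------------------------------
    # ``close_worker`` is fire-and-forget; when the scheduler should first move
    # a worker's unique data elsewhere, the graceful path from the client side
    # is ``retire_workers``.  The address below is made up.
    @staticmethod
    def _example_retire_gracefully() -> None:
        from distributed import Client

        client = Client("tcp://127.0.0.1:8786")  # hypothetical scheduler address
        # Replicates unique data to other workers, then shuts the worker down:
        client.retire_workers(workers=["tcp://10.0.0.5:45001"], close_workers=True)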
d| |rttt j  d	d
d W 5 Q R X   j| }|d }	|	  |d  |j8  <  j|j8  _|	sj|= j  j = j|j= j|jd j| j| j = tj|_ j| i }
t|jD ]}|j }d|
|< |sv| j!d7  _!|j" j!d7  _!|j!j#krv|
|= t$%t&||' j#d}j(|d||| d}|
)| t	
d|j j# qvt|j*D ]8}+|| |j,s|j-rFd|
|j < n
d|
|j < qj.|
|d tj/0 D ]^}z(|j1 d}t23|r|I dH  W n. t4k
r } zt	5| W 5 d}~X Y nX qpjst	
d jD ]*}j6 |fd j6| fd q fdd}t7t8j9:d}j;<|| t	=d| dS )a  Remove worker from cluster.

        We do this when a worker reports that it plans to leave or when it appears to be
        unresponsive. This may send its tasks back to a released state.

        See also
        --------
        retire_workers
        close_worker
        zalready-removedzremove-workerc                 S  s   h | ]
}|j qS r   r   r  r   r   r   rk    s     z*Scheduler.remove_worker.<locals>.<setcomp>)rY  zprocessing-tasksr"  r  zRemove worker %sr  zscheduler-remove-workerrJ  rm  r   Nri   r  taskrn  r  ro   )r  r?  r  r"  zGTask %s marked as failed because %d workers died while trying to run itrp   rm  rl  zLost all workersc                     s     j kr jkrj = d S r   )r  ry   r   r   r   r   r   remove_worker_from_events  s    z:Scheduler.remove_worker.<locals>.remove_worker_from_events*distributed.scheduler.events-cleanup-delayzRemoved worker %sr\  )>r   r>   rX  r^  r  r8   rm   r  r  r  r  r   rO  r7   r  rP  remove_resourcesr  r&  r   r  r@   r  r   r  r%  r   r  r%  r  r  r  r  rP  r  r  picklerM   KilledWorkerr  r  r  r   r,  r  rv  rs  r  rt  r  rq  rr  r  r  r  r)   r   r   r   _ongoing_background_tasks
call_laterr  )r   r   r  r+  r  r   r  Z	event_msgrr  rz  r  r  r   r  r{  r  r{  r  r  cleanup_delayr   r  r   r    s    








 


 zScheduler.remove_workerc                   sT   t d t|  r. dt|d tj fdd|D  I dH  dS )z Stop execution on a list of keysz$Client %s requests to cancel %d keysr  )rY  r*  forcec                   s   g | ]}j | d qS )r  )_cancel_keyrT  r   r  r   r   r   r^    s     z-Scheduler.stimulus_cancel.<locals>.<listcomp>N)r  r  r
  r  r  r  )r   r   r   r  r   r  r   r    s     zScheduler.stimulus_cancelc                   s  j |}zj  }W n tk
r0   Y dS X t }|dksF|jsvtdI dH  j |}t | dkr8dS q8s|j|hkrtj fdd|j	D  I dH  t
d| d|d rt|jn|g}|D ] }j|g|jd	t  d
 qdS )z*Cancel a particular key and all dependentsN皙?r  c                   s   g | ]}j |j d qS r  )r  r  r  r  r   r   r^  -  s   z)Scheduler._cancel_key.<locals>.<listcomp>z#Scheduler cancels key %s.  Force=%sr  r/  zcancel-key-r  )r  r   r  r  rI   r}  r  sleepr  ry  r  r  r  r  r  r   )r   r  r   r  r  r  r9  r  r   r  r   r    s4    
  
zScheduler._cancel_keyc                 C  s   | j |}|d kr&t| | j |< }|D ]V}| j|}|d krP| |d d}|j| |j| |jdkr*| j	||d q*d S )Nri   r  r  )
r  r   r}   r  r  r}  r  r   rr  r  )r   r   r   r  r   r  r   r   r   r  :  s    
zScheduler.client_desires_keysc                 C  s\   |pdt   }t|ts"t|}| j| }i }| j|||d | || | j|d dS )r  zclient-releases-keys-)r   r  r  rm  N)rI   r   r  r  rB  rs  rt  )r   r   r   r  r  r  r   r   r   r  J  s    

zScheduler.client_releases_keysc                 C  s   | j | }t |_dS )zHandle heartbeats from ClientN)r  rI   r   )r   r   r  r   r   r   r  W  s    
zScheduler.client_heartbeatc                   s|   | j |   jdkst jr"t jr,t jr6t jr@tt fdd jD r\t | j	ksjt | j
ksxtd S )Nri   c                   s   g | ]} |j kqS r   )r|  r  r  r   r   r^  g  s     z/Scheduler.validate_released.<locals>.<listcomp>)r  rr  r  r|  r{  r  r  r_  r  r  rl   )r   r  r   r  r   validate_released`  s    




zScheduler.validate_releasedc                 C  sz   | j | }|jst|jrt|jr(t|| jks6t|| jksDt|jD ]*}t|j||jkksft||j	ksJtqJd S r   )
r  r{  r  r  r  r  rl   r  r   r|  r   r  r  r   r   r   r   validate_waitingk  s    




zScheduler.validate_waitingc                 C  st   | j | }|| jkst|jr"t|jr,t|jr6t|jsH|jsH|jrLt|j	D ]}|js`t||j
ksRtqRd S r   )r  rl   r  r{  r  r  r  r  r  r  r|  r  r   r   r   validate_queuedw  s    





zScheduler.validate_queuedc                 C  sp   | j | }|jrt|j}|s"t||jks0t|jr:t|| jksHt|jD ]}|js\t||jksNtqNd S r   )	r  r{  r  r  rm   r  rl   r  r|  )r   r  r  r  r   r   r   r   validate_processing  s    




zScheduler.validate_processingc                 C  s   | j | }|jstt|| jkt|jdkks4t|jr>t|jrHt|| jksVt|| j	ksdt|j
D ]*}||jk|jdkkst||jksjtqjd S )Nr  rj   rl   rm   rk   )r  r  r  r   r  r
  r  r{  r  rl   ry  r|  rr  r  r   r   r   validate_memory  s    

 


zScheduler.validate_memoryc                 C  sl   | j | }|| jkst|jr"t|| jks0t|jr:t|jrDt|| jksRt|jD ]}|jsXtqXd S r   )r  r  r  r{  r  r  rl   r  r  r   r   r   validate_no_worker  s    




zScheduler.validate_no_workerc                 C  s0   | j | }|jst|jrt|| jks,td S r   )r  r  r  r  rl   )r   r  r  r   r   r   validate_erred  s    


zScheduler.validate_erredr  r  c              
   C  s   z|d kr| j |}|d kr,td| nX|  zt| d|jdd }W n* tk
rz   t	d|jdd Y n
X || W nB t
k
r } z$t| trdd l}|   W 5 d }~X Y nX d S )NzKey lost: %sZ	validate_-r   zself.validate_%s not foundr   )r  r   r  r  r  r   rr  replacerO  rg  r  r  r  r  r  )r   r  r  r  r  r  r   r   r   ra     s(     

zScheduler.validate_key)allow_overlapr   c                 C  s  t | j| j| j t| jt| jks.td| j| j	
 sXt| jt| j	
 ftt}| j D ]R\}}t|tstt||ft|tstt||f|j|kst|jtjkr|j| j	kst|j|jst|js|jrt|jtjkr|j| j	kst|j |j@ r,ttt}|jD ]0}|jD ]"}||jkrD||  d7  < qDq:||jks|t|jtjk|| jkkst|j D ]\}}	||  |	7  < qqj| | j  kst| j  D ]6\}}
|| |
kst| d||  d|
 dq| jD ]*}|jtjks6t|j| jks tq | j D ]d\}}t|t!sztt||f|j"|kstt#|| j$kt%|j&dkkst| '|| qV| j$D ](}|j(dkst|j"| jkstq| j D ]`\}}|d ks&t|tks&tt||ft|t)ksDtt||f|j*|kstqdd | j D }d	d | j D }||kst||f| j+r| j,| j+k std S )
Nz'Workers not the same in all collectionsr  rG  z (wss), z	 (global)rn   c                 S  s   i | ]\}}||j qS r   rb  rn  r   r   r   r     s      z,Scheduler.validate_state.<locals>.<dictcomp>c                 S  s&   i | ]\}}|t d d |jD qS )c                 s  s   | ]}|  V  qd S r   r  r  r   r   r   rA    s     z6Scheduler.validate_state.<locals>.<dictcomp>.<genexpr>)r   r   rn  r   r   r   r     s    )-validate_stater  r  r  r   r  r  r  
issupersetr  rt  r  r  r   r   r  r   r~   r  r   r   r   r>   r   issubsetrm   r  r   r   r   r  r   r  r  r  r   r  r
  r  ra   rr  r}   r   r  r  )r   r  Ztask_prefix_countsr  r  Zactual_needs_whatr  tssr   r*  Zglobal_countr   r  r  r  br   r   r   r    sr    




"
(zScheduler.validate_stater!  )r  r  r   c           
   
     s   |dkr*| d}|dk	r*| j}| |}| j}|dkrBt|}n8 dkr\dd |jD }n fdd|jD }|  |D ]Z}| |}	|	dkrq~z|	| W q~ tk
r   | jt	j
krtjd|	|dd Y q~X q~dS )	z
        Publish updates to all listening Queues and Comms

        If the message contains a key then we only send the message to those
        comms that care about the key.
        Nr  c                 S  s   g | ]
}|j qS r   r   r   r  r   r   r   r^  &  s     z$Scheduler.report.<locals>.<listcomp>c                   s   g | ]}|j  kr|j qS r   r   r  r  r   r   r^  )  s    
 'Closed comm %r while trying to write %sTr  )r   r  r  r  r}  r  rP  r7   r   r>   r  r  critical)
r   r  r  r   Zmsg_keyr  r  Zclient_keysr   r  r   r  r   r    s8    





   zScheduler.report)rW  r   r0   r   c           	        s  |dk	st d|_td| | d|gd|d t||d| j|< t| j	 D ]D}z|j
| |d W qT tk
r } zt| W 5 d}~X Y qTX qTztd| jd}|| || j|< d	di}tt dd | j D |}|| || z| j#|d|idI dH  W 5 | j |dt!  d t"d| X W 5 | s`| j| d	d
i zDt s| j|  I dH  | j|= | jtjkrtd| W n tk
r   Y nX X dS )zXAdd client to network

        We listen to all future messages from this Comm.
        NzScheduler->ClientzReceive client connection: %sr  z
add-clientrY  r   r/   r   r   r.  zstream-closedzClose client connection: %sZ2msrj  zstream-startc                 S  s   i | ]\}}||j qS r   r/   rn  r   r   r   r   V  s      z(Scheduler.add_client.<locals>.<dictcomp>remove-client-)r   r  zFinished handling client %sr   rW  r   )$r  r   r  r  r  r}   r  r  r  rt  r  r  r  rX  r  rP  sysis_finalizingr  r   r>   r  	TypeErrorr4   r  r9  ru  rv  rw  r  r  r  r  rI   r  handle_stream)	r   rW  r   r0   r  r  bcommr  r|  r   r   r   r  <  sH    





zScheduler.add_client)r   r  r   c                   s  |pdt   }jtjkr(td  d gd d zj  }W n tk
r`   Y n|X j	dd |j
D |j|d j = tj D ]D}z|j d	 W q tk
r } zt| W 5 d
}~X Y qX q fdd}ttjd}jjsj|| d
S )zRemove client from networkr  zRemove client %sr  zremove-clientr  c                 S  s   g | ]
}|j qS r   r   r  r   r   r   r^  z  s     z+Scheduler.remove_client.<locals>.<listcomp>r  r  Nc                     s     j kr jkrj = d S r   )r  ry   r   r   r   r   r   remove_client_from_events  s    z:Scheduler.remove_client.<locals>.remove_client_from_eventsr  )rI   r   r>   r  r  r  r  r  r  r  r   r   r  r  rt  r  r  r  r)   r   r   r   r  rX  r  )r   r   r  r  r  r  r  r  r   r  r   r  m  s8    

 zScheduler.remove_clientrB  r  r   )r  r  c              
   C  sd   z|  ||}| || W nB tk
r^ } z$t| trLddl}|   W 5 d}~X Y nX dS )z,Send a single computational task to a workerr   N)r  r  r  r  r  r  r  r  )r   r"  r  r  r  r  r  r   r   r   send_task_to_worker  s    
zScheduler.send_task_to_workerc                 K  s   t tf |d  d S r)  )r  r  r?   r}  r   r   r   handle_uncaught_error  s    zScheduler.handle_uncaught_error)r  r"  r  r   c           	      K  sd   || j krd S t| | jf |||d|}|\}}}| |||| | || | j|d d S )N)r  r"  r  rm  )r  ra   r  r  send_allrt  )	r   r  r"  r  r  r{  r  r  r  r   r   r   r    s    
  
zScheduler.handle_task_finishedr  c                 K  sL   | j f ||d|}|\}}}| |||| | || | j|d d S )N)r  r  rm  )r  r  r  rt  )r   r  r  r  r{  r  r  r  r   r   r   r    s
    
zScheduler.handle_task_erredc                 C  sT   | j |}| j|}|r*|r*||jkr.d S | || |jsP| |di| d S )Nri   )r  r   r  r  r,  rs  )r   r  r"  r  r  r  r   r   r   r    s    zScheduler.release_worker_data)r  r"  compute_durationr  r   c           	      C  s   || j krtd| dS | j | }| jd}|dk	rB|| |j}|dkr^td dS |dk	r|jj}|dk r||j_n|| d |j_|	| | 
| | j|d dS )zA task has seceded from the thread pool

        We stop the task from being stolen in the future, and change task
        duration accounting as if the task has stopped.
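# --- Illustrative usage sketch (editor addition, not part of the original source) ---
# A task reaches this handler by calling ``distributed.secede()``, which frees its
# slot in the worker's thread pool while the task keeps running. The scheduler
# address below is hypothetical.
from distributed import Client, secede


def poll_until_done():
    secede()  # task becomes "long-running"; its thread-pool slot is released
    return "done"


client = Client("tcp://127.0.0.1:8786")
future = client.submit(poll_until_done)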
        z7Skipping long_running since key %s was already releasedNr|   z;Received long-running signal from duplicate task. Ignoring.r   rV  rm  )r  r  r  r  r   Zremove_key_from_stealabler  r  rO  r$  r[  rt  )	r   r  r"  r  r  r  r)  r  Zold_durationr   r   r   r    s&    






zScheduler.handle_long_runningzstr | Statuszstr | WorkerState)r   r"  r  r   c                 C  s   t |tr| j|n|}|s"d S |j}t |tr:t| n||_|j|krNd S | |jd|j|jjd t	
d|j d| d|  |jtjkr| j| | | | | || | j|d n(| j| | j|jd  | j| d S )Nr  )rY  zprev-statusr   zWorker status z -> z - rm  )r   r~   r  r   r   r>   r  r   r   r  r  r  r  r[  rs  r  rt  r%  r  r%  r  )r   r   r"  r  r  prev_statusr   r   r   r     s4    

 z%Scheduler.handle_worker_status_changezIterable[str])r   r"  r  r   c                   s~   i }g }|D ]4}|| j kr6dd | j | jD ||< q|| q|r^| j| d||d |rz| j| d||d dS )zAsynchronous request (through bulk comms) from a Worker to refresh the
        who_has for some keys. Not to be confused with scheduler.who_has, which is a
        synchronous RPC request from a Client.
        c                 S  s   g | ]
}|j qS r   r  r  r   r   r   r^    s     z<Scheduler.handle_request_refresh_who_has.<locals>.<listcomp>zrefresh-who-has)r.  r  r  r,  r-  N)r  r  r  r  rP  )r   r   r"  r  r  	free_keysr  r   r   r   r    s(    


z(Scheduler.handle_request_refresh_who_has)rW  r"  r   c              
     sx   d|_ | j| }|| td| z| j|d|idI dH  W 5 || jkrr|  | j|dt  dI dH  X dS )z
        Listen to responses from a single worker

        This is the main loop for scheduler-worker interaction

        See Also
        --------
        Scheduler.handle_client: Equivalent coroutine for clients
        zScheduler connection to workerz"Starting worker compute stream, %szhandle-worker-cleanup-rm  Nr"  r  )	r   r  r9  r  r  rR  r  rI   r  )r   rW  r"  Zworker_commr   r   r   rx  '  s    



 
zScheduler.handle_worker)
idempotentr   rC   )r  r  r   c                K  sF   |dkrt |}|| jkr8|r"dS tjd| dtd || j|< dS )a  Add external plugin to scheduler.

        See https://distributed.readthedocs.io/en/latest/plugins.html

        Parameters
        ----------
        plugin : SchedulerPlugin
            SchedulerPlugin instance to add
        idempotent : bool
            If true, the plugin is assumed to already exist and no
            action is taken.
        name : str
            A name for the plugin. If None, the ``name`` attribute on the
            plugin instance is used; if that is not present, a name is
            generated automatically.
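# --- Illustrative usage sketch (editor addition, not part of the original source) ---
# Plugins are usually registered from the client; the scheduler then calls
# ``add_plugin`` on the deserialized instance. The class and address are made up.
from distributed import Client
from distributed.diagnostics.plugin import SchedulerPlugin


class TaskCounter(SchedulerPlugin):
    name = "task-counter"

    def __init__(self):
        self.n_completed = 0

    def transition(self, key, start, finish, *args, **kwargs):
        # Called by the scheduler on every task state transition
        if finish == "memory":
            self.n_completed += 1


client = Client("tcp://127.0.0.1:8786")
client.register_scheduler_plugin(TaskCounter(), name="task-counter")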
        Nz.Scheduler already contains a plugin with name z; overwriting.)category)rD   r  r   r   UserWarning)r   r  r  r   r  r   r   r   
add_plugin>  s    

zScheduler.add_pluginzSchedulerPlugin | None)r   r  r   c                 C  sB   |dk	st z| j|= W n$ tk
r<   td|dY nX dS )zRemove external plugin from scheduler

        Parameters
        ----------
        name : str
            Name of the plugin to remove
        NzCould not find plugin z$ among the current scheduler plugins)r  r  r  r  r   r   r  r   r   r   remove_pluginc  s    
zScheduler.remove_pluginc                   s   t jdstdt|ts&t|}|dkr6t|}|| jkrH|rHdS t	|drp|
| }t|rp|I dH  | j|||d dS )z#Register a plugin on the scheduler.distributed.scheduler.picklezCannot register a scheduler plugin as the scheduler has been explicitly disallowed from deserializing arbitrary bytestrings using pickle via the 'distributed.scheduler.pickle' configuration setting.Nr9  )r   r  )r   r   r   r  r   rC   rN   rD   r  hasattrr9  rq  rr  r  )r   r  r   r  r{  r   r   r   r  x  s    




z#Scheduler.register_scheduler_plugin)r"  r  r   c              	   C  sR   | j }z|| | W n4 ttfk
rL   | jj| j|dt  d Y nX dS )zSend message to worker

        This also handles connection failures by adding a callback to remove
        the worker on the next cycle.
        zworker-send-comm-fail-r   r  N)r  rP  r7   rO  r  	call_soonr  rI   )r   r"  r  r  r   r   r   r    s    
zScheduler.worker_sendc              	   C  sb   | j }||}|dkrdS z|| W n2 tk
r\   | jtjkrXtjd||dd Y nX dS )zSend message to clientNr  Tr  )	r  r   rP  r7   r   r>   r  r  r  )r   r   r  r  r  r   r   r   client_send  s    
   zScheduler.client_sendrs   )r  r  r   c              
   C  s   |  D ]`\}}| j|}|dkr&qz|j|  W q tk
rf   | jtjkrbtj	d||dd Y qX q|  D ]h\}}z| j
| }|j|  W qr tk
r   Y qr ttfk
r   | jj| j|dt  d Y qrX qrdS )z#Send messages to client and workersNr  Tr  zsend-all-comm-fail-r  )r  r  r   rP  r7   r   r>   r  r  r  r  r  rO  r  r  r  rI   )r   r  r  r   msgsr  r"  r  r   r   r   r    s4    

zScheduler.send_allrV  c                   s  t  }|dkr j}n2 fdd|D } fdd|D }dd |D }|rNqvt  || krdtdtdI dH  qd	d
 |D }	t|tstt|	| j	ddI dH \}
}} j
|||d |r|dkrt|	n|} j|
||dI dH   |dgd|t|d |
S )zaSend data out to workers

        See also
        --------
        Scheduler.broadcast:
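# --- Illustrative usage sketch (editor addition, not part of the original source) ---
# Scatter is normally reached through the Client, which calls this scheduler
# method over RPC. The address is hypothetical.
from distributed import Client

client = Client("tcp://127.0.0.1:8786")
[data_future] = client.scatter([list(range(1_000_000))])  # data now lives on a worker
total = client.submit(sum, data_future).result()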
        Nc                   s   g | ]}  |qS r   r^  r  r   r   r   r^    s     z%Scheduler.scatter.<locals>.<listcomp>c                   s   h | ]} j | qS r   rh  r  r   r   r   rk    s     z$Scheduler.scatter.<locals>.<setcomp>c                 S  s   h | ]}|j tjkr|qS r   r  r  r   r   r   rk    s      zNo valid workers foundr  c                 S  s   i | ]}|j |jqS r   )r   r   r  r   r   r   r     s      z%Scheduler.scatter.<locals>.<dictcomp>F)r@   r  )r  r   r   T)r   r  r  r  r  )rY  r   r*  )rI   r  rZ   r  r  r   r   r  rd   r@   r  r
  r  r  )r   rW  re  r  r   r  timeoutr9  r(  r   r   r  r   r  r   r   r   r    s8        zScheduler.scatterc              	     s"  dt   t|}i }|D ]6} j|}|dk	rJdd |jD ||< qg ||< qt| jd|dI dH \}}}|sd|d}	n fd	d|D }
td
||
| d|d}	t	 N t
j fdd|D  I dH  | D ]\}}tdt|t| qW 5 Q R X  ddt|d |	S )z*Collect data from workers to the schedulerzgather-Nc                 S  s   g | ]
}|j qS r   r  r  r   r   r   r^    s     z$Scheduler.gather.<locals>.<listcomp>F)r@   r  serializersr\  )r   re  c                   s&   g | ]}| j kr j | jnd qS r   r  rr  rT  r   r   r   r^    s   z-Couldn't gather keys %s state: %s workers: %srg  r   r   c                 3  s   | ]} j |d dV  qdS )T)r   r  r  Nr  r'  r   r  r   r   rA  $  s     z#Scheduler.gather.<locals>.<genexpr>z6Shut down workers that don't have promised key: %s, %sr  r  r  )rI   r  r  r   r  rb   r@   r  r  r^   r  r  r  r~   r  r
  )r   r   r  r  r  r  re  missing_keysZmissing_workersr{  Zmissing_statesr  r   r  r   r    sP    
   


zScheduler.gather   c                   s|  dt   td j D ]"}jdd |jD |jd q   j	rVt
ddi tj D ]@}z| W qr tk
r } zt| W 5 d}~X Y qrX qrtj}d	d
 j D  tj fddjD  I dH  td  t 4 I dH tjfdd  D  I dH }t }	tjfdd|D ddiI dH }
dd t |
D }|rtjfdd|D  I dH  tt| dt| d dW 5 Q I dH R X |dgd|d |rntj|k rnt |	 k rtdI dH  nNd| d dtj d}t  }|k r`|d||  d7 }t|dqtd  dS )!a"  
        Restart all workers. Reset local state. Optionally wait for workers to return.

        Workers without nannies are shut down, hoping an external deployment system
        will restart them. Therefore, if not using nannies and your deployment system
        does not automatically restart workers, ``restart`` will just shut down all
        workers, then time out!

        After ``restart``, all connected workers are new, regardless of whether ``TimeoutError``
        was raised. Any workers that failed to shut down in time are removed, and
        may or may not shut down on their own in the future.

        Parameters
        ----------
        timeout:
            How long to wait for workers to shut down and come back, if ``wait_for_workers``
            is True, otherwise just how long to wait for workers to shut down.
            Raises ``asyncio.TimeoutError`` if this is exceeded.
        wait_for_workers:
            Whether to wait for all workers to reconnect, or just for them to shut down
            (default True). Use ``restart(wait_for_workers=False)`` combined with
            :meth:`Client.wait_for_workers` for granular control over how many workers to
            wait for.

        See also
        --------
        Client.restart
        Client.restart_workers
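# --- Illustrative usage sketch (editor addition, not part of the original source) ---
# The pattern recommended above: shut workers down without blocking on their
# return, then wait for an explicit number of workers. The address is hypothetical.
from distributed import Client

client = Client("tcp://127.0.0.1:8786")
client.restart(wait_for_workers=False)  # only wait for workers to shut down
client.wait_for_workers(n_workers=4)    # then wait until 4 workers have reconnected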
        zrestart-z*Restarting workers and releasing all keys.c                 S  s   g | ]
}|j qS r   r   r  r   r   r   r^  Y  s     z%Scheduler.restart.<locals>.<listcomp>r  r.  r  Nc                 S  s   i | ]\}}|j r||j qS r   )r   )r   rj  r  r   r   r   r   i  s      z%Scheduler.restart.<locals>.<dictcomp>c                 3  s$   | ]}| krj |d V  qdS )r  Nr  ri  )nanny_workersr   r  r   r   rA  o  s   z$Scheduler.restart.<locals>.<genexpr>zSend kill signal to nannies: %sc                 3  s"   | ]} t| jd V  qdS ))r  N)enter_async_contextr@   r  )r   Znanny_address)r   r  r   r   rA  y  s   c                 3  s$   | ]}t |jd  d V  qdS )zscheduler-restart)rK  r  N)r  wait_forkill)r   r   )r  r   r   rA    s   return_exceptionsTc                 S  s   g | ]\}}|d k	r|qS r   r   )r   rj  respr   r   r   r^    s     c                 3  s   | ]} j |d V  qdS )rm  Nr  ri  r  r   r   rA    s   r  z* nanny worker(s) did not shut down within r  r  r  g?zWaited for z4 worker(s) to reconnect after restarting, but after zs, only zG have returned. Consider a longer timeout, or `wait_for_workers=False`.z The a=   worker(s) not using Nannies were just shut down instead of restarted (restart is only possible with Nannies). If your deployment system does not automatically re-launch terminated processes, then those workers will never come back, and `Client.restart` will always time out. Do not use `Client.restart` in that case.zRestarting finished.)rI   r  r  r  rt  r  r   r   r  r  r  r  r  r  r  r  r  r
  r  r  r  r  r  
contextlibAsyncExitStackrH   ziprZ   r  r  )r   r   r  Zwait_for_workersr  r  r  r  Znanniesr9  ZrespsZbad_nanniesr  Zn_nannyr   )r  r   r  r  r  r   r  5  s    




	

zScheduler.restartraise)r  hostsr   r  r-  z'list[str] | None'z7"Literal['raise', 'return', 'return_pickle', 'ignore']")r  r  r  r   r-  r   c                  s   |dkr |dkrt j}ng }|dk	rT|D ]&}j|}	|	dk	r,||	d  q,|rlfdd|D }
n|}
t   fddtfdd|
D I dH } fdd	t||D S )
z0Broadcast message to workers, return all resultsNrm  c                   s   g | ]} j | jqS r   )r  r   r  r   r   r   r^    s     z'Scheduler.broadcast.<locals>.<listcomp>c                   s   zPj | I d H }d|_z t|fddI d H }W 5 j | | X |W S  tk
r } z~td|  d|jj	 d|  dkr nNdkr| W Y @S d	krt
| W Y (S d
krΈ  W Y S tdW 5 d }~X Y nX d S )NzScheduler BroadcastT)r  r  zbroadcast to z	 failed: rG  r  r   Zreturn_pickler.  zFon_error must be 'raise', 'return', 'return_pickle', or 'ignore'; got )r@   r  r   ZreuserA   r  r  rg  r,  r   rM   r  )rj  rW  r  r  )ERRORr  r-  r   r  r   r   send_message  s6      z)Scheduler.broadcast.<locals>.send_messagec                   s   g | ]}|d k	r |qS r   r   r   r   )r  r   r   r^    s      c                   s   i | ]\}}| k	r||qS r   r   rJ  )r  r   r   r     s       z'Scheduler.broadcast.<locals>.<dictcomp>)r  r  r  r   r  r   rY   r  )r   rW  r  r  r  r   r  r-  r   rr  rm  resultsr   )r  r  r-  r   r  r  r   r    s$    
zScheduler.broadcastc                   s"   | j |||g|dI dH }|| S )z@Proxy a communication through the scheduler to some other worker)rW  r  r  r  Nr  )r   rW  r  r"  r  r)  r   r   r   r     s       zScheduler.proxyz'dict[str, list[str]]'r   )worker_addressr  r   c           
        sh  z t | j|dj|dI dH }W nN tk
rn } z0td| d|jj d|  t| W Y S d}~X Y nX | j	
|}|std| d t|S |d	 d
krt }| }nT|d	 dkrt|d }| | }td| d|d   ntd| d| |D ]T}| j
|}	|	dks4|	jdkrHtd|  q||	jkr| |	| q|S )a  Peer-to-peer copy of keys from multiple workers to a single worker

        Parameters
        ----------
        worker_address: str
            Recipient worker address to copy keys to
        who_has: dict[Hashable, list[str]]
            {key: [sender address, sender address, ...], key: ...}

        Returns
        -------
        returns:
            set of keys that failed to be copied
        rj  r  NCommunication with worker  failed during replication: rG  rf  z lost during replicationr   r\  partial-failr   z failed to acquire keys: zUnexpected message from rn   zKey lost during replication: )rc   r@   r  OSErrorr  r_  r,  r   r   r  r   r   r  r  rr  r  r/  )
r   r  r  r{  r  r  Zkeys_failedZkeys_okr  r  r   r   r   gather_on_worker  s@     
zScheduler.gather_on_worker)r  r   r  r   c                   s   z.t | j|djt|dt  dI dH  W nH tk
rv } z*td| d|jj	 d|  W Y dS d}~X Y nX | j
|}|sdS |D ]R}| j|}|dk	r||jkr|jdkst| || |js| |d	i| q| |jd
|d dS )a  Delete data from a worker and update the corresponding worker/task states

        Parameters
        ----------
        worker_address: str
            Worker address to delete keys from
        keys: list[str]
            List of keys to delete on the specified worker
        r  zdelete-data-)r   r  Nr  r  rG  rn   ri   zremove-worker-data)rY  r   )rc   r@   r  r  rI   r  r  r_  r,  r   r  r   r  r  rr  r  r,  rs  r  r   )r   r  r   r  r  r  r  r  r   r   r   delete_worker_data@  s,    
zScheduler.delete_worker_datazIterable[str] | None)r   r  r  r   c           	        s  |pdt   }|dk	r, fdd|D }n
 j }|sBddiS |dk	rt|ts\t|}|shddiS  fdd|D }|rd|d	S  ||}|sddiS  j4 I dH F  ||I dH }|d dkr|dkrddi}|W  5 Q I dH R  S Q I dH R X dS )
a  Rebalance keys so that each worker ends up with roughly the same process
        memory (managed+unmanaged).

        .. warning::
           This operation is generally not well tested against normal operation of the
           scheduler. It is not recommended to use it while waiting on computations.

        **Algorithm**

        #. Find the mean occupancy of the cluster, defined as data managed by dask +
           unmanaged process memory that has been there for at least 30 seconds
           (``distributed.worker.memory.recent-to-old-time``).
           This lets us ignore temporary spikes caused by task heap usage.

           Alternatively, you may change how memory is measured both for the individual
           workers as well as to calculate the mean through
           ``distributed.worker.memory.rebalance.measure``. Namely, this can be useful
           to disregard inaccurate OS memory measurements.

        #. Discard workers whose occupancy is within 5% of the mean cluster occupancy
           (``distributed.worker.memory.rebalance.sender-recipient-gap`` / 2).
           This helps prevent data from bouncing around the cluster repeatedly.
        #. Workers above the mean are senders; those below are recipients.
        #. Discard senders whose absolute occupancy is below 30%
           (``distributed.worker.memory.rebalance.sender-min``). In other words, no data
           is moved regardless of imbalancing as long as all workers are below 30%.
        #. Discard recipients whose absolute occupancy is above 60%
           (``distributed.worker.memory.rebalance.recipient-max``).
           Note that this threshold by default is the same as
           ``distributed.worker.memory.target`` to prevent workers from accepting data
           and immediately spilling it out to disk.
        #. Iteratively pick the sender and recipient that are farthest from the mean and
           move the *least recently inserted* key between the two, until either all
           senders or all recipients fall within 5% of the mean.

           A recipient will be skipped if it already has a copy of the data. In other
           words, this method does not degrade replication.
           A key will be skipped if there are no recipients available with enough memory
           to accept the key and that don't already hold a copy.

        The least recently inserted (LRI) policy is a greedy choice with the advantage of
        being O(1), trivial to implement (it relies on python dict insertion-sorting)
        and hopefully good enough in most cases. Discarded alternative policies were:

        - Largest first. O(n*log(n)) save for non-trivial additional data structures and
          risks causing the largest chunks of data to repeatedly move around the
          cluster like pinballs.
        - Least recently used (LRU). This information is currently available on the
          workers only and not trivial to replicate on the scheduler; transmitting it
          over the network would be very expensive. Also, note that dask will go out of
          its way to minimise the amount of time intermediate keys are held in memory,
          so in such a case LRI is a close approximation of LRU.

        Parameters
        ----------
        keys: optional
            allowlist of dask keys that should be considered for moving. All other keys
            will be ignored. Note that this offers no guarantee that a key will actually
            be moved (e.g. because it is unnecessary or because there are no viable
            recipient workers for it).
        workers: optional
            allowlist of worker addresses to be considered as senders or recipients.
            All other workers will be ignored. The mean cluster occupancy will be
            calculated only using the allowed workers.
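# --- Illustrative usage sketch (editor addition, not part of the original source) ---
# Rebalancing is typically requested from the client, either for everything the
# scheduler tracks or for an explicit set of futures. The address is hypothetical.
from distributed import Client

client = Client("tcp://127.0.0.1:8786")
futures = client.map(lambda i: i ** 2, range(1000))
client.gather(futures)      # make sure results are in memory on the workers
client.rebalance(futures)   # even out memory for just these keys
client.rebalance()          # or rebalance all data held by the cluster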
        z
rebalance-Nc                   s   g | ]} j | qS r   rh  r  r   r   r   r^    s     z'Scheduler.rebalance.<locals>.<listcomp>r   r\  c                   s&   g | ]}| j ks j | js|qS r   r  r  r   r   r   r   r^    s    
  r  r  )	rI   r  rt  r   r   r   _rebalance_find_msgsr  _rebalance_move_data)	r   rW  r   r  r  r(  missing_datar  r{  r   r   r   r  j  s0    I



zScheduler.rebalancezSet[Hashable] | NoneIterable[WorkerState]z0list[tuple[WorkerState, WorkerState, TaskState]])r   r  r   c              	     s  g }g }g } fdd|D }t dd |D t| }|D ]\}}	|jrtt j|j }
 j|j } j|j }nd}
d}tj}|j	r|	||
 kr|	|kr||	 }||
 }|
||t||t|j	f q<|	||
 k r<|	|k r<|	| }||
 }|
||t||f q<|r|s6 ddt|t|dd	 g S t| t| |r|r|d \}}}}}|D ]H}|d
k	r|j|krql|j}|| dkrqlg }d}|r|s|d \}}}}|| dkrڐq||j	k}|s|
t| q|D ]}t|| q|s ql|
|||f ||7 }||7 }||7 }||7 }|dk rvt|||t|||f n
t| |dk rt|||t||f n
t|  qJqlt| qJ|S )a  Identify workers that need to lose keys and those that can receive them,
        together with how many bytes each needs to lose/receive. Then, pair a sender
        worker with a recipient worker for each key, until the cluster is rebalanced.

        This method only defines the work to be performed; it does not start any network
        transfers itself.

        The big-O complexity is O(wt + ke*log(we)), where

        - wt is the total number of workers on the cluster (or the number of allowed
          workers, if explicitly stated by the user)
        - we is the number of workers that are eligible to be senders or recipients
        - kt is the total number of keys on the cluster (or on the allowed workers)
        - ke is the number of keys that need to be moved in order to achieve a balanced
          cluster

        There is a degenerate edge case O(wt + kt*log(we)) when kt is much greater than
        the number of allowed keys, or when most keys are replicated or cannot be
        moved for some other reason.

        Returns list of tuples to feed into _rebalance_move_data:

        - sender worker
        - recipient worker
        - task to be transferred
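# --- Simplified sketch of the pairing idea (editor addition, illustrative only) ---
# This is NOT the scheduler's implementation; it only shows how over-full senders
# and under-full recipients can be paired greedily with heaps, moving the least
# recently inserted key each time. The real method additionally applies the
# sender/recipient thresholds described above and skips recipients that already
# hold a copy of the key.
import heapq


def pair_transfers(memory, keys_by_worker, sizes):
    """Return [(sender, recipient, key), ...] bringing workers toward the mean."""
    mean = sum(memory.values()) / len(memory)
    # Gaps are negative; the most over-full sender / under-full recipient pops first.
    senders = [(mean - m, w) for w, m in memory.items() if m > mean]
    recipients = [(m - mean, w) for w, m in memory.items() if m < mean]
    heapq.heapify(senders)
    heapq.heapify(recipients)

    msgs = []
    while senders and recipients:
        snd_gap, snd = senders[0]
        rec_gap, rec = recipients[0]
        if not keys_by_worker[snd]:
            heapq.heappop(senders)  # nothing left to move from this sender
            continue
        key = keys_by_worker[snd].pop(0)  # least recently inserted key
        msgs.append((snd, rec, key))
        snd_gap += sizes[key]  # sender sheds bytes, recipient gains them
        rec_gap += sizes[key]
        if snd_gap >= 0:
            heapq.heappop(senders)
        else:
            heapq.heapreplace(senders, (snd_gap, snd))
        if rec_gap >= 0:
            heapq.heappop(recipients)
        else:
            heapq.heapreplace(recipients, (rec_gap, rec))
    return msgs


print(pair_transfers({"w1": 100, "w2": 20}, {"w1": ["x", "y"], "w2": []}, {"x": 30, "y": 30}))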
        c                   s   g | ]}|t |j jfqS r   )r   rn   r  r  r   r   r   r^    s    z2Scheduler._rebalance_find_msgs.<locals>.<listcomp>c                 s  s   | ]\}}|V  qd S r   r   )r   r   mr   r   r   rA    s     z1Scheduler._rebalance_find_msgs.<locals>.<genexpr>r   rp  r  r  rY  senders
recipientsZ
moved_keysNF)r   r
  r   r   r  r  r  r  r  r   r  r  iterr  heapqheapifyr  r   heappopheappushheapreplace)r   r   r  r  r	  r  Zmemory_by_workerZmean_memoryr  Z	ws_memoryZhalf_gapZ
sender_minZrecipient_maxZsnd_bytes_maxZsnd_bytes_minZrec_bytes_maxZrec_bytes_minr   snd_wsZts_iterr  r   Zskipped_recipientsZuse_recipientrec_wsZ	recipientr   r   r   r    s    1

	








zScheduler._rebalance_find_msgsz2'list[tuple[WorkerState, WorkerState, TaskState]]')r  r  r   c                   s:  t dd }|D ]"\}}}||j |j |j qtt|tj fdd| D  I dH }t t	}|D ],\}}}|j||j krl||j |j qltj fdd| D  I dH  | D ]\}	}
 
|	d|
d qƈ 
d	dtt|tt|t|d
 dd | D }|r.dt	|dS ddiS dS )aB  Perform the actual transfer of data across the network in rebalance().
        Takes in input the output of _rebalance_find_msgs(), that is a list of tuples:

        - sender worker
        - recipient worker
        - task to be transferred

        FIXME this method is not robust when the cluster is not idle.
        c                   S  s   t tS r   r   r  r   r   r   r   r    r  z0Scheduler._rebalance_move_data.<locals>.<lambda>c                 3  s   | ]\}}  ||V  qd S r   r  r   r  r  r   r   r   rA    s   z1Scheduler._rebalance_move_data.<locals>.<genexpr>Nc                 3  s    | ]\}}  ||V  qd S r   )r   )r   r{  rK  r  r   r   rA    s     r  rY  r  r  r  c                 S  s   h | ]}|D ]}|qqS r   r   )r   r{  r   r   r   r   rk    s       z1Scheduler._rebalance_move_data.<locals>.<setcomp>r  r  r   r\  )r   r   r  r  r   r  r  r  r  r  r  r"   r
  rt  )r   r  r  Zto_recipientsr  r  r  Zfailed_keys_by_recipientZ
to_sendersr{  rK  r  r   r  r   r    sH    


zScheduler._rebalance_move_datac	                   s  pdt   |dkst|r& jnt4 I dH T |dk	rf fdd |D }dd |D }n j}|dkr~t|}nt|t|}|dkrtd fdd|D }	d	d
 |	D }
|
rd|
dW  5 Q I dH R  S |r\t	t
}|	D ]H}t|j|@ }t||krt|t|| D ]}|| | q qtj fdd
| D  I dH  |	rft	t}t|	D ]}|jdkr|	| qr|t|j|@  }|dkr|	| qrt||t|j }|dksttt||j |D ]"}dd
 |jD ||j |j< qqrtj fdd| D  I dH  | D ]\}} |d|d qDq\ ddt|t||d W 5 Q I dH R X dS )a  Replicate data throughout cluster

        This performs a tree copy of the data throughout the network
        individually on each piece of data.

        Parameters
        ----------
        keys: Iterable
            list of keys to replicate
        n: int
            Number of replications we expect to see within the cluster
        branching_factor: int, optional
            The number of workers that can copy data in each generation.
            The larger the branching factor, the more data we copy in
            a single step, but the more a given worker risks being
            swamped by data requests.

        See also
        --------
        Scheduler.rebalance
        z
replicate-r   Nc                   s   h | ]} j | qS r   rh  r  r   r   r   rk    s     z&Scheduler.replicate.<locals>.<setcomp>c                 S  s   h | ]}|j tjkr|qS r   r  r  r   r   r   rk    s      z$Can not use replicate to delete datac                   s   h | ]} j | qS r   r  r   r   r   r   rk    s     c                 S  s   g | ]}|j s|jqS r   )r  r  r  r   r   r   r^    s      z'Scheduler.replicate.<locals>.<listcomp>r  r  c                   s*   g | ]"\}}  |jd d |D qS )c                 S  s   g | ]
}|j qS r   r   )r   tr   r   r   r^    s     z2Scheduler.replicate.<locals>.<listcomp>.<listcomp>)r   r   )r   r  r  r  r   r   r^    s     rp   c                 S  s   g | ]
}|j qS r   r  r   wwsr   r   r   r^  '  s    c                 3  s   | ]\}}  ||V  qd S r   r  r  r   r   r   rA  ,  s   z&Scheduler.replicate.<locals>.<genexpr>zreplicate-addr  r  r  )rY  r  z	key-countzbranching-factor)rI   r  r  r[   workers_listr  r
  r   r  r   r   r   r  randomsampler  r  r  r  r   r  rr  r&  r   r  r  )r   rW  r   r  r  Zbranching_factordeletelockr  r  r  Zdel_worker_tasksr  Zdel_candidatesr  ZgathersZ	n_missingr*  r{  rK  r   r  r   r    s|     
 







zScheduler.replicater   zint | float | Noner   z0Callable[[WorkerState], Hashable] | bytes | Nonez	list[str])memory_ratior  r  minimumtarget	attributer   c              
     s2  |dk	r|dkrt | j| }|dk	r@|dk r2d}t | j| }|dkrT|dkrTd}t  |stdd | j D rg W  5 Q R  S |dkrtd}t|trt	j
drt|}t|| j dd	  D }d
d	  D t| }	t }
fdd}t|d}g }t | j}|r| }|dkrdtdd | D rdq|r|t |  |k rq|	|| 8 }	|dk	r|t |  |pdks|dk	r|	||
 kr|| |t | 8 }nqq0 fdd|D }|rtd| |W  5 Q R  S Q R X dS )a  
        Find workers that we can close with low cost

        This returns a list of workers that are good candidates to retire.
        These workers are not running anything and are storing
        relatively little data relative to their peers.  If all workers are
        idle then we still maintain enough workers to have enough RAM to store
        our data, with a comfortable buffer.

        This is for use with systems like ``distributed.deploy.adaptive``.

        Parameters
        ----------
        memory_ratio : Number
            Amount of extra space we want to have for our stored data.
            Defaults to 2, or that we want to have twice as much memory as we
            currently have data.
        n : int
            Number of workers to close
        minimum : int
            Minimum number of workers to keep around
        key : Callable(WorkerState)
            An optional callable mapping a WorkerState object to a group
            affiliation. Groups will be closed together. This is useful when
            closing workers must be done collectively, such as by hostname.
        target : int
            Target number of workers to have after we close
        attribute : str
            The attribute of the WorkerState object to return, like "address"
            or "name".  Defaults to "address".

        Examples
        --------
        >>> scheduler.workers_to_close()
        ['tcp://192.168.0.1:1234', 'tcp://192.168.0.2:1234']

        Group workers by hostname prior to closing

        >>> scheduler.workers_to_close(key=lambda ws: ws.host)
        ['tcp://192.168.0.1:1234', 'tcp://192.168.0.1:4567']

        Remove two workers

        >>> scheduler.workers_to_close(n=2)

        Keep enough workers to have twice as much memory as we we need.

        >>> scheduler.workers_to_close(memory_ratio=2)

        Returns
        -------
        to_close: list of worker addresses that are OK to close

        See Also
        --------
        Scheduler.retire_workers
        Nr   rV  c                 S  s   g | ]
}|j qS r   r  r  r   r   r   r^    s     z.Scheduler.workers_to_close.<locals>.<listcomp>r   r  c                 S  s$   i | ]\}}|t d d |D qS )c                 s  s   | ]}|j V  qd S r   r   r  r   r   r   rA    s     8Scheduler.workers_to_close.<locals>.<dictcomp>.<genexpr>r   rJ  r   r   r   r     s     z.Scheduler.workers_to_close.<locals>.<dictcomp>c                 S  s$   i | ]\}}|t d d |D qS )c                 s  s   | ]}|j V  qd S r   rb  r  r   r   r   rA    s     r#  r$  rJ  r   r   r   r     s      c                   s*   t dd |  D  } |   }||fS )Nc                 S  s   g | ]
}|j qS r   r  r  r   r   r   r^    s     z<Scheduler.workers_to_close.<locals>._key.<locals>.<listcomp>)r_  )rs  Zis_idlebytes)group_bytesr;  r   r   _key  s    
z(Scheduler.workers_to_close.<locals>._keyr   c                 S  s   g | ]
}|j qS r   r  r  r   r   r   r^    s     c                   s$   g | ]}| D ]}t | qqS r   )r   )r   gr  )r!  r;  r   r   r^    s     
  zSuggest closing workers: %s)r
  r  r^   r  rt  r  r  r   r%  r   r   r   r  rN   r   r  r   rM  r%  r_  r  r  r  )r   rW  r  r  r  r  r   r!  Zlimit_byteslimitr[  r'  r  to_closeZn_remainrs  r{  r   )r!  r&  r;  r   r  ?  sd    C



"

zScheduler.workers_to_close)namesrT  r&  r  zlist[str] | Nonezlist | Noner   )r  r+  rT  r&  r  r  r   c                  s  |pdt   }j4 I dH  |dk	rr|dk	r:td|rJtd| dd |D   fddj D }n6|dk	rfdd|D }nfd	djf |D }|si W  5 Q I dH R  S d
}jd }	|	j	st
t d
ddd}	d}zg }
|D ]v}td|j t|j}|	| |j}tj|| j|j d|jj|d |
j||||||d q|	  ttj|
 I dH }|dd W 5 |r|	  X W 5 Q I dH R X dd|d t|ddi |S )aK  Gracefully retire workers from cluster. Any key that is in memory exclusively
        on the retired workers is replicated somewhere else.

        Parameters
        ----------
        workers: list[str] (optional)
            List of worker addresses to retire.
        names: list (optional)
            List of worker names to retire.
            Mutually exclusive with ``workers``.
            If neither ``workers`` nor ``names`` are provided, we call
            ``workers_to_close`` which finds a good set.
        close_workers: bool (defaults to False)
            Whether or not to actually close the worker explicitly from here.
            Otherwise we expect some external job scheduler to finish off the
            worker.
        remove: bool (defaults to True)
            Whether or not to remove the worker metadata immediately or else
            wait for the worker to contact us.

            If close_workers=False and remove=False, this method just flushes the tasks
            in memory out of the workers and then returns.
            If close_workers=True and remove=False, this method will return while the
            workers are still in the cluster, although they won't accept new tasks.
            If close_workers=False or for whatever reason a worker doesn't accept the
            close command, it will be left permanently unable to accept new tasks and
            it is expected to be closed in some other way.

        **kwargs: dict
            Extra options to pass to workers_to_close to determine which
            workers we should drop

        Returns
        -------
        Dictionary mapping worker ID/address to dictionary of information about
        that worker for each retired worker.

        If there are keys that exist in memory only on the workers being retired and it
        was impossible to replicate them somewhere else (e.g. because there aren't
        any other running workers), the workers holding such keys won't be retired and
        won't appear in the returned dict.

        See Also
        --------
        Scheduler.workers_to_close
        zretire-workers-Nz(names and workers are mutually exclusivezRetire worker names %sc                 S  s   h | ]}t |qS r   )r~   r   r   r   r   r   rk    s     z+Scheduler.retire_workers.<locals>.<setcomp>c                   s   h | ]}t |j kr|qS r   )r~   r   r  )	names_setr   r   rk    s      c                   s    h | ]}| j kr j | qS r   rh  r  r   r   r   rk    s   
c                   s   h | ]} j | qS r   rh  r  r   r   r   rk    s    Frz   Tr  )Zpoliciesregisterr9  rk  zRetiring worker %sr  r.  r   r  )r  r  r&  r  r  zretire-workers)rY  r  rY  Zretired) rI   r  r  r  r  r  rt  r  r  r  r2   r   r>  r   r3   Z
add_policyr   r   r>   Zclosing_gracefullyr  rP  r   r  _track_retire_workerZrun_oncer   r  r  r%  r  r  )r   r  r+  rT  r&  r  r  r(  Zstop_ammrz   Zcorosr  policyr  Zworkers_infor   )r-  r   r   r    s    9



    

  zScheduler.retire_workersr   r3   r>   r   )r  r1  r  r  r&  r  r   c                   s   |  s4tdtdt|jd }t|I d H  q |jr^| j|j	 
d|j|d d i fS td|j	 |r| j|j	d||dI d H  n|r| |j	 td	|j	 |j	| fS )
Nr  r[  i  r  r/  z;All unique keys on worker %s have been replicated elsewhereT)r+  r  r  zRetired worker %s)r  r   r   r
  r   r  r  Zno_recipientsr  r   rP  r   r  r  r  r  r  r  )r   r  r1  r  r  r&  r  poll_intervalr   r   r   r0  U  s4    	    zScheduler._track_retire_workerc                 C  s   || j krdS | j | }g }|D ]D}| j|}|dk	rZ|jdkrZ||jkrd| || q || q |r|szdt  }| |d||d dS )z
        Learn that a worker has certain keys

        This should not be used in practice and is mostly here for legacy
        reasons.  However, it is sent by workers from time to time.
        z	not foundNrn   zredundant-replicas-remove-replicasr-  r\  )	r  r  r   rr  r  r/  r  rI   r  )r   r"  r   r  r  Zredundant_replicasr  r  r   r   r   r    s*    


	zScheduler.add_keysr  )r  r   c          
        s    fdd|  D }td| |  D ]\}} j|}|dkrT |dd}d|_||d}|dkrx|| |D ]$} j| }	|	|j	kr| 
||	 q| d|t|d	 q*|rЈ jt||d
 dS )z
        Learn that new data has entered the network from an external source

        See Also
        --------
        Scheduler.mark_key_in_memory
        c                   s$   i | ]\}}| fd d|D qS )c                   s   g | ]}  |qS r   r  )r   vvr   r   r   r^    s     z4Scheduler.update_data.<locals>.<dictcomp>.<listcomp>r   rJ  r   r   r   r     s      z)Scheduler.update_data.<locals>.<dictcomp>zUpdate data %sNrn   rB  r   r  )r.  r  r  r  )r  r  r  r  r   r  rr  r  r  r  r/  r  r  r  )
r   r  r   r   r  r  r  Z	ts_nbytesr  r  r   r   r   r    s"    


zScheduler.update_data)r  r   r   c                C  s   d S r   r   )r   r  r   r   r   r   r    s    zScheduler.report_on_key)r  r   r   c                C  s   d S r   r   )r   r  r   r   r   r   r    s    r  c                C  s   |d k|d kkr$t d|d||d krF|d k	s8t| j|}n|j}|d k	r^t|}n
d|d}|d k	r| j|||d d S )Nz0ts and key are mutually exclusive; received key=z, ts=r  r/  r  )r  r  r  r   r  _task_to_report_msgr  )r   r  r  r   r2  r   r   r   r    s    

1sc           	   	     s   t jdstd dS t|}|r0t|}|r>t|}|rLt|}|rX|| nd}t	|rp|I dH }zhzN| j
tjkr|dkr|| }n
|| |}||I dH  t|I dH  qtW n tk
r   Y nX W 5 |r|| | X dS )z
        Provides a data Comm to external requester

        Caution: this runs arbitrary Python code on the scheduler.  This should
        eventually be phased out.  It is mostly used by diagnostics.
        r  zTried to call 'feed' route with custom functions, but pickle is disallowed.  Set the 'distributed.scheduler.pickle'config value to True to use the 'feed' route (this is mostly commonly used with progress bars)N)r   r   r   r  r_  r)   r  rN   rq  rr  r   r>   r  ro  r  r  r  )	r   rW  r  setuprN  rk  r  rr  responser   r   r   r    s4    








zScheduler.feedzstr | Collection[str])r"  topicr  r   c                 C  s"   t |tr||d< | || d S )Nr"  )r   r   r  )r   r"  r9  r  r   r   r   r  	  s    
zScheduler.log_worker_eventc                 C  s4   t | | |  }|d  D ]}|d= |d= q|S )Nr  r   r   )WorkerStatusPluginr  rt  )r   rW  identrK  r   r   r   r    s    
z!Scheduler.subscribe_worker_statusc                   sB   |d k	r*t t j|} fdd|D S dd  j D S d S )Nc                   s$   i | ]}|d d  j | jD qS )c                 S  s   g | ]
}|j qS r   r   r  r   r   r   r^    s     7Scheduler.get_processing.<locals>.<dictcomp>.<listcomp>)r  rm   r  r   r   r   r     s      z,Scheduler.get_processing.<locals>.<dictcomp>c                 S  s"   i | ]\}}|d d |j D qS )c                 S  s   g | ]
}|j qS r   r   r  r   r   r   r^    s     r<  r  rn  r   r   r   r     s     )r   rg  r^  r  r  r   r  r   r   r   r    s    zScheduler.get_processingzdict[str, list[str]])r   r   c                   s2   |d k	r fdd|D S dd  j  D S d S )Nc                   s2   i | ]*}|| j kr*d d  j | jD ng qS )c                 S  s   g | ]
}|j qS r   r  r  r   r   r   r^  $  s     4Scheduler.get_who_has.<locals>.<dictcomp>.<listcomp>r  rT  r   r   r   r   #  s
   z)Scheduler.get_who_has.<locals>.<dictcomp>c                 S  s"   i | ]\}}|d d |j D qS )c                 S  s   g | ]
}|j qS r   r  r  r   r   r   r^  +  s     r>  r  )r   r  r  r   r   r   r   *  s     )r  r  r   r   r   r   r   r  !  s    
zScheduler.get_who_hasc                   s>   |d k	r&t  j|} fdd|D S dd  j D S d S )Nc                   s2   i | ]*}|| j kr*d d  j | jD ng qS )c                 S  s   g | ]
}|j qS r   r   r  r   r   r   r^  2  s     5Scheduler.get_has_what.<locals>.<dictcomp>.<listcomp>)r  r   r  r   r   r   r   1  s
   z*Scheduler.get_has_what.<locals>.<dictcomp>c                 S  s"   i | ]\}}|d d |j D qS )c                 S  s   g | ]
}|j qS r   r   r  r   r   r   r^  8  s     r@  r_  rn  r   r   r   r   8  s      rg  r^  r  r  r=  r   r   r   r  .  s    
zScheduler.get_has_whatc                   s>   |d k	r&t  j|} fdd|D S dd  j D S d S )Nc                   s$   i | ]}| j kr| j | jqS r   )r  r   r  r   r   r   r   =  s     
  z(Scheduler.get_ncores.<locals>.<dictcomp>c                 S  s   i | ]\}}||j qS r   r   rn  r   r   r   r   ?  s      rA  r=  r   r   r   r  :  s    zScheduler.get_ncoresc                   s"    j |d} fdd| D S )Nrh  c                   s(   i | ] \}} j | jtjkr||qS r   )r  r   r>   r  )r   r  r  r   r   r   r   C  s      z0Scheduler.get_ncores_running.<locals>.<dictcomp>)r  r  )r   r  r  r   r   r   r  A  s    
zScheduler.get_ncores_runningc           	        s   |d k	rt |}t }|rd| } j| }|jdkrN|dd |jD  q|jdkr|| qtt }|D ]}|j	rp||j	j
 |j qpndd  jD }|si S tj fdd| D  I d H }d	d t||D }|S )
Nrj   c                 S  s   g | ]
}|j qS r   r   r  r   r   r   r^  O  s     z,Scheduler.get_call_stack.<locals>.<listcomp>rm   c                 S  s   i | ]
}|d qS r   r   r  r   r   r   r   X  s      z,Scheduler.get_call_stack.<locals>.<dictcomp>c                 3  s$   | ]\}}  |j|d V  qdS )r   N)r@   r  )r   r  rK  r   r   r   rA  ^  s     z+Scheduler.get_call_stack.<locals>.<genexpr>c                 S  s   i | ]\}}|r||qS r   r   )r   r  r{  r   r   r   r   `  s       )r  r   r%  r  rr  r  r  r  r   r  r   r  r  r  r  r  r  r  )	r   r   r  rm   r  r  r  r  r8  r   r   r   r  G  s,    



zScheduler.get_call_stackz'dict[str, dict[str, float]]'c           
        s6  dd dD } j ddidI dH }| D ](}| D ]\}}|d | | q8q, j dd	idI dH }| D ](}| D ]\}}|d
 | | qqtt j}t|  fddtd|D }t	j
| I dH }|D ](}| D ]\}}|d | | qqi }|D ] }	dd ||	  D ||	< q|S )ay  
        Run a benchmark on the workers for memory, disk, and network bandwidths

        Returns
        -------
        result: dict
            A dictionary mapping the names "disk", "memory", and "network" to
            dictionaries mapping sizes to bandwidths.  These bandwidths are
            averaged over many workers running computations across the cluster.
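# --- Illustrative usage sketch (editor addition, not part of the original source) ---
# The benchmark is normally triggered from the client (Client.benchmark_hardware
# forwards to this scheduler handler, assuming a client version that exposes it).
from distributed import Client

client = Client("tcp://127.0.0.1:8786")  # hypothetical address
results = client.benchmark_hardware()
print(results["network"])  # mapping of transfer size -> measured bandwidth (bytes/s)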
        c                 S  s   i | ]}|t tqS r   r  r,  r   r   r   r   n  s     z0Scheduler.benchmark_hardware.<locals>.<dictcomp>)r   rn   networkr.  Zbenchmark_diskr  Nr   Zbenchmark_memoryrn   c                   s"   g | ]\}}  |j|d qS )r  )r@   Zbenchmark_network)r   r  r  r   r   r   r^    s    z0Scheduler.benchmark_hardware.<locals>.<listcomp>rV  rD  c                 S  s"   i | ]\}}|t |t| qS r   )r   r
  )r   rd  Z	durationsr   r   r   r     s    )r  rt  r  r  r  r  r  r{   r   r  r  )
r   outr{  r)  rd  r  r  rV  	responsesmoder   r   r   r  c  s4    



zScheduler.benchmark_hardwarec                   sr   |d k	r fdd|D }ndd  j  D }|rntdd }| D ]\}}|t|  |7  < qHt|}|S )Nc                   s   i | ]}| j | jqS r   )r  r   r   r   r   r   r     s      z(Scheduler.get_nbytes.<locals>.<dictcomp>c                 S  s"   i | ]\}}|j d kr||j qS )r   rb  )r   r   r  r   r   r   r     s     
  c                   S  s   dS r  r   r   r   r   r   r    r  z&Scheduler.get_nbytes.<locals>.<lambda>)r  r  r   r'   r   )r   r   summaryr{  rF  r   rK  r   r   r   r+    s    zScheduler.get_nbytesc                 C  sN   ddl m} tjds td|p&i }| dd|d || |||||dS )	znRun a function within this process

        See Also
        --------
        Client.run_on_scheduler
        r   )runr  zCannot run function as the scheduler has been explicitly disallowed from deserializing arbitrary bytestrings using pickle via the 'distributed.scheduler.pickle' configuration setting.r  zrun-function)rY  r  )r  r  r  wait)Zdistributed.workerrJ  r   r   r   r  r  )r   rW  r  r  r  rK  rJ  r   r   r   r    s    zScheduler.run_functionr   )r   r  r   c                 C  sR   | j }|d d D ].}||ks0t|| ttfs8i ||< || }q|||d < d S )NrB  )r  r   r   r  )r   r   r  r  r  r   r   r   r    s    
zScheduler.set_metadatarC  c                 C  sJ   | j }z|D ]}|| }q|W S  tk
rD   |tkr>| Y S  Y nX d S r   )r  r  r_   )r   r   r  r  r  r   r   r   r    s    
zScheduler.get_metadataz dict[str, Collection[str] | str])r"  c                 C  s:   |  D ],\}}| j| }t|tr*|h}t||_qd S r   )r  r  r   r~   r   r  )r   r"  r  r  r  r   r   r   r    s
    

zScheduler.set_restrictionsc                   s^   i }| j  D ]J}|j t fdddD r d  d  d  d  d d	||j< q|S )
Nc                 3  s   | ]}  |V  qd S r   )r   )r   r  rb  r   r   rA    s   z3Scheduler.get_task_prefix_states.<locals>.<genexpr>>   rm   rj   ri   rn   ro   rn   ro   ri   rm   rj   )rn   ro   ri   rm   rj   )r  rt  rb  r_  r   )r   rr  r  r   rL  r   r    s    z Scheduler.get_task_prefix_statesc                   s    fdd|D S )Nc                   s(   i | ] }|| j kr  j | jnd qS r   r  rT  r   r   r   r     s     z-Scheduler.get_task_status.<locals>.<dictcomp>r   r?  r   r   r   r    s    
zScheduler.get_task_statusc                 C  sB   ddl m} |j| jkr&| ||  | j|j }|j|||dS )Nr   )TaskStreamPlugin)r9  r>  r*  )#distributed.diagnostics.task_streamrM  r   r  r  Zcollect)r   r9  r>  r*  rM  r  r   r   r   r    s
    zScheduler.get_task_streamc                 C  s   t | |d}| | d S )Nr   r   )CollectTaskMetaDataPluginr  r  r   r   r   r    s    zScheduler.start_task_metadatac                   sf    fddt | j D }t|dkrBtd  dt| d|d }| j|jd |j|jd	S )
Nc                   s$   g | ]}t |tr|j kr|qS r   )r   rP  r   r  rU  r   r   r^    s   
 
z0Scheduler.stop_task_metadata.<locals>.<listcomp>r  zAExpected to find exactly one CollectTaskMetaDataPlugin with name z but found rY  r   rU  )r  rr  )	r  r  rt  r
  r  r  r   r  rr  )r   r   r  r  r   rU  r   r    s    
zScheduler.stop_task_metadatac                   s*   || j |< | jtd||ddI dH }|S )z;Registers a worker plugin on all running and future workersz
plugin-addr.  r  r   rE  N)r  r  r   r   rW  r  r   rG  r   r   r   r    s
    
z Scheduler.register_worker_pluginc                   sT   z| j | W n$ tk
r4   td| dY nX | jtd|ddI dH }|S )Unregisters a worker pluginzThe worker plugin  does not existzplugin-remover.  r   rE  N)r  r%  r  r  r  r   r   rW  r   rG  r   r   r   r    s    z"Scheduler.unregister_worker_pluginc                   s,   || j |< | jtd||dddI dH }|S )z7Registers a setup function, and call it on every workerZ
plugin_addrQ  Tr  r   N)r  r  r   rR  r   r   r   r  !  s    
zScheduler.register_nanny_pluginc                   sV   z| j | W n$ tk
r4   td| dY nX | jtd|dddI dH }|S )rS  zThe nanny plugin rT  Zplugin_removerU  TrW  N)r  r%  r  r  r  r   rV  r   r   r   r  +  s    
 z!Scheduler.unregister_nanny_pluginrq   rr   r  c                 K  s(   | j |||f|\}}}| || |S )as  Transition a key from its current state to the finish state

        Examples
        --------
        >>> self.transition('x', 'waiting')
        {'x': 'processing'}

        Returns
        -------
        Dictionary of recommendations for future transitions

        See Also
        --------
        Scheduler.transitions: transitive version of this function
        )r  r  )r   r  r  r  r  r  r  r  r   r   r   r  7  s      
zScheduler.transition)r  r  r   c                 C  s(   i }i }|  |||| | || dS r  )r  r  )r   r  r  r  r  r   r   r   rs  S  s    zScheduler.transitionsrR  )rU  r   c                   s
   | j | S )zRPC hook for :meth:`SchedulerState.story`.

        Note that the msgpack serialization/deserialization round-trip will transform
        the :class:`Transition` namedtuples into regular tuples.
        )r  )r   rU  r   r   r   r  ^  s    zScheduler.get_storyc                C  sv   z| j | }W n( tk
r6   td| d Y dS X |jdkrFdS |r`|jr`|jj|kr`dS | j|di|d dS )uM  Reschedule a task.

        This function should only be used when the task has already been released in
        some way on the worker it's assigned to — either via cancellation or a
        Reschedule exception — and you are certain the worker will not send any further
        updates about the task to the scheduler.
        zAttempting to reschedule task z<, which was not found on the scheduler. Aborting reschedule.Nrm   ri   rm  )r  r  r  r_  rr  r  r   rs  )r   r  r"  r  r  r   r   r   r  f  s    


zScheduler._reschedulec                 C  sn   | j | }|r|j| i |_|j D ]>\}}d|j|< | j|d }|d kr`i  | j|< }|||< q*dS )Nr   r\  )r  r   r  r   r  r   )r   r"  r   r  rt  Zquantityrv  r   r   r   r    s    


zScheduler.add_resourcesc                 C  sD   | j | }|jD ].}| j|d }|d kr8i  | j|< }||= qd S r   )r  r   r   )r   r"  r  rt  rv  r   r   r   r    s    

zScheduler.remove_resourcesc                 C  sX   || j kr| j | }t|tr&t| }t|ts>td||rLt|}nt|}|S )z
        Coerce possible input addresses to canonical form.
        *resolve* can be disabled for testing with fake hostnames.

        Handles strings, tuples, or aliases.
        z+addresses should be strings or tuples, got )r  r   r   r;   r~   r  r:   r9   )r   rj  resolver   r   r   r^    s    




zScheduler.coerce_addressc                   sX   |dkrt | jS t }|D ]2 d kr4|  q| fdd| jD  qt |S )z
        List of qualifying workers

        Takes a list of worker addresses or hostnames.
        Returns a list of all worker addresses that match
        N:c                   s   h | ]} |kr|qS r   r   )r   rx  r  r   r   rk    s      z)Scheduler.workers_list.<locals>.<setcomp>)r  r  r   r  r  )r   r  rF  r   rZ  r   r    s    
zScheduler.workers_listc	                   s   |d krj }ntj t|@ }|r:tjjjdS tj fdd|D ddiI d H }	dd |	D }	|rtj|	 }
ntt	||	}
|
S )Nr9  r>  c                 3  s&   | ]} |j d V  qdS ))r9  r>  r  r  N)r@   r.   r  r  r   r  r9  r>  r   r   rA    s   z(Scheduler.get_profile.<locals>.<genexpr>r  Tc                 S  s   g | ]}t |ts|qS r   r   r  r   r{  r   r   r   r^    s     
 z)Scheduler.get_profile.<locals>.<listcomp>)
r  r   r.   r  r  r  r  r   r   r  )r   rW  r  r   r  Zmerge_workersr9  r>  r  r  r8  r   r\  r   r    s"    zScheduler.get_profilez'Iterable[str] | None'z'float | None'z'str | float | None')r  r9  r>  profile_cycle_intervalc                   sN  |pt jd t dd |d kr,j}ntjt|@ }tjfdd|D ddiI d H }dd	 |D }d
d	 t	t
dd |D   fddD }dd |D }dd	 |D }tt
|dti}	d}
|	D ]l\}}|    }||
kr|}
| D ]}||dg q | D ]"\}}|| d d  |7  < qq||dS )Nz distributed.worker.profile.cycler   r  c                 3  s"   | ]}  |jd V  qdS )r[  N)r@   Zprofile_metadatar  )r   r9  r>  r   r   rA    s     z1Scheduler.get_profile_metadata.<locals>.<genexpr>r  Tc                 S  s   g | ]}t |ts|qS r   r]  r^  r   r   r   r^    s     
 z2Scheduler.get_profile_metadata.<locals>.<listcomp>c                 S  s"   g | ]\}}|t td |fqS r  )r   r    )r   rI   rs  r   r   r   r^    s   c                 s  s   | ]}|d  V  qdS )countsNr   r   rK  r   r   r   rA    s     c                   s   | d     S r  r   )r  )dtr   r   r     r  z0Scheduler.get_profile_metadata.<locals>.<lambda>c                 S  s.   i | ]&}|d  D ]\}}|D ]
}|g qqqS rC  r   )r   rK  r  r)  r   r   r   r   r     s    
     z2Scheduler.get_profile_metadata.<locals>.<dictcomp>c                 S  s   g | ]}|d  qS rC  r   ra  r   r   r   r^    s     r  r   rB  r  )r`  r   )r   r   r   r)   r  r   r  r  	itertoolsr   r   r  r   rt  r  r  )r   r  r9  r>  r_  r  r`  r   Zgroups1Zgroups2lastr  r)  ttrK  r   r   )rb  r   r9  r>  r   get_profile_metadata  sH    



zScheduler.get_profile_metadatar	  r   )r9  
last_countc           .        sv  t  }tj| j|d| jd|d| jd|dg I d H \}}}ddlm   fdd}	t|	|||f\}}}| j|d}
t|
}t	t
}|
D ]2}|d	 D ]$}||d
   |d |d  7  < qqd}t| D ]"}|d| dt||  d7 }qddlm} ddlm} ||
}|dd\}}
|j| ddlm}m} || dd}|  || dd}|  ddlm} || |dd}|  ddlm}m} || |d}ddlm}m} dd l} ddlm}! djt|| ||| j t| j!t"dd | j!# D t$t"dd | j!# D |t%j&| j&d
}"|f d |"i|}"|!|"d!d"}"|!|d#d"}|!|d$d"}|!|d%d"}|!|
d&d"}
|!|j'd'd"}|!|j'd(d"}|!|j'd)d"}#|!|j'd*d"}||"|
|#||||||g	dd+}$dd,l(m)}% dd-l*m+}&m,}' t-d.d/|}(|&|(d0|d1 t.j/0t.j/1t.j/2t3d2d3})|% }*|*j4j56|) |*7d4}+|'|$|(|+d5 t8|(},|,9 }-W 5 Q R X W 5 Q R X |-S )6Nr=  T)r   r9  )r  r9  r   r.   c                   s      | } j|dd\}}|S )Nstretch_bothsizing_mode)Z	plot_dataZplot_figure)rr  re  figuresourcerh  r   r   profile_to_figure%  s    
z7Scheduler.performance_report.<locals>.profile_to_figurer$  rY  r>  r9  r	  z
<li> z time: z </li>)task_stream_figure)
rectanglesri  rj  )BandwidthTypesBandwidthWorkers)SystemMonitor)rg  rk  )_BOKEH_STYLES_KWARGSSchedulerLogs)DivTabs)TabPanela  
        <h1> Dask Performance Report </h1>

        <i> Select different tabs on the top for additional information </i>

        <h2> Duration: {time} </h2>
        <h2> Tasks Information </h2>
        <ul>
         <li> number of tasks: {ntasks} </li>
         {tasks_timings}
        </ul>

        <h2> Scheduler Information </h2>
        <ul>
          <li> Address: {address} </li>
          <li> Workers: {nworkers} </li>
          <li> Threads: {threads} </li>
          <li> Memory: {memory} </li>
          <li> Dask Version: {dask_version} </li>
          <li> Dask.Distributed Version: {distributed_version} </li>
        </ul>

        <h2> Calling Code </h2>
        <pre>
{code}
        </pre>
        c                 s  s   | ]}|j V  qd S r   rB  r  r   r   r   rA    s     z/Scheduler.performance_report.<locals>.<genexpr>c                 s  s   | ]}|j V  qd S r   r"  r  r   r   r   rA    s     )
rI   Zntaskstasks_timingsr   Znworkersr&  rn   r<  Zdask_versionZdistributed_versiontextZSummary)childtitlezWorker Profile (compute)zWorker Profile (administrative)z"Scheduler Profile (administrative)zTask StreamzBandwidth (Workers)zBandwidth (Types)SystemzScheduler Logs)tabsrk  )get_env)output_filesavez.html)r%  zDask Performance Report)filenamer|  rH  r  Z	templateszperformance_report.html)r  template):rI   r  r  r  r"  r.   rg  r  r
  r   r   rM  r   r&   Z*distributed.dashboard.components.schedulerro  rN  rp  re  r  rq  rr  Z'distributed.dashboard.components.sharedrs  rt  ru  Zbokeh.modelsrv  rw  Zdistributed.dashboard.corerx  r0  r   r  r   rt  r%   r   __version__rootZbokeh.core.templatesr  Zbokeh.plottingr  r  r*   r8  r9  rL  dirnameabspath__file__loaderZ
searchpathr  r+   rC  read).r   r9  rg  r<  rH  r>  rZ  r   r  rn  Ztask_streamZtotal_tasksZ	timespentr)  xry  r   ro  rp  Zrectsrm  rq  rr  r  r  rs  Zsysmonrt  ru  r  rv  rw  r"  rx  htmlsystemr~  r  r  r  r;  Ztemplate_directoryZtemplate_environmentr  rH  re  r   rh  r   r    s    
 
$ 
&     

zScheduler.performance_reportc                   s    | j d|d||dI d H }|S )Nr  )r.  r  )r  r  r   r  )r   r  r  r   r  r   r   r   r    s      zScheduler.get_worker_logs)r9  r  r   c              	   C  s   t  |f}t|tsN|D ]2}| j| | | j|  d7  < | || qnv| j| | | j|  d7  < | || t| j	 D ]8}z|
|| W q tk
r   tjddd Y qX qd S )Nr  r  Tr  )rI   r   r~   ry   r  r  _report_eventr  r  rt  r  r  r  r  )r   r9  r  eventr  r  r   r   r   r    s    

zScheduler.log_eventc                   s6   d||d  fdd| j | D }| j|i d d S )Nr  )r.  r9  r  c                   s   i | ]}| gqS r   r   )r   r   rE  r   r   r     s      z+Scheduler._report_event.<locals>.<dictcomp>)r  )r  r  )r   r   r  r  r   rE  r   r    s    zScheduler._report_eventc                 C  s   | j | | d S r   )r  r  r   r9  r   r   r   r   r	    s    zScheduler.subscribe_topicc                 C  s   | j | | d S r   )r  r%  r  r   r   r   r
    s    zScheduler.unsubscribe_topicc                 C  s&   |d k	rt | j| S tt | jS d S r   )r   ry   r"   )r   r9  r   r   r   r    s    zScheduler.get_eventsc                   s@   d kri t j fddjD  I d H }ttj|S )Nc                 3  s*   | ]"} |j |d dV  qdS )r   )recentr9  N)r@   Zget_monitor_infor   r  r  r   startsr   r   rA    s   z4Scheduler.get_worker_monitor_info.<locals>.<genexpr>)r  r  r  r   r  )r   r  r  r  r   r  r   get_worker_monitor_info  s    
z!Scheduler.get_worker_monitor_infoc                   sv   t  }d| }| j D ]V}|j|| j k r|j|dtt| j  k rtd| j| | j	|j
|dI d H  qd S )Nzcheck-worker-ttl-r^  z9Worker failed to heartbeat within %s seconds. Closing: %sr  )rI   r  rt  r   r  ra  r
  r  r_  r  r   )r   rX  r  r  r   r   r   r    s    
zScheduler.check_worker_ttlc                 C  s   | j s
t| jtjtjfkr d S | j| jkr>| j| _d | _d S | j	sb| j
sbtdd | j D rld | _d S | jszt | _t | j| j  kr| jsttdt| j  | j| j d S )Nc                 S  s   g | ]
}|j qS r   r  r  r   r   r   r^    s     z(Scheduler.check_idle.<locals>.<listcomp>z)Scheduler closing after being idle for %s)r  r  r   r>   rL  rX  r  r  r  rl   r  r_  r  rt  rI   r  r  r&   r  r  r  r   r   r   r   r    s0    

zScheduler.check_idlec                 C  s,  |dkrt jd}t|}t| j| j }t| j	| | }t| j}| j
 D ]}|t|j7 }||krT q~qTt||}| js| jr| j
std|}tdd | j
 D }tdd | j
 D }d}|d| kr|dkrd	t| j
 }t||}	|	t| j
kr|	S |  }
t| j
t|
 S dS )
a  Desired number of workers based on the current workload

        This looks at the currently running tasks and memory use, and returns a
        number of desired workers.  This is often used by adaptive scheduling.

        Parameters
        ----------
        target_duration : str
            A desired duration of time for computations to take.  This affects
            how rapidly the scheduler will ask to scale.

        See Also
        --------
        distributed.deploy.Adaptive
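# Rough standalone sketch of the kind of calculation described above.  The
# numbers and names are illustrative, not the scheduler's exact formula: take
# the larger of a CPU-based target and a memory-based target.
import math

def adaptive_target_sketch(total_occupancy, target_duration,
                           bytes_stored, worker_memory_limit, n_workers):
    # CPU: enough workers to finish the outstanding work within target_duration
    cpu_target = math.ceil(total_occupancy / target_duration) if target_duration else 0
    # Memory: if stored data approaches the fleet's capacity, ask to scale out
    if worker_memory_limit and bytes_stored > 0.6 * worker_memory_limit * n_workers:
        memory_target = 2 * n_workers
    else:
        memory_target = 0
    return max(cpu_target, memory_target)

print(adaptive_target_sketch(total_occupancy=120.0, target_duration=5.0,
                             bytes_stored=0, worker_memory_limit=4e9,
                             n_workers=4))  # 24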
        Nz$distributed.adaptive.target-durationr  c                 s  s   | ]}|j V  qd S r   r"  r  r   r   r   rA  J  s     z,Scheduler.adaptive_target.<locals>.<genexpr>c                 s  s   | ]}|j V  qd S r   rb  r  r   r   r   rA  K  s     r   g333333?rV  )r   r   r   r)   r
  rl   r  r  ceilr  r  rt  rm   r   r  r   r   r  )r   Ztarget_durationZqueued_occupancycpuZtasks_readyr  r)  usedrn   r   r*  r   r   r   r     s2    



zScheduler.adaptive_target)rj  r   r  r   c                C  sb   i }i }|D ]6}| j | }|js$tdd |jD ||< |j||< q| j| d|||d dS )zAsynchronously ask a worker to acquire a replica of the listed keys from
        other workers. This is a fire-and-forget operation which offers no feedback for
        success or failure, and is intended for housekeeping and not for computation.
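# Sketch of the message this helper sends.  Field names follow the string
# constants visible in the compiled body above ("acquire-replicas", "who_has",
# "stimulus_id"); treat the exact layout as approximate.  The send itself is
# fire-and-forget: the scheduler does not await a reply.
def build_acquire_replicas_msg(keys, who_has, stimulus_id):
    return {
        "op": "acquire-replicas",
        "who_has": {key: list(who_has[key]) for key in keys},
        "stimulus_id": stimulus_id,
    }

print(build_acquire_replicas_msg(
    ["x"], {"x": {"tcp://other-worker:40001"}}, "rebalance-123"))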
        c                 S  s   g | ]
}|j qS r   r  r  r   r   r   r^  c  s     z6Scheduler.request_acquire_replicas.<locals>.<listcomp>zacquire-replicas)r.  r  r   r  N)r  r  r  r   r  rP  )r   rj  r   r  r  r   r  r  r   r   r   request_acquire_replicasW  s    


z"Scheduler.request_acquire_replicasc                C  s^   | j | }|D ]2}| j| }| jr4t|jdks4t| || q| j| d||d dS )ae  Asynchronously ask a worker to discard its replica of the listed keys.
        This must never be used to destroy the last replica of a key. This is a
        fire-and-forget operation, intended for housekeeping and not for computation.

        The replica disappears immediately from TaskState.who_has on the Scheduler side;
        if the worker refuses to delete, e.g. because the task is a dependency of
        another task running on it, it will (also asynchronously) ask the scheduler to
        add it back to who_has. If the worker agrees to discard the task, there is
        no feedback.
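# Illustrative guard for the invariant stated above: never ask a worker to
# drop the only replica of a key.  Here `who_has` maps key -> set of worker
# addresses; this is a standalone sketch, not the scheduler's own check.
def can_remove_replica(who_has, key, worker):
    holders = who_has.get(key, set())
    return worker in holders and len(holders) > 1

who_has = {"x": {"tcp://a:1", "tcp://b:2"}, "y": {"tcp://a:1"}}
print(can_remove_replica(who_has, "x", "tcp://a:1"))  # True
print(can_remove_replica(who_has, "y", "tcp://a:1"))  # False: last replica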
        r  r3  r-  N)	r  r  r  r
  r  r  r,  r  rP  )r   rj  r   r  r  r  r  r   r   r   request_remove_replicaso  s    


z!Scheduler.request_remove_replicas)Nr  r  NNNNNNNNNNNr   NNNr  Nr   r   NFF)F)NN)N)NNNNNNNNNNr   Nr   N)NNNNNNNNNNr   Nr   NNN)NNN)NNNNN)N)F)F)NN)NNN)N)N)F)NN)N)rB  )NN)NN)NNNNFrV  )N)Nr  T)N)NNNN)NNNN)NNNNrV  TTN)NNNNNNr   )N)Nr   N)N)NNNr6  )N)N)N)N)N)N)N)NT)r   NT)N)N)NNN)N)N)N)N)N)N)T)NNFFTNNN)Nr   NN)r	  N)NNF)N)FN)N)r   r   r   r   r  r   r  r  r   r   r   r  r  r   r  r  r4  r@  r  r  r^   r  r  r  r  rt  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  ra   r  r  r  r  r  r  r  r  r  r  r   r  rx  r  r  r  r  r  r  r  r  r  r  r  r  r   r  r  r  r  r  r  r0  r  r  r   r  r  r  r  r  r  r  r  r  r  r  r+  r  r  r_   r  r  r  r  r  r  r  r  r  r  r  r  rs  r  r  r  r  r^  r  r  rf  r  r  r  r  r	  r
  r  r  r  r  r  r  r  __classcell__r   r   r+  r   r    sL  
K                        
  H%AW l  	             
<               
  %$)    
  !	I)1&
!! ' &     
11 @9*   h ?<       
v         *!#      *	7
	

 	       %   "4   #	7r  r   r  c                 C  s`   | j dkrd| jdS | j dkr,d| jdS | j dkrX| j}|sDtd| j|j|jdS d S d S )	Nrp   r  r/  rn   r  ro   r5  r6  )rr  r  r  r  r  r  )r  r7  r   r   r   r5    s    


r5  c                   s.   | j r*t|   d k	r* fdd| j D S i S )Nc                   s   i | ]}|j  gqS r   r   r  r2  r   r   r     s      z(_task_to_client_msgs.<locals>.<dictcomp>)r}  r5  r  r   r  r   rN    s
    rN  r  rg  zCallable[[WorkerState], Any]rm  )r  all_workersr  	objectiver   c                 C  s   t dd | jD st| jr(t|}ndd | jD }|dkrN|svt|}n(||M }|sv|}|sv| jrvt| |d|S |s~dS t|dkrtt	|S t
||dS dS )aK  
    Decide which worker should take task *ts*.

    We choose the worker that has the data on which *ts* depends.

    If several workers have dependencies, then we choose the less-busy worker.

    Optionally provide *valid_workers* of where jobs are allowed to occur
    (if all workers are allowed to take the task, pass None instead).

    If the task requires data communication because no eligible worker has
    all the dependencies already, then we choose to minimize the number
    of bytes sent between workers.  This is determined by calling the
    *objective* function.
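# Standalone sketch of the selection order described above (an assumed
# simplification of the real routine): prefer workers that already hold a
# dependency, restrict to *valid_workers* when given, then break ties with the
# *objective* function (for example, estimated bytes to transfer).
def decide_worker_sketch(dep_locations, valid_workers, all_workers, objective):
    candidates = {w for deps in dep_locations for w in deps}
    if valid_workers is not None:
        candidates &= valid_workers
    if not candidates:
        candidates = set(valid_workers) if valid_workers is not None else set(all_workers)
    if not candidates:
        return None
    return min(candidates, key=objective)

transfer_cost = {"tcp://a:1": 0, "tcp://b:2": 5_000_000}
print(decide_worker_sketch(
    dep_locations=[{"tcp://a:1"}],      # the dependency already lives on worker a
    valid_workers=None,
    all_workers=["tcp://a:1", "tcp://b:2"],
    objective=lambda w: transfer_cost[w],
))  # tcp://a:1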
    c                 s  s   | ]}|j V  qd S r   r  r  r   r   r   rA    s     z decide_worker.<locals>.<genexpr>c                 S  s   h | ]}|j D ]}|qqS r   r  )r   r   r  r   r   r   rk    s       z decide_worker.<locals>.<setcomp>Nr  r   )r  r  r  r  r   r  r  r
  nextr
  r   )r  r  r  r  
candidatesr   r   r   r    s$    

r  r  c                   s   j tkst  jr@ j js@tdt jt jf jrn j jsntdt jt jf jD ]@}|j	rtdt t|f|j dksttdt t|fqt jD ]p} |jkstdt t|t|jf j dkr| jks|j	stdt t|f|j d	kstq jD ](}|j dks4td
t t|fq4 jD ]@} |jkstdt t|t|jf|j d	ksdtqd j
dk	 j dkkstt j	 j dkkst  j	 j f j dkr: j
r t j	rttdd  jD s:tdt t jf j dkrtdd  jD sttdt t jf jrt j	r, js jstdt t j	f jrֈ jstt jtstt fdd jD rt j	D ]0} |jkstdt t|t|jfq jD ]0} |jks2tdt t|t|jfq2 jrԈ j dkrt fdd j	D dkst j dkrĈ j
st  j
jkst j dkstdS )zValidate the given TaskStatez"waiting not subset of dependenciesz waiters not subset of dependentszwaiting on in-memory depri   zwaiting on released depznot in dependency's dependentsr  zdep missingrp   zwaiter not in playznot in dependent's dependenciesNrm   rn   rl   c                 s  s   | ]}|j V  qd S r   r  r  r   r   r   rA    s     z&validate_task_state.<locals>.<genexpr>ztask queued without all depsc                 s  s   | ]}|j V  qd S r   r  r  r   r   r   rA    s     z task processing without all depszunneeded task in memoryc                   s   g | ]} |j kqS r   )r{  r  r  r   r   r^  '  s     z'validate_task_state.<locals>.<listcomp>znot in who_has' has_whatznot in who_wants' wants_whatc                 3  s   | ]} |j kV  qd S r   )r   r  r  r   r   rA  :  s     r  )rr  ru   r  r{  r  r  r~   r|  ry  r  r  r   r  r}  rv  r  r   r_  r   r   r  r   r   )r  r   r  r  r   r  r   r    s    
"



(



"r  r  c                 C  sR   | j D ],}| |jkstdt| t|t|jfq| jD ]}|jdks:tq:d S )Nznot in has_what' who_has)rn   rm   )r   r  r  r~   r   rr  )r  r  r   r   r   validate_worker_stateA  s    

r  r  r  r  )r  r  r  r   c                 C  sr   |   D ]}t| q|  D ]}t| q|  D ]8}|jD ],}||jks>tdt|t|t|jfq>q4dS )zValidate a current runtime state.

    This performs a sequence of checks on the entire graph, running in about linear
    time. This raises assert errors if anything doesn't check out.
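# Small sketch of one of the cross-checks described above, using a plain data
# model rather than the scheduler's classes: who_has and has_what must mirror
# each other exactly.
def validate_replicas_sketch(who_has, has_what):
    for key, workers in who_has.items():
        for w in workers:
            assert key in has_what.get(w, set()), (key, w, "not in has_what")
    for w, keys in has_what.items():
        for key in keys:
            assert w in who_has.get(key, set()), (key, w, "not in who_has")

validate_replicas_sketch({"x": {"tcp://a:1"}}, {"tcp://a:1": {"x"}})  # passes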
    znot in wants_what' who_wantsN)rt  r  r  r   r}  r  r~   )r  r  r  r  r  r  r   r   r   r  N  s    



r  r   r   )r  r   c                 C  s4   | dkrdS | dk rdS | dk r$dS | d d S dS )zHInterval in seconds that we desire heartbeats based on number of workersr^  r[  2   r     rV  Nr   )r  r   r   r   ra  h  s    ra  )r  saturation_factorr   c                 C  s8   t |rttt || j dt| jt| j  S )zINumber of tasks that can be sent to this worker without oversaturating itr  )	r  r
  r  r   r  r   r
  rm   r   r  r  r   r   r   r  u  s    r  r   c                 C  s   t |rdS t| |dkS )NFr   )r  r
  r  r  r   r   r   r  }  s    
r  c                      sj   e Zd Zdddd fddZedddd	Zeddd
dZeddddZddddZ  Z	S )r  r~   r   r   r  c                   s   t  ||| d S r   )r*  r   )r   r  rn  r  r+  r   r   r     s    zKilledWorker.__init__r   c                 C  s
   | j d S r  r  r   r   r   r   r    s    zKilledWorker.taskc                 C  s
   | j d S r)  r  r   r   r   r   rn    s    zKilledWorker.last_workerc                 C  s
   | j d S r]  r  r   r   r   r   r    s    zKilledWorker.allowed_failuresc                 C  s    d| j  d| j d| jj dS )NzAttempted to run task z on zr different workers, but all those workers died while running it. The last worker that attempt to run the task was z. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html.)r  r  rn  r   r   r   r   r   r     s    zKilledWorker.__str__)
r   r   r   r   r   r  rn  r  r   r  r   r   r+  r   r    s   r  c                   @  sh   e Zd ZU dZdZded< ded< ddd	d
dZddddddZddddddZddddZ	dS )r:  zA plugin to share worker status with a remote observer

    This is used in cluster managers to keep updated about the status of the scheduler.
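# Hedged example of the same plugin pattern: a minimal SchedulerPlugin that
# logs worker arrivals and departures.  The hook names follow the classes
# above; how the plugin is registered (for example via Scheduler.add_plugin)
# may differ between versions, so treat this as a sketch.
from distributed.diagnostics.plugin import SchedulerPlugin

class LogWorkerChanges(SchedulerPlugin):
    name = "log-worker-changes"

    def add_worker(self, scheduler=None, worker=None, **kwargs):
        print(f"worker joined: {worker}")

    def remove_worker(self, scheduler=None, worker=None, **kwargs):
        print(f"worker left: {worker}")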
    zworker-statuszClassVar[str]r   r4   r  r  r6   )r   rW  c                 C  s&   t dd| _| j| ||  d S )Nri  )rk  )r4   r  r9  r  )r   r   rW  r   r   r   r     s    zWorkerStatusPlugin.__init__r~   r  )r   r"  r   c                 C  s^   |j |  }|d= |d= z| jdd||iig W n" tk
rX   |j| jd Y nX d S )Nr   r   r  r  rU  )r  r  r  rP  r7   r  r   )r   r   r"  r;  r   r   r   r    s    zWorkerStatusPlugin.add_workerc                 C  s<   z| j d|g W n" tk
r6   |j| jd Y nX d S )Nr&  rU  )r  rP  r7   r  r   )r   r   r"  r   r   r   r    s    z WorkerStatusPlugin.remove_workerr   c                 C  s   | j   d S r   )r  r  r   r   r   r   rN    s    zWorkerStatusPlugin.teardownN)
r   r   r   r   r   r   r   r  r  rN  r   r   r   r   r:    s   
	r:  c                   @  st   e Zd ZU ded< ded< ded< ded< d	ed
< dddddZddddddddZdddddddddZdS )rP  r  r   r~   r   r3  r   r   r  zdict[str, str]rr  rO  c                 C  s$   || _ || _t | _i | _i | _d S r   )r   r   r   r   r  rr  )r   r   r   r   r   r   r     s
    z"CollectTaskMetaDataPlugin.__init__r   r   r  )r   r   r  r  r   c                 K  s   | j | d S r   )r   r  )r   r   r   r  r  r   r   r   r    s    z&CollectTaskMetaDataPlugin.update_graphrq   )r  r9  r  r  r  r   c                 O  sP   |dkrL| j j|}|d k	rL|j| jkrL|j| j|< || j|< | j| d S )Nr  )r   r  r   r  r   r  rr  r%  )r   r  r9  r  r  r  r  r   r   r   r    s    
z$CollectTaskMetaDataPlugin.transitionN)r   r   r   r   r   r  r  r   r   r   r   rP    s   
	rP  )
__future__r   r  r  Zdataclassesr  rq  rc  rD  loggingr  r  r8  r  r  r  r=  r   r   collectionsr   r   collections.abcr   r   r   r   r	   r
   r   r   r   r   	functoolsr   Znumbersr   typingr   r   r   r   r   r   r   r  Zsortedcontainersr   r   Ztlzr   r   r   r   r   r   r    r!   r"   Ztornado.ioloopr#   r   Zdask.highlevelgraphr$   Z
dask.utilsr%   r&   r'   r(   r)   r*   Zdask.widgetsr+   r"  r,   r-   r.   r0   ru  Zdistributed._storiesr1   Z!distributed.active_memory_managerr2   r3   Zdistributed.batchedr4   Zdistributed.collectionsr5   Zdistributed.commr6   r7   r8   r9   r:   r;   Zdistributed.comm.addressingr<   Zdistributed.compatibilityr=   Zdistributed.corer>   r?   r@   rA   Z&distributed.diagnostics.memory_samplerrB   Zdistributed.diagnostics.pluginrC   rD   Zdistributed.eventrE   Zdistributed.httprF   Zdistributed.lockrG   Zdistributed.metricsrH   rI   Zdistributed.multi_lockrJ   Zdistributed.noderK   Zdistributed.proctitlerL   Zdistributed.protocol.picklerM   rN   Zdistributed.protocol.serializerO   rP   Zdistributed.publishrQ   Zdistributed.pubsubrR   Zdistributed.queuesrS   Zdistributed.recreate_tasksrT   Zdistributed.securityrU   Zdistributed.semaphorerV   Zdistributed.shufflerW   Zdistributed.stealingrX   Zdistributed.utilsrY   rZ   r[   r\   r]   r^   r_   r`   ra   Zdistributed.utils_commrb   rc   rd   Zdistributed.utils_perfre   rf   Zdistributed.variablerg   Ztyping_extensionsrh   rq   r   rr   rs   rt   r~   r   r   ru   	getLoggerr   r  r   r   r  r  r  r  r}   r   r   Z	dataclassr1  r8  rN  rl  r  r  r  r  r5  rN  r  r  r  r  ra  r  r  r  r  r:  rP  r   r   r   r   <module>   sr   ,$,  ,

?    77nf  T              0                                   Z-i!