U
    /e                 	   @  s,  U d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
Z
d dlZd dlmZmZ d dlmZmZmZmZmZmZ d dlmZ d dlmZmZ d dlmZmZmZ d dlmZ d d	l m!Z!m"Z"m#Z#m$Z$m%Z%m&Z&m'Z' d d
l(m)Z) d dl*Z*d dl+m,Z,m-Z- d dl.m/Z/ d dl0m1Z1 d dl2m3Z3 d dl4m5Z5m6Z6 d dl7m8Z8 d dl9m:Z: d dl;m<Z< d dl=m>Z? d dl@mAZA eBdZCe!rd dlDmEZE d dlFmGZG d dlHmIZI e$d ZJdeKd< neLZJddddd d!d"hZMd#eKd$< ddhZNd#eKd%< ddd&d'd d(dd"dh	ZOd#eKd)< d*ZPG d+d, d,e%ZQG d-d. d.e&d/d0ZRG d1d2 d2eSZTG d3d4 d4eTZUG d5d6 d6eSZVG d7d8 d8eSZWed9d:d;d<ZXe	jYd=krd>d?ini ZZef d/d/d@eZG dAdB dBZ[eG dCdD dDZ\G dEdF dFZ]eG dGdH dHe\Z^eG dIdJ dJe\Z_eG dKdL dLe\Z`eG dMdN dNe\ZaG dOdP dPe\ZbeG dQdR dRebZceG dSdT dTebZdeG dUdV dVebZeeG dWdX dXebZfeG dYdZ dZebZgeG d[d\ d\ebZheG d]d^ d^ebZieG d_d` d`ebZjeG dadb dbZkeG dcdd ddekZleG dedf dfekZmeG dgdh dhekZneG didj djekZoeG dkdl dleoZpeG dmdn dneoZqeG dodp dpeoZreG dqdr dreoZseG dsdt dtekZteG dudv dvekZueG dwdx dxeuZveG dydz dzeuZweG d{d| d|euZxeG d}d~ d~ekZyeG dd dekZzeG dd dekZ{eG dd dekZ|eG dd dekZ}eG dd dekZ~eG dd dekZeG dd dekZeG dd dekZe!rdZdeKd< dZdeKd< dZdeKd< neZeZeZdddddZG dd dZG dd dejZG dd dZdS )    )annotationsN)defaultdictdeque)Callable
Collection	ContainerIteratorMappingMutableMapping)copy)	dataclassfield)	lru_cachepartialsingledispatchmethod)chain)TYPE_CHECKINGAnyClassVarLiteral
NamedTuple	TypedDictcast)peekn)parse_bytestypename)worker_story)HeapSet)get_address_host)ErrorMessageerror_message)time)pickle)	Serialize)safe_sizeof)recursive_to_dictz distributed.worker.state_machine)	TypeAlias)WorkerPlugin)Worker)	cancelledconstrainederror	executingfetchflight	forgottenlong-runningmemorymissingreadyreleasedrescheduledresumedwaitingr&   TaskStateStater7   r3   r*   r,   r0   r)   r6   zset[TaskStateState]
PROCESSINGREADYr-   r.   r2   WAITING_FOR_DATAz--no-value-sentinel--c                   @  sB   e Zd ZU dZdZded< dZded< dZded< eZ	d	ed
< dS )SerializedTaskzInfo from distributed.scheduler.TaskState.run_spec
    Input to distributed.worker._deserialize

    (function, args kwargs) and task are mutually exclusive
    Nbytes | Nonefunctionzbytes | tuple | list | Noneargsbytes | dict[str, Any] | Nonekwargsobjecttask)
__name__
__module____qualname____doc__r>   __annotations__r?   rA   NO_VALUErC    rJ   rJ   D/tmp/pip-unpacked-wheel-g426oqom/distributed/worker_state_machine.pyr<   f   s
   
r<   c                   @  s.   e Zd ZU ded< ded< ded< ded< dS )	StartStopstractionfloatstartstopsourceN)rD   rE   rF   rH   rJ   rJ   rJ   rK   rL   s   s   
rL   F)totalc                   @  sN   e Zd ZdddddddZddd	d
ZddddZeZddddZdS )InvalidTransitionrM   r8   list[tuple]keyrP   finishstoryc                 C  s   || _ || _|| _|| _d S NrV   )selfrW   rP   rX   rY   rJ   rJ   rK   __init__{   s    zInvalidTransition.__init__tuple[Callable, tuple]returnc                 C  s   t | | j| j| j| jffS rZ   )typerW   rP   rX   rY   r[   rJ   rJ   rK   
__reduce__   s    zInvalidTransition.__reduce__c                 C  s>   | j j d| j d| j d| j d d dtt| j S )N:  :: ->
  Story:
    
    )		__class__rD   rW   rP   rX   joinmaprM   rY   ra   rJ   rJ   rK   __repr__   s    "zInvalidTransition.__repr__tuple[str, dict[str, Any]]c                 C  s   d| j | j| j| jdfS )Nzinvalid-worker-transitionrV   rV   ra   rJ   rJ   rK   to_event   s    zInvalidTransition.to_eventNrD   rE   rF   r\   rb   rl   __str__rn   rJ   rJ   rJ   rK   rT   z   s
   rT   c                      s"   e Zd Zdd fddZ  ZS )TransitionCounterMaxExceededrm   r^   c                   s   t   \}}d|fS )Nztransition-counter-max-exceeded)superrn   )r[   Ztopicmsgri   rJ   rK   rn      s    z%TransitionCounterMaxExceeded.to_event)rD   rE   rF   rn   __classcell__rJ   rJ   rt   rK   rq      s   rq   c                   @  sL   e Zd ZddddddZddd	d
ZddddZeZddddZdS )InvalidTaskStaterM   r8   rU   rW   staterY   c                 C  s   || _ || _|| _d S rZ   rw   )r[   rW   rx   rY   rJ   rJ   rK   r\      s    zInvalidTaskState.__init__r]   r^   c                 C  s   t | | j| j| jffS rZ   )r`   rW   rx   rY   ra   rJ   rJ   rK   rb      s    zInvalidTaskState.__reduce__c                 C  s6   | j j d| j d| j d d dtt| j S )Nrc   rd   rf   rg   rh   )ri   rD   rW   rx   rj   rk   rM   rY   ra   rJ   rJ   rK   rl      s    zInvalidTaskState.__repr__rm   c                 C  s   d| j | j| jdfS )Nzinvalid-worker-task-staterw   rw   ra   rJ   rJ   rK   rn      s    zInvalidTaskState.to_eventNro   rJ   rJ   rJ   rK   rv      s
   
rv   c                   @  s   e Zd ZdZdS )RecommendationsConflictzOTwo or more recommendations for the same task suggested different finish statesN)rD   rE   rF   rG   rJ   rJ   rJ   rK   ry      s   ry   intr^   c                   C  s   t tjdS )Nz'distributed.scheduler.default-data-size)r   daskconfiggetrJ   rJ   rJ   rK   _default_data_size   s    r~   )   
   slotsT)repreqc                   @  s   e Zd ZU dZded< dZded< eedZded	< eedZ	ded
< eedZ
ded< eedZded< dZded< dZded< dZded< dZded< dZded< eedZded< dZded< eedZded< dZded< dZded < d!Zded"< d!Zded#< dZd$ed%< d&Zd'ed(< eedZd)ed*< dZded+< dZded,< eedZd-ed.< dZ d/ed0< dZ!d1ed2< d3Z"d4ed5< e#$ Z%d6ed7< ed3d8Z&d9ed:< d;d<d=d>Z'dd<d?d@Z(d'd<dAdBZ)d'd<dCdDZ*dEdFdGd-dHdIdJZ+d4d<dKdLZ,dS )M	TaskStatezHolds volatile state relating to an individual Dask task.

    Not to be confused with :class:`distributed.scheduler.TaskState`, which holds
    similar information on the scheduler side.
    rM   rW   NSerializedTask | Nonerun_spec)default_factoryset[TaskState]dependencies
dependentswaiting_for_datawaitersr4   r8   rx   z6Literal[('executing', 'long-running', 'flight', None)]previousz#Literal[('fetch', 'waiting', None)]nextfloat | Nonedurationztuple[int, ...] | Nonepriorityset[str]who_has
str | Nonecoming_fromdict[str, float]resource_restrictionsSerialize | None	exception	traceback exception_texttraceback_texttype | Noner`   r   rz   suspicious_countlist[StartStop]
startstops
start_time	stop_timedictmetadata
int | Nonenbytesdict | Noner   Fbooldonez$ClassVar[weakref.WeakSet[TaskState]]
_instances)initr   __weakref__Noner^   c                 C  s   t j|  d S rZ   )r   r   addra   rJ   rJ   rK   __post_init__%  s    zTaskState.__post_init__c                 C  sV   | j dkrd| j d}n(| j dkr<d| j d| j d}n| j }d| jd| d	S )
Nr)   z
cancelled()r6   zresumed(re   z<TaskState  >)rx   r   r   rW   r[   rx   rJ   rJ   rK   rl   (  s    

zTaskState.__repr__c                 C  s   t | S )a  Override dataclass __hash__, reverting to the default behaviour
        hash(o) == id(o).

        Note that we also defined @dataclass(eq=False), which reverts to the default
        behaviour (a == b) == (a is b).

        On first thought, it would make sense to use TaskState.key for equality and
        hashing. However, a task may be forgotten and a new TaskState object with the
        same key may be created in its place later on. In the Worker state, you should
        never have multiple TaskState objects with the same key; see
        WorkerState.validate_state for relevant checks. We can't assert the same thing
        in __eq__ though, as multiple objects with the same key may appear in
        TaskState._instances for a brief period of time.
        )idra   rJ   rJ   rK   __hash__1  s    zTaskState.__hash__c                 C  s   | j }|d k	r|S t S rZ   )r   r~   )r[   r   rJ   rJ   rK   
get_nbytesB  s    zTaskState.get_nbytesrJ   excludeContainer[str]r   r_   c                C  s    t | |dd}dd | D S )a~  Dictionary representation for debugging purposes.
        Not type stable and not intended for roundtrips.

        See also
        --------
        Client.dump_cluster_state
        distributed.utils.recursive_to_dict

        Notes
        -----
        This class uses ``_to_dict_no_nest`` instead of ``_to_dict``.
        When a task references another task, just print the task repr. All tasks
        should neatly appear under Worker.tasks. This also prevents a RecursionError
        during particularly heavy loads, which have been observed to happen whenever
        there's an acyclic dependency chain of ~200+ tasks.
        T)r   membersc                 S  s   i | ]\}}|r||qS rJ   rJ   .0kvrJ   rJ   rK   
<dictcomp>Y  s       z.TaskState._to_dict_no_nest.<locals>.<dictcomp>)r%   items)r[   r   outrJ   rJ   rK   _to_dict_no_nestF  s    zTaskState._to_dict_no_nestc                 C  s   | j tkptdd | jD S )Nc                 s  s   | ]}|j tkV  qd S rZ   )rx   r9   )r   dep_tsrJ   rJ   rK   	<genexpr>\  s    z)TaskState.is_protected.<locals>.<genexpr>)rx   r9   anyr   ra   rJ   rJ   rK   is_protected[  s    zTaskState.is_protected)-rD   rE   rF   rG   rH   r   r   setr   r   r   r   rx   r   r   r   r   r   r   r   r   r   r   r   r   r`   r   listr   r   r   r   r   r   r   weakrefWeakSetr   r   r   rl   r   r   r   r   rJ   rJ   rJ   rK   r      sH   
	r   c                      sJ   e Zd ZU dZdZded< eddddd	Zd
dd fddZ  Z	S )InstructionzLCommand from the worker state machine to the Worker, in response to an eventstimulus_idrM   r   r   _InstructionMatch)rA   r_   c                 K  s   t | f|S )a  Generate a partial match to compare against an Instruction instance.
        The typical usage is to compare a list of instructions returned by
        :meth:`WorkerState.handle_stimulus` or in :attr:`WorkerState.stimulus_log` vs.
        an expected list of matches.

        Examples
        --------

        .. code-block:: python

            instructions = ws.handle_stimulus(...)
            assert instructions == [
                TaskFinishedMsg.match(key="x"),
                ...
            ]
        )r   clsrA   rJ   rJ   rK   matchh  s    zInstruction.matchrB   r   otherr_   c                   s"   t |tr|| kS t |S d S rZ   )
isinstancer   rr   __eq__r[   r   rt   rJ   rK   r   |  s    
zInstruction.__eq__)
rD   rE   rF   rG   	__slots__rH   classmethodr   r   ru   rJ   rJ   rt   rK   r   a  s   
r   c                   @  sP   e Zd ZU dZded< ded< ddddd	Zd
dddZdddddZdS )r   z_Utility class, to be used to test an instructions list.
    See :meth:`Instruction.match`.
    ztype[Instruction]r   dict[str, Any]rA   r   r   c                 K  s   || _ || _d S rZ   r   )r[   r   rA   rJ   rJ   rK   r\     s    z_InstructionMatch.__init__rM   r^   c                 C  s2   | j j}ddd | j D }| d| dS )N, c                 s  s    | ]\}}| d | V  qdS )=NrJ   r   rJ   rJ   rK   r     s     z-_InstructionMatch.__repr__.<locals>.<genexpr>(z) (partial match))r   rD   rj   rA   r   )r[   Zcls_strZ
kwargs_strrJ   rJ   rK   rl     s    z_InstructionMatch.__repr__rB   r   r   c                   s.   t  | jk	rdS t fdd| j D S )NFc                 3  s    | ]\}}t  ||kV  qd S rZ   getattrr   r   rJ   rK   r     s     z+_InstructionMatch.__eq__.<locals>.<genexpr>)r`   r   allrA   r   r   rJ   r   rK   r     s    z_InstructionMatch.__eq__N)rD   rE   rF   rG   rH   r\   rl   r   rJ   rJ   rJ   rK   r     s   
r   c                   @  s*   e Zd ZU dZded< ded< ded< dS )		GatherDep)worker	to_gathertotal_nbytesrM   r   r   r   rz   r   NrD   rE   rF   r   rH   rJ   rJ   rJ   rK   r     s   
r   c                   @  s   e Zd ZU dZded< dS )ExecuterW   rM   rW   Nr   rJ   rJ   rJ   rK   r     s   
r   c                   @  s   e Zd ZU dZded< dS )RetryBusyWorkerLaterr   rM   r   Nr   rJ   rJ   rJ   rK   r     s   
r   c                   @  s"   e Zd ZU dZded< ded< dS )DigestMetric)namevaluerM   r   rO   r   Nr   rJ   rJ   rJ   rK   r     s   
r   c                   @  s(   e Zd ZU ded< dZddddZdS )	SendMessageToSchedulerzClassVar[str]oprJ   r   r^   c                   s,    fdd j D } j|d<  j|d< |S )z@Convert object to dict so that it can be serialized with msgpackc                   s   i | ]}|t  |qS rJ   r   r   r   ra   rJ   rK   r     s      z2SendMessageToScheduler.to_dict.<locals>.<dictcomp>r   r   )rH   r   r   r[   drJ   ra   rK   to_dict  s    

zSendMessageToScheduler.to_dictN)rD   rE   rF   rH   r   r   rJ   rJ   rJ   rK   r     s   
r   c                      sh   e Zd ZU dZded< ded< ded< ded< d	ed
< ded< ded< eeZdd fddZ  ZS )TaskFinishedMsgtask-finishedrM   rW   r   r   bytesr`   r   r   r   threadr   r   r   r^   c                   s   t   }d|d< |S )NOKstatusrr   r   r   rt   rJ   rK   r     s    
zTaskFinishedMsg.to_dict)	rD   rE   rF   r   rH   tupler   r   ru   rJ   rJ   rt   rK   r     s   
r   c                      s   e Zd ZU dZded< ded< ded< ded< ded	< d
ed< ded< eeZdd fddZedddd
d dddZ	  Z
S )TaskErredMsgz
task-erredrM   rW   r#   r   r   r   r   r   r   r   r   r   r   r^   c                   s   t   }d|d< |S )Nr+   r   r   r   rt   rJ   rK   r     s    
zTaskErredMsg.to_dictNr   )tsr   r   r_   c              
   C  s.   | j s
tt| j| j | j| j| j|| j|dS )N)rW   r   r   r   r   r   r   r   )r   AssertionErrorr   rW   r   r   r   r   )r   r   r   rJ   rJ   rK   	from_task  s    
zTaskErredMsg.from_task)N)rD   rE   rF   r   rH   r   r   r   staticmethodr   ru   rJ   rJ   rt   rK   r     s   
 r   c                   @  s   e Zd ZU dZdZded< dS )ReleaseWorkerDataMsgzrelease-worker-datar   rM   rW   NrD   rE   rF   r   r   rH   rJ   rJ   rJ   rK   r    s   
r  c                   @  s   e Zd ZU dZdZded< dS )RescheduleMsgZ
rescheduler   rM   rW   Nr  rJ   rJ   rJ   rK   r    s   
r  c                   @  s&   e Zd ZU dZdZded< ded< dS )LongRunningMsgr0   rW   compute_durationrM   rW   r   r  Nr  rJ   rJ   rJ   rK   r  
  s   
r  c                   @  s   e Zd ZU dZdZded< dS )
AddKeysMsgadd-keyskeysCollection[str]r  Nr  rJ   rJ   rJ   rK   r    s   
r  c                   @  s"   e Zd ZU dZdZdZded< dS )RequestRefreshWhoHasMsga{  Worker -> Scheduler asynchronous request for updated who_has information.
    Not to be confused with the scheduler.who_has synchronous RPC call, which is used
    by the Client.

    See also
    --------
    RefreshWhoHasEvent
    distributed.scheduler.Scheduler.request_refresh_who_has
    distributed.client.Client.who_has
    distributed.scheduler.Scheduler.get_who_has
    zrequest-refresh-who-hasr
  r  r  NrD   rE   rF   rG   r   r   rH   rJ   rJ   rJ   rK   r    s   
r  c                   @  s*   e Zd ZU dZdZdZded< ded< dS )	StealResponseMsgzkWorker->Scheduler response to ``{op: steal-request}``

    See also
    --------
    StealRequestEvent
    zsteal-response)rW   rx   rM   rW   zTaskStateState | Nonerx   Nr  rJ   rJ   rJ   rK   r  /  s
   
r  c                   @  s   e Zd ZU dZdZded< i Zded< ddd dd	d
ZddddZdd dddZ	dddddddZ
edd dddZddddZdS ) StateMachineEventzDBase abstract class for all stimuli that can modify the worker state)r   handledrM   r   z,ClassVar[dict[str, type[StateMachineEvent]]]_classesr   )r?   rA   r_   c                 O  s   t | }d|_|S )z<Hack to initialize the ``handled`` attribute in Python <3.10N)rB   __new__r  )r   r?   rA   r[   rJ   rJ   rK   r  L  s    
zStateMachineEvent.__new__r   r^   c                 C  s   | t j| j< d S rZ   )r  r  rD   )r   rJ   rJ   rK   __init_subclass__R  s    z#StateMachineEvent.__init_subclass__rO   r  r_   c                C  s
   || _ | S )zProduce a variant version of self that is small enough to be stored in memory
        in the medium term and contains meaningful information for debugging
        r  )r[   r  rJ   rJ   rK   to_loggableU  s    zStateMachineEvent.to_loggablerJ   r   r   r   r   c                C  sV   dt | ji}t| D ]2}||ks|dr.qt| |}t|s|||< qt||dS )zDictionary representation for debugging purposes.

        See also
        --------
        distributed.utils.recursive_to_dict
        r   _r   )r`   rD   dir
startswithr   callabler%   )r[   r   infor   r   rJ   rJ   rK   _to_dict\  s    

zStateMachineEvent._to_dict)r   r_   c                 C  s>   |   }tj|d }|d}|f |}||_|  |S )zConvert the output of ``recursive_to_dict`` back into the original object.
        The output object is meaningful for the purpose of rebuilding the state machine,
        but not necessarily identical to the original.
        r   r  )r   r  r  popr  _after_from_dict)r   rA   r   r  instrJ   rJ   rK   	from_dictl  s    

zStateMachineEvent.from_dictc                 C  s   dS )zFOptional post-processing after an instance is created by ``from_dict``NrJ   ra   rJ   rJ   rK   r  z  s    z"StateMachineEvent._after_from_dictN)rD   rE   rF   rG   r   rH   r  r  r  r  r  r  r!  r  rJ   rJ   rJ   rK   r  ?  s   
r  c                   @  s   e Zd ZdZdS )
PauseEventrJ   NrD   rE   rF   r   rJ   rJ   rJ   rK   r"  ~  s   r"  c                   @  s   e Zd ZdZdS )UnpauseEventrJ   Nr#  rJ   rJ   rJ   rK   r$    s   r$  c                   @  s   e Zd ZU dZded< dS )RetryBusyWorkerEventr   rM   r   Nr   rJ   rJ   rJ   rK   r%    s   
r%  c                   @  s&   e Zd ZU dZdZded< ded< dS )GatherDepDoneEventz?:class:`GatherDep` instruction terminated (abstract base class))r   r   rM   r   rz   r   NrD   rE   rF   rG   r   rH   rJ   rJ   rJ   rK   r&    s   
r&  c                   @  s<   e Zd ZU dZdZded< ddddd	Zd
dddZdS )GatherDepSuccessEventzV:class:`GatherDep` instruction terminated:
    remote worker fetched successfully
    datadict[str, object]r*  rO   r  r  c                C  s$   t | }||_dd | jD |_|S )Nc                 S  s   i | ]
}|d qS rZ   rJ   r   rJ   rJ   rK   r     s      z5GatherDepSuccessEvent.to_loggable.<locals>.<dictcomp>)r   r  r*  r[   r  r   rJ   rJ   rK   r    s    z!GatherDepSuccessEvent.to_loggabler   r^   c                 C  s   dd | j D | _ d S )Nc                 S  s   i | ]
}|d qS rZ   rJ   r   rJ   rJ   rK   r     s      z:GatherDepSuccessEvent._after_from_dict.<locals>.<dictcomp>r)  ra   rJ   rJ   rK   r    s    z&GatherDepSuccessEvent._after_from_dictN)rD   rE   rF   rG   r   rH   r  r  rJ   rJ   rJ   rK   r(    s
   
r(  c                   @  s   e Zd ZdZdZdS )GatherDepBusyEventzI:class:`GatherDep` instruction terminated:
    remote worker is busy
    rJ   NrD   rE   rF   rG   r   rJ   rJ   rJ   rK   r-    s   r-  c                   @  s   e Zd ZdZdZdS )GatherDepNetworkFailureEventzr:class:`GatherDep` instruction terminated:
    network failure while trying to communicate with remote worker
    rJ   Nr.  rJ   rJ   rJ   rK   r/    s   r/  c                   @  sb   e Zd ZU dZded< ded< ded< ded< eeZd	d
ddZeddddd dddZ	dS )GatherDepFailureEventzclass:`GatherDep` instruction terminated:
    generic error raised (not a network failure); e.g. data failed to deserialize.
    r#   r   r   r   rM   r   r   r   r^   c                 C  s   t t | _d | _d S rZ   r#   	Exceptionr   r   ra   rJ   rJ   rK   r    s    z&GatherDepFailureEvent._after_from_dictBaseExceptionrz   )errr   r   r   r_   c             	   C  s.   t |}| |||d |d |d |d |dS )Nr   r   r   r   )r   r   r   r   r   r   r   )r    )r   r4  r   r   r   rs   rJ   rJ   rK   from_exception  s    	z$GatherDepFailureEvent.from_exceptionN)
rD   rE   rF   rG   rH   r   r   r  r   r5  rJ   rJ   rJ   rK   r0    s   
r0  c                   @  s   e Zd ZU ded< ded< ded< ded< d	ed
< ded< ded< ded< ded< ded< ded< ded< eeZddddZddddd d!d"Zd#dd$d%Zd	d#d&d'd(Z	ddd)d*Z
ed+d+d,d-d+d.d+d/dd0d1dd	d2dd3dd d4
d5d6Zd+S )7ComputeTaskEventrM   rW   dict[str, Collection[str]]r   dict[str, int]r   tuple[int, ...]r   rO   r   r   r   r=   r>   z"bytes | tuple | list | None | Noner?   r@   rA   r   r   r   actorr   r   r   r^   c                 C  sf   t | jtrt| j| _| jd k	rH| jd ks0tt| j| j| j	d| _nt | jtsbt| jd| _d S )N)r>   r?   rA   )rC   )
r   r   r   r   r>   r   r   r<   r?   rA   ra   rJ   rJ   rK   r     s    
  
zComputeTaskEvent.__post_init__rJ   r   r   r   c                C  s   t j|  |dS )Nr   )r  r  _clean)r[   r   rJ   rJ   rK   r    s    zComputeTaskEvent._to_dictr  c                 C  s0   t | }d |_d |_d |_td d d d d|_|S N)rC   r>   r?   rA   )r   r>   rA   r?   r<   r   )r[   r   rJ   rJ   rK   r;    s    zComputeTaskEvent._cleanr  c                C  s   |   }||_|S rZ   )r;  r  r,  rJ   rJ   rK   r  
  s    zComputeTaskEvent.to_loggablec                 C  s   t d d d d d| _d S r<  )r<   r   ra   rJ   rJ   rK   r    s    z!ComputeTaskEvent._after_from_dictNr         ?F)r   r   r   r   r   r:  r   z!dict[str, Collection[str]] | Nonedict[str, int] | Nonezdict[str, float] | Noner   )
rW   r   r   r   r   r   r:  r   r   r_   c          	      C  s@   t | |p
i |pdd |pdD ||dddd|p0i ||p8i |dS )Build a dummy event, with most attributes set to a reasonable default.
        This is a convenience method to be used in unit testing only.
        c                 S  s   i | ]
}|d qS    rJ   r   rJ   rJ   rK   r   %  s      z*ComputeTaskEvent.dummy.<locals>.<dictcomp>rJ   N)rW   r   r   r   r   r   r>   r?   rA   r   r:  r   r   )r6  )	rW   r   r   r   r   r   r:  r   r   rJ   rJ   rK   dummy  s    zComputeTaskEvent.dummy)rD   rE   rF   rH   r   r   r   r  r;  r  r  r  rC  rJ   rJ   rJ   rK   r6    s6   
r6  c                   @  s   e Zd ZU dZded< dZdS )ExecuteDoneEventz\Abstract base event for all the possible outcomes of a :class:`Compute`
    instruction
    rM   rW   r   N)rD   rE   rF   rG   rH   r   rJ   rJ   rJ   rK   rD  3  s   
rD  c                	      s   e Zd ZU ded< ded< ded< ded< ded	< eeZdd
dddZddddd fddZddddZe	d ddddddd dddZ
  ZS )!ExecuteSuccessEventrB   r   rO   rP   rQ   rz   r   r   r`   r  r  c                C  s   t | }||_d |_|S rZ   )r   r  r   r,  rJ   rJ   rK   r  F  s    zExecuteSuccessEvent.to_loggablerJ   r   r   r   r   c                  s(   t  j|d}d|kr$t| j|d< |S )Nr   r`   )rr   r  rM   r`   )r[   r   r   rt   rJ   rK   r  L  s    zExecuteSuccessEvent._to_dictr   r^   c                 C  s   d | _ d | _d S rZ   )r   r`   ra   rJ   rJ   rK   r  S  s    z$ExecuteSuccessEvent._after_from_dictNrB  )r   rM   )rW   r   r   r   r_   c             	   C  s   t | |dd|d|dS )r@  g        r>  N)rW   r   rP   rQ   r   r`   r   )rE  )rW   r   r   r   rJ   rJ   rK   rC  W  s    zExecuteSuccessEvent.dummy)N)rD   rE   rF   rH   r   r   r  r  r  r  rC  ru   rJ   rJ   rt   rK   rE  =  s   
 rE  c                	   @  s   e Zd ZU ded< ded< ded< ded< ded	< ded
< eeZddddZeddddddddd dddZe	ddd dddZ
dS )ExecuteFailureEventr   rP   rQ   r#   r   r   r   rM   r   r   r   r^   c                 C  s   t t | _d | _d S rZ   r1  ra   rJ   rJ   rK   r  w  s    z$ExecuteFailureEvent._after_from_dictN)rP   rQ   zBaseException | ErrorMessage)
err_or_msgrW   rP   rQ   r   r_   c             
   C  s@   t |tr|}nt|}| ||||d |d |d |d |dS )Nr   r   r   r   rW   rP   rQ   r   r   r   r   r   )r   r   r    )r   rG  rW   rP   rQ   r   rs   rJ   rJ   rK   r5  {  s    

z"ExecuteFailureEvent.from_exceptionrW   r   r_   c             
   C  s   t | ddtdddd|dS )r@  Nr   rH  )rF  r#   rW   r   rJ   rJ   rK   rC    s    	zExecuteFailureEvent.dummy)rD   rE   rF   rH   r   r   r  r   r5  r  rC  rJ   rJ   rJ   rK   rF  m  s   
rF  c                   @  s&   e Zd ZdZeddd dddZdS )RescheduleEventrJ   rM   rI  c                C  s   t | |dS )zqBuild an event. This method exists for compatibility with the other
        ExecuteDoneEvent subclasses.
        rJ  )rK  rJ  rJ   rJ   rK   rC    s    zRescheduleEvent.dummyN)rD   rE   rF   r   r  rC  rJ   rJ   rJ   rK   rK    s   rK  c                   @  s   e Zd ZU dZded< dS )CancelComputeEventr   rM   rW   Nr   rJ   rJ   rJ   rK   rL    s   
rL  c                   @  s   e Zd ZdZdS )FindMissingEventrJ   Nr#  rJ   rJ   rJ   rK   rM    s   rM  c                   @  s   e Zd ZU dZdZded< dS )RefreshWhoHasEventzScheduler -> Worker message containing updated who_has information.

    See also
    --------
    RequestRefreshWhoHasMsg
    )r   r7  r   Nr'  rJ   rJ   rJ   rK   rN    s   
rN  c                   @  s"   e Zd ZU dZded< ded< dS )AcquireReplicasEvent)r   r   r7  r   r8  r   Nr   rJ   rJ   rJ   rK   rO    s   
rO  c                   @  s   e Zd ZU dZded< dS )RemoveReplicasEventr
  r  r  Nr   rJ   rJ   rJ   rK   rP    s   
rP  c                   @  s   e Zd ZU dZded< dS )FreeKeysEventr
  r  r  Nr   rJ   rJ   rJ   rK   rQ    s   
rQ  c                   @  s   e Zd ZU dZdZded< dS )StealRequestEventzEvent that requests a worker to release a key because it's now being computed
    somewhere else.

    See also
    --------
    StealResponseMsg
    r   rM   rW   Nr'  rJ   rJ   rJ   rK   rR    s   
rR  c                   @  s2   e Zd ZU dZded< ded< dddd	d
ZdS )UpdateDataEvent)r*  reportr+  r*  r   rT  rO   r  r  c                C  s    t | }||_t| j|_|S rZ   )r   r  r   fromkeysr*  r,  rJ   rJ   rK   r    s    zUpdateDataEvent.to_loggableN)rD   rE   rF   r   rH   r  rJ   rJ   rJ   rK   rS    s   
rS  c                   @  s"   e Zd ZU dZded< ded< dS )SecedeEventr  rM   rW   rO   r  Nr   rJ   rJ   rJ   rK   rV    s   
rV  z'dict[TaskState, TaskStateState | tuple]Recszlist[Instruction]Instructionsztuple[Recs, Instructions]
RecsInstrs)r?   r_   c               	   G  st   i }g }| D ]^\}}|  D ]D\}}||krX|| |krXtd|j d||  d| |||< q||7 }q||fS )z|Merge multiple (recommendations, instructions) tuples.
    Collisions in recommendations are only allowed if identical.
    zMismatched recommendations for rc   z vs. )r   ry   rW   )r?   recsinstrZrecs_iZinstr_ir   rX   rJ   rJ   rK   merge_recs_instructions  s    

r\  c                4   @  s 
  e Zd ZU dZded< ded< ded< ded	< d
ed< ded< ded< ded< ded< ded< ded< ded< ded< ded< ded< ded< ded< ded < ded!< ded"< ded#< d$ed%< ded&< d'ed(< d'ed)< ded*< ded+< ded,< ded-< d.ed/< d0ed1< d2ed3< ded4< ded5< d6ed7< ded8< d9ed:< eeZd;d<d<d<d<d<d=d>d?ejejd@ddAdBdCdDdEddd6ddd@dFdGZ	dHdIdJdKdLZ
eddMdNdOZeddMdPdQZeddMdRdSZeddMdTdUZddVddWdXdYdZZd[d\d]d^d_ZdWd\d`dadbZddMdcddZddedfdgdhZdidMdjdkZddldmdndoZdWdddpdqdrZdedMdsdtZdudMdvdwZdWddxdydzd{ZdWd|dded}d~dZdWddedyddZdWddedyddZdWddedyddZdWddedyddZdWddedyddZ dWddedyddZ!dWddedyddZ"dWddedyddZ#dWddedyddZ$dWdddedddZ%dWddedyddZ&dWddedyddZ'dWddedyddZ(dWddedyddZ)dWddedyddZ*dWddedyddZ+dWddddddedddZ,dWddddddedddZ-dWddedyddZ.dWddedyddZ/dWddedyddZ0dWddedyddZ1dWddedyddZ2dWddedyddZ3dWddedyddZ4dWdddedddZ5dWddedyddZ6dWddedyddZ7dWddedyddZ8dWddedyddZ9dWddedyddÄZ:dWdddedĜddƄZ;dWdddedĜddȄZ<dWd|dded}ddʄZ=dWd|dded}dd̄Z>dWd|dded}dd΄Z?dWd|dded}ddЄZ@dWd|dddedҜddԄZAdWddedyddքZBe5e3e<e5e5e5e5e4e-e/e<e@e1e.e2e7e#e#e,e;e=e6e*e%e&e'e,e9e?e e:e,e=e*e6e(e,eeee8e#e,e"eBe>e!e$e)e+e#dל3ZCded< dddd\dڜdd܄ZDdWddddedޜddZEdWdd`ddZFdWd\d`ddZGdWd\d`ddZHdddIdddZIeJdHdedddZKeKjLddedddZMeKjLddedddZNeKjLddedddZOeKjLddedddZPeKjLddedddZQddddd ZReKjLddedddZSeKjLddedddZTeKjLddeddd	ZUeKjLd
dedddZVeKjLddedddZWeKjLddedddZXeKjLddedddZYeKjLddedddZZeKjLddedddZ[eKjLddedddZ\dd dd!d"Z]eKjLd#dedd$d%Z^eKjLd&dedd'd(Z_eKjLd)dedd*d+Z`eKjLd,dedd-d.ZaeKjLd/dedd0d1Zbd2d3d4d5d6Zcd2d7d8d9d:Zdd;d<d=d>d?d@dAZeedBdMdCdDZfdWd\d`dEdFZgdWd\d`dGdHZhdWd\d`dIdJZidWd\d`dKdLZjdWd\d`dMdNZkdWd\d`dOdPZldWd\d`dQdRZmdWd\d`dSdTZndWd\d`dUdVZodWd\d`dWdXZpdWd\d`dYdZZqd\dMd[d\Zrd\dMd]d^Zsd<S (_  WorkerStateay  State machine encapsulating the lifetime of all tasks on a worker.

    Not to be confused with :class:`distributed.scheduler.WorkerState`.

    .. note::
       The data attributes of this class are implementation details and may be
       changed without a deprecation cycle.

    .. warning::
       The attributes of this class are all heavily correlated with each other.
       *Do not* modify them directly, *ever*, as it is extremely easy to obtain a broken
       state this way, which in turn will likely result in cluster-wide deadlocks.

       The state should be exclusively mutated through :meth:`handle_stimulus`.
    rM   addresszdict[str, TaskState]tasksr8  threadszMutableMapping[str, object]r*  zdict[str, WorkerPlugin]pluginszHeapSet[TaskState]r3   r*   rz   nthreadsr   runningr   r7   zdefaultdict[str, set[str]]has_whatz$defaultdict[str, HeapSet[TaskState]]data_neededfetch_countrO   transfer_message_bytes_limitmissing_dep_flightin_flight_taskszdict[str, set[str]]in_flight_workerstransfer_incoming_bytestransfer_incoming_count_limittransfer_incoming_count_total*transfer_incoming_bytes_throttle_thresholdr   busy_workers
generationr   total_resourcesavailable_resourcesr,   long_runningexecuted_countr   r+  actorszdeque[tuple]logzdeque[StateMachineEvent]stimulus_logvalidatetransition_counterzint | Literal[False]transition_counter_maxtransfer_incoming_bytes_limitzrandom.RandomrngrB  Ni'  TF)rb  r^  r*  r`  ra  	resourcesrl  rx  rz  r{  rg  r   z"MutableMapping[str, object] | Noner?  zdict[str, WorkerPlugin] | NonezMapping[str, float] | Nonec                C  st  || _ |r|| _|d k	r|ni | _|d k	r.|ni | _|d k	r@|ni | _|d k	rVt|ni | _| j | _|| _	i | _
d| _t | _tt| _tttt tdd| _d| _i | _t | _|| _d| _td| _d| _t | _d| _ttdd| _ ttdd| _!t | _"t | _#d| _$d| _%t | _&|| _'t(dd| _)t(dd| _*d| _+|	| _,|
| _-i | _.t/0d| _1d S )	NTr   r   r   g    cAi )maxleni'  )2rb  r^  r*  r`  ra  r   rq  r   rr  rx  r_  rc  r   r7   r   rd  r   r   r   operator
attrgetterre  rf  rj  ro  rl  rm  rz   rn  rk  rh  rp  r3   r*   r,   ri  r   rt  rs  rg  r   rv  rw  ry  rz  r{  ru  randomRandomr|  )r[   rb  r^  r*  r`  ra  r}  rl  rx  rz  r{  rg  rJ   rJ   rK   r\     sP    

zWorkerState.__init__r  rX  stimsr_   c                 G  s`   g }t  }|D ]L}t|ts0| j|j|d | |\}}||7 }|| j||jd7 }q|S )zProcess one or more external events, transition relevant tasks to new states,
        and return a list of instructions to be executed as a consequence.

        See also
        --------
        BaseWorker.handle_stimulus
        r  r   )	r!   r   rM  rw  appendr  _handle_event_transitionsr   )r[   r  instructionsr  stimrZ  r[  rJ   rJ   rK   handle_stimulus+  s    
zWorkerState.handle_stimulusr^   c                 C  s
   t | jS )a  Count of tasks currently executing on this worker and counting towards the
        maximum number of threads.

        It includes cancelled tasks, but does not include long running (a.k.a. seceded)
        tasks.

        See also
        --------
        WorkerState.executing
        WorkerState.executed_count
        WorkerState.nthreads
        WorkerState.all_running_tasks
        )lenr,   ra   rJ   rJ   rK   executing_countA  s    zWorkerState.executing_countc                 C  s   | j | jB S )ah  All tasks that are currently occupying a thread. They may or may not count
        towards the maximum number of threads.

        These are:

        - ts.status in (executing, long-running)
        - ts.status in (cancelled, resumed) and ts.previous in (executing, long-running)

        See also
        --------
        WorkerState.executing_count
        )r,   rs  ra   rJ   rJ   rK   all_running_tasksR  s    zWorkerState.all_running_tasksc                 C  s
   t | jS )zNumber of tasks currently being replicated from other workers to this one.

        See also
        --------
        WorkerState.in_flight_tasks
        )r  ri  ra   rJ   rJ   rK   in_flight_tasks_countc  s    z!WorkerState.in_flight_tasks_countc                 C  s
   t | jS )zCurrent number of open data transfers from other workers.

        See also
        --------
        WorkerState.in_flight_workers
        )r  rj  ra   rJ   rJ   rK   transfer_incoming_countm  s    z#WorkerState.transfer_incoming_countr9  r   )rW   r   r   r_   c                C  sv   z| j | }td|| W n& tk
rB   t| | j |< }Y nX |jsX|sRt||_| j|d|j	|t
 f |S )Nz+Data task %s already known (stimulus_id=%s)zensure-task-exists)r_  loggerdebugKeyErrorr   r   r   rv  r  rx   r!   )r[   rW   r   r   r   rJ   rJ   rK   _ensure_task_exists{  s    
zWorkerState._ensure_task_existszMapping[str, Collection[str]]r   )r   r_   c                 C  s   |  D ]\}}| j|}|s"qt|}| j|krZ|| j |jdkrZtd| j| |j	|krfq|j	| D ].}| j
| | |jdkrp| j| | qp||j	 D ].}| j
| | |jdkr| j| | q||_	qd S )Nr1   zEScheduler claims worker %s holds data for task %s, which is not true.r-   )r   r_  r}   r   r^  removerx   r  r  r   rd  re  r   )r[   r   rW   workersr   r   rJ   rJ   rK   _update_who_has  s0    




zWorkerState._update_who_has)r   r_   c                 C  s0  t d| |j}| j|d | j|d |jD ]&}| j| |j | j	| | q4|j
  | j|d |jD ]}|j| |j| qz|j
  d|_d|_d|_d|_d|_d|_d|_d|_d|_| j| | j| | j| | j| | j| | j| | j| dS )zEnsure that TaskState attributes are reset to a neutral default and
        Worker-level state associated to the provided key is cleared (e.g.
        who_has)
        This is idempotent
        zPurge task: %sNFr   )r  r  rW   r*  r  ru  r   rd  discardre  clearr`  r   r   r   r   r   r   r   r   r   r   r   rh  r3   r*   r,   rs  ri  r7   )r[   r   rW   r   r   rJ   rJ   rK   _purge_state  s:    



zWorkerState._purge_statec                 C  s0   | j | jk}| j| jk}| j| jk}|r,|p.|S )a  Decides whether the WorkerState should throttle data transfers from other workers.

        Returns
        -------
        * True if the number of incoming data transfers reached its limit
        and the size of incoming data transfers reached the minimum threshold for throttling
        * True if the size of incoming data transfers reached its limit
        * False otherwise
        )r  rl  rk  rn  r{  )r[   Zreached_count_limitZreached_throttle_thresholdZreached_bytes_limitrJ   rJ   rK   #_should_throttle_incoming_transfers  s    

z/WorkerState._should_throttle_incoming_transfersrY  )r   r_   c          
      C  s`  | j r| jsi g fS |  r$i g fS i }g }|  D ] \}}|| jksLt| |\}}|sh| jsht|sr qXdd |D }t	dt
||t
|t
| j| j| jt
| j | jd|||t f |D ]>}	| jr|	jdkst||	jkst|	|kstd|f||	< q|t||||d || j|< |  jd7  _|  j|7  _|  r4 qXq4||fS )	zTransition tasks from fetch to flight, until there are no more tasks in fetch
        state or a threshold has been reached.
        c                 S  s   h | ]
}|j qS rJ   r   r   r   rJ   rJ   rK   	<setcomp>	  s     z4WorkerState._ensure_communicating.<locals>.<setcomp>z]Gathering %d tasks from %s; %d more remain. Pending workers: %d; connections: %d/%d; busy: %dzgather-dependenciesr-   r.   )r   r   r   r   rB  )rc  re  r  _select_workers_for_gatherr^  r   _select_keys_for_gatherrk  r  r  r  r  rl  ro  rv  r  r!   rx  rx   r   r   rj  rm  )
r[   r   recommendationsr  r   Zavailable_tasksZto_gather_tasksmessage_nbytesZto_gather_keysr   rJ   rJ   rK   _ensure_communicating  s`    	
z!WorkerState._ensure_communicatingz(Iterator[tuple[str, HeapSet[TaskState]]]c           	   	   c  s   t | j}g }t| j D ]\\}}|s2| j|= q|| jks|| jkrHq|| j	t ||kt
| | j ||f qt| |rt|\}}}}}}|s| j|= q| t
|krt|| j	|t
| |||f q||fV  |s| j|= qdS )a}  Helper of _ensure_communicating.

        Yield the peer workers and tasks in data_needed, sorted by:

        1. By highest-priority task available across all workers
        2. If tied, first by local peer workers, then remote. Note that, if a task is
           replicated across multiple host, it may go in a tie with itself.
        3. If still tied, by number of tasks available to be fetched from the host
           (see note below)
        4. If still tied, by a random element. This is statically seeded to guarantee
           reproducibility.

           FIXME https://github.com/dask/distributed/issues/6620
                 You won't get determinism when a single task is replicated on multiple
                 workers, because TaskState.who_has changes order at every interpreter
                 restart.

        Omit workers that are either busy or in flight.
        Remove peer workers with no tasks from data_needed.

        Note
        ----
        Instead of number of tasks, we could've measured total nbytes and/or number of
        tasks that only exist on the worker. Raw number of tasks is cruder but simpler.
        N)r   r^  r   re  r   rj  ro  r  peekr   r  r|  r  heapqheapifyheappopheappush)	r[   hostheapr   r_  r  Z	is_remoteZ
ntasks_negZrndrJ   rJ   rK   r  7  s<    




z&WorkerState._select_workers_for_gatherztuple[list[TaskState], int])	availabler_   c                 C  s^   g }d}|rV|  }| ||r"qV|jD ]}| j| | q(|| || 7 }q||fS )zHelper of _ensure_communicating.

        Fetch all tasks that are replicated on the target worker within a single
        message, up to transfer_message_bytes_limit or until we reach the limit
        for the size of incoming data transfers.
        r   )r  _task_exceeds_transfer_limitsr   re  r  r  r   )r[   r  r   r  r   r   rJ   rJ   rK   r  w  s    	

z#WorkerState._select_keys_for_gather)r   r  r_   c                 C  sF   | j dkr|dkrdS | j| j  }|dkr:t|| j| }| |kS )a  Would asking to gather this task exceed transfer limits?

        Parameters
        ----------
        ts
            Candidate task for gathering
        message_nbytes
            Total number of bytes already scheduled for gathering in this message
        Returns
        -------
        exceeds_limit
            True if gathering the task would exceed limits, False otherwise
            (in which case the task can be gathered).
        r   F)rk  r{  minrg  r   )r[   r   r  Zincoming_bytes_allowancerJ   rJ   rK   r    s    
z)WorkerState._task_exceeds_transfer_limitsc                 C  sx   | j si g fS i }t| j| jk rp|  }|s0qp| jrP|jtksDt||ksPtd||< | 	| | j
| q|g fS )Nr,   )rc  r  r,   rb  _next_ready_taskrx  rx   r:   r   _acquire_resourcesr   )r[   rZ  r   rJ   rJ   rK   _ensure_computing  s    
zWorkerState._ensure_computingzTaskState | Nonec                 C  s   | j r`| jr`| j  }| j }|js*t|js4t|j|jk rT| |rT| j S | j  S n4| j rp| j  S | jr| j }| |r| j S dS )z=Pop the top-priority task from self.ready or self.constrainedN)r3   r*   r  r   r    _resource_restrictions_satisfiedr  )r[   ZtsrZtscrJ   rJ   rK   r    s"    








zWorkerState._next_ready_taskr   )r   r   r_   c              
   C  s   |j | jkr(|j | jkr(td| d|j}|jd ks@|d krz| j|j  }W n  tk
rp   | j|j  }Y nX t||_t| }|_~zt	|}W n  t
k
r   t	|j}Y nX t|j |j|t||j| j|j |j|dS )NzTask z
 not ready)rW   r   r`   r   r   r   r   r   )rW   r*  ru  RuntimeErrorr`   r   r  sizeofr"   dumpsr2  rD   r   r   r   r`  r}   r   )r[   r   r   typr   Ztyp_serializedrJ   rJ   rK   _get_task_finished_msg  s2    
z"WorkerState._get_task_finished_msgrB   )r   r   r   r_   c          	      C  s  |j | jkrd|_i g fS i }g }|j | jkr<|| j|j < nPt }|| j|j < t }|| dkr|jd||d |td|| |d d|_|jdkrt	||_|  j|j7  _t
||_
|jD ](}|j| |js|jdkrd	||< q| j|j d
|t f ||fS )a  
        Put a key into memory and set data related task state attributes.
        On success, generate recommendations for dependents.

        This method does not generate any scheduler messages since this method
        cannot distinguish whether it has to be an `add-task` or a
        `task-finished` signal. The caller is required to generate this message
        on success.

        Raises
        ------
        Exception:
            In case the data is put into the in-memory buffer and a serialization error
            occurs during spilling, this re-raises that error. This has to be handled by
            the caller since most callers generate scheduler messages on success (see
            comment above) but we need to signal that this was not successful.

            Can only trigger if distributed.worker.memory.target is enabled, the value
            is individually larger than target * memory_limit, and the task is not an
            actor.
        r1   g{Gzt?z
disk-writerN   rP   rQ   zdisk-write-target-durationr   r   r   Nr7   r3   zput-in-memory)rW   r*  rx   ru  r!   r   r  r   r   r  r`   r   r   r  rv  )	r[   r   r   r   r  r  rP   rQ   deprJ   rJ   rK   _put_key_in_memory   s@    





zWorkerState._put_key_in_memoryc                 C  sZ   |j s|dig fS d|_d|_|  jd7  _|js6t|j D ]}| j| | q<i g fS )Nr2   r-   FrB  )r   rx   r   rf  r   r   re  r   )r[   r   r   wrJ   rJ   rK   _transition_generic_fetchI  s    

z%WorkerState._transition_generic_fetchc                C  s$   | j | | | | j||dS Nr   )rh  r  r  _transition_released_waitingr[   r   r   rJ   rJ   rK   _transition_missing_waitingU  s    
z'WorkerState._transition_missing_waitingc                C  s<   | j r|jdkst|js"i g fS | j| | j||dS )Nr2   r   )rx  rx   r   r   rh  r  r  r  rJ   rJ   rK   _transition_missing_fetch\  s    z%WorkerState._transition_missing_fetchc                C  s6   | j | | j||d\}}|j| jks.t||fS r  )rh  r  _transition_generic_releasedrW   r_  r   r[   r   r   rZ  r  rJ   rJ   rK   _transition_missing_releasedh  s     
z(WorkerState._transition_missing_releasedc                C  s   |j s
t| j||dS r  )r   r   _transition_generic_missingr  rJ   rJ   rK   _transition_flight_missingr  s    
z&WorkerState._transition_flight_missingc                C  s0   | j r|jrtd|_| j| d|_i g fS )Nr2   F)rx  r   r   rx   rh  r   r   r  rJ   rJ   rK   r  x  s    
z'WorkerState._transition_generic_missingc                C  s"   | j r|jdkst| j||dS )Nr4   r   )rx  rx   r   r  r  rJ   rJ   rK   _transition_released_fetch  s    z&WorkerState._transition_released_fetchc                C  sX   |  | i }|jD ]&}|js|jttB dhB krd||< qd|_|jsPd||< |g fS )Nr1   r4   r/   )r  r   r   rx   r:   r9   r   )r[   r   r   rZ  
dependencyrJ   rJ   rK   r    s    


z(WorkerState._transition_generic_releasedc                  s    j r"t fdd|jD s"ti }|j  |jD ].}|jdkr6|j| |j| d||< q6|jstd||< d|_ j	| |g fS )Nc                 3  s   | ]}|j  jkV  qd S rZ   )rW   r_  r   r   ra   rJ   rK   r     s     z;WorkerState._transition_released_waiting.<locals>.<genexpr>r1   r-   r3   r7   )
rx  r   r   r   r   r  rx   r   r   r7   )r[   r   r   r  r   rJ   ra   rK   r    s    



z(WorkerState._transition_released_waiting)r   r   r   r_   c                C  sp   | j r<|jdkst|jst|jD ]}|| j| ks$tq$d|_d|_||_| j| |  j	d8  _	i g fS )Nr-   Fr.   rB  )
rx  rx   r   r   re  r   r   ri  r   rf  )r[   r   r   r   r  rJ   rJ   rK   _transition_fetch_flight  s    

z$WorkerState._transition_fetch_flightc                C  s   |  j d8  _ | j||dS NrB  r   )rf  r  r  rJ   rJ   rK   _transition_fetch_missing  s    z%WorkerState._transition_fetch_missingc                C  s   |  j d8  _ | j||dS r  )rf  r  r  rJ   rJ   rK   _transition_fetch_released  s    z&WorkerState._transition_fetch_releasedc                C  sL   |j d k	st|  j |j 8  _ | j||d\}}|t|j|d ||fS )Nr   rJ  )r   r   r  r  r  rW   r  rJ   rJ   rK   _transition_memory_released  s     
z'WorkerState._transition_memory_releasedc                  s    j rn|jdkst|jrtt fdd|jD s:ttdd |jD sRt| jks`t| jksntd|_ j	|  j
|   S )Nr7   c                 3  s&   | ]}|j  jkp|j  jkV  qd S rZ   rW   r*  ru  r   r  ra   rJ   rK   r     s   z>WorkerState._transition_waiting_constrained.<locals>.<genexpr>c                 s  s   | ]}|j d kV  qdS r1   Nrx   r  rJ   rJ   rK   r     s     r*   )rx  rx   r   r   r   r   r3   r*   r7   r  r   r  r  rJ   ra   rK   _transition_waiting_constrained  s    
z+WorkerState._transition_waiting_constrainedc                C  s.   |j s
tti t|j|dgf| j||dS )zNote: this transition is triggered exclusively by a task raising the
        Reschedule() Exception; it is not involved in work stealing.
        rJ  r   )r   r   r\  r  rW   r  r  rJ   rJ   rK   !_transition_executing_rescheduled  s
    
z-WorkerState._transition_executing_rescheduledc                C  s   | j rp|jdkst|| jks"t|| jks0t|jr:t|jD ].}|j| jks`|j| j	ks`t|jdks@tq@|j
r|dig fS d|_|jd k	st| j| | j| |  S )Nr7   r1   r*   r3   )rx  rx   r   r3   r*   r   r   rW   r*  ru  r   r   r7   r  r   r  )r[   r   r   r  rJ   rJ   rK   _transition_waiting_ready  s    

z%WorkerState._transition_waiting_readyr#   r   )r   r   r   r   r   r   r_   c                C  sB   ||_ ||_||_||_d|_tj||| j|j	d}i |gfS )Nr+   )r   r   )
r   r   r   r   rx   r   r   r`  r}   rW   )r[   r   r   r   r   r   r   smsgrJ   rJ   rK   _transition_generic_error  s    
z%WorkerState._transition_generic_errorc                C  sp   |j s
t|jdkr,|jdks"t|di}n$|jdks:t|jdksHt|di}d|_d|_ d|_d|_|g fS )zIn case of failure of the previous state, discard the error and kick off the
        next state without informing the scheduler
        r,   r0   r-   r.   r7   r4   FNr   r   r   r   rx   )r[   r   r   r   r   r   r   rZ  rJ   rJ   rK   _transition_resumed_error'  s    


z%WorkerState._transition_resumed_errorc                C  sJ   |j s
t|jdkst|jdks&td|_d|_ d|_d|_|dig fS )a  If the task raises the Reschedule() exception, but the scheduler already told
        the worker to fetch it somewhere else, silently transition to fetch.

        Note that this transition effectively duplicates the logic of
        _transition_resumed_error.
        r  r-   r4   FNr  r  rJ   rJ   rK   _transition_resumed_rescheduledC  s    	
z+WorkerState._transition_resumed_rescheduledc                C  s   |j dkr\| jr|jdkst|jrHd|_d|_d|_ d|_|dig fS d|_d|_ d|_n,| jr|j dkspt|jdks~t|jrti g fS )z
        See also
        --------
        _transition_cancelled_fetch
        _transition_cancelled_waiting
        _transition_resumed_waiting
        _transition_flight_fetch
        r.   r7   r4   FNr  r-   )r   rx  r   r   r   rx   r  rJ   rJ   rK   _transition_resumed_fetchU  s"    

z%WorkerState._transition_resumed_fetchc                C  s   |dig fS )Nr-   rJ   r  rJ   rJ   rK   _transition_resumed_missing}  s    z'WorkerState._transition_resumed_missingc                C  s   |j r
td|_d |_i g fS )Nr)   )r   r   rx   r   r  rJ   rJ   rK   _transition_resumed_released  s    
z(WorkerState._transition_resumed_releasedc                C  s   |j r
t|jdkr<|jdks"td|_d|_d|_i g fS |jdkr|jdksTtd|_d|_d|_t|jd|d}i |gfS |jdkst|jdksti g fS dS )z
        See also
        --------
        _transition_cancelled_fetch
        _transition_cancelled_or_resumed_long_running
        _transition_cancelled_waiting
        _transition_resumed_fetch
        r,   r-   Nr0   rW   r  r   r.   r7   )r   r   r   r   rx   r  rW   r[   r   r   r  rJ   rJ   rK   _transition_resumed_waiting  s*    


  
z'WorkerState._transition_resumed_waitingc                C  sb   |j dkr2|jr|dig fS d|_d|_ i g fS n,|j dks@t|jrJtd|_d|_i g fS dS )z
        See also
        --------
        _transition_cancelled_waiting
        _transition_resumed_fetch
        _transition_resumed_waiting
        r.   r4   Nr  r6   r-   )r   r   rx   r   r   r  rJ   rJ   rK   _transition_cancelled_fetch  s    



z'WorkerState._transition_cancelled_fetchc                C  s~   |j r
t|jdkr(d|_d|_i g fS |jdkrXd|_d|_t|jd|d}i |gfS |jdksftd|_d|_i g fS dS )z
        See also
        --------
        _transition_cancelled_fetch
        _transition_cancelled_or_resumed_long_running
        _transition_resumed_fetch
        _transition_resumed_waiting
        r,   Nr0   r  r.   r6   r7   )r   r   r   rx   r  rW   r   r  rJ   rJ   rK   _transition_cancelled_waiting  s$    


  
z)WorkerState._transition_cancelled_waitingr   )r   r?   r   r_   c                G  s(   |j si g fS d |_d|_ | j||dS )NFr   )r   r   r  )r[   r   r   r?   rJ   rJ   rK   _transition_cancelled_released  s
    z*WorkerState._transition_cancelled_releasedc                C  sH   | j r(|jdkst|jrt|jr(tttd |j|_d|_i g fS )zWe can't stop executing a task just because the scheduler asked us to,
        so we're entering cancelled state and waiting until it completes.
        r  r)   )rx  rx   r   r   r   r   r   r   r  rJ   rJ   rK   _transition_executing_released	  s    

z*WorkerState._transition_executing_releasedc                C  s   | j rr|jdkst|jrt|j| jks.t|| jks<t|| jksJt|jD ] }|j| jksP|j| j	ksPtqPd|_t
|j|d}i |gfS )Nr*   r,   rJ  )rx  rx   r   r   rW   r*  r3   r*   r   ru  r   )r[   r   r   r  r[  rJ   rJ   rK   !_transition_constrained_executing	  s    

z-WorkerState._transition_constrained_executingc                  s    j rf|jdkst|jrt|j jks.t| jks<t| jksJtt fdd|j	D sftd|_t
|j|d}i |gfS )Nr3   c                 3  s&   | ]}|j  jkp|j  jkV  qd S rZ   r  r  ra   rJ   rK   r   )	  s   z:WorkerState._transition_ready_executing.<locals>.<genexpr>r,   rJ  )rx  rx   r   r   rW   r*  r3   r*   r   r   r   )r[   r   r   r[  rJ   ra   rK   _transition_ready_executing 	  s    
z'WorkerState._transition_ready_executingc                C  s   |j si g fS | j||dS r  )r   r  r  rJ   rJ   rK   _transition_flight_fetch2	  s    z$WorkerState._transition_flight_fetchc                C  s$   |j r
td|_d |_d|_i g fS )Nr.   r)   r  r  rJ   rJ   rK   _transition_flight_released=	  s
    
z'WorkerState._transition_flight_released)r   r  r   r_   c                C  sB   d|_ | j| | j| t|j||d}ti |gf|  S )za
        See also
        --------
        _transition_cancelled_or_resumed_long_running
        r0   r  )	rx   r,   r  rs  r   r  rW   r\  r  )r[   r   r  r   r  rJ   rJ   rK   "_transition_executing_long_runningH	  s      z.WorkerState._transition_executing_long_runningc                C  s4   |j dkstd|_ | j| | j| |  S )a*  Handles transitions:

        - cancelled(executing) -> long-running
        - cancelled(long-running) -> long-running (user called secede() twice)
        - resumed(executing->fetch) -> long-running
        - resumed(long-running->fetch) -> long-running (user called secede() twice)

        Unlike in the executing->long_running transition, do not send LongRunningMsg.
        From the scheduler's perspective, this task no longer exists (cancelled) or is
        in memory on another worker (resumed). So it shouldn't hear about it.
        Instead, we're going to send the LongRunningMsg when and if the task
        transitions back to waiting.

        See also
        --------
        _transition_executing_long_running
        _transition_cancelled_waiting
        _transition_resumed_waiting
        r  r0   )r   r   r,   r  rs  r   r  )r[   r   r  r   rJ   rJ   rK   -_transition_cancelled_or_resumed_long_running\	  s
    z9WorkerState._transition_cancelled_or_resumed_long_runningc                C  s   | j ||d|dS )zThis transition is *normally* triggered by ExecuteSuccessEvent.
        However, beware that it can also be triggered by scatter().
        r   r   _transition_to_memoryr[   r   r   r   rJ   rJ   rK   _transition_executing_memoryx	  s       z(WorkerState._transition_executing_memoryc                C  s   | j ||d|dS )z)This transition is triggered by scatter()r	  r   r  r  rJ   rJ   rK   _transition_released_memory	  s       z'WorkerState._transition_released_memoryc                C  s   | j ||d|dS )zThis transition is *normally* triggered by GatherDepSuccessEvent.
        However, beware that it can also be triggered by scatter().
        r	  r   r  r  rJ   rJ   rK   _transition_flight_memory	  s       z%WorkerState._transition_flight_memoryc                C  s\   |j dkr|jdkstd}n |j dks,t|jdks:td}d|_ d|_| j||||dS )	a(  Normally, we send to the scheduler a 'task-finished' message for a completed
        execution and 'add-data' for a completed replication from another worker. The
        scheduler's reaction to the two messages is fundamentally different; namely,
        add-data is only admissible for tasks that are already in memory on another
        worker, and won't trigger transitions.

        In the case of resumed tasks, the scheduler's expectation is set by ts.next -
        which means, the opposite of what the worker actually just completed.
        r  r-   r	  r.   r7   r   Nr   )r   r   r   r  )r[   r   r   r   msg_typerJ   rJ   rK   _transition_resumed_memory	  s    
z&WorkerState._transition_resumed_memoryz&Literal[('add-keys', 'task-finished')])r   r   r  r   r_   c          	   
   C  s   z| j |||d\}}W nB tk
rZ } z$t|}|t| ig f W Y S d }~X Y nX |dkr||t|jg|d n |dkst|| j	||d ||fS )Nr   r	  r  r   r   )
r  r2  r    r   valuesr  r  rW   r   r  )	r[   r   r   r  r   rZ  instrsers   rJ   rJ   rK   r  	  s    *z!WorkerState._transition_to_memoryc                C  sz   i }| j r"tdd |jD r"t|jD ](}|j| |jdkr(|js(d||< q(| | d|_| j	|j
d  |g fS )Nc                 s  s   | ]}|j d kV  qdS )r/   Nr  r  rJ   rJ   rK   r   	  s     z=WorkerState._transition_released_forgotten.<locals>.<genexpr>r4   r/   )rx  r   r   r   r   r  rx   r  r_  r  rW   )r[   r   r   r  r  rJ   rJ   rK   _transition_released_forgotten	  s    


z*WorkerState._transition_released_forgotten)3)r)   r+   )r)   r-   )r)   r0   )r)   r1   )r)   r2   )r)   r4   )r)   r5   )r)   r7   )r6   r+   )r6   r-   )r6   r0   )r6   r1   )r6   r4   )r6   r5   )r6   r7   )r*   r,   )r*   r4   )r+   r4   )r,   r+   r  )r,   r1   )r,   r4   )r,   r5   )r-   r.   )r-   r2   )r-   r4   )r.   r+   )r.   r-   )r.   r1   )r.   r2   )r.   r4   )r0   r+   )r0   r1   )r0   r5   )r0   r4   )r1   r4   )r2   r+   )r2   r-   )r2   r4   )r2   r7   )r3   r,   )r3   r4   )r4   r+   )r4   r-   )r4   r/   )r4   r1   )r4   r2   )r4   r7   )r7   r*   )r7   r3   )r7   r4   zSClassVar[Mapping[tuple[TaskStateState, TaskStateState], Callable[..., RecsInstrs]]]_TRANSITIONS_TABLE)method_namer?   rA   r_   c              	   O  s\   | j  D ]L\}}t||r
zt|||| W q
 tk
rT   tjd|dd Y q
X q
d S )Nz!Plugin '%s' failed with exceptionT)exc_info)ra  r   hasattrr   r2  r  r  )r[   r  r?   rA   r   ZpluginrJ   rJ   rK   _notify_plugins
  s    
  zWorkerState._notify_pluginszTaskStateState | tuple)r   rX   r?   r   r_   c             
   G  s  t |tr,|rt|dd }tt|d }|j|kr>i g fS |j}| j||f}|  jd7  _| j	r| j| j	krt
|j||| ||dk	r|| |f|d|i\}}| d|j|| nd||fkrz| j|d|d\}}||d }	rLt |	tr|	^}
}n
|	d }
}|
d	kr$qt||f| j||
f|d|i\}}qt||f| j||f|d|i\}}W n@ ttfk
r } zt|j||| ||W 5 d}~X Y nX nt|j||| || j|j|||jd
d | D |t f ||fS )zTransition a key from its current state to the finish state

        See Also
        --------
        Worker.transitions: wrapper around this method
        rB  Nr   r   Z
transitionr4   r   rJ   r/   c                 S  s*   i | ]"\}}|j t|tr"|d  n|qS r=  )rW   r   r   )r   r   newrJ   rJ   rK   r   p
  s    z+WorkerState._transition.<locals>.<dictcomp>)r   r   r   r   r8   rx   r  r}   ry  rz  rq   rW   rY   r  _transitionr  r\  rT   ry   rv  r  r   r!   )r[   r   rX   r   r?   rP   funcrZ  r  r   Zv_stateZv_argsr  rJ   rJ   rK   r  
  sl    


  




,zWorkerState._transitionc                   s   t  fdd|j D S )Nc                 3  s    | ]\}} j | |kV  qd S rZ   )rr  )r   resourceneededra   rJ   rK   r   {
  s   z?WorkerState._resource_restrictions_satisfied.<locals>.<genexpr>)r   r   r   r[   r   rJ   ra   rK   r  z
  s    z,WorkerState._resource_restrictions_satisfiedc                 C  s*   |j  D ]\}}| j|  |8  < q
d S rZ   r   r   rr  r[   r   r  r   rJ   rJ   rK   r  
  s    zWorkerState._acquire_resourcesc                 C  s*   |j  D ]\}}| j|  |7  < q
d S rZ   r  r  rJ   rJ   rK   _release_resources
  s    zWorkerState._release_resourcesrW  )r  r   r_   c                  sn   g  t  ddd fdd}||  jd\}} |7  || jrjD ]}| qZ S )zProcess transitions until none are left

        This includes feedback from previous transitions and continues until we
        reach a steady state
        rW  r   )rZ  r_   c                   sH   | rD|   \}}| j||d\}}| |  | q d S r  )popitemr   r  updateextend)rZ  r   rX   a_recsa_instructionsr  r[   r   r_  rJ   rK   process_recs
  s    
  

z.WorkerState._transitions.<locals>.process_recsr   )r   r   r  rx  validate_task)r[   r  r   r  r  r	  r   rJ   r
  rK   r  
  s    
	zWorkerState._transitions)evr_   c                 C  s   t |d S rZ   )	TypeErrorr[   r  rJ   rJ   rK   r  
  s    zWorkerState._handle_eventrS  c           
      C  s
  i }g }|j  D ]\}}z| j| }d|f||< W n tk
r   t| | j|< }z| j|||jd\}}W n: tk
r }	 z|tt	|	
 i}g }W 5 d }	~	X Y nX || || Y nX | j|d|jt f q|jr|tt|j |jd ||fS )Nr1   r   zreceive-from-scatterr  )r*  r   r_  r  r   r  r   r2  r   r    r  r  r  rv  r  r!   rT  r  r   )
r[   r  r  r  rW   r   r   rZ  r  r  rJ   rJ   rK   _handle_update_data
  s2    
  
zWorkerState._handle_update_datarQ  c                 C  sJ   | j d|j|jt f i }|jD ]}| j|}|r$d||< q$|g fS )a  Handler to be called by the scheduler.

        The given keys are no longer referred to and required by the scheduler.
        The worker is now allowed to release the key, if applicable.

        This does not guarantee that the memory is released since the worker may
        still decide to hold on to the data and task since it is required by an
        upstream dependency.
        z	free-keysr4   )rv  r  r  r   r!   r_  r}   )r[   r  r  rW   r   rJ   rJ   rK   _handle_free_keys
  s    

zWorkerState._handle_free_keysrP  c                 C  s   i }g }g }|j D ]Z}| j|}|dks|jdkr6q| sb| j|jd|jt	 f d||< q|| q|r| jd||jt	 f |t
||jd ||fS )a  Stream handler notifying the worker that it might be holding unreferenced,
        superfluous data.

        This should not actually happen during ordinary operations and is only intended
        to correct any erroneous state. An example where this is necessary is if a
        worker fetches data for a downstream task but that task is released before the
        data arrives. In this case, the scheduler will notify the worker that it may be
        holding this unnecessary data, if the worker hasn't released the data itself,
        already.

        This handler does not guarantee the task nor the data to be actually
        released but only asks the worker to release the data on a best effort
        guarantee. This protects from race conditions where the given keys may
        already have been rescheduled for compute in which case the compute
        would win and this handler is ignored.

        For stronger guarantees, see handler free_keys
        Nr1   zremove-replica-confirmedr4   zremove-replica-rejectedr  )r  r_  r}   rx   r   rv  r  rW   r   r!   r  )r[   r  r  r  ZrejectedrW   r   rJ   rJ   rK   _handle_remove_replicas
  s&    

z#WorkerState._handle_remove_replicasrO  c                 C  s   | j r0|j |j kstt|j s0ti }|j D ]2\}}| j|d|j	d}|j
dkr>||_d||< q>| |j |g fS )NrA  rW   r   r   r1   r-   )rx  r   r  r   r   r   r  r   r  r   rx   r  )r[   r  r  rW   r   r   rJ   rJ   rK   _handle_acquire_replicas  s    	

z$WorkerState._handle_acquire_replicasr6  c           
   	   C  sR  z$| j |j }td||jd W n* tk
rN   t|j | j |j< }Y nX | j|jd|j	|jt
 f i }g }|j	tdddhB krn|j	dkr|| j||jd n|j	d	kr|tj||jd nt|j	d
kr(d||< |j|_|j| jf }|  jd8  _|jr"d | j|j< d |_d |_d|_d|_||_|j|_|j|_|j	dkrh|jdksp|j|_| jr|j |j kst |j! D ]*}|st t"|t"t#|kst q|j$ D ]F\}}| j%|||jd}	|	j	dkr||	_|j&'|	 |	j('| q| )|j n"t*d| d|j d| +| ||fS )Nz)Asked to compute an already known task %s)rC   r   zcompute-taskr,   r0   r7   r1   r   r+   >   r2   r-   r)   r4   r.   r6   rB  r   r)   r6   r  r  z&Unexpected task state encountered for z; stimulus_id=z; story=),r_  rW   r  r  r   r  r   rv  r  rx   r!   r:   r  r   r   r   r   rp  r:  ru  r   r   r   r   r   r   r   r   rx  r   r  r   r   r  r  r   r   r  r   r   r   r  r  rY   )
r[   r  r   r  r  r   Zdep_workersZdep_keyr   r   rJ   rJ   rK   _handle_compute_task-  s|    



z WorkerState._handle_compute_taskr&  zIterator[TaskState]c                 c  sT   |  j |j8  _ | j|j}|D ],}| j| }d|_d|_| j	| |V  q"dS )aZ  Common code for the handlers of all subclasses of GatherDepDoneEvent.

        Yields the tasks that need to transition out of flight.
        The task states can be flight, cancelled, or resumed, but in case of scatter()
        they can also be in memory or error states.

        See also
        --------
        _execute_done_common
        TN)
rk  r   rj  r  r   r_  r   r   ri  r  )r[   r  r  rW   r   rJ   rJ   rK   _gather_dep_done_common  s    
z#WorkerState._gather_dep_done_commonr(  c                 C  s   i }|  |D ]}|j|jkr4d|j|j f||< q| j|jd|jt f | jrv|jdksbt	|| j
|j ksvt	|j|j | j|j |j d||< q|g fS )zjgather_dep terminated successfully.
        The response may contain fewer keys than the request.
        r1   missing-depr-   )r  rW   r*  rv  r  r   r!   rx  rx   r   re  r   r   r  rd  )r[   r  r  r   rJ   rJ   rK   _handle_gather_dep_success  s    
z&WorkerState._handle_gather_dep_successr-  c                 C  sx   | j |j i }g }| |D ]$}d||< |j| j  s ||j q t|j|jdg}|rp|t	||jd ||fS )z,gather_dep terminated: remote worker is busyr-   )r   r   r  )
ro  r   r   r  r   r  rW   r   r   r  )r[   r  r  Zrefresh_who_hasr   r  rJ   rJ   rK   _handle_gather_dep_busy  s"     z#WorkerState._handle_gather_dep_busyr/  c                 C  s   i }|  |D ]&}| j|jd|jt f d||< q| j|jdD ]>}| j	rn|j
dks^t|j|jksnt|j|jhkrFd||< qF| j|jD ]}| j| }|j|j q|g fS )a  gather_dep terminated: network failure while trying to
        communicate with remote worker

        Though the network failure could be transient, we assume it is not, and
        preemptively act as though the other worker has died (including removing all
        keys from it, even ones we did not fetch).

        This optimization leads to faster completion of the fetch, since we immediately
        either retry a different worker, or ask the scheduler to inform us of a new
        worker if no other worker is available.
        r  r-   rJ   r2   )r  rv  r  rW   r   r!   re  r  r   rx  rx   r   r   rd  r_  r  )r[   r  r  r   rW   rJ   rJ   rK   "_handle_gather_dep_network_failure  s    


z.WorkerState._handle_gather_dep_network_failurer0  c                   s     fdd|   D }|g fS )zvgather_dep terminated: generic error raised (not a network failure);
        e.g. data failed to deserialize.
        c                   s$   i | ]}|d  j  j j jfqS )r+   )r   r   r   r   r  r  rJ   rK   r     s   z:WorkerState._handle_gather_dep_failure.<locals>.<dictcomp>)r  )r[   r  r  rJ   r  rK   _handle_gather_dep_failure  s    
z&WorkerState._handle_gather_dep_failurerV  c                 C  s,   | j |j}|si g fS |d|jfig fS )Nr0   )r_  r}   rW   r  r[   r  r   rJ   rJ   rK   _handle_secede  s    zWorkerState._handle_secederR  c                 C  sd   | j |j}|d k	r|jnd }t|j||jd}|tdhB krV|sHt|di|gfS i |gfS d S )N)rW   rx   r   r7   r4   )r_  r}   rW   rx   r  r   r:   r   )r[   r  r   rx   r  rJ   rJ   rK   _handle_steal_request	  s    z!WorkerState._handle_steal_requestr"  c                 C  s   d| _ i g fS )zPrevent any further tasks to be executed or gathered. Tasks that are
        currently executing or in flight will continue to progress.
        F)rc  r  rJ   rJ   rK   _handle_pause  s    zWorkerState._handle_pauser$  c                 C  s   d| _ |  S )zEmerge from paused statusT)rc  r  r  rJ   rJ   rK   _handle_unpause#  s    zWorkerState._handle_unpauser%  c                 C  s   | j |j i g fS rZ   )ro  r  r   r  rJ   rJ   rK   _handle_retry_busy_worker)  s    z%WorkerState._handle_retry_busy_workerrL  c                 C  sZ   | j |j}|r"|jtdhB kr*i g fS | j|jd|jt f |j	rNt
|dig fS )zCancel a task on a best-effort basis. This is only possible while a task
        is in state `waiting` or `ready`; nothing will happen otherwise.
        r7   zcancel-computer4   )r_  r}   rW   rx   r:   rv  r  r   r!   r   r   r  rJ   rJ   rK   _handle_cancel_compute.  s    
z"WorkerState._handle_cancel_computerD  z$tuple[TaskState, Recs, Instructions]c                 C  s   | j |j}|s"t| |j| jr@|| jk|| jkks@td|_|  j	d7  _	| 
| | j| | j| |  \}}||kst|||fS )a  Common code for the handlers of all subclasses of ExecuteDoneEvent.

        The task state can be executing, cancelled, or resumed, but in case of scatter()
        it can also be in memory or error state.

        See also
        --------
        _gather_dep_done_common
        TrB  )r_  r}   rW   r   rY   rx  r,   rs  r   rt  r  r  r  r[   r  r   rZ  r[  rJ   rJ   rK   _execute_done_common=  s    
z WorkerState._execute_done_commonrE  c                 C  sl   |  |\}}}|jd|j|jd |td|j|j |jd |j|_|j|_d|j	f||< ||fS )zTask completed successfullycomputer  zcompute-durationr  r1   )
r&  r   r  rP   rQ   r   r   r   r`   r   r%  rJ   rJ   rK   _handle_execute_successY  s    
z#WorkerState._handle_execute_successrF  c                 C  s^   |  |\}}}|jdk	r<|jdk	r<|jd|j|jd d|j|j|j|jf||< ||fS )zTask execution failedNr'  r  r+   )	r&  rP   rQ   r   r  r   r   r   r   r%  rJ   rJ   rK   _handle_execute_failurej  s    z#WorkerState._handle_execute_failurerK  c                 C  s    |  |\}}}d||< ||fS )zTask raised Reschedule() exception while it was running.

        Note: this has nothing to do with work stealing, which instead causes a
        FreeKeysEvent.
        r5   )r&  r%  rJ   rJ   rK   _handle_reschedule{  s    zWorkerState._handle_reschedulerM  c                 C  sX   | j si g fS | jr4| j D ]}|jrt| |qtdd | j D |jd}i |gfS )Nc                 S  s   g | ]
}|j qS rJ   r   r  rJ   rJ   rK   
<listcomp>  s     z4WorkerState._handle_find_missing.<locals>.<listcomp>r  )rh  rx  r   r   rY   r  r   )r[   r  r   r  rJ   rJ   rK   _handle_find_missing  s    
z WorkerState._handle_find_missingrN  c                 C  sl   |  |j i }g }|jD ]H}| j|}|s0q|jrJ|jdkrJd||< q|js|jdkrd||< q||fS )Nr2   r-   )r  r   r_  r}   rx   )r[   r  r  r  rW   r   rJ   rJ   rK   _handle_refresh_who_has  s    


z#WorkerState._handle_refresh_who_haszstr | TaskStaterU   )keys_or_tasks_or_stimulir_   c                 G  s   dd |D }t || jS )ziReturn all records from the transitions log involving one or more tasks or
        stimulus_id's
        c                 S  s    h | ]}t |tr|jn|qS rJ   r   r   rW   r   r  rJ   rJ   rK   r    s    z$WorkerState.story.<locals>.<setcomp>)r   rv  )r[   r.  Zkeys_or_stimulirJ   rJ   rK   rY     s    zWorkerState.storyzlist[StateMachineEvent])keys_or_tasksr_   c                   s"   dd |D   fdd| j D S )z;Return all state machine events involving one or more tasksc                 S  s    h | ]}t |tr|jn|qS rJ   r/  r0  rJ   rJ   rK   r    s     z-WorkerState.stimulus_story.<locals>.<setcomp>c                   s    g | ]}t |d d kr|qS )rW   Nr   )r   r  r
  rJ   rK   r+    s      z.WorkerState.stimulus_story.<locals>.<listcomp>)rw  )r[   r1  rJ   r
  rK   stimulus_story  s    zWorkerState.stimulus_storyrJ   r   r   r   r   c                  s   | j | j| jdd | j D dd | j D t| jdd | j	
 D dd | jD t| jdd | jD d	d | jD | jd
d | jD | j| j| j| j| jd} fdd|
 D }t| dS )zDictionary representation for debugging purposes.
        Not type stable and not intended for roundtrips.

        See also
        --------
        Client.dump_cluster_state
        distributed.utils.recursive_to_dict
        c                 S  s   g | ]
}|j qS rJ   r   r  rJ   rJ   rK   r+    s     z(WorkerState._to_dict.<locals>.<listcomp>c                 S  s   g | ]
}|j qS rJ   r   r  rJ   rJ   rK   r+    s     c                 S  s$   i | ]\}}|d d |  D qS )c                 S  s   g | ]
}|j qS rJ   r   r  rJ   rJ   rK   r+    s     z3WorkerState._to_dict.<locals>.<dictcomp>.<listcomp>)sorted)r   r  tssrJ   rJ   rK   r     s    z(WorkerState._to_dict.<locals>.<dictcomp>c                 S  s   h | ]
}|j qS rJ   r   r  rJ   rJ   rK   r    s     z'WorkerState._to_dict.<locals>.<setcomp>c                 S  s   h | ]
}|j qS rJ   r   r  rJ   rJ   rK   r    s     c                 S  s   h | ]
}|j qS rJ   r   r  rJ   rJ   rK   r    s     c                 S  s   g | ]
}|j qS rJ   r   r  rJ   rJ   rK   r+    s     )r^  rb  rc  r3   r*   r*  re  r,   rd  rs  ri  rj  rh  ro  rv  rw  ry  r_  c                   s   i | ]\}}| kr||qS rJ   rJ   r   r   rJ   rK   r     s       r   )r^  rb  rc  r3   r3  r*   r   rU  r*  re  r   r,   rd  rs  ri  rj  rh  ro  rv  rw  ry  r_  r%   )r[   r   r  rJ   r   rK   r    s.    

zWorkerState._to_dictz,dict[TaskStateState | Literal['other'], int]c              
     s   t  fdd jD }t jt jt j| t jt jt j j	t j
t jd	}t jt |   |d< }|dkst|S )Nc                 3  s   | ]} j | jd kV  qdS r  )r_  rx   r   rW   ra   rJ   rK   r     s    z*WorkerState.task_counts.<locals>.<genexpr>)	r,   r0   r1   r3   r*   r7   r-   r2   r.   r   r   )sumru  r  r,   rs  r*  r3   r*   r7   rf  rh  ri  r_  r  r   )r[   Zn_actors_in_memoryr   r   rJ   ra   rK   task_counts  s     zWorkerState.task_countsc                 C  s:   |j | jks|j | jkstt|jts,t|jr6td S rZ   )rW   r*  ru  r   r   r   rz   r   r  rJ   rJ   rK   _validate_task_memory  s    z!WorkerState._validate_task_memoryc                 C  s   |j dks|jdkr2|| jks"t|| jksftn4|j dksJ|jdksJt|| jksXt|| jksft|jdk	stt|j| jkst|jrt|j	D ] }|| j
kst|| jkstqdS )aj  Validate tasks:

        - ts.state == executing
        - ts.state == long-running
        - ts.state == cancelled, ts.previous == executing
        - ts.state == cancelled, ts.previous == long-running
        - ts.state == resumed, ts.previous == executing, ts.next == fetch
        - ts.state == resumed, ts.previous == long-running, ts.next == fetch
        r,   r0   N)rx   r   r,   r   rs  r   rW   r*  r   r   r3   r*   r[   r   r  rJ   rJ   rK   _validate_task_executing  s    


z$WorkerState._validate_task_executingc                   s   |j dkr2|jrt| jks"t| jksftn4|js<t|j dksJt| jksXt| jksft|j jksvt|jrt|jrtt	 fdd|j
D stdS )zWValidate tasks:

        - ts.state == ready
        - ts.state == constrained
        r3   r*   c                 3  s&   | ]}|j  jkp|j  jkV  qd S rZ   r  r  ra   rJ   rK   r   <  s    z3WorkerState._validate_task_ready.<locals>.<genexpr>N)rx   r   r   r3   r*   rW   r*  r   r   r   r   r  rJ   ra   rK   _validate_task_ready)  s    




z WorkerState._validate_task_readyc                   s|   |j  jkst|jrt| jks(t|js2t|j fdd|jD ksPt|jD ] }| jksht| j	ksVtqVd S )Nc                   s(   h | ] }|j  jkr|j  jkr|qS rJ   r  r  ra   rJ   rK   r  E  s    z5WorkerState._validate_task_waiting.<locals>.<setcomp>)
rW   r*  r   r   r7   r   r   r   r3   r*   r9  rJ   ra   rK   _validate_task_waiting@  s    


z"WorkerState._validate_task_waitingc                 C  s   |j | jkst|j | jks t|| jks.t|jD ] }|| jksFt|| jks4tq4|js`t|j| j	kspt|j | j	|j kstdS )zValidate tasks:

        - ts.state == flight
        - ts.state == cancelled, ts.previous == flight
        - ts.state == resumed, ts.previous == flight, ts.next == waiting
        N)
rW   r*  r   ru  ri  r   r3   r*   r   rj  r9  rJ   rJ   rK   _validate_task_flightN  s    

z!WorkerState._validate_task_flightc                 C  s   |j | jkst|j | jks t| j|jks0t|jr:t|jsDt|jD ]*}|j | j| ksbt|| j| ksJtqJ|j	D ] }|| j
kst|| jks|tq|d S rZ   )rW   r*  r   ru  r^  r   r   rd  re  r   r3   r*   )r[   r   r  r  rJ   rJ   rK   _validate_task_fetch_  s    



z WorkerState._validate_task_fetchc                   s    j | jkst j | jks t jr*t jr4tt fdd| j D rTt | j	ksbt j
D ] }|| jkszt|| jkshtqhd S )Nc                 3  s   | ]} j |kV  qd S rZ   r   )r   rd  r   rJ   rK   r   q  s     z5WorkerState._validate_task_missing.<locals>.<genexpr>)rW   r*  r   ru  r   r   r   rd  r  rh  r   r3   r*   r9  rJ   r?  rK   _validate_task_missingl  s    

 
z"WorkerState._validate_task_missingc                 C  s@   |j d kst|jdkr$| | n|jdks2t| | d S )Nr  r.   )r   r   r   r:  r=  r  rJ   rJ   rK   _validate_task_cancelledw  s
    
z$WorkerState._validate_task_cancelledc                 C  sv   |j dkr$|jdkst| | n&|j dks2t|jdks@t| | |jD ] }|| jksbt|| jksPtqPd S )Nr  r-   r.   r7   )r   r   r   r:  r=  r   r3   r*   r9  rJ   rJ   rK   _validate_task_resumed  s    


z"WorkerState._validate_task_resumedc                 C  s   |j | jkst|j | jks t|jr*t|jr4t| j D ]}||ks>tq>|| jks^t|| j	kslt|| j
kszt|jrt|jrt|jrt|jrtd S rZ   )rW   r*  r   ru  r   r   re  r  r,   ri  rh  r   r   r   r   )r[   r   r4  rJ   rJ   rK   _validate_task_released  s    

	


z#WorkerState._validate_task_releasedc              
   C  sL  z|j | jkr"| j|j  |ks"t|jdkr8| | n|jdkrN| | n|jdkrd| | n|jdkrz| | n|jdkr| | nl|jdkr| 	| nV|jdkr| 
| n@|jdkr| | n*|jd	kr| | n|jd
kr| | W nH tk
rF } z(t| t|j |j| |d|W 5 d }~X Y nX d S )Nr1   r7   r2   r)   r6   )r3   r*   r  r.   r-   r4   rw   )rW   r_  r   rx   r8  r<  r@  rA  rB  r;  r:  r=  r>  rC  r2  r  r   rv   rY   )r[   r   r  rJ   rJ   rK   r    s@    










  zWorkerState.validate_taskc              	     s   j  D ]}|jD ]&}| jks&t|j j| kstq|jD ]*} j |j |ksZt||jksBt|qB|j	D ]*} j |j |kst|j
tkstt|qtq
 j D ]L\}}| jkst|D ]0}| j kst || j | jkstqqt } j D ]@\}}|D ]0}|| |j
dks4t||jkstqq
t| jks`t jD ]}|j
dksftqf jD ]}|j
dkstq jD ]}|j
dkstq jD ]2}|j
dks|j
dkr|jdkst|q jD ]2}|j
dks|j
dkr"|jdkst|q jD ]2}|j
dks4|j
dkr\|jdks4t|q4 jD ]}|j
d	ksntqnt j  j j j j j j jf D ]} j |j |kstqt fd
dt j jD }	 j|	kstd jd|	  jD ]}
|
 j kst|
q jD ]}
|
 j ks<t|
q< j  D ]}  | qb j!r j" j!k st #  d S )Nr-   r2   r3   r*   r,   r  r0   r.   r7   c                 3  s   | ]} j | jpd V  qdS )r   N)r_  r   r5  ra   rJ   rK   r     s    z-WorkerState.validate_state.<locals>.<genexpr>zself.nbytes=z; expected )$r_  r  r   r^  r   rW   rd  r   r   r   rx   r;   r   rY   r   re  r   r  rf  rh  r3   r*   r,   r   rs  ri  r7   r   r6  r*  ru  r   r  rz  ry  _validate_resources)r[   r   r   r  Zts_waitr  r   Z	fetch_tssr4  Zexpect_nbytesrW   rJ   ra   rK   validate_state  s    











"

zWorkerState.validate_statec                 C  s   | j  | j kst| j  }| j D ]*\}}|dksFt| j||  |8  < q,| jD ]>}|j D ].\}}|dkst||jf||  |8  < qlq^tdd |	 D st|dS )zKAssert that available_resources + resources held by tasks = total_resourcesg&.r   c                 s  s   | ]}t |d k V  qdS )g&.>N)abs)r   r   rJ   rJ   rK   r   %  s     z2WorkerState._validate_resources.<locals>.<genexpr>N)
rq  r  rr  r   r   r   r  r   r   r  )r[   rS   r   r   r   rJ   rJ   rK   rD    s    

zWorkerState._validate_resources)trD   rE   rF   rG   rH   r   r   mathinfr\   r  propertyr  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r  r   r  registerr  r  r  r  r  r  r  r  r  r  r  r   r!  r"  r#  r$  r&  r(  r)  r*  r,  r-  rY   r2  r  r7  r8  r:  r;  r<  r=  r>  r@  rA  rB  rC  r  rE  rD  rJ   rJ   rJ   rK   r]  $  s  
		&?	))E@&I
(
)$


8
\++Z"
	$#"Yr]  c                   @  s   e Zd ZU dZded< ded< ddddZd	d
dddZdd
dddZd+dd
dddZe	j
dd
dddZe	j
dddddddd Ze	j
dddd!d"d#Ze	j
ddd$d%d&Ze	j
ddd
d'd(d)Zd*S ),
BaseWorkerzWrapper around the :class:`WorkerState` that implements instructions handling.
    This is an abstract class with several ``@abc.abstractmethod`` methods, to be
    subclassed by :class:`~distributed.worker.Worker` and by unit test mock-ups.
    r]  rx   zset[asyncio.Task]_async_instructionsr  c                 C  s   || _ t | _d S rZ   )rx   r   rL  r   rJ   rJ   rK   r\   1  s    zBaseWorker.__init__zasyncio.Task[StateMachineEvent]r   )rC   r_   c                 C  s@   | j | z| }W n tjk
r0   Y dS X | | dS )zJAn asynchronous instruction just completed; process the returned stimulus.N)rL  r  resultasyncioZCancelledErrorr  )r[   rC   r  rJ   rJ   rK   _handle_stimulus_from_task5  s    z%BaseWorker._handle_stimulus_from_taskr  r  c                 G  s^  | j j| }|D ]F}d}t|tr6| |  nt|trR| |j|j	 nt|t
r|jsftdtd|jd }t|dkr|dd d }tj| j|j|j|j|jdd	|j d
| dd}njt|trtj| j|j|jdd|j dd}n6t|tr0tj| |jd|j dd}nt||dk	r| j| || j qdS )az  Forward one or more external stimuli to :meth:`WorkerState.handle_stimulus`
        and process the returned instructions, invoking the relevant Worker callbacks
        (``@abc.abstractmethod`` methods below).

        Spawn asyncio tasks for all asynchronous instructions and start tracking them.

        See also
        --------
        WorkerState.handle_stimulus
        Nr      r   P   M   z...)r   r   zgather_dep(z, {z})r   r   zexecute(r   zretry_busy_worker_later() rx   r  r   r   batched_sendr   r   digest_metricr   r   r   r   r   rj   r   r  rN  Zcreate_task
gather_depr   r   r   r   executerW   r   retry_busy_worker_laterr  rL  r   Zadd_done_callbackrO  )r[   r  r  r   rC   Zkeys_strrJ   rJ   rK   r  @  sF    






zBaseWorker.handle_stimulus   rO   )timeoutr_   c                   s\   | j s
dS | j D ]}|  qtj| j |dI dH \}}|D ]}td| d|  q<dS )z$Cancel all asynchronous instructionsN)rZ  z$Failed to cancel asyncio task after z
 seconds: )rL  cancelrN  waitr  r+   )r[   rZ  rC   r  pendingrJ   rJ   rK   closex  s    

zBaseWorker.closer   )rs   r_   c                 C  s   dS )a  Send a fire-and-forget message to the scheduler through bulk comms.

        Parameters
        ----------
        msg: dict
            msgpack-serializable message to send to the scheduler.
            Must have a 'op' key which is registered in Scheduler.stream_handlers.
        NrJ   )r[   rs   rJ   rJ   rK   rT    s    zBaseWorker.batched_sendrM   r  rz   )r   r   r   r   r_   c                  s   dS )a  Gather dependencies for a task from a worker who has them

        Parameters
        ----------
        worker : str
            Address of worker to gather dependencies from
        to_gather : list
            Keys of dependencies to gather from worker -- this is not
            necessarily equivalent to the full list of dependencies of ``dep``
            as some dependencies may already be present on this worker.
        total_nbytes : int
            Total number of bytes for all the dependencies in to_gather combined
        NrJ   )r[   r   r   r   r   rJ   rJ   rK   rV    s    	zBaseWorker.gather_deprI  c                  s   dS )zExecute a taskNrJ   )r[   rW   r   rJ   rJ   rK   rW    s    zBaseWorker.execute)r   r_   c                   s   dS )z9Wait some time, then take a peer worker out of busy stateNrJ   )r[   r   rJ   rJ   rK   rX    s    z"BaseWorker.retry_busy_worker_later)r   r   r_   c                 C  s   dS )z!Log an arbitrary numerical metricNrJ   )r[   r   r   rJ   rJ   rK   rU    s    zBaseWorker.digest_metricN)rY  )rD   rE   rF   rG   rH   r\   rO  r  r^  abcabstractmethodrT  rV  rW  rX  rU  rJ   rJ   rJ   rK   rK  (  s"   
8
rK  c                   @  sr   e Zd ZU ded< ded< dddddZd	dd
dddZd
dddZddddddZddd
dddZdS )DeprecatedWorkerStateAttributerM   r   r   targetNrb  c                 C  s
   || _ d S rZ   rc  )r[   rb  rJ   rJ   rK   r\     s    z'DeprecatedWorkerStateAttribute.__init__r`   r   )ownerr   r_   c                 C  s
   || _ d S rZ   rS  )r[   rd  r   rJ   rJ   rK   __set_name__  s    z+DeprecatedWorkerStateAttribute.__set_name__r^   c                 C  s(   t d| j d| jp| j dt d S )NzThe `Worker.z,` attribute has been moved to `Worker.state.`)warningswarnr   rb  FutureWarningra   rJ   rJ   rK   _warn_deprecated  s    z/DeprecatedWorkerStateAttribute._warn_deprecatedzWorker | Noneztype[Worker]r   )instancerd  r_   c                 C  s(   |d krd S |    t|j| jp$| jS rZ   )rj  r   rx   rb  r   )r[   rk  rd  rJ   rJ   rK   __get__  s    z&DeprecatedWorkerStateAttribute.__get__r(   )rk  r   r_   c                 C  s"   |    t|j| jp| j| d S rZ   )rj  setattrrx   rb  r   )r[   rk  r   rJ   rJ   rK   __set__  s    z&DeprecatedWorkerStateAttribute.__set__)N)	rD   rE   rF   rH   r\   re  rj  rl  rn  rJ   rJ   rJ   rK   ra    s   
ra  )
__future__r   r_  rN  r  loggingrG  r  r  sysrg  r   collectionsr   r   collections.abcr   r   r   r   r	   r
   r   Zdataclassesr   r   	functoolsr   r   r   	itertoolsr   typingr   r   r   r   r   r   r   Ztlzr   r{   Z
dask.utilsr   r   Zdistributed._storiesr   Zdistributed.collectionsr   Zdistributed.commr   Zdistributed.corer   r    Zdistributed.metricsr!   Zdistributed.protocolr"   Zdistributed.protocol.serializer#   Zdistributed.sizeofr$   r  Zdistributed.utilsr%   	getLoggerr  Ztyping_extensionsr&   Zdistributed.diagnostics.pluginr'   Zdistributed.workerr(   r8   rH   rM   r9   r:   r;   rI   r<   rL   r2  rT   rq   rv   ry   r~   version_infoZdc_slotsr   r   r   r   r   r   r   r   r   r   r  r  r  r  r  r  r  r"  r$  r%  r&  r(  r-  r/  r0  r6  rD  rE  rF  rK  rL  rM  rN  rO  rP  rQ  rR  rS  rV  rW  rX  rY  r   r   r   r\  r]  ABCrK  ra  rJ   rJ   rJ   rK   <module>   sb    $
	&# 
"">$P	/=                     