U
    /eL                     @  s`  d dl mZ d dlZd dlZd dlmZmZ d dlmZ d dl	m
Z
 d dlmZ d dlmZ d dlmZmZmZmZmZ d d	lmZ d dlZd d
lmZ d dlmZ d dlmZ d dlmZ d dlm Z m!Z! erd dl"m#Z#m$Z$m%Z%m&Z& dZ'e(e)Z*ej+,dZ-dddhZ.dddddhZ/ddhZ0G dd deZ1G dd deZ2dd d!d"d#d$d%Z3d&hZ4dS )'    )annotationsN)defaultdictdeque)	Container)partial)log2)time)TYPE_CHECKINGAnyClassVar	TypedDictcast)topk)parse_timedelta)PeriodicCallback)CommClosedError)SchedulerPlugin)
log_errorsrecursive_to_dict)	SchedulerSchedulerState	TaskStateWorkerStateg?zdistributed.admin.pdb-on-errreadyZconstrainedZwaitingZmemoryZ	executingzlong-runningZ	cancelledZresumedZreleasedc                   @  s6   e Zd ZU ded< ded< ded< ded< ded< d	S )
InFlightInfor   victimthieffloatvictim_durationthief_durationstrstimulus_idN)__name__
__module____qualname____annotations__ r&   r&   8/tmp/pip-unpacked-wheel-g426oqom/distributed/stealing.pyr   8   s
   
r   c                
   @  s  e Zd ZU ded< ded< ded< dedd	 ed
dD  Zded< ded< ded< ded< ded< ded< ded< ded< ded< ddddZdgd!d"d#d$d%Zd"d&d'd(Z	d)d*d+d,d-d.d/Z
d!d"d0d1d2Zdhd!d!d"d3d4d5Zdd6d"d3d7d8Zd"d&d9d:Zdid6d6d6d!d!d!d!d"d;d<d=Zd>d?d"d@dAdBZd>dCdDdEdFZd>d"dDdGdHZd>d"dDdIdJZd>d"dDdKdLZd>dMdDdNdOZd>dPdPd6dQdRdSZd dTd6d6d6dUd"dVdWdXZd"d&dYdZZdPdd[d\d]ZdPdd[d^d_Zd!d"d#d`daZdbdcdddedfZd S )jWorkStealingr   	schedulerz%dict[str, tuple[set[TaskState], ...]]	stealablez dict[TaskState, tuple[str, int]]key_stealable)g      ?c                 c  s   | ]}d d|d   V  qdS )         Nr&   ).0ir&   r&   r'   	<genexpr>G   s    zWorkStealing.<genexpr>r,      zClassVar[tuple[float, ...]]cost_multipliersr   _callback_timeintcountzdict[TaskState, InFlightInfo]	in_flightzdefaultdict[WorkerState, float]in_flight_occupancyzdefaultdict[WorkerState, int]in_flight_taskszdict[str, dict[int, float]]metricszasyncio.Event_in_flight_event_request_counter)r)   c                 C  s   || _ i | _i | _|jD ]}| j|d qttttj	
ddd| _| j |  tdd| j jd< d| _i | _td	d
 | _tdd
 | _t | _tdd
 tdd
 d| _d| _| j| j jd< d S )Nworkerz,distributed.scheduler.work-stealing-intervalms)defaulti )maxlenstealingr   c                   S  s   dS Nr   r&   r&   r&   r&   r'   <lambda>i       z'WorkStealing.__init__.<locals>.<lambda>c                   S  s   dS rC   r&   r&   r&   r&   r'   rD   j   rE   c                   S  s   dS rC   r&   r&   r&   r&   r'   rD   m   rE   c                   S  s   dS rC   r&   r&   r&   r&   r'   rD   n   rE   )request_count_totalrequest_cost_totalzsteal-response)r)   r*   r+   workers
add_workerr   r   r   daskconfiggetr4   Z
add_pluginr   eventsr6   r7   r   r8   r9   asyncioEventr;   r:   r<   move_task_confirmZstream_handlersselfr)   r>   r&   r&   r'   __init__U   s0    




zWorkStealing.__init__Nr
   None)r)   returnc                   sF   d| j jkrdS t| j| jd d}|  || j jd< | j  dS )aF  Start the background coroutine to balance the tasks on the cluster.
        Idempotent.
        The scheduler argument is ignored. It is merely required to satisfy the
        plugin interface. Since this class is simultaneously an extension, the
        scheduler instance is already registered during initialization
        rB   Ni  )callbackZcallback_time)r)   periodic_callbacksr   balancer4   startr;   set)rR   r)   pcr&   r&   r'   rY   s   s     zWorkStealing.start)rU   c                   s0   | j jdd}|r|  | j I dH  dS )zStop the background task balancing tasks on the cluster.
        This will block until all currently running stealing requests are
        finished. Idempotent
        rB   N)r)   rW   popstopr;   wait)rR   r[   r&   r&   r'   r]      s    zWorkStealing.stopr&   )excludezContainer[str]dict)r_   rU   c                C  s   t | |ddS )zDictionary representation for debugging purposes.
        Not type stable and not intended for roundtrips.

        See also
        --------
        Client.dump_cluster_state
        distributed.utils.recursive_to_dict
        T)r_   members)r   )rR   r_   r&   r&   r'   _to_dict_no_nest   s    	zWorkStealing._to_dict_no_nest)msgrU   c                 C  s   | j d|S NrB   )r)   Z	log_event)rR   rc   r&   r&   r'   log   s    zWorkStealing.log)r)   r>   rU   c                 C  s    t dd tdD | j|< d S )Nc                 s  s   | ]}t  V  qd S N)rZ   )r/   _r&   r&   r'   r1      s     z*WorkStealing.add_worker.<locals>.<genexpr>r2   )tupleranger*   rQ   r&   r&   r'   rI      s    zWorkStealing.add_workerr    c                 C  s   | j |= d S rf   )r*   rQ   r&   r&   r'   remove_worker   s    zWorkStealing.remove_workerc                 C  s&   | j j}d|kr"|d   |d= d S rd   )r)   rW   r]   )rR   Zpcsr&   r&   r'   teardown   s    zWorkStealing.teardown)keyrY   finishcompute_startcompute_stopargskwargsrU   c           	      O  sL   |dkr | j j| }| | n(|dkrH| j j| }| | | | d S )N
processing)r)   tasksput_key_in_stealableremove_key_from_stealable_remove_from_in_flight)	rR   rl   rY   rm   rn   ro   rp   rq   tsr&   r&   r'   
transition   s    

zWorkStealing.transitionr   r   )rw   inforU   c                 C  sx   || j |< | j  |d }|d }| j|  |d 8  < | j|  |d 7  < | j|  d8  < | j|  d7  < d S )Nr   r   r   r   r,   )r7   r;   clearr8   r9   rR   rw   ry   r   r   r&   r&   r'   _add_to_in_flight   s    

zWorkStealing._add_to_in_flightzInFlightInfo | None)rw   rU   c                 C  s   | j |d }|r|d }|d }| j|  |d 8  < | j|  |d 7  < | j|  d7  < | j|  d8  < | j s| j  | j  |S )Nr   r   r   r   r,   )r7   r\   r8   r9   rz   r;   rZ   r{   r&   r&   r'   rv      s    

z#WorkStealing._remove_from_in_flightc                 C  s"   || j kr| | | | d S rf   )r7   ru   rt   )rR   rw   r&   r&   r'   recalculate_cost   s    

zWorkStealing.recalculate_costc                 C  s^   |  |\}}|d k	rZ|d k	s"t|js,t|j}|j}| j| | | ||f| j|< d S rf   )steal_time_ratioAssertionErrorprocessing_onaddressr*   addr+   )rR   rw   cost_multiplierlevelwsr>   r&   r&   r'   rt      s    
z!WorkStealing.put_key_in_stealablec                 C  sT   | j |d }|d krd S |\}}z| j| | | W n tk
rN   Y nX d S rf   )r+   r\   r*   removeKeyError)rR   rw   resultr>   r   r&   r&   r'   ru      s    z&WorkStealing.remove_key_from_stealablez'tuple[float, int] | tuple[(None, None)]c                 C  s   |j j}|tkrdS |jsdS | j|}|sL|js8t||jjksHtdS |	 }|| jj
 t }|| }ttt|d }|dk rd}n|t| jkrdS ||fS )a;  The compute to communication time ratio of a key

        Returns
        -------
        cost_multiplier: The increased cost from moving this task as a factor.
        For example a result of zero implies a task without dependencies.
        level: The location within a stealable list to place this value
        )NN)r   r   r.   r,   )prefixname
fast_tasksZdependenciesr)   get_task_durationr   r   Zlong_runningZget_nbytes_deps	bandwidthLATENCYr5   roundr   lenr3   )rR   rw   splitZcompute_timenbytesZtransfer_timer   r   r&   r&   r'   r~      s&    	
zWorkStealing.steal_time_ratior   )rw   r   r   rU   c              
   C  s2  z|| j krW dS d| j }|  jd7  _|j}| | td|||j||j | j|| j	|| }| j|| j	|| }| jj
|j d||d |||||d}| || |W S  tk
r   td|| Y d	S  tk
r, }	 z&t|	 trd
d l}
|
   W 5 d }	~	X Y nX d S )Nz	in-flightzsteal-r,   z#Request move %s, %s: %2f -> %s: %2fzsteal-request)oprl   r!   )r   r   r   r   r!   z(Worker comm %r closed while stealing: %rzcomm-closedr   )r7   r<   rl   ru   loggerdebug	occupancyr)   r   get_comm_costZstream_commsr   sendr|   r   ry   	Exception	exceptionLOG_PDBpdb	set_trace)rR   rw   r   r   r!   rl   r   r   ry   er   r&   r&   r'   move_task_request  sZ    



zWorkStealing.move_task_requestr=   z
str | None)rl   stater!   r>   rU   c             
     s6  z| j j| }W n" tk
r2   td| Y d S X z0| j| d |krb| d||||f W d S W n* tk
r   | d||||f Y d S X | |}|st|d }|d }td|||| | j j	r|j
|kstz:z|||j|j|g}	|tks|tkrJ|| j j|jkrJ| d|j| j jkf|	 | j j||d	 n|tkrd| d|	 nf|tkr| | ||_
|| || | | | j |j| | d|	 ntd| W nF tk
r }
 z&t|
 tr dd l}|   W 5 d }
~
X Y nX W 5 | j | | j | X d S )Nz,Key released between request and confirm: %sr!   zstale-responsezalready-abortedr   r   z%Confirm move %s, %s -> %s.  State: %sZ
reschedule)r!   already-computingconfirmzUnexpected task state: r   )r   )r   )r)   rs   r   r   r   r7   re   rv   r   validater   check_idle_saturatedr   _WORKER_STATE_UNDEFINED_WORKER_STATE_CONFIRMrH   rL   Z_reschedule_WORKER_STATE_REJECTru   Zremove_from_processingZadd_to_processingrt   Zsend_task_to_worker
ValueErrorr   r   r   r   r   )rR   rl   r   r!   r>   rw   ry   r   r   Z_log_msgr   r   r&   r&   r'   rP   G  sl    








zWorkStealing.move_task_confirmc                   s  j }g }t }t  d}t|j   rBt t|jkrPW 5 Q R  d S |j}|st	d|j j
d} fdd|D }|sW 5 Q R  d S t|dk rt|j
dd}|st sttjD ]\}} s qt|D ]}j|j | }	|	r s
qt|	D ]f}
 s$ q||
jks<|
j|k	rJ|	|
 q|d	7 }t||
  }shq|
|jkr|	|
 q
|}
|}j |
|}j |
|}j |
}|| | ||| d
  kr|
|| || }||||
j||j||j|f jd |  d	7  < jd |  |7  < 
|}|}j |||sn | |	|
 qj j|
|d qq|r d|f  j!d	7  _!t }|j"r|j"d #||  W 5 Q R X d S )Nr   
   rl   c                   s6   g | ].} |d kr||jkr| kr|qS )g?)_combined_occupancy_combined_nprocessingZnthreads)r/   r   potential_thievesrR   r&   r'   
<listcomp>  s
   z(WorkStealing.balance.<locals>.<listcomp>   T)rl   reverser,   r-   rF   rG   )Zoccrequestzsteal-duration)$r)   r   r   rZ   Zidlevaluesr   rH   Z	saturatedr   r   sortedr   	enumerater3   listr*   r   r+   r   discard
_get_thiefrr   r   r   r   appendrl   r:   r   Zis_unoccupiedr   re   r6   digestsr   )rR   sre   rY   r0   Zpotential_victimsr   rg   r   r*   rw   r   Z	occ_thiefZ
occ_victimZcomm_cost_thiefZcomm_cost_victimZcomputeZcostZnproc_thiefr]   r&   r   r'   rX     s    
    







  
 
zWorkStealing.balance)r   rU   c                 C  s   |j | j|  S rf   )r   r8   rR   r   r&   r&   r'   r     s    z WorkStealing._combined_occupancyc                 C  s   t |j| j|  S rf   )r   rr   r9   r   r&   r&   r'   r     s    z"WorkStealing._combined_nprocessingc                 C  s0   | j  D ]}|D ]}|  qq
| j  d S rf   )r*   r   rz   r+   )rR   r)   r*   r   r&   r&   r'   restart  s    zWorkStealing.restartzstr | TaskStater   )
keys_or_tsrU   c                   st   dd |D  g }| j jddD ]N\}}|d dkr>|d }n|g}|D ]$}t fdd	|D rH|| qHq |S )
Nc                 S  s    h | ]}t |ts|jn|qS r&   )
isinstancer    rl   )r/   rl   r&   r&   r'   	<setcomp>   s     z%WorkStealing.story.<locals>.<setcomp>rB   )Ztopicr   r   r,   c                 3  s   | ]}| kV  qd S rf   r&   )r/   xkeysr&   r'   r1     s     z%WorkStealing.story.<locals>.<genexpr>)r)   Z
get_eventsanyr   )rR   r   outrg   Ltr&   r   r'   story  s    
zWorkStealing.story)N)NN)NN)r"   r#   r$   r%   rh   ri   r3   rS   rY   r]   rb   re   rI   rj   rk   rx   r|   rv   r}   rt   ru   r~   r   rP   rX   r   r   r   r   r&   r&   r&   r'   r(   @   sN   


  

&7Dgr(   r   r   zset[WorkerState]zWorkerState | None)r)   rw   r   rU   c                 C  sB   |  |}|d k	r.||@ }|r$|}n
|js.d S t|t| j|dS )Nr   )valid_workersZloose_restrictionsminr   Zworker_objective)r)   rw   r   r   Zvalid_thievesr&   r&   r'   r     s    
r   zsplit-shuffle)5
__future__r   rN   loggingcollectionsr   r   collections.abcr   	functoolsr   mathr   r   typingr	   r
   r   r   r   Ztlzr   rJ   Z
dask.utilsr   Zdistributed.compatibilityr   Zdistributed.corer   Zdistributed.diagnostics.pluginr   Zdistributed.utilsr   r   Zdistributed.schedulerr   r   r   r   r   	getLoggerr"   r   rK   rL   r   r   r   r   r   r(   r   r   r&   r&   r&   r'   <module>   sP   
   P