U
    /e-                     @  s   d dl mZ d dlZd dlmZ d dlZd dlmZmZm	Z	 d dl
mZ d dlmZ d dlmZ eeZG dd	 d	eZdd
dZdd ZddddedZddddddZddddddZdS )    )annotationsN)deque)format_time	key_splitparse_timedelta)SchedulerPlugin)color_of)timec                   @  s6   e Zd ZdZdddZdd ZdddZdd
dZdS )TaskStreamPluginztask-streamNc                 C  s>   |d kr"t tjdtjd}t|d| _|| _d| _d S )Nz9distributed.scheduler.dashboard.status.task-stream-lengthz8distributed.scheduler.dashboard.tasks.task-stream-length)maxlenr   )maxdaskconfiggetr   buffer	schedulerindex)selfr   r    r   G/tmp/pip-unpacked-wheel-g426oqom/distributed/diagnostics/task_stream.py__init__   s    zTaskStreamPlugin.__init__c                 O  s\   |dkrX|| j jkrd S |ds&d S ||d< |dks>|dkrX| j| |  jd7  _d S )N
processing
startstopskeyZmemoryZerred   )r   Ztasksr   r   appendr   )r   r   startfinishargskwargsr   r   r   
transition!   s    
zTaskStreamPlugin.transitionc                   s    fdd t |tr&t t| }|d k	r@ |dtj}t |trXt t| }|d k	rr |dtj}|d k	r|d kr|d krtj}|| }n2|d kr|d k	r|| }n|d k	r|d kr|| }|d krtj}|d krd}td|}t|tj}fddt||D S )Nc                   s\   ||kr|S || d }t dd j| d D }|| k rL | |d |S  | ||S d S )N   c                 s  s   | ]}|d  V  qdS )stopNr   ).0	startstopr   r   r   	<genexpr>3   s    z;TaskStreamPlugin.collect.<locals>.bisect.<locals>.<genexpr>r   r   )r   r   )targetleftrightZmidvaluebisectr   r   r   r+   .   s    z(TaskStreamPlugin.collect.<locals>.bisectr   c                   s   g | ]} j | qS r   )r   )r#   i)r   r   r   
<listcomp>W   s     z,TaskStreamPlugin.collect.<locals>.<listcomp>)	
isinstancestrr	   r   lenr   r   minrange)r   r   r"   countr   r*   r   collect-   s0    






zTaskStreamPlugin.collectr   c           	      C  sn   g }| j t| j }|d kr"| j }ttd|p.d| |r@|| n|D ]}| j| }|| qFt|||dS )Nr   )workersstart_boundary)r   r0   r   r2   r   r   
rectangles)	r   istartistopr5   r6   msgsZdiffr,   msgr   r   r   r7   Y   s    (
zTaskStreamPlugin.rectangles)N)NNN)NNr   )__name__
__module____qualname__namer   r    r4   r7   r   r   r   r   r
      s
   

,r
   c                 C  s  |d kri }g }g }g }g }g }g }g }	g }
g }g }| D ]j}|d }t |}|dg }zd|d |d f }W n, tk
r   Y q8tjd|dd d	}Y nX ||krt|d
 ||< |D ]}|d |k rqt|d  }t|tk	r||}|	|d |d  d
 d  |	d|d |d    |	t
|d |d   |	| |	t|d  |  |	| |		t|d   |
	|d  |	| |	||  qq8|||||||	|
||d
S )Nr   r   z%s-%dworkerthreadz%Message contained bad information: %sT)exc_info r!   r   actionr"   i  )
r   durationZduration_textr   r?   coloralphar@   worker_thready)r   r   	Exceptionloggerwarningr0   colorstyper/   r   r   prefixalphas)r:   r5   r6   ZL_startZ
L_durationZL_duration_textZL_keyZL_nameZL_colorZL_alphaZL_workerZL_worker_threadZL_yr;   r   r?   r   rH   r$   rF   r   r   r   r7   e   sf    




r7   c                 C  s(   | d dkr t | d }t|S dS d S )NstatusOKr   Zblack)r   r   )r;   splitr   r   r   color_of_message   s    rT   ZredZorangeZgray)transfer
disk-write	disk-readdeserializecomputeg?r   )rU   rY   rX   rV   rW   z	transfer-zdisk-write-z
disk-read-zdeserialize-rC   )Nr   )
__future__r   loggingcollectionsr   r   Z
dask.utilsr   r   r   Zdistributed.diagnostics.pluginr   Z'distributed.diagnostics.progress_streamr   Zdistributed.metricsr	   	getLoggerr<   rK   r
   r7   rT   rM   rP   rO   r   r   r   r   <module>   s:   
U
=	

