U
    /e                     @  s   d dl mZ d dlZd dlmZmZ d dlmZ d dlm	Z	m
Z
 d dlmZ d dlmZ d dlmZ eeZd	d
 Zdd Zdd Zd#ddZdd ZddddedZddddddZddddd dZd!d" ZdS )$    )annotationsN)mergevalmap)	key_split)coerce_to_addressconnect)AllProgresscolor_of)dumps_functionc                   s(   t tt j jd fdddD S )N)allnbytesc                   s   i | ]}|t t j| qS  )r   lenstate).0r   allprogressr   K/tmp/pip-unpacked-wheel-g426oqom/distributed/diagnostics/progress_stream.py
<dictcomp>   s    zcounts.<locals>.<dictcomp>)memoryerredreleased
processingqueued)r   r   r   r   r   )Z	schedulerr   r   r   r   counts   s    
r   c                 O  s   | j tjd d S )N)name)Zremove_pluginr   r   )selfargskwargsr   r   r   _remove_all_progress_plugin   s    r    c                   sB   t | } t| I dH }|dtttt|ttdI dH  |S )a"  Open a TCP connection to scheduler, receive progress messages

    The messages coming back are dicts containing counts of key groups::

        {'inc': {'all': 5, 'memory': 2, 'erred': 0, 'released': 1},
         'dec': {'all': 1, 'memory': 0, 'erred': 0, 'released': 0}}

    Parameters
    ----------
    address: address of scheduler
    interval: time between batches, in seconds

    Examples
    --------
    >>> stream = await eventstream('127.0.0.1:8786', 0.100)  # doctest: +SKIP
    >>> print(await read(stream))  # doctest: +SKIP
    Nfeed)opsetupfunctionintervalZteardown)r   r   writer   r   r   r    )addressr%   Zcommr   r   r   progress_stream!   s    
	r(         c                   s  dt | d | d jdd  d|   t } fdd|  D } |d< d	d
  D |d< fdd
t|D |d< fdd
t|D |d< fdd
t|D |d< fdd
t|D |d< dd
  D |d< g |d< g |d< g |d< g |d< g |d< g |d< g |d< t|d |d |d |d  |d! |d"d#g| |d |d D ](\}}}}}	}
}}| | | }||  | | }|| |  | | }|| | |  | | }|| | | |	  | | }|| | | |	 |
  | | }d$|| | |f }|d | |d | |d | |d | |d | |d | |d | qX|S )%a  
    >>> msg = {'all': {'inc': 5, 'dec': 1, 'add': 4},
    ...        'memory': {'inc': 2, 'dec': 0, 'add': 1},
    ...        'erred': {'inc': 0, 'dec': 1, 'add': 0},
    ...        'released': {'inc': 1, 'dec': 0, 'add': 1},
    ...        'processing': {'inc': 1, 'dec': 0, 'add': 2},
    ...        'queued': {'inc': 1, 'dec': 0, 'add': 2}}

    >>> progress_quads(msg, nrows=2)  # doctest: +SKIP
    {'all': [5, 4, 1],
    'memory': [2, 1, 0],
    'erred': [0, 0, 1],
    'released': [1, 1, 0],
    'processing': [1, 2, 0],
    'queued': [1, 2, 0],
    'name': ['inc', 'add', 'dec'],
    'show-name': ['inc', 'add', 'dec'],
    'left': [0, 0, 1],
    'right': [0.9, 0.9, 1.9],
    'top': [0, -1, 0],
    'bottom': [-0.8, -1.8, -0.8],
    'color': ['#45BF6F', '#2E6C8E', '#440154'],
    'released-loc': [0.18, 0.225, 1.0],
    'memory-loc': [0.54, 0.45, 1.0],
    'erred-loc': [0.54, 0.45, 1.9],
    'processing-loc': [0.72, 0.9, 1.9],
    'queued-loc': [0.9, 1.35, 1.9],
    'done': ['3 / 5', '2 / 4', '1 / 1']}
    g?r   T)keyreverseNc                   s$   i | ]\} | fd dD qS )c                   s   g | ]}  |d qS )r   )getr   r   vr   r   
<listcomp>c   s     z-progress_quads.<locals>.<dictcomp>.<listcomp>r   )r   k)namesr/   r   r   c   s      z"progress_quads.<locals>.<dictcomp>r   c                 S  s,   g | ]$}t |d kr|n|dd d qS )   N   z...)r   r.   r   r   r   r1   f   s     z"progress_quads.<locals>.<listcomp>z	show-namec                   s   g | ]}|  qS r   r   r   inrowsr   r   r1   g   s     leftc                   s   g | ]}|   qS r   r   r6   )r9   widthr   r   r1   h   s     rightc                   s   g | ]}|   qS r   r   r6   r8   r   r   r1   i   s     topc                   s   g | ]}|   d  qS )g?r   r6   r8   r   r   r1   j   s     Zbottomc                 S  s   g | ]}t |qS r   r	   r.   r   r   r   r1   k   s     colorzreleased-locz
memory-locz	erred-loczprocessing-locz
queued-loczno-worker-locdoner   r   r   r   r   Z	no_workerr   z%d / %d)sortedr-   r   itemsrangezipappend)msgr9   ZncolsndrmepqnwalZrlmlelplZqlZnwlr?   r   )r3   r9   r;   r   progress_quadsA   sX    
 $rS   c                 C  s(   | d dkr t | d }t|S dS d S )NstatusOKr+   Zblack)r   r
   )rE   splitr   r   r   color_of_message   s    rW   ZredZorangeZgray)transfer
disk-write	disk-readdeserializecomputeg?   )rX   r\   r[   rY   rZ   z	transfer-zdisk-write-z
disk-read-zdeserialize- c           	      C  s<  |d }t |}|dg }|D ]}t|d  }t|tk	rF||}| d |d |d  d d  | d d|d |d    | d | | d	 t|d  |  | d
 | | d t|d   | d |d  d|d |d f }| d | ||kr t|d ||< | d ||  q t|S )Nr+   
startstopsactionstartstop   i  durationr   r>   alphaZworkerz%s-%dthreadworker_thready)	r   r-   colorstypestrrD   prefixalphasr   )	listsrE   workersr+   r   r_   Z	startstopr>   rg   r   r   r   task_stream_append   s(    
"
rp   )r)   r*   )
__future__r   loggingZtlzr   r   Z
dask.utilsr   Zdistributed.corer   r   Z distributed.diagnostics.progressr   Zdistributed.utilsr
   Zdistributed.workerr   	getLogger__name__loggerr   r    r(   rS   rW   ri   rm   rl   rp   r   r   r   r   <module>   s@   

 
O	

	