U
    /e                     @  sr   d dl mZ d dlZd dlmZmZ d dlmZ d dlm	Z	 e
eZG dd deZdd	 Zd
d Zdd ZdS )    )annotationsN)coerce_to_addressconnect)SchedulerPlugin)dumps_functionc                   @  s"   e Zd ZdZdddZdd ZdS )EventStreamz Maintain a copy of worker eventsNc                 C  s   d| _ g | _|r||  d S )Nr   )namebufferZ
add_plugin)self	scheduler r   G/tmp/pip-unpacked-wheel-g426oqom/distributed/diagnostics/eventstream.py__init__   s    zEventStream.__init__c                 O  s0   |dkr,||d< |dks |dkr,| j | d S )N
processingkeyZmemoryZerred)r	   append)r
   r   startfinishargskwargsr   r   r   
transition   s    zEventStream.transition)N)__name__
__module____qualname____doc__r   r   r   r   r   r   r      s   
r   c                 C  s   g |j  |_ }|S )N)r	   )r   esr	   r   r   r   swap_buffer   s    r   c                 C  s   | j |jd d S )N)r   )Zremove_pluginr   )r   r   r   r   r   teardown!   s    r   c                   sB   t | } t| I dH }|dtttt|ttdI dH  |S )a  Open a TCP connection to scheduler, receive batched task messages

    The messages coming back are lists of dicts.  Each dict is of the following
    form::

        {'key': 'mykey', 'worker': 'host:port', 'status': status,
         'compute_start': time(), 'compute_stop': time(),
         'transfer_start': time(), 'transfer_stop': time(),
         'disk_load_start': time(), 'disk_load_stop': time(),
         'other': 'junk'}

    Where ``status`` is either 'OK', or 'error'

    Parameters
    ----------
    address: address of scheduler
    interval: time between batches, in seconds

    Examples
    --------
    >>> stream = await eventstream('127.0.0.1:8786', 0.100)  # doctest: +SKIP
    >>> print(await read(stream))  # doctest: +SKIP
    [{'key': 'x', 'status': 'OK', 'worker': '192.168.0.1:54684', ...},
     {'key': 'y', 'status': 'error', 'worker': '192.168.0.1:54684', ...}]
    Nfeed)opsetupfunctionintervalr   )r   r   writer   r   r   r   )addressr"   Zcommr   r   r   eventstream%   s    
	r%   )
__future__r   loggingZdistributed.corer   r   Zdistributed.diagnostics.pluginr   Zdistributed.workerr   	getLoggerr   loggerr   r   r   r%   r   r   r   r   <module>   s   
