U
    /e                     @  s   d dl mZ d dlZd dlZd dlmZ d dlmZ d dlm	Z	m
Z
 d dlmZ d dlZd dlmZ d dlmZ d d	lmZ eeZG d
d dZdS )    )annotationsN)deque)Any)genlocks)IOLoop)parse_timedelta)CommClosedError)timec                   @  sl   e Zd ZdZdddZdd Zdd Zd	d
 ZeZe	j
dd ZdddddZe	j
dddZdd ZdS )BatchedSendaq  Batch messages in batches on a stream

    This takes an IOStream and an interval (in ms) and ensures that we send no
    more than one message every interval milliseconds.  We send lists of
    messages.

    Batching several messages at once helps performance when sending
    a myriad of tiny messages.

    Examples
    --------
    >>> stream = await connect(address)
    >>> bstream = BatchedSend(interval='10 ms')
    >>> bstream.start(stream)
    >>> bstream.send('Hello,')
    >>> bstream.send('world!')

    On the other side, the recipient will get a message like the following::

        ['Hello,', 'world!']
    Nc                 C  s~   |p
t  | _t|dd| _t | _t | _d| _	g | _
d | _d| _d| _d| _d | _ttjdd| _|| _d| _d S )Nms)defaultFr   z+distributed.comm.recent-messages-log-length)maxlen)r   currentloopr   intervalr   Eventwakerstoppedplease_stopbuffercommmessage_countbatch_count
byte_countnext_deadliner   daskconfiggetrecent_message_logserializersZ_consecutive_failures)selfr   r   r     r"   7/tmp/pip-unpacked-wheel-g426oqom/distributed/batched.py__init__-   s     


zBatchedSend.__init__c                 C  s   || _ | j| j d S N)r   r   Zadd_callback_background_send)r!   r   r"   r"   r#   start@   s    zBatchedSend.startc                 C  s   | j o| j  S r%   )r   closedr!   r"   r"   r#   r(   D   s    zBatchedSend.closedc                 C  s   |   rdS dt| j S d S )Nz<BatchedSend: closed>z<BatchedSend: %d in buffer>)r(   lenr   r)   r"   r"   r#   __repr__G   s    zBatchedSend.__repr__c              	   c  s|  | j sXz| j| jV  | j  W n tjk
r<   Y nX | jsLd | _q | jd k	rdt | jk rdq | jg  }| _|  j	d7  _	t | j
 | _zz`t| jj|| jdd}|V }W 5 Q R X |dk r| j| n| jd |  j|7  _W nZ tk
r"   tjd| jdd Y W 6qfY n* tk
rJ   td	 Y W qfY nX W 5 d }X q | j  d S | j  |   d S )
N   raiser    Zon_errorg    .Azlarge-messagezBatched Comm Closed %rT)exc_infozError in batched write)r   r   waitr   clearr   TimeoutErrorr   r
   r   r   
contextlibclosingr   writer    r   appendr   r	   loggerinfo	Exception	exceptionr   setabort)r!   payloadcoronbytesr"   r"   r#   r&   O   sN      

	
zBatchedSend._background_sendr   None)msgsreturnc                 G  s\   | j dk	r&| j  r&td| j d|  jt|7  _| j| | jdkrX| j	  dS )zkSchedule a message for sending to the other side

        This completes quickly and synchronously
        NzComm z already closed.)
r   r(   r	   r   r*   r   extendr   r   r;   )r!   rA   r"   r"   r#   send   s    
zBatchedSend.sendc              	   c  s   | j dkrdS d| _| j  | jj|dV  | j  szD| jrzg | j | _}t	| j j
|| jdd}|V  W 5 Q R X W n tk
r   Y nX | j  V  dS )zyFlush existing messages and then close comm

        If set, raises `tornado.util.TimeoutError` after a timeout.
        NT)timeoutr-   r.   )r   r   r   r;   r   r0   r(   r   r3   r4   r5   r    r	   close)r!   rE   r=   r>   r"   r"   r#   rF      s*    


  zBatchedSend.closec                 C  s<   | j d krd S d| _g | _| j  | j  s8| j   d S )NT)r   r   r   r   r;   r(   r<   r)   r"   r"   r#   r<      s    


zBatchedSend.abort)NN)N)__name__
__module____qualname____doc__r$   r'   r(   r+   __str__r   	coroutiner&   rD   rF   r<   r"   r"   r"   r#   r      s   

Fr   )
__future__r   r3   loggingcollectionsr   typingr   Ztornador   r   Ztornado.ioloopr   r   Z
dask.utilsr   Zdistributed.corer	   Zdistributed.metricsr
   	getLoggerrG   r7   r   r"   r"   r"   r#   <module>   s   
