U
    /e"                     @  s   d dl mZ d dlZd dlZd dlZd dlmZ d dlmZ d dl	m
Z
mZmZmZ d dlmZ d dlmZ edZed	ed
ZG dd dee ZdS )    )annotationsN)defaultdict)Iterator)AnyGenericSizedTypeVar)time)ResourceLimiterzdistributed.shuffle	ShardType)boundc                   @  s~  e Zd ZU dZded< ded< ded< ded	< d
ed< ded< ded< ded< ded< ded< ded< ded< ded< ded< ded< ded< dLdddddd d!Zd"d#d$d%Zd&d'ddd(d)d*Zd&d'dd+d,d-Zd&d.d/d0d1Z	e
dd#d2d3Zdd#d4d5Zd6dd7d8d9Zdd#d:d;Zdd#d<d=Zdd#d>d?Zd@d#dAdBZdCdCdCddDdEdFZejd&dGdHdIdJZdKS )MShardsBuffera  A buffer for P2P shuffle

    The objects to buffer are typically bytes belonging to certain shards.
    Typically the buffer is implemented on sending and receiving end.

    The buffer allows for concurrent writing and buffers shards to reduce overhead of writing.

    The shards are typically provided in a format like::

        {
            "bucket-0": [b"shard1", b"shard2"],
            "bucket-1": [b"shard1", b"shard2"],
        }

    Buckets typically correspond to output partitions.

    If exceptions occur during writing, the buffer is automatically closed. Subsequent attempts to write will raise the same exception.
    Flushing will not raise an exception. To ensure that the buffer finished successfully, please call `ShardsBuffer.raise_on_exception`
    z!defaultdict[str, list[ShardType]]shardszdefaultdict[str, int]sizesintconcurrency_limitzResourceLimiter | Nonememory_limiterzdict[str, float]diagnosticsmax_message_sizebytes_totalbytes_memorybytes_written
bytes_readbool_accepts_input_inputs_donezNone | Exception
_exceptionzlist[asyncio.Task]_taskszasyncio.Condition_shards_availablezasyncio.Lock_flush_lock   None)r   r   r   returnc                   s   d _ tt _tt _d  _| _d _| _	tt
 _ fddt|D  _t  _t  _| _d _d _d _d _d S )NTFc                   s   g | ]}t   qS  )asyncioZcreate_task_background_task).0_selfr$   ?/tmp/pip-unpacked-wheel-g426oqom/distributed/shuffle/_buffer.py
<listcomp>H   s   z)ShardsBuffer.__init__.<locals>.<listcomp>r   )r   r   listr   r   r   r   r   r   r   floatr   ranger   r%   	Conditionr   Lockr   r   r   r   r   r   )r*   r   r   r   r$   r)   r+   __init__:   s$    





zShardsBuffer.__init__zdict[str, Any]r#   c                 C  s2   | j | jt| j| j| j| j| jr*| jjnddS )Nr   )ZmemorytotalZbucketsZwrittenreadr   Zmemory_limit)	r   r   lenr   r   r   r   r   Z	_maxvaluer)   r$   r$   r+   	heartbeatU   s    zShardsBuffer.heartbeatstrzlist[ShardType])idr   sizer#   c              
     s   zt }z$| ||I d H  |  j|7  _W n. tk
rZ } z|| _d| _W 5 d }~X Y nX t }d| j	d  d|  | j	d< d| j	d  d||   | j	d< W 5 | j r| j |I d H  |  j|8  _X d S )NTg\(\?Zavg_sizeg{Gz?Zavg_duration)
r   Zdecreaser   r	   _processr   	Exceptionr   r   r   )r*   r9   r   r:   startestopr$   r$   r+   process`   s&    
zShardsBuffer.process)r9   r   r#   c                   s
   t  d S NNotImplementedError)r*   r9   r   r$   r$   r+   r;   w   s    zShardsBuffer._processr   )r9   r#   c                 C  s
   t  d S rA   rB   )r*   r9   r$   r$   r+   r5   z   s    zShardsBuffer.readc                 C  s   | j  S rA   )r   r)   r$   r$   r+   empty}   s    zShardsBuffer.emptyc              
     sj  dd fdd} j 4 I d H "  j |I d H   jrV jsVW 5 Q I d H R  qft j jjd} jdkrd}g }| jk r6zbz> j| 	 }|
| t|}||7 } j|  |8  < W n tk
r   Y W q6Y nX W 5  j| s j|=  j| rt j|= X q|n j	|} j	|} j   W 5 Q I d H R X  |||I d H  qd S )Nr   r3   c                     s   t  jp jS rA   )r   r   r   r$   r)   r$   r+   	_continue   s    z0ShardsBuffer._background_task.<locals>._continue)keyr   )r   wait_forr   r   maxr   __getitem__r   AssertionErrorpopappendr6   
IndexError
notify_allr@   )r*   rE   Zpart_idr:   r   Zshardsr$   r)   r+   r&      s4    
zShardsBuffer._background_taskzdict[str, list[ShardType]])datar#   c              
     s*  | j r| j | jr| jr(td|  d|s0dS d}d}i }| D ]\}}ttt|}|||< qDt| }|  j	|7  _	|  j
|7  _
| jr| j| | j4 I dH H | D ].\}}| j| | | j|  || 7  < q| j  W 5 Q I dH R X | jr| j I dH  ~~|s&tdS )a  
        Writes many objects into the local buffers, blocks until ready for more

        Parameters
        ----------
        data: dict
            A dictionary mapping destinations to lists of objects that should
            be written to that destination
        zTrying to put data in closed .Nr   )r   r   r   RuntimeErroritemssummapr6   valuesr   r   r   Zincreaser   r   extendr   notifyZwait_for_availablerJ   )r*   rP   r   r:   r   Zid_Ztotal_batch_sizer$   r$   r+   write   s4    
zShardsBuffer.writec                 C  s   | j r| j dS )z:Raises an exception if something went wrong during writingN)r   r)   r$   r$   r+   raise_on_exception   s    zShardsBuffer.raise_on_exceptionc                   s    j 4 I dH  d _ j4 I dH :  j   j fddI dH  d _ j  W 5 Q I dH R X tj j I dH   j	s j
rtt  j
fW 5 Q I dH R X dS )zpWait until all writes are finished.

        This closes the buffer such that no new writes are allowed
        NFc                     s    j  p jp jS rA   )r   r   r   r$   r)   r$   r+   <lambda>       z$ShardsBuffer.flush.<locals>.<lambda>T)r   r   r   rN   rG   r   r%   gatherr   r   r   rJ   typer)   r$   r)   r+   flush   s    


zShardsBuffer.flushc              
     s   |   I dH  | js,| jr,tt| | jf| jD ]}|  q2d| _d| _| j	
  d| _| j4 I dH  | j  W 5 Q I dH R X tj| j I dH  dS )zUFlush and close the buffer.

        This cleans up all allocated resources.
        NFTr   )r_   r   r   rJ   r^   r   cancelr   r   r   clearr   rN   r%   r]   )r*   tr$   r$   r+   close   s    


zShardsBuffer.closez'ShardsBuffer'c                   s   | S rA   r$   r)   r$   r$   r+   
__aenter__   s    zShardsBuffer.__aenter__r   )exctyp	tracebackr#   c                   s   |   I d H  d S rA   )rc   )r*   re   rf   rg   r$   r$   r+   	__aexit__   s    zShardsBuffer.__aexit__zIterator[None])namer#   c                 c  s,   t  }d V  t  }| j|  || 7  < d S rA   )r	   r   )r*   ri   r=   r?   r$   r$   r+   r	      s    zShardsBuffer.timeN)r    r!   )__name__
__module____qualname____doc____annotations__r2   r7   r@   r;   r5   propertyrD   r&   rY   rZ   r_   rc   rd   rh   
contextlibcontextmanagerr	   r$   r$   r$   r+   r      sF   
  !*r   )
__future__r   r%   rp   loggingcollectionsr   collections.abcr   typingr   r   r   r   Zdistributed.metricsr	   Zdistributed.shuffle._limiterr
   	getLoggerloggerr   r   r$   r$   r$   r+   <module>   s   
