U
    /e%	                     @  s`   d dl mZ d dlmZmZ d dlmZ d dlmZ d dl	m
Z
 d dlmZ G dd deZd	S )
    )annotations)	AwaitableCallable)parse_bytes)ShardsBuffer)ResourceLimiter)
log_errorsc                      sN   e Zd ZdZedZedZddddd	 fd
dZddddddZ  Z	S )CommShardsBuffera  Accept, buffer, and send many small messages to many workers

    This takes in lots of small messages destined for remote workers, buffers
    those messages in memory, and then sends out batches of them when possible
    to different workers.  This tries to send larger messages when possible,
    while also respecting a memory bound

    **State**

    -   shards: dict[str, list[bytes]]

        This is our in-memory buffer of data waiting to be sent to other workers.

    -   sizes: dict[str, int]

        The size of each list of shards.  We find the largest and send data from that buffer

    State
    -----
    memory_limit: str
        A maximum amount of memory to use across the process, like "1 GiB"
        This includes both data in shards and also in network communications
    max_connections: int
        The maximum number of connections to have out at once
    max_message_size: str
        The maximum size of a single message that we want to send

    Parameters
    ----------
    send: callable
        How to send a list of shards to a worker
        Expects an address of the target worker (string)
        and a payload of shards (list of bytes) to send to that worker
    z2 MiBz100 MiBN
   z-Callable[[str, list[bytes]], Awaitable[None]]zResourceLimiter | Noneint)sendmemory_limiterconcurrency_limitc                   s   t  j||tjd || _d S )N)r   r   max_message_size)super__init__r	   r   r   )selfr   r   r   	__class__ >/tmp/pip-unpacked-wheel-g426oqom/distributed/shuffle/_comms.pyr   3   s    zCommShardsBuffer.__init__strzlist[bytes]None)addressshardsreturnc              
     s>   t  . | d | ||I dH  W 5 Q R X W 5 Q R X dS )z,Send one message off to a neighboring workerr   N)r   timer   )r   r   r   r   r   r   _process@   s    zCommShardsBuffer._process)Nr
   )
__name__
__module____qualname____doc__r   r   Zmemory_limitr   r   __classcell__r   r   r   r   r	      s   #  r	   N)
__future__r   typingr   r   Z
dask.utilsr   Zdistributed.shuffle._diskr   Zdistributed.shuffle._limiterr   Zdistributed.utilsr   r	   r   r   r   r   <module>   s   