U
    /e                     @  sx   d dl mZ d dlZd dlZd dlZd dlZd dlmZmZ d dl	m
Z
mZ d dlmZ d dlmZ G dd de
ZdS )	    )annotationsN)BinaryIOCallable)ShardsBuffer	ShardType)ResourceLimiter)
log_errorsc                      sf   e Zd ZdZdZdddddd fd	d
ZddddddZdddddZdd fddZ  Z	S )DiskShardsBuffera  Accept, buffer, and write many small objects to many files

    This takes in lots of small objects, writes them to a local directory, and
    then reads them back when all writes are complete.  It buffers these
    objects in memory so that it can optimize disk access for larger writes.

    **State**

    -   shards: dict[str, list[ShardType]]

        This is our in-memory buffer of data waiting to be written to files.

    -   sizes: dict[str, int]

        The size of each list of shards.  We find the largest and write data from that buffer

    State
    -----
    memory_limit: str
        A maximum amount of memory to use, like "1 GiB"

    Parameters
    ----------
    directory: pathlib.Path
        Where to write and read data.  Ideally points to fast disk.
    dump: callable
        Writes an object to a file, like pickle.dump
    load: callable
        Reads an object from that file, like pickle.load
    sizeof: callable
        Measures the size of an object in memory
       Nstrz+Callable[[list[ShardType], BinaryIO], None]z%Callable[[BinaryIO], list[ShardType]]zResourceLimiter | None)	directorydumploadmemory_limiterc                   sF   t  j|dd t|| _tj| js6t| j || _	|| _
d S )N   )r   concurrency_limit)super__init__pathlibPathr   ospathexistsmkdirr   r   )selfr   r   r   r   	__class__ =/tmp/pip-unpacked-wheel-g426oqom/distributed/shuffle/_disk.pyr   2   s    zDiskShardsBuffer.__init__zlist[ShardType]None)idshardsreturnc                   s\   t  L | d6 t| jt| ddd}| || W 5 Q R X W 5 Q R X W 5 Q R X dS )a'  Write one buffer to file

        This function was built to offload the disk IO, but since then we've
        decided to keep this within the event loop (disk bandwidth should be
        prioritized, and writes are typically small enough to not be a big
        deal).

        Most of the logic here is about possibly going back to a separate
        thread, or about diagnostics.  If things don't change much in the
        future then we should consider simplifying this considerably and
        dropping the write into communicate above.
        writeab mode	bufferingN)r   timeopenr   r   r   )r   r    r!   fr   r   r   _processD   s      zDiskShardsBuffer._processz	int | str)r    r"   c              
   C  s   |    | jstdg }zP| d< t| jt| ddd}| |}| }W 5 Q R X W 5 Q R X W n t	k
r   t
|Y nX |r|  j|7  _|S t
|dS )z%Read a complete file back into memoryz$Tried to read from file before done.readrbr%   r&   N)Zraise_on_exceptionZ_inputs_doneRuntimeErrorr)   r*   r   r   r   tellFileNotFoundErrorKeyError
bytes_read)r   r    partsr+   sizer   r   r   r-   Z   s(      
 zDiskShardsBuffer.read)r"   c              	     s6   t   I d H  tt t| j W 5 Q R X d S )N)r   close
contextlibsuppressr1   shutilrmtreer   )r   r   r   r   r6   r   s    zDiskShardsBuffer.close)N)
__name__
__module____qualname____doc__r   r   r,   r-   r6   __classcell__r   r   r   r   r	      s   ! r	   )
__future__r   r7   r   r   r9   typingr   r   Zdistributed.shuffle._bufferr   r   Zdistributed.shuffle._limiterr   Zdistributed.utilsr   r	   r   r   r   r   <module>   s   