U
    /eD<                     @  s  d dl mZ d dlZd dlmZmZmZmZ d dlm	Z	 d dl
mZ d dlmZ d dlmZ d dlmZmZmZmZmZ d d	lmZ d dlZd d
lmZmZ d dlmZ ee Z!eej"edkZ#eej"edkZ$G dd deZ%G dd deZ&eG dd dZ'eG dd dZ(G dd dej)Z*ddddddZ+G dd de,Z-G d d! d!e,Z.G d"d# d#e,Z/G d$d% d%ej0Z1dS )&    )annotationsN)IteratorMappingMutableMappingSized)contextmanager)	dataclass)partial)perf_counter)AnyLiteral
NamedTupleProtocolcast)parse)deserialize_bytesserialize_bytelistsafe_sizeofz2.2.0z2.3.0c                   @  sB   e Zd ZU dZded< ded< d d dddZd d ddd	Zd
S )SpilledSizez7Size of a key/value pair when spilled to disk, in bytesintmemorydisk)otherreturnc                 C  s   t | j|j | j|j S Nr   r   r   selfr    r   5/tmp/pip-unpacked-wheel-g426oqom/distributed/spill.py__add__   s    zSpilledSize.__add__c                 C  s   t | j|j | j|j S r   r   r   r   r   r    __sub__"   s    zSpilledSize.__sub__N)__name__
__module____qualname____doc____annotations__r!   r"   r   r   r   r    r      s
   
r   c                   @  s0   e Zd ZdZeddddZddddZd	S )
ManualEvictProtoaS  Duck-type API that a third-party alternative to SpillBuffer must respect (in
    addition to MutableMapping) if it wishes to support spilling when the
    ``distributed.worker.memory.spill`` threshold is surpassed.

    This is public API. At the moment of writing, Dask-CUDA implements this protocol in
    the ProxifyHostFile class.
    zSized | boolr   c                 C  s   dS )zAccess to fast memory. This is normally a MutableMapping, but for the purpose
        of the manual eviction API it is just tested for emptiness to know if there is
        anything to evict.
        Nr   r   r   r   r    fast/   s    zManualEvictProto.fastr   c                 C  s   dS )a  Manually evict a key/value pair from fast to slow memory.
        Return size of the evicted value in fast memory.

        If the eviction failed for whatever reason, return -1. This method must
        guarantee that the key/value pair that caused the issue has been retained in
        fast memory and that the problem has been logged internally.

        This method never raises.
        Nr   r*   r   r   r    evict7   s    
zManualEvictProto.evictN)r#   r$   r%   r&   propertyr+   r,   r   r   r   r    r(   &   s   r(   c                   @  s:   e Zd ZU dZdZded< dZded< ddddd	Zd
S )FastMetricszGCumulative metrics for SpillBuffer.fast since the latest worker restartr   r   read_count_totalread_bytes_totalNone)	key_bytesr   c                 C  s    |  j d7  _ |  j|7  _d S N   )r/   r0   )r   r2   r   r   r    log_readK   s    zFastMetrics.log_readN)r#   r$   r%   r&   r/   r'   r0   r5   r   r   r   r    r.   D   s   
r.   c                   @  s   e Zd ZU dZdZded< dZded< dZded< dZded< dZ	ded	< dZ
ded
< dZded< dZded< dddddddZdddddddZdS )SlowMetricszGCumulative metrics for SpillBuffer.slow since the latest worker restartr   r   r/   r0   floatread_time_totalwrite_count_totalwrite_bytes_totalwrite_time_totalpickle_time_totalunpickle_time_totalr1   )r2   pickle_time
write_timer   c                 C  s<   |  j d7  _ |  j|7  _|  j|7  _|  j|7  _d S r3   )r9   r:   r<   r;   )r   r2   r>   r?   r   r   r    	log_write]   s    zSlowMetrics.log_write)r2   	read_timeunpickle_timer   c                 C  s<   |  j d7  _ |  j|7  _|  j|7  _|  j|7  _d S r3   )r/   r0   r8   r=   )r   r2   rA   rB   r   r   r    r5   h   s    zSlowMetrics.log_readN)r#   r$   r%   r&   r/   r'   r0   r8   r9   r:   r;   r<   r=   r@   r5   r   r   r   r    r6   P   s   
r6   c                      s  e Zd ZU dZded< ded< ded< ded< d0ddddd fddZedddddZdddd fddZddddZ	ddd fddZ
ddd fd d!Zed"dd#d$Zed"dd%d&Zed'dd(d)Zed*dd+d,Zd-dd.d/Z  ZS )1SpillBuffera  MutableMapping that automatically spills out dask key/value pairs to disk when
    the total size of the stored data exceeds the target. If max_spill is provided the
    key/value pairs won't be spilled once this threshold has been reached.

    Parameters
    ----------
    spill_directory: str
        Location on disk to write the spill files to
    target: int
        Managed memory, in bytes, to start spilling at
    max_spill: int | False, optional
        Limit of number of bytes to be spilled on disk. Set to False to disable.
    min_log_interval: float, optional
        Minimum interval, in seconds, between warnings on the log file about full disk
    r7   last_loggedmin_log_intervalzset[str]logged_pickle_errorsr.   fast_metricsF   strr   int | Literal[False])spill_directorytarget	max_spillrE   c                   sR   t ||}trt|t }t ji ||td d| _|| _	t
 | _t | _d S )N)r+   slownweightr   )Slowhas_zict_220zictCacheZWeakValueMappingsuper__init___in_memory_weightrD   rE   setrF   r.   rG   )r   rK   rL   rM   rE   rN   	__class__r   r    rV      s    
zSpillBuffer.__init__z
str | NonezIterator[None]keyr   c              
   c  sz  zd V  W nh t k
r~ } zT|j\}|| jks4t|| jksBtt }|| j | jkrht	d || _t
 W 5 d }~X Y n tk
r   t }|| j | jkrtjddd || _t
 Y n tk
rt } z|j\}}ttjtdkrn|| jkst|| jkst||kr2|d k	s&t| |= |n2|| jkr^tjd|dd | j| t
 W 5 d }~X Y nX d S )Nz;Spill file on disk reached capacity; keeping data in memoryz,Spill to disk failed; keeping data in memoryT)exc_infoz2.0.0zFailed to pickle )MaxSpillExceededargsr+   AssertionErrorrN   r
   rD   rE   loggerwarningHandledErrorOSErrorerrorPickleErrorparse_versionrS   __version__rF   add)r   r\   eZkey_enowZorig_er   r   r    handle_errors   sH     


zSpillBuffer.handle_errorsr   r1   r\   valuer   c              	     sj   z4|  |  t || | j| W 5 Q R X W n0 tk
rd   || jksRt|| jks`tY nX dS )a  If sizeof(value) < target, write key/value pair to self.fast; this may in
        turn cause older keys to be spilled from fast to slow.
        If sizeof(value) >= target, write key/value pair directly to self.slow instead.

        Raises
        ------
        Exception
            sizeof(value) >= target, and value failed to pickle.
            The key/value pair has been forgotten.

        In all other cases:

        - an older value was evicted and failed to pickle,
        - this value or an older one caused the disk to fill and raise OSError,
        - this value or an older one caused the max_spill threshold to be exceeded,

        this method does not raise and guarantees that the key/value that caused the
        issue remained in fast.
        N)	rl   rU   __setitem__rF   discardrc   r+   r`   rN   )r   r\   rn   rY   r   r    ro      s    zSpillBuffer.__setitem__r)   c              
   C  sZ   z>|  d* | j \}}}tt|W  5 Q R  W S Q R X W n tk
rT   Y dS X dS )a  Implementation of :meth:`ManualEvictProto.evict`.

        Manually evict the oldest key/value pair, even if target has not been
        reached. Returns sizeof(value).
        If the eviction failed (value failed to pickle, disk full, or max_spill
        exceeded), return -1; the key/value pair that caused the issue will remain in
        fast. The exception has been logged internally.
        This method never raises.
        N)rl   r+   r,   r   r   rc   )r   _rP   r   r   r    r,      s    
"zSpillBuffer.evictc                   s4   || j kr(tt| j j| }| j| t |S r   )r+   r   r   weightsrG   r5   rU   __getitem__)r   r\   nbytesrY   r   r    rt      s    
zSpillBuffer.__getitem__c                   s   t  | | j| d S r   )rU   __delitem__rF   rp   r   r\   rY   r   r    rv     s    zSpillBuffer.__delitem__zMapping[str, Any]c                 C  s   | j S )zxKey/value pairs stored in RAM. Alias of zict.Buffer.fast.
        For inspection only - do not modify directly!
        )r+   r*   r   r   r    r     s    zSpillBuffer.memoryc                 C  s   | j S )z~Key/value pairs spilled out to disk. Alias of zict.Buffer.slow.
        For inspection only - do not modify directly!
        )rN   r*   r   r   r    r     s    zSpillBuffer.diskrQ   c                 C  s$   t rttj| jjn| j}tt|S r   )rR   r   rS   rT   rN   datarQ   )r   rN   r   r   r    _slow_uncached  s    zSpillBuffer._slow_uncachedr   c                 C  s   | j jS )zNumber of bytes spilled to disk. Tuple of

        - output of sizeof()
        - pickled size

        The two may differ substantially, e.g. if sizeof() is inaccurate or in case of
        compression.
        )ry   total_weightr*   r   r   r    spilled_total  s    
zSpillBuffer.spilled_totalzdict[str, float]c                 C  s   | j }| jj}t| j| jjt| j| jjjd}|j	 D ]\}}||d| < q<|j	 D ]"\}}||d|krv|nd| < q^|S )a  Metrics to be exported to Prometheus or to be parsed directly.

        From these you may generate derived metrics:

        cache hit ratio:
          by keys  = memory_read_count_total / (memory_read_count_total + disk_read_count_total)
          by bytes = memory_read_bytes_total / (memory_read_bytes_total + disk_read_bytes_total)

        mean times per key:
          pickle   = pickle_time_total     / disk_write_count_total
          write    = disk_write_time_total / disk_write_count_total
          unpickle = unpickle_time_total   / disk_read_count_total
          read     = disk_read_time_total  / disk_read_count_total

        mean bytes per key:
          write    = disk_write_bytes_total / disk_write_count_total
          read     = disk_read_bytes_total  / disk_read_count_total

        mean bytes per second:
          write    = disk_write_bytes_total / disk_write_time_total
          read     = disk_read_bytes_total  / disk_read_time_total
        )Zmemory_countZmemory_bytesZ
disk_countZ
disk_bytesZmemory_pickleZdisk_)
rG   ry   metricslenr+   rz   rN   r   __dict__items)r   Zfmsmoutkvr   r   r    get_metrics&  s    zSpillBuffer.get_metrics)FrH   )r#   r$   r%   r&   r'   rV   r   rl   ro   r,   rt   rv   r-   r   r   ry   r{   r   __classcell__r   r   rY   r    rC   p   s.   
  2	rC   rI   r   r   rm   c                 C  s   t |S r   r   )r\   rn   r   r   r    rW   N  s    rW   c                   @  s   e Zd ZdS )r^   Nr#   r$   r%   r   r   r   r    r^   S  s   r^   c                   @  s   e Zd ZdS )rf   Nr   r   r   r   r    rf   W  s   rf   c                   @  s   e Zd ZdS )rc   Nr   r   r   r   r    rc   [  s   rc   c                      s~   e Zd ZU ded< ded< ded< ded< dd
dd fddZd
ddddZd
dddddZd
dd fddZ  ZS )rQ   rJ   
max_weightzdict[str, SpilledSize]weight_by_keyr   rz   r6   r}   FrI   )rK   r   c                   sB   t  ttddtt| || _i | _t	dd| _
t | _d S )Nraise)Zon_errorr   )rU   rV   r	   r   r   rS   ZFiler   r   r   rz   r6   r}   )r   rK   r   rY   r   r    rV   f  s    
zSlow.__init__r   r[   c                 C  s^   t  }| j| }t|trtnts&tt  }| |}t  }| jj	t
||| || d |S )N)r2   rA   rB   )r
   d
isinstancehas_zict_230	bytearraybytesr`   loadr}   r5   r~   )r   r\   t0pickledt1r   t2r   r   r    rt   q  s    

zSlow.__getitem__r1   rm   c           
   
   C  s   t  }z| |}W n, tk
r@ } zt||W 5 d }~X Y nX tdd |D }t  }|| jksht|| jksvt| jdk	r| j	j
| | jkrt||| j|< t  }tt||}	|	| j|< |  j	|	7  _	| jj||| || d d S )Nc                 s  s&   | ]}t |tr|jnt|V  qd S r   )r   
memoryviewru   r~   ).0framer   r   r    	<genexpr>  s   z#Slow.__setitem__.<locals>.<genexpr>F)r2   r>   r?   )r
   dump	Exceptionrf   sumr   r`   r   r   rz   r   r^   r   r   r}   r@   )
r   r\   rn   r   r   rj   Zpickled_sizer   r   rP   r   r   r    ro     s4    

zSlow.__setitem__c                   s&   t  | |  j| j|8  _d S r   )rU   rv   rz   r   poprw   rY   r   r    rv     s    zSlow.__delitem__)F)	r#   r$   r%   r'   rV   rt   ro   rv   r   r   r   rY   r    rQ   `  s   
/rQ   )2
__future__r   loggingcollections.abcr   r   r   r   
contextlibr   Zdataclassesr   	functoolsr	   timer
   typingr   r   r   r   r   Zpackaging.versionr   rg   rS   Zdistributed.protocolr   r   Zdistributed.sizeofr   	getLoggerr#   ra   rh   rR   r   r   r(   r.   r6   ZBufferrC   rW   r   r^   rf   rc   ZFuncrQ   r   r   r   r    <module>   s6   
 _