U
    /e                     @  s   d dl mZ d dlZd dlZd dlmZmZ d dlmZm	Z	 d dl
mZ d dlmZ d dlmZmZ d dlmZ d d	lmZmZmZmZ G d
d deeef ZdS )    )annotationsN)Callable
Collection)ExecutorThreadPoolExecutor)wraps)chain)AnyLiteral)Buffer)KTVTTlockedc                      s   e Zd ZU dZded< ded< ded< ded	< eejd
dddddddd fddZdd fddZddddddZ	e
d+ddddd d!Zd"d#dd$d%d&Ze
d,d'dd(d)d*Z  ZS )-AsyncBuffera  Extension of :class:`~zict.Buffer` that allows offloading all reads and writes
    from/to slow to a separate worker thread.

    This requires ``fast`` to be fully thread-safe (e.g. a plain dict).

    ``slow.__setitem__`` and ``slow.__getitem__`` will be called from the offloaded
    thread, while all of its other methods (including, notably for the purpose of
    thread-safety consideration, ``__contains__`` and ``__delitem__``) will be called
    from the main thread.

    See Also
    --------
    Buffer

    Parameters
    ----------
    Same as in Buffer, plus:

    executor: concurrent.futures.Executor, optional
        An Executor instance to use for offloading. It must not pickle/unpickle.
        Defaults to an internal ThreadPoolExecutor.
    nthreads: int, optional
        Number of offloaded threads to run in parallel. Defaults to 1.
        Mutually exclusive with executor parameter.
    zExecutor | Noneexecutorz
int | Nonenthreadszset[asyncio.Future]futureszdict[asyncio.Future, float]evictingN   )r   r   r	   intNone)argsr   r   kwargsreturnc                  s>   t  j|| || _|rd n|| _|d k| _t | _i | _d S )N)super__init__r   r   Z_internal_executorsetr   r   )selfr   r   r   r   	__class__ 5/tmp/pip-unpacked-wheel-z3s6s24u/zict/async_buffer.pyr   /   s    
zAsyncBuffer.__init__r   c                   sJ   t    | jD ]}|  q| jd k	rF| jd k	rF| jjdd d | _d S )NT)wait)r   closer   cancelr   r   shutdown)r   futurer   r!   r"   r%   >   s    


zAsyncBuffer.closezCallable[..., T]zasyncio.Future[T])funcr   r   c                 G  sj   | j d kr$| jstt| jdd| _ t }t }|j| j |j	|f| }| j
| || j
j |S )Nzzict.AsyncBuffer offloader)Zthread_name_prefix)r   r   AssertionErrorr   asyncioZget_running_loopcontextvarsZcopy_contextZrun_in_executorrunr   addadd_done_callbackremove)r   r)   r   Zloopcontextr(   r!   r!   r"   _offloadG   s    

 zAsyncBuffer._offloadraisezCollection[KT]zLiteral[('raise', 'omit')]zasyncio.Future[dict[KT, VT]])keysmissingr   c                   s   dkrfdd D  n2dkr@ D ]}|kr(t |q(ntd zj }W n t k
rr   Y nX t }|| |S dd fdd	}|S )
aj  Fetch one or more key/value pairs. If not all keys are available in fast,
        offload to a worker thread moving keys from slow to fast, as well as possibly
        moving older keys from fast to slow.

        Parameters
        ----------
        keys:
            collection of zero or more keys to get
        missing: raise or omit, optional
            raise (default)
                If any key is missing, raise KeyError.
            omit
                If a key is missing, return a dict with less keys than those requested.

        Notes
        -----
        All keys may be present when you call ``async_get``, but ``__delitem__`` may be
        called on one of them before the actual data is fetched. ``__setitem__`` also
        internally calls ``__delitem__`` in a non-atomic way, so you may get
        ``KeyError`` when updating a value too.
        Zomitc                   s   g | ]}| kr|qS r!   r!   ).0key)r   r!   r"   
<listcomp>t   s      z)AsyncBuffer.async_get.<locals>.<listcomp>r3   z%missing: expected raise or omit; got zdict[KT, VT]r#   c               	     sR   i }  D ]D}j jrt z| | |< W q tk
rJ   dkrF Y qX q| S )Nr3   )fastclosedr+   ZCancelledErrorKeyError)dkr4   r5   r   r!   r"   
_async_get   s    
z)AsyncBuffer.async_get.<locals>._async_get)r;   
ValueErrorr9   Zget_all_or_nothingr+   ZFutureZ
set_resultr2   )r   r4   r5   r7   r<   fr?   r!   r>   r"   	async_getX   s     
zAsyncBuffer.async_getr   r   )r7   valuer   c                 C  s   |  || |   dS )zImmediately set a key in fast. If this causes the total weight to exceed n,
        asynchronously start moving keys from fast to slow in a worker thread.
        N)Zset_noevictasync_evict_until_below_target)r   r7   rC   r!   r!   r"   __setitem__   s    zAsyncBuffer.__setitem__zfloat | None)nr   c                 C  sh   |dkr| j }td|}tt| jjg| j }||kr>dS | | j	|}|| j|< |
| jj dS )zvIf the total weight exceeds n, asynchronously start moving keys from fast to
        slow in a worker thread.
        Ng        )rF   maxminr   r9   Ztotal_weightr   valuesr2   Zevict_until_below_targetr/   __delitem__)r   rF   Zweightr(   r!   r!   r"   rD      s    

z*AsyncBuffer.async_evict_until_below_target)r3   )N)__name__
__module____qualname____doc____annotations__r   r   r   r%   r2   r   rB   rE   rD   __classcell__r!   r!   r   r"   r      s"   
 	 @r   )
__future__r   r+   r,   collections.abcr   r   concurrent.futuresr   r   	functoolsr   	itertoolsr   typingr	   r
   Zzict.bufferr   Zzict.commonr   r   r   r   r   r!   r!   r!   r"   <module>   s   