U
    l/e                      @   s  d Z ddlZddlZddlmZmZmZmZmZm	Z	m
Z
mZmZmZmZ ddlmZmZ ddlmZmZmZmZ dde
ee  ee ee dd	d
Zdde
ee  eee dddZdddee eeee dddZdeee  eeee dddZdS )zt
Friendlier version of asyncio standard library.

Provisional library.  Must be imported as `aioitertools.asyncio`.
    N)AnyAsyncGeneratorAsyncIterable	AwaitablecastDictIterableListOptionalSetTuple   )itermaybe_await)AnyIterableAsyncIteratorMaybeAwaitableT)timeout)awsr   returnc                C  s   t  }dd | D }d}|r2|dkr2t | }nd}|r|rz|t  }|dkrz|D ]}t|tjrV|  qVqVt ttt	t
t  t	t
t  f tj||tjdI dH \}}|D ]}|I dH V  qq6dS )a  
    Run awaitables in `aws` concurrently, and yield results as they complete.

    Unlike `asyncio.as_completed`, this yields actual results, and does not require
    awaiting each item in the iterable.

    Cancels all remaining awaitables if a timeout is given and the timeout threshold
    is reached.

    Example::

        async for value in as_completed(futures):
            ...  # use value immediately

    c                 S   s   h | ]}t |qS  asyncioensure_future).0ar   r   8/tmp/pip-unpacked-wheel-n_uj7zou/aioitertools/asyncio.py	<setcomp>3   s     zas_completed.<locals>.<setcomp>Nr   )r   return_when)settime
isinstancer   ZFuturecancelTimeoutErrorr   r   r   r   r   waitFIRST_COMPLETED)r   r   donepending	remaining	thresholdZfutitemr   r   r   as_completed   s2    
	
	r,   F)return_exceptions)	iterablesr-   r   c                  sV  t   t  tt dd fddfdd| D }t|}zz|rz  }|rb|V  n|W n t j	k
r~   Y nX z }|V  W qH t j	k
r   t
|D ]}| r|| qt dI dH  Y qHX qHW n t jtfk
r   Y nX W 5 |D ]}| s|  q|D ].}z|I dH  W n t jk
rJ   Y nX q X dS )ao  
    Yield results from one or more async iterables, in the order they are produced.

    Like :func:`as_completed`, but for async iterators or generators instead of futures.
    Creates a separate task to drain each iterable, and a single queue for results.

    If ``return_exceptions`` is ``False``, then any exception will be raised, and
    pending iterables and tasks will be cancelled, and async generators will be closed.
    If ``return_exceptions`` is ``True``, any exceptions will be yielded as results,
    and execution will continue until all iterables have been fully consumed.

    Example::

        async def generator(x):
            for i in range(x):
                yield i

        gen1 = generator(10)
        gen2 = generator(12)

        async for value in as_generated([gen1, gen2]):
            ...  # intermixed values yielded from gen1 and gen2
    N)r   r   c              
      s   z(| 2 z3 d H W } |I d H  q6 W n` tjk
rX   t| trR|  I d H   Y n2 tk
r } z  |I d H  W 5 d }~X Y nX d S N)putr   CancelledErrorr"   r   aclose	Exception)r   r+   e)	exc_queuequeuer   r   tailerz   s    
zas_generated.<locals>.tailerc                    s   g | ]}t  |qS r   r   )r   r   )r7   r   r   
<listcomp>   s     z as_generated.<locals>.<listcomp>gMbP?)r   Queuer   r   r    r'   r#   r1   
get_nowaitZ
QueueEmptylistremovesleepGeneratorExit)r.   r-   Ztasksr(   taskexcvaluer   )r5   r6   r7   r   as_generatedZ   s@    


rB   r-   limit)argsr-   rE   r   c                    s  i }i }dgt | }t }t }d}|t |k r|dksFt ||k r|| |krf|||  | n.t|| }	||	 |||	< |g||| < |d7 }q&|r@zVtj|tjdI dH \}}|D ]2}
| r|
 r|
 |||
 < q|
	 |||
 < qW nD tj
k
r>   |D ]}
|
  qtj|ddiI dH   Y nX |s&|t |kr&qVq&| D ]0}tdt |D ]}||d  ||| < qpq^|S )aR  
    Like asyncio.gather but with a limit on concurrency.

    Note that all results are buffered.

    If gather is cancelled all tasks that were internally created and still pending
    will be cancelled as well.

    Example::

        futures = [some_coro(i) for i in range(10)]

        results = await gather(*futures, limit=2)
    Nr   rC   r   )r   r-   T)lenr    appendr   r   addr%   r&   	exceptionresultr1   r#   gathervaluesrange)r-   rE   rF   Z	input_mapposretr(   r'   next_argr?   xlstir   r   r   rL      sF     

 rL   )itrr-   rE   r   c                    s*   t dd t| 2 I dH ||dI dH S )z
    Wrapper around gather to handle gathering an iterable instead of *args.

    Note that the iterable values don't have to be awaitable.
    c                    s   g | z3 d H W }t |q6 S r/   )r   )r   rT   r   r   r   r8     s     zgather_iter.<locals>.<listcomp>NrD   )rL   aiter)rU   r-   rE   r   r   r   gather_iter   s
    
rW   )FrC   )__doc__r   r!   typingr   r   r   r   r   r   r   r	   r
   r   r   builtinsr   rV   r   typesr   r   r   r   floatr,   boolrB   intrL   rW   r   r   r   r   <module>   sB   4
?
SQ  
