U
    /en                     @  s   d dl mZ d dlmZ d dlZd dlmZ d dlm	Z	 d dl
mZ d dlmZ d dlmZmZ e	jdd	 Ze	jd
d ZG dd dejZdS )    )annotationsN)merge)gen)parse_timedelta)time)TimeoutErrorsyncc              
   c  s   | j ddV }| j}|dkr(|| n`|dkrB|  |  nFz|\}}}||W n, tk
r } z|| W 5 d}~X Y nX dS )z[
    Coroutine that waits on Dask future, then transmits its outcome to
    cf_future.
    F)Zraiseitfinished	cancelledN)Z_resultstatusZ
set_resultcancelZset_running_or_notify_cancelwith_tracebackBaseExceptionZset_exception)future	cf_futureresultr   typexctb r   :/tmp/pip-unpacked-wheel-g426oqom/distributed/cfexecutor.py_cascade_future   s    

r   c              	   c  s.   | D ]$}z
|V  W q t k
r&   Y qX qd S N)	Exception)futuresZfutr   r   r   _wait_on_futures%   s
    
r   c                   @  sL   e Zd ZdZedddddgZdd Zd	d
 Zdd Zdd Z	dddZ
dS )ClientExecutorzY
    A concurrent.futures Executor that executes tasks on a dask.distributed Client.
    Zpureworkers	resourcesZallow_other_workersretriesc                 K  sH   t |}|| jks(tdt|| j  || _t | _d| _|| _	d S )Nz+unsupported arguments to ClientExecutor: %sF)
set_allowed_kwargs	TypeErrorsorted_clientweakrefWeakSet_futures	_shutdown_kwargs)selfclientkwargsskr   r   r   __init__7   s    

zClientExecutor.__init__c                   s4   t  } fdd}|| | jjt | |S )zK
        Wrap a distributed Future in a concurrent.futures Future.
        c                   s   |   r jdkr   d S )Nr
   )r
   r   r   )r   r   r   r   cf_callbackJ   s    z0ClientExecutor._wrap_future.<locals>.cf_callback)cfZFutureZadd_done_callbackr$   loopZadd_callbackr   )r*   r   r   r0   r   r/   r   _wrap_futureC   s
    
zClientExecutor._wrap_futurec                 O  s@   | j rtd| jj|f|t| j|}| j| | |S )a/  Submits a callable to be executed with the given arguments.

        Schedules the callable to be executed as ``fn(*args, **kwargs)``
        and returns a Future instance representing the execution of the callable.

        Returns
        -------
        A Future representing the given call.
        z*cannot schedule new futures after shutdown)	r(   RuntimeErrorr$   submitr   r)   r'   addr3   )r*   fnargsr,   r   r   r   r   r5   S   s
    
zClientExecutor.submitc                   s~   | dddk	r&tt   d|kr4|d= |rHtdt| jj|f|jt fdd}| S )a&  Returns an iterator equivalent to ``map(fn, *iterables)``.

        Parameters
        ----------
        fn : A callable that will take as many arguments as there are
            passed iterables.
        iterables : One iterable for each parameter to *fn*.
        timeout : The maximum number of seconds to wait. If None, then there
            is no limit on the wait time.
        chunksize : ignored.

        Returns
        -------
        An iterator equivalent to: ``map(fn, *iterables)`` but the calls may
        be evaluated out-of-order.

        Raises
        ------
        concurrent.futures.TimeoutError:
            If the entire result iterator could not be generated before the given
            timeout.
        Exception:
            If ``fn(*args)`` raises for any values.
        timeoutN	chunksizez!unexpected arguments to map(): %sc               	   3  s   z`D ]V}j| d k	rRz| t  V  W q\ tk
rN   t	jY q\X q| V  qW 5 t } j|  j|  X d S r   )
listr'   updater$   r   r6   r   r   r   r1   )	remainingr   Zend_timefsr*   r9   r   r   result_iterator   s    z+ClientExecutor.map.<locals>.result_iterator)	popr   r   r"   r#   r$   mapr)   iter)r*   r7   	iterablesr,   r@   r   r>   r   rB   c   s    
zClientExecutor.mapTc                 C  s<   | j s8d| _ t| j}|r,t| jjt| n| j| dS )a  Clean-up the resources associated with the Executor.

        It is safe to call this method several times. Otherwise, no other
        methods can be called after this one.

        Parameters
        ----------
        wait : If True then shutdown will not return until all running
            futures have finished executing.  If False then all running
            futures are cancelled immediately.
        TN)r(   r;   r'   r   r$   r2   r   r   )r*   waitr?   r   r   r   shutdown   s    
zClientExecutor.shutdownN)T)__name__
__module____qualname____doc__	frozensetr!   r.   r3   r5   rB   rF   r   r   r   r   r   .   s   <r   )
__future__r   concurrent.futuresr   r1   r%   Ztlzr   Ztornador   Z
dask.utilsr   Zdistributed.metricsr   Zdistributed.utilsr   r   	coroutiner   r   ZExecutorr   r   r   r   r   <module>   s   

