U
    /e                     @  s   U d Z ddlmZ ddlZddlZddlZddlZddlm	Z	 ddl
mZmZmZ ddlmZmZ ddlmZmZ ddlmZ dd	lmZmZ dd
lmZ dd Ze Zdaded< e	eZded< e Z dd Z!ddddddZ"dS )z2
A threaded shared-memory scheduler

See local.py
    )annotationsN)defaultdict)HashableMappingSequence)ExecutorThreadPoolExecutor)Lockcurrent_thread)config)MultiprocessingPoolExecutor	get_async)	CPU_COUNTc                   C  s   t  jS )N)r
   ident r   r   1/tmp/pip-unpacked-wheel-dbjnr7gq/dask/threaded.py_thread_get_id   s    r   zExecutor | Nonedefault_poolz2defaultdict[threading.Thread, dict[int, Executor]]poolsc                 C  s   | t  d fS )N   )sysexc_info)edumpsr   r   r   pack_exception    s    r   r   zSequence[Hashable] | Hashable)dskkeysc              	   K  sN  |pt dd}|pt dd}t }t |dkr|dkrf|tkrftdkr`ttat	tj
 t}q|tkr|t| krt| | }qt|}t	|j
 |t| |< nt|tjjrt|}W 5 Q R X t|j|j| |f|ttd|}tT tt }|tk	r@ttD ].}	|	|krt|	 D ]}
|

  q,qW 5 Q R X |S )a  Threaded cached implementation of dask.get

    Parameters
    ----------

    dsk: dict
        A dask dictionary specifying a workflow
    keys: key or list of keys
        Keys corresponding to desired data
    num_workers: integer of thread count
        The number of threads to use in the ThreadPool that will actually execute tasks
    cache: dict-like (optional)
        Temporary storage of results

    Examples
    --------
    >>> inc = lambda x: x + 1
    >>> add = lambda x, y: x + y
    >>> dsk = {'x': 1, 'y': 2, 'z': (inc, 'x'), 'w': (add, 'z', 'y')}
    >>> get(dsk, 'w')
    4
    >>> get(dsk, ['w', 'y'])
    (4, 2)
    poolNnum_workers)cacheZget_idr   )r   getr
   
pools_lockmain_threadr   r   r   atexitregistershutdownr   
isinstancemultiprocessingr   ZPoolr   r   ZsubmitZ_max_workersr   r   set	threading	enumeratelistpopvalues)r   r   r   r   r   kwargsthreadresultsZactive_threadstpr   r   r   r    $   sJ    !

r    )NNN)#__doc__
__future__r   r#   Zmultiprocessing.poolr'   r   r)   collectionsr   collections.abcr   r   r   concurrent.futuresr   r   r	   r
   Zdaskr   Z
dask.localr   r   Zdask.systemr   r   r"   r   __annotations__dictr   r!   r   r    r   r   r   r   <module>   s,      