U
    /e                     @  s   d Z ddlmZ ddlZddlZddlZddlZddlZddlm	Z
 ddlmZ eeZe Zdd ZG dd	 d	e
jZdddZdd Ze ZdS )a  
Modified ThreadPoolExecutor to support threads leaving the thread pool

This includes a global `secede` method that a submitted function can call to
have its thread leave the ThreadPoolExecutor's thread pool.  This allows the
thread pool to allocate another thread if necessary and so is useful when a
function realises that it is going to be a long-running job that doesn't want
to take up space.  When the function finishes its thread will terminate
gracefully.

This code copies and modifies two functions from the
`concurrent.futures.thread` module, notably `_worker` and
ThreadPoolExecutor._adjust_thread_count` to allow for checking against a global
`threading.local` state.  These functions are subject to the following license,
which is included as a comment at the end of this file:

    https://docs.python.org/3/license.html

... and are under copyright by the Python Software Foundation

   Copyright 2001-2016 Python Software Foundation; All Rights Reserved
    )annotationsN)_concurrent_futures_thread)timec              	   C  s  dt _| t _zzt jr| jJ | jrb| j \}}| j| | jt	
  |  W 5 Q R  qW 5 Q R X z|jdd}W n tjk
r   Y qY nX |d k	r|  ~qtjs| d ks| jr|d  W W 2d S q~ W n" tk
r   tjddd Y nX W 5 t `t `X d S )NT   timeoutzException in worker)exc_info)thread_stateproceedexecutor_rejoin_lock_rejoin_listpop_threadsaddremove	threadingcurrent_threadsetgetqueueEmptyrunthread	_shutdownputBaseExceptionloggercritical)r   Z
work_queueZrejoin_threadZrejoin_eventZtask r   B/tmp/pip-unpacked-wheel-g426oqom/distributed/threadpoolexecutor.py_worker'   s4    

r!   c                      s6   e Zd Ze Z fddZdd Zd	ddZ  Z	S )
ThreadPoolExecutorc                   s0   t  j|| g | _t | _|dd| _d S )NZthread_name_prefixZDaskThreadPoolExecutor)super__init__r   r   Lockr   r   _thread_name_prefix)selfargskwargs	__class__r   r    r$   J   s    
 zThreadPoolExecutor.__init__c                 C  s\   t | j| jk rXtjt| jdt t	| j
f  | | jfd}d|_| j| |  d S )Nz-%d-%d)targetnamer(   T)lenr   Z_max_workersr   Threadr!   r&   osgetpidnext_counter_work_queuedaemonr   start)r'   tr   r   r    _adjust_thread_countR   s    z'ThreadPoolExecutor._adjust_thread_countTNc              
   C  s   t v | j d| _| jd  W 5 Q R X |d k	r<t | }|rv| jD ].}|d k	rdt|t  d}nd }|j|d qFW 5 Q R X d S )NTr   r   )	threads_lockZ_shutdown_lockr   r4   r   r   r   maxjoin)r'   waitr   deadliner7   Ztimeout2r   r   r    shutdown^   s    

zThreadPoolExecutor.shutdown)TN)
__name__
__module____qualname__	itertoolscountr3   r$   r8   r>   __classcell__r   r   r*   r    r"   F   s   r"   Tc              	   C  s:   dt _t& t jjt  | r,t j  W 5 Q R X dS )zwHave this thread secede from the ThreadPoolExecutor

    See Also
    --------
    rejoin : rejoin the thread pool
    FN)	r	   r
   r9   r   r   r   r   r   r8   )adjustr   r   r    seceden   s
    rF   c               	   C  sX   t  } t  }tj}|j |j| |f W 5 Q R X |dd  |	  dt_
dS )a  Have this thread rejoin the ThreadPoolExecutor

    This will block until a new slot opens up in the executor.  The next thread
    to finish a task will leave the pool to allow this one to join.

    See Also
    --------
    secede : leave the thread pool
    c                   S  s   d S )Nr   r   r   r   r    <lambda>       zrejoin.<locals>.<lambda>TN)r   r   Eventr	   r   r   r   appendZsubmitr<   r
   )r   eventer   r   r    rejoin|   s    
rM   )T)__doc__
__future__r   rB   loggingr0   r   r   Zdistributedr   r   Zdistributed.metricsr   	getLoggerr?   r   localr	   r!   r"   rF   rM   r%   r9   r   r   r   r    <module>   s    
(
