U
    /ei                     @   s  d dl Z d dlZd dlZd dlmZ d dlmZ d dlmZmZ d dl	Z	d dl
Z
d dlmZ d dlmZ d dlmZmZ dd	 Zd
d Zdd Zdd Zdd Zdd Ze	jdeegdd Ze	jdeegdd Zdd Zdd Zdd Zdd  Zd!d" Z dS )#    N)ThreadPoolExecutor)
ThreadPool)sleeptime)	CPU_COUNTget)addincc                  C   sD   ddt dftddfd} t| ddks*tt| ddgd	ks@td S )
N      xzy)r   r   r   wr      )r   r   )r
   r	   r   AssertionErrordsk r   </tmp/pip-unpacked-wheel-dbjnr7gq/dask/tests/test_threaded.pytest_get   s    r   c                  C   s6   ddt ddftddgfd} t| ddgdks2td S )	Nr   r   r   r   )r   r   abr   r   )   r   )r	   sumr   r   r   r   r   r   test_nested_get   s    r   c                  C   s   ddi} t | ddkstd S )Nr   r   )r   r   r   r   r   r   test_get_without_computation   s    r   c                  C   s|   ddl m}  dd }dd }ddi}| ||d	B | ||d	* tjtd
d t|d W 5 Q R X W 5 Q R X W 5 Q R X d S )Nr   )Callbackc                  _   s   d S Nr   argskwargsr   r   r   _f_ok#   s    z#test_broken_callback.<locals>._f_okc                  _   s   t dd S )Nmy_exception
ValueErrorr    r   r   r   	_f_broken&   s    z'test_broken_callback.<locals>._f_brokenr   r   )startfinishr$   )match)Zdask.callbacksr   pytestraisesr&   r   )r   r#   r'   r   r   r   r   test_broken_callback    s    r-   c                 C   s
   t  d S r   r%   r   r   r   r   bad1   s    r/   c                      s&   dt dfd tt fdd d S )Nr   r   r   r   c                      s
   t  dS Nr   r   r   r   r   r   <lambda>7       z-test_exceptions_rise_to_top.<locals>.<lambda>)r/   r+   r,   r&   r   r   r   r   test_exceptions_rise_to_top5   s    r4   pool_typc              
   C   sf   | t T}tjj|d: tdtdfiddks4ttdtdfiddksNtW 5 Q R X W 5 Q R X d S )Npoolr   r   r   )r   daskconfigsetr   r
   r   )r5   r7   r   r   r   test_reuse_pool:   s    
r;   c              	      sn   dd   fddt dD }ttdd t t|D ff|d< | d	}t|d|d
d	ks`tW 5 Q R X d S )Nc                   S   s   t d t S )N{Gz?)r   	threading	get_identr   r   r   r   fD   s    ztest_pool_kwarg.<locals>.fc                    s   i | ]}d |f fqS r.   r   .0ir?   r   r   
<dictcomp>H   s      z#test_pool_kwarg.<locals>.<dictcomp>   c                 S   s   g | ]}d |fqS r.   r   r@   r   r   r   
<listcomp>I   s     z#test_pool_kwarg.<locals>.<listcomp>r   r   r6   )rangelenr:   r   r   )r5   r   r7   r   rC   r   test_pool_kwargB   s
    "
rI   c                     s   g   fdd} t  }tdD ]@}t j| dd}d|_|  |   dgksVt d d = q t }t  |d krt	d	 t |d
 k shtqhd S )Nc                    s*   t d fddfiddd}| d S )Nr   c                      s    S r   r   r   rB   r   r   r2   S   r3   z8test_threaded_within_thread.<locals>.f.<locals>.<lambda>r   Znum_workers)r   append)rB   resultLrJ   r   r?   R   s    z&test_threaded_within_thread.<locals>.f   )r   targetr!   Tr   
   r<      )
r=   active_countrG   Threaddaemonr(   joinr   r   r   )r?   before_tr(   r   rN   r   test_threaded_within_threadO   s    r\   c                  C   sb   t  } dd tdD }tt|f|d< tdD ]}t|ddd q2t  }|| d ks^td S )	Nc                 S   s    i | ]}d |f|fddfqS )r   c                 S   s   | S r   r   rJ   r   r   r   r2   i   r3   z=test_dont_spawn_too_many_threads.<locals>.<dictcomp>.<lambda>r   r@   r   r   r   rD   i   s      z4test_dont_spawn_too_many_threads.<locals>.<dictcomp>rS   r   rP   r   rK      )r=   rU   rG   r   listr   r   rY   r   rZ   afterr   r   r    test_dont_spawn_too_many_threadsf   s    ra   c                  C   sb   t  } dd tdD }tt|f|d< tdD ]}t|d q2t  }|| td  ks^td S )Nc                 S   s    i | ]}d |f|fddfqS )r   c                 S   s   | S r   r   rJ   r   r   r   r2   v   r3   zGtest_dont_spawn_too_many_threads_CPU_COUNT.<locals>.<dictcomp>.<lambda>r   r@   r   r   r   rD   v   s      z>test_dont_spawn_too_many_threads_CPU_COUNT.<locals>.<dictcomp>rS   r   rP   r   )r=   rU   rG   r   r^   r   r   r   r_   r   r   r   *test_dont_spawn_too_many_threads_CPU_COUNTs   s    rb   c                     s   dd } t df| dfdg   fdd}g }tdD ](}tj|d	}d
|_|  || q8|D ]}|  qf dgd kstd S )Nc                 S   s   dS )Nr   r   r.   r   r   r   r?      s    ztest_thread_safety.<locals>.fg?r   r0   c                      s     td d S r1   )rL   r   r   rO   r   r   r   test_f   s    z"test_thread_safety.<locals>.test_frP   )rR   Tr   )	r   rG   r=   rV   rW   r(   rL   rX   r   )r?   rd   threadsrZ   r[   threadr   rc   r   test_thread_safety   s    
rg   c               
      s   t jdkrddlm nt d dfddt t tjtjd ddd	 tjd d
fdd}  fddtdD }tt	|
 f|d< tj| fd}|  tt2}tt t|d|d W 5 Q R X   W 5 Q R X |  d S )Nwin32r   interrupt_main)returnc                      s   t  t j d S r   )signalpthread_killSIGINTr   )main_threadr   r   rj      s    z&test_interrupt.<locals>.interrupt_main)in_clog_event
clog_eventrk   c                 S   s   |    |  d S r   )r:   wait)rp   rq   r   r   r   clog   s    ztest_interrupt.<locals>.clog)rp   rk   c                    s   |       d S r   )rr   )rp   ri   r   r   	interrupt   s    z!test_interrupt.<locals>.interruptc                    s   i | ]}d |f fqS r.   r   r@   )rs   rq   rp   r   r   rD      s      z"test_interrupt.<locals>.<dictcomp>rP   r   rQ   r6   )sysplatform_threadrj   r=   r>   EventrG   rH   r^   keysrV   r(   r   r   r+   r,   KeyboardInterruptr   r:   rX   )rt   r   Zinterrupterr7   r   )rs   rq   rp   rj   ro   r   test_interrupt   s"    

r{   )!rl   ru   r=   concurrent.futuresr   Zmultiprocessing.poolr   r   r   r+   r8   Zdask.systemr   Zdask.threadedr   Zdask.utils_testr	   r
   r   r   r   r-   r/   r4   markZparametrizer;   rI   r\   ra   rb   rg   r{   r   r   r   r   <module>   s2   

