U
    /e                   	   @   s8  d dl Z d dlZd dlZd dlmZ d dlmZ d dlZd dlZd dlm	Z	m
Z
 d dlmZmZmZmZmZ d dlmZ d dlmZ dd	 Zd
d Zdd Zdd Zejjejdk dddd Zdd Zdd Zdd Zdd Z dd Z!dd  Z"G d!d" d"Z#d#d$ Z$ej%d%e j&egd&d' Z'd(d) Z(d*d+ Z)d,d- Z*d.d/ Z+ej%d0d1d0gd2d3 Z,G d4d5 d5Z-d6d7 Z.ej%d8d9dd:gd; fd<dd gd; fd<e.d:gd; fgd=d> Z/d?d@ Z0ejjej1dAkdBddCdD Z2ejjej1dAkdBddEdF Z3ejjej1dAkdGddHdI Z4ejjej1dAkdGddJdK Z5dS )L    N)ProcessPoolExecutor)add)computedelayed)_dumps_loadsgetget_contextremote_exception)	CPU_COUNT)incc                 C   s   t d}|| gS )Nnumpy)pytestimportorskiparray)anp r   C/tmp/pip-unpacked-wheel-dbjnr7gq/dask/tests/test_multiprocessing.pyunrelated_function_global   s    
r   c                 C   s   | | S Nr   r   br   r   r   my_small_function_global   s    r   c                  C   s0   t t} d| kstd| ks td| ks,tdS )z<Unrelated globals should not be included in serialized bytes   my_small_function_globals   unrelated_function_globals   numpyN)r   r   AssertionError)r   r   r   r   test_pickle_globals   s    r   c                     sN   t d  fdd} dd }t|}d|ks2td|ks>td|ksJtd	S )
z;Unrelated locals should not be included in serialized bytesr   c                    s     | gS r   )r   )r   r   r   r   unrelated_function_local%   s    z4test_pickle_locals.<locals>.unrelated_function_localc                 S   s   | | S r   r   r   r   r   r   my_small_function_local(   s    z3test_pickle_locals.<locals>.my_small_function_localr   s   my_small_function_locals   unrelated_function_localN)r   r   r   r   )r   r   r   r   r   r   test_pickle_locals!   s    
r       zrequires pickle protocol 5)reasonc                  C   s   t d} t jddd | d}g }t||jd}t|dksDtt|d tj	sXtt
|d t
|ksptt||d	}| ||kstd
S )z$Test that out-of-band pickling worksr   Zcloudpicklez1.3.0)Z
minversionr!   )buffer_callback   r   )buffersN)r   r   Zaranger   appendlenr   
isinstancepicklePickleBuffer
memoryviewr   all)r   r   lr   Za2r   r   r   test_out_of_band_pickling1   s    

r.   c                   C   s   t dd S )N12345
ValueErrorr   r   r   r   badC   s    r2   c               	   C   s@   dt fi} tt}t| d W 5 Q R X dt|jks<td S )Nxr/   )r2   r   raisesr1   r   strvaluer   )dsker   r   r   test_errors_propagateG   s    
r9   c                  C   sr   t d} t| d}t| d}t|t|ks0tt|t s>tdt|ksNtdt|ks^tdt|ksntd S )NZhelloztraceback-body	Traceback)	TypeErrorr
   typer   r(   r5   )r8   r   r   r   r   r   test_remote_exceptionO   s    

r=   c                  C   s(   ddd dfd} t | ddks$td S )N   c                 S   s   | d S Nr$   r   r3   r   r   r   <lambda>\       z.test_lambda_with_cloudpickle.<locals>.<lambda>r3   r3   yrD      )r   r   )r7   r   r   r   test_lambda_with_cloudpickle[   s    rF   c                   C   s   dd S )Nc                 S   s   | d S r?   r   r@   r   r   r   rA   a   rB   zlambda_result.<locals>.<lambda>r   r   r   r   r   lambda_result`   s    rG   c                  C   s(   dt fi} t| d}|ddks$td S )Nr3   r>   rE   )rG   r   r   )r7   fr   r   r   $test_lambda_results_with_cloudpickled   s    

rI   c                   @   s   e Zd Zdd Zdd ZdS )NotUnpickleablec                 C   s   dS )Nr   r   )selfr   r   r   __getstate__k   s    zNotUnpickleable.__getstate__c                 C   s   t dd S )NzCan't unpickle mer0   )rK   stater   r   r   __setstate__n   s    zNotUnpickleable.__setstate__N)__name__
__module____qualname__rL   rN   r   r   r   r   rJ   j   s   rJ   c               	   C   sd   t  } dt| fi}tt t|d W 5 Q R X tdf| d}tt t|d W 5 Q R X d S )Nr3   r   )r3   r   )rJ   boolr   r4   r1   r   )r   r7   r   r   r   %test_unpicklable_args_generate_errorsr   s    rS   pool_typc              
   C   sf   | t T}tjj|d: tdtdfiddks4ttdtdfiddksNtW 5 Q R X W 5 Q R X d S )N)poolr3   r$   r>   )r   daskconfigsetr   r   r   )rT   rU   r   r   r   test_reuse_pool   s    
rY   c                	   C   sB   t jjtjtjd$ tdtddfdddks4tW 5 Q R X d S )N)Z
func_dumpsZ
func_loadsr$   r3   r>   rC   rD   rE   )	rV   rW   rX   r)   dumpsloadsr   r   r   r   r   r   r   test_dumps_loads   s    r\   c                  C   s0   dt dftddfd} t| ddgdks,td S )Nr$   r3   
   rD   r3   rD   zr_   )r>      )r   r   r   r   )dr   r   r   &test_fuse_doesnt_clobber_intermediates   s    rb   c               	      sf   ddl m}  dtdftddfd}g  |  fdd	d
 t|ddd W 5 Q R X t dksbtd S )Nr   )Callbackr$   r3   r]   rD   r^   c                    s
     | S r   )r&   )keyargskeysr   r   rA      rB   z+test_optimize_graph_false.<locals>.<lambda>)Zpretaskr_   F)Zoptimize_graphr>   )Zdask.callbacksrc   r   r   r   r'   r   )rc   ra   r   rf   r   test_optimize_graph_false   s    rh   c                  C   sT   G dd d} t dd | d}t|j| \}t|| sBt|jdksPtdS )a  Previously `dask.multiprocessing.get` would accidentally forward
    `HighLevelGraph` graphs through the dask optimization/scheduling routines,
    resulting in odd errors. One way to trigger this was to have a
    non-indexable object in a task. This is just a smoketest to ensure that
    things work properly even if `HighLevelGraph` objects get passed to
    `dask.multiprocessing.get`. See https://github.com/dask/dask/issues/7190.
    c                   @   s   e Zd Zdd Zdd ZdS )z0test_works_with_highlevel_graph.<locals>.NoIndexc                 S   s
   || _ d S r   r@   )rK   r3   r   r   r   __init__   s    z9test_works_with_highlevel_graph.<locals>.NoIndex.__init__c                 S   s   t dd S )NzOh no!)	Exception)rK   rd   r   r   r   __getitem__   s    z<test_works_with_highlevel_graph.<locals>.NoIndex.__getitem__N)rO   rP   rQ   ri   rk   r   r   r   r   NoIndex   s   rl   c                 S   s   | S r   r   r@   r   r   r   rA      rB   z1test_works_with_highlevel_graph.<locals>.<lambda>r$   N)r   r   rV   Z__dask_keys__r(   r   r3   )rl   r3   resr   r   r   test_works_with_highlevel_graph   s
    	rn   randomr   c              	      s   dkrt d}|jndd ltddfdd d}tjjdd	" t fd
dt|D \}W 5 Q R X t	t||kst
d S )Nr   r   FZpurec                      s   t  fddtdD S )Nc                 3   s   | ]}  d dV  qdS )r   i'  N)randint).0iro   r   r   	<genexpr>   s     z/test_random_seeds.<locals>.f.<locals>.<genexpr>r!   )tupleranger   rt   r   r   rH      s    ztest_random_seeds.<locals>.fr]   	processes)	schedulerc                    s   g | ]
}  qS r   r   rr   _rH   r   r   
<listcomp>   s     z%test_random_seeds.<locals>.<listcomp>)r   r   ro   r   rV   rW   rX   r   rw   r'   r   )ro   r   Nresultsr   )rH   ro   r   test_random_seeds   s    
&r   c                   @   s   e Zd ZdZdS )global_r   N)rO   rP   rQ   r6   r   r   r   r   r      s   r   c                   C   s
   dt _d S r?   r   r6   r   r   r   r   	proc_init   s    r   z(scheduler, initializer, expected_results	threadingr$   r]   rx   c              	      s   t dddd  dt_tj| |d" t fddtd	D \}W 5 Q R X ||ks^tt fd
dtd	D | |d\}||kstd S )NFrp   c                   S   s   t jS r   r   r   r   r   r   rH      s    z#test_process_initializer.<locals>.fr$   )ry   zmultiprocessing.initializerc                    s   g | ]
}  qS r   r   rz   r|   r   r   r}      s     z,test_process_initializer.<locals>.<listcomp>r]   c                    s   g | ]
}  qS r   r   rz   r|   r   r   r}      s     )ry   initializer)	r   r   r6   rV   rW   rX   r   rw   r   )ry   r   Zexpected_resultsr   Zresults2r   r|   r   test_process_initializer   s    	
&r   c                  C   s   ddl } d| jkS )zhWe check for spawn by ensuring subprocess doesn't have modules only
    parent process should have:
    r   NFAKE_MODULE_FOR_TESTsysmodulesr   r   r   r   check_for_pytest   s    r   win32z*Windows doesn't support different contextsc               	   C   sb   dd } ddl }d|jd< z8tjddi td	| fid	}W 5 Q R X |sPtW 5 |jd= X dS )
zThe 'multiprocessing.context' config is used to create the pool.

    We assume default is 'spawn', and therefore test for 'fork'.
    c                  S   s   dd l } d| jkS )Nr   r   r   r   r   r   r   r      s    z@test_custom_context_used_python3_posix.<locals>.check_for_pytestr   Nr$   r   multiprocessing.contextforkr3   )r   r   rV   rW   rX   r   r   )r   r   resultr   r   r   &test_custom_context_used_python3_posix   s    
r   c                	   C   sx   t  t dksttjddi t  t dks:tW 5 Q R X tjddi t  t dksjtW 5 Q R X dS )zpget_context() respects configuration.

    If default context is changed this test will need to change too.
    spawnr   
forkserverr   N)r	   multiprocessingr   rV   rW   rX   r   r   r   r   $test_get_context_using_python3_posix  s
    r   z!POSIX supports different contextsc                
   C   sj   t dtdfiddksttt< tjddi  t dtdfiddksRtW 5 Q R X W 5 Q R X dS )zOn Windows, setting 'multiprocessing.context' doesn't explode.

    Presumption is it's not used since it's unsupported, but mostly we care about
    not breaking anything.
    r3   r$   r>   r   r   N)	r   r   r   r   warnsUserWarningrV   rW   rX   r   r   r   r   %test_custom_context_ignored_elsewhere  s    r   c                
   C   sR   t  tksttt0 tjddi t  tks:tW 5 Q R X W 5 Q R X dS )z?On Python 2/Windows, get_context() always returns same context.r   r   N)	r	   r   r   r   r   r   rV   rW   rX   r   r   r   r   test_get_context_always_default*  s    r   )6r   r)   r   concurrent.futuresr   operatorr   r   rV   r   r   Zdask.multiprocessingr   r   r   r	   r
   Zdask.systemr   Zdask.utils_testr   r   r   r   r    markZskipifHIGHEST_PROTOCOLr.   r2   r9   r=   rF   rG   rI   rJ   rS   ZparametrizeZPoolrY   r\   rb   rh   rn   r   r   r   r   r   platformr   r   r   r   r   r   r   r   <module>   sr   




	 
 

