U
    /ek6                    @  s  U d dl mZ d dlZd dlZd dlZd dlZd dlZd dlZd dl	Z	d dl
Z
d dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlZd dlmZ d dlmZmZmZ d dlm Z m!Z!m"Z" d dl#m$Z$ d dl%m&Z& d dl'm(Z(m)Z)m*Z*m+Z+m,Z, d dl-Z-d dl.Z.d d	l/m0Z0m1Z1m2Z2 d d
l3m4Z4 d dl5m6Z6 d dl7Z7d dl8m9Z9m:Z:m;Z; d dl8m<Z= d dl>m?Z? d dl@mAZAmBZBmCZC d dlDmEZE d dlFmGZG d dlHmIZImJZJ d dlKmLZL d dlMmNZNmOZOmPZPmQZQmRZRmSZS d dlTmUZU d dlVmWZW d dlXm%Z% d dlYmZZZ d dl[m\Z\ d dl]m^Z^ d dl_m`Z` d dlambZc d dldmeZe d dlfmgZgmhZhmiZimjZjmkZkmlZlmmZmmnZn d dlompZpmqZq d d lrmsZsmtZtmuZumvZvmwZw d dlrmbZx d d!lrmyZy zd dlzZ7W n e{k
r    Y nX zd d"l|m}Z} W n$ e{k
r6   d#d$d%d&Z}Y nX e~eZd'd( ejjj D Zd)Zehd*d+   d,d-iZd.d/ Ze-jd0d1 Ze-jd2d3 Zee7jjZd4d5 Zd6d7 Zd8d9 Zd:d; Zd<d= Zd>d? Zd@dA ZdBdC ZdDdE ZdFdG Zd|dIdJZd}dKdLZd~dMdNZddPdQZddRdSZddTdUZdVdW ZdXdY ZdZd[d\d]d^ZG d_d` d`Ze%fdadbZeeZdcedd< e$ ZG dedf dfZdgdh Zdidj ZddkdlZdmdn ZddodpZdqdr Zemdsdt Ze ddudvZejddwdxZe-jdydz Ze-jd{d| Ze-jd}d~ Ze-jdd Ze-jdd Ze-jdd ZeZe-jdd Ze dddZe-jdd Ze-jdd Ze-jdd Zdd Zdd Ze dddZedfddddddZdeqddfddddddddddZdd[dddZdd[dddZdd[dddZdd Zddedeqd-ddddOddd-dfdddddd#ddddddd#dddddZddddd[dddZdd[dddZddd[dddZe ddd#dddddŜddǄZΐdddʄZϐddd̄Ze1dd΄ Zeу rddЄ Zne-jӠdѡZҐdddԄZdefddքZאdddلZؐdddۄZdd݄ Zdd߄ Zdd Ze ejdfddZe dd Ze dd Ze dd ZejejejedZdd Zdd Zdd Zdd Zdd ZdddZdddZdd Zdd Ze d d Ze dd ZdddddZdd[ddd	Ze dd#ddddddZe dd Ze dd Ze dddZe-jdd ZG dd deWZG dd deGZG dd deOZdd  Zdd!d"Zd-d
d#d$d$d#d#d[d%d&d'Zd$dd(d)d*Z G d+d, d,eEZd-dd#d.d/d0Ze d1d2d1d2d3d4d5d6Zd7d8 Ze-jd9d:d;d< Zdd=d>d?d=d@dAdBZG dCdD dDeqZG dEdF dFeqZG dGdH dHeqZ	e d-dIdJd#ddKdLdMZ
e dNdOdPdQdRZdSdTddUdVdd[dWdXdYZdSdTdZdJddÐd[d\d]d^Ze-jd_d` Ze-jdadbgdcddde Ze- dfdg Ze- dhdi ZdjddkdldmZddjd2dndodpdqZddjd2drdodsdtZdudv Ze ZG dwdx dxZdjdxdydzd{ZdS (      )annotationsN)defaultdict)Callable
CollectionMapping)contextmanagernullcontextsuppress)countsleep)IOAny	GeneratorIteratorLiteral)assocmemoizemerge)AsyncHTTPClient)IOLoop)Event	Schedulersystem)versions)BatchedSend)Client_global_clientsdefault_client)Comm)TCP)MACOSWINDOWS)initialize_logging)CommClosedErrorConnectionPoolStatusclean_exceptionconnectrpc)SpecCluster)WorkerPlugin)time)Nanny)
ServerNode)enable_proctitle_on_children)deserialize)	TaskState)Security)DequeHandler_offload_executorget_ipget_ipv6get_mp_contextiscoroutinefunction
log_errorsreset_logger_locks)WORKER_ANY_RUNNINGWorker)ComputeTaskEventExecuteInvalidTransitionSecedeEventStateMachineEvent)WorkerState)is_debuggingboolreturnc                   C  s   t  d k	S N)sysgettrace rJ   rJ   :/tmp/pip-unpacked-wheel-g426oqom/distributed/utils_test.pyrC   `   s    rC   c                 C  s$   i | ]\}}t |tjr||jqS rJ   )
isinstanceloggingLoggerlevel).0nameloggerrJ   rJ   rK   
<dictcomp>i   s    rS      c                   C  s   d S rG   rJ   rJ   rJ   rJ   rK   <lambda>p       rU   z1distributed.scheduler.active-memory-manager.startFc                    s"   t jD ]} | jddI d H  qd S )NFZexecutor_wait)r<   
_instancesclose)workerrJ   rJ   rK   cleanup_global_workersx   s    
r[   c                 C  s   | S rG   rJ   )loop_in_threadrJ   rJ   rK   loop}   s    r]   c                 #  s   t j  t jjddd}t   fdd}|t|}t j |fD ]^}| kr  \}}z
|V  W 5 |	|j
 X qL||krL|   W 5 Q R  W 5 Q R  d S qLW 5 Q R X W 5 Q R X d S )N   ztest IOLoop)Zthread_name_prefixc                    s0   t  } t } | |f | I d H  d S rG   )r   currentasyncior   Z
set_resultwait)io_loop
stop_eventZloop_startedrJ   rK   run   s    zloop_in_thread.<locals>.run)
concurrentfuturesZFutureZThreadPoolExecutorconfig_for_cluster_testssubmit_run_and_close_tornadoZas_completedresultadd_callbackset)cleanupZtpere   Zranfrb   rc   rJ   rd   rK   r\      s&    
  
r\   c                   C  s$   t jj  t jjtt d S rG   )daskconfigclearupdatecopydeepcopyoriginal_configrJ   rJ   rJ   rK   reset_config   s    rw   c                   s   t   fdd}|S )z
    A decorator to disable debug facilities during timing-sensitive tests.
    Warning: this doesn't affect already created IOLoops.
    c               	     sD   t jd}|d k	rt jd= z | |W S |d k	r>|t jd< X d S )NPYTHONASYNCIODEBUG)osenvironget)argskwargsZold_asyncio_debugfuncrJ   rK   wrapped   s    znodebug.<locals>.wrapped)	functoolswraps)r   r   rJ   r~   rK   nodebug   s    
r   c                 C  s$   t jd| _| jdk	r t jd= dS )za
    A setup_module() that you can install in a test module to disable
    debug facilities.
    rx   N)ry   rz   r{   _old_asyncio_debugmodulerJ   rJ   rK   nodebug_setup_module   s    
r   c                 C  s   | j dk	r| j tjd< dS )ze
    A teardown_module() that you can install in a test module to reenable
    debug facilities.
    Nrx   )r   ry   rz   r   rJ   rJ   rK   nodebug_teardown_module   s    
r   c                 C  s   | d S Nr^   rJ   xrJ   rJ   rK   inc   s    r   c                 C  s   | d S r   rJ   r   rJ   rJ   rK   dec   s    r   c                 C  s   | | S rG   rJ   r   yrJ   rJ   rK   mul   s    r   c                 C  s   | | S rG   rJ   r   rJ   rJ   rK   div   s    r   c                 C  s   t dd S )Nzhello!)RuntimeErrorr   rJ   rJ   rK   throws   s    r   c                 C  s   | d S N   rJ   r   rJ   rJ   rK   double   s    r   {Gz?c                 C  s   t | | d S r   r   r   delayrJ   rJ   rK   slowinc   s    r   c                 C  s   t | | d S r   r   r   rJ   rJ   rK   slowdec   s    r   c                 C  s   t | d|  S r   r   r   rJ   rJ   rK   
slowdouble   s    r   r^   c                 C  s"   ddl m } t| |  | d S )Nr   )randomr^   )r   r   )r   Zscaler   rJ   rJ   rK   	randominc   s    r   c                 C  s   t | | | S rG   r   )r   r   r   rJ   rJ   rK   slowadd   s    r   c                 C  s   t | t| S rG   )r   sum)seqr   rJ   rJ   rK   slowsum  s    r   c                  O  s0   | dd}t| t| dkr(| d S | S d S )Nr   r   r^   r   )r{   r   len)r|   r}   r   rJ   rJ   rK   slowidentity	  s
    r   c              
   C  s$   | | d W  5 Q R  S Q R X d S r   rJ   )r   lockrJ   rJ   rK   lock_inc  s    r   r   None)eventrF   c                 C  s   |    d S rG   )ra   )r   rJ   rJ   rK   block_on_event  s    r   c                   @  s   e Zd ZdZdd ZdS )_UnhashableCallableNc                 C  s   |d S r   rJ   )selfr   rJ   rJ   rK   __call__  s    z_UnhashableCallable.__call__)__name__
__module____qualname____hash__r   rJ   rJ   rJ   rK   r     s   r   c                 C  s   | |  }| |krq
dS )z*
    Burn CPU for *duration* seconds.
    NrJ   )durationZtimerdeadlinerJ   rJ   rK   run_for#  s    

r   zdefaultdict[str, int]_varying_dictc                   @  s   e Zd Zdd Zdd ZdS )_ModuleSlotc                 C  s   || _ || _d S rG   )modnameslotname)r   r   r   rJ   rJ   rK   __init__2  s    z_ModuleSlot.__init__c                 C  s   t tj| j | jS rG   )getattrrH   modulesr   r   r   rJ   rJ   rK   r{   6  s    z_ModuleSlot.getN)r   r   r   r   r{   rJ   rJ   rJ   rK   r   1  s   r   c                   s&   t tdtt fdd}|S )zi
    Return a function that returns a result (or raises an exception)
    from *items* at each call.
    r   c                    sN     } |  }|t kr"tn( | }|d | < t|trF|n|S d S r   )r{   r   
IndexErrorrL   	Exception)dctir   itemskeyZslotrJ   rK   r   F  s    
zvarying.<locals>.func)r   r   next_varying_key_gen)r   r   rJ   r   rK   varying:  s    	
r   c                 C  s   dd }|t tt| fS )zi
    Like *varying*, but return the full specification for a map() call
    on multiple items lists.
    c                 _  s
   | ||S rG   rJ   )r   r|   r}   rJ   rJ   rK   apply\  s    zmap_varying.<locals>.apply)listmapr   )Z
itemslistsr   rJ   rJ   rK   map_varyingV  s    r   c                   s   t |I d H  | d S r   )r`   r   r   rJ   rJ   rK   asyncincb  s    r   c             	     s:   d  fdd}zt| W S j dd X d S )Nc                     s   t   I d H S rG   )r   r_   rJ   r|   async_fnr}   Ztornado_looprJ   rK   inner_fnj  s    z(_run_and_close_tornado.<locals>.inner_fnT)Zall_fds)rY   r`   re   )r   r|   r}   r   rJ   r   rK   rj   g  s
    rj   c              	     s6   t j|   fdd}t| W 5 Q R X d S )Nc               
     s   z t f ddd I d H } W n: tk
rZ } ztD ]}| q:W 5 d }~X Y n*X tD ]}| j qd|  I d H  d S )NT	127.0.0.1)validatehostport)r   r   rangeputaddressfinished)	schedulerexc_r}   nputsr   qrJ   rK   r   x  s      zrun_scheduler.<locals>._)rp   rq   rm   rj   )r   r   rq   r   r}   r   rJ   r   rK   run_scheduleru  s    r   c              
     sl   t j|V ddlm  t  t 2 |  fdd}tt	rTt
| W 5 Q R X W 5 Q R X d S )Nr   )r<   c               
     sz   t  } z fddiI d H }W n0 tk
rV } z| |f W 5 d }~X Y n X | |jf | I d H  d S Nr   T)ry   getpidr   r   r   r   pidrZ   r   r<   r}   r   scheduler_addrrJ   rK   r     s     zrun_worker.<locals>._)rp   rq   rm   distributedr<   r:   r9   r{   rL   strrj   r   scheduler_qrq   r}   r   rJ   r   rK   
run_worker  s    
r   c              	     sF   t j|0 |  fdd}ttr8t| W 5 Q R X d S )Nc               
     sz   t  } ztfddi I d H }W n0 tk
rV } z| |f W 5 d }~X Y n X | |jf | I d H  d S r   )ry   r   r-   r   r   r   r   r   r}   r   r   rJ   rK   r     s     zrun_nanny.<locals>._)rp   rq   rm   r{   rL   r   rj   r   rJ   r   rK   	run_nanny  s
    
r   c                 #  sJ   t jdtdd ttj d V   fdd fdd}| | d S )Nz/check_active_rpc is deprecated - use gen_test()r   )
stacklevelc                     s   t dttj    d S Nz!some RPCs left active by test: %spytestfailrm   r)   activerJ   active_beforerJ   rK   r     s    zcheck_active_rpc.<locals>.failc                     s    t  fdddI d H  d S )Nc                     s   t ttj  dkS Nr   r   rm   r)   r   rJ   r   rJ   rK   rU     rV   z0check_active_rpc.<locals>.wait.<locals>.<lambda>timeout	fail_func)async_wait_forrJ   r   active_rpc_timeoutr   rJ   rK   ra     s
    
zcheck_active_rpc.<locals>.wait)warningswarnDeprecationWarningrm   r)   r   Zrun_sync)r]   r   ra   rJ   r   rK   check_active_rpc  s    
r   c                  s<   t tj d V   fdd}t fdd| |dI d H  d S )Nc                     s   t dttj    d S r   r   rJ   r   rJ   rK   r     s    z _acheck_active_rpc.<locals>.failc                     s   t ttj  dkS r   r   rJ   r   rJ   rK   rU     rV   z$_acheck_active_rpc.<locals>.<lambda>r   )rm   r)   r   r   )r   r   rJ   r   rK   _acheck_active_rpc  s    

r   c              	   c  s$   t  \}}||fV  W 5 Q R X d S rG   )cluster)r]   r   workersrJ   rJ   rK   cluster_fixture  s    r  c                 C  s   | \}}|S rG   rJ   r  r   r   rJ   rJ   rK   s  s    r  c                 C  s   | \}}|d S r   rJ   r  rJ   rJ   rK   a  s    r  c                 C  s   | \}}|d S r   rJ   r  rJ   rJ   rK   b  s    r  c              	   c  s.   |\}}t |d | d}|V  W 5 Q R X d S Nr   )r]   r   r]   r  r   r   clientrJ   rJ   rK   r	    s    r	  c                 c  sR   | j  }|r| j   | V  | j  }|r<|s<| j   n|sN|rN| j   dS )zSync client with the Active Memory Manager (AMM) turned off.
    This works regardless of the AMM being on or off in the dask config.
    N)Zammrunningstopstart)r	  beforeafterrJ   rJ   rK   client_no_amm  s    


r  c              	   c  s.   |\}}t |d | d}|V  W 5 Q R X d S r  r  r  rJ   rJ   rK   client_secondary  s    r  c              	   k  s\   |pt  }t| pi d|} t|p"i d|}tf | |d|\}}||fV  W 5 Q R X d S )Nsecurity)worker_kwargsscheduler_kwargs)tls_only_securityr   r   )r  r  r  r}   r  r   rJ   rJ   rK   tls_cluster_context%  s    
 r  c              	   c  s(   t |d\}}||fV  W 5 Q R X d S )N)r  )r  )r]   r  r   r   rJ   rJ   rK   tls_cluster3  s    r  c              	   c  s0   | \}}t |d ||d}|V  W 5 Q R X d S )Nr   )r  r]   r  )r  r]   r  r  r   r	  rJ   rJ   rK   
tls_client9  s    r  c                   C  s   t  S rG   )r  rJ   rJ   rJ   rK   r  @  s    r  c                 C  s0   | D ]}|   q| D ]}|  |  qd S rG   )killjoinrY   )	processesprocrJ   rJ   rK   _kill_join_processesE  s
    
r  c                 C  s   |    |   | j   d S rG   )rY   Zjoin_threadZ_writer)r   rJ   rJ   rK   _close_queueO  s    r  r   
   c                 #  sB  |pi }|pi }|pi }t   tdd t  t  |rJt}nt}t }g }|t	| t
  }	|t|	 t
 jdt|	 d |f|dd}
|
  ||
 i }t
  }|t| t D ]P}tdtjd|}t
 jd|||	|f|d}|  || d	|i||j< q|	 }t|tr>||t D ].}| \}}t|trj|||| d
< qJt z|d }d|diW n tk
r   i Y nX  fdd}t| d
idd | D fV  W 5 Q R X z
t }W n tk
r   Y n
X |   W 5 Q R X W 5 Q R X W 5 Q R X d S )NTcheckzDask cluster test: Schedulerr^   )rQ   targetr|   r}   daemon)nthreadsmemory_limitzDask cluster test: Worker)rQ   r!  r|   r}   r  r   r  Zconnection_argsr	  c               
     s^   t f4 I d H :} |  I d H }t| kr2qJt  dkrtdqW 5 Q I d H R X d S )N   zTimeout on cluster creation)r)   Zncores_runningr   r,   r   )r  r#  nworkersZ
rpc_kwargsZsaddrr  rJ   rK   wait_for_workers  s    z!cluster.<locals>.wait_for_workersc                 S  s$   g | ]}|d  t |d dqS )r   r  )r   r  )weakrefrefrP   wrJ   rJ   rK   
<listcomp>  s   zcluster.<locals>.<listcomp>)!r/   check_process_leakcheck_instancesrh   r   r   
contextlib	ExitStackcallbackr  r7   Queuer  Processr   r  appendr   r   r   MEMORY_LIMITr   r{   rL   r   r,   Zget_connection_argsKeyErrorrj   valuesr   
ValueErrorrY   )r'  Znannyr  r   r  rq   Z_run_workerstackr  r   r   Zworkers_by_pidr   r   r}   r  Zsaddr_or_exceptionr   Zaddr_or_exceptionr  r(  r	  rJ   r&  rK   r   U  s    	"




	
r   floatzdict[str, Any] | NonezCallable[[Callable], Callable])r   clean_kwargsrF   c                   s<   pi st dt rdfdd  fdd}|S )a  Coroutine test

    @pytest.mark.parametrize("param", [1, 2, 3])
    @gen_test(timeout=5)
    async def test_foo(param)
        await ... # use tornado coroutines


    @gen_test(timeout=5)
    async def test_foo():
        await ...  # use tornado coroutines
    \timeout should always be set and it should be smaller than the global one frompytest-timeout  c                  sP   t  4 I d H 2 tt| || I d H W  5 Q I d H R  S Q I d H R X d S rG   )r   r`   wait_forcreate_task)r   r|   r}   r   rJ   rK   async_fn_outer  s
     z gen_test.<locals>.async_fn_outerc                   s8   t  t tf  fdd}t |_|S )Nc                    s"   t stdt f| |S )Nz,gen_test only works for coroutine functions.)r8   r   rj   )r|   r}   )rB  r   rJ   rK   	test_func  s    z&gen_test.<locals>._.<locals>.test_func)r   r   rh   cleaninspect	signature__signature__)r   rC  )rB  r<  r~   rK   r     s    zgen_test.<locals>._)AssertionErrorrC   )r   r<  r   rJ   )rB  r<  r   rK   gen_test  s    rI  z-list[tuple[str, int] | tuple[str, int, dict]]r   z Security | dict[str, Any] | Noneztype[ServerNode]z"tuple[Scheduler, list[ServerNode]])r#  r   r  r<   r  r  rF   c                   s  |pi }pi t f dd|d|I d H  fddt| D }tj| I d H  t }tjt| k stdd j D stdd j	 D rt
d	I d H  t |d
 kr`tjdd |D  I d H   I d H  t t t tdq`|fS )NTr   )r   r  r   r   c              
     sP   g | ]H\}} j f|d  |d|d dt|dkrDt|d nqS )r^   Tr   )r#  rQ   r  r   r   r   )r   r   r   )rP   r   Zncorer<   r  r  r  rJ   rK   r-    s   	
z!start_cluster.<locals>.<listcomp>c                 s  s   | ]}|j tjkV  qd S rG   )statusr&   r
  )rP   wsrJ   rJ   rK   	<genexpr>  s     z start_cluster.<locals>.<genexpr>c                 s  s   | ]}|j d kV  qd S rG   )comm)rP   rN  rJ   rJ   rK   rM    s     {Gz?rT   c                 s  s   | ]}|j d dV  qdS )r^   rA  N)rY   r+  rJ   rJ   rK   rM    s     zCluster creation timeout)r   	enumerater`   gatherr,   r   r   anyr8  Zstream_commsr   rY    check_invalid_worker_transitionscheck_invalid_task_statescheck_worker_fail_hardTimeoutError)r#  r   r  r<   r  r  r   r  rJ   rJ  rK   start_cluster  s<    
rW  r   )r  rF   c                 C  s^   | j dsd S | j d D ]*\}}|d}td| ttf | qtdt| j d d S )Nzinvalid-worker-transitionrZ   Worker:z Invalid worker transitions found)eventsr{   popprintr?   r9  r   )r  r   msgrZ   rJ   rJ   rK   rS  "  s    

 rS  c                 C  sb   | j dsd S | j d D ]:\}}td|d  td|d  |d D ]}t| qFqtdd S )Nzinvalid-worker-task-staterX  rZ   zState:statestoryzInvalid worker task state)rY  r{   r[  r9  )r  r   r\  linerJ   rJ   rK   rT  0  s    rT  c                 C  s   | j dsd S | j d D ]z\}}| }|d}t|d j|d j|d< t|d j|d j|d< td| tf |\}}}|st	|
|qd S )Nzworker-fail-hardrZ   	exception	tracebackzFailed worker)rY  r{   rt   rZ  r0   headerframesr[  r'   rH  with_traceback)r  r   r\  rZ   r   tbrJ   rJ   rK   rU  =  s    

rU  c                   sb   t d dd  tj fdd|D  I d H  |  I d H  |   t|  t|  t|  d S )NzClosing out test clusterc              	     s,   t tjtt |  I d H  W 5 Q R X d S rG   )r	   r`   rV  r$   EnvironmentErrorrY   )r,  rJ   rJ   rK   
end_workerO  s    zend_cluster.<locals>.end_workerc                 3  s   | ]} |V  qd S rG   rJ   r+  rg  rJ   rK   rM  S  s     zend_cluster.<locals>.<genexpr>)	rR   debugr`   rQ  rY   r  rS  rT  rU  )r  r   rJ   rh  rK   end_clusterL  s    
rj  r   Ztest_cluster_dumpz4list[tuple[str, int] | tuple[str, int, dict]] | Nonezstr | Literal[False])r#  r   r   r  r<   r	  r  r  client_kwargsr   rq   r<  allow_unclosedcluster_dump_directoryrF   c                   s   ddl m  	d krddg	p"i p*i p2i p:i pBi sPtdt rZdttddd	d
tttjdd	d 	
fdd}|S )Nr   r  )r   r^   )r   r   r=  r>  Fz:0P  )Z	dashboardZdashboard_addresstransition_counter_max   )r$  Zdeath_timeoutro  c                   s   t stdttf dditf 	
fddt}d gdt  }r|	dd  |j
|  |j fdd	|j D d
_S )Nz/gen_cluster only works for coroutine functions.z!distributed.comm.timeouts.connectZ5sc               
     sj   t j	
fdd t jfdd f
ddfdd}t|S )	Nc              
    sH   r> | j fdd4 I d H }|V  W 5 Q I d H R X nd V  d S )NT)r  Zasynchronous)r   )r  c)r   r	  rk  r  rJ   rK   _client_factory  s    zBgen_cluster.<locals>._.<locals>.test_func.<locals>._client_factoryc                   s   g } d}ztdD ]}z"t dI d H \}}W nR tk
r } z4tjd|j	j
 d| ddd	 tdI d H  W 5 d }~X Y qX || d d <  qq|dkrtd
|| fV  W 5 t || I d H  tt dI d H  X d S )NFr^   <   )r  r<   r  r  zFailed to start gen_cluster: : z
; retryingT)exc_infozCould not start cluster)rj  r`   r?  r[   r   rW  r   rR   error	__class__r   r   )r   r  r   rL  e)r<   r#  r   r  r  r  rJ   rK   _cluster_factory  s4    "zCgen_cluster.<locals>._.<locals>.test_func.<locals>._cluster_factoryc                    s  d } t j  4 I d H \}} |4 I d H b}|g| }|d k	rZ|g| }zF|}t|}tt|	}|I d H } t|f|  W n tjk
r>   |st	t
 }|j|d rt||jdI d H  |  | stdI d H  qt|f|  td	 d|  d Y n\ tjjk
rX    Y nB tk
r   rtdst||jdI d H   Y nX W 5 Q I d H R X W 5 Q I d H R X z
t }W n tk
r   Y nX |jddI d H  d	d
 }	zft! }t! |d k r6t"#  |	 s"qZtdI d H  qrNt$d|	   nt%d|	 W 5 tj  t  |D ]:}
t|
dd rtz|
j  W n t k
r   Y nX qtX | W  5 Q R  S Q R X d S )N)file)r  rL  
output_dir	func_namerO  zTest timeout after z6s.
========== Test stack trace starts here ==========
xfailT)fastc                   S  s"   dd t jD dd t D  S )Nc                 S  s   g | ]}|  s|qS rJ   closedrP   rq  rJ   rJ   rK   r-     s      zegen_cluster.<locals>._.<locals>.test_func.<locals>.async_fn.<locals>.get_unclosed.<locals>.<listcomp>c                 S  s   g | ]}|j d kr|qS r  rK  r  rJ   rJ   rK   r-     s    
 )r   rX   r   r8  rJ   rJ   rJ   rK   get_unclosed  s    zQgen_cluster.<locals>._.<locals>.test_func.<locals>.async_fn.<locals>.get_uncloseddatars  皙?zUnclosed Comms: Unclosed Comms)&rp   rq   rm   r`   r@  r?  Zshieldvalidate_staterV  rH  ioStringIOprint_stackdump_cluster_stater   cancelZ	cancelledr   getvaluer   r}  r   has_pytestmarkr   r9  _closer   rX   rr   r   r   r  OSErrorr,   gcZcollectr[  r   )rk   r  r   rq  r|   coroZtaskZcoro2bufferr  r,  r  )
rr  ry  rl  rm  rq   r   r}   
outer_argsrC  r   rJ   rK   r     s    




 (

z;gen_cluster.<locals>._.<locals>.test_func.<locals>.async_fnc                     sr   t  d4 I d H P rDtj d dI d H W  5 Q I d H R  S  I d H W  5 Q I d H R  S Q I d H R X d S )N)r   r   rA  )r   r`   r?  rJ   )r   r   r   rJ   rK   rB  ?  s    ,zAgen_cluster.<locals>._.<locals>.test_func.<locals>.async_fn_outer)r0  asynccontextmanagerrj   )r  r}   rB  )r   r<   r   rl  r	  rk  rm  rq   r   r#  r   r  r  rC  r   r  )rr  ry  r   r}   r  rK   rC    s     fz)gen_cluster.<locals>._.<locals>.test_funcr^   r   c                   s   g | ]\}}| j kr|qS rJ   )	arguments)rP   rQ   p)boundrJ   rK   r-  O  s   
z*gen_cluster.<locals>._.<locals>.<listcomp>)
parameters)r8   r   r   r   rh   rD  rE  rF  r   insertbind_partialreplacer  r   rG  )r   Zorig_sigr|   r   r<   r   rl  r<  r	  rk  rm  rq   r#  r   r  r  r   r  )r  r   rC  rK   r     s$    0 


zgen_cluster.<locals>._)r   r   rH  rC   r   dictr   r6  )r#  r   r   r  r<   r	  r  r  rk  r   rq   r<  rl  rm  r   rJ   r  rK   gen_cluster[  sD    	( 6r  zlist[ServerNode])r  rL  r{  r|  rF   c           
   	     s   |   }t }|r"t|d tr2dd |D }n*| jddiddI dH }d	d | D }|||d
}tj|dd tj	
||d }t|d}	t||	 W 5 Q R X td|  dS )zA variant of Client.dump_cluster_state, which does not rely on any of the below
    to work:

    - Having a client at all
    - Client->Scheduler comms
    - Scheduler->Worker comms (unless using Nannies)
    r   c                 S  s   i | ]}|j | qS rJ   )r   _to_dictr+  rJ   rJ   rK   rS   j  s      z&dump_cluster_state.<locals>.<dictcomp>opZ
dump_staterF   )r\  on_errorNc                 S  s(   i | ] \}}|t |tr t|n|qS rJ   )rL   r   reprrP   kvrJ   rJ   rK   rS   m  s    )r   r   r   T)exist_okz.yamlr,  zDumped cluster state to )r  version_moduleget_versionsrL   r<   	broadcastr   ry   makedirspathr  openyamlZ	safe_dumpr[  )
r  rL  r{  r|  Zscheduler_infoZversions_infoZworkers_infor]  fnamefhrJ   rJ   rK   r  [  s"    
r  zScheduler | Worker | Nanny)serversrF   c                  G  sB   | D ]8}t |tr"|jr"|  qt |tr|jjr|  qdS )zRun validate_state() on the Scheduler and all the Workers of the cluster.
    Excludes workers wrapped by Nannies and workers manually started by the test.
    N)rL   r   r   r  r<   r]  )r  r  rJ   rJ   rK   r  ~  s
    
r  zsubprocess.Popen)r  terminate_timeoutrF   c                 C  sf   |   d krbtjdr&| tj n| tj z| j|d W 5 tt	 | 
  W 5 Q R X X d S )NwinrA  )pollrH   platform
startswithsend_signalsignalZCTRL_BREAK_EVENTSIGINTr	   r  r  communicate)r  r  rJ   rJ   rK   _terminate_process  s    
r  z	list[str]r   z!Iterator[subprocess.Popen[bytes]])r|   capture_outputr  kill_timeoutr}   rF   c                 k  s6  |rt j|d< t j|d< tjdr.t j|d< t| } tjdr^tj	
tjd| d | d< ntj	
tjd| d | d< t j| f|}z
|V  W 5 zt|| W 5 |j|d\}}|rtd	|j d
|  d tt|tr| n| |r$td|j d
|  d tt|tr| n| X X W 5 Q R X dS )a  Start a shell command in a subprocess.
    Yields a subprocess.Popen object.

    On exit, the subprocess is terminated.

    Parameters
    ----------
    args: list[str]
        Command line arguments
    capture_output: bool, default False
        Set to True if you need to read output from the subprocess.
        Stdout and stderr will both be piped to ``proc.stdout``.

        If False, the subprocess will write to stdout/stderr normally.

        When True, the test could deadlock if the stdout pipe's buffer gets full
        (Linux default is 65536 bytes; macOS and Windows may be smaller).
        Therefore, you may need to periodically read from ``proc.stdout``, or
        use ``proc.communicate``. All the deadlock warnings apply from
        https://docs.python.org/3/library/subprocess.html#subprocess.Popen.stderr.

        Note that ``proc.communicate`` is called automatically when the
        contextmanager exits. Calling code must not call ``proc.communicate``
        in a separate thread, since it's not thread-safe.

        When captured, the stdout/stderr of the process is always printed
        when the process exits for easier test debugging.
    terminate_timeout: optional, default 30
        When the contextmanager exits, SIGINT is sent to the subprocess.
        ``terminate_timeout`` sets how many seconds to wait for the subprocess
        to terminate after that. If the timeout expires, SIGKILL is sent to
        the subprocess (which cannot be blocked); see ``kill_timeout``.
        If this timeout expires, `subprocess.TimeoutExpired` is raised.
    kill_timeout: optional, default 10
        When the contextmanger exits, if the subprocess does not shut down
        after ``terminate_timeout`` seconds in response to SIGINT, SIGKILL
        is sent to the subprocess (which cannot be blocked). ``kill_timeout``
        controls how long to wait after SIGKILL to join the process.
        If this timeout expires, `subprocess.TimeoutExpired` is raised.
    kwargs: optional
        optional arguments to subprocess.Popen
    stdoutstderrr  creationflagsScriptsr   binrA  z------ stdout: returncode z,  ------z------ stderr: returncode N)
subprocessPIPESTDOUTrH   r  r  ZCREATE_NEW_PROCESS_GROUPr   ry   r  r  prefixPopenr  r[  
returncoderL   bytesdecoder  )r|   r  r  r  r}   r  outerrrJ   rJ   rK   popen  s*    2



r  r  c                 C  sH   t  | }|  sDt| t  |kr
|d k	r0|  td| d q
d S Nzcondition not reached until z seconds)r,   r   r   r   	predicater   r   Zperiodr   rJ   rJ   rK   r?    s    

r?  c                   sP   t  | }|  sLt|I d H  t  |kr
|d k	r8|  td| d q
d S r  )r,   r`   r   r   r   r  rJ   rJ   rK   r     s    

r   c                  C  s   t ddkrdS d } }zdzDttjtj} | d | d t| 	 dd }W W "dS  t
k
rz   Y W 
dS X W 5 |dk	r|  | dk	r|   X dS )	z
    Return whether IPv6 is locally functional.  This doesn't guarantee IPv6
    is properly configured outside of localhost.
    ZDISABLE_IPV61FN)z::r   r%  r   T)ry   getenvrY   socketAF_INET6SOCK_STREAMbindlistencreate_connectiongetsocknamer  )ZservclirJ   rJ   rK   has_ipv6  s    


r  c                 C  s   | S rG   rJ   )rC  rJ   rJ   rK   requires_ipv6  s    r  zipv6 required      ?c                   s&   t | fd|i|I dH }|  dS )zh
    Check that it is possible to connect to the distributed *addr*
    within the given *timeout*.
    r   N)r(   abort)addrr   r}   rN  rJ   rJ   rK   assert_can_connect  s    r  c              	     s<   t |( t| fd|i|I dH }|  W 5 Q R X dS )zj
    Check that it is impossible to connect to the distributed *addr*
    within the given *timeout*.
    r   N)r   raisesr(   r  )r  r   Zexception_classr}   rN  rJ   rJ   rK   assert_cannot_connect$  s    r  tcpc                   sv   t d|| f f|t d|t | f f|g}t rb|t d|| f f|t d|t | f f|g7 }tj| I dH  dS )zT
    Check that the local *port* is reachable from all IPv4 and IPv6 addresses.
    %s://127.0.0.1:%d
%s://%s:%d%s://[::1]:%d%s://[%s]:%dN)r  r5   r  r6   r`   rQ  r   protocolr}   rg   rJ   rJ   rK   &assert_can_connect_from_everywhere_4_60  s    r  c                   sv   t d|| f f|t d|t | f f|g}t rb|td|| f f|td|t | f f|g7 }tj| I dH  dS )zK
    Check that the local *port* is reachable from all IPv4 addresses.
    r  r  r  r  N)r  r5   r  r  r6   r`   rQ  r  rJ   rJ   rK   $assert_can_connect_from_everywhere_4@  s    r  c                   s|   t d|  f|g}t dkr8|tdt | f f|g7 }t rh|td|  f|tdt | f f|g7 }tj| I dH  dS )zR
    Check that the local *port* is only reachable from local IPv4 addresses.
    tcp://127.0.0.1:%dr   tcp://%s:%dtcp://[::1]:%dtcp://[%s]:%dN)r  r5   r  r  r6   r`   rQ  r   r}   rg   rJ   rJ   rK   assert_can_connect_locally_4R  s    
r  c                   sf   t  s
ttd|  f|tdt | f f|td|  f|tdt | f f|g}tj| I dH  dS )zK
    Check that the local *port* is reachable from all IPv6 addresses.
    r  r  r  r  Nr  rH  r  r5   r  r6   r`   rQ  r  rJ   rJ   rK   $assert_can_connect_from_everywhere_6a  s    
r  c                   sx   t  s
ttd|  f|tdt | f f|td|  f|g}t dkrd|tdt | f f|g7 }tj| I dH  dS )zR
    Check that the local *port* is only reachable from local IPv6 addresses.
    r  r  r  z::1r  Nr  r  rJ   rJ   rK   assert_can_connect_locally_6o  s    

r  c              
   c  s   t | trt| } | j}| jdd }|dk	r<| j}|| _t }t	|g| jdd< | 
| z
|V  W 5 || jdd< | 
| |dk	r|| _X dS )z%Capture output from the given Logger.N)rL   r   rM   	getLoggerrO   handlers	propagater  r  StreamHandlersetLevel)rR   rO   r  Z
orig_levelZorig_handlersZorig_propagatesiorJ   rJ   rK   captured_logger  s     




r  c                 c  s:   t | tjst| j}t | _z| jV  W 5 || _X dS )z4Capture output from the given logging.StreamHandler.N)rL   rM   r  rH  streamr  r  )handlerorig_streamrJ   rJ   rK   captured_handler  s    
r   c              	   c  sx   ddl m} tjj}t|}z8|  |t| tj||  t| dV  W 5 |  || t| X dS )z6
    Temporarily change configuration dictionary.
    r   )defaultsN)	distributed.configr  rp   rq   rt   ru   rr   rs   r#   )
new_configr  rq   Zorig_configrJ   rJ   rK   r    s    


r  c              	   c  s   ddl }tjd}tjdd\}}z\t|d}||	|  W 5 Q R X |tjd< z
dV  W 5 |rt|tjd< ntjd= X W 5 t| X dS )zH
    Temporarily change configuration file to match dictionary *c*.
    r   NZDASK_CONFIGzdask-configr  r,  )
r  ry   rz   r{   tempfilemkstempremovefdopenwritedump)rq  r  Zold_filefdr  ro   rJ   rJ   rK   new_config_file  s    

r  testsc                 C  s&   t jt| }t j|s"t||S )z;
    Get the path to one of the test TLS certificates.
    )ry   r  r  	certs_direxistsrH  )filenamer  rJ   rJ   rK   get_cert  s    r  c                  C  s6   t d} t d}ddd| d|id|id|idiiiS )z=
    A functional TLS configuration with our test certs.
    tls-ca-cert.pemztls-key-cert.pemr   rN  Ztlscert)zca-filer	  r   rZ   )r  )ca_fileZkeycertrJ   rJ   rK   
tls_config  s    r  c                  C  s   t  } d| d d d< | S )zg
    A functional TLS configuration with our test certs, disallowing
    plain TCP communications.
    Tr   rN  zrequire-encryption)r  )rq  rJ   rJ   rK   tls_only_config  s    r  c               	   C  s    t t  t } W 5 Q R X | S )z:
    A Security object with proper TLS configuration.
    )r  r  r2   secrJ   rJ   rK   tls_security  s    r  c               	   C  s*   t t  t } W 5 Q R X | js&t| S )zg
    A Security object with proper TLS configuration and disallowing plain
    TCP communications.
    )r  r  r2   Zrequire_encryptionrH  r  rJ   rJ   rK   r    s    
r  tls-cert.pemtls-key.pemr  c                 C  s<   t jt jjt|d}d|_t j|_|t| t| |S N)cafileF)	sslcreate_default_contextPurposeCLIENT_AUTHr  check_hostnameCERT_REQUIREDverify_modeload_cert_chaincertfilekeyfiler  ctxrJ   rJ   rK   get_server_ssl_context  s
    r*  c                 C  s<   t jt jjt|d}d|_t j|_|t| t| |S r  )	r  r  r   SERVER_AUTHr  r"  r#  r$  r%  r&  rJ   rJ   rK   get_client_ssl_context  s
    r,  c              
   C  sx   t d}z0|| \}}||k r8|| |t||f W n8 tk
rr } zt d| d|  W 5 d }~X Y nX d S )Nresourcezrlimit too low (z) and can't be increased: )r   importorskipZ	getrlimitZ	setrlimitminr   skip)limitZdesiredr-  Zsofthardrx  rJ   rJ   rK   bump_rlimit"  s    
r3  c                  K  s&   |  dddg tf dt d| S )Nr#  )tls://127.0.0.1r^   )r4  r   r4  )r   r  )
setdefaultr  r  )r}   rJ   rJ   rK   gen_tls_cluster,  s     r6  c               	   c  sb   t j} t j}z
d V  W 5 tt jD ]\}}||kr"t j|= q"t j D ]}|| krFt j|= qFX d S rG   )rH   r   r  rP  keys)Zold_modulesold_pathr   elemrJ   rJ   rK   save_sys_modules3  s    

r:  c               
   #  s   t   dV  t }  fddt  D }|s2qntd t | d krddlm} t }zft| dg}t|d	D ]F\}}|	d
| dt| d| d |	d
|||j  qxW 5 ~X tjd
|dd qdS )z7Context manager to ensure we haven't leaked any threadsNc                   s"   g | ]}| krd |j kr|qS )zwatch message queuerQ   )rP   threadZactive_threads_startrJ   rK   r-  K  s   
z%check_thread_leak.<locals>.<listcomp>rO  r%  r   )profilez! thread(s) were leaked from test
r^   z#------ Call stack of leaked thread /rt  r   
F)Zpytrace)	threadingrP  r,   r   r   r>  rH   _current_framesr   r5  r  Z
call_stackidentr   r   )r  Zbad_threadsr>  rc  linesr   r<  rJ   r=  rK   check_thread_leakB  s.    

rF  zlist[multiprocessing.Process])r   rF   c                 C  sH   t  }t  }|sg S | t   | }|dkr2|S |d j|d qdS )zWait until timeout for get_mp_context().active_children() to terminate.
    Return list of active subprocesses after the timeout expired.
    r   rA  N)r,   r7   active_childrenr  )r   t0childrenZjoin_timeoutrJ   rJ   rK   wait_active_childrenk  s    
rJ  c                 C  s^   t   }|D ]}|  qt| d}|D ]}|  q*tdd}|rZtd| tsZtdS )zSend SIGTERM to get_mp_context().active_children(), wait up to 3 seconds for processes
    to die, then send SIGKILL to the survivors
    rA  rT   z(Leaked unkillable children processes: %sN)	r7   rG  	terminaterJ  r  rR   warningr"   rH  )r   rI  r  rJ   rJ   rK   term_or_kill_active_children}  s    




rM  T(      zIterator[None])r   check_timeoutterm_timeoutrF   c              	   c  sF   t |d z*dV  | r2t|d}|r2td| W 5 t |d X dS )a  Terminate any currently-running subprocesses at both the beginning and end of this context

    Parameters
    ----------
    check : bool, optional
        If True, raise AssertionError if any processes survive at the exit
    check_timeout: float, optional
        Wait up to these many seconds for subprocesses to terminate before failing
    term_timeout: float, optional
        After sending SIGTERM to a subprocess, wait up to these many seconds before
        sending SIGKILL
    rA  NzTest leaked subprocesses: )rM  rJ  rH  )r   rP  rQ  rI  rJ   rJ   rK   r.    s    

r.  c               
   c  s  t j  tj  tj  tj  tj  tj  tj  t	j  t
  tj  d V  t } tt
rtd t | d k sntqnt
  tjD ]B}tt0 |jj|jdd |jtkr|j|j W 5 Q R X qtj  t } tdd tjD r&td t | d k stqtj  tdD ],}tdd tjD rZ qntd q8d	d
 tjD }tj  td|tdd t	jD stdd t	jD tdd tjD stttjtj  t	j  t  d S )Ng?r  FrW   c                 s  s   | ]}|j d kV  qdS )r  Nr  r  rJ   rJ   rK   rM    s     z"check_instances.<locals>.<genexpr>r%  c                 s  s   | ]}|  V  qd S rG   r  r  rJ   rJ   rK   rM    s     c                 S  s   g | ]}|  s|qS rJ   r  r  rJ   rJ   rK   r-    s      z#check_instances.<locals>.<listcomp>r  c                 s  s$   | ]}|j tjtjtjhkV  qd S rG   )rK  r&   r  initfailedrP   nrJ   rJ   rK   rM    s   c                 S  s   i | ]}||j qS rJ   r  rT  rJ   rJ   rK   rS     s      z#check_instances.<locals>.<dictcomp>c                 s  s   | ]}|j tjkV  qd S rG   )rK  r&   r  r  rJ   rJ   rK   rM    s     )r   rX   rr   r<   r   r*   Z_initialized_clientsSchedulerTaskStateWorkerTaskStater-   r   r   r,   rm   r   rH  r	   r   r]   rl   rY   rK  r;   rR  r   allr9  r   r3   Zclear_all_instances)r  r,  r   LrJ   rJ   rK   r/    sZ    

















r/  c               	   k  sZ   t   tjjt dddf| . t D ]\}}t	|
| q,dV  W 5 Q R X dS )zNSet recommended config values for tests that create or interact with clusters.z500 msF)Zlocal_directoryzdistributed.admin.tick.intervalz"distributed.worker.profile.enabledN)rw   rp   rq   rm   r  
gettempdirlogging_levelsr   rM   r  r  )Zextra_configrQ   rO   rJ   rJ   rK   rh     s    
rh   c                 c  sb   t d  | rt nt > t|d( |r2t nt  d V  W 5 Q R X W 5 Q R X W 5 Q R X d S )Nr  )r`   Zset_event_looprF  r   r.  r/  )threadsZ	instancesr  rJ   rJ   rK   rD    s
    
rD  c                	   c  s   t   d V  W 5 Q R X d S rG   )rD  rJ   rJ   rJ   rK   rn     s    rn   c                   @  s    e Zd ZdZdd Zdd ZdS )TaskStateMetadataPluginz)WorkPlugin to populate TaskState.metadatac                 C  s   |j j| _d S rG   )r]  tasks)r   rZ   rJ   rJ   rK   setup  s    zTaskStateMetadataPlugin.setupc                 K  sH   | j | }|dkr(|dkr(t |jd< n|dkrD|dkrDt |jd< d S )Nready	executing
start_timeZmemoryZ	stop_time)r^  r,   metadata)r   r   r  finishr}   tsrJ   rJ   rK   
transition  s
    
z"TaskStateMetadataPlugin.transitionN)r   r   r   __doc__r_  rf  rJ   rJ   rJ   rK   r]  
  s   r]  c                   @  s8   e Zd Zdd Zdd ZdddZdd	d
Zdd ZdS )
LockedCommc                 C  s0   || _ || _|| _|| _|| _t|ts,td S rG   )write_eventwrite_queue
read_event
read_queuerN  rL   r    rH  )r   rN  rk  rl  ri  rj  rJ   rJ   rK   r     s    zLockedComm.__init__c                 C  s   t | j|S rG   )r   rN  r   rQ   rJ   rJ   rK   __getattr__"  s    zLockedComm.__getattr__Nmessagec                   sN   | j r | j | jj|fI d H  | jr6| j I d H  | jj|||dI d H S )N)serializersr  )rj  r   rN  peer_addressri  ra   r	  r   r\  rp  r  rJ   rJ   rK   r	  %  s
    zLockedComm.writec                   sN   | j j|dI d H }| jr4| j| j j|fI d H  | jrJ| j I d H  |S )N)deserializers)rN  readrl  r   rq  rk  ra   )r   rs  r\  rJ   rJ   rK   rt  ,  s    zLockedComm.readc                   s   | j  I d H  d S rG   )rN  rY   r   rJ   rJ   rK   rY   4  s    zLockedComm.close)Nro  )N)r   r   r   r   rn  r	  rt  rY   rJ   rJ   rJ   rK   rh    s
   

rh  c                   @  s2   e Zd ZdZdddZdd Zdd Zd	d
 ZdS )_LockedCommPoolaH  A ConnectionPool wrapper to intercept network traffic between servers

    This wrapper can be attached to a running server to intercept outgoing read or write requests in test environments.

    Examples
    --------
    >>> w = await Worker(...)
    >>> read_event = asyncio.Event()
    >>> read_queue = asyncio.Queue()
    >>> w.rpc = _LockedCommPool(
            w.rpc,
            read_event=read_event,
            read_queue=read_queue,
        )
    # It might be necessary to remove all existing comms
    # if the wrapped pool has been used before
    >>> w.rpc.remove(remote_address)

    >>> async def ping_pong():
            return await w.rpc(remote_address).ping()
    >>> with pytest.raises(asyncio.TimeoutError):
    >>>     await asyncio.wait_for(ping_pong(), 0.01)
    >>> read_event.set()
    >>> await ping_pong()
    Nc                 C  s"   || _ || _|| _|| _|| _d S rG   )ri  rj  rk  rl  pool)r   rv  rk  rl  ri  rj  rJ   rJ   rK   r   S  s
    z_LockedCommPool.__init__c                 C  s   t | j|S rG   )r   rv  rm  rJ   rJ   rK   rn  \  s    z_LockedCommPool.__getattr__c                   s,   | j j||I d H }t|| j| j| j| jS rG   )rv  r(   rh  rk  rl  ri  rj  )r   r|   r}   rN  rJ   rJ   rK   r(   _  s        z_LockedCommPool.connectc                   s   | j  I d H  d S rG   )rv  rY   r   rJ   rJ   rK   rY   e  s    z_LockedCommPool.close)NNNN)r   r   r   rg  r   rn  r(   rY   rJ   rJ   rJ   rK   ru  8  s          
	ru  c                   C  sB   t d zt  W n& tk
r<   tr6t jdd  Y nX dS )zWork around https://github.com/dask/distributed/issues/5601 where any test that
    inits Security.temporary() crashes on MacOS GitHub Actions CI
    Zcryptographyzdistributed#5601)reasonN)r   r.  r2   	temporaryImportErrorr!   r}  rJ   rJ   rJ   rK   xfail_ssl_issue5601i  s    
rz  c                 C  s  t  }d}| D ]}zt|dks(tdt|ts:tdt|d trP|d sXtdt|d tsntd|r||d kstd	|d
 |d   k r|d ksn td|d }W q tk
r } z"td| d| dt|  W 5 d}~X Y qX qdS )aL  Test that a story is well formed.

    Parameters
    ----------
    story: list[tuple]
        Output of Worker.story
    ordered_timestamps: bool, optional
        If False, timestamps are not required to be monotically increasing.
        Useful for asserting stories composed from the scheduler and
        multiple workers
    g        r   z	too shortznot a tuplezstimulus_id not a stringzTimestamp is not a floatz*Timestamps are not monotonically ascendingr>  r^   zTimestamps is too oldzMalformed story event: z

Problem: z.
in story:
N)r,   r   rH  rL   tupler   r;  _format_story)r^  ordered_timestampsnowZprev_tsevr  rJ   rJ   rK   assert_valid_storyv  s     *r  )strictr  zlist[tuple])r^  expectr  r  rF   c                C  s   t | |d dd }zX|r0t| t|kr0t t| }|D ],}t|}|||sh||dd |r@q<q@q<W n8 tk
r   td|dt|  dt| dY nX dS )	a7  Test the output of ``Worker.story`` or ``Scheduler.story``

    Warning
    =======

    Tests with overly verbose stories introduce maintenance cost and should
    therefore be used with caution. This should only be used for very specific
    unit tests where the exact order of events is crucial and there is no other
    practical way to assert or observe what happened.
    A typical use case involves testing for race conditions where subtle changes
    of event ordering would cause harm.

    Parameters
    ==========
    story: list[tuple]
        Output of Worker.story
    expect: list[tuple]
        Expected events.
        The expected events either need to be exact matches or are allowed to
        not provide a stimulus_id and timestamp.
        e.g.
        `("log", "entry", "stim-id-9876", 1234)`
        is equivalent to
        `("log", "entry")`

        story (the last two fields are always the stimulus_id and the timestamp).

        Elements of the expect tuples can be

        - callables, which accept a single element of the event tuple as argument and
          return True for match and False for no match;
        - arbitrary objects, which are compared with a == b

        e.g.
        .. code-block:: python

            expect=[
                ("x", "missing", "fetch", "fetch", {}),
                ("gather-dependencies", worker_addr, lambda set_: "x" in set_),
            ]

    strict: bool, optional
        If True, the story must contain exactly as many events as expect.
        If False (the default), the story may contain more events than expect; extra
        events are ignored.
    ordered_timestamps: bool, optional
        If False, timestamps are not required to be monotically increasing.
        Useful for asserting stories composed from the scheduler and
        multiple workers
    )r  c                 S  s(   t | t |ko&tdd t| |D S )Nc                 s  s*   | ]"\}}t |r||n||kV  qd S rG   )callable)rP   r  exrJ   rJ   rK   rM    s    z5assert_story.<locals>._valid_event.<locals>.<genexpr>)r   rX  zip)r   	ev_expectrJ   rJ   rK   _valid_event  s    z"assert_story.<locals>._valid_eventNr{  zassert_story(strict=z) failed
story:
z	
expect:
)r  r   StopIterationiterr   rH  r~  )r^  r  r  r  r  Zstory_itr  r   rJ   rJ   rK   assert_story  s&    9
r  )r^  rF   c                 C  s    | sdS dd dd | D  S )Nz(empty story)z- z
- c                 s  s   | ]}t |V  qd S rG   )r   )rP   r  rJ   rJ   rK   rM    s     z _format_story.<locals>.<genexpr>)r  )r^  rJ   rJ   rK   r~    s    r~  c                   @  s@   e Zd ZdZdZdd Zdd Zdd Zdd	d
ZdddZ	dS )
BrokenCommr@  c                 C  s   d S rG   rJ   r   rJ   rJ   rK   rY     s    zBrokenComm.closec                 C  s   dS )NTrJ   r   rJ   rJ   rK   r    s    zBrokenComm.closedc                 C  s   d S rG   rJ   r   rJ   rJ   rK   r    s    zBrokenComm.abortNc                 C  s
   t  d S rG   r  )r   rs  rJ   rJ   rK   rt    s    zBrokenComm.readc                 C  s
   t  d S rG   r  rr  rJ   rJ   rK   r	    s    zBrokenComm.write)N)NN)
r   r   r   rq  Zlocal_addressrY   r  r  rt  r	  rJ   rJ   rJ   rK   r    s   
r  r   )rC  rQ   rF   c                   s"   t | dg }t fdd|D S )zReturn True if the test function is marked by the given @pytest.mark.<name>;
    False otherwise.

    FIXME doesn't work with individually marked parameters inside
          @pytest.mark.parametrize
    Z
pytestmarkc                 3  s   | ]}|j  kV  qd S rG   r;  )rP   markr;  rJ   rK   rM    s     z!has_pytestmark.<locals>.<genexpr>)r   rR  )rC  rQ   ZmarksrJ   r;  rK   r    s    r  z5type[BaseException] | tuple[type[BaseException], ...]z
str | NonezGenerator[(None, None, None)])expected_exceptionmatchexpected_causematch_causerF   c              	   c  sr   t j| |d}dV  W 5 Q R X |j}|js0tt|j|s@||rnt|t|jsntd| d|j ddS )zyContextmanager to assert that a certain exception with cause was raised

    Parameters
    ----------
    exc_type:
    )r  Nz
Pattern ``z`` not found in ``z``)	r   r  value	__cause__rH  rL   researchr   )r  r  r  r  ru  r   rJ   rJ   rK   raises_with_cause  s    
 r  c                 C  s   | d|d }t| dS )a   UCX exception handler for `ucx_loop` during test.

    Prints the exception and its message.

    Parameters
    ----------
    loop: object
        Reference to the running event loop
    context: dict
        Dictionary containing exception details.
    r`  ro  N)r{   r[  )r]   contextr\  rJ   rJ   rK   ucx_exception_handler-  s    r  function)Zscopec                  c  s   t d} t }|t |   |V  |   |  ddl}d|j	j
_ddlm} | }|jrddl}|j }|j  dS )zAllows UCX to cancel progress tasks before closing event loop.

    When UCX tasks are not completed in time (e.g., by unexpected Endpoint
    closure), clean up tasks before closing the event loop to prevent unwanted
    errors from being raised.
    ucpr   N)has_cuda_context)r   r.  r`   Znew_event_loopZset_exception_handlerr  resetrY   Zdistributed.comm.ucxrN  Zucxr  Zdistributed.diagnostics.nvmlr  Zhas_contextZ
numba.cudaZcudaZcurrent_contextZdevice)r  r]   r   r  r)  ZnumbarJ   rJ   rK   ucx_loop?  s    



r  r  zIO[bytes] | Nonez
int | None)r  r  	max_linesrF   c                 C  sZ   |st d}|dk	r0||kr0t | d| d| }t| | |krL|S |d7 }qdS )z
    Read lines from an IO stream until the match is found, and return the matching line.

    Prints each line to test stdout for easier debugging of failures.
    r   Nz not found in z( log lines. See test stdout for details.r^   )rH  readliner[  )r  r  r  r   r_  rJ   rJ   rK   wait_for_log_lineb  s    r  c                      s,   e Zd ZdZ fddZ fddZ  ZS )BlockedGatherDepa  A Worker that sets event `in_gather_dep` the first time it enters the gather_dep
    method and then does not initiate any comms, thus leaving the task(s) in flight
    indefinitely, until the test sets `block_gather_dep`

    Example
    -------
    .. code-block:: python

        @gen_test()
        async def test1(s, a, b):
            async with BlockedGatherDep(s.address) as x:
                # [do something to cause x to fetch data from a or b]
                await x.in_gather_dep.wait()
                # [do something that must happen while the tasks are in flight]
                x.block_gather_dep.set()
                # [from this moment on, x is a regular worker]

    See also
    --------
    BlockedGetData
    BlockedExecute
    c                   s&   t  | _t  | _t j|| d S rG   )r`   r   in_gather_depblock_gather_depsuperr   r   r|   r}   rw  rJ   rK   r     s    

zBlockedGatherDep.__init__c                   s.   | j   | j I d H  t j||I d H S rG   )r  rm   r  ra   r  
gather_depr  r  rJ   rK   r    s    
zBlockedGatherDep.gather_dep)r   r   r   rg  r   r  __classcell__rJ   rJ   r  rK   r  x  s   r  c                      s,   e Zd ZdZ fddZ fddZ  ZS )BlockedGetDataa  A Worker that sets event `in_get_data` the first time it enters the get_data
    method and then does not answer the comms, thus leaving the task(s) in flight
    indefinitely, until the test sets `block_get_data`

    See also
    --------
    BlockedGatherDep
    BlockedExecute
    c                   s&   t  | _t  | _t j|| d S rG   )r`   r   in_get_datablock_get_datar  r   r  r  rJ   rK   r     s    

zBlockedGetData.__init__c                   s4   | j   | j I d H  t j|f||I d H S rG   )r  rm   r  ra   r  get_data)r   rN  r|   r}   r  rJ   rK   r    s    
zBlockedGetData.get_data)r   r   r   rg  r   r  r  rJ   rJ   r  rK   r    s   
r  c                      sJ   e Zd ZdZ fddZdddd fddZd	d
d fddZ  ZS )BlockedExecutea  A Worker that sets event `in_execute` the first time it enters the execute
    method and then does not proceed, thus leaving the task in executing state
    indefinitely, until the test sets `block_execute`.

    After that, the worker sets `in_deserialize_task` to simulate the moment when a
    large run_spec is being deserialized in a separate thread. The worker will block
    again until the test sets `block_deserialize_task`.

    Finally, the worker sets `in_execute_exit` when execute() terminates, but before the
    worker state has processed its exit callback. The worker will block one last time
    until the test sets `block_execute_exit`.

    Note
    ----
    In the vast majority of the test cases, it is simpler and more readable to just
    submit to a regular Worker a task that blocks on a distributed.Event:

    .. code-block:: python

        def f(in_task, block_task):
            in_task.set()
            block_task.wait()

        in_task = distributed.Event()
        block_task = distributed.Event()
        fut = c.submit(f, in_task, block_task)
        await in_task.wait()
        await block_task.set()

    See also
    --------
    BlockedGatherDep
    BlockedGetData
    c                   sN   t  | _t  | _t  | _t  | _t  | _t  | _t j	|| d S rG   )
r`   r   
in_executeblock_executein_deserialize_taskblock_deserialize_taskin_execute_exitblock_execute_exitr  r   r  r  rJ   rK   r     s    





zBlockedExecute.__init__r   rA   )r   stimulus_idrF   c                  sV   | j   | j I d H  zt j||dI d H W S | j  | j I d H  X d S )N)r  )r  rm   r  ra   r  r  r  execute)r   r   r  r  rJ   rK   r    s    

zBlockedExecute.executerW  z&tuple[Callable, tuple, dict[str, Any]])re  rF   c                   s,   | j   | j I d H  t |I d H S rG   )r  rm   r  ra   r  _maybe_deserialize_task)r   re  r  rJ   rK   r    s    
z&BlockedExecute._maybe_deserialize_task)r   r   r   rg  r   r  r  r  rJ   rJ   r  rK   r    s   #
	r  )
jump_startr<   )r,  r  rF   c                c  sN   | j j}| j j}d| j _d| j _dV  || j _|| j _|rJtj| _tj| _dS )a  Prevent any task from transitioning from fetch to flight on the worker while
    inside the context, simulating a situation where the worker's network comms are
    saturated.

    This is not the same as setting the worker to Status=paused, which would also
    inform the Scheduler and prevent further tasks to be enqueued on the worker.

    Parameters
    ----------
    w: Worker
        The Worker on which tasks will not transition from fetch to flight
    jump_start: bool
        If False, tasks will remain in fetch state after exiting the context, until
        something else triggers ensure_communicating.
        If True, trigger ensure_communicating on exit; this simulates e.g. an unrelated
        worker moving out of in_flight_workers.
    r   N)r]  Ztransfer_incoming_count_limitZ*transfer_incoming_bytes_throttle_thresholdr&   ZpausedrK  r
  )r,  r  Zold_count_limitZold_thresholdrJ   rJ   rK   freeze_data_fetching  s    r  r   zIterator[LockedComm])bcommrF   c                 c  sp   |   rt| jst| j  r$t| j}t }t }t|dd|| | _}z
|V  W 5 |  || _X dS )a  
    Contextmanager blocking writes to a `BatchedSend` from sending over the network.

    The returned `LockedComm` object can be used for control flow and inspection via its
    ``read_event``, ``read_queue``, ``write_event``, and ``write_queue`` attributes.

    On exit, any writes that were blocked are un-blocked, and the original comm of the
    `BatchedSend` is restored.
    N)r  rH  rN  r`   r   r3  rh  rm   )r  Z	orig_commri  rj  Zlocked_commrJ   rJ   rK   freeze_batched_send	  s"    
    

r  rO  )intervalzstr | Collection[str]zWorker | Scheduler)r   r]  dask_workerr  rF   c                  s   t |tr|jj}nt |tr&|j}nt|t |tr>|f}t|dkrZtt	t
|nt|}z,| |ksz||  j|krt|I dH  qdW nf tjtjfk
r   | |krd|  d||  jd|j d| }nd|  d|j }t|  Y nX dS )zWait for a task to appear on a Worker or on the Scheduler and to be in a specific
    state or one of a set of possible states.
    r^   Nztasks[z].state=z on z; expected state=z] not found on )rL   r<   r]  r^  r   	TypeErrorr   r   r  r   r  r`   r   ZCancelledErrorrV  r   r[  )r   r]  r  r  r^  Z	state_strr\  rJ   rJ   rK   wait_for_state-	  s$    



$"r  z=type[StateMachineEvent] | tuple[type[StateMachineEvent], ...]rA   )type_r  r  matchesrF   c                  sr   |j j}d}|r\|d |k	r\|d }|D ]2 t | s8q(t fdd| D r(   S q(t|I dH  qdS )zEWait for a specific stimulus to appear in the log of the WorkerState.Nr|  c                 3  s    | ]\}}t  ||kV  qd S rG   )r   r  r  rJ   rK   rM  e	  s     z$wait_for_stimulus.<locals>.<genexpr>)r]  Zstimulus_logrL   rX  r   r`   r   )r  r  r  r  logZlast_evrJ   r  rK   wait_for_stimulusU	  s    

r  c                  c  s   t ddd} | V  |   dS )zAn empty WorkerStatez127.0.0.1:1rn  )r   ro  N)rB   r  )r]  rJ   rJ   rK   rL  j	  s    rL  ra  long-running)paramsc                 c  s   ddi| _ ddi| _| tjdddidd}|tdddgksDt|jdkrb| tddd	d
 | j	d j
|jksxt| V  dS )zA WorkerState running a single task 'x' with resources {R: 1}.

    The task may or may not raise secede(); the tests using this fixture runs twice.
    Rr^   r   Zcompute)r   Zresource_restrictionsr  )r   r  r  g      ?Zsecede)r   Zcompute_durationr  N)Zavailable_resourcesZtotal_resourcesZhandle_stimulusr=   dummyr>   rH  paramr@   r^  r]  )rL  requestZinstructionsrJ   rJ   rK   ws_with_running_taskr	  s     

  
r  c                 C  s
   | j j S rG   )nodeZnodeid)r  rJ   rJ   rK   name_of_test	  s    r  c              
   c  s   t  }tdd }dg}t  | tk rz@t .}|D ]}|||d q6W 5 Q R  W qW 5 Q R X W q tk
r } z.|jtjkrt	d|   t
d W Y qW 5 d }~X Y qX qtd|  d V  d S )Nc              	   s  sP   t  t jt j6}|t jt jd |d| f |d |V  W 5 Q R X d S )Nr^   r@  )r  AF_INETr  
setsockopt
SOL_SOCKETSO_REUSEADDRr  r  )r   r  rJ   rJ   rK   
_bind_port	  s
    
z*requires_default_ports.<locals>._bind_portiR"  )r   z4Address already in use. Waiting before running test r^   z)Default ports didn't open up in time for )r,   r   _TEST_TIMEOUTr0  r1  enter_contextr  errnoZ
EADDRINUSEr[  r   rV  )r  r  r  Zdefault_portsr:  r   r  rJ   rJ   rK   requires_default_ports	  s&    

r  int)r   rF   c                   s8   t  }|d|  dI d H }|jdks,t|jdS )Nzhttp://localhost:z/metrics   utf8)r   fetchcoderH  bodyr  )r   http_clientresponserJ   rJ   rK   fetch_metrics_body	  s    r  zdict[str, Any])r   r  rF   c                   s4   ddl m} t| I d H } fdd||D }|S )Nr   text_string_to_metric_familiesc                   s(   i | ] } d ks|j  r|j |qS rG   )rQ   r  rP   familyr  rJ   rK   rS   	  s
     z!fetch_metrics.<locals>.<dictcomp>)prometheus_client.parserr  r  )r   r  r  txtZfamiliesrJ   r  rK   fetch_metrics	  s    
r  zset[str]c                   s<   ddl m} t| I dH }t j fdd||D  }|S )z
    Get all the names of samples returned by Prometheus.

    This mostly matches list of metric families, but when there's `foo` (gauge) and `foo_total` (count)
    these will both have `foo` as the family.
    r   r  Nc                   s0   g | ](} d ks|j  rdd |jD qS )Nc                 S  s   h | ]
}|j qS rJ   r;  )rP   samplerJ   rJ   rK   	<setcomp>	  s     z8fetch_metrics_sample_names.<locals>.<listcomp>.<setcomp>)rQ   r  Zsamplesr  r  rJ   rK   r-  	  s    z.fetch_metrics_sample_names.<locals>.<listcomp>)r  r  r  rm   union)r   r  r  r  Zsample_namesrJ   r  rK   fetch_metrics_sample_names	  s    
r  c                  C  s   G dd d} t |  S )Nc                   @  s   e Zd Zdd ZdS )z'_get_gc_overhead.<locals>._CustomObjectc                 S  s   dS r   rJ   r   rJ   rJ   rK   
__sizeof__	  s    z2_get_gc_overhead.<locals>._CustomObject.__sizeof__N)r   r   r   r  rJ   rJ   rJ   rK   _CustomObject	  s   r  )rH   	getsizeof)r  rJ   rJ   rK   _get_gc_overhead	  s    r  c                   @  s.   e Zd ZdZdddddZdddd	Zd
S )SizeOfzT
    An object that returns exactly nbytes when inspected by dask.sizeof.sizeof
    r  r   nbytesrF   c                 C  sH   t |tstdt| |tk r:tdt d| d|t | _d S )Nz$Expected integer for nbytes but got zExpected a value larger than z integer but got .)rL   r  r  type	_size_objr9  _nbytes)r   r  rJ   rJ   rK   r   	  s    
zSizeOf.__init__rE   c                 C  s   | j S rG   )r  r   rJ   rJ   rK   r  	  s    zSizeOf.__sizeof__N)r   r   r   rg  r   r  rJ   rJ   rJ   rK   r  	  s   	r  r  c                 C  s   t | S )zEA function that emulates exactly nbytes on the worker data structure.)r  )r  rJ   rJ   rK   
gen_nbytes	  s    r  )r   )r   )r   )r^   )r   )r   )r   )r   )r^   )r^   )NNN)r   FNr  NN)FrT   r  )Nr  )Nr  )r  )r  )r  )r  r  r  )r  r  r  )TrN  rO  )TTT)T)r  )N)N(  
__future__r   r`   concurrent.futuresrf   r0  rt   r  r   r  rE  r  rM   logging.configmultiprocessingry   r  r  r  r  r  rH   r  rB  r   r)  collectionsr   collections.abcr   r   r   r   r   r	   	itertoolsr
   r,   r   typingr   r   r   r   r   r   r  Ztlzr   r   r   Ztornado.httpclientr   Ztornado.ioloopr   rp   r   r   r   r   r   r  Zdistributed.batchedr   Zdistributed.clientr   r   r   Zdistributed.commr   Zdistributed.comm.tcpr    Zdistributed.compatibilityr!   r"   r  r#   Zdistributed.corer$   r%   r&   r'   r(   r)   Zdistributed.deployr*   Zdistributed.diagnostics.pluginr+   Zdistributed.metricsZdistributed.nannyr-   Zdistributed.noder.   Zdistributed.proctitler/   Zdistributed.protocolr0   Zdistributed.schedulerr1   rV  Zdistributed.securityr2   Zdistributed.utilsr3   r4   r5   r6   r7   r8   r9   r:   Zdistributed.workerr;   r<   Z distributed.worker_state_machiner=   r>   r?   r@   rA   rW  rB   Z
dask.arrayry  Zpytest_timeoutrC   r  r   rR   rootmanager
loggerDictr   r[  r  ri   rk   ZNO_AMMr[   Zfixturer]   r\   ru   rq   rv   rw   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r   r  r   __annotations__r   r   r   r   r   rj   r   r   r   r   r  r   r  r  r  r  r	  r  rq  r  r  r  r  r  r  r  r   rI  rW  rS  rT  rU  rj  r  r  r  r  r  r?  r   r  r  r  r0  r  rf  r  r  r  r  r  r  INFOr  r   r  r  r  abspathr  dirname__file__r  r  r  r  r  r  r*  r,  r3  r6  r:  rF  rJ  rM  r.  r/  rh   rD  rn   r]  rh  ru  rz  r  r  r~  r  r  r  r  r  r  r  r  r  r  r  r  r  rL  r  r  r  r  r  r  r  r  r  r  rJ   rJ   rJ   rK   <module>   sH    (




		
	







    



     g27,  #  M




 



	
	    
    


(     ;1&&V*#&#?("(,* $$
