U
    ÷Õ/er  ã                   @  s°   d dl mZ d dlZd dlZd dlZd dlmZ d dlZd dlm	Z	 d dl
mZ e e¡ ZZG dd„ dƒZG dd	„ d	ƒZG d
d„ dƒZeƒ Zd ae ¡ Zdd„ Zddd„ZdS )é    )ÚannotationsN)Údeque)Úformat_bytes)Úthread_timec                   @  s"   e Zd ZdZd	dd„Zdd„ ZdS )
ÚThrottledGCa	  Wrap gc.collect to protect against excessively repeated calls.

    Allows to run throttled garbage collection in the workers as a
    countermeasure to e.g.: https://github.com/dask/zict/issues/19

    collect() does nothing when repeated calls are so costly and so frequent
    that the thread would spend more than max_in_gc_frac doing GC.

    warn_if_longer is a duration in seconds (10s by default) that can be used
    to log a warning level message whenever an actual call to gc.collect()
    lasts too long.
    çš™™™™™©?é   Nc                 C  s0   || _ || _tƒ | _d| _|d k	r&|nt| _d S )Nr   )Úmax_in_gc_fracÚwarn_if_longerr   Úlast_collectÚlast_gc_durationÚ_loggerÚlogger)Úselfr	   r
   r   © r   ú:/tmp/pip-unpacked-wheel-g426oqom/distributed/utils_perf.pyÚ__init__   s
    zThrottledGC.__init__c                 C  sž   d}t ƒ }t|| j |ƒ}| j| | jk rˆ| j d|¡ t ¡  || _tt ƒ | |ƒ| _| j| j	krv| j 
d| j¡ qš| j d| j¡ n| j d| j|¡ d S )Ngíµ ÷Æ°>z9Calling gc.collect(). %0.3fs elapsed since previous call.z¤gc.collect() took %0.3fs. This is usually a sign that some tasks handle too many Python objects at the same time. Rechunking the work into smaller tasks might help.zgc.collect() took %0.3fszNgc.collect() lasts %0.3fs but only %0.3fs elapsed since last call: throttling.)r   Úmaxr   r   r	   r   ÚdebugÚgcÚcollectr
   Úwarning)r   ZMIN_RUNTIMEZcollect_startÚelapsedr   r   r   r   &   s,     ÿú	üzThrottledGC.collect)r   r   N)Ú__name__Ú
__module__Ú__qualname__Ú__doc__r   r   r   r   r   r   r      s   
r   c                   @  sD   e Zd ZdZdZefdd„Zdd„ Zdd„ Zd	d
„ Z	e
dd„ ƒZdS )ÚFractionalTimerz 
    An object that measures runtimes, accumulates them and computes
    a running fraction of the recent runtimes over the corresponding
    elapsed time.
    g    eÍÍAc                 C  s2   || _ || _tƒ | _tƒ | _d | _d | _d | _d S ©N)Ú_timerÚ
_n_samplesr   Ú_start_stopsÚ
_durationsÚ
_cur_startÚ_running_sumÚ_running_fraction)r   Ú	n_samplesZtimerr   r   r   r   Q   s    zFractionalTimer.__init__c           
      C  sä   | j }| j}||k s(|r,||d d k r,d S t|| | j ƒ}| ||f¡ | |¡ t|ƒ}|t|ƒksnt‚|| jkrà| jd krœ|| jkst‚t	|ƒ| _nD| 
¡ \}}| 
¡ }	|  j||	 7  _||krà| j||  | j | _d S )Néÿÿÿÿr   )r!   r"   ÚintÚMULTÚappendÚlenÚAssertionErrorr    r$   ÚsumÚpopleftr%   )
r   ÚstartÚstopZstart_stopsZ	durationsÚdurationÚnZ	old_startZold_stopZold_durationr   r   r   Ú_add_measurementZ   s&    


ÿz FractionalTimer._add_measurementc                 C  s   | j d kst‚|  ¡ | _ d S r   )r#   r,   r   ©r   r   r   r   Ústart_timingu   s    zFractionalTimer.start_timingc                 C  s0   |   ¡ }| j}d | _|d k	s t‚|  ||¡ d S r   )r   r#   r,   r3   )r   r0   r/   r   r   r   Ústop_timingy   s
    zFractionalTimer.stop_timingc                 C  s   | j S r   )r%   r4   r   r   r   Úrunning_fraction€   s    z FractionalTimer.running_fractionN)r   r   r   r   r)   r   r   r3   r5   r6   Úpropertyr7   r   r   r   r   r   H   s   	r   c                   @  sR   e Zd ZdZdZddd„Zdd„ Zd	d
„ Zedd„ ƒZ	dd„ Z
dd„ Zdd„ ZdS )ÚGCDiagnosiszß
    An object that hooks itself into the gc callbacks to collect
    timing and memory statistics, and log interesting info.

    Don't instantiate this directly except for tests.
    Instead, use the global instance.
    é   çš™™™™™¹?ç    ÐcAc                 C  s   || _ || _d| _d S ©NF)Ú_warn_over_fracÚ_info_over_rss_winÚ_enabled)r   Zwarn_over_fracZinfo_over_rss_winr   r   r   r      s    zGCDiagnosis.__init__c                 C  sL   | j r
t‚t| jd| _t ¡ | _| j}|t	j
ks6t‚t	j
 |¡ d| _ d S )N)r&   T)r@   r,   r   Ú	N_SAMPLESÚ_fractional_timerÚpsutilÚProcessÚ_procÚ_gc_callbackr   Ú	callbacksr*   )r   Úcbr   r   r   Úenable•   s    

zGCDiagnosis.enablec                 C  s"   | j s
t‚tj | j¡ d| _ d S r=   )r@   r,   r   rG   ÚremoverF   r4   r   r   r   Údisable    s    
zGCDiagnosis.disablec                 C  s   | j S r   )r@   r4   r   r   r   Úenabled¥   s    zGCDiagnosis.enabledc                 C  s   |   ¡  | S r   )rI   r4   r   r   r   Ú	__enter__©   s    zGCDiagnosis.__enter__c                 C  s   |   ¡  d S r   )rK   )r   Úexc_typeÚ	exc_valueÚ	tracebackr   r   r   Ú__exit__­   s    zGCDiagnosis.__exit__c                 C  sÒ   |d dkrd S | j  ¡ j}|dkr8| j ¡  || _d S |dksDt‚| j ¡  | jj}|d k	r€|| j	kr€t
 dd| d| j	 ¡ | j| }|| jkr²t
 dt|ƒ|d t| jƒ¡ |d	 d
krÎt
 d|d	 ¡ d S )NZ
generationé   r/   r0   zFfull garbage collections took %d%% CPU time recently (threshold: %d%%)éd   zLfull garbage collection released %s from %d reference cycles (threshold: %s)Z	collectedZuncollectabler   zHgarbage collector couldn't collect %d objects, please look in gc.garbage)rE   Zmemory_infoÚrssrB   r5   Z_gc_rss_beforer,   r6   r7   r>   r   r   r?   Úinfor   )r   ÚphaserU   rT   ÚfracZ	rss_savedr   r   r   rF   °   s:    

ü

ûýzGCDiagnosis._gc_callbackN)r;   r<   )r   r   r   r   rA   r   rI   rK   r8   rL   rM   rQ   rF   r   r   r   r   r9   …   s   

r9   c                	   C  s8   t * tdkrt ¡  n
tjs"t‚td7 aW 5 Q R X dS )z,
    Ask to enable global GC diagnosis.
    r   r   N)Ú_gc_diagnosis_lockÚ_gc_diagnosis_usersÚ_gc_diagnosisrI   rL   r,   r   r   r   r   Úenable_gc_diagnosisÛ   s
    

r[   Fc              	   C  sR   t D tdkrDtd8 atdkr(t ¡  n| r:t ¡  dan
tjsDt‚W 5 Q R X dS )z-
    Ask to disable global GC diagnosis.
    r   r   N)rX   rY   rZ   rK   rL   r,   )Úforcer   r   r   Údisable_gc_diagnosisè   s    
r]   )F)Ú
__future__r   r   ÚloggingÚ	threadingÚcollectionsr   rC   Z
dask.utilsr   Zdistributed.metricsr   Ú	getLoggerr   r   r   r   r   r9   rZ   rY   ÚLockrX   r[   r]   r   r   r   r   Ú<module>   s    7=Q