U
    /eM                     @  s  d Z ddlmZ ddlZddlZddlZddlZddlZddlm	Z	m
Z
 ddlmZ ddlmZ ddlmZmZmZmZmZ ddlZddlZddlmZ dd	lmZmZmZ dd
lmZ ddl m!Z!m"Z" ddl#m$Z$ ddl%m&Z& ddl'm(Z(m)Z) ddl*m+Z+m,Z, ddl-m.Z. er*ddl/m0Z0 ddl1m2Z2 G dd dZ3G dd dZ4efddddddddZ5dddd d!d"d#Z6d$dd%d&d'd(Z7G d)d* d*Z8G d+d, d,Z9dS )-a  Encapsulated manager for in-memory tasks on a worker.

This module covers:
- spill/unspill data depending on the 'distributed.worker.memory.target' threshold
- spill/unspill data depending on the 'distributed.worker.memory.spill' threshold
- pause/unpause the worker depending on the 'distributed.worker.memory.pause' threshold
- kill the worker depending on the 'distributed.worker.memory.terminate' threshold

This module does *not* cover:
- Changes in behaviour in Worker, Scheduler, task stealing, Active Memory Manager, etc.
  caused by the Worker being in paused status
- Worker restart after it's been killed
- Scheduler-side heuristics regarding memory usage, e.g. the Active Memory Manager

See also:
- :mod:`distributed.spill`, which implements the spill-to-disk mechanism and is wrapped
  by this module. Unlike this module, :mod:`distributed.spill` is agnostic to the
  Worker.
- :mod:`distributed.active_memory_manager`, which runs on the scheduler side
    )annotationsN)CallableMutableMapping)suppress)partial)TYPE_CHECKINGAny	ContainerLiteralcast)	CPU_COUNT)format_bytesparse_bytesparse_timedelta)system)WINDOWSPeriodicCallback)Status)	monotonic)ManualEvictProtoSpillBuffer)has_arg
log_errors)ThrottledGC)Nanny)Workerc                	   @  s   e Zd ZU dZded< ded< ded< ded< ded	< d
ed< ded< ded< ddddddddddddddddZedddddZddddd d!Zddddd"d#Z	d$d%d&d'd(d)d*Z
dS )+WorkerMemoryManagera  Management of worker memory usage

    Parameters
    ----------
    worker
        Worker to manage

    For meaning of the remaining parameters, see the matching
    parameter names in :class:`~.distributed.worker.Worker`.

    Notes
    -----

    If data is a callable and has the argument ``worker_local_directory`` in its
    signature, it will be filled with the worker's attr:``local_directory``.

    zMutableMapping[str, object]data
int | Nonememory_limitfloat | Literal[False]memory_target_fractionmemory_spill_fractionmemory_pause_fractionzint | Literal[False]	max_spillfloatmemory_monitor_intervalr   _throttled_gcautoN)r   r   r!   r"   r#   r   intstr | floatzMutableMapping[str, Any] | Callable[[], MutableMapping[str, Any]] | Callable[[str], MutableMapping[str, Any]] | tuple[Callable[..., MutableMapping[str, Any]], dict[str, Any]] | Nonefloat | Literal[False] | None)workernthreadsr   r   r!   r"   r#   c                C  s  t d| _t||| jd| _tdd|| _tdd|| _tdd|| _t	j
d	}|d
kr`d
nt|| _t|tr||| _nt|rt|drtd|}||j| _ntd|}| | _nt|tr|\}	}
t|	stdt|	dr|	f d|ji|
| _n|	f |
| _nf| jrp| js(| jrp| jrJt| j| jpB| j }ntj}ttj|jd|| jd| _ni | _tt	j
dd
d| _t| jtt fst!| jr| jd
k	s| jd
k	r| jd k	st!t"t#| j$|| jd }||j%d< t&| jd| _'d S )Nzdistributed.worker.memoryloggerz distributed.worker.memory.targetr!   zdistributed.worker.memory.spillr"   zdistributed.worker.memory.pauser#   z#distributed.worker.memory.max-spillFZworker_local_directoryz)Callable[[str], MutableMapping[str, Any]]z&Callable[[], MutableMapping[str, Any]]zExpecting a callableZstorage)targetr$   *distributed.worker.memory.monitor-intervaldefault  memory_monitor)(logging	getLoggerr/   parse_memory_limitr   _parse_thresholdr!   r"   r#   daskconfiggetr   r$   
isinstancer   r   callabler   r   Zlocal_directorytuple
ValueErrorr)   sysmaxsizer   ospathjoinr   r&   r%   AssertionErrorr   r   r5   periodic_callbacksr   r'   )selfr,   r-   r   r   r!   r"   r#   r$   funckwargsr0   pc rL   =/tmp/pip-unpacked-wheel-g426oqom/distributed/worker_memory.py__init__Q   s      









zWorkerMemoryManager.__init__None)r,   returnc                   s,   |j  }| || | ||I dH  dS )a9  Track this process's memory usage and act accordingly.
        If process memory rises above the spill threshold (70%), start dumping data to
        disk until it goes below the target threshold (60%).
        If process memory rises above the pause threshold (80%), stop execution of new
        tasks.
        N)monitorget_process_memory_maybe_pause_or_unpause_maybe_spill)rH   r,   memoryrL   rL   rM   r5      s    
z"WorkerMemoryManager.memory_monitor)r,   rU   rP   c                 C  s   | j dkrd S | jst|| j }|| j kr~| j  |jtjkr| j	dt
|d t|| jd k	rnt| jnd tj|_nF|jtjkr| j	dt
|d t|| jd k	rt| jnd tj|_d S )NFz^Worker is at %d%% memory usage. Pausing worker.  Process memory: %s -- Worker memory limit: %sd   rO   z^Worker is at %d%% memory usage. Resuming worker. Process memory: %s -- Worker memory limit: %s)r#   r   rF   r'   collectstatusr   runningr/   warningr)   r   Zpaused)rH   r,   rU   fracrL   rL   rM   rS      s4    





	

	z+WorkerMemoryManager._maybe_pause_or_unpausec                   s  | j dkrd S t| jdr&t| jds*d S tt| j}| js@t|| j }|| j krXd S d}| jd|d  | j| j	p|| j  }d}|| }t
  }	}
||krt|js| jdt|t| j qt| }|dkrؐqt||7 }|d	7 }|j }||kr||kr| j  |j }t
 }||	 | jkr@| || |}	||
 d
kr|d||
  tdI d H  t
 }
qt
 }||
 dkr|d||
  |r| jd|t| d S )NFfastevictr   z>Worker is at %.0f%% memory usage. Start spilling data to disk.rV   a  Unmanaged memory use is high. This may indicate a memory leak or the memory may not be released to the OS; see https://distributed.dask.org/en/latest/worker-memory.html#memory-not-released-back-to-the-os for more information. -- Unmanaged memory: %s -- Worker memory limit: %s   g      ?zdisk-write-spill-durationg{Gzt?zMoved %d tasks worth %s to disk)r"   hasattrr   r   r   r   rF   r/   debugr!   r   r\   rZ   r   r]   rQ   rR   r'   rW   r&   rS   Zdigest_metricasynciosleep)rH   r,   rU   r   r[   Ztotal_spilledr0   countZneedZlast_checked_for_pauseZlast_yieldedZweightnowrL   rL   rM   rT      sl    






	


z WorkerMemoryManager._maybe_spillrL   )excludezContainer[str]dict)rf   rP   c                C  s.   dd | j  D }|d= t| j|d< |S )Nc                 S  s    i | ]\}}| d s||qS )_)
startswith).0kvrL   rL   rM   
<dictcomp>I  s     
  z0WorkerMemoryManager._to_dict.<locals>.<dictcomp>r/   r   )__dict__itemsrg   fromkeysr   )rH   rf   inforL   rL   rM   _to_dictH  s    zWorkerMemoryManager._to_dict)__name__
__module____qualname____doc____annotations__rN   r   r5   rS   rT   rr   rL   rL   rL   rM   r   5   s(   
h!_r   c                   @  sT   e Zd ZU ded< ded< ded< ded< d	d
dddddZdddddZdS )NannyMemoryManagerr   r   r    memory_terminate_fractionzfloat | Noner&   r)   _last_terminated_pidr(   )r   r   r*   )nannyr   c                C  s   t d| _t||j| jd| _tjd| _	t
tjddd| _t| jttfsXtd| _| jr| j	dk	rtt| j|| jd }||jd	< d S )
Nzdistributed.nanny.memoryr.   z#distributed.worker.memory.terminater1   Fr2   r^   r4   r5   )r6   r7   r/   r8   r-   r   r:   r;   r<   ry   r   r&   r=   r)   r%   rF   rz   r   r   r5   rG   )rH   r{   r   rK   rL   rL   rM   rN   U  s*      

zNannyMemoryManager.__init__rO   )r{   rP   c              
   C  s  |j tjks0|jdks0|jjdks0|jjjdkr4dS |jj}zt|j j}W n  t	tj
tjfk
rr   Y dS X || j | jkrdS | j|jkr| jd|j d|j d| jd dd |j| _|  n0| jd|j d|j dtrd	nd
 |  dS )zCTrack worker's memory. Restart if it goes above terminate fraction.NzWorker z (pid=z) exceeded rV   z.0fz% memory budget. Restarting...z) is slow to %szterminate; trying againzaccept SIGTERM; sending SIGKILL)rX   r   rY   processpidpsutilProcessZmemory_infoZrssProcessLookupErrorZNoSuchProcessZAccessDeniedr   ry   rz   r/   rZ   Zworker_address	terminater   kill)rH   r{   r|   rU   rL   rL   rM   r5   p  s:    

"
z!NannyMemoryManager.memory_monitorN)rs   rt   ru   rw   rN   r5   rL   rL   rL   rM   rx   O  s   
rx   zstr | float | Noner)   zlogging.Loggerr   )r   r-   total_coresr/   rP   c             	   C  s   | d krd S | }| dkr0t tjtd||  } ttt. t| } t| trd| dkrdt | tj } W 5 Q R X t| t	rt
| } nt | } t| t st| dkrd S tj| k r|d|ttj tjS | S d S )Nr(   r_   r   zBIgnoring provided memory limit %s due to system memory limit of %s)r)   r   ZMEMORY_LIMITminr   r@   	TypeErrorr%   r=   strr   rF   rZ   r   )r   r-   r   r/   origrL   rL   rM   r8     s.    


r8   r   r+   r    )
config_keydeprecated_param_namedeprecated_param_valuerP   c                 C  s2   |d k	r&t d| d|  dt |S tj| S )Nz
Parameter zY has been deprecated and will be removed in a future version; please use dask config key z instead)warningswarnFutureWarningr:   r;   r<   )r   r   r   rL   rL   rM   r9     s    r9   Nanny | WorkerrO   )wnamerP   c              
   C  s4   t dt| j d| dt| j d| t d S )NzThe `.z` attribute has been moved to `z.memory_manager.)r   r   typers   r   )r   r   rL   rL   rM   _warn_deprecated  s    &r   c                   @  sL   e Zd ZU ded< ddddddZddd	d
ddZdd	ddddZdS ) DeprecatedMemoryManagerAttributer   r   r   rO   )ownerr   rP   c                 C  s
   || _ d S N)r   )rH   r   r   rL   rL   rM   __set_name__  s    z-DeprecatedMemoryManagerAttribute.__set_name__Nanny | Worker | Noner   instancer   rP   c                 C  s&   |d krd S t || j t|j| jS r   )r   r   getattrmemory_managerrH   r   r   rL   rL   rM   __get__  s    z(DeprecatedMemoryManagerAttribute.__get__r   )r   valuerP   c                 C  s    t || j t|j| j| d S r   )r   r   setattrr   )rH   r   r   rL   rL   rM   __set__  s    z(DeprecatedMemoryManagerAttribute.__set__N)rs   rt   ru   rw   r   r   r   rL   rL   rL   rM   r     s   
r   c                   @  s   e Zd ZddddddZdS )DeprecatedMemoryMonitorr   r   r   r   c                 C  s$   |d krd S t |d t|jj|S )Nr5   )r   r   r   r5   r   rL   rL   rM   r     s    
zDeprecatedMemoryMonitor.__get__N)rs   rt   ru   r   rL   rL   rL   rM   r     s   r   ):rv   
__future__r   rb   r6   rC   rA   r   collections.abcr   r   
contextlibr   	functoolsr   typingr   r   r	   r
   r   r~   Zdask.configr:   Zdask.systemr   Z
dask.utilsr   r   r   Zdistributedr   Zdistributed.compatibilityr   r   Zdistributed.corer   Zdistributed.metricsr   Zdistributed.spillr   r   Zdistributed.utilsr   r   Zdistributed.utils_perfr   Zdistributed.nannyr   Zdistributed.workerr   r   rx   r8   r9   r   r   r   rL   rL   rL   rM   <module>   sD     X$