U
    /ed,                     @   s   d dl mZ d dlmZ d dlmZmZmZ d dlm	Z	 d dl
mZ d dlmZ d dlmZ edd	ZG d
d deZeddZG dd deZG dd deZeddZG dd deZdS )    )
namedtuple)starmap)PipeProcesscurrent_process)sleep)default_timer)Callback)import_requiredTaskData)keytask
start_timeZend_timeZ	worker_idc                       s`   e Zd ZdZdd Z fddZdd Zdd	 Zd
d Zdd Z	dd Z
dd Zdd Z  ZS )Profilera  A profiler for dask execution at the task level.

    Records the following information for each task:
        1. Key
        2. Task
        3. Start time in seconds since the epoch
        4. Finish time in seconds since the epoch
        5. Worker id

    Examples
    --------

    >>> from operator import add, mul
    >>> from dask.threaded import get
    >>> from dask.diagnostics import Profiler
    >>> dsk = {'x': 1, 'y': (add, 'x', 10), 'z': (mul, 'y', 2)}
    >>> with Profiler() as prof:
    ...     get(dsk, 'z')
    22

    >>> prof.results        # doctest: +SKIP
    [TaskData(key='y', task=(add, 'x', 10), start_time=..., end_time=..., worker_id=...),
     TaskData(key='z', task=(mul, 'y', 2), start_time=..., end_time=..., worker_id=...)]

    These results can be visualized in a bokeh plot using the ``visualize``
    method. Note that this requires bokeh to be installed.

    >>> prof.visualize()    # doctest: +SKIP

    You can activate the profiler globally

    >>> prof.register()

    If you use the profiler globally you will need to clear out old results
    manually.

    >>> prof.clear()
    >>> prof.unregister()

    c                 C   s   i | _ g | _i | _d S N)_resultsresults_dskself r   </tmp/pip-unpacked-wheel-dbjnr7gq/dask/diagnostics/profile.py__init__:   s    zProfiler.__init__c                    s   |    t  S r   clearsuper	__enter__r   	__class__r   r   r   ?   s    zProfiler.__enter__c                 C   s   | j | d S r   )r   updater   dskr   r   r   _startC   s    zProfiler._startc                 C   s   t  }||| |f| j|< d S r   r   r   )r   r   r!   statestartr   r   r   _pretaskF   s    zProfiler._pretaskc                 C   s    t  }| j|  ||f7  < d S r   r#   )r   r   valuer!   r$   idendr   r   r   	_posttaskJ   s    zProfiler._posttaskc                 C   s>   dd | j  D }|  jttt| 7  _| j   d S )Nc                 S   s"   i | ]\}}t |d kr||qS )   )len).0kvr   r   r   
<dictcomp>O   s       z$Profiler._finish.<locals>.<dictcomp>)r   itemsr   listr   r   valuesr   )r   r!   r$   failedr   r   r   r   _finishN   s    zProfiler._finishc                 K   s   ddl m} || j| jf|S )Nr   )
plot_tasks)"dask.diagnostics.profile_visualizer6   r   r   )r   kwargsr6   r   r   r   _plotS   s    zProfiler._plotc                 K   s   ddl m} || f|S zVisualize the profiling run in a bokeh plot.

        See also
        --------
        dask.diagnostics.profile_visualize.visualize
        r   )	visualizer7   r;   r   r8   r;   r   r   r   r;   X   s    zProfiler.visualizec                 C   s    | j   | jdd= i | _dS z#Clear out old results from profilerN)r   r   r   r   r   r   r   r   r   c   s    
zProfiler.clear)__name__
__module____qualname____doc__r   r   r"   r&   r*   r5   r9   r;   r   __classcell__r   r   r   r   r      s   )r   ResourceData)timememcpuc                       s   e Zd ZdZdddZdd Zdd Zd	d
 Z fddZ fddZ	dd Z
dd Zdd ZeZdd Zdd Zdd Z  ZS )ResourceProfilera   A profiler for resource use.

    Records the following each timestep
        1. Time in seconds since the epoch
        2. Memory usage in MB
        3. % CPU usage

    Examples
    --------

    >>> from operator import add, mul
    >>> from dask.threaded import get
    >>> dsk = {'x': 1, 'y': (add, 'x', 10), 'z': (mul, 'y', 2)}
    >>> with ResourceProfiler() as prof:
    ...     get(dsk, 'z')
    22

    These results can be visualized in a bokeh plot using the ``visualize``
    method. Note that this requires bokeh to be installed.

    >>> prof.visualize() # doctest: +SKIP

    You can activate the profiler globally

    >>> prof.register()

    If you use the profiler globally you will need to clear out old results
    manually.

    >>> prof.clear()

    Note that when used as a context manager data will be collected throughout
    the duration of the enclosed block. In contrast, when registered globally
    data will only be collected while a dask scheduler is active.

    >>> prof.unregister()
       c                 C   s   || _ d| _d | _g | _d S NF)_dt_entered_trackerr   r   dtr   r   r   r      s    zResourceProfiler.__init__c                 C   s   | j d k	o| j  S r   )rM   is_aliver   r   r   r   _is_running   s    zResourceProfiler._is_runningc                 C   s0   |   st| j| _| j  | jjd d S )Ncollect)rQ   _TrackerrK   rM   r%   parent_connsendr   r   r   r   _start_collect   s    
zResourceProfiler._start_collectc                 C   s4   |   r0| jjd | jtt| jj  d S )N	send_data)	rQ   rM   rT   rU   r   extendr   rD   recvr   r   r   r   _stop_collect   s    zResourceProfiler._stop_collectc                    s    d| _ |   |   t  S NT)rL   r   rV   r   r   r   r   r   r   r      s    zResourceProfiler.__enter__c                    s&   d| _ |   |   t j|  d S rJ   )rL   rZ   closer   __exit__)r   argsr   r   r   r]      s    zResourceProfiler.__exit__c                 C   s   |    d S r   )rV   r    r   r   r   r"      s    zResourceProfiler._startc                 C   s   | j s|   d S r   )rL   rZ   )r   r!   r$   r4   r   r   r   r5      s    zResourceProfiler._finishc                 C   s   |   r| j  d| _dS )z%Shutdown the resource tracker processN)rQ   rM   shutdownr   r   r   r   r\      s    
zResourceProfiler.closec                 C   s
   g | _ d S r   )r   r   r   r   r   r      s    zResourceProfiler.clearc                 K   s   ddl m} || jf|S )Nr   )plot_resources)r7   r`   r   )r   r8   r`   r   r   r   r9      s    zResourceProfiler._plotc                 K   s   ddl m} || f|S r:   r<   r=   r   r   r   r;      s    zResourceProfiler.visualize)rI   )r?   r@   rA   rB   r   rQ   rV   rZ   r   r]   r"   r5   r\   __del__r   r9   r;   rC   r   r   r   r   rH   m   s   &
rH   c                       s:   e Zd ZdZd fdd	Zdd Zdd Zd	d
 Z  ZS )rS   z.Background process for tracking resource usagerI   c                    s2   t    d| _|| _t j| _t \| _| _	d S r[   )
r   r   daemonrO   r   pid
parent_pidr   rT   
child_connrN   r   r   r   r      s
    

z_Tracker.__init__c                 C   s*   | j js| j d | j   |   d S )Nr_   )rT   closedrU   r\   joinr   r   r   r   r_      s    
z_Tracker.shutdownc                    s    | j g fdd| j  D  S )Nc                    s&   g | ]}|j  kr| d kr|qS )Zzombie)rc   status)r-   prc   r   r   
<listcomp>   s    
  z)_Tracker._update_pids.<locals>.<listcomp>)parentchildren)r   rc   r   rj   r   _update_pids   s    z_Tracker._update_pidsc              	   C   s  t dd}|| j| _t }g }z| j }W n tk
rH   Y q"Y nX |dkrXqq"|dkr| |}|rx| j	 st
 }d }}|D ]@}	z|	 j}
|	 }W n tk
r   Y qX ||
7 }||7 }q|||d |f t| j qjq"|dkr"| j| g }q"| j  d S )Npsutilz9Tracking resource usage requires `psutil` to be installedr_   rR   r   g    .ArW   )r
   r   rd   rl   r   re   rY   KeyboardInterruptrn   pollr   Zmemory_infoZrssZcpu_percent	Exceptionappendr   rO   rU   r\   )r   ro   rc   datamsgZpsZticrF   rG   ri   Zmem2Zcpu2r   r   r   run   s@     



z_Tracker.run)rI   )	r?   r@   rA   rB   r   r_   rn   rv   rC   r   r   r   r   rS      s
   rS   	CacheData)r   r   metricZ
cache_timeZ	free_timec                       sZ   e Zd ZdZdddZ fddZdd Zd	d
 Zdd Zdd Z	dd Z
dd Z  ZS )CacheProfilera]  A profiler for dask execution at the scheduler cache level.

    Records the following information for each task:
        1. Key
        2. Task
        3. Size metric
        4. Cache entry time in seconds since the epoch
        5. Cache exit time in seconds since the epoch

    Examples
    --------

    >>> from operator import add, mul
    >>> from dask.threaded import get
    >>> from dask.diagnostics import CacheProfiler
    >>> dsk = {'x': 1, 'y': (add, 'x', 10), 'z': (mul, 'y', 2)}
    >>> with CacheProfiler() as prof:
    ...     get(dsk, 'z')
    22

    >>> prof.results    # doctest: +SKIP
    [CacheData(key='y', task=(add, 'x', 10), metric=1, cache_time=..., free_time=...),
     CacheData(key='z', task=(mul, 'y', 2), metric=1, cache_time=..., free_time=...)]

    The default is to count each task (``metric`` is 1 for all tasks). Other
    functions may used as a metric instead through the ``metric`` keyword. For
    example, the ``nbytes`` function found in ``cachey`` can be used to measure
    the number of bytes in the cache.

    >>> from cachey import nbytes                   # doctest: +SKIP
    >>> with CacheProfiler(metric=nbytes) as prof:  # doctest: +SKIP
    ...     get(dsk, 'z')
    22

    The profiling results can be visualized in a bokeh plot using the
    ``visualize`` method. Note that this requires bokeh to be installed.

    >>> prof.visualize() # doctest: +SKIP

    You can activate the profiler globally

    >>> prof.register()

    If you use the profiler globally you will need to clear out old results
    manually.

    >>> prof.clear()
    >>> prof.unregister()

    Nc                 C   s>   |    |r|ndd | _|r&|| _n|r4|j| _nd| _d S )Nc                 S   s   dS )NrI   r   )r'   r   r   r   <lambda>N      z(CacheProfiler.__init__.<locals>.<lambda>count)r   _metric_metric_namer?   )r   rx   Zmetric_namer   r   r   r   L  s    
zCacheProfiler.__init__c                    s   |    t  S r   r   r   r   r   r   r   V  s    zCacheProfiler.__enter__c                 C   s   | j | | jst | _d S r   )r   r   _start_timer   r    r   r   r   r"   Z  s    zCacheProfiler._startc           
   	   C   sb   t  }| ||f| j|< |d | j @ D ]0}| j|\}}	| jt||| ||	| q,d S )NZreleased)r   r}   _cachekeyspopr   rs   rw   )
r   r   r'   r!   r$   r(   tr.   rx   r%   r   r   r   r*   _  s
    zCacheProfiler._posttaskc              	   C   sH   t  }| j D ](\}\}}| jt||| ||| q| j  d S r   )r   r   r1   r   rs   rw   r   )r   r!   r$   r4   r   r.   rx   r%   r   r   r   r5   f  s    zCacheProfiler._finishc                 K   s&   ddl m} || j| j| j| jf|S )Nr   )
plot_cache)r7   r   r   r   r   r~   )r   r8   r   r   r   r   r9   l  s       zCacheProfiler._plotc                 K   s   ddl m} || f|S r:   r<   r=   r   r   r   r;   s  s    zCacheProfiler.visualizec                 C   s   g | _ i | _i | _d| _dS r>   )r   r   r   r   r   r   r   r   r   ~  s    zCacheProfiler.clear)NN)r?   r@   rA   rB   r   r   r"   r*   r5   r9   r;   r   rC   r   r   r   r   ry     s   3

ry   N)collectionsr   	itertoolsr   multiprocessingr   r   r   rE   r   Ztimeitr   Zdask.callbacksr	   Z
dask.utilsr
   r   r   rD   rH   rS   rw   ry   r   r   r   r   <module>   s&    Z
j< 