U
    /eX                     @  s   d dl mZ d dlZd dlmZmZ d dlmZmZ d dl	m	Z	 d dl
mZmZmZ d dlmZ erd dlZd dlmZ d d	lmZ G d
d dZG dd dZdS )    )annotationsN)AsyncIteratorIterator)asynccontextmanagercontextmanager)datetime)TYPE_CHECKINGAnycast)PeriodicCallback)Client)	Schedulerc                   @  s   e Zd ZU dZded< dd Zd#dddd	d
dddddddZed
dddddddZe	d
dddddddZ
dddddddZdddddd d!d"ZdS )$MemorySamplerav  Sample cluster-wide memory usage every <interval> seconds.

    **Usage**

    .. code-block:: python

       client = Client()
       ms = MemorySampler()

       with ms.sample("run 1"):
           <run first workflow>
       with ms.sample("run 2"):
           <run second workflow>
       ...
       ms.plot()

    or with an asynchronous client:

    .. code-block:: python

       client = await Client(asynchronous=True)
       ms = MemorySampler()

       async with ms.sample("run 1"):
           <run first workflow>
       async with ms.sample("run 2"):
           <run second workflow>
       ...
       ms.plot()
    "dict[str, list[tuple[float, int]]]samplesc                 C  s
   i | _ d S N)r   )self r   J/tmp/pip-unpacked-wheel-g426oqom/distributed/diagnostics/memory_sampler.py__init__6   s    zMemorySampler.__init__Nprocessg      ?clientmeasureintervalz
str | NonezClient | Nonestrfloatr	   )labelr   r   r   returnc                C  s@   |sddl m} | }|jr,| ||||S | ||||S dS )a:  Context manager that records memory usage in the cluster.
        This is synchronous if the client is synchronous and
        asynchronous if the client is asynchronous.

        The samples are recorded in ``self.samples[<label>]``.

        Parameters
        ==========
        label: str, optional
            Tag to record the samples under in the self.samples dict.
            Default: automatically generate a random label
        client: Client, optional
            client used to connect to the scheduler.
            Default: use the global client
        measure: str, optional
            One of the measures from :class:`distributed.scheduler.MemoryState`.
            Default: sample process memory
        interval: float, optional
            sampling interval, in seconds.
            Default: 0.5
        r   )
get_clientN)distributed.clientr   Zasynchronous_sample_async_sample_sync)r   r   r   r   r   r   r   r   r   sample9   s    zMemorySampler.sampler   zIterator[None]c              
   c  sJ   |j |jj|j||d}z
d V  W 5 |j |jj|d}|| j|pB|< X d S Nr   )key)sync	schedulermemory_sampler_startidmemory_sampler_stopr   r   r   r   r   r   r%   r   r   r   r   r"   `   s    
zMemorySampler._sample_synczAsyncIterator[None]c              	   C sN   |j j|j||dI d H }z
d V  W 5 |j j|dI d H }|| j|pF|< X d S r$   )r'   r(   r)   r*   r   r+   r   r   r   r!   p   s      
zMemorySampler._sample_asyncFalignboolzpd.DataFrame)r-   r   c                C  s   ddl }i }| j D ]d\}}|s&t||dd }|j|jdd|_||_|rr| jt	|j
|jd 8  _|||< q||}t|dkr| ||  }|S )ai  Return the data series as a pandas.Dataframe.

        Parameters
        ==========
        align : bool, optional
            If True, change the absolute timestamps into time deltas from the first
            sample of each series, so that different series can be visualized side by
            side. If False (the default), use absolute timestamps.
        r   N   s)unit)pandasr   itemsAssertionErrorZ	DataFrameZ	set_indexZto_datetimeindexnamer
   Z	TimestamplenZffillwhereZisnaZbfill)r   r-   pdssr   Zs_listr0   dfr   r   r   	to_pandas}   s    


zMemorySampler.to_pandas)r-   kwargsr   c                K  s&   | j |dd }|jf ddd|S )aW  Plot data series collected so far

        Parameters
        ==========
        align : bool (optional)
            See :meth:`~distributed.diagnostics.MemorySampler.to_pandas`
        kwargs
            Passed verbatim to :meth:`pandas.DataFrame.plot`

        Returns
        =======
        Output of :meth:`pandas.DataFrame.plot`
        r,   i   @timezCluster memory (GiB))ZxlabelZylabel)r<   plot)r   r-   r=   r;   r   r   r   r?      s    zMemorySampler.plot)N)__name__
__module____qualname____doc____annotations__r   r#   r   r"   r   r!   r<   r?   r   r   r   r   r      s   
 '!r   c                   @  sT   e Zd ZU dZded< ded< ddddZd	d	d
d	dddZd	ddddZdS )MemorySamplerExtensionz2Scheduler extension - server side of MemorySamplerr   r'   r   r   )r'   c                 C  s8   || _ | | j jd< | j| j jd< | j| j jd< i | _d S )NZmemory_samplerr(   r*   )r'   
extensionsstarthandlersstopr   )r   r'   r   r   r   r      s
    zMemorySamplerExtension.__init__r   r   )r   r   r   r   c                   s~    drtttjjts&ttt	 g j
<  fdd}t||d }|jjd < |  |  S )z"Start periodically sampling memory_c                    sJ    j jkr<t  } tj j}j | |f n
	 d S r   )
r'   Zclientsr   now	timestampgetattrmemoryr   appendrI   )tsnbytesr   r%   r   r   r   r   r#      s
    z,MemorySamplerExtension.start.<locals>.samplei  MemorySampler-)
startswithr4   
isinstancerM   r'   rN   intr   uuidZuuid4r   r   periodic_callbacksrG   )r   r   r   r   r#   pcr   rR   r   rG      s    
zMemorySamplerExtension.startzlist[tuple[float, int]])r%   r   c                 C  s&   | j jd| }|  | j|S )z$Stop sampling and return the samplesrS   )r'   rX   poprI   r   )r   r%   rY   r   r   r   rI      s    zMemorySamplerExtension.stopN)r@   rA   rB   rC   rD   r   rG   rI   r   r   r   r   rE      s   
rE   )
__future__r   rW   collections.abcr   r   
contextlibr   r   r   typingr   r	   r
   Zdistributed.compatibilityr   r2   r9   r    r   Zdistributed.schedulerr   r   rE   r   r   r   r   <module>   s    !