from __future__ import annotations import uuid from collections.abc import AsyncIterator, Iterator from contextlib import asynccontextmanager, contextmanager from datetime import datetime from typing import TYPE_CHECKING, Any, cast from distributed.compatibility import PeriodicCallback if TYPE_CHECKING: # Optional runtime dependencies import pandas as pd # Circular dependencies from distributed.client import Client from distributed.scheduler import Scheduler class MemorySampler: """Sample cluster-wide memory usage every seconds. **Usage** .. code-block:: python client = Client() ms = MemorySampler() with ms.sample("run 1"): with ms.sample("run 2"): ... ms.plot() or with an asynchronous client: .. code-block:: python client = await Client(asynchronous=True) ms = MemorySampler() async with ms.sample("run 1"): async with ms.sample("run 2"): ... ms.plot() """ samples: dict[str, list[tuple[float, int]]] def __init__(self): self.samples = {} def sample( self, label: str | None = None, *, client: Client | None = None, measure: str = "process", interval: float = 0.5, ) -> Any: """Context manager that records memory usage in the cluster. This is synchronous if the client is synchronous and asynchronous if the client is asynchronous. The samples are recorded in ``self.samples[