from __future__ import annotations import logging import math import operator import os from collections import OrderedDict, defaultdict from collections.abc import Iterable from datetime import datetime from numbers import Number from typing import Any, TypeVar import numpy as np from bokeh.core.properties import value, without_property_validation from bokeh.io import curdoc from bokeh.layouts import column, row from bokeh.models import ( AdaptiveTicker, Arrow, BasicTicker, BoxSelectTool, BoxZoomTool, CDSView, ColorBar, ColumnDataSource, CustomJSHover, DataRange1d, FactorRange, GroupFilter, HoverTool, HTMLTemplateFormatter, NumberFormatter, NumeralTickFormatter, OpenURL, PanTool, Range1d, ResetTool, Tabs, TapTool, Title, VeeHead, WheelZoomTool, ) from bokeh.models.widgets import DataTable, TableColumn from bokeh.models.widgets.markups import Div from bokeh.palettes import Viridis11 from bokeh.plotting import figure from bokeh.themes import Theme from bokeh.transform import cumsum, factor_cmap, linear_cmap, stack from jinja2 import Environment, FileSystemLoader from tlz import curry, pipe, valmap from tlz.curried import concat, groupby, map from tornado import escape import dask from dask import config from dask.utils import ( format_bytes, format_time, funcname, key_split, parse_bytes, parse_timedelta, ) from distributed.core import Status from distributed.dashboard.components import add_periodic_callback from distributed.dashboard.components.shared import ( DashboardComponent, ProfileServer, ProfileTimePlot, SystemMonitor, ) from distributed.dashboard.core import TabPanel from distributed.dashboard.utils import ( _DATATABLE_STYLESHEETS_KWARGS, BOKEH_VERSION, PROFILING, transpose, update, ) from distributed.diagnostics.graph_layout import GraphLayout from distributed.diagnostics.progress import GroupTiming from distributed.diagnostics.progress_stream import color_of, progress_quads from distributed.diagnostics.task_stream import TaskStreamPlugin from distributed.diagnostics.task_stream import color_of as ts_color_of from distributed.diagnostics.task_stream import colors as ts_color_lookup from distributed.metrics import time from distributed.scheduler import Scheduler from distributed.utils import Log, log_errors if dask.config.get("distributed.dashboard.export-tool"): from distributed.dashboard.export_tool import ExportTool else: ExportTool = None # type: ignore T = TypeVar("T") logger = logging.getLogger(__name__) env = Environment( loader=FileSystemLoader( os.path.join(os.path.dirname(__file__), "..", "..", "http", "templates") ) ) BOKEH_THEME = Theme( filename=os.path.join(os.path.dirname(__file__), "..", "theme.yaml") ) TICKS_1024 = {"base": 1024, "mantissas": [1, 2, 4, 8, 16, 32, 64, 128, 256, 512]} XLABEL_ORIENTATION = -math.pi / 9 # slanted downwards 20 degrees logos_dict = { "numpy": "statics/images/numpy.png", "pandas": "statics/images/pandas.png", "builtins": "statics/images/python.png", } class Occupancy(DashboardComponent): """Occupancy (in time) per worker""" @log_errors def __init__(self, scheduler, **kwargs): self.scheduler = scheduler self.source = ColumnDataSource( { "occupancy": [0, 0], "worker": ["a", "b"], "x": [0.0, 0.1], "y": [1, 2], "ms": [1, 2], "color": ["red", "blue"], "escaped_worker": ["a", "b"], } ) self.root = figure( title="Occupancy", tools="", toolbar_location="above", x_axis_type="datetime", min_border_bottom=50, **kwargs, ) rect = self.root.rect( source=self.source, x="x", width="ms", y="y", height=0.9, color="color" ) rect.nonselection_glyph = None 
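        # Illustrative usage (a minimal sketch, not part of this module): each
        # dashboard component exposes a Bokeh model as ``self.root`` plus an
        # ``update()`` method, and is typically wired into a Bokeh document with
        # ``add_periodic_callback`` (imported above).  The ``make_occupancy_doc``
        # name below is hypothetical:
        #
        #     def make_occupancy_doc(scheduler, doc):
        #         component = Occupancy(scheduler, sizing_mode="stretch_both")
        #         component.update()                          # populate initial data
        #         add_periodic_callback(doc, component, 100)  # refresh every 100 ms
        #         doc.add_root(component.root)
        #         doc.theme = BOKEH_THEME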
self.root.xaxis.minor_tick_line_alpha = 0 self.root.yaxis.visible = False self.root.ygrid.visible = False # fig.xaxis[0].formatter = NumeralTickFormatter(format='0.0s') self.root.x_range.start = 0 tap = TapTool(callback=OpenURL(url="./info/worker/@escaped_worker.html")) hover = HoverTool() hover.tooltips = "@worker : @occupancy s." hover.point_policy = "follow_mouse" self.root.add_tools(hover, tap) @without_property_validation @log_errors def update(self): workers = self.scheduler.workers.values() y = list(range(len(workers))) occupancy = [ws.occupancy for ws in workers] ms = [occ * 1000 for occ in occupancy] x = [occ / 500 for occ in occupancy] total = sum(occupancy) color = [] for ws in workers: if ws in self.scheduler.idle: color.append("red") elif ws in self.scheduler.saturated: color.append("green") else: color.append("blue") if total: self.root.title.text = ( f"Occupancy -- total time: {format_time(total)} " f"wall time: {format_time(total / self.scheduler.total_nthreads)}" ) else: self.root.title.text = "Occupancy" if occupancy: result = { "occupancy": occupancy, "worker": [ws.address for ws in workers], "ms": ms, "color": color, "escaped_worker": [escape.url_escape(ws.address) for ws in workers], "x": x, "y": y, } update(self.source, result) class ProcessingHistogram(DashboardComponent): """How many tasks are on each worker""" @log_errors def __init__(self, scheduler, **kwargs): self.last = 0 self.scheduler = scheduler self.source = ColumnDataSource( {"left": [1, 2], "right": [10, 10], "top": [0, 0]} ) self.root = figure( title="Tasks Processing (count)", name="processing", y_axis_label="frequency", tools="", **kwargs, ) self.root.xaxis.minor_tick_line_alpha = 0 self.root.ygrid.visible = False self.root.toolbar_location = None self.root.quad( source=self.source, left="left", right="right", bottom=0, top="top", color="deepskyblue", fill_alpha=0.5, ) @without_property_validation def update(self): L = [len(ws.processing) for ws in self.scheduler.workers.values()] counts, x = np.histogram(L, bins=40) self.source.data.update({"left": x[:-1], "right": x[1:], "top": counts}) class MemoryColor: """Change the color of the memory bars from blue to orange when process memory goes above the ``target`` threshold and to red when the worker pauses. Workers in ``closing_gracefully`` state will also be orange. If ``target`` is disabled, change to orange on ``spill`` instead. If spilling is completely disabled, never turn orange. If pausing is disabled, change to red when passing the ``terminate`` threshold instead. If both pause and terminate are disabled, turn red when passing ``memory_limit``. Note ---- A worker will start spilling when managed memory alone passes the target threshold. However, here we're switching to orange when the process memory goes beyond target, which is usually earlier. This is deliberate for the sake of simplicity and also because, when the process memory passes the spill threshold, it will keep spilling until it falls below the target threshold - so it's not completely wrong. Again, we don't want to track the hysteresis cycle of the spill system here for the sake of simplicity. In short, orange should be treated as "the worker *may* be spilling". """ orange: float red: float def __init__(self): target = dask.config.get("distributed.worker.memory.target") spill = dask.config.get("distributed.worker.memory.spill") terminate = dask.config.get("distributed.worker.memory.terminate") # These values can be False. 
It's also common to configure them to impossibly # high values to achieve the same effect. self.orange = min(target or math.inf, spill or math.inf) self.red = min(terminate or math.inf, 1.0) def _memory_color(self, current: int, limit: int, status: Status) -> str: if status != Status.running: return "red" if not limit: return "blue" if current >= limit * self.red: return "red" if current >= limit * self.orange: return "orange" return "blue" class ClusterMemory(DashboardComponent, MemoryColor): """Total memory usage on the cluster""" @log_errors def __init__(self, scheduler, width=600, **kwargs): DashboardComponent.__init__(self) MemoryColor.__init__(self) self.scheduler = scheduler self.source = ColumnDataSource( { "width": [0] * 4, "x": [0] * 4, "y": [0] * 4, "color": ["blue", "blue", "blue", "grey"], "alpha": [1, 0.7, 0.4, 1], "proc_memory": [0] * 4, "managed": [0] * 4, "unmanaged_old": [0] * 4, "unmanaged_recent": [0] * 4, "spilled": [0] * 4, } ) self.root = figure( title="Bytes stored on cluster", tools="", width=int(width / 2), name="cluster_memory", min_border_bottom=50, **kwargs, ) rect = self.root.rect( source=self.source, x="x", y="y", width="width", height=0.9, color="color", alpha="alpha", ) rect.nonselection_glyph = None self.root.axis[0].ticker = BasicTicker(**TICKS_1024) self.root.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b") self.root.xaxis.major_label_orientation = XLABEL_ORIENTATION self.root.xaxis.minor_tick_line_alpha = 0 self.root.x_range = Range1d(start=0) self.root.yaxis.visible = False self.root.ygrid.visible = False self.root.toolbar_location = None self.root.yaxis.visible = False hover = HoverTool( point_policy="follow_mouse", tooltips="""
Process memory (RSS):  @proc_memory{0.00 b}
Managed:  @managed{0.00 b}
Unmanaged (old):  @unmanaged_old{0.00 b}
Unmanaged (recent):  @unmanaged_recent{0.00 b}
Spilled to disk:  @spilled{0.00 b}
""", ) self.root.add_tools(hover) def _cluster_memory_color(self) -> str: colors = { self._memory_color( current=ws.memory.process, limit=getattr(ws, "memory_limit", 0), status=ws.status, ) for ws in self.scheduler.workers.values() } assert colors.issubset({"red", "orange", "blue"}) if "red" in colors: return "red" elif "orange" in colors: return "orange" else: return "blue" @without_property_validation @log_errors def update(self): limit = sum(ws.memory_limit for ws in self.scheduler.workers.values()) meminfo = self.scheduler.memory color = self._cluster_memory_color() width = [ meminfo.managed, meminfo.unmanaged_old, meminfo.unmanaged_recent, meminfo.spilled, ] result = { "width": width, "x": [sum(width[:i]) + w / 2 for i, w in enumerate(width)], "color": [color, color, color, "grey"], "proc_memory": [meminfo.process] * 4, "managed": [meminfo.managed] * 4, "unmanaged_old": [meminfo.unmanaged_old] * 4, "unmanaged_recent": [meminfo.unmanaged_recent] * 4, "spilled": [meminfo.spilled] * 4, } x_end = max(limit, meminfo.process + meminfo.spilled) self.root.x_range.end = x_end title = f"Bytes stored: {format_bytes(meminfo.process)}" if meminfo.spilled: title += f" + {format_bytes(meminfo.spilled)} spilled to disk" self.root.title.text = title update(self.source, result) class WorkersMemory(DashboardComponent, MemoryColor): """Memory usage for single workers""" @log_errors def __init__(self, scheduler, width=600, **kwargs): DashboardComponent.__init__(self) MemoryColor.__init__(self) self.scheduler = scheduler self.source = ColumnDataSource( { "width": [], "x": [], "y": [], "color": [], "alpha": [], "worker": [], "escaped_worker": [], "proc_memory": [], "managed": [], "unmanaged_old": [], "unmanaged_recent": [], "spilled": [], } ) self.root = figure( title="Bytes stored per worker", tools="", width=int(width / 2), name="workers_memory", min_border_bottom=50, **kwargs, ) rect = self.root.rect( source=self.source, x="x", y="y", width="width", height=0.9, color="color", fill_alpha="alpha", line_width=0, ) rect.nonselection_glyph = None self.root.axis[0].ticker = BasicTicker(**TICKS_1024) self.root.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b") self.root.xaxis.major_label_orientation = XLABEL_ORIENTATION self.root.xaxis.minor_tick_line_alpha = 0 self.root.x_range = Range1d(start=0) self.root.yaxis.visible = False self.root.ygrid.visible = False self.root.toolbar_location = None tap = TapTool(callback=OpenURL(url="./info/worker/@escaped_worker.html")) self.root.add_tools(tap) hover = HoverTool( point_policy="follow_mouse", tooltips="""
Worker:  @worker
Process memory (RSS):  @proc_memory{0.00 b}
Managed:  @managed{0.00 b}
Unmanaged (old):  @unmanaged_old{0.00 b}
Unmanaged (recent):  @unmanaged_recent{0.00 b}
Spilled to disk:  @spilled{0.00 b}
""", ) self.root.add_tools(hover) @without_property_validation @log_errors def update(self): def quadlist(i: Iterable[T]) -> list[T]: out = [] for ii in i: out += [ii, ii, ii, ii] return out workers = self.scheduler.workers.values() width = [] x = [] color = [] max_limit = 0 procmemory = [] managed = [] spilled = [] unmanaged_old = [] unmanaged_recent = [] for ws in workers: meminfo = ws.memory limit = getattr(ws, "memory_limit", 0) max_limit = max(max_limit, limit, meminfo.process + meminfo.spilled) color_i = self._memory_color(meminfo.process, limit, ws.status) width += [ meminfo.managed, meminfo.unmanaged_old, meminfo.unmanaged_recent, meminfo.spilled, ] x += [sum(width[-4:i]) + width[i] / 2 for i in range(-4, 0)] color += [color_i, color_i, color_i, "grey"] # memory info procmemory.append(meminfo.process) managed.append(meminfo.managed) unmanaged_old.append(meminfo.unmanaged_old) unmanaged_recent.append(meminfo.unmanaged_recent) spilled.append(meminfo.spilled) result = { "width": width, "x": x, "color": color, "alpha": [1, 0.7, 0.4, 1] * len(workers), "worker": quadlist(ws.address for ws in workers), "escaped_worker": quadlist(escape.url_escape(ws.address) for ws in workers), "y": quadlist(range(len(workers))), "proc_memory": quadlist(procmemory), "managed": quadlist(managed), "unmanaged_old": quadlist(unmanaged_old), "unmanaged_recent": quadlist(unmanaged_recent), "spilled": quadlist(spilled), } # Remove rectangles with width=0 result = {k: [vi for vi, w in zip(v, width) if w] for k, v in result.items()} self.root.x_range.end = max_limit update(self.source, result) class WorkersMemoryHistogram(DashboardComponent): """Histogram of memory usage, showing how many workers there are in each bucket of usage. Replaces the per-worker graph when there are >= 50 workers. 
""" @log_errors def __init__(self, scheduler, **kwargs): self.last = 0 self.scheduler = scheduler self.source = ColumnDataSource( {"left": [1, 2], "right": [10, 10], "top": [0, 0]} ) self.root = figure( title="Bytes stored per worker", name="workers_memory", y_axis_label="frequency", tools="", **kwargs, ) self.root.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b") self.root.xaxis.ticker = AdaptiveTicker(**TICKS_1024) self.root.xaxis.major_label_orientation = XLABEL_ORIENTATION self.root.xaxis.minor_tick_line_alpha = 0 self.root.ygrid.visible = False self.root.toolbar_location = None self.root.quad( source=self.source, left="left", right="right", bottom=0, top="top", color="deepskyblue", fill_alpha=0.5, ) @without_property_validation def update(self): nbytes = np.asarray( [ws.metrics["memory"] for ws in self.scheduler.workers.values()] ) counts, x = np.histogram(nbytes, bins=40) d = {"left": x[:-1], "right": x[1:], "top": counts} update(self.source, d) class WorkersTransferBytes(DashboardComponent): """Size of open data transfers from/to other workers per worker""" @log_errors def __init__(self, scheduler, width=600, **kwargs): self.scheduler = scheduler self.source = ColumnDataSource( { "escaped_worker": [], "transfer_incoming_bytes": [], "transfer_outgoing_bytes": [], "worker": [], "y_incoming": [], "y_outgoing": [], } ) self.root = figure( title=f"Bytes transferring: {format_bytes(0)}", tools="", width=int(width / 2), name="workers_transfer_bytes", min_border_bottom=50, **kwargs, ) # transfer_incoming_bytes self.root.hbar( name="transfer_incoming_bytes", y="y_incoming", right="transfer_incoming_bytes", line_color=None, left=0, height=0.5, fill_color="red", source=self.source, ) # transfer_outgoing_bytes self.root.hbar( name="transfer_outgoing_bytes", y="y_outgoing", right="transfer_outgoing_bytes", line_color=None, left=0, height=0.5, fill_color="blue", source=self.source, ) self.root.axis[0].ticker = BasicTicker(**TICKS_1024) self.root.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b") self.root.xaxis.major_label_orientation = XLABEL_ORIENTATION self.root.xaxis.minor_tick_line_alpha = 0 self.root.x_range = Range1d(start=0) self.root.yaxis.visible = False self.root.ygrid.visible = False self.root.toolbar_location = None tap = TapTool(callback=OpenURL(url="./info/worker/@escaped_worker.html")) hover = HoverTool( tooltips=[ ("Worker", "@worker"), ("Incoming", "@transfer_incoming_bytes{0.00 b}"), ("Outgoing", "@transfer_outgoing_bytes{0.00 b}"), ], point_policy="follow_mouse", ) self.root.add_tools(hover, tap) @without_property_validation @log_errors def update(self): wss = self.scheduler.workers.values() h = 0.1 y_incoming = [i + 0.75 + i * h for i in range(len(wss))] y_outgoing = [i + 0.25 + i * h for i in range(len(wss))] transfer_incoming_bytes = [ ws.metrics["transfer"]["incoming_bytes"] for ws in wss ] transfer_outgoing_bytes = [ ws.metrics["transfer"]["outgoing_bytes"] for ws in wss ] workers = [ws.address for ws in wss] escaped_workers = [escape.url_escape(worker) for worker in workers] if wss: x_limit = max( max(transfer_incoming_bytes), max(transfer_outgoing_bytes), max(ws.memory_limit for ws in wss), ) else: x_limit = 0 self.root.x_range.end = x_limit result = { "escaped_worker": escaped_workers, "transfer_incoming_bytes": transfer_incoming_bytes, "transfer_outgoing_bytes": transfer_outgoing_bytes, "worker": workers, "y_incoming": y_incoming, "y_outgoing": y_outgoing, } self.root.title.text = ( f"Bytes transferring: {format_bytes(sum(transfer_incoming_bytes))}" ) 
update(self.source, result) class Hardware(DashboardComponent): """Occupancy (in time) per worker""" @log_errors def __init__(self, scheduler, **kwargs): self.scheduler = scheduler # Disk self.disk_source = ColumnDataSource( { "size": [], "bandwidth": [], } ) self.disk_figure = figure( title="Disk Bandwidth -- Computing ...", tools="", toolbar_location="above", x_range=FactorRange(factors=[]), **kwargs, ) self.disk_figure.vbar( x="size", top="bandwidth", width=0.9, source=self.disk_source ) hover = HoverTool( mode="vline", tooltips=[("Bandwidth", "@bandwidth{0.00 b}/s")] ) self.disk_figure.add_tools(hover) self.disk_figure.yaxis[0].formatter = NumeralTickFormatter(format="0.0 b") self.disk_figure.xgrid.visible = False # Memory self.memory_source = ColumnDataSource( { "size": [], "bandwidth": [], } ) self.memory_figure = figure( title="Memory Bandwidth -- Computing ...", tools="", toolbar_location="above", x_range=FactorRange(factors=[]), **kwargs, ) self.memory_figure.vbar( x="size", top="bandwidth", width=0.9, source=self.memory_source ) hover = HoverTool( mode="vline", tooltips=[("Bandwidth", "@bandwidth{0.00 b}/s")] ) self.memory_figure.add_tools(hover) self.memory_figure.yaxis[0].formatter = NumeralTickFormatter(format="0.0 b") self.memory_figure.xgrid.visible = False # Network self.network_source = ColumnDataSource( { "size": [], "bandwidth": [], } ) self.network_figure = figure( title="Network Bandwidth -- Computing ...", tools="", toolbar_location="above", x_range=FactorRange(factors=[]), **kwargs, ) self.network_figure.vbar( x="size", top="bandwidth", width=0.9, source=self.network_source ) hover = HoverTool( mode="vline", tooltips=[("Bandwidth", "@bandwidth{0.00 b}/s")] ) self.network_figure.add_tools(hover) self.network_figure.yaxis[0].formatter = NumeralTickFormatter(format="0.0 b") self.network_figure.xgrid.visible = False self.root = row( self.memory_figure, self.disk_figure, self.network_figure, ) self.memory_data = { "size": [], "bandwidth": [], } self.disk_data = { "size": [], "bandwidth": [], } self.network_data = { "size": [], "bandwidth": [], } async def f(): result = await self.scheduler.benchmark_hardware() for size in sorted(result["disk"], key=parse_bytes): bandwidth = result["disk"][size] self.disk_data["size"].append(size) self.disk_data["bandwidth"].append(bandwidth) for size in sorted(result["memory"], key=parse_bytes): bandwidth = result["memory"][size] self.memory_data["size"].append(size) self.memory_data["bandwidth"].append(bandwidth) for size in sorted(result["network"], key=parse_bytes): bandwidth = result["network"][size] self.network_data["size"].append(size) self.network_data["bandwidth"].append(bandwidth) self.scheduler.loop.add_callback(f) def update(self): if ( not self.disk_data["size"] or self.disk_figure.title.text == "Disk Bandwidth" ): return self.network_figure.x_range.factors = self.network_data["size"] self.disk_figure.x_range.factors = self.disk_data["size"] self.memory_figure.x_range.factors = self.memory_data["size"] update(self.disk_source, self.disk_data) update(self.memory_source, self.memory_data) update(self.network_source, self.network_data) self.memory_figure.title.text = "Memory Bandwidth" self.disk_figure.title.text = "Disk Bandwidth" self.network_figure.title.text = "Network Bandwidth" class BandwidthTypes(DashboardComponent): """Bar chart showing bandwidth per type""" @log_errors def __init__(self, scheduler, **kwargs): self.last = 0 self.scheduler = scheduler self.source = ColumnDataSource( { "bandwidth": [1, 2], 
"bandwidth-half": [0.5, 1], "type": ["a", "b"], "bandwidth_text": ["1", "2"], } ) self.root = figure( title="Bandwidth by Type", tools="", name="bandwidth_type_histogram", y_range=["a", "b"], **kwargs, ) self.root.xaxis.major_label_orientation = -0.5 rect = self.root.rect( source=self.source, x="bandwidth-half", y="type", width="bandwidth", height=0.9, color="blue", ) self.root.x_range.start = 0 self.root.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b") self.root.xaxis.ticker = AdaptiveTicker(**TICKS_1024) rect.nonselection_glyph = None self.root.xaxis.minor_tick_line_alpha = 0 self.root.ygrid.visible = False self.root.toolbar_location = None hover = HoverTool() hover.tooltips = "@type: @bandwidth_text / s" hover.point_policy = "follow_mouse" self.root.add_tools(hover) @without_property_validation @log_errors def update(self): bw = self.scheduler.bandwidth_types self.root.y_range.factors = list(sorted(bw)) result = { "bandwidth": list(bw.values()), "bandwidth-half": [b / 2 for b in bw.values()], "type": list(bw.keys()), "bandwidth_text": [format_bytes(x) for x in bw.values()], } self.root.title.text = "Bandwidth: " + format_bytes(self.scheduler.bandwidth) update(self.source, result) class BandwidthWorkers(DashboardComponent): """How many tasks are on each worker""" @log_errors def __init__(self, scheduler, **kwargs): self.last = 0 self.scheduler = scheduler self.source = ColumnDataSource( { "bandwidth": [1, 2], "source": ["a", "b"], "destination": ["a", "b"], "bandwidth_text": ["1", "2"], } ) values = [hex(x)[2:] for x in range(64, 256)][::-1] mapper = linear_cmap( field_name="bandwidth", palette=["#" + x + x + "FF" for x in values], low=0, high=1, ) self.root = figure( title="Bandwidth by Worker", tools="", name="bandwidth_worker_heatmap", x_range=["a", "b"], y_range=["a", "b"], **kwargs, ) self.root.xaxis.major_label_orientation = XLABEL_ORIENTATION self.root.rect( source=self.source, x="source", y="destination", color=mapper, height=1, width=1, ) self.color_map = mapper["transform"] color_bar = ColorBar( color_mapper=self.color_map, label_standoff=12, border_line_color=None, location=(0, 0), ) color_bar.formatter = NumeralTickFormatter(format="0.0 b") color_bar.ticker = AdaptiveTicker(**TICKS_1024) self.root.add_layout(color_bar, "right") self.root.toolbar_location = None hover = HoverTool() hover.tooltips = """

Source: @source
Destination: @destination
Bandwidth: @bandwidth_text / s
""" hover.point_policy = "follow_mouse" self.root.add_tools(hover) @without_property_validation @log_errors def update(self): bw = self.scheduler.bandwidth_workers if not bw: return def name(address): try: ws = self.scheduler.workers[address] except KeyError: return address if ws.name is not None: return str(ws.name) return address x, y, value = zip(*((name(a), name(b), c) for (a, b), c in bw.items())) self.color_map.high = max(value) factors = list(sorted(set(x + y))) self.root.x_range.factors = factors self.root.y_range.factors = factors[::-1] result = { "source": x, "destination": y, "bandwidth": value, "bandwidth_text": list(map(format_bytes, value)), } self.root.title.text = "Bandwidth: " + format_bytes(self.scheduler.bandwidth) update(self.source, result) class WorkerNetworkBandwidth(DashboardComponent): """Worker network bandwidth chart Plots horizontal bars with the host_net_io.read_bps and host_net_io.write_bps worker state """ @log_errors def __init__(self, scheduler, **kwargs): self.scheduler = scheduler self.source = ColumnDataSource( { "y_read": [], "y_write": [], "x_read": [], "x_write": [], "x_read_disk": [], "x_write_disk": [], } ) self.bandwidth = figure( title="Worker Network Bandwidth", tools="", name="worker_network_bandwidth", **kwargs, ) # host_net_io.read_bps self.bandwidth.hbar( y="y_read", right="x_read", line_color=None, left=0, height=0.5, fill_color="red", legend_label="read", source=self.source, ) # host_net_io.write_bps self.bandwidth.hbar( y="y_write", right="x_write", line_color=None, left=0, height=0.5, fill_color="blue", legend_label="write", source=self.source, ) self.bandwidth.axis[0].ticker = BasicTicker(**TICKS_1024) self.bandwidth.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b") self.bandwidth.xaxis.major_label_orientation = XLABEL_ORIENTATION self.bandwidth.xaxis.minor_tick_line_alpha = 0 self.bandwidth.x_range = Range1d(start=0) self.bandwidth.yaxis.visible = False self.bandwidth.ygrid.visible = False self.bandwidth.toolbar_location = None self.disk = figure( title="Workers Disk", tools="", name="worker_disk", **kwargs, ) # host_disk_io.read_bps self.disk.hbar( y="y_read", right="x_read_disk", line_color=None, left=0, height=0.5, fill_color="red", legend_label="read", source=self.source, ) # host_disk_io.write_bps self.disk.hbar( y="y_write", right="x_write_disk", line_color=None, left=0, height=0.5, fill_color="blue", legend_label="write", source=self.source, ) self.disk.axis[0].ticker = BasicTicker(**TICKS_1024) self.disk.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b") self.disk.xaxis.major_label_orientation = XLABEL_ORIENTATION self.disk.xaxis.minor_tick_line_alpha = 0 self.disk.x_range = Range1d(start=0) self.disk.yaxis.visible = False self.disk.ygrid.visible = False self.disk.toolbar_location = None @without_property_validation @log_errors def update(self): workers = self.scheduler.workers.values() h = 0.1 y_read = [i + 0.75 + i * h for i in range(len(workers))] y_write = [i + 0.25 + i * h for i in range(len(workers))] x_read = [] x_write = [] x_read_disk = [] x_write_disk = [] for ws in workers: x_read.append(ws.metrics["host_net_io"]["read_bps"]) x_write.append(ws.metrics["host_net_io"]["write_bps"]) x_read_disk.append(ws.metrics.get("host_disk_io", {}).get("read_bps", 0)) x_write_disk.append(ws.metrics.get("host_disk_io", {}).get("write_bps", 0)) if self.scheduler.workers: self.bandwidth.x_range.end = max( max(x_read), max(x_write), 100_000_000, 0.95 * self.bandwidth.x_range.end, ) self.disk.x_range.end = max( 
max(x_read_disk), max(x_write_disk), 100_000_000, 0.95 * self.disk.x_range.end, ) else: self.bandwidth.x_range.end = 100_000_000 self.disk.x_range.end = 100_000_000 result = { "y_read": y_read, "y_write": y_write, "x_read": x_read, "x_write": x_write, "x_read_disk": x_read_disk, "x_write_disk": x_write_disk, } update(self.source, result) class SystemTimeseries(DashboardComponent): """Timeseries for worker network bandwidth, cpu, memory and disk. bandwidth Plots the average of host_net_io.read_bps and host_net_io.write_bps for the workers as a function of time cpu Plots the average of cpu for the workers as a function of time memory Plots the average of memory for the workers as a function of time disk Plots the average of host_disk_io.read_bps and host_disk_io.write_bps for the workers as a function of time The metrics plotted come from the aggregation of from ws.metrics[key] for ws in scheduler.workers.values() divided by number of workers. """ @log_errors def __init__(self, scheduler, follow_interval=20000, **kwargs): self.scheduler = scheduler self.source = ColumnDataSource( { "time": [], "host_net_io.read_bps": [], "host_net_io.write_bps": [], "cpu": [], "memory": [], "host_disk_io.read_bps": [], "host_disk_io.write_bps": [], } ) update(self.source, self.get_data()) x_range = DataRange1d( follow="end", follow_interval=follow_interval, range_padding=0 ) tools = "reset, xpan, xwheel_zoom" self.bandwidth = figure( title="Worker Network Bandwidth (average)", x_axis_type="datetime", tools=tools, x_range=x_range, name="worker_network_bandwidth-timeseries", **kwargs, ) self.bandwidth.line( source=self.source, x="time", y="host_net_io.read_bps", color="red", legend_label="read (mean)", ) self.bandwidth.line( source=self.source, x="time", y="host_net_io.write_bps", color="blue", legend_label="write (mean)", ) self.bandwidth.legend.location = "top_left" self.bandwidth.yaxis.axis_label = "bytes / second" self.bandwidth.yaxis[0].formatter = NumeralTickFormatter(format="0.0b") self.bandwidth.y_range.start = 0 self.bandwidth.yaxis.minor_tick_line_alpha = 0 self.bandwidth.xgrid.visible = False self.cpu = figure( title="Worker CPU Utilization (average)", x_axis_type="datetime", tools=tools, x_range=x_range, name="worker_cpu-timeseries", **kwargs, ) self.cpu.line( source=self.source, x="time", y="cpu", ) self.cpu.yaxis.axis_label = "Utilization" self.cpu.y_range.start = 0 self.cpu.yaxis.minor_tick_line_alpha = 0 self.cpu.xgrid.visible = False self.memory = figure( title="Worker Memory Use (average)", x_axis_type="datetime", tools=tools, x_range=x_range, name="worker_memory-timeseries", **kwargs, ) self.memory.line( source=self.source, x="time", y="memory", ) self.memory.yaxis.axis_label = "Bytes" self.memory.yaxis[0].formatter = NumeralTickFormatter(format="0.0b") self.memory.y_range.start = 0 self.memory.yaxis.minor_tick_line_alpha = 0 self.memory.xgrid.visible = False self.disk = figure( title="Worker Disk Bandwidth (average)", x_axis_type="datetime", tools=tools, x_range=x_range, name="worker_disk-timeseries", **kwargs, ) self.disk.line( source=self.source, x="time", y="host_disk_io.read_bps", color="red", legend_label="read (mean)", ) self.disk.line( source=self.source, x="time", y="host_disk_io.write_bps", color="blue", legend_label="write (mean)", ) self.disk.legend.location = "top_left" self.disk.yaxis.axis_label = "bytes / second" self.disk.yaxis[0].formatter = NumeralTickFormatter(format="0.0b") self.disk.y_range.start = 0 self.disk.yaxis.minor_tick_line_alpha = 0 self.disk.xgrid.visible = 
False def get_data(self): workers = self.scheduler.workers.values() net_read_bps = 0 net_write_bps = 0 cpu = 0 memory = 0 disk_read_bps = 0 disk_write_bps = 0 time = 0 for ws in workers: net_read_bps += ws.metrics["host_net_io"]["read_bps"] net_write_bps += ws.metrics["host_net_io"]["write_bps"] cpu += ws.metrics["cpu"] memory += ws.metrics["memory"] disk_read_bps += ws.metrics.get("host_disk_io", {}).get("read_bps", 0) disk_write_bps += ws.metrics.get("host_disk_io", {}).get("write_bps", 0) time += ws.metrics["time"] result = { # use `or` to avoid ZeroDivision when no workers "time": [time / (len(workers) or 1) * 1000], "host_net_io.read_bps": [net_read_bps / (len(workers) or 1)], "host_net_io.write_bps": [net_write_bps / (len(workers) or 1)], "cpu": [cpu / (len(workers) or 1)], "memory": [memory / (len(workers) or 1)], "host_disk_io.read_bps": [disk_read_bps / (len(workers) or 1)], "host_disk_io.write_bps": [disk_write_bps / (len(workers) or 1)], } return result @without_property_validation @log_errors def update(self): self.source.stream(self.get_data(), 1000) if self.scheduler.workers: y_end_cpu = sum( ws.nthreads or 1 for ws in self.scheduler.workers.values() ) / len(self.scheduler.workers.values()) y_end_mem = sum( ws.memory_limit for ws in self.scheduler.workers.values() ) / len(self.scheduler.workers.values()) else: y_end_cpu = 1 y_end_mem = 100_000_000 self.cpu.y_range.end = y_end_cpu * 100 self.memory.y_range.end = y_end_mem class ComputePerKey(DashboardComponent): """Bar chart showing time spend in action by key prefix""" @log_errors def __init__(self, scheduler, **kwargs): self.last = 0 self.scheduler = scheduler if TaskStreamPlugin.name not in self.scheduler.plugins: self.scheduler.add_plugin(TaskStreamPlugin(self.scheduler)) compute_data = { "times": [0.2, 0.1], "formatted_time": ["0.2 ms", "2.8 us"], "angles": [3.14, 0.785], "color": [ts_color_lookup["transfer"], ts_color_lookup["compute"]], "names": ["sum", "sum_partial"], } self.compute_source = ColumnDataSource(data=compute_data) fig = figure( title="Compute Time Per Task", tools="", name="compute_time_per_key", x_range=["a", "b"], **kwargs, ) rect = fig.vbar( source=self.compute_source, x="names", top="times", width=0.7, color="color", ) fig.y_range.start = 0 fig.yaxis.axis_label = "Time (s)" fig.yaxis[0].formatter = NumeralTickFormatter(format="0") fig.yaxis.ticker = AdaptiveTicker(**TICKS_1024) fig.xaxis.major_label_orientation = XLABEL_ORIENTATION rect.nonselection_glyph = None fig.xaxis.minor_tick_line_alpha = 0 fig.xgrid.visible = False fig.toolbar_location = None hover = HoverTool() hover.tooltips = """

Name: @names
Time: @formatted_time
""" hover.point_policy = "follow_mouse" fig.add_tools(hover) fig.add_layout( Title( text="Note: tasks less than 2% of max are not displayed", text_font_style="italic", ), "below", ) self.fig = fig tab1 = TabPanel(child=fig, title="Bar Chart") fig2 = figure( title="Compute Time Per Task", tools="", name="compute_time_per_key-pie", x_range=(-0.5, 1.0), **kwargs, ) fig2.wedge( x=0, y=1, radius=0.4, start_angle=cumsum("angles", include_zero=True), end_angle=cumsum("angles"), line_color="white", fill_color="color", legend_field="names", source=self.compute_source, ) fig2.axis.axis_label = None fig2.axis.visible = False fig2.grid.grid_line_color = None fig2.add_layout( Title( text="Note: tasks less than 2% of max are not displayed", text_font_style="italic", ), "below", ) hover = HoverTool() hover.tooltips = """

Name: @names
Time: @formatted_time
""" hover.point_policy = "follow_mouse" fig2.add_tools(hover) self.wedge_fig = fig2 tab2 = TabPanel(child=fig2, title="Pie Chart") self.root = Tabs(tabs=[tab1, tab2], sizing_mode="stretch_both") @without_property_validation @log_errors def update(self): compute_times = defaultdict(float) for key, ts in self.scheduler.task_prefixes.items(): name = key_split(key) for action, t in ts.all_durations.items(): if action == "compute": compute_times[name] += t # order by largest time first compute_times = sorted(compute_times.items(), key=lambda x: x[1], reverse=True) # keep only time which are 2% of max or greater if compute_times: max_time = compute_times[0][1] * 0.02 compute_times = [(n, t) for n, t in compute_times if t > max_time] compute_colors = list() compute_names = list() compute_time = list() total_time = 0 for name, t in compute_times: compute_names.append(name) compute_colors.append(ts_color_of(name)) compute_time.append(t) total_time += t angles = [t / total_time * 2 * math.pi for t in compute_time] self.fig.x_range.factors = compute_names compute_result = dict( angles=angles, times=compute_time, color=compute_colors, names=compute_names, formatted_time=[format_time(t) for t in compute_time], ) update(self.compute_source, compute_result) class AggregateAction(DashboardComponent): """Bar chart showing time spend in action by key prefix""" @log_errors def __init__(self, scheduler, **kwargs): self.last = 0 self.scheduler = scheduler if TaskStreamPlugin.name not in self.scheduler.plugins: self.scheduler.add_plugin(TaskStreamPlugin(self.scheduler)) action_data = { "times": [0.2, 0.1], "formatted_time": ["0.2 ms", "2.8 us"], "color": [ts_color_lookup["transfer"], ts_color_lookup["compute"]], "names": ["transfer", "compute"], } self.action_source = ColumnDataSource(data=action_data) self.root = figure( title="Aggregate Per Action", tools="", name="aggregate_per_action", x_range=["a", "b"], **kwargs, ) rect = self.root.vbar( source=self.action_source, x="names", top="times", width=0.7, color="color", ) self.root.y_range.start = 0 self.root.yaxis[0].formatter = NumeralTickFormatter(format="0") self.root.yaxis.axis_label = "Time (s)" self.root.yaxis.ticker = AdaptiveTicker(**TICKS_1024) self.root.xaxis.major_label_orientation = XLABEL_ORIENTATION self.root.xaxis.major_label_text_font_size = "16px" rect.nonselection_glyph = None self.root.xaxis.minor_tick_line_alpha = 0 self.root.xgrid.visible = False self.root.toolbar_location = None hover = HoverTool() hover.tooltips = """

Name: @names
Time: @formatted_time
""" hover.point_policy = "follow_mouse" self.root.add_tools(hover) @without_property_validation @log_errors def update(self): agg_times = defaultdict(float) for ts in self.scheduler.task_prefixes.values(): for action, t in ts.all_durations.items(): agg_times[action] += t # order by largest time first agg_times = sorted(agg_times.items(), key=lambda x: x[1], reverse=True) agg_colors = list() agg_names = list() agg_time = list() for action, t in agg_times: agg_names.append(action) if action == "compute": agg_colors.append("purple") else: agg_colors.append(ts_color_lookup[action]) agg_time.append(t) self.root.x_range.factors = agg_names self.root.title.text = "Aggregate Time Per Action" action_result = dict( times=agg_time, color=agg_colors, names=agg_names, formatted_time=[format_time(t) for t in agg_time], ) update(self.action_source, action_result) class MemoryByKey(DashboardComponent): """Bar chart showing memory use by key prefix""" @log_errors def __init__(self, scheduler, **kwargs): self.last = 0 self.scheduler = scheduler self.source = ColumnDataSource( { "name": ["a", "b"], "nbytes": [100, 1000], "count": [1, 2], "color": ["blue", "blue"], } ) self.root = figure( title="Memory Use", tools="", name="memory_by_key", x_range=["a", "b"], **kwargs, ) rect = self.root.vbar( source=self.source, x="name", top="nbytes", width=0.9, color="color" ) self.root.yaxis[0].formatter = NumeralTickFormatter(format="0.0 b") self.root.yaxis.ticker = AdaptiveTicker(**TICKS_1024) self.root.xaxis.major_label_orientation = XLABEL_ORIENTATION rect.nonselection_glyph = None self.root.xaxis.minor_tick_line_alpha = 0 self.root.ygrid.visible = False self.root.toolbar_location = None hover = HoverTool() hover.tooltips = "@name: @nbytes_text" hover.tooltips = """

Name: @name
Bytes: @nbytes_text
Count: @count objects
""" hover.point_policy = "follow_mouse" self.root.add_tools(hover) @without_property_validation @log_errors def update(self): counts = defaultdict(int) nbytes = defaultdict(int) for ws in self.scheduler.workers.values(): for ts in ws.has_what: ks = key_split(ts.key) counts[ks] += 1 nbytes[ks] += ts.nbytes names = list(sorted(counts)) self.root.x_range.factors = names result = { "name": names, "count": [counts[name] for name in names], "nbytes": [nbytes[name] for name in names], "nbytes_text": [format_bytes(nbytes[name]) for name in names], "color": [color_of(name) for name in names], } self.root.title.text = "Total Use: " + format_bytes(sum(nbytes.values())) update(self.source, result) class CurrentLoad(DashboardComponent): """Tasks and CPU usage on each worker""" @log_errors def __init__(self, scheduler, width=600, **kwargs): self.last = 0 self.scheduler = scheduler self.source = ColumnDataSource( { "nprocessing": [], "nprocessing-half": [], "nprocessing-color": [], "cpu": [], "cpu-half": [], "y": [], "worker": [], "escaped_worker": [], } ) processing = figure( title="Tasks Processing", tools="", name="processing", width=int(width / 2), min_border_bottom=50, **kwargs, ) rect = processing.rect( source=self.source, x="nprocessing-half", y="y", width="nprocessing", height=0.9, color="nprocessing-color", ) processing.x_range.start = 0 rect.nonselection_glyph = None cpu = figure( title="CPU Utilization", tools="", width=int(width / 2), name="cpu_hist", x_range=(0, 100), min_border_bottom=50, **kwargs, ) rect = cpu.rect( source=self.source, x="cpu-half", y="y", width="cpu", height=0.9, color="blue", ) rect.nonselection_glyph = None for fig in (processing, cpu): fig.xaxis.minor_tick_line_alpha = 0 fig.yaxis.visible = False fig.ygrid.visible = False tap = TapTool(callback=OpenURL(url="./info/worker/@escaped_worker.html")) fig.add_tools(tap) fig.toolbar_location = None fig.yaxis.visible = False hover = HoverTool() hover.tooltips = "@worker : @nprocessing tasks" hover.point_policy = "follow_mouse" processing.add_tools(hover) hover = HoverTool() hover.tooltips = "@worker : @cpu %" hover.point_policy = "follow_mouse" cpu.add_tools(hover) self.processing_figure = processing self.cpu_figure = cpu @without_property_validation @log_errors def update(self): workers = self.scheduler.workers.values() now = time() if not any(ws.processing for ws in workers) and now < self.last + 1: return self.last = now cpu = [int(ws.metrics["cpu"]) for ws in workers] nprocessing = [len(ws.processing) for ws in workers] nprocessing_color = [] for ws in workers: if ws in self.scheduler.idle: nprocessing_color.append("red") elif ws in self.scheduler.saturated: nprocessing_color.append("green") else: nprocessing_color.append("blue") result = { "cpu": cpu, "cpu-half": [c / 2 for c in cpu], "nprocessing": nprocessing, "nprocessing-half": [np / 2 for np in nprocessing], "nprocessing-color": nprocessing_color, "worker": [ws.address for ws in workers], "escaped_worker": [escape.url_escape(ws.address) for ws in workers], "y": list(range(len(workers))), } if self.scheduler.workers: xrange = max(ws.nthreads or 1 for ws in workers) else: xrange = 1 self.cpu_figure.x_range.end = xrange * 100 update(self.source, result) class StealingTimeSeries(DashboardComponent): def __init__(self, scheduler, **kwargs): self.scheduler = scheduler self.source = ColumnDataSource( { "time": [time() * 1000, time() * 1000 + 1], "idle": [0, 0], "saturated": [0, 0], } ) x_range = DataRange1d(follow="end", follow_interval=20000, range_padding=0) self.root = 
figure( title="Idle and Saturated Workers Over Time", x_axis_type="datetime", tools="", x_range=x_range, **kwargs, ) self.root.line(source=self.source, x="time", y="idle", color="red") self.root.line(source=self.source, x="time", y="saturated", color="green") self.root.yaxis.minor_tick_line_color = None self.root.add_tools( ResetTool(), PanTool(dimensions="width"), WheelZoomTool(dimensions="width") ) @without_property_validation @log_errors def update(self): result = { "time": [time() * 1000], "idle": [len(self.scheduler.idle)], "saturated": [len(self.scheduler.saturated)], } if PROFILING: curdoc().add_next_tick_callback(lambda: self.source.stream(result, 10000)) else: self.source.stream(result, 10000) class StealingEvents(DashboardComponent): def __init__(self, scheduler, **kwargs): self.scheduler = scheduler self.steal = scheduler.extensions["stealing"] self.last = 0 self.source = ColumnDataSource( { "time": [time() - 60, time()], "level": [0, 15], "color": ["white", "white"], "duration": [0, 0], "radius": [1, 1], "cost_factor": [0, 10], "count": [1, 1], } ) x_range = DataRange1d(follow="end", follow_interval=20000, range_padding=0) self.root = figure( title="Stealing Events", x_axis_type="datetime", tools="", x_range=x_range, **kwargs, ) self.root.circle( source=self.source, x="time", y="level", color="color", size="radius", alpha=0.5, ) self.root.yaxis.axis_label = "Level" hover = HoverTool() hover.tooltips = "Level: @level, Duration: @duration, Count: @count, Cost factor: @cost_factor" hover.point_policy = "follow_mouse" self.root.add_tools( hover, ResetTool(), PanTool(dimensions="width"), WheelZoomTool(dimensions="width"), ) def convert(self, msgs): """Convert a log message to a glyph""" total_duration = 0 for msg in msgs: time, level, key, duration, sat, occ_sat, idl, occ_idl = msg[:8] total_duration += duration try: color = Viridis11[level] except (KeyError, IndexError): color = "black" radius = math.sqrt(min(total_duration, 10)) * 30 + 2 d = { "time": time * 1000, "level": level, "count": len(msgs), "color": color, "duration": total_duration, "radius": radius, "cost_factor": self.steal.cost_multipliers[level], } return d @without_property_validation @log_errors def update(self): log = self.scheduler.get_events(topic="stealing") current = len(self.scheduler.events["stealing"]) n = current - self.last log = [log[-i][1][1] for i in range(1, n + 1) if log[-i][1][0] == "request"] self.last = current if log: new = pipe( log, map(groupby(1)), map(dict.values), concat, map(self.convert), list, transpose, ) if PROFILING: curdoc().add_next_tick_callback(lambda: self.source.stream(new, 10000)) else: self.source.stream(new, 10000) class Events(DashboardComponent): def __init__(self, scheduler, name, height=150, **kwargs): self.scheduler = scheduler self.action_ys = dict() self.last = 0 self.name = name self.source = ColumnDataSource( {"time": [], "action": [], "hover": [], "y": [], "color": []} ) x_range = DataRange1d(follow="end", follow_interval=200000) self.root = figure( title=name, x_axis_type="datetime", height=height, tools="", x_range=x_range, **kwargs, ) self.root.circle( source=self.source, x="time", y="y", color="color", size=50, alpha=0.5, legend_field="action", ) self.root.yaxis.axis_label = "Action" self.root.legend.location = "top_left" hover = HoverTool() hover.tooltips = "@action
@hover" hover.point_policy = "follow_mouse" self.root.add_tools( hover, ResetTool(), PanTool(dimensions="width"), WheelZoomTool(dimensions="width"), ) @without_property_validation @log_errors def update(self): log = self.scheduler.events[self.name] n = self.scheduler.event_counts[self.name] - self.last if log: log = [log[-i] for i in range(1, n + 1)] self.last = self.scheduler.event_counts[self.name] if log: actions = [] times = [] hovers = [] ys = [] colors = [] for msg_time, msg in log: times.append(msg_time * 1000) action = msg["action"] actions.append(action) try: ys.append(self.action_ys[action]) except KeyError: self.action_ys[action] = len(self.action_ys) ys.append(self.action_ys[action]) colors.append(color_of(action)) hovers.append("TODO") new = { "time": times, "action": actions, "hover": hovers, "y": ys, "color": colors, } if PROFILING: curdoc().add_next_tick_callback(lambda: self.source.stream(new, 10000)) else: self.source.stream(new, 10000) class TaskStream(DashboardComponent): def __init__(self, scheduler, n_rectangles=1000, clear_interval="20s", **kwargs): self.scheduler = scheduler self.offset = 0 if TaskStreamPlugin.name not in self.scheduler.plugins: self.scheduler.add_plugin(TaskStreamPlugin(self.scheduler)) self.plugin = self.scheduler.plugins[TaskStreamPlugin.name] self.index = max(0, self.plugin.index - n_rectangles) self.workers = dict() self.n_rectangles = n_rectangles clear_interval = parse_timedelta(clear_interval, default="ms") self.clear_interval = clear_interval self.last = 0 self.last_seen = 0 self.source, self.root = task_stream_figure(clear_interval, **kwargs) # Required for update callback self.task_stream_index = [0] @without_property_validation @log_errors def update(self): if self.index == self.plugin.index: return if self.index and len(self.source.data["start"]): start = min(self.source.data["start"]) duration = max(self.source.data["duration"]) boundary = (self.offset + start - duration) / 1000 else: boundary = self.offset rectangles = self.plugin.rectangles( istart=self.index, workers=self.workers, start_boundary=boundary ) n = len(rectangles["name"]) self.index = self.plugin.index if not rectangles["start"]: return # If it has been a while since we've updated the plot if time() > self.last_seen + self.clear_interval: new_start = min(rectangles["start"]) - self.offset old_start = min(self.source.data["start"]) old_end = max( map( operator.add, self.source.data["start"], self.source.data["duration"], ) ) density = ( sum(self.source.data["duration"]) / len(self.workers) / (old_end - old_start) ) # If whitespace is more than 3x the old width if (new_start - old_end) > (old_end - old_start) * 2 or density < 0.05: self.source.data.update({k: [] for k in rectangles}) # clear self.offset = min(rectangles["start"]) # redefine offset rectangles["start"] = [x - self.offset for x in rectangles["start"]] self.last_seen = time() # Convert to numpy for serialization speed if n >= 10 and np: for k, v in rectangles.items(): if isinstance(v[0], Number): rectangles[k] = np.array(v) if PROFILING: curdoc().add_next_tick_callback( lambda: self.source.stream(rectangles, self.n_rectangles) ) else: self.source.stream(rectangles, self.n_rectangles) def task_stream_figure(clear_interval="20s", **kwargs): """ kwargs are applied to the bokeh.models.plots.Plot constructor """ clear_interval = parse_timedelta(clear_interval, default="ms") source = ColumnDataSource( data=dict( start=[time() - clear_interval], duration=[0.1], key=["start"], name=["start"], color=["white"], 
duration_text=["100 ms"], worker=["foo"], y=[0], worker_thread=[1], alpha=[0.0], ) ) x_range = DataRange1d(range_padding=0) y_range = DataRange1d(range_padding=0) root = figure( name="task_stream", title="Task Stream", x_range=x_range, y_range=y_range, toolbar_location="above", x_axis_type="datetime", y_axis_location=None, tools="", min_border_bottom=50, **kwargs, ) rect = root.rect( source=source, x="start", y="y", width="duration", height=0.4, fill_color="color", line_color="color", line_alpha=0.6, fill_alpha="alpha", line_width=3, ) rect.nonselection_glyph = None root.yaxis.major_label_text_alpha = 0 root.yaxis.minor_tick_line_alpha = 0 root.yaxis.major_tick_line_alpha = 0 root.xgrid.visible = False hover = HoverTool( point_policy="follow_mouse", tooltips="""
@name:  @duration_text
""", ) tap = TapTool(callback=OpenURL(url="./profile?key=@name")) root.add_tools( hover, tap, BoxZoomTool(), ResetTool(), PanTool(dimensions="width"), WheelZoomTool(dimensions="width"), ) if ExportTool: # type: ignore export = ExportTool() export.register_plot(root) root.add_tools(export) return source, root class TaskGraph(DashboardComponent): """ A dynamic node-link diagram for the task graph on the scheduler See also the GraphLayout diagnostic at distributed/diagnostics/graph_layout.py """ def __init__(self, scheduler, **kwargs): self.scheduler = scheduler self.layout = GraphLayout(scheduler) scheduler.add_plugin(self.layout) self.invisible_count = 0 # number of invisible nodes self.node_source = ColumnDataSource( {"x": [], "y": [], "name": [], "state": [], "visible": [], "key": []} ) self.edge_source = ColumnDataSource({"x": [], "y": [], "visible": []}) filter = GroupFilter(column_name="visible", group="True") if BOKEH_VERSION.major < 3: filter_kwargs = {"filters": [filter]} else: filter_kwargs = {"filter": filter} node_view = CDSView(**filter_kwargs) edge_view = CDSView(**filter_kwargs) # Bokeh >= 3.0 automatically infers the source to use if BOKEH_VERSION.major < 3: node_view.source = self.node_source edge_view.source = self.edge_source node_colors = factor_cmap( "state", factors=["waiting", "queued", "processing", "memory", "released", "erred"], palette=["gray", "yellow", "green", "red", "blue", "black"], ) self.root = figure(title="Task Graph", **kwargs) self.subtitle = Title(text=" ", text_font_style="italic") self.root.add_layout(self.subtitle, "above") self.root.multi_line( xs="x", ys="y", source=self.edge_source, line_width=1, view=edge_view, color="black", alpha=0.3, ) rect = self.root.square( x="x", y="y", size=10, color=node_colors, source=self.node_source, view=node_view, legend_field="state", ) self.root.xgrid.grid_line_color = None self.root.ygrid.grid_line_color = None self.root.xaxis.visible = False self.root.yaxis.visible = False hover = HoverTool( point_policy="follow_mouse", tooltips="@name: @state", renderers=[rect], ) tap = TapTool(callback=OpenURL(url="info/task/@key.html"), renderers=[rect]) rect.nonselection_glyph = None self.root.add_tools(hover, tap) self.max_items = config.get("distributed.dashboard.graph-max-items", 5000) @without_property_validation @log_errors def update(self): # If there are too many tasks in the scheduler we'll disable this # compoonents to not overload scheduler or client. Once we drop # below the threshold, the data is filled up again as usual if len(self.scheduler.tasks) > self.max_items: self.subtitle.text = "Scheduler has too many tasks to display." for container in [self.node_source, self.edge_source]: container.data = {col: [] for col in container.column_names} else: # occasionally reset the column data source to remove old nodes if self.invisible_count > len(self.node_source.data["x"]) / 2: self.layout.reset_index() self.invisible_count = 0 update = True else: update = False new, self.layout.new = self.layout.new, [] new_edges = self.layout.new_edges self.layout.new_edges = [] self.add_new_nodes_edges(new, new_edges, update=update) self.patch_updates() if len(self.scheduler.tasks) == 0: self.subtitle.text = "Scheduler is empty." 
else: self.subtitle.text = " " @without_property_validation def add_new_nodes_edges(self, new, new_edges, update=False): if new or update: node_key = [] node_x = [] node_y = [] node_state = [] node_name = [] edge_x = [] edge_y = [] x = self.layout.x y = self.layout.y tasks = self.scheduler.tasks for key in new: try: task = tasks[key] except KeyError: continue xx = x[key] yy = y[key] node_key.append(escape.url_escape(key)) node_x.append(xx) node_y.append(yy) node_state.append(task.state) node_name.append(task.prefix.name) for a, b in new_edges: try: edge_x.append([x[a], x[b]]) edge_y.append([y[a], y[b]]) except KeyError: pass node = { "x": node_x, "y": node_y, "state": node_state, "name": node_name, "key": node_key, "visible": ["True"] * len(node_x), } edge = {"x": edge_x, "y": edge_y, "visible": ["True"] * len(edge_x)} if update or not len(self.node_source.data["x"]): # see https://github.com/bokeh/bokeh/issues/7523 self.node_source.data.update(node) self.edge_source.data.update(edge) else: self.node_source.stream(node) self.edge_source.stream(edge) @without_property_validation def patch_updates(self): """ Small updates like color changes or lost nodes from task transitions """ n = len(self.node_source.data["x"]) m = len(self.edge_source.data["x"]) if self.layout.state_updates: state_updates = self.layout.state_updates self.layout.state_updates = [] updates = [(i, c) for i, c in state_updates if i < n] self.node_source.patch({"state": updates}) if self.layout.visible_updates: updates = self.layout.visible_updates updates = [(i, c) for i, c in updates if i < n] self.layout.visible_updates = [] self.node_source.patch({"visible": updates}) self.invisible_count += len(updates) if self.layout.visible_edge_updates: updates = self.layout.visible_edge_updates updates = [(i, c) for i, c in updates if i < m] self.layout.visible_edge_updates = [] self.edge_source.patch({"visible": updates}) def __del__(self): self.scheduler.remove_plugin(name=self.layout.name) class TaskGroupGraph(DashboardComponent): """ Task Group Graph Creates a graph layout for TaskGroups on the scheduler. It assigns (x, y) locations to all the TaskGroups and lays them out by according to their dependencies. The layout gets updated every time that new TaskGroups are added. Each task group node incodes information about task progress, memory, and output type into glyphs, as well as a hover tooltip with more detailed information on name, computation time, memory, and tasks status. 
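
    Notes
    -----
    Node positions come from ``update_layout``, which orders the task groups with
    ``dask.order.order`` over their inter-group dependencies, and box dimensions
    are scaled linearly into ``[0.4, 0.8]`` by ``compute_size``.  For example
    (illustrative numbers only), with ``min_box=0`` and ``max_box=10`` an input of
    ``5`` maps to ``0.4 + (0.8 - 0.4) / 10 * 5 = 0.6``.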
""" def __init__(self, scheduler, **kwargs): self.scheduler = scheduler self.nodes_layout = {} self.arrows_layout = {} self.old_counter = -1 self.nodes_source = ColumnDataSource( { "x": [], "y": [], "w_box": [], "h_box": [], "name": [], "tot_tasks": [], "color": [], "x_start": [], "x_end": [], "y_start": [], "y_end": [], "x_end_progress": [], "mem_alpha": [], "node_line_width": [], "comp_tasks": [], "url_logo": [], "x_logo": [], "y_logo": [], "w_logo": [], "h_logo": [], "in_processing": [], "in_memory": [], "in_released": [], "in_erred": [], "compute_time": [], "memory": [], } ) self.arrows_source = ColumnDataSource({"xs": [], "ys": [], "xe": [], "ye": []}) self.root = figure(title="Task Groups Graph", match_aspect=True, **kwargs) self.root.axis.visible = False self.subtitle = Title(text=" ", text_font_style="italic") self.root.add_layout(self.subtitle, "above") rect = self.root.rect( x="x", y="y", width="w_box", height="h_box", color="color", fill_alpha="mem_alpha", line_color="black", line_width="node_line_width", source=self.nodes_source, ) # plot tg log self.root.image_url( url="url_logo", x="x_logo", y="y_logo", w="w_logo", h="h_logo", anchor="center", source=self.nodes_source, ) # progress bar plain box self.root.quad( left="x_start", right="x_end", bottom="y_start", top="y_end", color=None, line_color="black", source=self.nodes_source, ) # progress bar self.root.quad( left="x_start", right="x_end_progress", bottom="y_start", top="y_end", color="color", line_color=None, fill_alpha=0.6, source=self.nodes_source, ) self.arrows = Arrow( end=VeeHead(size=8), line_color="black", line_alpha=0.5, line_width=1, x_start="xs", y_start="ys", x_end="xe", y_end="ye", source=self.arrows_source, ) self.root.add_layout(self.arrows) self.root.xgrid.grid_line_color = None self.root.ygrid.grid_line_color = None self.root.x_range.range_padding = 0.5 self.root.y_range.range_padding = 0.5 hover = HoverTool( point_policy="follow_mouse", tooltips="""
Name:  @name
Compute time:  @compute_time
Memory:  @memory
Tasks:  @tot_tasks
Completed:  @comp_tasks
Processing:  @in_processing
In memory:  @in_memory
Erred:  @in_erred
Released:  @in_released
""", renderers=[rect], ) self.root.add_tools(hover) @without_property_validation @log_errors def update_layout(self): # Get dependencies per task group. # In some cases there are tg that have themselves as dependencies - we remove those. dependencies = { k: {ds.name for ds in ts.dependencies if ds.name != k} for k, ts in self.scheduler.task_groups.items() } import dask order = dask.order.order( dsk={group.name: 1 for k, group in self.scheduler.task_groups.items()}, dependencies=dependencies, ) ordered = sorted(self.scheduler.task_groups, key=order.get) xs = {} ys = {} locations = set() nodes_layout = {} arrows_layout = {} for tg in ordered: if dependencies[tg]: x = max(xs[dep] for dep in dependencies[tg]) + 1 y = max(ys[dep] for dep in dependencies[tg]) if ( len(dependencies[tg]) > 1 and len({ys[dep] for dep in dependencies[tg]}) == 1 ): y += 1 else: x = 0 y = max(ys.values()) + 1 if ys else 0 while (x, y) in locations: # avoid collisions by moving up y += 1 locations.add((x, y)) xs[tg], ys[tg] = x, y # info needed for node layout to column data source nodes_layout[tg] = {"x": xs[tg], "y": ys[tg]} # info needed for arrow layout arrows_layout[tg] = { "nstart": dependencies[tg], "nend": [tg] * len(dependencies[tg]), } return nodes_layout, arrows_layout def compute_size(self, x, min_box, max_box): start = 0.4 end = 0.8 y = (end - start) / (max_box - min_box) * (x - min_box) + start return y @without_property_validation def update(self): if self.scheduler.transition_counter == self.old_counter: return self.old_counter = self.scheduler.transition_counter if not self.scheduler.task_groups: self.subtitle.text = "Scheduler is empty." else: self.subtitle.text = " " if self.nodes_layout.keys() != self.scheduler.task_groups.keys(): self.nodes_layout, self.arrows_layout = self.update_layout() nodes_data = { "x": [], "y": [], "w_box": [], "h_box": [], "name": [], "color": [], "tot_tasks": [], "x_start": [], "x_end": [], "y_start": [], "y_end": [], "x_end_progress": [], "mem_alpha": [], "node_line_width": [], "comp_tasks": [], "url_logo": [], "x_logo": [], "y_logo": [], "w_logo": [], "h_logo": [], "in_processing": [], "in_memory": [], "in_released": [], "in_erred": [], "compute_time": [], "memory": [], } arrows_data = { "xs": [], "ys": [], "xe": [], "ye": [], } durations = set() nbytes = set() for tg in self.scheduler.task_groups.values(): if tg.duration and tg.nbytes_total: durations.add(tg.duration) nbytes.add(tg.nbytes_total) durations_min = min(durations, default=0) durations_max = max(durations, default=0) nbytes_min = min(nbytes, default=0) nbytes_max = max(nbytes, default=0) box_dim = {} for key, tg in self.scheduler.task_groups.items(): comp_tasks = ( tg.states["released"] + tg.states["memory"] + tg.states["erred"] ) tot_tasks = sum(tg.states.values()) # compute width and height of boxes if ( tg.duration and tg.nbytes_total and comp_tasks and len(durations) > 1 and len(nbytes) > 1 ): # scale duration (width) width_box = self.compute_size( tg.duration / comp_tasks * tot_tasks, min_box=durations_min / comp_tasks * tot_tasks, max_box=durations_max / comp_tasks * tot_tasks, ) # need to scale memory (height) height_box = self.compute_size( tg.nbytes_total / comp_tasks * tot_tasks, min_box=nbytes_min / comp_tasks * tot_tasks, max_box=nbytes_max / comp_tasks * tot_tasks, ) else: width_box = 0.6 height_box = width_box / 2 box_dim[key] = {"width": width_box, "height": height_box} for key, tg in self.scheduler.task_groups.items(): x = self.nodes_layout[key]["x"] y = self.nodes_layout[key]["y"] width = 
box_dim[key]["width"] height = box_dim[key]["height"] # main boxes layout nodes_data["x"].append(x) nodes_data["y"].append(y) nodes_data["w_box"].append(width) nodes_data["h_box"].append(height) comp_tasks = ( tg.states["released"] + tg.states["memory"] + tg.states["erred"] ) tot_tasks = sum(tg.states.values()) nodes_data["name"].append(tg.prefix.name) nodes_data["color"].append(color_of(tg.prefix.name)) nodes_data["tot_tasks"].append(tot_tasks) # memory alpha factor by 0.4 if not get's too dark nodes_data["mem_alpha"].append( (tg.states["memory"] / sum(tg.states.values())) * 0.4 ) # main box line width if tg.states["processing"]: nodes_data["node_line_width"].append(5) else: nodes_data["node_line_width"].append(1) # progress bar data update nodes_data["x_start"].append(x - width / 2) nodes_data["x_end"].append(x + width / 2) nodes_data["y_start"].append(y - height / 2) nodes_data["y_end"].append(y - height / 2 + height * 0.4) nodes_data["x_end_progress"].append( x - width / 2 + width * comp_tasks / tot_tasks ) # arrows arrows_data["xs"] += [ self.nodes_layout[k]["x"] + box_dim[k]["width"] / 2 for k in self.arrows_layout[key]["nstart"] ] arrows_data["ys"] += [ self.nodes_layout[k]["y"] for k in self.arrows_layout[key]["nstart"] ] arrows_data["xe"] += [ self.nodes_layout[k]["x"] - box_dim[k]["width"] / 2 for k in self.arrows_layout[key]["nend"] ] arrows_data["ye"] += [ self.nodes_layout[k]["y"] for k in self.arrows_layout[key]["nend"] ] # LOGOS if len(tg.types) == 1: logo_type = next(iter(tg.types)).split(".")[0] try: url_logo = logos_dict[logo_type] except KeyError: url_logo = "" else: url_logo = "" nodes_data["url_logo"].append(url_logo) nodes_data["x_logo"].append(x + width / 3) nodes_data["y_logo"].append(y + height / 3) ratio = width / height if ratio > 1: nodes_data["h_logo"].append(height * 0.3) nodes_data["w_logo"].append(width * 0.3 / ratio) else: nodes_data["h_logo"].append(height * 0.3 * ratio) nodes_data["w_logo"].append(width * 0.3) # compute_time and memory nodes_data["compute_time"].append(format_time(tg.duration)) nodes_data["memory"].append(format_bytes(tg.nbytes_total)) # Add some status to hover tasks_processing = tg.states["processing"] tasks_memory = tg.states["memory"] tasks_relased = tg.states["released"] tasks_erred = tg.states["erred"] nodes_data["comp_tasks"].append( f"{comp_tasks} ({comp_tasks / tot_tasks * 100:.0f} %)" ) nodes_data["in_processing"].append( f"{tasks_processing} ({tasks_processing/ tot_tasks * 100:.0f} %)" ) nodes_data["in_memory"].append( f"{tasks_memory} ({tasks_memory/ tot_tasks * 100:.0f} %)" ) nodes_data["in_released"].append( f"{tasks_relased} ({tasks_relased/ tot_tasks * 100:.0f} %)" ) nodes_data["in_erred"].append( f"{ tasks_erred} ({tasks_erred/ tot_tasks * 100:.0f} %)" ) self.nodes_source.data.update(nodes_data) self.arrows_source.data.update(arrows_data) class TaskGroupProgress(DashboardComponent): """Stacked area chart showing task groups through time""" def __init__(self, scheduler, **kwargs): self.scheduler = scheduler self.source = ColumnDataSource() # The length of timeseries to chart (in units of plugin.dt) self.npts = 180 if GroupTiming.name not in scheduler.plugins: scheduler.add_plugin(plugin=GroupTiming(scheduler)) self.plugin = scheduler.plugins[GroupTiming.name] self.source.add(np.array(self.plugin.time) * 1000.0, "time") x_range = DataRange1d(range_padding=0) y_range = Range1d(0, max(self.plugin.nthreads)) self.root = figure( title="Task Group Progress", name="task_group_progress", toolbar_location="above", 
min_border_bottom=50, x_range=x_range, y_range=y_range, tools="", x_axis_type="datetime", y_axis_location=None, **kwargs, ) self.root.yaxis.major_label_text_alpha = 0 self.root.yaxis.minor_tick_line_alpha = 0 self.root.yaxis.major_tick_line_alpha = 0 self.root.xgrid.visible = False self.root.add_tools( BoxZoomTool(), ResetTool(), PanTool(dimensions="width"), WheelZoomTool(dimensions="width"), ) self._hover = None self._last_drawn = None self._offset = time() self._last_transition_count = scheduler.transition_counter # OrderedDict so we can make a reverse iterator later and get the # most-recently-added glyphs. self._renderers = OrderedDict() self._line_renderers = OrderedDict() def _should_add_new_renderers(self) -> bool: """ Whether to add new renderers to the chart. When a new set of task groups enters the scheduler we'd like to start rendering them. But adding new glyphs can be expensive, so we do it deliberately, checking both that we actually need to and that the scheduler does not seem busy. """ # Always draw if we have not drawn before if not self._last_drawn: return True # Don't draw if there have been no new tasks completed since the last update, # or if the scheduler CPU is occupied. if ( self._last_transition_count == self.scheduler.transition_counter or self.scheduler.proc.cpu_percent() > 50 ): return False # Only return true if there are new task groups that we have not yet added # to the ColumnDataSource. return not set(self.plugin.compute.keys()) <= set(self.source.data.keys()) def _should_update(self) -> bool: """ Whether to update the ColumnDataSource. This is cheaper than redrawing, but still not free, so we check whether we need it and whether the scheduler is busy. """ return ( self._last_transition_count != self.scheduler.transition_counter and self.scheduler.proc.cpu_percent() < 50 ) def _get_timeseries(self, restrict_to_existing=False): """ Build the time series data for the ColumnDataSource. restrict_to_existing determines whether to include only task groups already present in the ColumnDataSource, excluding any that were added since the last render. This matters because we want to add new stackers very deliberately. """ # Get the front/back indices for the most recent npts bins of the timeseries front = max(len(self.plugin.time) - self.npts, 0) back = None # Remove any periods of zero compute at the front or back of the timeseries if len(self.plugin.compute): agg = sum(np.array(v[front:]) for v in self.plugin.compute.values()) front2 = len(agg) - len(np.trim_zeros(agg, trim="f")) front += front2 back = len(np.trim_zeros(agg, trim="b")) - len(agg) or None prepend = ( self.plugin.time[front - 1] if front >= 1 else self.plugin.time[front] - self.plugin.dt ) timestamps = np.array(self.plugin.time[front:back]) dt = np.diff(timestamps, prepend=prepend) if restrict_to_existing: new_data = { k: np.array(v[front:back]) / dt for k, v in self.plugin.compute.items() if k in self.source.data } else: new_data = valmap( lambda x: np.array(x[front:back]) / dt, self.plugin.compute, ) new_data["time"] = ( timestamps - self._offset ) * 1000.0 # bokeh likes milliseconds new_data["nthreads"] = np.array(self.plugin.nthreads[front:back]) return new_data @without_property_validation @log_errors def update(self): """ Maybe update the chart. The chart is somewhat expensive to draw, so we update it defensively. """ if self._should_add_new_renderers(): # Update the chart, allowing for new task groups to be added.
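            # Redraw path (descriptive note): pull the full timeseries including any
            # brand-new task groups, grow the y-range if more worker threads appeared,
            # then stack one ``varea`` plus an outline ``line`` per task group on top
            # of the previous ones. Groups whose recent compute is all zero are hidden
            # rather than removed, so their renderers can simply be toggled back on later.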
new_data = self._get_timeseries(restrict_to_existing=False) self.source.data = new_data # Possibly update the y range if the number of threads has increased. max_nthreads = max(self.plugin.nthreads) if self.root.y_range.end != max_nthreads: self.root.y_range.end = max_nthreads stackers = list(self.plugin.compute.keys()) colors = [color_of(key_split(k)) for k in stackers] for i, (group, color) in enumerate(zip(stackers, colors)): # If we have already drawn the group, but it is all zero, # set it to be invisible. if group in self._renderers: if not np.count_nonzero(new_data[group]) > 0: self._renderers[group].visible = False self._line_renderers[group].visible = False else: self._renderers[group].visible = True self._line_renderers[group].visible = True continue # Draw the new area and line glyphs. renderer = self.root.varea( x="time", y1=stack(*stackers[:i]), y2=stack(*stackers[: i + 1]), color=color, alpha=0.5, source=self.source, ) self._renderers[group] = renderer line_renderer = self.root.line( x="time", y=stack(*stackers[: i + 1]), color=color, alpha=1.0, source=self.source, ) self._line_renderers[group] = line_renderer # Don't add hover until there is something to show, as bokehjs seems to # have trouble with custom hovers when there are no renderers. if self.plugin.compute and self._hover is None: # Add a hover that will show occupancy for all currently active # task groups. This is a little tricky, bokeh doesn't (yet) support # hit tests for stacked area charts: https://github.com/bokeh/bokeh/issues/9182 # Instead, show a single vline hover which lists the currently active task # groups. A custom formatter in JS-land pulls the relevant data index and # assembles the tooltip. formatter = CustomJSHover(code="return '';") self._hover = HoverTool( tooltips="""
Worker thread occupancy
$index{custom}
""", mode="vline", line_policy="nearest", attachment="horizontal", formatters={"$index": formatter}, ) self.root.add_tools(self._hover) if self._hover: # Create a custom tooltip that: # 1. Includes nthreads # 2. Filters out inactive task groups # (ones without any compute during the relevant dt) # 3. Colors the labels appropriately. formatter = CustomJSHover( code=""" const colormap = %s; const divs = []; for (let k of Object.keys(source.data)) { const val = source.data[k][value]; const color = colormap[k]; if (k === "time" || k === "nthreads" || val < 1.e-3) { continue; } const label = k.length >= 20 ? k.slice(0, 20) + '…' : k; // Unshift so that the ordering of the labels is the same as // the ordering of the stackers. divs.unshift( '
' + '' + label + '' + ': ' + val.toFixed(1) + '
' ) } divs.unshift( '
' + 'nthreads: ' + source.data.nthreads[value] + '
' ); return divs.join('\\n') """ % dict( zip(stackers, colors) ), # sneak the color mapping into the callback args={"source": self.source}, ) # Add the HoverTool to the top line renderer. top_line = None for line in reversed(self._line_renderers.values()): if line.visible: top_line = line break self._hover.renderers = [top_line] self._hover.formatters = {"$index": formatter} self._last_drawn = time() self._last_transition_count = self.scheduler.transition_counter elif self._should_update(): # Possibly update the y range if new threads have been added max_nthreads = max(self.plugin.nthreads) if self.root.y_range.end != max_nthreads: self.root.y_range.end = max_nthreads # Update the data, only including existing columns, rather than redrawing # the whole chart. self.source.data = self._get_timeseries(restrict_to_existing=True) self._last_transition_count = self.scheduler.transition_counter class TaskProgress(DashboardComponent): """Progress bars per task type""" def __init__(self, scheduler, **kwargs): self.scheduler = scheduler data = progress_quads( dict(all={}, memory={}, erred={}, released={}, processing={}, queued={}) ) self.source = ColumnDataSource(data=data) x_range = DataRange1d(range_padding=0) y_range = Range1d(-8, 0) self.root = figure( title="Progress", name="task_progress", x_range=x_range, y_range=y_range, toolbar_location=None, tools="", min_border_bottom=50, **kwargs, ) self.root.line( # just to define early ranges x=[0, 0.9], y=[-1, 0], line_color="#FFFFFF", alpha=0.0 ) self.root.quad( source=self.source, top="top", bottom="bottom", left="left", right="right", fill_color="#aaaaaa", line_color="#aaaaaa", fill_alpha=0.1, line_alpha=0.3, ) self.root.quad( source=self.source, top="top", bottom="bottom", left="left", right="released-loc", fill_color="color", line_color="color", fill_alpha=0.6, ) self.root.quad( source=self.source, top="top", bottom="bottom", left="released-loc", right="memory-loc", fill_color="color", line_color="color", fill_alpha=1.0, ) self.root.quad( source=self.source, top="top", bottom="bottom", left="memory-loc", right="erred-loc", fill_color="black", fill_alpha=0.5, line_alpha=0, ) self.root.quad( source=self.source, top="top", bottom="bottom", left="erred-loc", right="processing-loc", fill_color="gray", fill_alpha=0.35, line_alpha=0, ) self.root.quad( source=self.source, top="top", bottom="bottom", left="processing-loc", right="queued-loc", fill_color="gray", hatch_pattern="/", hatch_color="white", fill_alpha=0.35, line_alpha=0, ) self.root.quad( source=self.source, top="top", bottom="bottom", left="queued-loc", right="no-worker-loc", fill_color="red", hatch_pattern="/", hatch_color="black", fill_alpha=0.35, line_alpha=0, ) self.root.text( source=self.source, text="show-name", y="bottom", x="left", x_offset=5, text_font_size=value("10pt"), ) self.root.text( source=self.source, text="done", y="bottom", x="right", x_offset=-5, text_align="right", text_font_size=value("10pt"), ) self.root.ygrid.visible = False self.root.yaxis.minor_tick_line_alpha = 0 self.root.yaxis.visible = False self.root.xgrid.visible = False self.root.xaxis.minor_tick_line_alpha = 0 self.root.xaxis.visible = False hover = HoverTool( point_policy="follow_mouse", tooltips="""
Name:  @name
All:  @all
Queued:  @queued
No-worker:  @no_worker
Processing:  @processing
Memory:  @memory
Erred:  @erred
""", ) self.root.add_tools(hover) @without_property_validation @log_errors def update(self): state = { "memory": {}, "erred": {}, "released": {}, "processing": {}, "waiting": {}, "queued": {}, "no_worker": {}, } for tp in self.scheduler.task_prefixes.values(): active_states = tp.active_states if any(active_states.get(s) for s in state.keys()): state["memory"][tp.name] = active_states["memory"] state["erred"][tp.name] = active_states["erred"] state["released"][tp.name] = active_states["released"] state["processing"][tp.name] = active_states["processing"] state["waiting"][tp.name] = active_states["waiting"] state["queued"][tp.name] = active_states["queued"] state["no_worker"][tp.name] = active_states["no-worker"] state["all"] = {k: sum(v[k] for v in state.values()) for k in state["memory"]} if not state["all"] and not len(self.source.data["all"]): return d = progress_quads(state) update(self.source, d) totals = { k: sum(state[k].values()) for k in [ "all", "memory", "erred", "released", "waiting", "queued", "no_worker", ] } totals["processing"] = totals["all"] - sum( v for k, v in totals.items() if k != "all" ) self.root.title.text = ( "Progress -- total: %(all)s, " "waiting: %(waiting)s, " "queued: %(queued)s, " "processing: %(processing)s, " "in-memory: %(memory)s, " "no-worker: %(no_worker)s, " "erred: %(erred)s" % totals ) class EventLoop(DashboardComponent): """Event Loop Health""" @log_errors def __init__(self, scheduler, **kwargs): self.scheduler = scheduler self.source = ColumnDataSource( { "names": ["Scheduler", "Workers"], "values": [0, 0], "text": ["0", "0"], } ) self.root = figure( title="Event Loop Health", x_range=["Scheduler", "Workers"], y_range=[ 0, parse_timedelta(dask.config.get("distributed.admin.tick.interval")) * 25, ], tools="", toolbar_location="above", **kwargs, ) self.root.vbar(x="names", top="values", width=0.9, source=self.source) self.root.xaxis.minor_tick_line_alpha = 0 self.root.ygrid.visible = True self.root.xgrid.visible = False hover = HoverTool(tooltips=[("Interval", "@text s")], mode="vline") self.root.add_tools(hover) @without_property_validation @log_errors def update(self): s = self.scheduler data = { "names": ["Scheduler", "Workers"], "values": [ s._tick_interval_observed, sum(w.metrics["event_loop_interval"] for w in s.workers.values()) / (len(s.workers) or 1), ], } data["text"] = [format_time(x) for x in data["values"]] update(self.source, data) class ExceptionsTable(DashboardComponent): """ Exceptions logged in tasks. Since there might be many related exceptions (e.g., all tasks in a given task group fail for the same reason), we make a best-effort attempt to (1) aggregate to the task group, and (2) deduplicate similar looking tasks. 
""" scheduler: Scheduler def __init__(self, scheduler: Scheduler, width: int = 1000, **kwargs: Any): self.scheduler = scheduler self.names = [ "Task", "Exception", "Traceback", "Worker(s)", "Count", ] self.source = ColumnDataSource({k: [] for k in self.names}) code_formatter = HTMLTemplateFormatter( template='<%= value %>' ) columns = [ TableColumn( field="Task", title="Task", formatter=code_formatter, width=150, ), TableColumn( field="Exception", title="Exception", formatter=code_formatter, width=300, ), TableColumn( field="Traceback", title="Traceback", formatter=code_formatter, width=300, ), TableColumn( field="Worker(s)", title="Worker(s)", formatter=code_formatter, width=200, ), TableColumn( field="Count", title="Count", formatter=NumberFormatter(format="0,0"), width=50, ), ] if "sizing_mode" in kwargs: sizing_mode = {"sizing_mode": kwargs["sizing_mode"]} else: sizing_mode = {} self.root = DataTable( source=self.source, columns=columns, reorderable=True, sortable=True, width=width, index_position=None, **_DATATABLE_STYLESHEETS_KWARGS, **sizing_mode, ) @without_property_validation def update(self): new_data = {name: [] for name in self.names} erred_tasks = self.scheduler.erred_tasks for ts in erred_tasks: new_data["Task"].append(ts.key) new_data["Exception"].append(ts.exception_text) new_data["Traceback"].append(ts.traceback_text) new_data["Worker(s)"].append(",\n".join(ts.erred_on)) new_data["Count"].append(len(ts.erred_on)) update(self.source, new_data) class WorkerTable(DashboardComponent): """Status of the current workers This is two plots, a text-based table for each host and a thin horizontal plot laying out hosts by their current memory use. """ excluded_names = { "executing", "in_flight", "in_memory", "ready", "time", # Use scheduler.WorkerState.memory.managed instead of # scheduler.WorkerState.metrics["managed_bytes"]; the two measures are slightly # different. See explanation in scheduler.WorkerState.memory(). 
"managed_bytes", "spilled_bytes", } def __init__(self, scheduler, width=800, **kwargs): self.scheduler = scheduler self.names = [ "name", "address", "nthreads", "cpu", "memory", "memory_limit", "memory_percent", "memory_managed", "memory_unmanaged_old", "memory_unmanaged_recent", "memory_spilled", "num_fds", "host_net_io.read_bps", "host_net_io.write_bps", "host_disk_io.read_bps", "host_disk_io.write_bps", "cpu_fraction", ] workers = self.scheduler.workers.values() self.extra_names = sorted( { m for ws in workers for m, v in ws.metrics.items() if m not in self.names and isinstance(v, (str, int, float)) } - self.excluded_names ) table_names = [ "name", "address", "nthreads", "cpu", "memory", "memory_limit", "memory_percent", "memory_managed", "memory_unmanaged_old", "memory_unmanaged_recent", "memory_spilled", "num_fds", "host_net_io.read_bps", "host_net_io.write_bps", "host_disk_io.read_bps", "host_disk_io.write_bps", ] column_title_renames = { "memory_limit": "limit", "memory_percent": "memory %", "memory_managed": "managed", "memory_unmanaged_old": "unmanaged old", "memory_unmanaged_recent": "unmanaged recent", "memory_spilled": "spilled", "num_fds": "# fds", "host_net_io.read_bps": "net read", "host_net_io.write_bps": "net write", "host_disk_io.read_bps": "disk read", "host_disk_io.write_bps": "disk write", } self.source = ColumnDataSource({k: [] for k in self.names}) columns = { name: TableColumn(field=name, title=column_title_renames.get(name, name)) for name in table_names } formatters = { "cpu": NumberFormatter(format="0 %"), "memory_percent": NumberFormatter(format="0.0 %"), "memory": NumberFormatter(format="0.0 b"), "memory_limit": NumberFormatter(format="0.0 b"), "memory_managed": NumberFormatter(format="0.0 b"), "memory_unmanaged_old": NumberFormatter(format="0.0 b"), "memory_unmanaged_recent": NumberFormatter(format="0.0 b"), "memory_spilled": NumberFormatter(format="0.0 b"), "host_net_io.read_bps": NumberFormatter(format="0 b"), "host_net_io.write_bps": NumberFormatter(format="0 b"), "num_fds": NumberFormatter(format="0"), "nthreads": NumberFormatter(format="0"), "host_disk_io.read_bps": NumberFormatter(format="0 b"), "host_disk_io.write_bps": NumberFormatter(format="0 b"), } table = DataTable( source=self.source, columns=[columns[n] for n in table_names], reorderable=True, sortable=True, width=width, index_position=None, **_DATATABLE_STYLESHEETS_KWARGS, ) for name in table_names: if name in formatters: table.columns[table_names.index(name)].formatter = formatters[name] extra_names = ["name", "address"] + self.extra_names extra_columns = { name: TableColumn(field=name, title=column_title_renames.get(name, name)) for name in extra_names } extra_table = DataTable( source=self.source, columns=[extra_columns[n] for n in extra_names], reorderable=True, sortable=True, width=width, index_position=None, **_DATATABLE_STYLESHEETS_KWARGS, ) for name in extra_names: if name in formatters: extra_table.columns[extra_names.index(name)].formatter = formatters[ name ] hover = HoverTool( point_policy="follow_mouse", tooltips="""
Worker (@name): @memory_percent{0.0 %}
""", ) mem_plot = figure( title="Memory Use (%)", toolbar_location=None, x_range=(0, 1), y_range=(-0.1, 0.1), height=60, width=width, tools="", min_border_right=0, **kwargs, ) mem_plot.circle( source=self.source, x="memory_percent", y=0, size=10, fill_alpha=0.5 ) mem_plot.ygrid.visible = False mem_plot.yaxis.minor_tick_line_alpha = 0 mem_plot.xaxis.visible = False mem_plot.yaxis.visible = False mem_plot.add_tools(hover, BoxSelectTool()) hover = HoverTool( point_policy="follow_mouse", tooltips="""
Worker (@name): @cpu_fraction{0 %}
""", ) cpu_plot = figure( title="CPU Use (%)", toolbar_location=None, x_range=(0, 1), y_range=(-0.1, 0.1), height=60, width=width, tools="", min_border_right=0, **kwargs, ) cpu_plot.circle( source=self.source, x="cpu_fraction", y=0, size=10, fill_alpha=0.5 ) cpu_plot.ygrid.visible = False cpu_plot.yaxis.minor_tick_line_alpha = 0 cpu_plot.xaxis.visible = False cpu_plot.yaxis.visible = False cpu_plot.add_tools(hover, BoxSelectTool()) self.cpu_plot = cpu_plot if "sizing_mode" in kwargs: sizing_mode = {"sizing_mode": kwargs["sizing_mode"]} else: sizing_mode = {} components = [cpu_plot, mem_plot, table] if self.extra_names: components.append(extra_table) self.root = column(*components, **sizing_mode) @without_property_validation def update(self): data = {name: [] for name in self.names + self.extra_names} for i, ws in enumerate( sorted(self.scheduler.workers.values(), key=lambda ws: str(ws.name)) ): minfo = ws.memory for name in self.names + self.extra_names: if "." in name: n0, _, n1 = name.partition(".") v = ws.metrics.get(n0, {}).get(n1, None) else: v = ws.metrics.get(name, None) data[name].append(v) data["name"][-1] = ws.name if ws.name is not None else i data["address"][-1] = ws.address if ws.memory_limit: data["memory_percent"][-1] = ws.metrics["memory"] / ws.memory_limit else: data["memory_percent"][-1] = "" data["memory_limit"][-1] = ws.memory_limit data["memory_managed"][-1] = minfo.managed data["memory_unmanaged_old"][-1] = minfo.unmanaged_old data["memory_unmanaged_recent"][-1] = minfo.unmanaged_recent data["memory_unmanaged_recent"][-1] = minfo.unmanaged_recent data["memory_spilled"][-1] = minfo.spilled data["cpu"][-1] = ws.metrics["cpu"] / 100.0 data["cpu_fraction"][-1] = ws.metrics["cpu"] / 100.0 / ws.nthreads data["nthreads"][-1] = ws.nthreads for name in self.names + self.extra_names: if name == "name": data[name].insert(0, f"Total ({len(data[name])})") continue try: if len(self.scheduler.workers) == 0: total_data = None elif name == "memory_percent": total_mem = sum( ws.memory_limit for ws in self.scheduler.workers.values() ) total_data = ( ( sum( ws.metrics["memory"] for ws in self.scheduler.workers.values() ) / total_mem ) if total_mem else "" ) elif name == "cpu": total_data = ( sum(ws.metrics["cpu"] for ws in self.scheduler.workers.values()) / 100 / len(self.scheduler.workers.values()) ) elif name == "cpu_fraction": total_data = ( sum(ws.metrics["cpu"] for ws in self.scheduler.workers.values()) / 100 / sum(ws.nthreads for ws in self.scheduler.workers.values()) ) else: total_data = sum(data[name]) data[name].insert(0, total_data) except TypeError: data[name].insert(0, None) self.source.data.update(data) class Shuffling(DashboardComponent): """Occupancy (in time) per worker""" def __init__(self, scheduler, **kwargs): with log_errors(): self.scheduler = scheduler self.source = ColumnDataSource( { "worker": [], "y": [], "comm_memory": [], "comm_memory_limit": [], "comm_buckets": [], "comm_avg_duration": [], "comm_avg_size": [], "comm_read": [], "comm_written": [], "comm_color": [], "disk_memory": [], "disk_memory_limit": [], "disk_buckets": [], "disk_avg_duration": [], "disk_avg_size": [], "disk_read": [], "disk_written": [], "disk_color": [], } ) self.totals_source = ColumnDataSource( { "x": ["Network Send", "Network Receive", "Disk Write", "Disk Read"], "values": [0, 0, 0, 0], } ) self.comm_memory = figure( title="Comms Buffer", tools="", toolbar_location="above", x_range=Range1d(0, 100_000_000), **kwargs, ) self.comm_memory.hbar( source=self.source, right="comm_memory", 
y="y", height=0.9, color="comm_color", ) hover = HoverTool( tooltips=[ ("Memory Used", "@comm_memory{0.00 b}"), ("Average Write", "@comm_avg_size{0.00 b}"), ("# Buckets", "@comm_buckets"), ("Average Duration", "@comm_avg_duration"), ], formatters={"@comm_avg_duration": "datetime"}, mode="hline", ) self.comm_memory.add_tools(hover) self.comm_memory.x_range.start = 0 self.comm_memory.x_range.end = 1 self.comm_memory.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b") self.disk_memory = figure( title="Disk Buffer", tools="", toolbar_location="above", x_range=Range1d(0, 100_000_000), **kwargs, ) self.disk_memory.yaxis.visible = False self.disk_memory.hbar( source=self.source, right="disk_memory", y="y", height=0.9, color="disk_color", ) hover = HoverTool( tooltips=[ ("Memory Used", "@disk_memory{0.00 b}"), ("Average Write", "@disk_avg_size{0.00 b}"), ("# Buckets", "@disk_buckets"), ("Average Duration", "@disk_avg_duration"), ], formatters={"@disk_avg_duration": "datetime"}, mode="hline", ) self.disk_memory.add_tools(hover) self.disk_memory.xaxis[0].formatter = NumeralTickFormatter(format="0.0 b") self.totals = figure( title="Total movement", tools="", toolbar_location="above", **kwargs, ) titles = ["Network Send", "Network Receive", "Disk Write", "Disk Read"] self.totals = figure( x_range=titles, title="Totals", toolbar_location=None, tools="", **kwargs, ) self.totals.vbar( x="x", top="values", width=0.9, source=self.totals_source, ) self.totals.xgrid.grid_line_color = None self.totals.y_range.start = 0 self.totals.yaxis[0].formatter = NumeralTickFormatter(format="0.0 b") hover = HoverTool( tooltips=[("Total", "@values{0.00b}")], mode="vline", ) self.totals.add_tools(hover) self.root = row(self.comm_memory, self.disk_memory) @without_property_validation def update(self): with log_errors(): input = self.scheduler.extensions["shuffle"].heartbeats if not input: return input = list(input.values())[-1] # TODO: multiple concurrent shuffles data = defaultdict(list) now = time() for i, (worker, d) in enumerate(input.items()): data["y"].append(i) data["worker"].append(worker) for prefix in ["comm", "disk"]: data[f"{prefix}_total"].append(d[prefix]["total"]) data[f"{prefix}_memory"].append(d[prefix]["memory"]) data[f"{prefix}_memory_limit"].append(d[prefix]["memory_limit"]) data[f"{prefix}_buckets"].append(d[prefix]["buckets"]) data[f"{prefix}_avg_duration"].append( d[prefix]["diagnostics"].get("avg_duration", 0) ) data[f"{prefix}_avg_size"].append( d[prefix]["diagnostics"].get("avg_size", 0) ) data[f"{prefix}_read"].append(d[prefix]["read"]) data[f"{prefix}_written"].append(d[prefix]["written"]) if self.scheduler.workers[worker].last_seen < now - 5: data[f"{prefix}_color"].append("gray") elif d[prefix]["memory"] > d[prefix]["memory_limit"]: data[f"{prefix}_color"].append("red") else: data[f"{prefix}_color"].append("blue") """ singletons = { f"{prefix}_avg_duration": [ sum(data[f"{prefix}_avg_duration"]) / len(data[f"{prefix}_avg_duration"]) ], f"{prefix}_avg_size": [ sum(data[f"{prefix}_avg_size"]) / len(data[f"{prefix}_avg_size"]) ], "disk_avg_duration": [ sum(data["disk_avg_duration"]) / len(data["disk_avg_duration"]) ], "disk_avg_size": [ sum(data["disk_avg_size"]) / len(data["disk_avg_size"]) ], } singletons[f"{prefix}_avg_bandwidth"] = [ singletons[f"{prefix}_avg_size"][0] / singletons[f"{prefix}_avg_duration"][0] ] singletons["disk_avg_bandwidth"] = [ singletons["disk_avg_size"][0] / singletons["disk_avg_duration"][0] ] singletons["y"] = [data["y"][-1] / 2] """ totals = { "x": ["Network 
Send", "Network Receive", "Disk Write", "Disk Read"], "values": [ sum(data["comm_written"]), sum(data["comm_read"]), sum(data["disk_written"]), sum(data["disk_read"]), ], } update(self.totals_source, totals) update(self.source, dict(data)) limit = max(data["comm_memory_limit"] + data["disk_memory_limit"]) * 1.2 self.comm_memory.x_range.end = limit self.disk_memory.x_range.end = limit _STYLES = { "width": "100%", "height": "100%", "max-width": "1920px", "max-height": "1080px", "padding": "12px", "border": "1px solid lightgray", "box-shadow": "inset 1px 0 8px 0 lightgray", "overflow": "auto", } if BOKEH_VERSION.major < 3: _BOKEH_STYLES_KWARGS = {"style": _STYLES} else: _BOKEH_STYLES_KWARGS = {"styles": _STYLES} class SchedulerLogs: def __init__(self, scheduler, start=None): logs = scheduler.get_logs(start=start, timestamps=True) if not logs: logs_html = ( '

No logs to report

' ) else: logs_html = Log( "\n".join( "%s - %s" % (datetime.fromtimestamp(time).strftime("%H:%M:%S.%f"), line) for time, level, line in logs ) )._repr_html_() self.root = Div(text=logs_html, **_BOKEH_STYLES_KWARGS) @log_errors def systemmonitor_doc(scheduler, extra, doc): sysmon = SystemMonitor(scheduler, sizing_mode="stretch_both") doc.title = "Dask: Scheduler System Monitor" add_periodic_callback(doc, sysmon, 500) doc.add_root(sysmon.root) doc.template = env.get_template("simple.html") doc.template_variables.update(extra) doc.theme = BOKEH_THEME @log_errors def shuffling_doc(scheduler, extra, doc): doc.title = "Dask: Shuffling" shuffling = Shuffling(scheduler, width=400, height=400) workers_memory = WorkersMemory(scheduler, width=400, height=400) timeseries = SystemTimeseries( scheduler, width=1600, height=200, follow_interval=3000 ) event_loop = EventLoop(scheduler, width=200, height=400) add_periodic_callback(doc, shuffling, 200) add_periodic_callback(doc, workers_memory, 200) add_periodic_callback(doc, timeseries, 500) add_periodic_callback(doc, event_loop, 500) timeseries.bandwidth.y_range = timeseries.disk.y_range doc.add_root( column( row( workers_memory.root, shuffling.comm_memory, shuffling.disk_memory, shuffling.totals, event_loop.root, ), row(column(timeseries.bandwidth, timeseries.disk)), ) ) doc.template = env.get_template("simple.html") doc.template_variables.update(extra) doc.theme = BOKEH_THEME @log_errors def stealing_doc(scheduler, extra, doc): occupancy = Occupancy(scheduler) stealing_ts = StealingTimeSeries(scheduler) stealing_events = StealingEvents(scheduler) stealing_events.root.x_range = stealing_ts.root.x_range doc.title = "Dask: Work Stealing" add_periodic_callback(doc, occupancy, 500) add_periodic_callback(doc, stealing_ts, 500) add_periodic_callback(doc, stealing_events, 500) doc.add_root( row( occupancy.root, column( stealing_ts.root, stealing_events.root, sizing_mode="stretch_both", ), ) ) doc.template = env.get_template("simple.html") doc.template_variables.update(extra) doc.theme = BOKEH_THEME @log_errors def events_doc(scheduler, extra, doc): events = Events(scheduler, "all", height=250) events.update() add_periodic_callback(doc, events, 500) doc.title = "Dask: Scheduler Events" doc.add_root(column(events.root, sizing_mode="scale_width")) doc.template = env.get_template("simple.html") doc.template_variables.update(extra) doc.theme = BOKEH_THEME @log_errors def exceptions_doc(scheduler, extra, doc): table = ExceptionsTable(scheduler) table.update() add_periodic_callback(doc, table, 1000) doc.title = "Dask: Exceptions" doc.add_root(table.root) doc.template = env.get_template("simple.html") doc.template_variables.update(extra) doc.theme = BOKEH_THEME @log_errors def workers_doc(scheduler, extra, doc): table = WorkerTable(scheduler) table.update() add_periodic_callback(doc, table, 500) doc.title = "Dask: Workers" doc.add_root(table.root) doc.template = env.get_template("simple.html") doc.template_variables.update(extra) doc.theme = BOKEH_THEME @log_errors def hardware_doc(scheduler, extra, doc): hw = Hardware(scheduler) hw.update() doc.title = "Dask: Cluster Hardware Bandwidth" doc.add_root(hw.root) doc.template = env.get_template("simple.html") doc.template_variables.update(extra) doc.theme = BOKEH_THEME add_periodic_callback(doc, hw, 500) @log_errors def tasks_doc(scheduler, extra, doc): ts = TaskStream( scheduler, n_rectangles=dask.config.get( "distributed.scheduler.dashboard.tasks.task-stream-length" ), clear_interval="60s", sizing_mode="stretch_both", ) 
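    # Illustrative note: ``n_rectangles`` (taken from the
    # ``distributed.scheduler.dashboard.tasks.task-stream-length`` config value) caps
    # how many task rectangles the stream retains, and ``clear_interval`` is, roughly,
    # how long the stream may sit idle before old rectangles are cleared on the next
    # update; both are passed straight through to the TaskStream component.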
ts.update() add_periodic_callback(doc, ts, 5000) doc.title = "Dask: Task Stream" doc.add_root(ts.root) doc.template = env.get_template("simple.html") doc.template_variables.update(extra) doc.theme = BOKEH_THEME @log_errors def graph_doc(scheduler, extra, doc): graph = TaskGraph(scheduler, sizing_mode="stretch_both") doc.title = "Dask: Task Graph" graph.update() add_periodic_callback(doc, graph, 200) doc.add_root(graph.root) doc.template = env.get_template("simple.html") doc.template_variables.update(extra) doc.theme = BOKEH_THEME @log_errors def tg_graph_doc(scheduler, extra, doc): tg_graph = TaskGroupGraph(scheduler, sizing_mode="stretch_both") doc.title = "Dask: Task Groups Graph" tg_graph.update() add_periodic_callback(doc, tg_graph, 200) doc.add_root(tg_graph.root) doc.template = env.get_template("simple.html") doc.template_variables.update(extra) doc.theme = BOKEH_THEME @log_errors def status_doc(scheduler, extra, doc): cluster_memory = ClusterMemory(scheduler, sizing_mode="stretch_both") cluster_memory.update() add_periodic_callback(doc, cluster_memory, 100) doc.add_root(cluster_memory.root) if len(scheduler.workers) <= 100: workers_memory = WorkersMemory(scheduler, sizing_mode="stretch_both") processing = CurrentLoad(scheduler, sizing_mode="stretch_both") processing_root = processing.processing_figure else: workers_memory = WorkersMemoryHistogram(scheduler, sizing_mode="stretch_both") processing = ProcessingHistogram(scheduler, sizing_mode="stretch_both") processing_root = processing.root current_load = CurrentLoad(scheduler, sizing_mode="stretch_both") occupancy = Occupancy(scheduler, sizing_mode="stretch_both") workers_transfer_bytes = WorkersTransferBytes(scheduler, sizing_mode="stretch_both") cpu_root = current_load.cpu_figure occupancy_root = occupancy.root workers_memory.update() workers_transfer_bytes.update() processing.update() current_load.update() occupancy.update() add_periodic_callback(doc, workers_memory, 100) add_periodic_callback(doc, workers_transfer_bytes, 100) add_periodic_callback(doc, processing, 100) add_periodic_callback(doc, current_load, 100) add_periodic_callback(doc, occupancy, 100) doc.add_root(workers_memory.root) tab1 = TabPanel(child=processing_root, title="Processing") tab2 = TabPanel(child=cpu_root, title="CPU") tab3 = TabPanel(child=occupancy_root, title="Occupancy") tab4 = TabPanel(child=workers_transfer_bytes.root, title="Data Transfer") proc_tabs = Tabs( tabs=[tab1, tab2, tab3, tab4], name="processing_tabs", sizing_mode="stretch_both", ) doc.add_root(proc_tabs) task_stream = TaskStream( scheduler, n_rectangles=dask.config.get( "distributed.scheduler.dashboard.status.task-stream-length" ), clear_interval="5s", sizing_mode="stretch_both", ) task_stream.update() add_periodic_callback(doc, task_stream, 100) doc.add_root(task_stream.root) task_progress = TaskProgress(scheduler, sizing_mode="stretch_both") task_progress.update() add_periodic_callback(doc, task_progress, 100) doc.add_root(task_progress.root) doc.title = "Dask: Status" doc.theme = BOKEH_THEME doc.template = env.get_template("status.html") doc.template_variables.update(extra) @curry def individual_doc(cls, interval, scheduler, extra, doc, fig_attr="root", **kwargs): # Note: @log_errors and @curry are not compatible with log_errors(): fig = cls(scheduler, sizing_mode="stretch_both", **kwargs) fig.update() add_periodic_callback(doc, fig, interval) doc.add_root(getattr(fig, fig_attr)) doc.theme = BOKEH_THEME doc.title = "Dask: " + funcname(cls) @log_errors def 
individual_profile_doc(scheduler, extra, doc): prof = ProfileTimePlot(scheduler, sizing_mode="stretch_both", doc=doc) doc.add_root(prof.root) prof.trigger_update() doc.theme = BOKEH_THEME @log_errors def individual_profile_server_doc(scheduler, extra, doc): prof = ProfileServer(scheduler, sizing_mode="stretch_both", doc=doc) doc.add_root(prof.root) prof.trigger_update() doc.theme = BOKEH_THEME @log_errors def profile_doc(scheduler, extra, doc): doc.title = "Dask: Profile" prof = ProfileTimePlot(scheduler, sizing_mode="stretch_both", doc=doc) doc.add_root(prof.root) doc.template = env.get_template("simple.html") doc.template_variables.update(extra) doc.theme = BOKEH_THEME prof.trigger_update() @log_errors def profile_server_doc(scheduler, extra, doc): doc.title = "Dask: Profile of Event Loop" prof = ProfileServer(scheduler, sizing_mode="stretch_both", doc=doc) doc.add_root(prof.root) doc.template = env.get_template("simple.html") doc.template_variables.update(extra) doc.theme = BOKEH_THEME prof.trigger_update()
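# Illustrative sketch (not part of this module's wiring): ``individual_doc`` is curried,
# so it is typically specialised with a component class and a refresh interval first,
# while the Bokeh server later supplies the scheduler, the extra template variables, and
# the document to populate. The names below are hypothetical:
#
#     occupancy_app = individual_doc(Occupancy, 100)   # bind component class + interval
#     occupancy_app(scheduler, extra, doc)             # invoked per rendered page
#
# ``fig_attr`` selects which attribute of the component is added to the document when a
# component exposes more than one figure, e.g. fig_attr="cpu_figure" for CurrentLoad.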