from __future__ import annotations

import asyncio

from distributed.metrics import time


class ResourceLimiter:
    """Limit an abstract resource

    This tracks the usage of an abstract resource as a plain integer counter.
    Once the counter reaches or exceeds ``maxvalue``, callers of
    :meth:`wait_for_available` are blocked until enough of the resource has
    been released again via :meth:`decrease`.

    Note that :meth:`increase` never blocks and never fails; the counter is
    allowed to overshoot ``maxvalue``. Only :meth:`wait_for_available`
    enforces the limit.

    Example::

        limiter = ResourceLimiter(2)
        limiter.increase(1)
        limiter.increase(2)
        await limiter.decrease(1)

        # This will block since we're still not below maxvalue
        await limiter.wait_for_available()
    """

    def __init__(self, maxvalue: int) -> None:
        """Create a limiter that blocks once ``maxvalue`` has been acquired."""
        self._maxvalue = maxvalue
        self._acquired = 0
        self._condition = asyncio.Condition()
        # Number of coroutines currently parked in ``wait_for_available``.
        self._waiters = 0
        # Sum of all measured blocking durations (units are whatever
        # ``distributed.metrics.time`` returns -- presumably seconds).
        self.time_blocked_total = 0.0
        # Exponentially weighted moving average of the blocking duration;
        # updated on every ``wait_for_available`` call.
        self.time_blocked_avg = 0.0

    def __repr__(self) -> str:
        return (
            f"<ResourceLimiter maxvalue: {self._maxvalue} "
            f"available: {self.available()}>"
        )

    def available(self) -> int:
        """How far can the value be increased before blocking

        Never negative: returns 0 when the counter is at or above maxvalue.
        """
        return max(0, self._maxvalue - self._acquired)

    def free(self) -> bool:
        """Return True if nothing has been acquired / the limiter is in a neutral state"""
        return self._acquired == 0

    async def wait_for_available(self) -> None:
        """Block until the counter drops below maxvalue

        Returns immediately if capacity is already available. In every case
        (including the non-blocking fast path) the blocking statistics
        ``time_blocked_total`` and ``time_blocked_avg`` are updated; calls that
        did not complete a wait contribute a duration of zero.
        """
        start = time()
        duration = 0.0
        try:
            if self.available():
                return
            async with self._condition:
                self._waiters += 1
                try:
                    await self._condition.wait_for(self.available)
                finally:
                    # Always undo the bookkeeping, even if the wait is
                    # cancelled or raises; otherwise the waiter count would
                    # drift upwards permanently.
                    self._waiters -= 1
                duration = time() - start
        finally:
            self.time_blocked_total += duration
            # 90/10 exponential smoothing of the per-call blocking time.
            self.time_blocked_avg = self.time_blocked_avg * 0.9 + duration * 0.1

    def increase(self, value: int) -> None:
        """Increase the internal counter by value

        This never blocks, even if the counter ends up above maxvalue.
        """
        self._acquired += value

    async def decrease(self, value: int) -> None:
        """Decrease the internal counter by value

        Wakes up all coroutines blocked in :meth:`wait_for_available` so they
        can re-check whether capacity is available.

        Raises
        ------
        RuntimeError
            If ``value`` is larger than what is currently acquired. The
            counter is left unchanged in that case.
        """
        if value > self._acquired:
            raise RuntimeError(
                f"Cannot release more than what was acquired! release: {value} acquired: {self._acquired}"
            )
        self._acquired -= value
        async with self._condition:
            self._condition.notify_all()