-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmetrics.py
More file actions
32 lines (26 loc) · 1016 Bytes
/
Copy pathmetrics.py
File metadata and controls
32 lines (26 loc) · 1016 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from collections import deque
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
@dataclass
class ThroughputMeter:
"""
Measures throughput using a sliding time window.
GIVEN a stream of events
WHEN tick() is called for each event
THEN returns current events/second over the window
"""
window: timedelta = timedelta(seconds=30)
def __post_init__(self):
self._events = deque() # stores datetime stamps
def tick(self, now: datetime | None = None) -> float:
"Record an event at 'now' and return events/sec over the window."
now = now or datetime.now(timezone.utc)
self._events.append(now)
cutoff = now - self.window
while self._events and self._events[0] < cutoff:
self._events.popleft()
# rate = count / seconds
return len(self._events) / self.window.total_seconds()
def reset(self) -> None:
"""Clear all recorded events."""
self._events.clear()