Skip to content

Commit 10e5c78

Browse files
committed
Smooth transient social-agent aggregate dropouts
1 parent b0670df commit 10e5c78

File tree

1 file changed

+27
-6
lines changed

1 file changed

+27
-6
lines changed

compose/social-agent-network/aggregate_results.py

Lines changed: 27 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,19 +20,34 @@
2020
DEFAULT_HOST = "0.0.0.0"
2121
DEFAULT_PORT = 8090
2222
DEFAULT_THROUGHPUT_WINDOW_SECONDS = 8.0
23+
DEFAULT_SNAPSHOT_STALE_SECONDS = 5.0
2324

2425

25-
def load_agent_snapshots():
26+
def load_agent_snapshots(cached=None, now_epoch=None, stale_after_seconds=DEFAULT_SNAPSHOT_STALE_SECONDS):
27+
cached = {} if cached is None else cached
28+
now_epoch = time.time() if now_epoch is None else now_epoch
2629
snapshots = []
30+
next_cached = {}
2731
for path in sorted(RESULTS_DIR.glob(INPUT_PATTERN)):
2832
try:
2933
with path.open(encoding="utf-8") as fd:
30-
snapshots.append(json.load(fd))
34+
snapshot = json.load(fd)
35+
snapshots.append(snapshot)
36+
next_cached[str(path)] = {
37+
"snapshot": snapshot,
38+
"loaded_at": now_epoch,
39+
}
3140
except (FileNotFoundError, json.JSONDecodeError):
3241
# Writers replace these files atomically, but readers can still
3342
# catch a transient missing/partial state between polls.
34-
continue
35-
return snapshots
43+
cached_entry = cached.get(str(path))
44+
if (
45+
cached_entry is not None
46+
and (now_epoch - cached_entry["loaded_at"]) <= stale_after_seconds
47+
):
48+
snapshots.append(cached_entry["snapshot"])
49+
next_cached[str(path)] = cached_entry
50+
return snapshots, next_cached
3651

3752

3853
def aggregate_snapshots(snapshots, now_epoch, previous=None):
@@ -407,13 +422,19 @@ def log_message(self, format, *args):
407422
def run(interval_seconds, once, host, port, history_limit):
408423
state = AggregationState(history_limit=history_limit)
409424
previous = None
425+
cached_snapshots = {}
410426

411427
def aggregate_forever():
412-
nonlocal previous
428+
nonlocal previous, cached_snapshots
413429
while True:
414430
now_epoch = time.time()
431+
snapshots, cached_snapshots = load_agent_snapshots(
432+
cached=cached_snapshots,
433+
now_epoch=now_epoch,
434+
stale_after_seconds=max(DEFAULT_SNAPSHOT_STALE_SECONDS, interval_seconds * 2),
435+
)
415436
snapshot = aggregate_snapshots(
416-
load_agent_snapshots(), now_epoch, previous=previous
437+
snapshots, now_epoch, previous=previous
417438
)
418439
state.update(snapshot)
419440
previous = snapshot

0 commit comments

Comments
 (0)