Skip to content

Commit f04e551

Browse files
jeffkbkimfacebook-github-bot
authored andcommitted
Add DeferrableMetrics class and delete metrics_output_util (#3812)
Summary: Add DeferrableMetrics, a unified metrics container that wraps either a resolved dict or a pending Future. Provides subscribe()/resolve()/update()/is_resolved() APIs for async-safe metric access. Implements Mapping for backward compatibility during the transition period. Differential Revision: D94254756
1 parent a77cf3b commit f04e551

File tree

4 files changed

+312
-367
lines changed

4 files changed

+312
-367
lines changed
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
#!/usr/bin/env python3
2+
# Copyright (c) Meta Platforms, Inc. and affiliates.
3+
# All rights reserved.
4+
#
5+
# This source code is licensed under the BSD-style license found in the
6+
# LICENSE file in the root directory of this source tree.
7+
8+
# pyre-strict
9+
10+
"""Unified metrics type for sync and async metric access.
11+
12+
Thread safety: This class relies on CPython's GIL for safe mutation of
13+
_data and _resolved from Future callback threads. The GIL ensures that
14+
attribute assignment is atomic, so concurrent reads of is_resolved() or
15+
resolve() will see a consistent state. If used outside CPython (e.g.,
16+
free-threaded Python 3.13t), a threading.Lock would be needed.
17+
"""
18+
19+
from concurrent.futures import Future
20+
from typing import Any, Callable
21+
22+
23+
class DeferrableMetrics:
24+
"""
25+
Metrics container that wraps either a resolved dict or a pending Future.
26+
27+
Provides a unified interface for both sync and async metric access:
28+
- subscribe(callback): async access, always works
29+
- resolve(): sync access, fails fast if backed by Future
30+
- update(other): deferred merge
31+
- is_resolved(): check without blocking
32+
33+
Does NOT implement Mapping. Callers must use subscribe() or resolve()
34+
to access metric values. This prevents silent blocking in the training
35+
path when backed by a Future.
36+
"""
37+
38+
def __init__(
39+
self,
40+
inner: dict[str, Any] | Future[dict[str, Any]],
41+
) -> None:
42+
if isinstance(inner, Future):
43+
self._future: Future[dict[str, Any]] | None = inner
44+
self._data: dict[str, Any] = {}
45+
self._resolved: bool = False
46+
else:
47+
self._future = None
48+
self._data = dict(inner)
49+
self._resolved = True
50+
51+
def subscribe(
52+
self,
53+
callback: Callable[[dict[str, Any]], None],
54+
on_error: Callable[[Exception], None] | None = None,
55+
) -> None:
56+
"""Register a callback for when metrics are available.
57+
58+
If already resolved, callback fires immediately (synchronously).
59+
If backed by Future, callback fires when Future completes.
60+
"""
61+
if self._resolved:
62+
callback(self._data)
63+
elif self._future is not None:
64+
65+
def _on_complete(f: Future[dict[str, Any]]) -> None:
66+
try:
67+
result = f.result()
68+
self._data = result
69+
self._resolved = True
70+
callback(self._data)
71+
except Exception as e:
72+
if on_error:
73+
on_error(e)
74+
else:
75+
raise
76+
77+
self._future.add_done_callback(_on_complete)
78+
79+
def resolve(self) -> dict[str, Any]:
80+
"""Synchronously return the resolved metrics dict.
81+
82+
Fails fast with RuntimeError if backed by a Future.
83+
Only use in contexts where CPU-offloaded metrics are disabled
84+
(eval, inference, tests).
85+
"""
86+
if not self._resolved:
87+
raise RuntimeError(
88+
"Cannot synchronously resolve DeferrableMetrics backed by a "
89+
"Future. Use subscribe() for async access, or ensure "
90+
"CPU-offloaded metrics are disabled for this code path."
91+
)
92+
return self._data
93+
94+
def update(self, other: "dict[str, Any] | DeferrableMetrics") -> None:
95+
"""Merge additional key-value pairs.
96+
97+
If already resolved, merges immediately.
98+
If backed by Future, defers the merge until Future resolves.
99+
If other is a DeferrableMetrics, subscribes to it for deferred merge.
100+
101+
Ordering constraint: when backed by a Future, update() replaces the
102+
internal Future with a new merged Future. Any subscribe() callbacks
103+
registered *before* update() are attached to the original Future and
104+
will receive un-merged data. Call update() before subscribe() to
105+
ensure callbacks see the merged result.
106+
"""
107+
if isinstance(other, DeferrableMetrics):
108+
if other.is_resolved():
109+
self.update(other.resolve())
110+
else:
111+
112+
def _propagate_error(e: Exception) -> None:
113+
if self._future is not None and not self._future.done():
114+
self._future.set_exception(e)
115+
116+
other.subscribe(
117+
lambda data: self.update(data),
118+
on_error=_propagate_error,
119+
)
120+
return
121+
122+
assert isinstance(other, dict)
123+
dict_other: dict[str, Any] = other
124+
125+
if self._resolved:
126+
self._data.update(dict_other)
127+
elif self._future is not None:
128+
original_future = self._future
129+
merged_future: Future[dict[str, Any]] = Future()
130+
131+
def _on_complete(f: Future[dict[str, Any]]) -> None:
132+
try:
133+
result = f.result()
134+
result.update(dict_other)
135+
merged_future.set_result(result)
136+
except Exception as e:
137+
merged_future.set_exception(e)
138+
139+
original_future.add_done_callback(_on_complete)
140+
self._future = merged_future
141+
142+
def is_resolved(self) -> bool:
143+
"""Check if metrics are available without blocking."""
144+
return self._resolved
145+
146+
def __bool__(self) -> bool:
147+
"""Always True. Prevents ``metrics or {}`` from replacing DeferrableMetrics."""
148+
return True
149+
150+
def __repr__(self) -> str:
151+
if self._resolved:
152+
return f"DeferrableMetrics(resolved, {len(self._data)} keys)"
153+
return "DeferrableMetrics(pending)"

torchrec/metrics/metrics_output_util.py

Lines changed: 0 additions & 133 deletions
This file was deleted.

0 commit comments

Comments
 (0)