Skip to content

Commit ff03a51

Browse files
Revert "Fix LaterGauge metrics to collect from all servers (#18751)" (#18789)
This PR reverts #18751 ### Why revert? @reivilibre [found](https://matrix.to/#/!vcyiEtMVHIhWXcJAfl:sw1v.org/$u9OEmMxaFYUzWHhCk1A_r50Y0aGrtKEhepF7WxWJkUA?via=matrix.org&via=node.marinchik.ink&via=element.io) that our CI was failing in bizarre ways (thanks for stepping up to dive into this 🙇). Examples: - `twisted.internet.error.ProcessTerminated: A process has ended with a probable error condition: process ended by signal 9.` - `twisted.internet.error.ProcessTerminated: A process has ended with a probable error condition: process ended by signal 15.` <details> <summary>More detailed part of the log</summary> https://github.com/element-hq/synapse/actions/runs/16758038107/job/47500520633#step:9:6809 ``` tests.util.test_wheel_timer.WheelTimerTestCase.test_single_insert_fetch =============================================================================== Error: Traceback (most recent call last): File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/trial/_dist/disttrial.py", line 371, in task await worker.run(case, result) File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/trial/_dist/worker.py", line 305, in run return await self.callRemote(workercommands.Run, testCase=testCaseId) # type: ignore[no-any-return] File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/internet/defer.py", line 1187, in __iter__ yield self File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/internet/defer.py", line 1092, in _runCallbacks current.result = callback( # type: ignore[misc] File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/protocols/amp.py", line 1968, in _massageError error.trap(RemoteAmpError) File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/python/failure.py", line 431, in trap self.raiseException() File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/python/failure.py", line 455, in raiseException raise self.value.with_traceback(self.tb) twisted.internet.error.ProcessTerminated: A process has ended with a probable error condition: process ended by signal 9. tests.util.test_macaroons.MacaroonGeneratorTestCase.test_guest_access_token ------------------------------------------------------------------------------- Ran 4325 tests in 669.321s FAILED (skips=159, errors=62, successes=4108) while calling from thread Traceback (most recent call last): File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/internet/base.py", line 1064, in runUntilCurrent f(*a, **kw) File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/internet/base.py", line 790, in stop raise error.ReactorNotRunning("Can't stop reactor that isn't running.") twisted.internet.error.ReactorNotRunning: Can't stop reactor that isn't running. joining disttrial worker #0 failed Traceback (most recent call last): File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/internet/defer.py", line 1853, in _inlineCallbacks result = context.run( File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/python/failure.py", line 467, in throwExceptionIntoGenerator return g.throw(self.value.with_traceback(self.tb)) File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/trial/_dist/worker.py", line 406, in exit await endDeferred File "/home/runner/.cache/pypoetry/virtualenvs/matrix-synapse-pswDeSvb-py3.9/lib/python3.9/site-packages/twisted/internet/defer.py", line 1187, in __iter__ yield self twisted.internet.error.ProcessTerminated: A process has ended with a probable error condition: process ended by signal 15. ``` </details> With more debugging (thanks @devonh for also stepping in as maintainer), we were finding that the CI was consistently failing at `test_exposed_to_prometheus` which was a bit of smoke because of all of the [metrics changes](#18592) that were merged recently. Locally, although I wasn't able to reproduce the bizarre errors, I could easily see increased memory usage (~20GB vs ~2GB) and the `test_exposed_to_prometheus` test taking a while to complete when running a full test run (`SYNAPSE_TEST_LOG_LEVEL=INFO poetry run trial tests`). <img width="1485" height="78" alt="Lots of memory usage" src="https://github.com/user-attachments/assets/811e2a96-75e5-4a3c-966c-00dc0512cea9" /> After updating `test_exposed_to_prometheus` to dump the `latest_metrics_response = generate_latest(REGISTRY)`, I could see that it's a massive 3.2GB response. Inspecting the contents, we can see 4.1M (4,137,123) entries for just `synapse_background_update_status{server_name="test"} 3.0` which is a `LaterGauge`. I don't think we have 4.1M test cases so it's also unclear why we end up with so many samples but it does make sense that we do see a lot of duplicates because each `HomeserverTestCase` will create a homeserver for each test case that will `LaterGauge.register_hook(...)` (part of the #18751 changes). `tests/storage/databases/main/test_metrics.py` ```python latest_metrics_response = generate_latest(REGISTRY) with open("/tmp/synapse-test-metrics", "wb") as f: f.write(latest_metrics_response) ``` After reverting the #18751 changes, running the full test suite locally doesn't result in memory spikes and seems to run normally. ### Dev notes Discussion in the [`#synapse-dev:matrix.org`](https://matrix.to/#/!vcyiEtMVHIhWXcJAfl:sw1v.org/$vkMATs04yqZggVVd6Noop5nU8M2DVoTkrAWshw7u1-w?via=matrix.org&via=node.marinchik.ink&via=element.io) room. ### Pull Request Checklist <!-- Please read https://element-hq.github.io/synapse/latest/development/contributing_guide.html before submitting your pull request --> * [x] Pull request is based on the develop branch * [ ] Pull request includes a [changelog file](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#changelog). The entry should: - Be a short description of your change which makes sense to users. "Fixed a bug that prevented receiving messages from other servers." instead of "Moved X method from `EventStore` to `EventWorkerStore`.". - Use markdown where necessary, mostly for `code blocks`. - End with either a period (.) or an exclamation mark (!). - Start with a capital letter. - Feel free to credit yourself, by adding a sentence "Contributed by @github_username." or "Contributed by [Your Name]." to the end of the entry. * [ ] [Code style](https://element-hq.github.io/synapse/latest/code_style.html) is correct (run the [linters](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#run-the-linters))
1 parent 6514381 commit ff03a51

File tree

14 files changed

+141
-241
lines changed

14 files changed

+141
-241
lines changed

changelog.d/18751.misc

Lines changed: 0 additions & 1 deletion
This file was deleted.

synapse/federation/send_queue.py

Lines changed: 15 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
"""
3838

3939
import logging
40-
from enum import Enum
4140
from typing import (
4241
TYPE_CHECKING,
4342
Dict,
@@ -68,25 +67,6 @@
6867
logger = logging.getLogger(__name__)
6968

7069

71-
class QueueNames(str, Enum):
72-
PRESENCE_MAP = "presence_map"
73-
KEYED_EDU = "keyed_edu"
74-
KEYED_EDU_CHANGED = "keyed_edu_changed"
75-
EDUS = "edus"
76-
POS_TIME = "pos_time"
77-
PRESENCE_DESTINATIONS = "presence_destinations"
78-
79-
80-
queue_name_to_gauge_map: Dict[QueueNames, LaterGauge] = {}
81-
82-
for queue_name in QueueNames:
83-
queue_name_to_gauge_map[queue_name] = LaterGauge(
84-
name=f"synapse_federation_send_queue_{queue_name.value}_size",
85-
desc="",
86-
labelnames=[SERVER_NAME_LABEL],
87-
)
88-
89-
9070
class FederationRemoteSendQueue(AbstractFederationSender):
9171
"""A drop in replacement for FederationSender"""
9272

@@ -131,15 +111,23 @@ def __init__(self, hs: "HomeServer"):
131111
# we make a new function, so we need to make a new function so the inner
132112
# lambda binds to the queue rather than to the name of the queue which
133113
# changes. ARGH.
134-
def register(queue_name: QueueNames, queue: Sized) -> None:
135-
queue_name_to_gauge_map[queue_name].register_hook(
136-
lambda: {(self.server_name,): len(queue)}
114+
def register(name: str, queue: Sized) -> None:
115+
LaterGauge(
116+
name="synapse_federation_send_queue_%s_size" % (queue_name,),
117+
desc="",
118+
labelnames=[SERVER_NAME_LABEL],
119+
caller=lambda: {(self.server_name,): len(queue)},
137120
)
138121

139-
for queue_name in QueueNames:
140-
queue = getattr(self, queue_name.value)
141-
assert isinstance(queue, Sized)
142-
register(queue_name, queue=queue)
122+
for queue_name in [
123+
"presence_map",
124+
"keyed_edu",
125+
"keyed_edu_changed",
126+
"edus",
127+
"pos_time",
128+
"presence_destinations",
129+
]:
130+
register(queue_name, getattr(self, queue_name))
143131

144132
self.clock.looping_call(self._clear_queue, 30 * 1000)
145133

synapse/federation/sender/__init__.py

Lines changed: 19 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -199,24 +199,6 @@
199199
labelnames=[SERVER_NAME_LABEL],
200200
)
201201

202-
transaction_queue_pending_destinations_gauge = LaterGauge(
203-
name="synapse_federation_transaction_queue_pending_destinations",
204-
desc="",
205-
labelnames=[SERVER_NAME_LABEL],
206-
)
207-
208-
transaction_queue_pending_pdus_gauge = LaterGauge(
209-
name="synapse_federation_transaction_queue_pending_pdus",
210-
desc="",
211-
labelnames=[SERVER_NAME_LABEL],
212-
)
213-
214-
transaction_queue_pending_edus_gauge = LaterGauge(
215-
name="synapse_federation_transaction_queue_pending_edus",
216-
desc="",
217-
labelnames=[SERVER_NAME_LABEL],
218-
)
219-
220202
# Time (in s) to wait before trying to wake up destinations that have
221203
# catch-up outstanding.
222204
# Please note that rate limiting still applies, so while the loop is
@@ -416,28 +398,38 @@ def __init__(self, hs: "HomeServer"):
416398
# map from destination to PerDestinationQueue
417399
self._per_destination_queues: Dict[str, PerDestinationQueue] = {}
418400

419-
transaction_queue_pending_destinations_gauge.register_hook(
420-
lambda: {
401+
LaterGauge(
402+
name="synapse_federation_transaction_queue_pending_destinations",
403+
desc="",
404+
labelnames=[SERVER_NAME_LABEL],
405+
caller=lambda: {
421406
(self.server_name,): sum(
422407
1
423408
for d in self._per_destination_queues.values()
424409
if d.transmission_loop_running
425410
)
426-
}
411+
},
427412
)
428-
transaction_queue_pending_pdus_gauge.register_hook(
429-
lambda: {
413+
414+
LaterGauge(
415+
name="synapse_federation_transaction_queue_pending_pdus",
416+
desc="",
417+
labelnames=[SERVER_NAME_LABEL],
418+
caller=lambda: {
430419
(self.server_name,): sum(
431420
d.pending_pdu_count() for d in self._per_destination_queues.values()
432421
)
433-
}
422+
},
434423
)
435-
transaction_queue_pending_edus_gauge.register_hook(
436-
lambda: {
424+
LaterGauge(
425+
name="synapse_federation_transaction_queue_pending_edus",
426+
desc="",
427+
labelnames=[SERVER_NAME_LABEL],
428+
caller=lambda: {
437429
(self.server_name,): sum(
438430
d.pending_edu_count() for d in self._per_destination_queues.values()
439431
)
440-
}
432+
},
441433
)
442434

443435
self._is_processing = False

synapse/handlers/presence.py

Lines changed: 10 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -173,18 +173,6 @@
173173
labelnames=["locality", "from", "to", SERVER_NAME_LABEL],
174174
)
175175

176-
presence_user_to_current_state_size_gauge = LaterGauge(
177-
name="synapse_handlers_presence_user_to_current_state_size",
178-
desc="",
179-
labelnames=[SERVER_NAME_LABEL],
180-
)
181-
182-
presence_wheel_timer_size_gauge = LaterGauge(
183-
name="synapse_handlers_presence_wheel_timer_size",
184-
desc="",
185-
labelnames=[SERVER_NAME_LABEL],
186-
)
187-
188176
# If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
189177
# "currently_active"
190178
LAST_ACTIVE_GRANULARITY = 60 * 1000
@@ -791,8 +779,11 @@ def __init__(self, hs: "HomeServer"):
791779
EduTypes.PRESENCE, self.incoming_presence
792780
)
793781

794-
presence_user_to_current_state_size_gauge.register_hook(
795-
lambda: {(self.server_name,): len(self.user_to_current_state)}
782+
LaterGauge(
783+
name="synapse_handlers_presence_user_to_current_state_size",
784+
desc="",
785+
labelnames=[SERVER_NAME_LABEL],
786+
caller=lambda: {(self.server_name,): len(self.user_to_current_state)},
796787
)
797788

798789
# The per-device presence state, maps user to devices to per-device presence state.
@@ -891,8 +882,11 @@ def __init__(self, hs: "HomeServer"):
891882
60 * 1000,
892883
)
893884

894-
presence_wheel_timer_size_gauge.register_hook(
895-
lambda: {(self.server_name,): len(self.wheel_timer)}
885+
LaterGauge(
886+
name="synapse_handlers_presence_wheel_timer_size",
887+
desc="",
888+
labelnames=[SERVER_NAME_LABEL],
889+
caller=lambda: {(self.server_name,): len(self.wheel_timer)},
896890
)
897891

898892
# Used to handle sending of presence to newly joined users/servers

synapse/http/request_metrics.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,12 +164,12 @@ def _get_in_flight_counts() -> Mapping[Tuple[str, ...], int]:
164164
return counts
165165

166166

167-
in_flight_requests = LaterGauge(
167+
LaterGauge(
168168
name="synapse_http_server_in_flight_requests_count",
169169
desc="",
170170
labelnames=["method", "servlet", SERVER_NAME_LABEL],
171+
caller=_get_in_flight_counts,
171172
)
172-
in_flight_requests.register_hook(_get_in_flight_counts)
173173

174174

175175
class RequestMetrics:

synapse/metrics/__init__.py

Lines changed: 36 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
Dict,
3232
Generic,
3333
Iterable,
34-
List,
3534
Mapping,
3635
Optional,
3736
Sequence,
@@ -74,6 +73,8 @@
7473

7574
METRICS_PREFIX = "/_synapse/metrics"
7675

76+
all_gauges: Dict[str, Collector] = {}
77+
7778
HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
7879

7980
SERVER_NAME_LABEL = "server_name"
@@ -162,47 +163,42 @@ class LaterGauge(Collector):
162163
name: str
163164
desc: str
164165
labelnames: Optional[StrSequence] = attr.ib(hash=False)
165-
# List of callbacks: each callback should either return a value (if there are no
166-
# labels for this metric), or dict mapping from a label tuple to a value
167-
_hooks: List[
168-
Callable[
169-
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
170-
]
171-
] = attr.ib(factory=list, hash=False)
166+
# callback: should either return a value (if there are no labels for this metric),
167+
# or dict mapping from a label tuple to a value
168+
caller: Callable[
169+
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
170+
]
172171

173172
def collect(self) -> Iterable[Metric]:
174173
# The decision to add `SERVER_NAME_LABEL` is from the `LaterGauge` usage itself
175174
# (we don't enforce it here, one level up).
176175
g = GaugeMetricFamily(self.name, self.desc, labels=self.labelnames) # type: ignore[missing-server-name-label]
177176

178-
for hook in self._hooks:
179-
try:
180-
hook_result = hook()
181-
except Exception:
182-
logger.exception(
183-
"Exception running callback for LaterGauge(%s)", self.name
184-
)
185-
yield g
186-
return
187-
188-
if isinstance(hook_result, (int, float)):
189-
g.add_metric([], hook_result)
190-
else:
191-
for k, v in hook_result.items():
192-
g.add_metric(k, v)
193-
177+
try:
178+
calls = self.caller()
179+
except Exception:
180+
logger.exception("Exception running callback for LaterGauge(%s)", self.name)
194181
yield g
182+
return
195183

196-
def register_hook(
197-
self,
198-
hook: Callable[
199-
[], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
200-
],
201-
) -> None:
202-
self._hooks.append(hook)
184+
if isinstance(calls, (int, float)):
185+
g.add_metric([], calls)
186+
else:
187+
for k, v in calls.items():
188+
g.add_metric(k, v)
189+
190+
yield g
203191

204192
def __attrs_post_init__(self) -> None:
193+
self._register()
194+
195+
def _register(self) -> None:
196+
if self.name in all_gauges.keys():
197+
logger.warning("%s already registered, reregistering", self.name)
198+
REGISTRY.unregister(all_gauges.pop(self.name))
199+
205200
REGISTRY.register(self)
201+
all_gauges[self.name] = self
206202

207203

208204
# `MetricsEntry` only makes sense when it is a `Protocol`,
@@ -254,7 +250,7 @@ def __init__(
254250
# Protects access to _registrations
255251
self._lock = threading.Lock()
256252

257-
REGISTRY.register(self)
253+
self._register_with_collector()
258254

259255
def register(
260256
self,
@@ -345,6 +341,14 @@ def collect(self) -> Iterable[Metric]:
345341
gauge.add_metric(labels=key, value=getattr(metrics, name))
346342
yield gauge
347343

344+
def _register_with_collector(self) -> None:
345+
if self.name in all_gauges.keys():
346+
logger.warning("%s already registered, reregistering", self.name)
347+
REGISTRY.unregister(all_gauges.pop(self.name))
348+
349+
REGISTRY.register(self)
350+
all_gauges[self.name] = self
351+
348352

349353
class GaugeHistogramMetricFamilyWithLabels(GaugeHistogramMetricFamily):
350354
"""

synapse/notifier.py

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -86,24 +86,6 @@
8686
labelnames=["stream", SERVER_NAME_LABEL],
8787
)
8888

89-
90-
notifier_listeners_gauge = LaterGauge(
91-
name="synapse_notifier_listeners",
92-
desc="",
93-
labelnames=[SERVER_NAME_LABEL],
94-
)
95-
96-
notifier_rooms_gauge = LaterGauge(
97-
name="synapse_notifier_rooms",
98-
desc="",
99-
labelnames=[SERVER_NAME_LABEL],
100-
)
101-
notifier_users_gauge = LaterGauge(
102-
name="synapse_notifier_users",
103-
desc="",
104-
labelnames=[SERVER_NAME_LABEL],
105-
)
106-
10789
T = TypeVar("T")
10890

10991

@@ -299,16 +281,28 @@ def count_listeners() -> Mapping[Tuple[str, ...], int]:
299281
)
300282
}
301283

302-
notifier_listeners_gauge.register_hook(count_listeners)
303-
notifier_rooms_gauge.register_hook(
304-
lambda: {
284+
LaterGauge(
285+
name="synapse_notifier_listeners",
286+
desc="",
287+
labelnames=[SERVER_NAME_LABEL],
288+
caller=count_listeners,
289+
)
290+
291+
LaterGauge(
292+
name="synapse_notifier_rooms",
293+
desc="",
294+
labelnames=[SERVER_NAME_LABEL],
295+
caller=lambda: {
305296
(self.server_name,): count(
306297
bool, list(self.room_to_user_streams.values())
307298
)
308-
}
299+
},
309300
)
310-
notifier_users_gauge.register_hook(
311-
lambda: {(self.server_name,): len(self.user_to_user_stream)}
301+
LaterGauge(
302+
name="synapse_notifier_users",
303+
desc="",
304+
labelnames=[SERVER_NAME_LABEL],
305+
caller=lambda: {(self.server_name,): len(self.user_to_user_stream)},
312306
)
313307

314308
def add_replication_callback(self, cb: Callable[[], None]) -> None:

0 commit comments

Comments
 (0)