Skip to content

Commit 1b7807a

Browse files
Fix /sync caching transient errors for the sync_response_cache_duration. (#19845)
Fixes: #19844 Concretely, this changes `ResponseCache` to unset cache entries once they resolve to a `Failure`. --------- Signed-off-by: Olivier 'reivilibre <oliverw@matrix.org> Co-authored-by: Eric Eastwood <erice@element.io>
1 parent 64e09f1 commit 1b7807a

4 files changed

Lines changed: 93 additions & 9 deletions

File tree

changelog.d/19845.bugfix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix `/sync` caching transient errors for the `sync_response_cache_duration`.

synapse/util/async_helpers.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,8 @@ class ObservableDeferred(Generic[_T], AbstractObservableDeferred[_T]):
9898
deferred.
9999
100100
If consumeErrors is true errors will be captured from the origin deferred.
101+
In that case, errors will NOT be propagated onto any errbacks added to this
102+
`ObservableDeferred`; instead the success callbacks will be called with `None`.
101103
102104
Cancelling or otherwise resolving an observer will not affect the original
103105
ObservableDeferred.

synapse/util/caches/response_cache.py

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import attr
3333

3434
from twisted.internet import defer
35+
from twisted.python.failure import Failure
3536

3637
from synapse.logging.context import make_deferred_yieldable, run_in_background
3738
from synapse.logging.opentracing import (
@@ -226,14 +227,19 @@ def _set(
226227
Returns:
227228
The cache entry object.
228229
"""
229-
result = ObservableDeferred(deferred, consumeErrors=True)
230+
result = ObservableDeferred(
231+
deferred,
232+
# We set `consumeErrors=False` as we want to handle errors ourselves (`on_fail`) instead of
233+
# replacing them with a `None` successful result that would go to `on_succeed`
234+
consumeErrors=False,
235+
)
230236
key = context.cache_key
231237
entry = ResponseCacheEntry(
232238
result, opentracing_span_context, cancellable=cancellable
233239
)
234240
self._result_cache[key] = entry
235241

236-
def on_complete(r: RV) -> RV:
242+
def on_succeed(r: RV) -> RV:
237243
# if this cache has a non-zero timeout, and the callback has not cleared
238244
# the should_cache bit, we leave it in the cache for now and schedule
239245
# its removal later.
@@ -254,10 +260,29 @@ def on_complete(r: RV) -> RV:
254260
self.unset(key)
255261
return r
256262

257-
# make sure we do this *after* adding the entry to result_cache,
258-
# in case the result is already complete (in which case flipping the order would
259-
# leave us with a stuck entry in the cache).
260-
result.addBoth(on_complete)
263+
def on_fail(failure: Failure) -> None:
264+
"""
265+
If the deferred fails, unset the cache entry.
266+
"""
267+
self.unset(key)
268+
269+
# Consider the Failure handled so they don't get thrown by the reactor
270+
return None
271+
272+
# Two ordering constraints to be aware of for registering the callback and errback:
273+
# 1. Both of them must be registered _after_ adding the entry to the result_cache,
274+
# otherwise it is possible for them to trigger immediately (before the entry
275+
# is added to the result_cache), the net effect of which is that it would leave
276+
# us with a stuck entry in the cache.
277+
# 2. We must register the errback after callback.
278+
# If the errback was registered first, `on_fail` returning `None` would
279+
# cause `on_succeed` to be called with that `None` as argument.
280+
# This would start a timer to remove a cache entry, even though there isn't a
281+
# valid cache entry yet.
282+
# (This could prematurely remove a future cache entry with the same key.)
283+
result.addCallback(on_succeed)
284+
result.addErrback(on_fail)
285+
261286
return entry
262287

263288
def unset(self, key: KV) -> None:

tests/util/caches/test_response_cache.py

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#
44
# Copyright 2021 The Matrix.org Foundation C.I.C.
55
# Copyright (C) 2023 New Vector, Ltd
6+
# Copyright (C) 2026 Element Creations Ltd.
67
#
78
# This program is free software: you can redistribute it and/or modify
89
# it under the terms of the GNU Affero General Public License as
@@ -15,7 +16,8 @@
1516
# Originally licensed under the Apache License, Version 2.0:
1617
# <http://www.apache.org/licenses/LICENSE-2.0>.
1718
#
18-
# [This file includes modifications made by New Vector Limited]
19+
# [This file includes modifications made by New Vector Limited
20+
# and Element Creations Ltd]
1921
#
2022
#
2123

@@ -234,9 +236,9 @@ async def non_caching(o: str, cache_context: ResponseCacheContext[int]) -> str:
234236
[], cache.keys(), "cache should not have the result now"
235237
)
236238

237-
def test_cache_func_errors(self) -> None:
239+
def test_errors_raised_to_all_waiters(self) -> None:
238240
"""If the callback raises an error, the error should be raised to all
239-
callers and the result should not be cached"""
241+
concurrent callers that were waiting on the same in-flight result."""
240242
cache = self.with_cache("error_cache", ms=3000)
241243

242244
expected_error = Exception("oh no")
@@ -259,6 +261,60 @@ async def erring(o: str) -> str:
259261
self.assertFailure(wrap_d, Exception)
260262
self.assertFailure(wrap2_d, Exception)
261263

264+
def test_errors_are_not_cached(self) -> None:
265+
"""If the callback raises an error, the error is not cached and
266+
served to any subsequent requests.
267+
"""
268+
cache = self.with_cache("error_not_cached", ms=3000)
269+
270+
return_error = True
271+
272+
REQUEST_FAKE_WORK_SLEEP_TIME = Duration(seconds=1)
273+
274+
async def erring_then_fine(_: str) -> str:
275+
"""
276+
This function raises an error the first time it is called,
277+
then is fine the next time it is called.
278+
"""
279+
nonlocal return_error
280+
281+
# pretend to do some work
282+
await self.clock.sleep(REQUEST_FAKE_WORK_SLEEP_TIME)
283+
284+
if return_error:
285+
return_error = False
286+
raise RuntimeError("this is a temporary error!")
287+
return "fine"
288+
289+
# First call: should get an error
290+
wrap_d = defer.ensureDeferred(cache.wrap(0, erring_then_fine, "ignored"))
291+
292+
# Should be pending
293+
self.assertNoResult(wrap_d)
294+
295+
# Wait for the time it takes for the request to resolve to an error
296+
self.reactor.advance(REQUEST_FAKE_WORK_SLEEP_TIME.as_secs())
297+
298+
# Check that the Deferred resolved to an error of the correct type
299+
self.failureResultOf(wrap_d, RuntimeError)
300+
301+
# Second call: the error shouldn't be replayed
302+
# and the next call should succeed, so we should get a successful response
303+
wrap2_d = defer.ensureDeferred(cache.wrap(0, erring_then_fine, "ignored"))
304+
305+
# Since this is a new request (not coalesced with the previous failed one),
306+
# this should still be pending
307+
self.assertNoResult(wrap2_d)
308+
309+
# Wait for the time it takes for the request to resolve
310+
self.reactor.advance(REQUEST_FAKE_WORK_SLEEP_TIME.as_secs())
311+
312+
self.assertEqual(
313+
self.successResultOf(wrap2_d),
314+
"fine",
315+
"should get the fresh result, not the cached error",
316+
)
317+
262318
def test_cache_cancel_first_wait(self) -> None:
263319
"""Test that cancellation of the deferred returned by wrap() on the
264320
first call does not immediately cause a cancellation error to be raised

0 commit comments

Comments
 (0)