Skip to content

Commit edbba7f

Browse files
committed
Fix permanent badge inflation from read receipts before first rotation
_handle_new_receipts_for_notifs_txn used simple_update_txn to record last_receipt_stream_ordering, which silently no-ops if no summary row exists yet. _rotate_notifs_before_txn then INSERTed the row with last_receipt_stream_ordering=NULL. The badge query treats NULL as "trust the counts", and since highlights survive the receipt DELETE, they get re-counted on every rotation, growing the badge unboundedly. Pre-populate event_push_summary rows with the receipt position for every thread with pending push actions, and switch the threaded path to upsert. A SQL migration backfills existing NULL rows from receipts_linearized.
1 parent 3f0f03d commit edbba7f

4 files changed

Lines changed: 218 additions & 2 deletions

File tree

changelog.d/19785.bugfix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Fix a long-standing bug where the badge notification count for a room could become permanently inflated if a read receipt was sent before the room's notification counts were first summarised.

synapse/storage/databases/main/event_push_actions.py

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1509,6 +1509,41 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
15091509
"last_receipt_stream_ordering": stream_ordering,
15101510
},
15111511
)
1512+
# If no summary row exists yet for a thread that has pending push
1513+
# actions (room active but not yet through a rotation cycle), the
1514+
# UPDATE above is a silent no-op for that thread and
1515+
# last_receipt_stream_ordering is never persisted.
1516+
# _rotate_notifs_before_txn would then INSERT the row with
1517+
# last_receipt_stream_ordering=NULL, causing the badge query to
1518+
# include every event before the receipt as unread. Pre-populate
1519+
# rows for every thread with pending push actions so rotation
1520+
# only counts events that arrive after this receipt.
1521+
txn.execute(
1522+
"""
1523+
SELECT DISTINCT thread_id
1524+
FROM event_push_actions
1525+
WHERE user_id = ? AND room_id = ?
1526+
""",
1527+
(user_id, room_id),
1528+
)
1529+
pending_thread_ids = [row[0] for row in txn]
1530+
for pending_thread_id in pending_thread_ids:
1531+
self.db_pool.simple_upsert_txn(
1532+
txn,
1533+
table="event_push_summary",
1534+
keyvalues={
1535+
"user_id": user_id,
1536+
"room_id": room_id,
1537+
"thread_id": pending_thread_id,
1538+
},
1539+
values={},
1540+
insertion_values={
1541+
"notif_count": 0,
1542+
"unread_count": 0,
1543+
"stream_ordering": old_rotate_stream_ordering,
1544+
"last_receipt_stream_ordering": stream_ordering,
1545+
},
1546+
)
15121547

15131548
# For a threaded receipt, we *always* want to update that receipt,
15141549
# event if there are no new notifications in that thread. This ensures
@@ -1517,8 +1552,10 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
15171552
unread_counts = [(0, 0, thread_id)]
15181553

15191554
# Then any updated threads get their notification count and unread
1520-
# count updated.
1521-
self.db_pool.simple_update_many_txn(
1555+
# count updated. Use upsert so that a row is created if none exists
1556+
# yet (same race as the unthreaded case above: without this, rotation
1557+
# would INSERT with last_receipt_stream_ordering=NULL).
1558+
self.db_pool.simple_upsert_many_txn(
15221559
txn,
15231560
table="event_push_summary",
15241561
key_names=("room_id", "user_id", "thread_id"),
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
--
2+
-- This file is licensed under the Affero General Public License (AGPL) version 3.
3+
--
4+
-- Copyright (C) 2026 New Vector Ltd
5+
--
6+
-- This program is free software: you can redistribute it and/or modify
7+
-- it under the terms of the GNU Affero General Public License as
8+
-- published by the Free Software Foundation, either version 3 of the
9+
-- License, or (at your option) any later version.
10+
--
11+
-- See the GNU Affero General Public License for more details:
12+
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
13+
14+
-- Backfill last_receipt_stream_ordering for event_push_summary rows created
15+
-- with last_receipt_stream_ordering=NULL by the rotation job before the code
16+
-- fix in _handle_new_receipts_for_notifs_txn was applied.
17+
--
18+
-- NULL has a dual meaning in this column (see the schema delta that added it):
19+
-- 1. Legacy rows from old Synapse that maintained counts synchronously.
20+
-- 2. Bug-affected rows where the receipt UPDATE was a silent no-op.
21+
--
22+
-- For a given event_push_summary row, the relevant receipts are unthreaded
23+
-- receipts (cover all threads) and the threaded receipt for that thread.
24+
--
25+
-- For both kinds of stale row, if stream_ordering <= the max relevant receipt
26+
-- then every event in the summary predates the receipt and the counts should
27+
-- be zero. If stream_ordering > the max receipt, some events after the
28+
-- receipt are included; we set last_receipt_stream_ordering but leave the
29+
-- count (it may be inflated, but will self-correct when the user next reads
30+
-- the room).
31+
UPDATE event_push_summary
32+
SET last_receipt_stream_ordering = (
33+
SELECT MAX(r.event_stream_ordering)
34+
FROM receipts_linearized AS r
35+
WHERE r.user_id = event_push_summary.user_id
36+
AND r.room_id = event_push_summary.room_id
37+
AND r.receipt_type IN ('m.read', 'm.read.private')
38+
AND (r.thread_id IS NULL OR r.thread_id = event_push_summary.thread_id)
39+
),
40+
notif_count = CASE
41+
WHEN stream_ordering <= (
42+
SELECT MAX(r.event_stream_ordering)
43+
FROM receipts_linearized AS r
44+
WHERE r.user_id = event_push_summary.user_id
45+
AND r.room_id = event_push_summary.room_id
46+
AND r.receipt_type IN ('m.read', 'm.read.private')
47+
AND (r.thread_id IS NULL OR r.thread_id = event_push_summary.thread_id)
48+
) THEN 0
49+
ELSE notif_count
50+
END,
51+
unread_count = CASE
52+
WHEN stream_ordering <= (
53+
SELECT MAX(r.event_stream_ordering)
54+
FROM receipts_linearized AS r
55+
WHERE r.user_id = event_push_summary.user_id
56+
AND r.room_id = event_push_summary.room_id
57+
AND r.receipt_type IN ('m.read', 'm.read.private')
58+
AND (r.thread_id IS NULL OR r.thread_id = event_push_summary.thread_id)
59+
) THEN 0
60+
ELSE unread_count
61+
END
62+
WHERE last_receipt_stream_ordering IS NULL
63+
AND EXISTS (
64+
SELECT 1 FROM receipts_linearized AS r
65+
WHERE r.user_id = event_push_summary.user_id
66+
AND r.room_id = event_push_summary.room_id
67+
AND r.receipt_type IN ('m.read', 'm.read.private')
68+
AND (r.thread_id IS NULL OR r.thread_id = event_push_summary.thread_id)
69+
);

tests/storage/test_event_push_actions.py

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,115 @@ def _mark_read(event_id: str) -> None:
286286
_rotate()
287287
_assert_counts(0, 0)
288288

289+
def test_count_aggregation_receipt_before_first_rotation(self) -> None:
290+
"""
291+
Regression test: reading a highlight before the first rotation must not
292+
permanently inflate the badge count.
293+
294+
Highlights survive the receipt-triggered DELETE (highlight=1), so if
295+
last_receipt_stream_ordering is NULL when rotation creates the summary
296+
row, the badge query re-counts them forever.
297+
"""
298+
user_id, token, _, other_token, room_id = self._create_users_and_room()
299+
300+
def _assert_badge(expected: int) -> None:
301+
counts = self.get_success(
302+
self.store.db_pool.runInteraction(
303+
"get-aggregate-unread-counts",
304+
self.store._get_unread_counts_by_room_for_user_txn,
305+
user_id,
306+
)
307+
)
308+
self.assertEqual(counts.get(room_id, 0), expected)
309+
310+
def _send(highlight: bool = False) -> str:
311+
return self.helper.send_event(
312+
room_id,
313+
type="m.room.message",
314+
content={"msgtype": "m.text", "body": user_id if highlight else "msg"},
315+
tok=other_token,
316+
)["event_id"]
317+
318+
def _read(event_id: str) -> None:
319+
self.get_success(
320+
self.store.insert_receipt(
321+
room_id,
322+
"m.read",
323+
user_id=user_id,
324+
event_ids=[event_id],
325+
thread_id=None,
326+
data={},
327+
)
328+
)
329+
330+
# Highlight arrives; user reads it before any rotation (no summary row exists).
331+
_read(_send(highlight=True))
332+
# One new event after the receipt makes stream_ordering > max_clause true.
333+
_send()
334+
# Without the fix: badge = 2 (highlight re-counted). With fix: badge = 1.
335+
self.get_success(self.store._rotate_notifs())
336+
_assert_badge(1)
337+
338+
def test_count_aggregation_receipt_before_first_rotation_in_thread(self) -> None:
339+
"""
340+
Same regression as test_count_aggregation_receipt_before_first_rotation,
341+
but for a highlight inside a thread cleared by an unthreaded receipt.
342+
343+
The fix must pre-populate event_push_summary for every thread with
344+
pending push actions, not just MAIN_TIMELINE.
345+
"""
346+
user_id, token, _, other_token, room_id = self._create_users_and_room()
347+
348+
def _assert_badge(expected: int) -> None:
349+
counts = self.get_success(
350+
self.store.db_pool.runInteraction(
351+
"get-aggregate-unread-counts",
352+
self.store._get_unread_counts_by_room_for_user_txn,
353+
user_id,
354+
)
355+
)
356+
self.assertEqual(counts.get(room_id, 0), expected)
357+
358+
def _send(thread_root: str | None = None, highlight: bool = False) -> str:
359+
content: JsonDict = {
360+
"msgtype": "m.text",
361+
"body": user_id if highlight else "msg",
362+
}
363+
if thread_root is not None:
364+
content["m.relates_to"] = {
365+
"rel_type": RelationTypes.THREAD,
366+
"event_id": thread_root,
367+
}
368+
return self.helper.send_event(
369+
room_id,
370+
type="m.room.message",
371+
content=content,
372+
tok=other_token,
373+
)["event_id"]
374+
375+
def _read(event_id: str) -> None:
376+
self.get_success(
377+
self.store.insert_receipt(
378+
room_id,
379+
"m.read",
380+
user_id=user_id,
381+
event_ids=[event_id],
382+
thread_id=None,
383+
data={},
384+
)
385+
)
386+
387+
# A thread root, then a highlight inside that thread.
388+
thread_root = _send()
389+
thread_highlight = _send(thread_root=thread_root, highlight=True)
390+
# User reads everything with an unthreaded receipt before any rotation.
391+
_read(thread_highlight)
392+
# A new event in the same thread makes stream_ordering > max_clause true.
393+
_send(thread_root=thread_root)
394+
# Without the fix: badge = 2 (thread highlight re-counted). With: badge = 1.
395+
self.get_success(self.store._rotate_notifs())
396+
_assert_badge(1)
397+
289398
def test_count_aggregation_threads(self) -> None:
290399
"""
291400
This is essentially the same test as test_count_aggregation, but adds

0 commit comments

Comments
 (0)