Skip to content

Commit 7c633f1

Browse files
devonherikjohnstonreivilibreMadLittleMods
authored
Pass leave from remote invite rejection down Sliding Sync (#18375)
Fixes #17753 ### Dev notes The `sliding_sync_membership_snapshots` and `sliding_sync_joined_rooms` database tables were added in #17512 ### Pull Request Checklist <!-- Please read https://element-hq.github.io/synapse/latest/development/contributing_guide.html before submitting your pull request --> * [X] Pull request is based on the develop branch * [x] Pull request includes a [changelog file](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#changelog). The entry should: - Be a short description of your change which makes sense to users. "Fixed a bug that prevented receiving messages from other servers." instead of "Moved X method from `EventStore` to `EventWorkerStore`.". - Use markdown where necessary, mostly for `code blocks`. - End with either a period (.) or an exclamation mark (!). - Start with a capital letter. - Feel free to credit yourself, by adding a sentence "Contributed by @github_username." or "Contributed by [Your Name]." to the end of the entry. * [X] [Code style](https://element-hq.github.io/synapse/latest/code_style.html) is correct (run the [linters](https://element-hq.github.io/synapse/latest/development/contributing_guide.html#run-the-linters)) --------- Co-authored-by: Erik Johnston <[email protected]> Co-authored-by: Olivier 'reivilibre <[email protected]> Co-authored-by: Eric Eastwood <[email protected]>
1 parent ae877aa commit 7c633f1

File tree

7 files changed

+360
-1
lines changed

7 files changed

+360
-1
lines changed

changelog.d/18375.bugfix

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Pass leave from remote invite rejection down Sliding Sync.

synapse/handlers/sliding_sync/__init__.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,7 @@ async def handle_room(room_id: str) -> None:
271271
from_token=from_token,
272272
to_token=to_token,
273273
newly_joined=room_id in interested_rooms.newly_joined_rooms,
274+
newly_left=room_id in interested_rooms.newly_left_rooms,
274275
is_dm=room_id in interested_rooms.dm_room_ids,
275276
)
276277

@@ -542,6 +543,7 @@ async def get_room_sync_data(
542543
from_token: Optional[SlidingSyncStreamToken],
543544
to_token: StreamToken,
544545
newly_joined: bool,
546+
newly_left: bool,
545547
is_dm: bool,
546548
) -> SlidingSyncResult.RoomResult:
547549
"""
@@ -559,6 +561,7 @@ async def get_room_sync_data(
559561
from_token: The point in the stream to sync from.
560562
to_token: The point in the stream to sync up to.
561563
newly_joined: If the user has newly joined the room
564+
newly_left: If the user has newly left the room
562565
is_dm: Whether the room is a DM room
563566
"""
564567
user = sync_config.user
@@ -856,6 +859,26 @@ async def get_room_sync_data(
856859
# TODO: Limit the number of state events we're about to send down
857860
# the room, if its too many we should change this to an
858861
# `initial=True`?
862+
863+
# For the case of rejecting remote invites, the leave event won't be
864+
# returned by `get_current_state_deltas_for_room`. This is due to the current
865+
# state only being filled out for rooms the server is in, and so doesn't pick
866+
# up out-of-band leaves (including locally rejected invites) as these events
867+
# are outliers and not added to the `current_state_delta_stream`.
868+
#
869+
# We rely on being explicitly told that the room has been `newly_left` to
870+
# ensure we extract the out-of-band leave.
871+
if newly_left and room_membership_for_user_at_to_token.event_id is not None:
872+
membership_changed = True
873+
leave_event = await self.store.get_event(
874+
room_membership_for_user_at_to_token.event_id
875+
)
876+
state_key = leave_event.get_state_key()
877+
if state_key is not None:
878+
room_state_delta_id_map[(leave_event.type, state_key)] = (
879+
room_membership_for_user_at_to_token.event_id
880+
)
881+
859882
deltas = await self.get_current_state_deltas_for_room(
860883
room_id=room_id,
861884
room_membership_for_user_at_to_token=room_membership_for_user_at_to_token,

synapse/handlers/sliding_sync/room_lists.py

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1120,7 +1120,7 @@ async def get_room_membership_for_user_at_to_token(
11201120
(
11211121
newly_joined_room_ids,
11221122
newly_left_room_map,
1123-
) = await self._get_newly_joined_and_left_rooms(
1123+
) = await self._get_newly_joined_and_left_rooms_fallback(
11241124
user_id, to_token=to_token, from_token=from_token
11251125
)
11261126

@@ -1176,6 +1176,53 @@ async def _get_newly_joined_and_left_rooms(
11761176
"state reset" out of the room, and so that room would not be part of the
11771177
"current memberships" of the user.
11781178
1179+
Returns:
1180+
A 2-tuple of newly joined room IDs and a map of newly_left room
1181+
IDs to the `RoomsForUserStateReset` entry.
1182+
1183+
We're using `RoomsForUserStateReset` but that doesn't necessarily mean the
1184+
user was state reset of the rooms. It's just that the `event_id`/`sender`
1185+
are optional and we can't tell the difference between the server leaving the
1186+
room when the user was the last person participating in the room and left or
1187+
was state reset out of the room. To actually check for a state reset, you
1188+
need to check if a membership still exists in the room.
1189+
"""
1190+
1191+
newly_joined_room_ids: Set[str] = set()
1192+
newly_left_room_map: Dict[str, RoomsForUserStateReset] = {}
1193+
1194+
if not from_token:
1195+
return newly_joined_room_ids, newly_left_room_map
1196+
1197+
changes = await self.store.get_sliding_sync_membership_changes(
1198+
user_id,
1199+
from_key=from_token.room_key,
1200+
to_key=to_token.room_key,
1201+
excluded_room_ids=set(self.rooms_to_exclude_globally),
1202+
)
1203+
1204+
for room_id, entry in changes.items():
1205+
if entry.membership == Membership.JOIN:
1206+
newly_joined_room_ids.add(room_id)
1207+
elif entry.membership == Membership.LEAVE:
1208+
newly_left_room_map[room_id] = entry
1209+
1210+
return newly_joined_room_ids, newly_left_room_map
1211+
1212+
@trace
1213+
async def _get_newly_joined_and_left_rooms_fallback(
1214+
self,
1215+
user_id: str,
1216+
to_token: StreamToken,
1217+
from_token: Optional[StreamToken],
1218+
) -> Tuple[AbstractSet[str], Mapping[str, RoomsForUserStateReset]]:
1219+
"""Fetch the sets of rooms that the user newly joined or left in the
1220+
given token range.
1221+
1222+
Note: there may be rooms in the newly left rooms where the user was
1223+
"state reset" out of the room, and so that room would not be part of the
1224+
"current memberships" of the user.
1225+
11791226
Returns:
11801227
A 2-tuple of newly joined room IDs and a map of newly_left room
11811228
IDs to the `RoomsForUserStateReset` entry.

synapse/storage/databases/main/stream.py

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
)
8181
from synapse.storage.databases.main.events_worker import EventsWorkerStore
8282
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
83+
from synapse.storage.roommember import RoomsForUserStateReset
8384
from synapse.storage.util.id_generators import MultiWriterIdGenerator
8485
from synapse.types import PersistedEventPosition, RoomStreamToken, StrCollection
8586
from synapse.util.caches.descriptors import cached, cachedList
@@ -993,6 +994,10 @@ async def get_current_state_delta_membership_changes_for_user(
993994
available in the `current_state_delta_stream` table. To actually check for a
994995
state reset, you need to check if a membership still exists in the room.
995996
"""
997+
998+
assert from_key.topological is None
999+
assert to_key.topological is None
1000+
9961001
# Start by ruling out cases where a DB query is not necessary.
9971002
if from_key == to_key:
9981003
return []
@@ -1138,6 +1143,203 @@ def f(txn: LoggingTransaction) -> List[CurrentStateDeltaMembership]:
11381143
if membership_change.room_id not in room_ids_to_exclude
11391144
]
11401145

1146+
@trace
1147+
async def get_sliding_sync_membership_changes(
1148+
self,
1149+
user_id: str,
1150+
from_key: RoomStreamToken,
1151+
to_key: RoomStreamToken,
1152+
excluded_room_ids: Optional[AbstractSet[str]] = None,
1153+
) -> Dict[str, RoomsForUserStateReset]:
1154+
"""
1155+
Fetch membership events that result in a meaningful membership change for a
1156+
given user.
1157+
1158+
A meaningful membership changes is one where the `membership` value actually
1159+
changes. This means memberships changes from `join` to `join` (like a display
1160+
name change) will be filtered out since they result in no meaningful change.
1161+
1162+
Note: This function only works with "live" tokens with `stream_ordering` only.
1163+
1164+
We're looking for membership changes in the token range (> `from_key` and <=
1165+
`to_key`).
1166+
1167+
Args:
1168+
user_id: The user ID to fetch membership events for.
1169+
from_key: The point in the stream to sync from (fetching events > this point).
1170+
to_key: The token to fetch rooms up to (fetching events <= this point).
1171+
excluded_room_ids: Optional list of room IDs to exclude from the results.
1172+
1173+
Returns:
1174+
All meaningful membership changes to the current state in the token range.
1175+
Events are sorted by `stream_ordering` ascending.
1176+
1177+
`event_id`/`sender` can be `None` when the server leaves a room (meaning
1178+
everyone locally left) or a state reset which removed the person from the
1179+
room. We can't tell the difference between the two cases with what's
1180+
available in the `current_state_delta_stream` table. To actually check for a
1181+
state reset, you need to check if a membership still exists in the room.
1182+
"""
1183+
1184+
assert from_key.topological is None
1185+
assert to_key.topological is None
1186+
1187+
# Start by ruling out cases where a DB query is not necessary.
1188+
if from_key == to_key:
1189+
return {}
1190+
1191+
if from_key:
1192+
has_changed = self._membership_stream_cache.has_entity_changed(
1193+
user_id, int(from_key.stream)
1194+
)
1195+
if not has_changed:
1196+
return {}
1197+
1198+
room_ids_to_exclude: AbstractSet[str] = set()
1199+
if excluded_room_ids is not None:
1200+
room_ids_to_exclude = excluded_room_ids
1201+
1202+
def f(txn: LoggingTransaction) -> Dict[str, RoomsForUserStateReset]:
1203+
# To handle tokens with a non-empty instance_map we fetch more
1204+
# results than necessary and then filter down
1205+
min_from_id = from_key.stream
1206+
max_to_id = to_key.get_max_stream_pos()
1207+
1208+
# This query looks at membership changes in
1209+
# `sliding_sync_membership_snapshots` which will not include users
1210+
# that were state reset out of rooms; so we need to look for that
1211+
# case in `current_state_delta_stream`.
1212+
sql = """
1213+
SELECT
1214+
room_id,
1215+
membership_event_id,
1216+
event_instance_name,
1217+
event_stream_ordering,
1218+
membership,
1219+
sender,
1220+
prev_membership,
1221+
room_version
1222+
FROM
1223+
(
1224+
SELECT
1225+
s.room_id,
1226+
s.membership_event_id,
1227+
s.event_instance_name,
1228+
s.event_stream_ordering,
1229+
s.membership,
1230+
s.sender,
1231+
m_prev.membership AS prev_membership
1232+
FROM sliding_sync_membership_snapshots as s
1233+
LEFT JOIN event_edges AS e ON e.event_id = s.membership_event_id
1234+
LEFT JOIN room_memberships AS m_prev ON m_prev.event_id = e.prev_event_id
1235+
WHERE s.user_id = ?
1236+
1237+
UNION ALL
1238+
1239+
SELECT
1240+
s.room_id,
1241+
e.event_id,
1242+
s.instance_name,
1243+
s.stream_id,
1244+
m.membership,
1245+
e.sender,
1246+
m_prev.membership AS prev_membership
1247+
FROM current_state_delta_stream AS s
1248+
LEFT JOIN events AS e ON e.event_id = s.event_id
1249+
LEFT JOIN room_memberships AS m ON m.event_id = s.event_id
1250+
LEFT JOIN room_memberships AS m_prev ON m_prev.event_id = s.prev_event_id
1251+
WHERE
1252+
s.type = ?
1253+
AND s.state_key = ?
1254+
) AS c
1255+
INNER JOIN rooms USING (room_id)
1256+
WHERE event_stream_ordering > ? AND event_stream_ordering <= ?
1257+
ORDER BY event_stream_ordering ASC
1258+
"""
1259+
1260+
txn.execute(
1261+
sql,
1262+
(user_id, EventTypes.Member, user_id, min_from_id, max_to_id),
1263+
)
1264+
1265+
membership_changes: Dict[str, RoomsForUserStateReset] = {}
1266+
for (
1267+
room_id,
1268+
membership_event_id,
1269+
event_instance_name,
1270+
event_stream_ordering,
1271+
membership,
1272+
sender,
1273+
prev_membership,
1274+
room_version_id,
1275+
) in txn:
1276+
assert room_id is not None
1277+
assert event_stream_ordering is not None
1278+
1279+
if room_id in room_ids_to_exclude:
1280+
continue
1281+
1282+
if _filter_results_by_stream(
1283+
from_key,
1284+
to_key,
1285+
event_instance_name,
1286+
event_stream_ordering,
1287+
):
1288+
# When the server leaves a room, it will insert new rows into the
1289+
# `current_state_delta_stream` table with `event_id = null` for all
1290+
# current state. This means we might already have a row for the
1291+
# leave event and then another for the same leave where the
1292+
# `event_id=null` but the `prev_event_id` is pointing back at the
1293+
# earlier leave event. We don't want to report the leave, if we
1294+
# already have a leave event.
1295+
if (
1296+
membership_event_id is None
1297+
and prev_membership == Membership.LEAVE
1298+
):
1299+
continue
1300+
1301+
if membership_event_id is None and room_id in membership_changes:
1302+
# SUSPICIOUS: if we join a room and get state reset out of it
1303+
# in the same queried window,
1304+
# won't this ignore the 'state reset out of it' part?
1305+
continue
1306+
1307+
# When `s.event_id = null`, we won't be able to get respective
1308+
# `room_membership` but can assume the user has left the room
1309+
# because this only happens when the server leaves a room
1310+
# (meaning everyone locally left) or a state reset which removed
1311+
# the person from the room.
1312+
membership = (
1313+
membership if membership is not None else Membership.LEAVE
1314+
)
1315+
1316+
if membership == prev_membership:
1317+
# If `membership` and `prev_membership` are the same then this
1318+
# is not a meaningful change so we can skip it.
1319+
# An example of this happening is when the user changes their display name.
1320+
continue
1321+
1322+
membership_change = RoomsForUserStateReset(
1323+
room_id=room_id,
1324+
sender=sender,
1325+
membership=membership,
1326+
event_id=membership_event_id,
1327+
event_pos=PersistedEventPosition(
1328+
event_instance_name, event_stream_ordering
1329+
),
1330+
room_version_id=room_version_id,
1331+
)
1332+
1333+
membership_changes[room_id] = membership_change
1334+
1335+
return membership_changes
1336+
1337+
membership_changes = await self.db_pool.runInteraction(
1338+
"get_sliding_sync_membership_changes", f
1339+
)
1340+
1341+
return membership_changes
1342+
11411343
@cancellable
11421344
async def get_membership_changes_for_user(
11431345
self,
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
--
2+
-- This file is licensed under the Affero General Public License (AGPL) version 3.
3+
--
4+
-- Copyright (C) 2025 New Vector, Ltd
5+
--
6+
-- This program is free software: you can redistribute it and/or modify
7+
-- it under the terms of the GNU Affero General Public License as
8+
-- published by the Free Software Foundation, either version 3 of the
9+
-- License, or (at your option) any later version.
10+
--
11+
-- See the GNU Affero General Public License for more details:
12+
-- <https://www.gnu.org/licenses/agpl-3.0.html>.
13+
14+
-- So we can fetch all rooms for a given user sorted by stream order
15+
DROP INDEX IF EXISTS sliding_sync_membership_snapshots_user_id;
16+
CREATE INDEX IF NOT EXISTS sliding_sync_membership_snapshots_user_id ON sliding_sync_membership_snapshots(user_id, event_stream_ordering);

tests/handlers/test_sliding_sync.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -594,6 +594,12 @@ class ComputeInterestedRoomsTestCase(SlidingSyncBase):
594594
the correct list of rooms IDs.
595595
"""
596596

597+
# FIXME: We should refactor these tests to run against `compute_interested_rooms(...)`
598+
# instead of just `get_room_membership_for_user_at_to_token(...)` which is only used
599+
# in the fallback path (`_compute_interested_rooms_fallback(...)`). These scenarios do
600+
# well to stress that logic and we shouldn't remove them just because we're removing
601+
# the fallback path (tracked by https://github.com/element-hq/synapse/issues/17623).
602+
597603
servlets = [
598604
admin.register_servlets,
599605
knock.register_servlets,
@@ -2976,6 +2982,12 @@ class ComputeInterestedRoomsShardTestCase(
29762982
sharded event stream_writers enabled
29772983
"""
29782984

2985+
# FIXME: We should refactor these tests to run against `compute_interested_rooms(...)`
2986+
# instead of just `get_room_membership_for_user_at_to_token(...)` which is only used
2987+
# in the fallback path (`_compute_interested_rooms_fallback(...)`). These scenarios do
2988+
# well to stress that logic and we shouldn't remove them just because we're removing
2989+
# the fallback path (tracked by https://github.com/element-hq/synapse/issues/17623).
2990+
29792991
servlets = [
29802992
admin.register_servlets_for_client_rest_resource,
29812993
room.register_servlets,

0 commit comments

Comments
 (0)