Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
340ae9b
WIP
erikjohnston Mar 28, 2025
74cd6d5
Improve reject invite logic
devonh Apr 7, 2025
17cdd4f
simplify query: left joins with e.event_id = NULL never match
reivilibre Apr 16, 2025
5ee188a
optimise query: the unioned subqueries can not overlap and don't need…
reivilibre Apr 16, 2025
de7217f
add missing `user_id` filter to first half of the query
reivilibre Apr 16, 2025
9e41abd
don't use `is` for comparing strings returned from the DB; they may n…
reivilibre Apr 16, 2025
af7a4af
TODO: suspicion
reivilibre Apr 16, 2025
fd4a633
TODO: note what indices we need
reivilibre Apr 16, 2025
e0aad4b
Revert "add missing `user_id` filter to first half of the query"
devonh Apr 28, 2025
1519d4f
Reapply "add missing `user_id` filter to first half of the query"
devonh Apr 28, 2025
7ed2a12
Get tests passing again
devonh Apr 29, 2025
223c87e
Cleanup printing
devonh Apr 29, 2025
b8d1b5e
More printing cleanup
devonh Apr 29, 2025
7640c59
Readd union optimization
devonh Apr 29, 2025
9138ee0
Merge branch 'develop' into erikj/ss_reject_invites
devonh Apr 29, 2025
241ec9a
Add changelog entry
devonh Apr 29, 2025
79fc888
Comment newly_left room logic
devonh May 1, 2025
3e353b6
Move test & add comments
devonh May 1, 2025
a7bc0ca
Update tests/rest/client/sliding_sync/test_sliding_sync.py
devonh May 1, 2025
93b00c3
Remove leftover code
devonh May 1, 2025
720f85e
Make function take set
devonh May 1, 2025
06b9a34
Update synapse/storage/databases/main/stream.py
devonh May 1, 2025
db486c7
Add comment for why no membership means leave
devonh May 1, 2025
06209fc
Filter out all membership non-changes
devonh May 1, 2025
c9a6a0b
Add better index on sliding_sync_membership_snapshots
devonh May 1, 2025
096bbfa
Add docstring to new func
devonh May 1, 2025
ed7c580
Readd old impl for fallback tables case
devonh May 1, 2025
14c4e4d
Add newlines to docstring
devonh May 2, 2025
2c3f74f
Expand docstring
devonh May 2, 2025
1f74e91
Assert topological tokens not used
devonh May 2, 2025
5ad092e
Add FIXME comment to update tests to run against new tables
devonh May 2, 2025
b05f339
Merge branch 'develop' into erikj/ss_reject_invites
devonh May 5, 2025
e876851
Merge branch 'develop' into erikj/ss_reject_invites
devonh May 7, 2025
ea3c9fc
Move db delta to new schema version
devonh May 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/18375.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Pass leave from remote invite rejection down Sliding Sync.
23 changes: 23 additions & 0 deletions synapse/handlers/sliding_sync/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ async def handle_room(room_id: str) -> None:
from_token=from_token,
to_token=to_token,
newly_joined=room_id in interested_rooms.newly_joined_rooms,
newly_left=room_id in interested_rooms.newly_left_rooms,
is_dm=room_id in interested_rooms.dm_room_ids,
)

Expand Down Expand Up @@ -542,6 +543,7 @@ async def get_room_sync_data(
from_token: Optional[SlidingSyncStreamToken],
to_token: StreamToken,
newly_joined: bool,
newly_left: bool,
is_dm: bool,
) -> SlidingSyncResult.RoomResult:
"""
Expand All @@ -559,6 +561,7 @@ async def get_room_sync_data(
from_token: The point in the stream to sync from.
to_token: The point in the stream to sync up to.
newly_joined: If the user has newly joined the room
newly_left: If the user has newly left the room
is_dm: Whether the room is a DM room
"""
user = sync_config.user
Expand Down Expand Up @@ -856,6 +859,26 @@ async def get_room_sync_data(
# TODO: Limit the number of state events we're about to send down
# the room, if it's too many we should change this to an
# `initial=True`?

# For the case of rejecting remote invites, the leave event won't be
# returned by `get_current_state_deltas_for_room`. This is due to the current
# state only being filled out for rooms the server is in, and so doesn't pick
# up out-of-band leaves (including locally rejected invites) as these events
# are outliers and not added to the `current_state_delta_stream`.
#
# We rely on being explicitly told that the room has been `newly_left` to
# ensure we extract the out-of-band leave.
if newly_left and room_membership_for_user_at_to_token.event_id is not None:
membership_changed = True
leave_event = await self.store.get_event(
room_membership_for_user_at_to_token.event_id
)
state_key = leave_event.get_state_key()
if state_key is not None:
room_state_delta_id_map[(leave_event.type, state_key)] = (
room_membership_for_user_at_to_token.event_id
)

deltas = await self.get_current_state_deltas_for_room(
room_id=room_id,
room_membership_for_user_at_to_token=room_membership_for_user_at_to_token,
Expand Down
49 changes: 48 additions & 1 deletion synapse/handlers/sliding_sync/room_lists.py
Original file line number Diff line number Diff line change
Expand Up @@ -1120,7 +1120,7 @@ async def get_room_membership_for_user_at_to_token(
(
newly_joined_room_ids,
newly_left_room_map,
) = await self._get_newly_joined_and_left_rooms(
) = await self._get_newly_joined_and_left_rooms_fallback(
user_id, to_token=to_token, from_token=from_token
)

Expand Down Expand Up @@ -1176,6 +1176,53 @@ async def _get_newly_joined_and_left_rooms(
"state reset" out of the room, and so that room would not be part of the
"current memberships" of the user.

Returns:
A 2-tuple of newly joined room IDs and a map of newly_left room
IDs to the `RoomsForUserStateReset` entry.

We're using `RoomsForUserStateReset` but that doesn't necessarily mean the
user was state reset out of the rooms. It's just that the `event_id`/`sender`
are optional and we can't tell the difference between the server leaving the
room when the user was the last person participating in the room and left or
was state reset out of the room. To actually check for a state reset, you
need to check if a membership still exists in the room.
"""

newly_joined_room_ids: Set[str] = set()
newly_left_room_map: Dict[str, RoomsForUserStateReset] = {}

if not from_token:
return newly_joined_room_ids, newly_left_room_map

changes = await self.store.get_sliding_sync_membership_changes(
user_id,
from_key=from_token.room_key,
to_key=to_token.room_key,
excluded_room_ids=set(self.rooms_to_exclude_globally),
)

for room_id, entry in changes.items():
if entry.membership == Membership.JOIN:
newly_joined_room_ids.add(room_id)
elif entry.membership == Membership.LEAVE:
newly_left_room_map[room_id] = entry

return newly_joined_room_ids, newly_left_room_map

@trace
async def _get_newly_joined_and_left_rooms_fallback(
self,
user_id: str,
to_token: StreamToken,
from_token: Optional[StreamToken],
) -> Tuple[AbstractSet[str], Mapping[str, RoomsForUserStateReset]]:
"""Fetch the sets of rooms that the user newly joined or left in the
given token range.

Note: there may be rooms in the newly left rooms where the user was
"state reset" out of the room, and so that room would not be part of the
"current memberships" of the user.

Returns:
A 2-tuple of newly joined room IDs and a map of newly_left room
IDs to the `RoomsForUserStateReset` entry.
Expand Down
202 changes: 202 additions & 0 deletions synapse/storage/databases/main/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
)
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
from synapse.storage.roommember import RoomsForUserStateReset
from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.types import PersistedEventPosition, RoomStreamToken, StrCollection
from synapse.util.caches.descriptors import cached, cachedList
Expand Down Expand Up @@ -993,6 +994,10 @@ async def get_current_state_delta_membership_changes_for_user(
available in the `current_state_delta_stream` table. To actually check for a
state reset, you need to check if a membership still exists in the room.
"""

assert from_key.topological is None
assert to_key.topological is None

# Start by ruling out cases where a DB query is not necessary.
if from_key == to_key:
return []
Expand Down Expand Up @@ -1138,6 +1143,203 @@ def f(txn: LoggingTransaction) -> List[CurrentStateDeltaMembership]:
if membership_change.room_id not in room_ids_to_exclude
]

@trace
async def get_sliding_sync_membership_changes(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this uses the sliding_sync_membership_snapshots table, we have to gate it behind self.store.have_finished_sliding_sync_background_jobs().

It looks like we're using it in the fallback case as well right now:

  • _compute_interested_rooms_new_tables -> _get_newly_joined_and_left_rooms -> get_sliding_sync_membership_changes
  • _compute_interested_rooms_fallback -> get_room_membership_for_user_at_to_token -> _get_newly_joined_and_left_rooms -> get_sliding_sync_membership_changes

As one solution, per the comment in the following code, we could also just make this a foreground update now, as those tables originally shipped with Synapse 1.115.0rc1 (2024-09-10) (see #17512), and remove the fallback path.

if await self.store.have_finished_sliding_sync_background_jobs():
return await self._compute_interested_rooms_new_tables(
sync_config=sync_config,
previous_connection_state=previous_connection_state,
to_token=to_token,
from_token=from_token,
)
else:
# FIXME: This can be removed once we bump `SCHEMA_COMPAT_VERSION` and run the
# foreground update for
# `sliding_sync_joined_rooms`/`sliding_sync_membership_snapshots` (tracked by
# https://github.com/element-hq/synapse/issues/17623)
return await self._compute_interested_rooms_fallback(
sync_config=sync_config,
previous_connection_state=previous_connection_state,
to_token=to_token,
from_token=from_token,
)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, after looking into it a bit, that seems like a large undertaking that would block this fix unnecessarily.
Mainly: what would we do about populating the sliding_sync_joined_rooms_to_recalculate table in a foreground update? Converting the 2 background jobs over to a foreground update is also quite involved, as they are pretty complex.

I think it may be better to split the impl & keep the old logic for the fallback case.
That means the fallback case would still have the bug, but realistically, who is still operating in the fallback case zone at this point?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed it to do just that for now.
It uses the old way while using the fallback case, which means the bug is still present.
But uses the new way when using the new tables.

If we aren't happy with this and want to wait until we have completed the DB transition for sliding sync then that PR will need to be created & merged before we can merge this one.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it may be better to split the impl & keep the old logic for the fallback case.
That means the fallback case would still have the bug, but realistically, who is still operating in the fallback case zone at this point?

Sounds reasonable to me.

We may want to consider refactoring GetRoomMembershipForUserAtToTokenTestCase/GetRoomMembershipForUserAtToTokenShardTestCase to work against compute_interested_rooms(...) (instead of get_room_membership_for_user_at_to_token(...) which only applies to the fallback path) so we have some better coverage of this logic. If you're feeling up for it, I think it could be useful to be more confident in these changes otherwise we can just add a future FIXME in both of those docstrings that we should tackle it in the future:

    FIXME: We should refactor these tests to run against `compute_interested_rooms(...)`
    instead of just `get_room_membership_for_user_at_to_token(...)` which is only used
    in the fallback path (`_compute_interested_rooms_fallback(...)`). These scenarios do
    well to stress that logic and we shouldn't remove them just because we're removing
    the fallback path (tracked by https://github.com/element-hq/synapse/issues/17623).

We do still have a good set of end-to-end rest layer tests to cover us in a lot of scenarios.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree with the idea.

I started down that path but it looks like it'll be quite involved to do properly.
It should happen in a future PR that exclusively converts the tests over to handle the new code.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not entirely comfortable merging these changes until those tests have been converted so we can ensure everything is still working as intended.
It'll take me quite a while to convert the tests due to a lack of simplified sliding sync knowledge. It's not too hard to technically change them over. But deciding what is okay to change requires intimate knowledge of the spec.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm. So after converting that test, 12 of the tests fail when using the new tables (they pass when using the fallback tables).

But those same 12 tests also fail against the develop branch, so I don't think the changes here are making things any worse.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tests have now been updated & #18399 has been merged. Let's see if these changes are still happy.

self,
user_id: str,
from_key: RoomStreamToken,
to_key: RoomStreamToken,
excluded_room_ids: Optional[AbstractSet[str]] = None,
) -> Dict[str, RoomsForUserStateReset]:
"""
Fetch membership events that result in a meaningful membership change for a
given user.

A meaningful membership change is one where the `membership` value actually
changes. This means membership changes from `join` to `join` (like a display
name change) will be filtered out since they result in no meaningful change.

Note: This function only works with "live" tokens with `stream_ordering` only.

We're looking for membership changes in the token range (> `from_key` and <=
`to_key`).

Args:
user_id: The user ID to fetch membership events for.
from_key: The point in the stream to sync from (fetching events > this point).
to_key: The token to fetch rooms up to (fetching events <= this point).
excluded_room_ids: Optional set of room IDs to exclude from the results.

Returns:
A map from room ID to a meaningful membership change for that room in the
token range. Rows are processed in `stream_ordering` ascending order, so a
later change generally replaces an earlier one for the same room.

`event_id`/`sender` can be `None` when the server leaves a room (meaning
everyone locally left) or a state reset which removed the person from the
room. We can't tell the difference between the two cases with what's
available in the `current_state_delta_stream` table. To actually check for a
state reset, you need to check if a membership still exists in the room.
"""

assert from_key.topological is None
assert to_key.topological is None

# Start by ruling out cases where a DB query is not necessary.
if from_key == to_key:
return {}

if from_key:
has_changed = self._membership_stream_cache.has_entity_changed(
user_id, int(from_key.stream)
)
if not has_changed:
return {}

room_ids_to_exclude: AbstractSet[str] = set()
if excluded_room_ids is not None:
room_ids_to_exclude = excluded_room_ids

def f(txn: LoggingTransaction) -> Dict[str, RoomsForUserStateReset]:
# To handle tokens with a non-empty instance_map we fetch more
# results than necessary and then filter down
min_from_id = from_key.stream
max_to_id = to_key.get_max_stream_pos()

# This query looks at membership changes in
# `sliding_sync_membership_snapshots` which will not include users
# that were state reset out of rooms; so we need to look for that
# case in `current_state_delta_stream`.
sql = """
SELECT
room_id,
membership_event_id,
event_instance_name,
event_stream_ordering,
membership,
sender,
prev_membership,
room_version
FROM
(
SELECT
s.room_id,
s.membership_event_id,
s.event_instance_name,
s.event_stream_ordering,
s.membership,
s.sender,
m_prev.membership AS prev_membership
FROM sliding_sync_membership_snapshots as s
LEFT JOIN event_edges AS e ON e.event_id = s.membership_event_id
LEFT JOIN room_memberships AS m_prev ON m_prev.event_id = e.prev_event_id
WHERE s.user_id = ?

UNION ALL

SELECT
s.room_id,
e.event_id,
s.instance_name,
s.stream_id,
m.membership,
e.sender,
m_prev.membership AS prev_membership
FROM current_state_delta_stream AS s
LEFT JOIN events AS e ON e.event_id = s.event_id
LEFT JOIN room_memberships AS m ON m.event_id = s.event_id
LEFT JOIN room_memberships AS m_prev ON m_prev.event_id = s.prev_event_id
WHERE
s.type = ?
AND s.state_key = ?
) AS c
INNER JOIN rooms USING (room_id)
WHERE event_stream_ordering > ? AND event_stream_ordering <= ?
ORDER BY event_stream_ordering ASC
"""

txn.execute(
sql,
(user_id, EventTypes.Member, user_id, min_from_id, max_to_id),
)

membership_changes: Dict[str, RoomsForUserStateReset] = {}
for (
room_id,
membership_event_id,
event_instance_name,
event_stream_ordering,
membership,
sender,
prev_membership,
room_version_id,
) in txn:
assert room_id is not None
assert event_stream_ordering is not None

if room_id in room_ids_to_exclude:
continue

if _filter_results_by_stream(
from_key,
to_key,
event_instance_name,
event_stream_ordering,
):
# When the server leaves a room, it will insert new rows into the
# `current_state_delta_stream` table with `event_id = null` for all
# current state. This means we might already have a row for the
# leave event and then another for the same leave where the
# `event_id=null` but the `prev_event_id` is pointing back at the
# earlier leave event. We don't want to report the leave, if we
# already have a leave event.
if (
membership_event_id is None
and prev_membership == Membership.LEAVE
):
continue

if membership_event_id is None and room_id in membership_changes:
# SUSPICIOUS: if we join a room and get state reset out of it
# in the same queried window,
# won't this ignore the 'state reset out of it' part?
continue

# When `s.event_id = null`, we won't be able to get respective
# `room_membership` but can assume the user has left the room
# because this only happens when the server leaves a room
# (meaning everyone locally left) or a state reset which removed
# the person from the room.
membership = (
membership if membership is not None else Membership.LEAVE
)

if membership == prev_membership:
# If `membership` and `prev_membership` are the same then this
# is not a meaningful change so we can skip it.
# An example of this happening is when the user changes their display name.
continue

membership_change = RoomsForUserStateReset(
room_id=room_id,
sender=sender,
membership=membership,
event_id=membership_event_id,
event_pos=PersistedEventPosition(
event_instance_name, event_stream_ordering
),
room_version_id=room_version_id,
)

membership_changes[room_id] = membership_change

return membership_changes

membership_changes = await self.db_pool.runInteraction(
"get_sliding_sync_membership_changes", f
)

return membership_changes

@cancellable
async def get_membership_changes_for_user(
self,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2025 New Vector, Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.

-- So we can fetch all rooms for a given user sorted by stream order.
--
-- The index is recreated under the same name with the columns
-- `(user_id, event_stream_ordering)`. It has to be dropped first: because of
-- `IF NOT EXISTS`, the `CREATE INDEX` below is skipped whenever an index with
-- this name already exists, regardless of which columns that index covers.
DROP INDEX IF EXISTS sliding_sync_membership_snapshots_user_id;
CREATE INDEX IF NOT EXISTS sliding_sync_membership_snapshots_user_id ON sliding_sync_membership_snapshots(user_id, event_stream_ordering);
12 changes: 12 additions & 0 deletions tests/handlers/test_sliding_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,12 @@ class ComputeInterestedRoomsTestCase(SlidingSyncBase):
the correct list of rooms IDs.
"""

# FIXME: We should refactor these tests to run against `compute_interested_rooms(...)`
# instead of just `get_room_membership_for_user_at_to_token(...)` which is only used
# in the fallback path (`_compute_interested_rooms_fallback(...)`). These scenarios do
# well to stress that logic and we shouldn't remove them just because we're removing
# the fallback path (tracked by https://github.com/element-hq/synapse/issues/17623).

servlets = [
admin.register_servlets,
knock.register_servlets,
Expand Down Expand Up @@ -2976,6 +2982,12 @@ class ComputeInterestedRoomsShardTestCase(
sharded event stream_writers enabled
"""

# FIXME: We should refactor these tests to run against `compute_interested_rooms(...)`
# instead of just `get_room_membership_for_user_at_to_token(...)` which is only used
# in the fallback path (`_compute_interested_rooms_fallback(...)`). These scenarios do
# well to stress that logic and we shouldn't remove them just because we're removing
# the fallback path (tracked by https://github.com/element-hq/synapse/issues/17623).

servlets = [
admin.register_servlets_for_client_rest_resource,
room.register_servlets,
Expand Down
Loading
Loading