@@ -1693,93 +1693,6 @@ def __init__(
1693
1693
columns = ["user_id" , "room_id" ],
1694
1694
)
1695
1695
1696
- self .db_pool .updates .register_background_update_handler (
1697
- "populate_participant_bg_update" , self ._populate_participant
1698
- )
1699
-
1700
- async def _populate_participant (self , progress : JsonDict , batch_size : int ) -> int :
1701
- """
1702
- Background update to populate column `participant` on `room_memberships` table
1703
-
1704
- A 'participant' is someone who is currently joined to a room and has sent at least
1705
- one `m.room.message` or `m.room.encrypted` event.
1706
-
1707
- This background update will set the `participant` column across all rows in
1708
- `room_memberships` based on the user's *current* join status, and if
1709
- they've *ever* sent a message or encrypted event. Therefore one should
1710
- never assume the `participant` column's value is based solely on whether
1711
- the user participated in a previous "session" (where a "session" is defined
1712
- as a period between the user joining and leaving). See
1713
- https://github.com/element-hq/synapse/pull/18068#discussion_r1931070291
1714
- for further detail.
1715
- """
1716
- stream_token = progress .get ("last_stream_token" , None )
1717
-
1718
- def _get_max_stream_token_txn (txn : LoggingTransaction ) -> int :
1719
- sql = """
1720
- SELECT event_stream_ordering from room_memberships
1721
- ORDER BY event_stream_ordering DESC
1722
- LIMIT 1;
1723
- """
1724
- txn .execute (sql )
1725
- res = txn .fetchone ()
1726
- if not res or not res [0 ]:
1727
- return 0
1728
- return res [0 ]
1729
-
1730
- def _background_populate_participant_txn (
1731
- txn : LoggingTransaction , stream_token : str
1732
- ) -> None :
1733
- sql = """
1734
- UPDATE room_memberships
1735
- SET participant = True
1736
- FROM (
1737
- SELECT DISTINCT c.state_key, e.room_id
1738
- FROM current_state_events AS c
1739
- INNER JOIN events AS e ON c.room_id = e.room_id
1740
- WHERE c.membership = 'join'
1741
- AND c.state_key = e.sender
1742
- AND (
1743
- e.type = 'm.room.message'
1744
- OR e.type = 'm.room.encrypted'
1745
- )
1746
- ) AS subquery
1747
- WHERE room_memberships.user_id = subquery.state_key
1748
- AND room_memberships.room_id = subquery.room_id
1749
- AND room_memberships.event_stream_ordering <= ?
1750
- AND room_memberships.event_stream_ordering > ?;
1751
- """
1752
- batch = int (stream_token ) - _POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE
1753
- txn .execute (sql , (stream_token , batch ))
1754
-
1755
- if stream_token is None :
1756
- stream_token = await self .db_pool .runInteraction (
1757
- "_get_max_stream_token" , _get_max_stream_token_txn
1758
- )
1759
-
1760
- if stream_token < 0 :
1761
- await self .db_pool .updates ._end_background_update (
1762
- "populate_participant_bg_update"
1763
- )
1764
- return _POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE
1765
-
1766
- await self .db_pool .runInteraction (
1767
- "_background_populate_participant_txn" ,
1768
- _background_populate_participant_txn ,
1769
- stream_token ,
1770
- )
1771
-
1772
- progress ["last_stream_token" ] = (
1773
- stream_token - _POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE
1774
- )
1775
- await self .db_pool .runInteraction (
1776
- "populate_participant_bg_update" ,
1777
- self .db_pool .updates ._background_update_progress_txn ,
1778
- "populate_participant_bg_update" ,
1779
- progress ,
1780
- )
1781
- return _POPULATE_PARTICIPANT_BG_UPDATE_BATCH_SIZE
1782
-
1783
1696
async def _background_add_membership_profile (
1784
1697
self , progress : JsonDict , batch_size : int
1785
1698
) -> int :
0 commit comments