Skip to content

Commit 25160a4

Browse files
committed
fix(broadcast-worker): gate channel thread fan-out on membership (#309)
channelThreadFanOut published per-account thread events to replyAccounts followers + @-mentions with no check against current room membership, so a non-member left in replyAccounts, or an @-mention of a non-member, received the full live thread reply — while the push path (notification-worker) correctly bounded recipients to members. With replyAccounts now kept member-clean on removal (#308), followers are trusted and delivered directly; only @-mentions (which can still target a non-member) are intersected against membership, and only when the reply has mentions — scoped to just the mentioned accounts via the shared pkg/pipelines.SubscribedAccounts point read, so the common no-mention reply issues zero membership queries and cost never scales with room size. The sender is always included (multi-device echo, race-free vs the async replyAccounts writer). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01FNGoUansahjF37TFjs2yse
1 parent e88bcdb commit 25160a4

6 files changed

Lines changed: 211 additions & 62 deletions

File tree

broadcast-worker/handler.go

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -210,7 +210,7 @@ func (h *Handler) handleThreadCreated(ctx context.Context, evt *model.MessageEve
210210
// event. Fetch the subscriber list and build fanOut before any further work.
211211
var fanOut []string
212212
if meta.Type == model.RoomTypeChannel {
213-
fanOut, err = h.channelThreadFanOut(ctx, parentMsgID, msg.UserAccount, parsed.Accounts)
213+
fanOut, err = h.channelThreadFanOut(ctx, meta.ID, parentMsgID, msg.UserAccount, parsed.Accounts)
214214
if err != nil {
215215
return fmt.Errorf("channel thread fan-out for parent %s: %w", parentMsgID, err)
216216
}
@@ -314,7 +314,7 @@ func (h *Handler) handleThreadUpdated(ctx context.Context, evt *model.MessageEve
314314
switch room.Type {
315315
case model.RoomTypeChannel:
316316
parsed := mention.Parse(msg.Content)
317-
fanOut, err := h.channelThreadFanOut(ctx, parentMsgID, msg.UserAccount, parsed.Accounts)
317+
fanOut, err := h.channelThreadFanOut(ctx, room.ID, parentMsgID, msg.UserAccount, parsed.Accounts)
318318
if err != nil {
319319
return fmt.Errorf("channel thread fan-out for thread update of parent %s: %w", parentMsgID, err)
320320
}
@@ -367,7 +367,7 @@ func (h *Handler) handleThreadDeleted(ctx context.Context, evt *model.MessageEve
367367
// receive the delete. Only the channel path uses mentions; the DM path
368368
// fans out to all members.
369369
parsed := mention.Parse(msg.Content)
370-
fanOut, err := h.channelThreadFanOut(ctx, parentMsgID, msg.UserAccount, parsed.Accounts)
370+
fanOut, err := h.channelThreadFanOut(ctx, room.ID, parentMsgID, msg.UserAccount, parsed.Accounts)
371371
if err != nil {
372372
return fmt.Errorf("channel thread fan-out for thread delete of parent %s: %w", parentMsgID, err)
373373
}
@@ -942,17 +942,13 @@ func (h *Handler) publishToThreadAccounts(ctx context.Context, accounts []string
942942
return nil
943943
}
944944

945-
// threadFanOutAccounts builds the deduplicated fan-out recipient list for a
946-
// thread event. The message sender is always included first (unless a bot):
947-
// they authored the reply and are therefore a thread participant, so their own
948-
// devices must receive the event for multi-device sync. The sender is added
949-
// directly here rather than relied upon via replyAccounts — replyAccounts is
950-
// written by message-worker on a separate, unordered MESSAGES_CANONICAL
951-
// consumer, so a fan-out that depended on it would race the sender's own first
952-
// reply and silently drop the echo. followers (thread repliers) and
953-
// extraAccounts (@-mentioned users) are merged after, deduped. Bots are always
954-
// excluded.
955-
func threadFanOutAccounts(senderAccount string, followers map[string]struct{}, extraAccounts []string) []string {
945+
// threadFanOutAccounts builds the deduplicated recipient list (sender +
946+
// followers + member mentions, bots excluded). The sender is added directly,
947+
// not via replyAccounts, which message-worker writes asynchronously and would
948+
// race the author's own echo. Followers are trusted member-clean (room-worker
949+
// scrubs them on removal, #308); mentions may target non-members (#307) so only
950+
// memberMentions are delivered (#309).
951+
func threadFanOutAccounts(senderAccount string, followers map[string]struct{}, mentions []string, memberMentions map[string]struct{}) []string {
956952
seen := map[string]struct{}{}
957953
var fanOut []string
958954
add := func(acc string) {
@@ -968,26 +964,35 @@ func threadFanOutAccounts(senderAccount string, followers map[string]struct{}, e
968964
seen[acc] = struct{}{}
969965
fanOut = append(fanOut, acc)
970966
}
971-
add(senderAccount) // author is a thread participant — include race-free
967+
add(senderAccount)
972968
for acc := range followers {
973969
add(acc)
974970
}
975-
for _, acc := range extraAccounts {
976-
add(acc)
971+
for _, acc := range mentions {
972+
if _, ok := memberMentions[acc]; ok { // gated: a mention may target a non-member
973+
add(acc)
974+
}
977975
}
978976
return fanOut
979977
}
980978

981-
// channelThreadFanOut resolves the deduplicated recipient list for a channel
982-
// thread event: it fetches the parent message's thread followers and merges
983-
// them with the @-mentioned accounts, excluding the sender. Shared by the
984-
// channel branch of every thread handler (created/updated/deleted).
985-
func (h *Handler) channelThreadFanOut(ctx context.Context, parentMsgID, sender string, mentions []string) ([]string, error) {
979+
// channelThreadFanOut resolves the channel thread recipient list: trusted
980+
// followers plus mentioned members, sender always included. Only mentions are
981+
// membership-checked, and only when present — a no-mention reply queries
982+
// nothing, and the query is scoped to the mentions, never the whole room.
983+
func (h *Handler) channelThreadFanOut(ctx context.Context, roomID, parentMsgID, sender string, mentions []string) ([]string, error) {
986984
followers, err := h.store.GetThreadFollowers(ctx, parentMsgID)
987985
if err != nil {
988986
return nil, fmt.Errorf("get thread followers for parent %s: %w", parentMsgID, err)
989987
}
990-
return threadFanOutAccounts(sender, followers, mentions), nil
988+
var memberMentions map[string]struct{}
989+
if len(mentions) > 0 {
990+
memberMentions, err = h.store.FilterRoomMembers(ctx, roomID, mentions)
991+
if err != nil {
992+
return nil, fmt.Errorf("filter mentioned members for room %s: %w", roomID, err)
993+
}
994+
}
995+
return threadFanOutAccounts(sender, followers, mentions, memberMentions), nil
991996
}
992997

993998
// usersByAccount indexes a slice of users by their Account for O(1) lookup

broadcast-worker/handler_test.go

Lines changed: 117 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1721,65 +1721,84 @@ func TestHandleUnpinned_DMRoom_FansOutToBothMembers(t *testing.T) {
17211721

17221722
func TestThreadFanOutAccounts(t *testing.T) {
17231723
tests := []struct {
1724-
name string
1725-
sender string
1726-
followers map[string]struct{}
1727-
extraAccounts []string
1728-
want []string
1724+
name string
1725+
sender string
1726+
followers map[string]struct{}
1727+
mentions []string
1728+
memberMentions map[string]struct{}
1729+
want []string
17291730
}{
17301731
{
1731-
name: "sender alone still notified (own devices, no other followers)",
1732-
sender: "alice",
1733-
followers: map[string]struct{}{},
1734-
extraAccounts: nil,
1735-
want: []string{"alice"},
1732+
name: "sender alone still notified (own devices, no other followers)",
1733+
sender: "alice",
1734+
followers: map[string]struct{}{},
1735+
want: []string{"alice"},
17361736
},
17371737
{
1738-
name: "sender included even when not yet in replyAccounts (race-free)",
1739-
sender: "alice",
1740-
followers: map[string]struct{}{"bob": {}},
1741-
extraAccounts: nil,
1742-
want: []string{"alice", "bob"},
1738+
name: "sender included even when not yet in replyAccounts (race-free)",
1739+
sender: "alice",
1740+
followers: map[string]struct{}{"bob": {}},
1741+
want: []string{"alice", "bob"},
17431742
},
17441743
{
1745-
name: "sender included when also a follower (multi-device support)",
1744+
name: "follower delivered without a membership check (replyAccounts trusted, #308)",
17461745
sender: "alice",
1747-
followers: map[string]struct{}{"alice": {}, "bob": {}},
1748-
want: []string{"alice", "bob"},
1746+
followers: map[string]struct{}{"bob": {}, "carol": {}},
1747+
want: []string{"alice", "bob", "carol"},
1748+
},
1749+
{
1750+
name: "member mention delivered",
1751+
sender: "alice",
1752+
followers: map[string]struct{}{},
1753+
mentions: []string{"carol"},
1754+
memberMentions: map[string]struct{}{"carol": {}},
1755+
want: []string{"alice", "carol"},
17491756
},
17501757
{
1751-
name: "sender included when only in extra accounts",
1752-
sender: "alice",
1753-
followers: map[string]struct{}{"bob": {}},
1754-
extraAccounts: []string{"alice"},
1755-
want: []string{"bob", "alice"},
1758+
name: "non-member mention filtered out (#309)",
1759+
sender: "alice",
1760+
followers: map[string]struct{}{},
1761+
mentions: []string{"mallory"},
1762+
memberMentions: map[string]struct{}{}, // mallory is not a member → dropped
1763+
want: []string{"alice"},
17561764
},
17571765
{
1758-
name: "extra accounts merged deduped",
1759-
sender: "alice",
1760-
followers: map[string]struct{}{"bob": {}},
1761-
extraAccounts: []string{"bob", "carol"},
1762-
want: []string{"alice", "bob", "carol"},
1766+
name: "followers trusted, mentions gated, mix",
1767+
sender: "alice",
1768+
followers: map[string]struct{}{"bob": {}},
1769+
mentions: []string{"carol", "mallory"},
1770+
memberMentions: map[string]struct{}{"carol": {}},
1771+
want: []string{"alice", "bob", "carol"},
17631772
},
17641773
{
1765-
name: "bot accounts skipped even if sender is bot",
1766-
sender: "helper.bot",
1767-
followers: map[string]struct{}{"helper.bot": {}, "bob": {}},
1768-
extraAccounts: []string{"other.bot"},
1769-
want: []string{"bob"},
1774+
name: "bot accounts skipped even if sender is bot",
1775+
sender: "helper.bot",
1776+
followers: map[string]struct{}{"helper.bot": {}, "bob": {}},
1777+
mentions: []string{"other.bot"},
1778+
memberMentions: map[string]struct{}{"other.bot": {}},
1779+
want: []string{"bob"},
17701780
},
17711781
{
1772-
name: "sender not duplicated when in both followers and extras",
1773-
sender: "alice",
1774-
followers: map[string]struct{}{"alice": {}, "bob": {}},
1775-
extraAccounts: []string{"alice", "carol"},
1776-
want: []string{"alice", "bob", "carol"},
1782+
name: "sender not duplicated when also a follower and mention",
1783+
sender: "alice",
1784+
followers: map[string]struct{}{"alice": {}, "bob": {}},
1785+
mentions: []string{"alice", "carol"},
1786+
memberMentions: map[string]struct{}{"alice": {}, "carol": {}},
1787+
want: []string{"alice", "bob", "carol"},
1788+
},
1789+
{
1790+
name: "mention that is also a follower is delivered via the trusted follower path",
1791+
sender: "alice",
1792+
followers: map[string]struct{}{"bob": {}},
1793+
mentions: []string{"bob"},
1794+
memberMentions: map[string]struct{}{"bob": {}},
1795+
want: []string{"alice", "bob"},
17771796
},
17781797
}
17791798

17801799
for _, tc := range tests {
17811800
t.Run(tc.name, func(t *testing.T) {
1782-
got := threadFanOutAccounts(tc.sender, tc.followers, tc.extraAccounts)
1801+
got := threadFanOutAccounts(tc.sender, tc.followers, tc.mentions, tc.memberMentions)
17831802
assert.ElementsMatch(t, tc.want, got)
17841803
})
17851804
}
@@ -2016,6 +2035,7 @@ func TestHandleThreadCreated_ChannelRoom_FansOutToFollowers(t *testing.T) {
20162035
followers := map[string]struct{}{"bob": {}, "carol": {}}
20172036
store.EXPECT().GetRoomMeta(gomock.Any(), roomID).Return(metaOf(testChannelRoom), nil)
20182037
store.EXPECT().GetThreadFollowers(gomock.Any(), parentMsgID).Return(followers, nil)
2038+
// No @-mentions in the reply → no membership query; followers are trusted (#308).
20192039
us.EXPECT().FindUsersByAccounts(gomock.Any(), []string{"alice"}).Return([]model.User{testUsers[0]}, nil)
20202040

20212041
evt := model.MessageEvent{
@@ -2054,6 +2074,60 @@ func TestHandleThreadCreated_ChannelRoom_FansOutToFollowers(t *testing.T) {
20542074
assert.True(t, subjects[subject.UserRoomEvent("carol")])
20552075
}
20562076

2077+
func TestHandleThreadCreated_ChannelRoom_FiltersNonMemberMentions(t *testing.T) {
2078+
ctrl := gomock.NewController(t)
2079+
store := NewMockStore(ctrl)
2080+
us := NewMockUserStore(ctrl)
2081+
pub := &mockPublisher{}
2082+
keyStore := NewMockRoomKeyProvider(ctrl)
2083+
2084+
msgTime := time.Date(2026, 4, 1, 10, 0, 0, 0, time.UTC)
2085+
parentMsgID := "parent-1"
2086+
siteID := "site-a"
2087+
roomID := "r1"
2088+
2089+
// Followers bob + carol are delivered without a membership check (#308); of
2090+
// the mentions, member dave is delivered but non-member mallory is not (#309).
2091+
followers := map[string]struct{}{"bob": {}, "carol": {}}
2092+
store.EXPECT().GetRoomMeta(gomock.Any(), roomID).Return(metaOf(testChannelRoom), nil)
2093+
store.EXPECT().GetThreadFollowers(gomock.Any(), parentMsgID).Return(followers, nil)
2094+
// Only the mentioned accounts are queried — never the followers.
2095+
store.EXPECT().FilterRoomMembers(gomock.Any(), "room-1", gomock.InAnyOrder([]string{"dave", "mallory"})).
2096+
Return(map[string]struct{}{"dave": {}}, nil)
2097+
us.EXPECT().FindUsersByAccounts(gomock.Any(), gomock.InAnyOrder([]string{"alice", "dave", "mallory"})).Return([]model.User{testUsers[0]}, nil)
2098+
2099+
evt := model.MessageEvent{
2100+
Event: model.EventCreated,
2101+
SiteID: siteID,
2102+
Timestamp: msgTime.UnixMilli(),
2103+
Message: model.Message{
2104+
ID: "reply-1",
2105+
RoomID: roomID,
2106+
UserID: "u-alice",
2107+
UserAccount: "alice",
2108+
Content: "a thread reply @dave @mallory",
2109+
CreatedAt: msgTime,
2110+
ThreadParentMessageID: parentMsgID,
2111+
TShow: false,
2112+
},
2113+
}
2114+
data, _ := json.Marshal(evt)
2115+
2116+
h := NewHandler(store, us, pub, keyStore, false)
2117+
require.NoError(t, h.HandleMessage(context.Background(), data))
2118+
2119+
subjects := map[string]bool{}
2120+
for _, r := range pub.records {
2121+
subjects[r.subject] = true
2122+
}
2123+
assert.True(t, subjects[subject.UserRoomEvent("alice")], "sender must receive their own echo")
2124+
assert.True(t, subjects[subject.UserRoomEvent("bob")], "follower must receive the event")
2125+
assert.True(t, subjects[subject.UserRoomEvent("carol")], "follower must receive the event")
2126+
assert.True(t, subjects[subject.UserRoomEvent("dave")], "member mention must receive the event")
2127+
assert.False(t, subjects[subject.UserRoomEvent("mallory")], "non-member mention must NOT receive the live thread event (#309)")
2128+
require.Len(t, pub.records, 4)
2129+
}
2130+
20572131
func TestHandleThreadCreated_ChannelRoom_NoFollowers_SendsToSenderOnly(t *testing.T) {
20582132
ctrl := gomock.NewController(t)
20592133
store := NewMockStore(ctrl)
@@ -2065,6 +2139,7 @@ func TestHandleThreadCreated_ChannelRoom_NoFollowers_SendsToSenderOnly(t *testin
20652139

20662140
store.EXPECT().GetRoomMeta(gomock.Any(), "r1").Return(metaOf(testChannelRoom), nil)
20672141
store.EXPECT().GetThreadFollowers(gomock.Any(), "parent-1").Return(map[string]struct{}{}, nil)
2142+
// No followers and no mentions → no membership query at all.
20682143
us.EXPECT().FindUsersByAccounts(gomock.Any(), []string{"alice"}).Return([]model.User{testUsers[0]}, nil)
20692144

20702145
evt := model.MessageEvent{
@@ -2186,6 +2261,7 @@ func TestHandleThreadUpdated_ChannelRoom_FansOutToFollowers(t *testing.T) {
21862261
followers := map[string]struct{}{"bob": {}, "carol": {}}
21872262
store.EXPECT().GetRoom(gomock.Any(), roomID).Return(room, nil)
21882263
store.EXPECT().GetThreadFollowers(gomock.Any(), parentMsgID).Return(followers, nil)
2264+
// No @-mentions → no membership query; followers are trusted (#308).
21892265

21902266
evt := model.MessageEvent{
21912267
Event: model.EventUpdated,
@@ -2334,6 +2410,7 @@ func TestHandleThreadDeleted_ChannelRoom_FansOutToFollowers(t *testing.T) {
23342410
followers := map[string]struct{}{"bob": {}, "carol": {}}
23352411
store.EXPECT().GetRoom(gomock.Any(), roomID).Return(room, nil)
23362412
store.EXPECT().GetThreadFollowers(gomock.Any(), parentMsgID).Return(followers, nil)
2413+
// No @-mentions → no membership query; followers are trusted (#308).
23372414
// No NewTCount → no badge update.
23382415

23392416
evt := model.MessageEvent{
@@ -2387,6 +2464,7 @@ func TestHandleThreadDeleted_ChannelRoom_WithBadgeUpdate(t *testing.T) {
23872464
room := &model.Room{ID: "r1", Type: model.RoomTypeChannel, SiteID: "site-a"}
23882465
store.EXPECT().GetRoom(gomock.Any(), "r1").Return(room, nil)
23892466
store.EXPECT().GetThreadFollowers(gomock.Any(), "parent-1").Return(map[string]struct{}{"bob": {}}, nil)
2467+
// No @-mentions → no membership query; follower bob is trusted (#308).
23902468

23912469
evt := model.MessageEvent{
23922470
Event: model.EventDeleted,

broadcast-worker/integration_test.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -446,6 +446,45 @@ func TestBroadcastWorker_GetThreadFollowers_Integration(t *testing.T) {
446446
})
447447
}
448448

449+
func TestBroadcastWorker_FilterRoomMembers_Integration(t *testing.T) {
450+
db := setupMongo(t)
451+
ctx := context.Background()
452+
store := NewMongoStore(db.Collection("rooms"), db.Collection("subscriptions"), db.Collection("thread_rooms"), nil, 0)
453+
454+
// alice and bob are members of r-fm; carol is a member of a different room only.
455+
_, err := db.Collection("subscriptions").InsertMany(ctx, []interface{}{
456+
model.Subscription{ID: "fm1", User: model.SubscriptionUser{ID: "u1", Account: "alice"}, RoomID: "r-fm"},
457+
model.Subscription{ID: "fm2", User: model.SubscriptionUser{ID: "u2", Account: "bob"}, RoomID: "r-fm"},
458+
model.Subscription{ID: "fm3", User: model.SubscriptionUser{ID: "u3", Account: "carol"}, RoomID: "r-other"},
459+
})
460+
require.NoError(t, err)
461+
462+
t.Run("returns only the queried accounts that are members", func(t *testing.T) {
463+
// eve is not a member anywhere; carol is a member of a different room.
464+
members, err := store.FilterRoomMembers(ctx, "r-fm", []string{"bob", "carol", "eve"})
465+
require.NoError(t, err)
466+
assert.Equal(t, map[string]struct{}{"bob": {}}, members)
467+
})
468+
469+
t.Run("all queried accounts are members", func(t *testing.T) {
470+
members, err := store.FilterRoomMembers(ctx, "r-fm", []string{"alice", "bob"})
471+
require.NoError(t, err)
472+
assert.Equal(t, map[string]struct{}{"alice": {}, "bob": {}}, members)
473+
})
474+
475+
t.Run("empty input does not hit the database", func(t *testing.T) {
476+
members, err := store.FilterRoomMembers(ctx, "r-fm", nil)
477+
require.NoError(t, err)
478+
assert.Empty(t, members)
479+
})
480+
481+
t.Run("no matches returns empty map", func(t *testing.T) {
482+
members, err := store.FilterRoomMembers(ctx, "r-fm", []string{"eve", "mallory"})
483+
require.NoError(t, err)
484+
assert.Empty(t, members)
485+
})
486+
}
487+
449488
func TestBroadcastWorker_EnsureIndexes_Integration(t *testing.T) {
450489
db := setupMongo(t)
451490
ctx := context.Background()

broadcast-worker/mock_store_test.go

Lines changed: 15 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

broadcast-worker/store.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@ type Store interface {
1717
GetRoom(ctx context.Context, roomID string) (*model.Room, error)
1818
GetRoomMeta(ctx context.Context, roomID string) (roommetacache.Meta, error)
1919
ListSubscriptions(ctx context.Context, roomID string) ([]model.Subscription, error)
20+
// FilterRoomMembers returns the subset of accounts that are current members
21+
// of roomID, querying only those accounts (index-backed, never the whole room).
22+
FilterRoomMembers(ctx context.Context, roomID string, accounts []string) (map[string]struct{}, error)
2023
GetThreadFollowers(ctx context.Context, parentMessageID string) (map[string]struct{}, error)
2124
UpdateRoomLastMessage(ctx context.Context, roomID, msgID string, msgAt time.Time, mentionAll bool) error
2225
SetSubscriptionMentions(ctx context.Context, roomID string, accounts []string) error

0 commit comments

Comments
 (0)