Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 31 additions & 25 deletions broadcast-worker/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ func (h *Handler) handleThreadCreated(ctx context.Context, evt *model.MessageEve
// event. Fetch the subscriber list and build fanOut before any further work.
var fanOut []string
if meta.Type == model.RoomTypeChannel {
fanOut, err = h.channelThreadFanOut(ctx, parentMsgID, msg.UserAccount, parsed.Accounts)
fanOut, err = h.channelThreadFanOut(ctx, meta.ID, parentMsgID, msg.UserAccount, parsed.Accounts)
if err != nil {
return fmt.Errorf("channel thread fan-out for parent %s: %w", parentMsgID, err)
}
Expand Down Expand Up @@ -314,7 +314,7 @@ func (h *Handler) handleThreadUpdated(ctx context.Context, evt *model.MessageEve
switch room.Type {
case model.RoomTypeChannel:
parsed := mention.Parse(msg.Content)
fanOut, err := h.channelThreadFanOut(ctx, parentMsgID, msg.UserAccount, parsed.Accounts)
fanOut, err := h.channelThreadFanOut(ctx, room.ID, parentMsgID, msg.UserAccount, parsed.Accounts)
if err != nil {
return fmt.Errorf("channel thread fan-out for thread update of parent %s: %w", parentMsgID, err)
}
Expand Down Expand Up @@ -367,7 +367,7 @@ func (h *Handler) handleThreadDeleted(ctx context.Context, evt *model.MessageEve
// receive the delete. Only the channel path uses mentions; the DM path
// fans out to all members.
parsed := mention.Parse(msg.Content)
fanOut, err := h.channelThreadFanOut(ctx, parentMsgID, msg.UserAccount, parsed.Accounts)
fanOut, err := h.channelThreadFanOut(ctx, room.ID, parentMsgID, msg.UserAccount, parsed.Accounts)
if err != nil {
return fmt.Errorf("channel thread fan-out for thread delete of parent %s: %w", parentMsgID, err)
}
Expand Down Expand Up @@ -942,19 +942,16 @@ func (h *Handler) publishToThreadAccounts(ctx context.Context, accounts []string
return nil
}

// threadFanOutAccounts builds the deduplicated fan-out recipient list for a
// thread event. The message sender is always included first (unless a bot):
// they authored the reply and are therefore a thread participant, so their own
// devices must receive the event for multi-device sync. The sender is added
// directly here rather than relied upon via replyAccounts — replyAccounts is
// written by message-worker on a separate, unordered MESSAGES_CANONICAL
// consumer, so a fan-out that depended on it would race the sender's own first
// reply and silently drop the echo. followers (thread repliers) and
// extraAccounts (@-mentioned users) are merged after, deduped. Bots are always
// excluded.
func threadFanOutAccounts(senderAccount string, followers map[string]struct{}, extraAccounts []string) []string {
seen := map[string]struct{}{}
var fanOut []string
// threadFanOutAccounts builds the deduplicated recipient list (sender +
// followers + member mentions, bots excluded). The sender is added directly,
// not via replyAccounts, which message-worker writes asynchronously and would
// race the author's own echo. Followers are trusted member-clean (room-worker
// scrubs them on removal, #308); mentions may target non-members (#307) so only
// memberMentions are delivered (#309).
func threadFanOutAccounts(senderAccount string, followers map[string]struct{}, mentions []string, memberMentions map[string]struct{}) []string {
capHint := len(followers) + len(mentions) + 1
seen := make(map[string]struct{}, capHint)
fanOut := make([]string, 0, capHint)
add := func(acc string) {
if acc == "" {
return
Expand All @@ -968,26 +965,35 @@ func threadFanOutAccounts(senderAccount string, followers map[string]struct{}, e
seen[acc] = struct{}{}
fanOut = append(fanOut, acc)
}
add(senderAccount) // author is a thread participant — include race-free
add(senderAccount)
for acc := range followers {
add(acc)
}
for _, acc := range extraAccounts {
add(acc)
for _, acc := range mentions {
if _, ok := memberMentions[acc]; ok { // gated: a mention may target a non-member
add(acc)
}
}
return fanOut
}

// channelThreadFanOut resolves the deduplicated recipient list for a channel
// thread event: it fetches the parent message's thread followers and merges
// them with the @-mentioned accounts, excluding the sender. Shared by the
// channel branch of every thread handler (created/updated/deleted).
func (h *Handler) channelThreadFanOut(ctx context.Context, parentMsgID, sender string, mentions []string) ([]string, error) {
// channelThreadFanOut resolves the channel thread recipient list: trusted
// followers plus mentioned members, sender always included. Only mentions are
// membership-checked, and only when present — a no-mention reply queries
// nothing, and the query is scoped to the mentions, never the whole room.
func (h *Handler) channelThreadFanOut(ctx context.Context, roomID, parentMsgID, sender string, mentions []string) ([]string, error) {
followers, err := h.store.GetThreadFollowers(ctx, parentMsgID)
if err != nil {
return nil, fmt.Errorf("get thread followers for parent %s: %w", parentMsgID, err)
}
return threadFanOutAccounts(sender, followers, mentions), nil
var memberMentions map[string]struct{}
if len(mentions) > 0 {
memberMentions, err = h.store.FilterRoomMembers(ctx, roomID, mentions)
if err != nil {
return nil, fmt.Errorf("filter mentioned members for room %s: %w", roomID, err)
}
}
return threadFanOutAccounts(sender, followers, mentions, memberMentions), nil
}

// usersByAccount indexes a slice of users by their Account for O(1) lookup
Expand Down
156 changes: 117 additions & 39 deletions broadcast-worker/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1721,65 +1721,84 @@ func TestHandleUnpinned_DMRoom_FansOutToBothMembers(t *testing.T) {

func TestThreadFanOutAccounts(t *testing.T) {
tests := []struct {
name string
sender string
followers map[string]struct{}
extraAccounts []string
want []string
name string
sender string
followers map[string]struct{}
mentions []string
memberMentions map[string]struct{}
want []string
}{
{
name: "sender alone still notified (own devices, no other followers)",
sender: "alice",
followers: map[string]struct{}{},
extraAccounts: nil,
want: []string{"alice"},
name: "sender alone still notified (own devices, no other followers)",
sender: "alice",
followers: map[string]struct{}{},
want: []string{"alice"},
},
{
name: "sender included even when not yet in replyAccounts (race-free)",
sender: "alice",
followers: map[string]struct{}{"bob": {}},
extraAccounts: nil,
want: []string{"alice", "bob"},
name: "sender included even when not yet in replyAccounts (race-free)",
sender: "alice",
followers: map[string]struct{}{"bob": {}},
want: []string{"alice", "bob"},
},
{
name: "sender included when also a follower (multi-device support)",
name: "follower delivered without a membership check (replyAccounts trusted, #308)",
sender: "alice",
followers: map[string]struct{}{"alice": {}, "bob": {}},
want: []string{"alice", "bob"},
followers: map[string]struct{}{"bob": {}, "carol": {}},
want: []string{"alice", "bob", "carol"},
},
{
name: "member mention delivered",
sender: "alice",
followers: map[string]struct{}{},
mentions: []string{"carol"},
memberMentions: map[string]struct{}{"carol": {}},
want: []string{"alice", "carol"},
},
{
name: "sender included when only in extra accounts",
sender: "alice",
followers: map[string]struct{}{"bob": {}},
extraAccounts: []string{"alice"},
want: []string{"bob", "alice"},
name: "non-member mention filtered out (#309)",
sender: "alice",
followers: map[string]struct{}{},
mentions: []string{"mallory"},
memberMentions: map[string]struct{}{}, // mallory is not a member → dropped
want: []string{"alice"},
},
{
name: "extra accounts merged deduped",
sender: "alice",
followers: map[string]struct{}{"bob": {}},
extraAccounts: []string{"bob", "carol"},
want: []string{"alice", "bob", "carol"},
name: "followers trusted, mentions gated, mix",
sender: "alice",
followers: map[string]struct{}{"bob": {}},
mentions: []string{"carol", "mallory"},
memberMentions: map[string]struct{}{"carol": {}},
want: []string{"alice", "bob", "carol"},
},
{
name: "bot accounts skipped even if sender is bot",
sender: "helper.bot",
followers: map[string]struct{}{"helper.bot": {}, "bob": {}},
extraAccounts: []string{"other.bot"},
want: []string{"bob"},
name: "bot accounts skipped even if sender is bot",
sender: "helper.bot",
followers: map[string]struct{}{"helper.bot": {}, "bob": {}},
mentions: []string{"other.bot"},
memberMentions: map[string]struct{}{"other.bot": {}},
want: []string{"bob"},
},
{
name: "sender not duplicated when in both followers and extras",
sender: "alice",
followers: map[string]struct{}{"alice": {}, "bob": {}},
extraAccounts: []string{"alice", "carol"},
want: []string{"alice", "bob", "carol"},
name: "sender not duplicated when also a follower and mention",
sender: "alice",
followers: map[string]struct{}{"alice": {}, "bob": {}},
mentions: []string{"alice", "carol"},
memberMentions: map[string]struct{}{"alice": {}, "carol": {}},
want: []string{"alice", "bob", "carol"},
},
{
name: "mention that is also a follower is delivered via the trusted follower path",
sender: "alice",
followers: map[string]struct{}{"bob": {}},
mentions: []string{"bob"},
memberMentions: map[string]struct{}{"bob": {}},
want: []string{"alice", "bob"},
},
}

for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
got := threadFanOutAccounts(tc.sender, tc.followers, tc.extraAccounts)
got := threadFanOutAccounts(tc.sender, tc.followers, tc.mentions, tc.memberMentions)
assert.ElementsMatch(t, tc.want, got)
})
}
Expand Down Expand Up @@ -2016,6 +2035,7 @@ func TestHandleThreadCreated_ChannelRoom_FansOutToFollowers(t *testing.T) {
followers := map[string]struct{}{"bob": {}, "carol": {}}
store.EXPECT().GetRoomMeta(gomock.Any(), roomID).Return(metaOf(testChannelRoom), nil)
store.EXPECT().GetThreadFollowers(gomock.Any(), parentMsgID).Return(followers, nil)
// No @-mentions in the reply → no membership query; followers are trusted (#308).
us.EXPECT().FindUsersByAccounts(gomock.Any(), []string{"alice"}).Return([]model.User{testUsers[0]}, nil)

evt := model.MessageEvent{
Expand Down Expand Up @@ -2054,6 +2074,60 @@ func TestHandleThreadCreated_ChannelRoom_FansOutToFollowers(t *testing.T) {
assert.True(t, subjects[subject.UserRoomEvent("carol")])
}

func TestHandleThreadCreated_ChannelRoom_FiltersNonMemberMentions(t *testing.T) {
ctrl := gomock.NewController(t)
store := NewMockStore(ctrl)
us := NewMockUserStore(ctrl)
pub := &mockPublisher{}
keyStore := NewMockRoomKeyProvider(ctrl)

msgTime := time.Date(2026, 4, 1, 10, 0, 0, 0, time.UTC)
parentMsgID := "parent-1"
siteID := "site-a"
roomID := "r1"

// Followers bob + carol are delivered without a membership check (#308); of
// the mentions, member dave is delivered but non-member mallory is not (#309).
followers := map[string]struct{}{"bob": {}, "carol": {}}
store.EXPECT().GetRoomMeta(gomock.Any(), roomID).Return(metaOf(testChannelRoom), nil)
store.EXPECT().GetThreadFollowers(gomock.Any(), parentMsgID).Return(followers, nil)
// Only the mentioned accounts are queried — never the followers.
store.EXPECT().FilterRoomMembers(gomock.Any(), "room-1", gomock.InAnyOrder([]string{"dave", "mallory"})).
Return(map[string]struct{}{"dave": {}}, nil)
us.EXPECT().FindUsersByAccounts(gomock.Any(), gomock.InAnyOrder([]string{"alice", "dave", "mallory"})).Return([]model.User{testUsers[0]}, nil)

evt := model.MessageEvent{
Event: model.EventCreated,
SiteID: siteID,
Timestamp: msgTime.UnixMilli(),
Message: model.Message{
ID: "reply-1",
RoomID: roomID,
UserID: "u-alice",
UserAccount: "alice",
Content: "a thread reply @dave @mallory",
CreatedAt: msgTime,
ThreadParentMessageID: parentMsgID,
TShow: false,
},
}
data, _ := json.Marshal(evt)

h := NewHandler(store, us, pub, keyStore, false)
require.NoError(t, h.HandleMessage(context.Background(), data))

subjects := map[string]bool{}
for _, r := range pub.records {
subjects[r.subject] = true
}
assert.True(t, subjects[subject.UserRoomEvent("alice")], "sender must receive their own echo")
assert.True(t, subjects[subject.UserRoomEvent("bob")], "follower must receive the event")
assert.True(t, subjects[subject.UserRoomEvent("carol")], "follower must receive the event")
assert.True(t, subjects[subject.UserRoomEvent("dave")], "member mention must receive the event")
assert.False(t, subjects[subject.UserRoomEvent("mallory")], "non-member mention must NOT receive the live thread event (#309)")
require.Len(t, pub.records, 4)
}

func TestHandleThreadCreated_ChannelRoom_NoFollowers_SendsToSenderOnly(t *testing.T) {
ctrl := gomock.NewController(t)
store := NewMockStore(ctrl)
Expand All @@ -2065,6 +2139,7 @@ func TestHandleThreadCreated_ChannelRoom_NoFollowers_SendsToSenderOnly(t *testin

store.EXPECT().GetRoomMeta(gomock.Any(), "r1").Return(metaOf(testChannelRoom), nil)
store.EXPECT().GetThreadFollowers(gomock.Any(), "parent-1").Return(map[string]struct{}{}, nil)
// No followers and no mentions → no membership query at all.
us.EXPECT().FindUsersByAccounts(gomock.Any(), []string{"alice"}).Return([]model.User{testUsers[0]}, nil)

evt := model.MessageEvent{
Expand Down Expand Up @@ -2186,6 +2261,7 @@ func TestHandleThreadUpdated_ChannelRoom_FansOutToFollowers(t *testing.T) {
followers := map[string]struct{}{"bob": {}, "carol": {}}
store.EXPECT().GetRoom(gomock.Any(), roomID).Return(room, nil)
store.EXPECT().GetThreadFollowers(gomock.Any(), parentMsgID).Return(followers, nil)
// No @-mentions → no membership query; followers are trusted (#308).

evt := model.MessageEvent{
Event: model.EventUpdated,
Expand Down Expand Up @@ -2334,6 +2410,7 @@ func TestHandleThreadDeleted_ChannelRoom_FansOutToFollowers(t *testing.T) {
followers := map[string]struct{}{"bob": {}, "carol": {}}
store.EXPECT().GetRoom(gomock.Any(), roomID).Return(room, nil)
store.EXPECT().GetThreadFollowers(gomock.Any(), parentMsgID).Return(followers, nil)
// No @-mentions → no membership query; followers are trusted (#308).
// No NewTCount → no badge update.

evt := model.MessageEvent{
Expand Down Expand Up @@ -2387,6 +2464,7 @@ func TestHandleThreadDeleted_ChannelRoom_WithBadgeUpdate(t *testing.T) {
room := &model.Room{ID: "r1", Type: model.RoomTypeChannel, SiteID: "site-a"}
store.EXPECT().GetRoom(gomock.Any(), "r1").Return(room, nil)
store.EXPECT().GetThreadFollowers(gomock.Any(), "parent-1").Return(map[string]struct{}{"bob": {}}, nil)
// No @-mentions → no membership query; follower bob is trusted (#308).

evt := model.MessageEvent{
Event: model.EventDeleted,
Expand Down
39 changes: 39 additions & 0 deletions broadcast-worker/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,45 @@ func TestBroadcastWorker_GetThreadFollowers_Integration(t *testing.T) {
})
}

func TestBroadcastWorker_FilterRoomMembers_Integration(t *testing.T) {
db := setupMongo(t)
ctx := context.Background()
store := NewMongoStore(db.Collection("rooms"), db.Collection("subscriptions"), db.Collection("thread_rooms"), nil, 0)

// alice and bob are members of r-fm; carol is a member of a different room only.
_, err := db.Collection("subscriptions").InsertMany(ctx, []interface{}{
model.Subscription{ID: "fm1", User: model.SubscriptionUser{ID: "u1", Account: "alice"}, RoomID: "r-fm"},
model.Subscription{ID: "fm2", User: model.SubscriptionUser{ID: "u2", Account: "bob"}, RoomID: "r-fm"},
model.Subscription{ID: "fm3", User: model.SubscriptionUser{ID: "u3", Account: "carol"}, RoomID: "r-other"},
})
require.NoError(t, err)

t.Run("returns only the queried accounts that are members", func(t *testing.T) {
// eve is not a member anywhere; carol is a member of a different room.
members, err := store.FilterRoomMembers(ctx, "r-fm", []string{"bob", "carol", "eve"})
require.NoError(t, err)
assert.Equal(t, map[string]struct{}{"bob": {}}, members)
})

t.Run("all queried accounts are members", func(t *testing.T) {
members, err := store.FilterRoomMembers(ctx, "r-fm", []string{"alice", "bob"})
require.NoError(t, err)
assert.Equal(t, map[string]struct{}{"alice": {}, "bob": {}}, members)
})

t.Run("empty input does not hit the database", func(t *testing.T) {
members, err := store.FilterRoomMembers(ctx, "r-fm", nil)
require.NoError(t, err)
assert.Empty(t, members)
})

t.Run("no matches returns empty map", func(t *testing.T) {
members, err := store.FilterRoomMembers(ctx, "r-fm", []string{"eve", "mallory"})
require.NoError(t, err)
assert.Empty(t, members)
})
}

func TestBroadcastWorker_EnsureIndexes_Integration(t *testing.T) {
db := setupMongo(t)
ctx := context.Background()
Expand Down
15 changes: 15 additions & 0 deletions broadcast-worker/mock_store_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading