Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 53 additions & 8 deletions broadcast-worker/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,9 +208,16 @@ func (h *Handler) handleThreadCreated(ctx context.Context, evt *model.MessageEve

// Channel rooms: only thread subscribers and @-mentioned accounts receive the
// event. Fetch the subscriber list and build fanOut before any further work.
// Mentioned accounts are first gated by history visibility (members only, and
// only when their window admits the parent) so a restricted or non-member
// mentionee never receives a thread they cannot see.
var fanOut []string
if meta.Type == model.RoomTypeChannel {
fanOut, err = h.channelThreadFanOut(ctx, parentMsgID, msg.UserAccount, parsed.Accounts)
allowedMentions, err := h.allowedThreadMentions(ctx, msg.RoomID, parsed.Accounts, msg.ThreadParentMessageCreatedAt)
if err != nil {
return err
}
fanOut, err = h.channelThreadFanOut(ctx, parentMsgID, msg.UserAccount, allowedMentions)
if err != nil {
return fmt.Errorf("channel thread fan-out for parent %s: %w", parentMsgID, err)
}
Expand Down Expand Up @@ -252,12 +259,17 @@ func (h *Handler) handleThreadCreated(ctx context.Context, evt *model.MessageEve
}
return h.publishToThreadAccounts(ctx, fanOut, payload, parentMsgID)
case model.RoomTypeDM, model.RoomTypeBotDM:
// DM thread replies fan out to all members. @-mention badges are correct
// since DM members can see the reply. lastMsgAt is intentionally NOT
// updated: thread replies must not trigger hasUnread for non-participants.
// DM thread replies fan out to all members. The mention badge is written to
// the THREAD subscription (by parentMessageId), not the room subscription —
// a thread-only reply must not bold the whole room. No history gate is needed
// here: SetThreadSubscriptionMentions only updates thread_subscriptions rows
// that already exist, and those are created (and history-gated) upstream by
// message-worker — a restricted or non-member mentionee has no row to flip.
// lastMsgAt is intentionally NOT updated: thread replies must not trigger
// hasUnread for non-participants.
if len(resolved.Accounts) > 0 {
if err := h.store.SetSubscriptionMentions(ctx, meta.ID, resolved.Accounts); err != nil {
return fmt.Errorf("set subscription mentions: %w", err)
if err := h.store.SetThreadSubscriptionMentions(ctx, parentMsgID, resolved.Accounts); err != nil {
return fmt.Errorf("set thread subscription mentions: %w", err)
}
}
return h.publishDMEvents(ctx, meta, clientMsg, evt.Timestamp, resolved.Accounts)
Expand Down Expand Up @@ -314,7 +326,11 @@ func (h *Handler) handleThreadUpdated(ctx context.Context, evt *model.MessageEve
switch room.Type {
case model.RoomTypeChannel:
parsed := mention.Parse(msg.Content)
fanOut, err := h.channelThreadFanOut(ctx, parentMsgID, msg.UserAccount, parsed.Accounts)
allowedMentions, err := h.allowedThreadMentions(ctx, room.ID, parsed.Accounts, msg.ThreadParentMessageCreatedAt)
if err != nil {
return err
}
fanOut, err := h.channelThreadFanOut(ctx, parentMsgID, msg.UserAccount, allowedMentions)
if err != nil {
return fmt.Errorf("channel thread fan-out for thread update of parent %s: %w", parentMsgID, err)
}
Expand Down Expand Up @@ -367,7 +383,11 @@ func (h *Handler) handleThreadDeleted(ctx context.Context, evt *model.MessageEve
// receive the delete. Only the channel path uses mentions; the DM path
// fans out to all members.
parsed := mention.Parse(msg.Content)
fanOut, err := h.channelThreadFanOut(ctx, parentMsgID, msg.UserAccount, parsed.Accounts)
allowedMentions, err := h.allowedThreadMentions(ctx, room.ID, parsed.Accounts, msg.ThreadParentMessageCreatedAt)
if err != nil {
return err
}
fanOut, err := h.channelThreadFanOut(ctx, parentMsgID, msg.UserAccount, allowedMentions)
if err != nil {
return fmt.Errorf("channel thread fan-out for thread delete of parent %s: %w", parentMsgID, err)
}
Expand Down Expand Up @@ -978,6 +998,31 @@ func threadFanOutAccounts(senderAccount string, followers map[string]struct{}, e
return fanOut
}

// allowedThreadMentions filters mentioned accounts down to room members permitted
// to see the thread by their history window (mentionVisible). Accounts with no
// room subscription (non-members) are absent from the window map and excluded.
// Returns nil for an empty input.
func (h *Handler) allowedThreadMentions(ctx context.Context, roomID string, mentions []string, parentCreatedAt *time.Time) ([]string, error) {
if len(mentions) == 0 {
return nil, nil
}
windows, err := h.store.GetHistorySharedSince(ctx, roomID, mentions)
if err != nil {
return nil, fmt.Errorf("get history windows for room %s: %w", roomID, err)
}
allowed := make([]string, 0, len(mentions))
for _, acc := range mentions {
hss, isMember := windows[acc]
// Exclude non-members (no room subscription) outright; keep a member only when
// their history window admits the thread's parent.
if !isMember || !mentionVisible(hss, parentCreatedAt) {
continue
}
allowed = append(allowed, acc)
}
return allowed, nil
}

// channelThreadFanOut resolves the deduplicated recipient list for a channel
// thread event: it fetches the parent message's thread followers and merges
// them with the @-mentioned accounts, excluding the sender. Shared by the
Expand Down
157 changes: 156 additions & 1 deletion broadcast-worker/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2142,10 +2142,14 @@ func TestHandleThreadCreated_DMRoom_WithMention_SetsMentions(t *testing.T) {

msgTime := time.Date(2026, 4, 1, 11, 0, 0, 0, time.UTC)

// Thread reply mention badge goes to the THREAD subscription (by parentMessageId),
// not the room subscription. No history gate on the DM path — SetThreadSubscriptionMentions
// only flips rows message-worker already created (and gated), so resolved.Accounts is passed directly.
store.EXPECT().GetRoomMeta(gomock.Any(), "dm-1").Return(metaOf(testDMRoom), nil)
store.EXPECT().SetSubscriptionMentions(gomock.Any(), "dm-1", []string{"bob"}).Return(nil)
store.EXPECT().SetThreadSubscriptionMentions(gomock.Any(), "parent-dm", []string{"bob"}).Return(nil)
store.EXPECT().ListSubscriptions(gomock.Any(), "dm-1").Return(testDMSubs, nil)
us.EXPECT().FindUsersByAccounts(gomock.Any(), []string{"alice", "bob"}).Return(testUsers, nil)
// SetSubscriptionMentions (room sub) must NOT be called — no EXPECT registered.

evt := model.MessageEvent{
Event: model.EventCreated,
Expand All @@ -2169,6 +2173,56 @@ func TestHandleThreadCreated_DMRoom_WithMention_SetsMentions(t *testing.T) {
require.Len(t, pub.records, 2)
}

func TestHandleThreadCreated_ChannelExcludesRestrictedAndNonMemberMentions(t *testing.T) {
ctrl := gomock.NewController(t)
store := NewMockStore(ctrl)
us := NewMockUserStore(ctrl)
pub := &mockPublisher{}
keyStore := NewMockRoomKeyProvider(ctrl)

parentAt := time.Date(2026, 6, 1, 12, 0, 0, 0, time.UTC)
joinedAfter := parentAt.Add(time.Hour)
msgTime := parentAt.Add(2 * time.Hour)

// bob: member, full access → included. carol: member, joined after parent → excluded.
// dave: absent from the map → non-member → excluded.
store.EXPECT().GetRoomMeta(gomock.Any(), "room-1").Return(metaOf(testChannelRoom), nil)
store.EXPECT().GetHistorySharedSince(gomock.Any(), "room-1", gomock.Any()).
Return(map[string]*time.Time{"bob": nil, "carol": &joinedAfter}, nil)
store.EXPECT().GetThreadFollowers(gomock.Any(), "parent-1").Return(map[string]struct{}{}, nil)
us.EXPECT().FindUsersByAccounts(gomock.Any(), gomock.Any()).Return(testUsers, nil)

evt := model.MessageEvent{
Event: model.EventCreated,
SiteID: "site-a",
Timestamp: msgTime.UnixMilli(),
Message: model.Message{
ID: "reply-1",
RoomID: "room-1",
UserID: "u-alice",
UserAccount: "alice",
Content: "@bob @carol @dave hi",
CreatedAt: msgTime,
ThreadParentMessageID: "parent-1",
ThreadParentMessageCreatedAt: &parentAt,
TShow: false,
},
}
data, _ := json.Marshal(evt)

h := NewHandler(store, us, pub, keyStore, false)
require.NoError(t, h.HandleMessage(context.Background(), data))

got := map[string]bool{}
for _, r := range pub.records {
got[r.subject] = true
}
assert.True(t, got[subject.UserRoomEvent("alice")], "sender receives their own echo")
assert.True(t, got[subject.UserRoomEvent("bob")], "unrestricted member mentionee receives the reply")
assert.False(t, got[subject.UserRoomEvent("carol")], "member who joined after the parent is excluded")
assert.False(t, got[subject.UserRoomEvent("dave")], "non-member mentionee is excluded")
}

func TestHandleThreadUpdated_ChannelRoom_FansOutToFollowers(t *testing.T) {
ctrl := gomock.NewController(t)
store := NewMockStore(ctrl)
Expand Down Expand Up @@ -2226,6 +2280,107 @@ func TestHandleThreadUpdated_ChannelRoom_FansOutToFollowers(t *testing.T) {
assert.True(t, subjects[subject.UserRoomEvent("carol")])
}

func TestHandleThreadUpdated_ChannelExcludesRestrictedAndNonMemberMentions(t *testing.T) {
ctrl := gomock.NewController(t)
store := NewMockStore(ctrl)
us := NewMockUserStore(ctrl)
pub := &mockPublisher{}
keyStore := NewMockRoomKeyProvider(ctrl)

parentAt := time.Date(2026, 6, 1, 12, 0, 0, 0, time.UTC)
joinedAfter := parentAt.Add(time.Hour)
msgTime := parentAt.Add(2 * time.Hour)
editedAt := msgTime.Add(time.Minute)

room := &model.Room{ID: "r1", Type: model.RoomTypeChannel, SiteID: "site-a"}
store.EXPECT().GetRoom(gomock.Any(), "r1").Return(room, nil)
// bob: member full access → included. carol: joined after parent → excluded.
// dave: absent → non-member → excluded.
store.EXPECT().GetHistorySharedSince(gomock.Any(), "r1", gomock.Any()).
Return(map[string]*time.Time{"bob": nil, "carol": &joinedAfter}, nil)
store.EXPECT().GetThreadFollowers(gomock.Any(), "parent-1").Return(map[string]struct{}{}, nil)

evt := model.MessageEvent{
Event: model.EventUpdated,
SiteID: "site-a",
Timestamp: editedAt.UnixMilli(),
Message: model.Message{
ID: "reply-1",
RoomID: "r1",
UserAccount: "alice",
Content: "@bob @carol @dave hi",
CreatedAt: msgTime,
EditedAt: &editedAt,
UpdatedAt: &editedAt,
ThreadParentMessageID: "parent-1",
ThreadParentMessageCreatedAt: &parentAt,
TShow: false,
},
}
data, _ := json.Marshal(evt)

h := NewHandler(store, us, pub, keyStore, false)
require.NoError(t, h.HandleMessage(context.Background(), data))

got := map[string]bool{}
for _, r := range pub.records {
got[r.subject] = true
}
assert.True(t, got[subject.UserRoomEvent("alice")], "sender receives their own echo")
assert.True(t, got[subject.UserRoomEvent("bob")], "unrestricted member mentionee receives the edit")
assert.False(t, got[subject.UserRoomEvent("carol")], "member who joined after the parent is excluded")
assert.False(t, got[subject.UserRoomEvent("dave")], "non-member mentionee is excluded")
}

func TestHandleThreadDeleted_ChannelExcludesRestrictedAndNonMemberMentions(t *testing.T) {
ctrl := gomock.NewController(t)
store := NewMockStore(ctrl)
us := NewMockUserStore(ctrl)
pub := &mockPublisher{}
keyStore := NewMockRoomKeyProvider(ctrl)

parentAt := time.Date(2026, 6, 1, 12, 0, 0, 0, time.UTC)
joinedAfter := parentAt.Add(time.Hour)
msgTime := parentAt.Add(2 * time.Hour)
deletedAt := msgTime.Add(time.Minute)

room := &model.Room{ID: "r1", Type: model.RoomTypeChannel, SiteID: "site-a"}
store.EXPECT().GetRoom(gomock.Any(), "r1").Return(room, nil)
store.EXPECT().GetHistorySharedSince(gomock.Any(), "r1", gomock.Any()).
Return(map[string]*time.Time{"bob": nil, "carol": &joinedAfter}, nil)
store.EXPECT().GetThreadFollowers(gomock.Any(), "parent-1").Return(map[string]struct{}{}, nil)

evt := model.MessageEvent{
Event: model.EventDeleted,
SiteID: "site-a",
Timestamp: deletedAt.UnixMilli(),
Message: model.Message{
ID: "reply-1",
RoomID: "r1",
UserAccount: "alice",
Content: "@bob @carol @dave hi",
CreatedAt: msgTime,
UpdatedAt: &deletedAt,
ThreadParentMessageID: "parent-1",
ThreadParentMessageCreatedAt: &parentAt,
TShow: false,
},
}
data, _ := json.Marshal(evt)

h := NewHandler(store, us, pub, keyStore, false)
require.NoError(t, h.HandleMessage(context.Background(), data))

got := map[string]bool{}
for _, r := range pub.records {
got[r.subject] = true
}
assert.True(t, got[subject.UserRoomEvent("alice")], "sender receives their own echo")
assert.True(t, got[subject.UserRoomEvent("bob")], "unrestricted member mentionee receives the delete")
assert.False(t, got[subject.UserRoomEvent("carol")], "member who joined after the parent is excluded")
assert.False(t, got[subject.UserRoomEvent("dave")], "non-member mentionee is excluded")
}

func TestHandleThreadUpdated_ChannelRoom_GetThreadFollowersError(t *testing.T) {
ctrl := gomock.NewController(t)
store := NewMockStore(ctrl)
Expand Down
20 changes: 19 additions & 1 deletion broadcast-worker/helper.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,24 @@
package main

import "strings"
import (
"strings"
"time"
)

// mentionVisible reports whether a mentioned user whose room subscription carries
// historySharedSince may see a thread reply whose parent was created at
// parentCreatedAt. Mirrors notification-worker.isRestricted (inverted to
// "visible"): a nil window means full access; a set window with a missing or
// older parent timestamp means no access.
func mentionVisible(historySharedSince, parentCreatedAt *time.Time) bool {
if historySharedSince == nil {
return true
}
if parentCreatedAt == nil {
return false
}
return !parentCreatedAt.Before(*historySharedSince)
}

// isBot returns true if account follows the bot naming convention used across
// the codebase (suffix `.bot` or prefix `p_`). Mirrors the predicate in
Expand Down
25 changes: 25 additions & 0 deletions broadcast-worker/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,35 @@ package main

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestMentionVisible(t *testing.T) {
base := time.Date(2026, 6, 1, 12, 0, 0, 0, time.UTC)
before := base.Add(-time.Hour)
after := base.Add(time.Hour)
tests := []struct {
name string
hss *time.Time
parent *time.Time
visible bool
}{
{"nil window is unrestricted", nil, &base, true},
{"nil window nil parent still unrestricted", nil, nil, true},
{"parent after window is visible", &before, &base, true},
{"parent equal to window is visible", &base, &base, true},
{"parent before window is hidden", &after, &base, false},
{"set window with nil parent is hidden", &before, nil, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equal(t, tt.visible, mentionVisible(tt.hss, tt.parent))
})
}
}

func TestDedupedAccounts(t *testing.T) {
cases := []struct {
name string
Expand Down
Loading
Loading