Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions docs/client-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -3719,6 +3719,7 @@ Additional legacy fields may be present, mirroring the `GET /api/v3/users` respo
| `chat.user.{account}.request.user.{siteID}.subscription.setAppSubscription` | [`subscription.setAppSubscription`](#subscriptionsetappsubscription) |
| `chat.user.{account}.request.user.{siteID}.apps.list` | [`apps.list`](#appslist) |
| `chat.user.{account}.request.user.{siteID}.thread.list` | [List User Threads](#list-user-threads) |
| `chat.user.{account}.request.user.{siteID}.thread.unread.summary` | [Get User Thread Unread Summary](#get-user-thread-unread-summary) |

#### status.getByName

Expand Down Expand Up @@ -4453,6 +4454,50 @@ See [Error envelope](#6-error-envelope-reference). A malformed `cursor` returns

---

#### Get User Thread Unread Summary

**Subject:** `chat.user.{account}.request.user.{siteID}.thread.unread.summary`
**Reply subject:** auto-generated `_INBOX.>` (NATS request/reply)

- `{siteID}` is the **caller's own home site** — the site that runs the aggregator.

Returns a single cross-site rollup of whether the user has any unread thread activity. `user-service` fans out a per-site query to **every configured federation site** (`ALL_SITE_IDS`, including the local site); each site's `room-service` answers for its own threads, and the booleans are OR-merged into one answer. Use this for a lightweight unread badge — it carries no per-thread detail (use [List User Threads](#list-user-threads) for that).

The request takes **no body** — the account is taken from the subject.

##### Success response

| Field | Type | Notes |
|---|---|---|
| `unread` | boolean | `true` if the user has any unread thread on any site. |
| `unreadDirectMessage` | boolean | `true` if any unread thread belongs to a DM room. |
| `unreadMention` | boolean | `true` if the user is @-mentioned in any unread thread. |
| `lastMessageAt` | number | Optional. UTC ms of the newest thread activity across all sites; absent when the user has no threads anywhere. |
| `unavailableSites` | string[] | Optional. Sites that failed to respond; the booleans may understate reality until they recover. |

```json
{
"unread": true,
"unreadDirectMessage": false,
"unreadMention": true,
"lastMessageAt": 1746518400000
}
```

##### Error response

See [Error envelope](#6-error-envelope-reference).

##### Triggered events — success path

`None — reply only.`

##### Triggered events — error path

`None — error returned only via the reply subject.`

---

## 4. Message Send

### Send Message
Expand Down
12 changes: 12 additions & 0 deletions pkg/model/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,18 @@ func TestThreadUnreadSummaryResponseJSON(t *testing.T) {
roundTrip(t, &r, &model.ThreadUnreadSummaryResponse{})
}

func TestThreadUnreadAggregateResponseJSON(t *testing.T) {
ms := int64(1717000000000)
r := model.ThreadUnreadAggregateResponse{
Unread: true,
UnreadDirectMessage: true,
UnreadMention: false,
LastMessageAt: &ms,
UnavailableSites: []string{"site-b"},
}
roundTrip(t, &r, &model.ThreadUnreadAggregateResponse{})
}

func TestThreadUnreadSummaryRequestJSON(t *testing.T) {
r := model.ThreadUnreadSummaryRequest{UserAccount: "alice@example.com"}
roundTrip(t, &r, &model.ThreadUnreadSummaryRequest{})
Expand Down
36 changes: 20 additions & 16 deletions pkg/model/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,26 @@ type SubscriptionUser struct {
}

type Subscription struct {
ID string `json:"id" bson:"_id"`
User SubscriptionUser `json:"u" bson:"u"`
RoomID string `json:"roomId" bson:"roomId"`
SiteID string `json:"siteId" bson:"siteId"`
Roles []Role `json:"roles" bson:"roles"`
Name string `json:"name" bson:"name"`
RoomType RoomType `json:"roomType" bson:"roomType"`
IsSubscribed bool `json:"isSubscribed,omitempty" bson:"isSubscribed,omitempty"`
HistorySharedSince *time.Time `json:"historySharedSince,omitempty" bson:"historySharedSince,omitempty"`
JoinedAt time.Time `json:"joinedAt" bson:"joinedAt"`
LastSeenAt *time.Time `json:"lastSeenAt,omitempty" bson:"lastSeenAt,omitempty"`
HasMention bool `json:"hasMention" bson:"hasMention"`
ThreadUnread []string `json:"threadUnread,omitempty" bson:"threadUnread,omitempty"`
Alert bool `json:"alert" bson:"alert"`
Muted bool `json:"muted" bson:"muted"`
Favorite bool `json:"favorite" bson:"favorite"`
ID string `json:"id" bson:"_id"`
User SubscriptionUser `json:"u" bson:"u"`
RoomID string `json:"roomId" bson:"roomId"`
SiteID string `json:"siteId" bson:"siteId"`
Roles []Role `json:"roles" bson:"roles"`
Name string `json:"name" bson:"name"`
RoomType RoomType `json:"roomType" bson:"roomType"`
// IsSubscribed is app-subscription state for botDM rooms. omitempty means false
// is stored as an ABSENT field, so queries must match {isSubscribed: true}
// (present ⇒ subscribed), never {$ne: false}. See user-service aggregateCurrent
// and room-service GetThreadUnreadSummary.
IsSubscribed bool `json:"isSubscribed,omitempty" bson:"isSubscribed,omitempty"`
HistorySharedSince *time.Time `json:"historySharedSince,omitempty" bson:"historySharedSince,omitempty"`
JoinedAt time.Time `json:"joinedAt" bson:"joinedAt"`
LastSeenAt *time.Time `json:"lastSeenAt,omitempty" bson:"lastSeenAt,omitempty"`
HasMention bool `json:"hasMention" bson:"hasMention"`
ThreadUnread []string `json:"threadUnread,omitempty" bson:"threadUnread,omitempty"`
Alert bool `json:"alert" bson:"alert"`
Muted bool `json:"muted" bson:"muted"`
Favorite bool `json:"favorite" bson:"favorite"`
// Denormalized from Room.{Restricted,ExternalAccess}; the only place remote sites carry restricted state
// (cross-site inbox mirrors subscriptions, not Room docs). Treat missing as false.
Restricted bool `json:"restricted,omitempty" bson:"restricted,omitempty"`
Expand Down
14 changes: 14 additions & 0 deletions pkg/model/threadsubscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,17 @@ type ThreadUnreadSummaryResponse struct {
UnreadMention bool `json:"unreadMention"`
LastMessageAt *int64 `json:"lastMessageAt,omitempty"`
}

// ThreadUnreadAggregateResponse is the cross-site rollup the user-service
// aggregator returns to the client. The booleans are OR-merged across every
// federation site; LastMessageAt is the newest across sites (UnixMilli, nil when
// the user has no threads anywhere). UnavailableSites lists sites that failed to
// respond — their unread state is not reflected in this page and the booleans
// may understate reality until they recover.
type ThreadUnreadAggregateResponse struct {
Unread bool `json:"unread"`
UnreadDirectMessage bool `json:"unreadDirectMessage"`
UnreadMention bool `json:"unreadMention"`
LastMessageAt *int64 `json:"lastMessageAt,omitempty"`
UnavailableSites []string `json:"unavailableSites,omitempty"`
}
18 changes: 18 additions & 0 deletions pkg/subject/subject.go
Original file line number Diff line number Diff line change
Expand Up @@ -974,6 +974,24 @@ func UserThreadListPattern(siteID string) string {
return fmt.Sprintf("chat.user.{account}.request.user.%s.thread.list", siteID)
}

// UserThreadUnreadSummary is the concrete client-facing subject for the
// cross-site thread unread rollup RPC. siteID is the CALLER's own home site —
// the site that runs the aggregator. Pair with UserThreadUnreadSummaryPattern
// for user-service's registration.
func UserThreadUnreadSummary(account, siteID string) string {
if !isValidAccountToken(account) {
panic("invalid account token: contains NATS wildcard characters")
}
return fmt.Sprintf("chat.user.%s.request.user.%s.thread.unread.summary", account, siteID)
}

// UserThreadUnreadSummaryPattern is the natsrouter pattern user-service
// registers for the cross-site thread unread rollup RPC (siteID baked in,
// account left as {account}).
func UserThreadUnreadSummaryPattern(siteID string) string {
return fmt.Sprintf("chat.user.{account}.request.user.%s.thread.unread.summary", siteID)
}

func UserSubscriptionSetAppSubscription(account, siteID string) string {
if !isValidAccountToken(account) {
panic("invalid account token: contains NATS wildcard characters")
Expand Down
6 changes: 6 additions & 0 deletions pkg/subject/subject_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,6 +530,10 @@ func TestUserServiceBuilders(t *testing.T) {
{"subscription.list", subject.UserSubscriptionList("alice", "s1"), "chat.user.alice.request.user.s1.subscription.list"},
{"subscription.setAppSubscription", subject.UserSubscriptionSetAppSubscription("alice", "s1"), "chat.user.alice.request.user.s1.subscription.setAppSubscription"},
{"subscription.getByRoomID", subject.UserSubscriptionGetByRoomID("alice", "s1"), "chat.user.alice.request.user.s1.subscription.getByRoomID"},
{"thread.list", subject.UserThreadList("alice", "s1"), "chat.user.alice.request.user.s1.thread.list"},
{"thread.list.pattern", subject.UserThreadListPattern("s1"), "chat.user.{account}.request.user.s1.thread.list"},
{"thread.unread.summary", subject.UserThreadUnreadSummary("alice", "s1"), "chat.user.alice.request.user.s1.thread.unread.summary"},
{"thread.unread.summary.pattern", subject.UserThreadUnreadSummaryPattern("s1"), "chat.user.{account}.request.user.s1.thread.unread.summary"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
Expand Down Expand Up @@ -691,6 +695,8 @@ func TestUserServiceBuildersRejectWildcardAccounts(t *testing.T) {
{"UserSubscriptionList", func() { subject.UserSubscriptionList("*", "s1") }},
{"UserSubscriptionSetAppSubscription", func() { subject.UserSubscriptionSetAppSubscription("*", "s1") }},
{"UserSubscriptionGetByRoomID", func() { subject.UserSubscriptionGetByRoomID(">", "s1") }},
{"UserThreadList", func() { subject.UserThreadList("*", "s1") }},
{"UserThreadUnreadSummary", func() { subject.UserThreadUnreadSummary(">", "s1") }},
}
for _, b := range builders {
t.Run(b.name, func(t *testing.T) {
Expand Down
69 changes: 66 additions & 3 deletions room-service/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3716,9 +3716,11 @@ func TestMongoStore_GetThreadUnreadSummary_Integration(t *testing.T) {
newer := time.Now().UTC()
older := time.Now().UTC().Add(-2 * time.Hour)

_, err := db.Collection("rooms").InsertMany(ctx, []interface{}{
model.Room{ID: "room-chan", Type: model.RoomTypeChannel, SiteID: site},
model.Room{ID: "room-dm", Type: model.RoomTypeDM, SiteID: site},
// roomType (for DM classification) comes from the user's room subscription, not
// a rooms lookup — so the pipeline reads subscriptions, never the rooms collection.
_, err := db.Collection("subscriptions").InsertMany(ctx, []interface{}{
model.Subscription{ID: "sub-chan", RoomID: "room-chan", SiteID: site, RoomType: model.RoomTypeChannel, User: model.SubscriptionUser{Account: account}},
model.Subscription{ID: "sub-dm", RoomID: "room-dm", SiteID: site, RoomType: model.RoomTypeDM, User: model.SubscriptionUser{Account: account}},
})
require.NoError(t, err)

Expand Down Expand Up @@ -3802,6 +3804,67 @@ func TestMongoStore_GetThreadUnreadSummary_Integration(t *testing.T) {
assert.False(t, got.Unread)
assert.Nil(t, got.LastMessageAt)
})

t.Run("unsubscribed app (botDM, isSubscribed=false) thread is excluded", func(t *testing.T) {
const appUser = "unsub-app@example.com"
_, err := db.Collection("thread_rooms").InsertOne(ctx, model.ThreadRoom{ID: "tr-bot-unsub", RoomID: "room-bot-unsub", SiteID: site, LastMsgAt: newer})
require.NoError(t, err)
_, err = db.Collection("thread_subscriptions").InsertOne(ctx, model.ThreadSubscription{ID: "ts-bot-unsub", ThreadRoomID: "tr-bot-unsub", RoomID: "room-bot-unsub", UserAccount: appUser, SiteID: site, LastSeenAt: nil, HasMention: true})
require.NoError(t, err)
// The room subscription exists but the user has unsubscribed from the app
// (isSubscribed=false → omitted from BSON).
_, err = db.Collection("subscriptions").InsertOne(ctx, model.Subscription{ID: "sub-bot-unsub", RoomID: "room-bot-unsub", SiteID: site, RoomType: model.RoomTypeBotDM, IsSubscribed: false, User: model.SubscriptionUser{Account: appUser}})
require.NoError(t, err)

got, err := store.GetThreadUnreadSummary(ctx, appUser, site)
require.NoError(t, err)
assert.False(t, got.Unread)
assert.False(t, got.UnreadMention)
assert.Nil(t, got.LastMessageAt)
})

t.Run("subscribed app (botDM, isSubscribed=true) thread is included", func(t *testing.T) {
const appUser = "sub-app@example.com"
_, err := db.Collection("thread_rooms").InsertOne(ctx, model.ThreadRoom{ID: "tr-bot-sub", RoomID: "room-bot-sub", SiteID: site, LastMsgAt: newer})
require.NoError(t, err)
_, err = db.Collection("thread_subscriptions").InsertOne(ctx, model.ThreadSubscription{ID: "ts-bot-sub", ThreadRoomID: "tr-bot-sub", RoomID: "room-bot-sub", UserAccount: appUser, SiteID: site, LastSeenAt: nil, HasMention: true})
require.NoError(t, err)
_, err = db.Collection("subscriptions").InsertOne(ctx, model.Subscription{ID: "sub-bot-sub", RoomID: "room-bot-sub", SiteID: site, RoomType: model.RoomTypeBotDM, IsSubscribed: true, User: model.SubscriptionUser{Account: appUser}})
require.NoError(t, err)

got, err := store.GetThreadUnreadSummary(ctx, appUser, site)
require.NoError(t, err)
assert.True(t, got.Unread)
assert.True(t, got.UnreadMention)
require.NotNil(t, got.LastMessageAt)
assert.WithinDuration(t, newer, got.LastMessageAt.UTC(), time.Millisecond)
})

t.Run("unsubscribed app thread does not mask a subscribed channel thread", func(t *testing.T) {
const mixUser = "mixed@example.com"
_, err := db.Collection("thread_rooms").InsertMany(ctx, []interface{}{
model.ThreadRoom{ID: "tr-mix-bot", RoomID: "room-bot-mix", SiteID: site, LastMsgAt: newer},
model.ThreadRoom{ID: "tr-mix-chan", RoomID: "room-chan", SiteID: site, LastMsgAt: older},
})
require.NoError(t, err)
_, err = db.Collection("thread_subscriptions").InsertMany(ctx, []interface{}{
model.ThreadSubscription{ID: "ts-mix-bot", ThreadRoomID: "tr-mix-bot", RoomID: "room-bot-mix", UserAccount: mixUser, SiteID: site, LastSeenAt: nil},
model.ThreadSubscription{ID: "ts-mix-chan", ThreadRoomID: "tr-mix-chan", RoomID: "room-chan", UserAccount: mixUser, SiteID: site, LastSeenAt: nil},
})
require.NoError(t, err)
_, err = db.Collection("subscriptions").InsertMany(ctx, []interface{}{
model.Subscription{ID: "sub-mix-bot", RoomID: "room-bot-mix", SiteID: site, RoomType: model.RoomTypeBotDM, IsSubscribed: false, User: model.SubscriptionUser{Account: mixUser}},
model.Subscription{ID: "sub-mix-chan", RoomID: "room-chan", SiteID: site, RoomType: model.RoomTypeChannel, User: model.SubscriptionUser{Account: mixUser}},
})
require.NoError(t, err)

got, err := store.GetThreadUnreadSummary(ctx, mixUser, site)
require.NoError(t, err)
assert.True(t, got.Unread) // from the channel thread
require.NotNil(t, got.LastMessageAt)
// lastMessageAt reflects only the kept (channel) thread, not the newer app thread.
assert.WithinDuration(t, older, got.LastMessageAt.UTC(), time.Millisecond)
})
}

// account is a user's identity, so EnsureIndexes makes users.account unique —
Expand Down
60 changes: 46 additions & 14 deletions room-service/store_mongo.go
Original file line number Diff line number Diff line change
Expand Up @@ -1656,33 +1656,56 @@ func (s *MongoStore) FindUsersByAccounts(ctx context.Context, accounts []string)
return users, nil
}

// GetThreadUnreadSummary rolls up a single user's thread unread state on this
// site. Unread = subscribed AND threadRoom.lastMsgAt > lastSeenAt (nil lastSeenAt
// = never seen = unread, via BSON null being the smallest value). The booleans
// are an OR across the user's threads, expressed as $max over per-row booleans
// (BSON orders false < true). lastMessageAt is the newest thread message time.
func (s *MongoStore) GetThreadUnreadSummary(ctx context.Context, account, siteID string) (*ThreadUnreadSummary, error) {
pipeline := mongo.Pipeline{
// threadUnreadSummaryPipeline builds the GetThreadUnreadSummary aggregation:
// match the user's thread subscriptions on this site, join each thread's room
// (for lastMsgAt) and the user's room subscription (for roomType + isSubscribed),
// drop threads on unsubscribed botDM apps, then OR-reduce the per-row unread
// booleans into a single rollup.
func threadUnreadSummaryPipeline(account, siteID string) mongo.Pipeline {
return mongo.Pipeline{
{{Key: "$match", Value: bson.M{"userAccount": account, "siteId": siteID}}},
// $lookup justification: thread_rooms holds the thread's lastMsgAt, which is
// not denormalized onto thread_subscriptions; the unread comparison needs it.
{{Key: "$lookup", Value: bson.M{
"from": "thread_rooms",
"localField": "threadRoomId",
"foreignField": "_id",
"as": "tr",
}}},
{{Key: "$unwind", Value: "$tr"}},
// $lookup justification: roomType (dm/botDM classification) and isSubscribed
// both live only on the room subscription, not on thread_subscriptions —
// denormalizing them would need write-path fan-out across services and risk
// staleness. The subscription is the single source for both, so no separate
// rooms lookup is needed. Join on the (roomId, u.account) unique index,
// projected to the two fields used.
{{Key: "$lookup", Value: bson.M{
"from": "rooms",
"localField": "roomId",
"foreignField": "_id",
"as": "room",
"from": "subscriptions",
"let": bson.M{"rid": "$roomId", "acct": "$userAccount"},
"pipeline": mongo.Pipeline{
{{Key: "$match", Value: bson.M{"$expr": bson.M{"$and": bson.A{
bson.M{"$eq": bson.A{"$roomId", "$$rid"}},
bson.M{"$eq": bson.A{"$u.account", "$$acct"}},
}}}}},
{{Key: "$project", Value: bson.M{"_id": 0, "roomType": 1, "isSubscribed": 1}}},
},
"as": "sub",
}}},
{{Key: "$unwind", Value: bson.M{"path": "$room", "preserveNullAndEmptyArrays": true}}},
{{Key: "$unwind", Value: bson.M{"path": "$sub", "preserveNullAndEmptyArrays": true}}},
// Drop threads on a botDM the user has unsubscribed from: an app subscription
// stores isSubscribed=false as an omitted field, so the inbox keeps a botDM
// only when isSubscribed is explicitly true. Non-botDM rooms (dm/channel) are
// always kept — isSubscribed is meaningful only for app subscriptions. Mirrors
// user-service's inbox filter {roomType: botDM, isSubscribed: true}.
{{Key: "$match", Value: bson.M{"$or": bson.A{
bson.M{"sub.roomType": bson.M{"$ne": model.RoomTypeBotDM}},
bson.M{"sub.isSubscribed": true},
}}}},
{{Key: "$addFields", Value: bson.M{
"isUnread": bson.M{"$gt": bson.A{"$tr.lastMsgAt", "$lastSeenAt"}},
"isDMUnread": bson.M{"$and": bson.A{
bson.M{"$gt": bson.A{"$tr.lastMsgAt", "$lastSeenAt"}},
bson.M{"$eq": bson.A{"$room.type", model.RoomTypeDM}},
bson.M{"$eq": bson.A{"$sub.roomType", model.RoomTypeDM}},
}},
}}},
{{Key: "$group", Value: bson.M{
Expand All @@ -1697,8 +1720,17 @@ func (s *MongoStore) GetThreadUnreadSummary(ctx context.Context, account, siteID
"lastMessageAt": bson.M{"$max": "$tr.lastMsgAt"},
}}},
}
}

cursor, err := s.threadSubscriptions.Aggregate(ctx, pipeline)
// GetThreadUnreadSummary rolls up a single user's thread unread state on this
// site. Unread = subscribed AND threadRoom.lastMsgAt > lastSeenAt (nil lastSeenAt
// = never seen = unread, via BSON null being the smallest value). "Subscribed"
// drops threads on a botDM app the user has unsubscribed from (room subscription
// isSubscribed != true); dm/channel threads are always counted. The booleans are
// an OR across the user's threads, expressed as $max over per-row booleans (BSON
// orders false < true). lastMessageAt is the newest kept thread message time.
func (s *MongoStore) GetThreadUnreadSummary(ctx context.Context, account, siteID string) (*ThreadUnreadSummary, error) {
cursor, err := s.threadSubscriptions.Aggregate(ctx, threadUnreadSummaryPipeline(account, siteID))
if err != nil {
return nil, fmt.Errorf("aggregate thread unread summary: %w", err)
}
Expand Down
Loading
Loading