Skip to content

Commit 52c8a74

Browse files
committed
feat(room-service,room-worker): durable federation relay for cross-site events
Six room-service request/reply handlers (role_updated, mute/favorite toggled, subscription_read, thread_read, room_restricted) federated cross-site events by publishing an InboxEvent inline straight to a remote site's INBOX across a supercluster gateway. On failure the error returned to the client *after* the local Mongo write committed, so local and remote diverged with no durable retry. Replace this with a durable "federation relay": each handler keeps its synchronous Mongo write and reply but publishes one RoomFederationEvent to the local ROOMS stream; room-worker forwards each wrapped InboxEvent to the destination INBOX with at-least-once retry — the source stream is the outbox. The producer publish is local-cluster only, so a remote outage can never block the user's RPC, and a destination-site outage delays the event (retry-forever with escalating backoff) rather than dropping it. - pkg/model: RoomFederationEvent + FederationTarget envelope types. - pkg/subject: RoomCanonicalFederation builder (chat.room.canonical.{siteID}.federation). - room-service: federate + buildFederationTarget helpers; six handlers converted. Wire format is byte-identical to the prior direct publishes, so inbox-worker is unchanged. - room-worker: processFederation forwards each target (transient error -> Nak/redeliver, malformed -> Ack-poison), validating destSiteID/eventType/ envelope/dedupId at the boundary, each attempt bounded by a 3s fail-fast timeout. It runs on a dedicated durable consumer + worker pool, isolated from the membership consumer (filtered to create/member.add/member.remove/ room.rename), so an unreachable destination backs up only the federation lane, never local membership processing. The federation lane retries a failed forward forever with escalating backoff (5s -> 5m, MaxDeliver=-1), so a long destination outage delays — never drops — the event. Fails fast on non-positive MAX_WORKERS. - docs/client-api.md: cross-site federation note for all six RPCs. - Tests: forwarder, the two consumer configs, all six handlers (relay envelope + byte-identical wrapped InboxEvent), a model round-trip, and an end-to-end JetStream integration round-trip. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_01WcNmcyHTmyokFh9vYm3brj
1 parent ecdc070 commit 52c8a74

12 files changed

Lines changed: 767 additions & 138 deletions

File tree

docs/client-api.md

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1107,6 +1107,8 @@ See [Error envelope](#6-error-envelope-reference). Returned synchronously when v
11071107

11081108
**`chat.user.{targetAccount}.event.subscription.update`** — emitted once for the user whose role changed, `action: "role_updated"`. Delivered to the target user only (not the requester, not other members). See the [subscription.update schema](#subscriptionupdate-event); the embedded `Subscription` reflects the updated `roles`. No `AsyncJobResult` and no room-key event fire for role updates.
11091109

1110+
**Cross-site federation:** when the target user's home site differs from the room's site, `room-service` emits a `RoomFederationEvent` on the ROOMS stream and `room-worker` forwards the cross-site `role_updated` event (at-least-once) to `chat.inbox.{userSite}.external.role_updated`, where `inbox-worker` applies the updated `roles` to the local `Subscription` (guarded by `rolesUpdatedAt`).
1111+
11101112
```json
11111113
{
11121114
"userId": "01970a4f8c2d7c9a01970a4f8c2d7c9a",
@@ -1234,7 +1236,7 @@ When the synchronous reply is an error envelope, the request was rejected before
12341236
> - `externalAccess` — whether the room is reachable from outside the company network (e.g. internet-side / off-VPN clients). This is a network-access gate, NOT a cross-site federation flag
12351237
> - `ownerAccount` — required on the unrestricted-to-restricted transition
12361238
>
1237-
> room-service does the Mongo writes, fans out an `InboxRoomRestricted` event per remote federated site (published to `chat.inbox.{remoteSiteID}.external.room_restricted`), and replies `{"status":"ok","requestId":"…"}` once the work is committed. No `AsyncJobResult` is emitted — the reply *is* the result.
1239+
> room-service does the Mongo writes, emits a single `RoomFederationEvent` on the ROOMS stream (one target per remote federated site), and replies `{"status":"ok","requestId":"…"}` once the work is committed. `room-worker` forwards the cross-site `room_restricted` event (at-least-once) to each remote site's `chat.inbox.{remoteSiteID}.external.room_restricted`. No `AsyncJobResult` is emitted — the reply *is* the result.
12381240
>
12391241
> Clients learn about the change via a **`RoomRestrictedRoomEvent`** (`type: "room_restricted"`) on the same `chat.room.{roomID}.event` stream they already subscribe to for chat messages. Like `RoomRenamedRoomEvent`, it's a flat struct with no zero-valued envelope fields:
12401242
>
@@ -1547,6 +1549,8 @@ See [Error envelope](#6-error-envelope-reference). Common errors:
15471549
}
15481550
```
15491551

1552+
**3. Cross-site federation** — when the reader's home site differs from the room's site, `room-service` emits a `RoomFederationEvent` on the ROOMS stream and `room-worker` forwards the cross-site `subscription_read` event (at-least-once) to `chat.inbox.{userSite}.external.subscription_read`, where `inbox-worker` applies `lastSeenAt`/`alert` to the local `Subscription` (guarded by `lastSeenAt`).
1553+
15501554
##### Triggered events — error path
15511555

15521556
`None — error returned only via the reply subject.`
@@ -1558,7 +1562,7 @@ See [Error envelope](#6-error-envelope-reference). Common errors:
15581562
**Subject:** `chat.user.{account}.request.room.{roomID}.{siteID}.message.thread.read`
15591563
**Reply subject:** auto-generated `_INBOX.>` (NATS request/reply)
15601564

1561-
A **synchronous RPC** that clears a single thread's unread state for the caller. `room-service` validates room membership and thread-subscription existence, removes the threadId from the user's `Subscription.ThreadUnread`, recomputes the per-subscription `alert` flag, refreshes the `ThreadSubscription` (`lastSeenAt`, `updatedAt`, `hasMention=false`), and — for cross-site users — publishes a `thread_read` event directly to the user's home-site INBOX so the destination `inbox-worker` can mirror both updates.
1565+
A **synchronous RPC** that clears a single thread's unread state for the caller. `room-service` validates room membership and thread-subscription existence, removes the threadId from the user's `Subscription.ThreadUnread`, recomputes the per-subscription `alert` flag, refreshes the `ThreadSubscription` (`lastSeenAt`, `updatedAt`, `hasMention=false`), and — for cross-site users — emits a `RoomFederationEvent` on the ROOMS stream; `room-worker` forwards the cross-site `thread_read` event to the user's home site (at-least-once) so the destination `inbox-worker` can mirror both updates.
15621566

15631567
##### Request body
15641568

@@ -1593,7 +1597,7 @@ See [Error envelope](#6-error-envelope-reference). Common errors:
15931597

15941598
- **Alert recomputation:** `alert = oldSub.alert && len(newThreadUnread) > 0`. A thread-read can only clear an alert, never set one. When the post-removal `threadUnread` is empty, `alert` becomes false. This computation runs atomically inside the MongoDB aggregation pipeline on the handler's site — not derived client-side.
15951599
- **Concurrent local writes:** the room-`Subscription` update and the `ThreadSubscription` update run in parallel inside an `errgroup`. Both must succeed before the handler proceeds.
1596-
- **Cross-site federation:** if the user's home site differs from the handler's site, a `thread_read` event is published directly to `chat.inbox.{userSite}.external.thread_read` with payload `{account, roomId, threadRoomId, parentMessageId, newThreadUnread, alert, lastSeenAt, timestamp}` (timestamps as `int64` UnixMilli). The destination `inbox-worker` applies the supplied `newThreadUnread`+`alert` to the local Subscription cache and applies `lastSeenAt`+`updatedAt`+`hasMention=false` to the local ThreadSubscription with an `$lt` order-safety guard so out-of-order delivery cannot regress the thread's read position.
1600+
- **Cross-site federation:** if the user's home site differs from the handler's site, the handler emits a `RoomFederationEvent` on the ROOMS stream and `room-worker` forwards the cross-site `thread_read` event (at-least-once) to `chat.inbox.{userSite}.external.thread_read` with payload `{account, roomId, threadRoomId, parentMessageId, newThreadUnread, alert, lastSeenAt, timestamp}` (timestamps as `int64` UnixMilli). The destination `inbox-worker` applies the supplied `newThreadUnread`+`alert` to the local Subscription cache and applies `lastSeenAt`+`updatedAt`+`hasMention=false` to the local ThreadSubscription with an `$lt` order-safety guard so out-of-order delivery cannot regress the thread's read position.
15971601
- **Defensive `roomId` filter:** the thread-subscription lookup additionally enforces that the supplied `threadId` belongs to the room named in the subject. Mismatches return `thread subscription not found` (rather than silently clearing an unrelated thread).
15981602
- **Thread-room read-floor recompute:** after both writes succeed, `room-service` recomputes `thread_rooms.minUserLastSeenAt` = `MIN(lastSeenAt)` across all `thread_subscriptions` for the thread room. The floor is set only when every subscriber has a usable `lastSeenAt`; otherwise it is cleared. The recompute is best-effort — a failure is logged but does not fail the RPC. The stored value is also available via [Get Thread Messages](#get-thread-messages).
15991603
- **Read-floor fan-out:** when (and only when) the recompute above changes `thread_rooms.minUserLastSeenAt`, the server publishes a `thread_message_read` event (routed by the **parent** room's type) carrying the new floor, so peers can advance thread read-receipt UI live. Best-effort (a publish failure does not fail the RPC); never fires when the floor is unchanged or the thread room is missing.
@@ -1680,6 +1684,7 @@ See [Error envelope](#6-error-envelope-reference). Common errors:
16801684
##### Behaviour notes
16811685

16821686
- **Notification delivery:** `notification-worker` respects `muted` flags when deciding whether to send mobile push notifications (see [Notification fan-out](#notification-fan-out-mobile-push-only) below).
1687+
- **Cross-site federation:** when the requester's home site differs from the room's site, `room-service` emits a `RoomFederationEvent` on the ROOMS stream and `room-worker` forwards the cross-site `subscription_mute_toggled` event (at-least-once) to `chat.inbox.{userSite}.external.subscription_mute_toggled`, where `inbox-worker` applies `muted` to the local `Subscription` (guarded by `muteUpdatedAt`).
16831688

16841689
---
16851690

@@ -1729,7 +1734,7 @@ See [Error envelope](#6-error-envelope-reference). Common errors:
17291734

17301735
##### Cross-site behaviour
17311736

1732-
When the requester's home site differs from the room's site, `room-service` additionally publishes a `subscription_favorite_toggled` InboxEvent directly to `chat.inbox.{userSite}.external.subscription_favorite_toggled`. `inbox-worker` on the user's home site mirrors the flip onto the local `Subscription` document. Missing-subscription on the home site (e.g., a federation race) is a silent no-op — no NACK, no redelivery loop.
1737+
When the requester's home site differs from the room's site, `room-service` emits a `RoomFederationEvent` on the ROOMS stream and `room-worker` forwards the cross-site `subscription_favorite_toggled` event (at-least-once) to `chat.inbox.{userSite}.external.subscription_favorite_toggled`. `inbox-worker` on the user's home site mirrors the flip onto the local `Subscription` document. Missing-subscription on the home site (e.g., a federation race) is a silent no-op — no NACK, no redelivery loop.
17331738

17341739
---
17351740

pkg/model/event.go

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,31 @@ type InboxEvent struct {
185185
Timestamp int64 `json:"timestamp" bson:"timestamp"`
186186
}
187187

188+
// FederationTarget is one cross-site delivery instruction carried inside a
189+
// RoomFederationEvent. Envelope is a pre-marshaled model.InboxEvent (built by
190+
// the origin so the cross-site wire format is byte-identical to a direct
191+
// publish); EventType is its InboxEvent.Type, used only to build the
192+
// destination subject. DedupID is the Nats-Msg-Id room-worker forwards with so
193+
// JetStream dedups redeliveries at the destination INBOX.
194+
type FederationTarget struct {
195+
DestSiteID string `json:"destSiteId" bson:"destSiteId"`
196+
EventType InboxEventType `json:"eventType" bson:"eventType"`
197+
Envelope []byte `json:"envelope" bson:"envelope"`
198+
DedupID string `json:"dedupId" bson:"dedupId"`
199+
}
200+
201+
// RoomFederationEvent is the room-scoped "federation relay" event published by
202+
// room-service onto the ROOMS stream (chat.room.canonical.{siteID}.federation).
203+
// room-worker consumes it and forwards each Target to its destination site's
204+
// INBOX with at-least-once retry, so a request/reply handler no longer has to
205+
// publish cross-gateway inline. Targets is non-empty by construction (the
206+
// publisher skips the publish when there is nothing to federate).
207+
type RoomFederationEvent struct {
208+
RoomID string `json:"roomId" bson:"roomId"`
209+
Targets []FederationTarget `json:"targets" bson:"targets"`
210+
Timestamp int64 `json:"timestamp" bson:"timestamp"`
211+
}
212+
188213
type MemberAddEvent struct {
189214
Type string `json:"type" bson:"type"`
190215
RoomID string `json:"roomId" bson:"roomId"`

pkg/model/model_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4034,3 +4034,32 @@ func TestMigrationRequests_RoundTrip(t *testing.T) {
40344034
roundTrip(t, &model.MigrationDeleteRequest{MessageID: "m1", DeletedAt: ts}, &model.MigrationDeleteRequest{})
40354035
roundTrip(t, &model.MigrationAck{OK: true}, &model.MigrationAck{})
40364036
}
4037+
4038+
func TestRoomFederationEvent_RoundTrip(t *testing.T) {
4039+
inner := model.InboxEvent{
4040+
Type: model.InboxSubscriptionMuteToggled,
4041+
SiteID: "site-a",
4042+
DestSiteID: "site-b",
4043+
Payload: []byte(`{"account":"alice","roomId":"r1","muted":true,"timestamp":1}`),
4044+
Timestamp: 1,
4045+
}
4046+
innerData, err := json.Marshal(inner)
4047+
require.NoError(t, err)
4048+
4049+
evt := model.RoomFederationEvent{
4050+
RoomID: "r1",
4051+
Targets: []model.FederationTarget{{
4052+
DestSiteID: "site-b",
4053+
EventType: model.InboxSubscriptionMuteToggled,
4054+
Envelope: innerData,
4055+
DedupID: "req-1:site-b",
4056+
}},
4057+
Timestamp: 2,
4058+
}
4059+
data, err := json.Marshal(evt)
4060+
require.NoError(t, err)
4061+
4062+
var got model.RoomFederationEvent
4063+
require.NoError(t, json.Unmarshal(data, &got))
4064+
require.Equal(t, evt, got)
4065+
}

pkg/subject/subject.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,14 @@ func RoomCanonicalMemberEvent(siteID, eventType string) string {
134134
return fmt.Sprintf("chat.room.canonical.%s.event.member.%s", siteID, eventType)
135135
}
136136

137+
// RoomCanonicalFederation returns the ROOMS-stream subject for the
138+
// federation-relay envelope (RoomFederationEvent). It lives under
139+
// chat.room.canonical.{siteID}.> so room-worker's wildcard consumer receives
140+
// it, while notification-worker's exact mute filter does not.
141+
func RoomCanonicalFederation(siteID string) string {
142+
return fmt.Sprintf("chat.room.canonical.%s.federation", siteID)
143+
}
144+
137145
func SubscriptionUpdate(account string) string {
138146
return fmt.Sprintf("chat.user.%s.event.subscription.update", account)
139147
}

pkg/subject/subject_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,8 @@ func TestWildcardPatterns(t *testing.T) {
261261
"chat.user.*.request.room.*.site-a.member.role-update"},
262262
{"RoomCanonicalWild", subject.RoomCanonicalWildcard("site-a"),
263263
"chat.room.canonical.site-a.>"},
264+
{"RoomCanonicalFederation", subject.RoomCanonicalFederation("site-a"),
265+
"chat.room.canonical.site-a.federation"},
264266
{"MsgHistoryWild", subject.MsgHistoryWildcard("site-a"),
265267
"chat.user.*.request.room.*.site-a.msg.history"},
266268
{"MsgCanonicalWild", subject.MsgCanonicalWildcard("site-a"),

0 commit comments

Comments
 (0)