Skip to content

Implement message.read RPC for marking rooms as read#156

Open
Allan-Code-hub wants to merge 22 commits intomainfrom
claude/add-message-read-rpc-3u7K9
Open

Implement message.read RPC for marking rooms as read#156
Allan-Code-hub wants to merge 22 commits intomainfrom
claude/add-message-read-rpc-3u7K9

Conversation

@Allan-Code-hub
Copy link
Copy Markdown
Collaborator

@Allan-Code-hub Allan-Code-hub commented May 6, 2026

Summary

This PR implements the message.read RPC that allows clients to mark a room as read for a user. The handler validates room membership, recomputes the per-subscription alert flag, persists the new lastSeenAt timestamp, optionally recomputes Room.MinUserLastSeenAt, and federates read state to the user's home site via outbox/inbox for cross-site users.

Key Changes

Subject & Model Types

  • Added MessageRead() and MessageReadWildcard() subject builders in pkg/subject/ for the new RPC subject pattern
  • Added MessageReadRequest model type for the RPC request body
  • Added OutboxSubscriptionRead event type constant and SubscriptionReadEvent struct for cross-site federation

Room-Service Store Layer

  • Extended RoomStore interface with 4 new methods:
    • UpdateSubscriptionRead(): atomically update lastSeenAt and alert by (roomID, account)
    • GetUserSiteID(): look up a user's home site by account (returns empty string on miss, no error)
    • MinSubscriptionLastSeenByRoomID(): aggregate minimum effective lastSeenAt across room subscriptions, with joinedAt fallback for never-read subscriptions
    • UpdateRoomMinUserLastSeenAt(): set or clear Room.MinUserLastSeenAt via $set/$unset
  • Implemented all 4 methods in MongoStore with appropriate MongoDB aggregation and update operations
  • Added comprehensive integration tests for each store method

Room-Service Handler

  • Implemented handleMessageRead() with full flow:
    1. Parse subject to extract account and roomID
    2. Validate subscription exists (user is room member)
    3. Compute new alert state based on thread unread status
    4. Persist subscription update with new lastSeenAt and alert flag
    5. Publish cross-site outbox event if user's home site differs from room's home site
    6. Conditionally recompute Room.MinUserLastSeenAt when read receipt advances past previous LastMsgAt
  • Added natsMessageRead() NATS wrapper following established error handling patterns
  • Registered handler in RegisterCRUD() with queue subscription to MessageReadWildcard()
  • Added 14 table-driven unit tests covering happy paths, alert state transitions, cross-site scenarios, and error cases

Inbox-Worker (Cross-Site Sync)

  • Extended InboxStore interface with UpdateSubscriptionRead() method
  • Implemented UpdateSubscriptionRead() in mongoInboxStore with out-of-order-safe $lt guard to prevent older read receipts from overwriting newer ones
  • Added handleSubscriptionRead() handler for processing subscription_read outbox events
  • Extended handler dispatch to route OutboxSubscriptionRead events
  • Added stub implementation in test helper and comprehensive integration tests covering happy path, out-of-order safety, idempotent replay, and missing subscription no-ops

Documentation

  • Added comprehensive design spec in docs/superpowers/specs/2026-05-04-message-read-rpc-design.md
  • Added detailed implementation plan in docs/superpowers/plans/2026-05-04-message-read-rpc.md with task-by-task breakdown
  • Updated docs/client-api.md with RPC documentation including subject, request/response format, and error cases

Implementation Details

  • Synchronous handler: All writes performed inline before responding; no downstream worker chain
  • Out-of-order safety: Inbox-worker uses MongoDB $lt guard to skip stale read receipts
  • Dual subscription copies: Each room member has subscription docs at both room's home site and user's home site; handler updates room-side authoritatively and outboxes sync to user-side
  • Alert state logic: Alert remains true only if there are unread threads; clears when all threads are read
  • Room floor optimization: MinUserLastSeenAt recompute only runs when read receipt advances past previous `LastMsgAt

https://claude.ai/code/session_01G2qCzHCqcLBUPVExe7XzZq

Summary by CodeRabbit

Release Notes

  • New Features
    • Added a Mark Messages Read RPC that enables users to mark messages as read in a room.
    • Subscription read state now updates with last-seen timestamps and alert flags upon marking messages as read.
    • Implemented cross-site read receipt synchronization to propagate read status across distributed systems.
    • Room-level minimum user last-seen timestamp tracking now recalculates automatically.

claude and others added 20 commits May 4, 2026 07:12
Spec for a new NATS request/reply endpoint that marks a room as read
for a user, recomputes the per-subscription Alert flag, persists
LastSeenAt, optionally recomputes Room.MinUserLastSeenAt, and
federates the read state to the user's home site via outbox/inbox.

https://claude.ai/code/session_01G2qCzHCqcLBUPVExe7XzZq
Seven-task plan covering subject builders, model types, room-service
store + handler, inbox-worker store + handler, and final verification.

https://claude.ai/code/session_01G2qCzHCqcLBUPVExe7XzZq
For the new chat.user.{account}.request.room.{roomID}.{siteID}.message.read
RPC handled by room-service.

https://claude.ai/code/session_01G2qCzHCqcLBUPVExe7XzZq
- MessageReadRequest: NATS request body for the message.read RPC
- OutboxSubscriptionRead: outbox event type constant 'subscription_read'
- SubscriptionReadEvent: payload of the cross-site sync event, with
  int64 UnixMilli timestamps for wire safety

https://claude.ai/code/session_01G2qCzHCqcLBUPVExe7XzZq
- UpdateSubscriptionRead: write lastSeenAt + alert by (roomID, account)
- GetUserSiteID: look up users.siteId by account ('' on miss, no error)
- MinSubscriptionLastSeenByRoomID: aggregate min lastSeenAt with
  joinedAt fallback for never-read subs
- UpdateRoomMinUserLastSeenAt: set/unset rooms.minUserLastSeenAt

Existing indexes already cover all new queries.

https://claude.ai/code/session_01G2qCzHCqcLBUPVExe7XzZq
Synchronous handler that:
1. Validates room membership via GetSubscription
2. Recomputes Alert as Sub.Alert && len(ThreadUnread) > 0
3. Falls back to JoinedAt when LastSeenAt is zero
4. Persists the new lastSeenAt + alert
5. Publishes a subscription_read outbox event when the user is
   on a different site (always, before the room recompute, so
   the user's home site receives every read receipt)
6. Skips the room recompute when LastMsgAt is nil or the user
   was already up-to-date before this read
7. Otherwise recomputes Room.MinUserLastSeenAt across all
   subscriptions for the room

Returns {"status":"ok"} on success.

https://claude.ai/code/session_01G2qCzHCqcLBUPVExe7XzZq
Order-safe write: only applies when the stored lastSeenAt is missing
or strictly earlier than the supplied value, so out-of-order federated
delivery cannot regress the user's read state.

https://claude.ai/code/session_01G2qCzHCqcLBUPVExe7XzZq
Add the case OutboxSubscriptionRead arm to HandleEvent and the
handleSubscriptionRead method that converts the int64 UnixMilli
LastSeenAt back to time.Time and applies the order-safe store write.

Integration tests cover: happy path, out-of-order skip, idempotent
replay, and missing-subscription no-op.

https://claude.ai/code/session_01G2qCzHCqcLBUPVExe7XzZq
…ad-rpc-3u7K9

# Conflicts:
#	inbox-worker/handler.go
#	inbox-worker/handler_test.go
#	inbox-worker/integration_test.go
#	inbox-worker/main.go
#	pkg/model/event.go
GetUserSiteID and GetRoom are independent of each other and of the
preceding UpdateSubscriptionRead — wrap them in an errgroup so the
hot-path message.read RPC saves one Mongo round-trip per call.

Two error-path tests gain GetRoom EXPECT().AnyTimes() because GetRoom
may now race-fire concurrently with the GetUserSiteID error /
publish-failure paths.

Also trim the narration half of handleSubscriptionRead's doc comment;
keep the WHY (idempotency / order-safety guard).

https://claude.ai/code/session_01G2qCzHCqcLBUPVExe7XzZq
…tion

Covers ten scenarios against a running local stack (make deps-up + make up):
  1. Happy local — alert clears (no thread unread)
  2. Alert stays true when threadUnread is non-empty
  3. Never-read sub — falls back to JoinedAt; recompute skipped
  4. Room never messaged (LastMsgAt nil) — recompute skipped
  5. Already up-to-date — recompute skipped
  6. Not a member — error response
  7. Room ID mismatch — error response
  8. Empty body — trusts subject roomId, succeeds
  9. Cross-site user — outbox subscription_read event published
 10. Invalid subject — handler rejects

Uses ephemeral natsio/nats-box containers + docker exec mongosh against
the chat-local docker network so no host-side dependencies are needed
beyond docker. Cleans up its own test data on exit.

To be removed after manual verification.

https://claude.ai/code/session_01G2qCzHCqcLBUPVExe7XzZq
The room-service container runs with SITE_ID=site-local (per its
deploy/docker-compose.yml), but the script was publishing to
chat.user.*.request.room.*.default.message.read — no responder, so
every scenario except #10 (which expects no responder) failed.

Fixes:
- Default SITE_ID to "site-local" (env-overridable: SITE_ID=foo ./...)
- Add a startup probe that exits with a clear error when no responder
  is reachable, naming the SITE_ID and how to override it
- Replace `... || null` in mongo_get_field with `... ?? null` so
  falsy fields like `alert: false` aren't mis-read as `null`
- Surface NATS CLI stderr in "RPC failed: ..." error messages
- Update stale comment that referenced the wrong default site

https://claude.ai/code/session_01G2qCzHCqcLBUPVExe7XzZq
Case 7: handleMessageRead returned 'room ID mismatch', which is not in
sanitizeError's allow-list and got stripped to 'internal error'. The
sibling handleUpdateRole/handleAddMembers handlers prefix theirs with
'invalid request:', which IS allow-listed. Match that convention so
the message reaches the client (and the test).

Case 9: NATS-core sub on a JetStream-published subject was timing-
fragile (listener race against publish). Switch to querying the
OUTBOX_<siteID> stream directly via 'nats stream view --since 30s'
after the RPC. JetStream stores the message durably, so the read is
race-free. Verify both the event type ('subscription_read') and the
exact outbox subject (outbox.<src>.to.<dest>.subscription_read).

The unit test 'TestHandler_MessageRead_RoomIDMismatch' continues to
pass because it asserts 'room ID mismatch' as a substring.

https://claude.ai/code/session_01G2qCzHCqcLBUPVExe7XzZq
Replace the OUTBOX-only verification with a real cross-site loop:

- Create a fresh INBOX_site-b stream on the local NATS that Sources
  from OUTBOX_site-local with a SubjectTransform rewriting
  outbox.<src>.to.site-b.<event> → chat.inbox.site-b.aggregate.<event>
- Spawn a second inbox-worker container with SITE_ID=site-b and
  MONGO_DB=chat_b, BOOTSTRAP_STREAMS=false (so it doesn't overwrite
  the Sources we just configured)
- Pre-seed chat_b.subscriptions with a parallel cache row for the
  test account
- Trigger the cross-site message.read RPC on site-local
- Verify three things in sequence:
    1. OUTBOX_site-local received the publish
    2. INBOX_site-b sourced and transformed the event
    3. Site-b's inbox-worker applied the update to chat_b.subscriptions
       (poll up to 10s; assert lastSeenAt advanced and alert converged)

Federation lives entirely on the existing chat-local NATS — Sources
within one JetStream domain don't need leaf nodes — and reuses the
existing chat-local-mongodb container with a separate database, so
the harness adds one container + one stream + one Mongo DB and tears
them all down on EXIT.

https://claude.ai/code/session_01G2qCzHCqcLBUPVExe7XzZq
The previous implementation invoked 'nats stream add --config /cfg.json'
inside the natsio/nats-box container. That CLI flag has inconsistent
semantics across natscli versions (some require YAML, some silently drop
unknown JSON keys, some prompt interactively for missing required fields)
so the stream creation could fail silently. The script then started
inbox-worker with BOOTSTRAP_STREAMS=false, which calls js.Stream() to
verify the stream exists, gets a 404, and exits — which the user saw as:

  bootstrap streams failed error: verify INBOX stream nats: API error
  code=404 err_code=10059 description=stream not found

Switch to a direct request to $JS.API.STREAM.CREATE.<stream> with the
JSON config as the payload. The wire format is stable and the reply is
either a stream_create_response or an error envelope we grep for
deterministically. Bail loudly with the full reply on any failure.

Also: setup_site_b_federation now checks each step's return value and
short-circuits on the first failure, so we never reach the inbox-worker
spawn step with a missing stream.

https://claude.ai/code/session_01G2qCzHCqcLBUPVExe7XzZq
JetStream rejects a stream Source that has both filter_subject and
subject_transforms ("source with multiple subject transforms cannot
also have a single subject filter") — when subject_transforms is
present each transform's src acts as its own filter, so the top-level
filter_subject is redundant and forbidden by validation.

Removing filter_subject; the lone subject_transforms[0].src
("outbox.site-local.to.site-b.>") serves as the source filter.

https://claude.ai/code/session_01G2qCzHCqcLBUPVExe7XzZq
Root cause of 'reply: error: internal server error' and the missing
event in the OUTBOX dump: no service in this codebase bootstraps
OUTBOX_<siteID>. In production it's owned by ops/IaC; in a fresh dev
stack the stream simply doesn't exist, so room-service's
js.PublishMsg fails with 'no stream matched subject' and sanitizeError
maps that to a generic 'internal error' reply.

Add ensure_outbox_stream() to the harness — it's idempotent (treats
err_code 10058 / 'stream name already in use' as success) and only
sets the bare schema (Name + Subjects). Run it before
create_inbox_site_b_stream so the Source on INBOX_site-b has a real
stream to read from.

Not torn down on EXIT because other services publish to it; the
script only owns INBOX_site-b and chat_b.

https://claude.ai/code/session_01G2qCzHCqcLBUPVExe7XzZq
'nats stream view <stream> --since 30s' is unreliable for scripted
dumps — depending on natscli version it's an interactive viewer that
paginates rather than printing all messages, and the time filter can
miss freshly-written entries when the stream's commit time and the
client clock disagree by even a few hundred ms.

Switch to 'nats stream get <stream> --last-by-subj <subject>' which
deterministically retrieves the most-recent message at an exact
subject. We grep the response for the per-test account name so a
stale message from a prior run on the same subject can't false-pass.

Also add a 5s polling loop for the INBOX_site-b check — JetStream
Sources poll on a default ~1s interval, so the federated copy can
take a moment to arrive.

https://claude.ai/code/session_01G2qCzHCqcLBUPVExe7XzZq
The natsio/nats-box:latest image ships an older natscli that doesn't
recognize 'stream get --last-by-subj' — the user's run surfaced
"error: unknown long flag '----last-by-subj'". Different natscli
versions used different spellings (--last-for, --last-by, etc.) so
the CLI is not a stable target for scripting.

Switch to the JetStream API directly: $JS.API.STREAM.MSG.GET.<stream>
with a JSON {"last_by_subj":"<subject>"} payload. The wire format is
stable across NATS server / CLI versions and returns either
{"message": {...subject, seq, data...}} on hit or {"error": {...}} on
miss — both deterministic to grep.

Also drop the redundant account-name grep that was added in the prior
iteration. The OutboxEvent envelope is JSON, but its inner Payload
field is `[]byte` in Go which json-encodes as base64, so the account
name appears base64'd inside, never as plaintext. The exact-subject
targeting via last_by_subj already disambiguates which message we got.

https://claude.ai/code/session_01G2qCzHCqcLBUPVExe7XzZq
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 6, 2026

Warning

Rate limit exceeded

@Allan-Code-hub has exceeded the limit for the number of commits that can be reviewed per hour. Please wait 20 minutes and 23 seconds before requesting another review.

To continue reviewing without waiting, purchase usage credits in the billing tab.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 8d2759bd-3acf-4ef2-9e8b-5e4762af1116

📥 Commits

Reviewing files that changed from the base of the PR and between 5b9671c and 2028b28.

📒 Files selected for processing (2)
  • inbox-worker/handler.go
  • pkg/model/event.go
📝 Walkthrough

Walkthrough

This PR implements a new "message.read" RPC across the codebase. It adds model types (MessageReadRequest, SubscriptionReadEvent, OutboxSubscriptionRead), subject builders for wire protocol, store interfaces and Mongo implementations for tracking read state in room-service and inbox-worker, a room-service handler that validates requests and publishes cross-site read receipt events, inbox-worker event processing for federation, and comprehensive documentation, tests, and mocks throughout.

Changes

Message Read RPC Implementation

Layer / File(s) Summary
Model Types & Constants
pkg/model/subscription.go, pkg/model/event.go, pkg/model/model_test.go
MessageReadRequest struct, SubscriptionReadEvent payload, OutboxSubscriptionRead constant added with corresponding JSON round-trip tests.
Wire Protocol
pkg/subject/subject.go, pkg/subject/subject_test.go
MessageRead and MessageReadWildcard subject builders added to form NATS subject paths for message-read RPCs.
Room-Service Store Interface
room-service/store.go, room-service/store_mongo.go, room-service/mock_store_test.go
RoomStore interface extended with UpdateSubscriptionRead, GetUserSiteID, MinSubscriptionLastSeenByRoomID, UpdateRoomMinUserLastSeenAt; Mongo implementations provided; mocks regenerated.
Room-Service Integration Tests
room-service/integration_test.go
Integration tests cover all new store methods: subscription read updates, user site lookup, min-last-seen computation with fallback to joinedAt, and room min-user-last-seen mutation.
Room-Service Handler
room-service/handler.go
Handler.natsMessageRead and Handler.handleMessageRead implemented to parse subjects, validate requests, update subscription read state, conditionally publish cross-site outbox events, and recompute room-level min-last-seen timestamps.
Room-Service Handler Tests
room-service/handler_test.go
Comprehensive unit tests covering invalid subjects, room-ID mismatches, non-members, alert state transitions, cross-site publishing, error paths, and edge cases like nil room last-msg and zero last-seen timestamps.
Inbox-Worker Store & Handler
inbox-worker/handler.go, inbox-worker/main.go, inbox-worker/handler_test.go
InboxStore interface extended with UpdateSubscriptionRead; Mongo implementation added with idempotent, order-safe semantics; stub store extended for test verification; handler.HandleEvent switch extended to dispatch OutboxSubscriptionRead events to handleSubscriptionRead.
Inbox-Worker Tests
inbox-worker/integration_test.go, inbox-worker/handler_test.go
Unit tests for subscription-read event handling (happy path, malformed payload); integration tests validate idempotence, out-of-order skipping, equal-timestamp skipping, and missing-subscription no-ops.
Documentation
docs/client-api.md, docs/superpowers/plans/2026-05-04-message-read-rpc.md, docs/superpowers/specs/2026-05-04-message-read-rpc-design.md
Client API documentation, implementation plan with task breakdown, and detailed design spec covering wire contract, handler flow, model/store changes, error handling, testing strategy, and rollout notes.

Sequence Diagram

sequenceDiagram
    actor User
    participant RoomService
    participant Mongo as Mongo<br/>(Room-Service)
    participant OutboxPublisher
    participant InboxWorker
    participant Mongo2 as Mongo<br/>(Inbox-Worker)

    User->>RoomService: message.read RPC<br/>(subject: chat.user.{account}.request.room.{roomID}.{siteID}.message.read)
    
    RoomService->>RoomService: Parse subject<br/>Validate request RoomID match<br/>Check membership
    
    RoomService->>Mongo: Fetch subscription
    Mongo-->>RoomService: Subscription (alert, lastSeenAt)
    
    RoomService->>RoomService: Compute newAlert<br/>Update lastSeenAt = now
    
    RoomService->>Mongo: UpdateSubscriptionRead<br/>(roomID, account, now, alert)
    Mongo-->>RoomService: ✓
    
    par Cross-Site Publishing
        RoomService->>Mongo: GetUserSiteID(account)
        Mongo-->>RoomService: userSiteID
        
        alt userSiteID != siteID
            RoomService->>OutboxPublisher: Publish SubscriptionReadEvent<br/>(account, roomID, lastSeenAt, alert)
        end
    end
    
    RoomService->>Mongo: MinSubscriptionLastSeenByRoomID(roomID)
    Mongo-->>RoomService: minTime or fallback to joinedAt
    
    RoomService->>Mongo: UpdateRoomMinUserLastSeenAt(roomID, minTime)
    Mongo-->>RoomService: ✓
    
    RoomService->>User: {status: "accepted"}
    
    Note over InboxWorker,Mongo2: Cross-site propagation
    InboxWorker->>InboxWorker: Poll for OutboxSubscriptionRead<br/>events
    
    InboxWorker->>InboxWorker: HandleEvent:<br/>Unmarshal SubscriptionReadEvent
    
    InboxWorker->>Mongo2: UpdateSubscriptionRead<br/>(roomID, account, lastSeenAt, alert)<br/>(idempotent, order-safe)
    Mongo2-->>InboxWorker: ✓
Loading

Estimated Code Review Effort

🎯 4 (Complex) | ⏱️ ~60 minutes

The PR introduces a multi-layer feature spanning model definitions, wire protocol, store interfaces/implementations, handler logic with cross-site federation, and event propagation. Review complexity stems from understanding the request/response flow, idempotent write semantics, cross-site outbox publishing, room-level min-last-seen recalculation, and the coordination between room-service and inbox-worker. The breadth of affected files and the interaction between components require careful verification of correctness and consistency, though the logical structure and comprehensive test coverage support focused review.

Possibly Related PRs

  • hmchangw/chat#143: Both PRs directly manipulate Room.MinUserLastSeenAt and subscription alert/read state for tracking message visibility and read receipts.
  • hmchangw/chat#153: Directly introduces the "message.read" RPC client-facing API documentation and interface that this implementation fulfills.

Suggested Reviewers

  • yenta
  • mliu33

Poem

🐰 A message read, at last we've penned,
Cross sites and subs, from end to end,
With outbox events, alerts now clear,
The read receipts flow without fear!
Idempotent, ordered, strong and true,
This RPC makes subscriptions new. ✨

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title 'Implement message.read RPC for marking rooms as read' accurately and clearly summarizes the main change: adding a new RPC that allows marking rooms as read.
Linked Issues check ✅ Passed Check skipped because no linked issues were found for this pull request.
Out of Scope Changes check ✅ Passed Check skipped because no linked issues were found for this pull request.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch claude/add-message-read-rpc-3u7K9

Tip

💬 Introducing Slack Agent: The best way for teams to turn conversations into code.

Slack Agent is built on CodeRabbit's deep understanding of your code, so your team can collaborate across the entire SDLC without losing context.

  • Generate code and open pull requests
  • Plan features and break down work
  • Investigate incidents and troubleshoot customer tickets together
  • Automate recurring tasks and respond to alerts with triggers
  • Summarize progress and report instantly

Built for teams:

  • Shared memory across your entire org—no repeating context
  • Per-thread sandboxes to safely plan and execute work
  • Governance built-in—scoped access, auditability, and budget controls

One agent for your entire SDLC. Right inside Slack.

👉 Get started


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

🧹 Nitpick comments (1)
room-service/handler.go (1)

744-816: ⚡ Quick win

Reorder room-floor recompute before the outbox publish to keep retries idempotent.

The current sequence persists the subscription update, then attempts the outbox publish, and only recomputes Room.MinUserLastSeenAt if the publish succeeds. If publishToStream fails at line 801, the handler returns an error but the local subscription has already advanced past room.LastMsgAt. On the client retry, originalLastSeen is read from the persisted subscription (already after LastMsgAt), so the guard at line 806 short-circuits and the floor recompute is permanently skipped until a later message bumps LastMsgAt and another read happens to hit the window.

Moving the room-floor recompute ahead of the outbox publish keeps the local state self-consistent regardless of how many times the federation step is retried.

♻️ Proposed reorder
 	if err := h.store.UpdateSubscriptionRead(ctx, roomID, account, now, newAlert); err != nil {
 		return nil, fmt.Errorf("update subscription read: %w", err)
 	}

 	var (
 		userSiteID string
 		room       *model.Room
 	)
 	g, gctx := errgroup.WithContext(ctx)
 	g.Go(func() error {
 		s, err := h.store.GetUserSiteID(gctx, account)
 		if err != nil {
 			return fmt.Errorf("get user siteId: %w", err)
 		}
 		userSiteID = s
 		return nil
 	})
 	g.Go(func() error {
 		r, err := h.store.GetRoom(gctx, roomID)
 		if err != nil {
 			return fmt.Errorf("get room: %w", err)
 		}
 		room = r
 		return nil
 	})
 	if err := g.Wait(); err != nil {
 		return nil, err
 	}

+	if room.LastMsgAt != nil && !originalLastSeen.After(*room.LastMsgAt) {
+		minTime, err := h.store.MinSubscriptionLastSeenByRoomID(ctx, roomID)
+		if err != nil {
+			return nil, fmt.Errorf("min subscription lastSeenAt: %w", err)
+		}
+		if err := h.store.UpdateRoomMinUserLastSeenAt(ctx, roomID, minTime); err != nil {
+			return nil, fmt.Errorf("update room minUserLastSeenAt: %w", err)
+		}
+	}
+
 	switch {
 	case userSiteID == "":
 		slog.Warn("user not found locally; skipping cross-site outbox", "account", account)
 	case userSiteID != h.siteID:
 		// ... existing outbox publish ...
 	}

-	if room.LastMsgAt == nil || originalLastSeen.After(*room.LastMsgAt) {
-		return json.Marshal(map[string]string{"status": "accepted"})
-	}
-
-	minTime, err := h.store.MinSubscriptionLastSeenByRoomID(ctx, roomID)
-	if err != nil {
-		return nil, fmt.Errorf("min subscription lastSeenAt: %w", err)
-	}
-	if err := h.store.UpdateRoomMinUserLastSeenAt(ctx, roomID, minTime); err != nil {
-		return nil, fmt.Errorf("update room minUserLastSeenAt: %w", err)
-	}
-
 	return json.Marshal(map[string]string{"status": "accepted"})
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@room-service/handler.go` around lines 744 - 816, The handler currently calls
publishToStream after persisting the subscription update but before recomputing
the room floor, which can make the recompute (MinSubscriptionLastSeenByRoomID +
UpdateRoomMinUserLastSeenAt) skipped on retries; move the room-floor recompute
logic (call to h.store.MinSubscriptionLastSeenByRoomID and
h.store.UpdateRoomMinUserLastSeenAt) to occur immediately after
UpdateSubscriptionRead and before any cross-site outbox creation/publish (the
code blocks referencing MinSubscriptionLastSeenByRoomID,
UpdateRoomMinUserLastSeenAt, publishToStream, and the SubscriptionRead outbox
payload should be reordered so recompute runs deterministically regardless of
publish success), keeping the same error handling and return values.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@docs/superpowers/plans/2026-05-04-message-read-rpc.md`:
- Around line 107-113: The commit message template in the plan that contains the
line starting with "https://claude.ai/" (e.g., the template used with the commit
subject "Add MessageRead and MessageReadWildcard subject builders") must have
the Claude session URL removed before committing; search the commit message
templates in this document for any "https://claude.ai/" (or other AI session)
links and delete those lines so they are not included in the git history, then
run git commit with the cleaned message.
- Around line 964-975: The response body in the handler
(json.Marshal(map[string]string{"status":"accepted"}) returned after calling
h.store.MinSubscriptionLastSeenByRoomID and h.store.UpdateRoomMinUserLastSeenAt)
conflicts with the PR summary which says {"status":"ok"}; reconcile by making
the public API consistent: either change these json.Marshal responses to return
{"status":"ok"} and update unit tests that assert "accepted" (and any callers),
or update the PR description and client docs to document {"status":"accepted"}
so they match the existing handler and tests—pick one approach and apply it
consistently across h.store.MinSubscriptionLastSeenByRoomID,
UpdateRoomMinUserLastSeenAt, the handler response, unit tests, and client
docs/PR summary.

In `@pkg/model/event.go`:
- Around line 93-99: SubscriptionReadEvent.Timestamp is missing the bson struct
tag; update the SubscriptionReadEvent definition to add bson:"timestamp" on the
Timestamp field so it matches other NATS event structs (e.g., ensure Timestamp
is defined as `Timestamp int64 \`json:"timestamp" bson:"timestamp"\``) in the
SubscriptionReadEvent type.

In `@pkg/subject/subject.go`:
- Around line 325-327: Add a shared guard in pkg/subject (e.g.,
guardAccountToken(account string)) that checks the account token for invalid
wildcard characters ('*' and '>') and panics with a clear message when found,
and then call this guard at the start of MessageRead (and other subject builder
functions) so MessageRead validates the account token before composing the
subject string.

In `@room-service/handler_test.go`:
- Line 1974: Replace all instances where json.Marshal calls are invoked with
ignored errors in the message.read tests (e.g., body, _ :=
json.Marshal(model.MessageReadRequest{RoomID: "r1"})) by capturing the error and
asserting it with require.NoError(t, err) (or require.NoErrorf where context is
helpful); specifically, change each "body, _ := json.Marshal(...)" to "body, err
:= json.Marshal(...)" followed immediately by "require.NoError(t, err)" so
failures are explicit—apply this to the occurrences using
model.MessageReadRequest and any other json.Marshal calls in handler_test.go for
the message.read cases.

---

Nitpick comments:
In `@room-service/handler.go`:
- Around line 744-816: The handler currently calls publishToStream after
persisting the subscription update but before recomputing the room floor, which
can make the recompute (MinSubscriptionLastSeenByRoomID +
UpdateRoomMinUserLastSeenAt) skipped on retries; move the room-floor recompute
logic (call to h.store.MinSubscriptionLastSeenByRoomID and
h.store.UpdateRoomMinUserLastSeenAt) to occur immediately after
UpdateSubscriptionRead and before any cross-site outbox creation/publish (the
code blocks referencing MinSubscriptionLastSeenByRoomID,
UpdateRoomMinUserLastSeenAt, publishToStream, and the SubscriptionRead outbox
payload should be reordered so recompute runs deterministically regardless of
publish success), keeping the same error handling and return values.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 2931a2aa-ffbf-4749-a6f0-33a52f77de70

📥 Commits

Reviewing files that changed from the base of the PR and between aae3d64 and 5b9671c.

📒 Files selected for processing (18)
  • docs/client-api.md
  • docs/superpowers/plans/2026-05-04-message-read-rpc.md
  • docs/superpowers/specs/2026-05-04-message-read-rpc-design.md
  • inbox-worker/handler.go
  • inbox-worker/handler_test.go
  • inbox-worker/integration_test.go
  • inbox-worker/main.go
  • pkg/model/event.go
  • pkg/model/model_test.go
  • pkg/model/subscription.go
  • pkg/subject/subject.go
  • pkg/subject/subject_test.go
  • room-service/handler.go
  • room-service/handler_test.go
  • room-service/integration_test.go
  • room-service/mock_store_test.go
  • room-service/store.go
  • room-service/store_mongo.go

Comment on lines +107 to +113
git commit -m "Add MessageRead and MessageReadWildcard subject builders

For the new chat.user.{account}.request.room.{roomID}.{siteID}.message.read
RPC handled by room-service.

https://claude.ai/code/session_01G2qCzHCqcLBUPVExe7XzZq"
```
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Remove AI session URLs from commit message templates before committing.

Every commit template in this plan (e.g., line 112: https://claude.ai/code/session_01G2qCzHCqcLBUPVExe7XzZq) includes a Claude session URL. If used verbatim, these would appear in the permanent git history. Strip the URL lines from all commit messages before running git commit.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@docs/superpowers/plans/2026-05-04-message-read-rpc.md` around lines 107 -
113, The commit message template in the plan that contains the line starting
with "https://claude.ai/" (e.g., the template used with the commit subject "Add
MessageRead and MessageReadWildcard subject builders") must have the Claude
session URL removed before committing; search the commit message templates in
this document for any "https://claude.ai/" (or other AI session) links and
delete those lines so they are not included in the git history, then run git
commit with the cleaned message.

Comment on lines +964 to +975
return json.Marshal(map[string]string{"status": "accepted"})
}

minTime, err := h.store.MinSubscriptionLastSeenByRoomID(ctx, roomID)
if err != nil {
return nil, fmt.Errorf("min subscription lastSeenAt: %w", err)
}
if err := h.store.UpdateRoomMinUserLastSeenAt(ctx, roomID, minTime); err != nil {
return nil, fmt.Errorf("update room minUserLastSeenAt: %w", err)
}

return json.Marshal(map[string]string{"status": "accepted"})
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Response status "accepted" conflicts with the PR description's "ok".

The plan and all 14 unit tests consistently assert {"status":"accepted"} (lines 610, 964, 975), but the PR objective summary describes the reply as {"status":"ok"}. If the client API documentation also documents "ok", consumers of this RPC will receive a response they don't recognize. Please verify the client docs match "accepted" and correct the PR description.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@docs/superpowers/plans/2026-05-04-message-read-rpc.md` around lines 964 -
975, The response body in the handler
(json.Marshal(map[string]string{"status":"accepted"}) returned after calling
h.store.MinSubscriptionLastSeenByRoomID and h.store.UpdateRoomMinUserLastSeenAt)
conflicts with the PR summary which says {"status":"ok"}; reconcile by making
the public API consistent: either change these json.Marshal responses to return
{"status":"ok"} and update unit tests that assert "accepted" (and any callers),
or update the PR description and client docs to document {"status":"accepted"}
so they match the existing handler and tests—pick one approach and apply it
consistently across h.store.MinSubscriptionLastSeenByRoomID,
UpdateRoomMinUserLastSeenAt, the handler response, unit tests, and client
docs/PR summary.

Comment thread pkg/model/event.go
Comment thread pkg/subject/subject.go
Comment on lines +325 to +327
func MessageRead(account, roomID, siteID string) string {
return fmt.Sprintf("chat.user.%s.request.room.%s.%s.message.read", account, roomID, siteID)
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Validate account token before composing message.read subjects.

MessageRead currently interpolates account directly. If account ever contains * or >, the built subject can change routing semantics unexpectedly. Please route this through the shared account-token validator used in pkg/subject builders so invalid tokens are rejected consistently.

Based on learnings: “For Go subject builder/parser code under pkg/subject, add a single shared guard/validator for any function that accepts an account token… reject * and >.”

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@pkg/subject/subject.go` around lines 325 - 327, Add a shared guard in
pkg/subject (e.g., guardAccountToken(account string)) that checks the account
token for invalid wildcard characters ('*' and '>') and panics with a clear
message when found, and then call this guard at the start of MessageRead (and
other subject builder functions) so MessageRead validates the account token
before composing the subject string.


func TestHandler_MessageRead_InvalidSubject(t *testing.T) {
f := newMessageReadFixture(t)
body, _ := json.Marshal(model.MessageReadRequest{RoomID: "r1"})
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Don’t silently ignore json.Marshal errors in the new tests.

The new message.read tests repeatedly use ..., _ := json.Marshal(...). Please assert these errors (require.NoError) so failures are explicit and the suite follows the project rule against silent error drops.

As per coding guidelines: “Never ignore errors silently — comment if intentionally discarded.”

Also applies to: 1981-1981, 1993-1993, 2019-2019, 2046-2046, 2065-2065, 2081-2081, 2103-2103, 2139-2139, 2160-2160, 2178-2178, 2201-2201, 2216-2216, 2232-2232, 2252-2252, 2273-2273

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@room-service/handler_test.go` at line 1974, Replace all instances where
json.Marshal calls are invoked with ignored errors in the message.read tests
(e.g., body, _ := json.Marshal(model.MessageReadRequest{RoomID: "r1"})) by
capturing the error and asserting it with require.NoError(t, err) (or
require.NoErrorf where context is helpful); specifically, change each "body, _
:= json.Marshal(...)" to "body, err := json.Marshal(...)" followed immediately
by "require.NoError(t, err)" so failures are explicit—apply this to the
occurrences using model.MessageReadRequest and any other json.Marshal calls in
handler_test.go for the message.read cases.

claude added 2 commits May 6, 2026 08:38
Per CLAUDE.md "All model structs get both json and bson tags", match
the convention sibling event types like MemberAddEvent already follow.
SubscriptionReadEvent is the OutboxEvent.Payload for subscription_read
and isn't currently persisted, but the project rule is uniform across
all pkg/model types so future bson-roundtrips (e.g. snapshotting,
debugging tools) marshal consistently with the JSON wire format.

Resolves PR review comment.

https://claude.ai/code/session_01G2qCzHCqcLBUPVExe7XzZq
Match the surrounding switch's style — every other case ('member_added',
'member_removed', 'room_sync', 'role_updated', 'thread_subscription_upserted')
uses a raw string literal, not a model.* constant. Keeping the new arm
consistent makes the dispatcher read uniformly.

Resolves PR review comment.

https://claude.ai/code/session_01G2qCzHCqcLBUPVExe7XzZq
@Allan-Code-hub Allan-Code-hub self-assigned this May 6, 2026
@Allan-Code-hub Allan-Code-hub requested a review from mliu33 May 6, 2026 09:04
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants