message-worker: cut per-reply DB work on the thread-reply hot path #424
hmchangw wants to merge 1 commit into
📝 Walkthrough
The PR replaces split thread-room lookup/create operations with
Changes
- Thread room ensure and advancing subscription upsert
- Handler and reply-flow tests
Estimated code review effort: 4 (Complex) | ~45 minutes
Pre-merge checks: 4 passed, 1 failed (1 warning)
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@message-worker/handler.go`:
- Around line 267-276: The CreateThreadRoom flow in handleFirstThreadReply
handling currently loses the redelivery self-heal path for a missing parent
subscription. Update the errThreadRoomExists branch in handler.go so
redeliveries still repair the parent-author subscription before returning from
handleSubsequentThreadReply, or otherwise reintroduce an idempotent parent
upsert in the first-reply/retry path. Make sure the recovery logic is preserved
for the thread reply handling around handleFirstThreadReply and
handleSubsequentThreadReply so a partially created thread can heal on retry.
In `@message-worker/integration_test.go`:
- Around line 1948-1980: The subtests in the integration test are sharing the
same persisted subscription state, so later cases depend on mutations from
earlier ones. Make each t.Run case self-contained by seeding the document within
that subtest (or splitting them into separate top-level tests) before calling
UpsertThreadSubscriptionAdvancingLastSeen and asserting on read(), so the checks
for lastSeenAt and _id in the subscription test do not rely on execution order.
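One way to read the second finding: each case should build the state it asserts on, so the cases pass in any order. The sketch below models that with an in-memory document instead of MongoDB. The `subscription` type, `advanceLastSeen`, and the case table are hypothetical stand-ins, not the PR's integration test; in a real test the seeding would sit inside each `t.Run` body.

```go
package main

import (
	"fmt"
	"time"
)

// subscription is a hypothetical stand-in for the persisted document.
type subscription struct {
	LastSeenAt time.Time
}

// advanceLastSeen mimics a $max update: the timestamp only moves forward.
func advanceLastSeen(s *subscription, ts time.Time) {
	if ts.After(s.LastSeenAt) {
		s.LastSeenAt = ts
	}
}

// subtest carries its own seed, so it never depends on an earlier case.
type subtest struct {
	name  string
	seed  time.Time // state written at the start of this case
	input time.Time // timestamp passed to the update
	want  time.Time // expected lastSeenAt afterwards
}

// run seeds a fresh document, applies the update, and checks the result.
func run(c subtest) bool {
	doc := &subscription{LastSeenAt: c.seed}
	advanceLastSeen(doc, c.input)
	return doc.LastSeenAt.Equal(c.want)
}

func cases() []subtest {
	base := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	return []subtest{
		{"newer timestamp advances", base, base.Add(time.Minute), base.Add(time.Minute)},
		{"older timestamp is ignored", base, base.Add(-time.Minute), base},
		{"insert on empty document", time.Time{}, base, base},
	}
}

func main() {
	cs := cases()
	// Run in reverse order to show that no case relies on a previous one.
	for i := len(cs) - 1; i >= 0; i-- {
		fmt.Printf("%-28s pass=%v\n", cs[i].name, run(cs[i]))
	}
}
```

Because every case seeds its own document, reordering or running a single case in isolation should not change the outcome.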
📒 Files selected for processing (6)
- message-worker/handler.go
- message-worker/handler_test.go
- message-worker/integration_test.go
- message-worker/mock_store_test.go
- message-worker/store.go
- message-worker/store_mongo.go
  err := h.threadStore.CreateThreadRoom(ctx, &threadRoom)
  switch {
  case err == nil:
- 	return threadRoom.ID, h.handleFirstThreadReply(ctx, msg, eventSiteID, threadRoom.ID, replier, now, isMigration)
+ 	// First reply is rare (once per thread); it advances the replier's lastSeenAt via the
+ 	// standalone $max in the caller, so it reports replierLastSeenAdvanced=false.
+ 	return threadRoom.ID, false, h.handleFirstThreadReply(ctx, msg, eventSiteID, threadRoom.ID, replier, now, isMigration)
  case errors.Is(err, errThreadRoomExists):
  	return h.handleSubsequentThreadReply(ctx, msg, eventSiteID, replier, now, isMigration)
  default:
- 	return "", fmt.Errorf("create thread room: %w", err)
+ 	return "", false, fmt.Errorf("create thread room: %w", err)
🗄️ Data Integrity & Integration | 🟠 Major | 🏗️ Heavy lift
This drops the only retry path that repairs a partially created parent subscription.
If CreateThreadRoom succeeds and any later step in handleFirstThreadReply fails, redelivery comes back through the errThreadRoomExists branch into handleSubsequentThreadReply. This code now permanently skips the parent-author upsert, so the room can recover and persist replies while the parent's thread_subscriptions row stays missing forever. Please keep a self-heal path for the parent subscription on redelivery, even if the steady-state hot path stays at one write.
Also applies to: 354-361
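To make the failure mode in this comment concrete, here is a minimal in-memory model in Go. It is a sketch under stated assumptions, not the handler's code: `threadState`, `handleReply`, and the `healOnExists` switch are hypothetical names, and the "crash" is simulated by returning an error after the room is created.

```go
package main

import (
	"errors"
	"fmt"
)

// threadState is a hypothetical model of what one thread has persisted.
type threadState struct {
	roomCreated  bool
	parentSubbed bool
}

var errCrash = errors.New("simulated failure after room creation")

// handleReply models one delivery of a thread reply.
// crashAfterCreate simulates a failure between creating the room and
// writing the parent subscription. healOnExists enables an idempotent
// parent upsert on the room-already-exists path.
func handleReply(st *threadState, crashAfterCreate, healOnExists bool) error {
	if !st.roomCreated {
		st.roomCreated = true
		if crashAfterCreate {
			return errCrash
		}
		st.parentSubbed = true // first-reply path writes the parent subscription
		return nil
	}
	if healOnExists {
		st.parentSubbed = true // idempotent: a no-op when already present
	}
	return nil
}

// redeliver plays a failed first delivery followed by a retry.
func redeliver(healOnExists bool) threadState {
	var st threadState
	_ = handleReply(&st, true, healOnExists)  // first delivery fails part-way
	_ = handleReply(&st, false, healOnExists) // retry takes the exists path
	return st
}

func main() {
	fmt.Println("without self-heal:", redeliver(false).parentSubbed)
	fmt.Println("with self-heal:   ", redeliver(true).parentSubbed)
}
```

In this model the healing upsert runs on every exists-path call, which is exactly the per-reply write the PR wants to avoid. Restricting it to redeliveries would need some extra signal that this sketch does not attempt to model.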
While investigating high MongoDB CPU under the thread max-rps load test, this commit makes three per-reply reductions on message-worker's thread-reply path, each removing work that runs on every subsequent reply:

1. thread_subscriptions writes 3 -> 1. Drop the parent-author subscription re-upsert on subsequent replies (it is created once, on the first reply) along with its owner-site lookup, and fold the replier's own lastSeenAt $max into the replier subscription upsert via a new UpsertThreadSubscriptionAdvancingLastSeen ($setOnInsert + $max in one write). The standalone AdvanceThreadSubscriptionLastSeen now runs only on paths that write no replier sub (migration, self-reply, system msg).

2. Resolve the thread room in one round trip. Replace the create-first pattern (CreateThreadRoom -> dup-key -> GetThreadRoomByParentMessageID) with an upserting EnsureThreadRoom (FindOneAndUpdate $setOnInsert, ReturnDocument:After). One round trip for both first and subsequent replies, with no failed unique-index insert; the caller distinguishes first vs subsequent by the returned _id.

3. Stop re-stamping the parent's thread_room_id on every subsequent reply. It is immutable and stamped once, on the first reply.

Trade-offs (narrow crash windows, not worth a per-reply write to self-heal): a first reply that creates the room but crashes before writing the parent subscription / parent stamp leaves those unset for that thread.

Note: profiling showed the thread max-rps ceiling is bounded by worker concurrency (MAX_WORKERS and the Mongo connection pool, both defaulting to 100) at <=70% resource utilization, not by these ops. These changes lower per-reply Mongo/Cassandra load, raising the DB ceiling once concurrency is unblocked; they do not by themselves move max RPS.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01VzvswJ23JQB1nYyskjBpi9
193b5cf to 98e8b52
Context
Investigating high MongoDB CPU under the thread `max-rps` load test. This PR removes work that `message-worker` does on every subsequent thread reply, in three focused reductions on the thread-reply hot path.
What changed
1. `thread_subscriptions` writes: 3 → 1 per reply
- Drop the parent-author subscription re-upsert on subsequent replies (it is created once, on the first reply), along with its owner-site `FindUserByID` read.
- Fold the replier's own `lastSeenAt` `$max` into the replier subscription upsert via a new `UpsertThreadSubscriptionAdvancingLastSeen` (`$setOnInsert` + `$max` in one write, non-conflicting because `lastSeenAt` is owned solely by `$max`).
- The standalone `AdvanceThreadSubscriptionLastSeen` now runs only on paths that write no replier sub (migration, self-reply, system message).
2. Thread-room resolution: 1 round trip (both first and subsequent replies)
- Replace the create-first pattern (`CreateThreadRoom` → duplicate-key → `GetThreadRoomByParentMessageID`) with an upserting `EnsureThreadRoom` (`FindOneAndUpdate` with `$setOnInsert` + `ReturnDocument:After`).
- The caller distinguishes first vs. subsequent replies by comparing the returned `_id` to the candidate's. A rare concurrent-first-reply dup-key is resolved with a single read.
3. Stop re-stamping the parent's `thread_room_id` on every subsequent reply
- It is immutable and stamped once, on the first reply (`handleFirstThreadReply`). Removes one Cassandra write per subsequent reply.
Trade-offs (intentional)
Narrow crash windows, not worth a per-reply write to self-heal: if a first reply creates the room but crashes before writing the parent subscription / parent stamp, those stay unset for that thread. Same class of trade-off across all three changes.
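The ensure-then-compare step from change 2 can be modelled without a database. The Go sketch below is an assumption-laden illustration, not the PR's `EnsureThreadRoom`: `threadRoom`, the map-backed `rooms`, and `createdByThisCall` are hypothetical, and it assumes every caller generates a fresh candidate ID. It mimics an upserting find-and-update that returns the document as it stands after the call.

```go
package main

import "fmt"

// threadRoom is a hypothetical stand-in for the thread-room document.
type threadRoom struct {
	ID              string
	ParentMessageID string
}

// rooms models a collection with a unique key on ParentMessageID.
type rooms map[string]threadRoom

// ensureThreadRoom stores the candidate only when no room exists for the
// parent message, then returns whatever is stored (the "after" document).
func (r rooms) ensureThreadRoom(candidate threadRoom) threadRoom {
	if existing, ok := r[candidate.ParentMessageID]; ok {
		return existing
	}
	r[candidate.ParentMessageID] = candidate
	return candidate
}

// createdByThisCall reports a first reply: the returned ID equals the
// candidate's only when this call inserted the room.
func createdByThisCall(returned, candidate threadRoom) bool {
	return returned.ID == candidate.ID
}

func main() {
	r := rooms{}

	first := threadRoom{ID: "room-1", ParentMessageID: "msg-42"}
	got := r.ensureThreadRoom(first)
	fmt.Println(got.ID, "first reply:", createdByThisCall(got, first))

	second := threadRoom{ID: "room-2", ParentMessageID: "msg-42"}
	got = r.ensureThreadRoom(second)
	fmt.Println(got.ID, "first reply:", createdByThisCall(got, second))
}
```

The model is single-threaded, so it does not reproduce the concurrent-first-reply duplicate-key case that the PR resolves with an extra read.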
Important finding: this does not move max RPS on its own
Profiling during the load test showed the thread `max-rps` ceiling (~500–600, with a growing consumer backlog and E1/E2 P95 creeping over SLO) is bounded by worker concurrency, not DB CPU:
- `MAX_WORKERS` and the Mongo connection pool both default to 100, while every resource sits at ≤70%.
- drain rate ≈ MaxWorkers / per-msg-latency ≈ 100 / 200 ms ≈ 500/s, so the backlog grows at ~600 offered.
These changes lower per-reply Mongo/Cassandra load, which raises the DB ceiling once concurrency is unblocked, but the lever that actually moves the number is raising `MAX_WORKERS` and `maxPoolSize` together (they must move together, or the pool re-caps at 100). That's a config change, tracked separately.
Scope & testing
- `message-worker` (a `MESSAGES_CANONICAL` JetStream consumer, not a `chat.user.*` client-facing handler, so no `docs/client-api.md` change).
- `make test` (full suite, race) green; `make lint` clean; `make sast-gosec` clean; integration build compiles (`EnsureThreadRoom` + combined-upsert integration tests added).
- `main` (linear history, no merge commit).
🤖 Generated with Claude Code