feat(data-migration): collections CDC migration — rooms / subscriptions / thread-subs / users#363
general-lex wants to merge 3 commits into
No actionable comments were generated in the recent review. 🎉
ℹ️ Recent review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID:
📒 Files selected for processing (21)
💤 Files with no reviewable changes (1)
✅ Files skipped from review due to trivial changes (2)
🚧 Files skipped from review as they are similar to previous changes (14)
📝 Walkthrough
Adds a new collections oplog transformer service, shared migration disposition/source-lookup primitives, and inbox-worker retry/timestamp-guard updates. The existing oplog transformer is refactored onto the shared migration APIs, with related tests and docs updated throughout.
Changes
- Shared migration primitives
- oplog-transformer refactor
- Inbox worker retry behavior
- New collections oplog transformer service
- Documentation and infrastructure
Sequence Diagram(s)
sequenceDiagram
participant JetStream as MIGRATION_OPLOG
participant ProcessOne as processOne
participant Handler as handler
participant TargetMongo as targetStore
participant Inbox as inboxPublisher
JetStream->>ProcessOne: deliver oplogEvent JSON
ProcessOne->>Handler: handle(ctx, ev)
alt rooms or subscriptions
Handler->>Inbox: publish inbox event
else thread_subscriptions
Handler->>TargetMongo: FindThreadRoom / FindUserID
Handler->>Inbox: publish thread_subscription_upserted
else users
Handler->>TargetMongo: UpsertUserIfAbsent
end
ProcessOne->>JetStream: Ack / Term / NakWithDelay
sequenceDiagram
participant Oplog as subscription oplog
participant SubHandler as handleSubscription
participant Inbox as inboxPublisher
Oplog->>SubHandler: insert/update/replace/delete
SubHandler->>SubHandler: classify open/roles/mute/favorite/read
SubHandler->>Inbox: publish member_added / role_updated / subscription_read
Estimated code review effort: 5 (Critical) | ~120 minutes
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
0b66040 to 86555ac
1971a6a to 4f21bcd
fd09a93 to 038ae93
ngangwar962 left a comment
Good work. A few comments, and the CDC part of this PR currently does not handle the tsmc_room_member collection. Let's add that too.
Thanks
…nstead of error-retry

Per PR #363 review: an empty/blank ALL_SITE_IDS (misconfig, or a partial deployment that doesn't fan user status) made publishUserStatus return a transient error, so the status event Nak-stormed to MAX_DELIVER (1000) before a loud Term, which was wasteful and broke partial deployments.

Detect the no-destination case at the sent==0 point and skip cleanly: one WARN log + onSkipped("status_no_sites") metric + migration.ErrSkipped (Ack-skip, no retries). Detection stays at sent==0 so malformed multi-entry values (e.g. ALL_SITE_IDS=",") still warn rather than silently ack. A startup hard-fail on empty ALL_SITE_IDS is the planned permanent form (TODO).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_012X9qhQT4NwmCHjdndwNtFD
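The skip-on-no-destination behavior described in this commit can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the PR's code: `fanOutStatus`, `isSkipped`, and `errSkipped` are hypothetical stand-ins (the real sentinel is `migration.ErrSkipped`), and the `publish` callback takes the place of the actual inbox publisher. The WARN log and metric are reduced to a comment.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// errSkipped is a hypothetical stand-in for migration.ErrSkipped
// (the caller Acks and does not retry).
var errSkipped = errors.New("skipped")

// fanOutStatus publishes one status event per non-blank site ID.
// It returns an error wrapping errSkipped when nothing was sent.
func fanOutStatus(allSiteIDs string, publish func(siteID string) error) error {
	sent := 0
	for _, raw := range strings.Split(allSiteIDs, ",") {
		siteID := strings.TrimSpace(raw)
		if siteID == "" {
			continue // tolerate blanks, e.g. ALL_SITE_IDS=","
		}
		if err := publish(siteID); err != nil {
			// A publish failure stays transient so the event is retried.
			return fmt.Errorf("publish user status to %q: %w", siteID, err)
		}
		sent++
	}
	if sent == 0 {
		// No destination at all: the real code would log one WARN and bump
		// a skip metric here, then skip instead of retrying.
		return fmt.Errorf("status_no_sites: %w", errSkipped)
	}
	return nil
}

// isSkipped reports whether err should be Ack-skipped.
func isSkipped(err error) bool { return errors.Is(err, errSkipped) }

func main() {
	noop := func(string) error { return nil }
	fmt.Println(isSkipped(fanOutStatus(",", noop)))          // true
	fmt.Println(fanOutStatus("site-a, site-b", noop) == nil) // true
}
```

Checking at `sent == 0` rather than on the raw string means a value made only of separators and whitespace takes the same skip path as an empty one.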
…atus write commits

Per PR #363 review: in UpdateUserStatus the guarded UpdateOne already commits the status write before the MatchedCount==0 branch runs. That branch's CountDocuments is only a logging aid (absent-account vs stale-event), so returning its error caused a pointless Nak that re-ran the already-applied, idempotent write. Downgrade the CountDocuments failure to a warn and continue (return nil) instead.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_012X9qhQT4NwmCHjdndwNtFD
Actionable comments posted: 16
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
inbox-worker/integration_test.go (1)
1337-1368: 📐 Maintainability & Code Quality | 🟡 Minor | ⚡ Quick win
These subtests still depend on shared DB state.
`text-only update leaves stored isShow untouched` assumes the previous subtest already flipped `StatusIsShow`, and the stale-event case assumes the `t2` write already happened. That makes this block order-sensitive and brittle to future refactors/parallelization. Seed/reset the user inside each subtest, or split these into independent tests.
As per coding guidelines, "Each test must be fully independent: no shared mutable state between tests, never rely on test execution order, set up and tear down all state within each test (or subtest)."
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@inbox-worker/integration_test.go` around lines 1337 - 1368, The subtests in updateUserStatus are order-dependent because they share the same stored user state across cases. Make each subtest fully independent by seeding or resetting the alice record inside each t.Run before calling store.UpdateUserStatus, especially for the text-only and stale-event cases, so they do not rely on previous subtests’ StatusIsShow or statusUpdatedAt values. If needed, split the cases into separate tests to avoid shared mutable DB state and keep the assertions tied only to the local setup in each subtest.
Source: Coding guidelines
🧹 Nitpick comments (3)
docs/superpowers/plans/2026-06-18-inbox-worker-member-added-wait-for-user.md (1)
86-140: 📐 Maintainability & Code Quality | 🔵 Trivial | 💤 Low value
Update test fixtures to include `SubID` field for completeness.
The `MemberAddEvent` struct includes `SubID` (per the actual implementation and model changes). While not strictly required for these tests, including it makes the plan's examples match the current struct shape and avoids copy-paste surprises.

     change := model.MemberAddEvent{
         Type:     "member_added",
         RoomID:   "room-1",
         Accounts: []string{"ghost"},
         SiteID:   "site-b",
         JoinedAt: time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC).UnixMilli(),
    +    SubID:    "",
     }

And similarly for the partial test at line 117:

     change := model.MemberAddEvent{
         Type:     "member_added",
         RoomID:   "room-1",
         Accounts: []string{"bob", "ghost"},
         SiteID:   "site-b",
         JoinedAt: time.Date(2026, 4, 1, 12, 0, 0, 0, time.UTC).UnixMilli(),
    +    SubID:    "",
     }

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@docs/superpowers/plans/2026-06-18-inbox-worker-member-added-wait-for-user.md` around lines 86 - 140, The MemberAddEvent test fixtures are missing the SubID field, so the examples no longer match the current struct shape. Update both test cases that build model.MemberAddEvent in the handler tests to include SubID alongside Type, RoomID, Accounts, SiteID, and JoinedAt, keeping the values consistent with the existing fixture setup. Use the model.MemberAddEvent symbol to locate the affected constructors.
data-migration/oplog-collections-transformer/main.go (1)
269-275: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick win
Wrap retry-loop errors with context.
These bare returns violate the repo error-wrapping rule and make startup failures harder to diagnose from callers that only receive the error.
Proposed fix

    -		if !errors.Is(err, jetstream.ErrStreamNotFound) || time.Now().After(deadline) {
    -			return nil, err
    +		if !errors.Is(err, jetstream.ErrStreamNotFound) {
    +			return nil, fmt.Errorf("create consumer %q on stream %q: %w", cfg.Durable, streamName, err)
    +		}
    +		if time.Now().After(deadline) {
    +			return nil, fmt.Errorf("wait for stream %q before creating consumer %q: %w", streamName, cfg.Durable, err)
     		}
    @@
     		case <-ctx.Done():
    -			return nil, ctx.Err()
    +			return nil, fmt.Errorf("wait for stream %q before creating consumer %q: %w", streamName, cfg.Durable, ctx.Err())

As per coding guidelines, “Always wrap errors with context using `fmt.Errorf("short description: %w", err)`.”
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@data-migration/oplog-collections-transformer/main.go` around lines 269 - 275, The retry loop in the stream-waiting logic currently returns raw errors, which violates the repository’s error-wrapping convention and obscures startup failures. Update the error returns in the function containing the jetstream retry loop so they wrap the original error with context using fmt.Errorf("short description: %w", err), including both the jetstream.ErrStreamNotFound timeout path and the ctx.Done() path as needed. Use the surrounding symbols like the stream-waiting loop, deadline check, and slog.Warn call to locate the exact returns.
Source: Coding guidelines
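To illustrate why wrapping with `%w` is safe in a retry loop like this one, here is a small standalone sketch. It is not the service's code: `waitForConsumer`, `errStreamNotFound` (standing in for `jetstream.ErrStreamNotFound`), and the `create` callback are assumptions made for the example. The point it shows is that callers can still match the sentinel with `errors.Is` after the error gains context.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errStreamNotFound is a hypothetical stand-in for jetstream.ErrStreamNotFound.
var errStreamNotFound = errors.New("stream not found")

// waitForConsumer retries create until it succeeds, the deadline passes,
// or ctx is cancelled. Every failure path wraps the cause with context.
func waitForConsumer(ctx context.Context, stream string, deadline time.Time,
	interval time.Duration, create func() error) error {
	for {
		err := create()
		if err == nil {
			return nil
		}
		if !errors.Is(err, errStreamNotFound) {
			return fmt.Errorf("create consumer on stream %q: %w", stream, err)
		}
		if time.Now().After(deadline) {
			return fmt.Errorf("wait for stream %q: %w", stream, err)
		}
		select {
		case <-ctx.Done():
			return fmt.Errorf("wait for stream %q: %w", stream, ctx.Err())
		case <-time.After(interval):
		}
	}
}

// demo runs the loop with an already-expired deadline and reports the
// wrapped message plus whether the sentinel is still detectable.
func demo() (string, bool) {
	err := waitForConsumer(context.Background(), "MIGRATION_OPLOG",
		time.Now().Add(-time.Second), time.Millisecond,
		func() error { return errStreamNotFound })
	return err.Error(), errors.Is(err, errStreamNotFound)
}

func main() {
	msg, matched := demo()
	fmt.Println(msg)     // wait for stream "MIGRATION_OPLOG": stream not found
	fmt.Println(matched) // true
}
```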
data-migration/oplog-transformer/processone_test.go (1)
208-215: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick win
Keep a `processOne`-level metadata-error regression test.
This now only checks the shared boundary math. It no longer exercises the changed `processOne` branch that turns `m.Metadata()` failures into `numDelivered=0`, so a regression there could flip Nak/Term behavior without failing this test. Add back a `fakeJSMsg{metaErr: ...}` case at the `processOne` level and leave `TestIsFinalDelivery` focused on pure helper semantics.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@data-migration/oplog-transformer/processone_test.go` around lines 208 - 215, Restore a processOne-level regression test for metadata errors: the current assertions in TestIsFinalDelivery only cover the shared boundary logic and no longer verify the changed processOne path that converts m.Metadata() failures into numDelivered=0. Add back a fakeJSMsg with metaErr in the processOne test flow so the Nak/Term behavior is exercised end-to-end, and keep TestIsFinalDelivery limited to pure IsFinalDelivery helper semantics.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@data-migration/oplog-collections-transformer/config.go`:
- Around line 58-76: parseConfig currently trims and validates only the
collection env vars, so whitespace-only values for SITE_ID, NATS_URL,
SOURCE_MONGO_URI, and TARGET_MONGO_URI can still pass through to startup. Update
parseConfig in config.go to trim and revalidate those required fields too, and
return an error when any are empty after trimming; keep the existing fail-fast
behavior so main.go never proceeds with invalid config.
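A minimal sketch of the trim-then-validate step this comment asks for is shown below. `requireNonBlank` is a hypothetical helper, and the environment is passed in as a map so the example stays self-contained. The real `parseConfig` may be structured quite differently.

```go
package main

import (
	"fmt"
	"strings"
)

// requireNonBlank trims each required value and fails fast on the first
// key whose value is empty after trimming.
func requireNonBlank(env map[string]string, keys ...string) (map[string]string, error) {
	out := make(map[string]string, len(keys))
	for _, k := range keys {
		v := strings.TrimSpace(env[k])
		if v == "" {
			return nil, fmt.Errorf("parse config: %s is required", k)
		}
		out[k] = v
	}
	return out, nil
}

func main() {
	env := map[string]string{
		"SITE_ID":          " site-a ",
		"NATS_URL":         "nats://localhost:4222",
		"SOURCE_MONGO_URI": "   ", // whitespace-only: must be rejected
		"TARGET_MONGO_URI": "mongodb://localhost:27017",
	}
	_, err := requireNonBlank(env, "SITE_ID", "NATS_URL", "SOURCE_MONGO_URI", "TARGET_MONGO_URI")
	fmt.Println(err) // parse config: SOURCE_MONGO_URI is required
}
```

Returning the trimmed values, rather than only validating, keeps stray whitespace out of the URIs that are later handed to the NATS and Mongo clients.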
In `@data-migration/oplog-collections-transformer/deploy/docker-compose.yml`:
- Around line 25-27: The Docker build context is too shallow in the
docker-compose configuration, so the Dockerfile used by the
oplog-collections-transformer cannot access repo-root dependencies like go.mod
and pkg/. Update the build block for the oplog-collections-transformer service
to use the correct context from docker-compose.yml, keeping the Dockerfile path
the same while changing context to include the repository root so docker compose
build succeeds locally.
In `@data-migration/oplog-collections-transformer/handler_test.go`:
- Around line 153-167: The transient failure tests for handleRoom and handleUser
only assert that migration.ErrSkipped is not returned, so they still allow
publish/upsert outages to be classified as migration.ErrPoison. Update
TestHandleRoom_PublishError and TestHandleUser_UpsertError in handler_test.go to
also assert that the errors from handleRoom and handleUser are not
migration.ErrPoison, alongside the existing ErrSkipped checks, so the
retryable-vs-terminal contract is locked in.
- Around line 122-127: The `handle` test for `threadSubColl` only asserts
`migration.ErrSkipped`, which can also come from the generic fallback path and
does not prove `handleThreadSub` was invoked. Update this test in
`handler_test.go` to verify a thread-subscription-specific effect from
`handleThreadSub` (for example, a publisher/lookup interaction or a distinct
returned outcome), or use an input that only that branch can produce. Keep the
assertion tied to the `handle` dispatch for `threadSubColl` so the test fails if
that branch is removed.
In `@data-migration/oplog-collections-transformer/handler.go`:
- Around line 26-29: Degraded events are only being repaired for update
operations, so degraded insert/replace records with an empty fullDocument still
fall through resolveDoc and get poisoned. Update resolveDoc (and any helper
branches it uses) to treat degraded insert and replace the same as update when
Degraded is true: perform the source lookup from Mongo and use the recovered
document instead of returning a poison/term error. Make sure the existing
event-type checks in resolveDoc and the logic around the fullDocument empty case
continue to preserve non-degraded behavior.
In `@data-migration/oplog-collections-transformer/inboxpublisher_test.go`:
- Around line 30-43: The fixture in the InboxPublisher test uses the same value
for SiteID and DestSiteID, so it cannot detect a routing mix-up; update the test
in InboxPublisher.Publish to use distinct values for those fields and keep the
assertion against subject.InboxExternal based on the intended destination so a
source/destination swap fails. Make sure the change is localized to the test
setup around the captured Subject check.
In `@data-migration/oplog-collections-transformer/main.go`:
- Around line 169-191: The shutdown sequence in main currently stops the
consumer with cc.Stop() and then immediately drains NATS and disconnects Mongo,
which can interrupt in-flight work in processOne. Update the cleanup logic
around cc and shutdown.Wait to wait for the consumer to finish first by using
cc.Drain() and/or waiting on cc.Closed(), or by adding an in-flight WaitGroup,
before calling nc.Drain() and mongoutil.Disconnect for targetClient and source.
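The in-flight WaitGroup option mentioned above could look something like the following sketch. It uses only the standard library. The `inflight` type and the recorded step names are hypothetical, and the NATS and Mongo calls are replaced by recorded labels, so it shows only the ordering: stop the consumer, wait for running handlers, then close connections.

```go
package main

import (
	"context"
	"fmt"
	"strings"
	"sync"
	"time"
)

// inflight tracks handlers that are still running.
type inflight struct{ wg sync.WaitGroup }

// run starts handle in a goroutine and marks it done when it returns.
func (f *inflight) run(handle func()) {
	f.wg.Add(1)
	go func() {
		defer f.wg.Done()
		handle()
	}()
}

// wait blocks until all handlers return or ctx expires.
func (f *inflight) wait(ctx context.Context) error {
	done := make(chan struct{})
	go func() {
		f.wg.Wait()
		close(done)
	}()
	select {
	case <-done:
		return nil
	case <-ctx.Done():
		return fmt.Errorf("wait for in-flight handlers: %w", ctx.Err())
	}
}

// demoShutdownOrder simulates one in-flight message during shutdown and
// returns the order in which the steps happened.
func demoShutdownOrder() string {
	var (
		mu    sync.Mutex
		steps []string
		f     inflight
	)
	record := func(s string) { mu.Lock(); steps = append(steps, s); mu.Unlock() }

	release := make(chan struct{})
	f.run(func() { <-release; record("handled") }) // simulated in-flight message

	record("stop consumer") // 1. stop pulling new messages
	close(release)          // the in-flight handler is allowed to finish

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	if err := f.wait(ctx); err != nil { // 2. wait for in-flight work
		record("timed out")
	}
	record("close connections") // 3. only now drain NATS / disconnect Mongo
	return strings.Join(steps, " -> ")
}

func main() {
	fmt.Println(demoShutdownOrder()) // stop consumer -> handled -> close connections
}
```

The bounded wait is a design choice for the sketch: a stuck handler delays shutdown only until the timeout, after which connections are closed anyway.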
In `@data-migration/oplog-collections-transformer/metrics_test.go`:
- Around line 46-47: `otel.SetMeterProvider` in `metrics_test.go` mutates global
meter-provider state, so this test can leak the manual reader into later tests.
In the test that uses `sdkmetric.NewManualReader` and
`sdkmetric.NewMeterProvider`, save the current provider before setting the new
one, then restore it in `t.Cleanup` so the original provider is always reset
after the test finishes.
In `@data-migration/oplog-collections-transformer/rooms.go`:
- Around line 70-72: Wrap the room-handler failures in the oplog transformer
with operation context instead of returning bare errors. In the relevant methods
on the room handler (including the resolve/build/publish flow and the later
block referenced by the review), change each `return err` to `fmt.Errorf("short
description: %w", err)` so the worker logs show which step failed while still
preserving `errors.Is` behavior. Use the existing function names in this file to
identify the exact return sites and apply the same wrapping pattern
consistently.
- Around line 149-156: The switch in the room change handler only emits one
metadata event when both name/fname and ro are present in the same delta, so
update the logic to detect both changes and return both roomRenamedEvent(room)
and roomRestrictedEvent(room) together, along with roomSyncEvent(room). Adjust
the changed(desc, "name") / changed(desc, "fname") and changed(desc, "ro")
checks in the same transform path so a single CDC update can publish both room
metadata events instead of taking the first matching branch.
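The difference between a first-match `switch` and independent checks can be shown with a small sketch. The function `roomEvents` and the use of plain strings for event types are assumptions for illustration, and the ordering of the returned events is also assumed. The real handler builds typed events via `roomSyncEvent`, `roomRenamedEvent`, and `roomRestrictedEvent`.

```go
package main

import "fmt"

// roomEvents lists the inbox event types for one room update, given the
// set of source fields that changed. Each condition is checked on its own,
// so one delta can produce more than one metadata event.
func roomEvents(changed map[string]bool) []string {
	events := []string{"room_sync"}
	if changed["name"] || changed["fname"] {
		events = append(events, "room_renamed")
	}
	if changed["ro"] {
		events = append(events, "room_restricted")
	}
	return events
}

func main() {
	both := map[string]bool{"fname": true, "ro": true}
	fmt.Println(roomEvents(both)) // [room_sync room_renamed room_restricted]
}
```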
In `@data-migration/oplog-collections-transformer/subscriptions.go`:
- Around line 42-50: The source subscription time helpers are turning
zero-valued timestamps into year-0001/negative Unix millis, which then leaks
invalid joinedAt and lastSeenAt values into inbox events. Update the timestamp
conversion logic in sourceSubscription methods like lastSeenMillis and any
related joinedAt/ts handling to treat absent time.Time values as missing instead
of calling UTC().UnixMilli() on zero values, and ensure the callers in the
affected transform paths use a safe fallback or omit the field when ts/ls/lr is
unset.
- Around line 96-105: In subscriptions.go, the event flow in
`handleSubscriptionUpdate`/`publishSubscriptionState` is still emitting
subscription events for room types that `classifyRoom` marks as `Excluded`,
which leads to `member_added` events with an empty `RoomType` while the room
mapper drops the room. Add an explicit exclusion check before calling
`memberAddedEvent` or publishing the rebuilt subscription state so `l`, `v`, and
unknown `t` values are skipped entirely, and make the same guard in the related
update path referenced by the other affected block.
- Around line 80-105: Several subscription-handler errors are still returned
without operation context, so update the error paths in the subscriptions
handler to wrap every returned error with fmt.Errorf using a short descriptive
message and %w. Apply this to document-key parsing via documentKeyID, resolver
failures from h.resolveDoc, update failures from h.handleSubscriptionUpdate,
state publishing via h.publishSubscriptionState, and any related bare err
returns in the same flow so the caller can see which operation failed.
- Around line 132-137: The role update handling in `subscriptions.go` only
publishes `role_updated` when `ss.Roles` is non-empty, so cleared roles never
propagate and stale destination state remains. Update the `subChanged(desc,
"roles")` branch in the subscription handler to publish `h.roleUpdatedEvent(ss,
siteID)` whenever roles change, even when `ss.Roles` is empty, while keeping the
existing error handling and `emitted` tracking intact.
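For the zero-timestamp item in the list above, a guard along these lines avoids the year-0001 problem. The helper names `millisOrZero` and `joinedAtMillis` are hypothetical. Treating an unset `ls`/`lr` as 0 and an unset `ts` as "now" follows the fallback the comment allows, but the real code may choose differently.

```go
package main

import (
	"fmt"
	"time"
)

// millisOrZero converts t to Unix milliseconds, mapping an unset
// (zero-valued) time to 0 instead of a large negative number.
func millisOrZero(t time.Time) int64 {
	if t.IsZero() {
		return 0
	}
	return t.UnixMilli()
}

// joinedAtMillis falls back to now when the source ts is unset.
func joinedAtMillis(ts, now time.Time) int64 {
	if ts.IsZero() {
		return now.UnixMilli()
	}
	return ts.UnixMilli()
}

// demo returns the guarded value for an unset time, the unguarded value
// for the same input, and the guarded value for a real timestamp.
func demo() (guarded, raw, set int64) {
	var unset time.Time
	return millisOrZero(unset), unset.UnixMilli(), millisOrZero(time.UnixMilli(1500))
}

func main() {
	guarded, raw, set := demo()
	fmt.Println(guarded, raw < 0, set) // 0 true 1500
	fmt.Println(joinedAtMillis(time.Time{}, time.UnixMilli(42)))
}
```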
In `@data-migration/oplog-collections-transformer/users.go`:
- Around line 65-77: The `user_status_updated` fan-out is using the handler’s
local site instead of the federated user’s origin site. Update `handleUser` and
`publishUserStatus` so the derived site from `siteIDFromOrigin(...)` is passed
through and used for `InboxEvent.SiteID` in the envelope. Keep the change
localized to the `handleUser` flow and `publishUserStatus` helper so federated
users publish with the same site ID they are stored with.
In `@data-migration/SOURCE_DATA.md`:
- Around line 134-145: The read timestamp note in the SOURCE_DATA mapping is
inconsistent with the design spec decision; update the “Open”/Derived text
around the `lr` and `ls` fields to reflect that `LastSeenAt` should use `max(ls,
lr)` rather than leaning only on `lr`. Remove or rewrite the conflicting
parenthetical and any nearby wording so the guidance is unambiguous for
implementers, while keeping the references to `lr`, `ls`, and `LastSeenAt`
consistent throughout the document.
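As a small worked example of the `max(ls, lr)` rule, assuming both values are already Unix milliseconds with 0 meaning "unset" (the function name `lastSeenMillis` is hypothetical):

```go
package main

import "fmt"

// lastSeenMillis returns the later of ls and lr, both in Unix
// milliseconds. A value of 0 is treated as unset, so the other
// timestamp wins whenever it is positive.
func lastSeenMillis(ls, lr int64) int64 {
	if lr > ls {
		return lr
	}
	return ls
}

func main() {
	fmt.Println(lastSeenMillis(1000, 2500)) // 2500
	fmt.Println(lastSeenMillis(3000, 0))    // 3000
}
```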
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: e075e905-ce7e-4be5-a3b9-bd359bcb3c7e
📒 Files selected for processing (48)
- .gitignore
- data-migration/CDC_COVERAGE.md
- data-migration/SOURCE_DATA.md
- data-migration/oplog-collections-transformer/bootstrap.go
- data-migration/oplog-collections-transformer/classify.go
- data-migration/oplog-collections-transformer/classify_test.go
- data-migration/oplog-collections-transformer/config.go
- data-migration/oplog-collections-transformer/config_test.go
- data-migration/oplog-collections-transformer/deploy/Dockerfile
- data-migration/oplog-collections-transformer/deploy/azure-pipelines.yml
- data-migration/oplog-collections-transformer/deploy/docker-compose.yml
- data-migration/oplog-collections-transformer/handler.go
- data-migration/oplog-collections-transformer/handler_test.go
- data-migration/oplog-collections-transformer/inboxpublisher.go
- data-migration/oplog-collections-transformer/inboxpublisher_test.go
- data-migration/oplog-collections-transformer/integration_test.go
- data-migration/oplog-collections-transformer/main.go
- data-migration/oplog-collections-transformer/metrics.go
- data-migration/oplog-collections-transformer/metrics_test.go
- data-migration/oplog-collections-transformer/rooms.go
- data-migration/oplog-collections-transformer/rooms_test.go
- data-migration/oplog-collections-transformer/subscriptions.go
- data-migration/oplog-collections-transformer/subscriptions_test.go
- data-migration/oplog-collections-transformer/targetstore.go
- data-migration/oplog-collections-transformer/threadsubs.go
- data-migration/oplog-collections-transformer/threadsubs_test.go
- data-migration/oplog-collections-transformer/users.go
- data-migration/oplog-collections-transformer/users_test.go
- data-migration/oplog-transformer/errors.go
- data-migration/oplog-transformer/handler.go
- data-migration/oplog-transformer/handler_test.go
- data-migration/oplog-transformer/historyclient.go
- data-migration/oplog-transformer/historyclient_test.go
- data-migration/oplog-transformer/integration_test.go
- data-migration/oplog-transformer/main.go
- data-migration/oplog-transformer/processone_test.go
- docs/superpowers/plans/2026-06-18-inbox-worker-member-added-wait-for-user.md
- docs/superpowers/specs/2026-06-16-oplog-transformer-collections-design.md
- inbox-worker/handler.go
- inbox-worker/handler_test.go
- inbox-worker/integration_test.go
- inbox-worker/main.go
- inbox-worker/mock_store_test.go
- pkg/migration/disposition.go
- pkg/migration/disposition_test.go
- pkg/migration/sourcelookup.go
- pkg/model/event.go
- pkg/model/model_test.go
💤 Files with no reviewable changes (1)
- data-migration/oplog-transformer/errors.go
812be99 to a478167
…or rooms/users/subscriptions/thread-subs
Migrate the operational collections (rocketchat_rooms, rocketchat_subscriptions,
tsmc_thread_subscriptions, users) from the legacy source Mongo into the new-stack
per-site Mongo via a live CDC tail — the non-message counterpart to the message
migration. A new oplog-collections-transformer consumes MIGRATION_OPLOG and maps
each change event to either an inbox publish (applied terminally by inbox-worker)
or a direct users insert-if-absent that seeds the FK target.
Mappers:
- rooms -> room_sync / room_renamed / room_restricted (excludes livechat,
voip, group DMs; classifies channel/discussion/dm/botDM)
- subscriptions-> member_added (+ role_updated/mute/favorite/read state events);
open:false -> member_removed; source row hard-delete ->
subscription_deleted (destination adopts the source _id at
member_added time, so delete-by-_id maps back)
- thread_subs -> thread_subscription_upserted after resolving thread_room +
user FKs against target Mongo (Nak until both land)
- users -> direct insert-if-absent; statusText change fans
user_status_updated to all sites (statusUpdatedAt order-guarded)
Shared machinery (pkg/migration): JetStream disposition policy (Classify/Action/
IsFinalDelivery) and the Mongo source-lookup, extracted from the message
transformer which is refactored onto it.
inbox-worker: applies the new event types — subscription_deleted (DeleteByID),
user_status_updated (high-water guarded), thread_subscriptions rekeyed to
(threadRoomId, userAccount) to match message-worker; member_added Naks for an
unsynced user instead of dropping it.
pkg/model: MemberAddEvent.SubID, SubscriptionDeletedEvent, InboxSubscriptionDeleted.
Guards/observability: empty parentMessage._id/username Term (no Nak-storm); empty
ALL_SITE_IDS returns an error (visible Nak) rather than silently dropping a status
change; FindThreadRoom/FindUserID use explicit projections; disposition metrics
carry a collection label.
Scope is the live CDC tail from a handed-off checkpoint; bulk/initial sync is a
separate owner's job. Design + coverage matrix + source assumptions under
docs/superpowers/specs and data-migration/.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_012X9qhQT4NwmCHjdndwNtFD
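The disposition policy named in this commit message (Ack, Term, or Nak depending on the handler result and delivery count) might be pictured as in the sketch below. This is a guess at the shape based on the terms used in this thread, not the contents of `pkg/migration`: `decide`, `isFinalDelivery`, the `action` type, and the three sentinel errors are all hypothetical names, and the real `Classify`/`Action`/`IsFinalDelivery` signatures are not shown in the source.

```go
package main

import (
	"errors"
	"fmt"
)

// Hypothetical stand-ins for the migration package's sentinels.
var (
	errSkipped   = errors.New("skipped")       // event intentionally ignored
	errPoison    = errors.New("poison")        // event can never succeed
	errTransient = errors.New("mongo timeout") // example retryable failure
)

type action string

const (
	ack  action = "ack"
	nak  action = "nak"
	term action = "term"
)

// isFinalDelivery reports whether this attempt is the last one allowed.
// A numDelivered of 0 (e.g. metadata unavailable) is never final.
func isFinalDelivery(numDelivered, maxDeliver uint64) bool {
	return maxDeliver > 0 && numDelivered >= maxDeliver
}

// decide maps a handler result to a JetStream disposition.
func decide(err error, numDelivered, maxDeliver uint64) action {
	switch {
	case err == nil, errors.Is(err, errSkipped):
		return ack // done, or deliberately skipped: no retries
	case errors.Is(err, errPoison):
		return term // retrying cannot help
	case isFinalDelivery(numDelivered, maxDeliver):
		return term // retries exhausted: terminate loudly
	default:
		return nak // transient: redeliver later
	}
}

func main() {
	fmt.Println(decide(nil, 1, 1000))             // ack
	fmt.Println(decide(errSkipped, 1, 1000))      // ack
	fmt.Println(decide(errPoison, 1, 1000))       // term
	fmt.Println(decide(errTransient, 3, 1000))    // nak
	fmt.Println(decide(errTransient, 1000, 1000)) // term
}
```

Under this shape, a delivery count of 0 is never final, so a transient error on such a message is Nak'd rather than terminated.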
a478167 to e1c8151
…363

- handler: recover degraded insert/replace (empty fullDocument + Degraded) via source lookup instead of poisoning; mirrors oplog-transformer, stops dropping documents.
- rooms: a single update changing both name/fname AND ro now emits room_renamed AND room_restricted (plus room_sync), not just the first; wrap bare returns with context.
- subscriptions: zero-guard ls/lr (→0) and ts (→now) so absent source times don't leak negative year-0001 timestamps; emit role_updated even when roles are cleared on an update; wrap resolver/documentKey returns with context.
- config: trim + non-empty-validate SITE_ID/NATS_URL/SOURCE_MONGO_URI/TARGET_MONGO_URI so a whitespace-only required var fails at startup, not later.
- main: warn once at startup when ALL_SITE_IDS is empty (status fan-out disabled); this is the config half of the ALL_SITE_IDS handling.
- tests: combined name+ro and degraded-recovery room tests; zero-timestamp and role-clear subscription tests; whitespace-config tests; tighten dispatch/error/fixture assertions and restore the global meter provider in metrics_test.
- docs(SOURCE_DATA): read timestamp is max(ls,lr) (resolved), not an open question.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_012X9qhQT4NwmCHjdndwNtFD
🧹 Nitpick comments (2)
data-migration/CDC_COVERAGE.md (1)
28-28: 📐 Maintainability & Code Quality | 🔵 Trivial | 💤 Low value
Fix markdownlint table column count warnings.
The section header rows (`| **Rooms** |`, etc.) have only 1 column while data rows have 6. Either split into separate tables per section or pad with empty cells to satisfy markdownlint.
Also applies to: 33-33, 38-38, 43-43, 48-48
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@data-migration/CDC_COVERAGE.md` at line 28, The section header rows in CDC_COVERAGE.md have a mismatched column count compared with the 6-column data rows, causing markdownlint warnings. Update the affected section headers (such as the Rooms, etc. headings) so they either start separate tables per section or are padded with empty cells to match the table structure used by the rest of the document, keeping the formatting consistent across the table sections.
data-migration/oplog-collections-transformer/rooms_test.go (1)
28-330: 📐 Maintainability & Code Quality | 🔵 Trivial | 💤 Low value
Test names omit the receiver type.
The repo guideline specifies `Test<Type>_<Method>` (or `..._<Scenario>`), but these are named `TestHandleRoom_<Scenario>`, dropping the handler type. Consider `TestHandler_HandleRoom_<Scenario>` for consistency. Same pattern across `subscriptions_test.go`.
As per coding guidelines: "Test naming: `Test<Type>_<Method>` or `Test<Type>_<Method>_<Scenario>`".
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@data-migration/oplog-collections-transformer/rooms_test.go` around lines 28 - 330, The test names in handleRoom are missing the receiver type prefix required by the repo convention. Rename each TestHandleRoom_* function to follow TestHandler_HandleRoom_* so the Method name stays grouped under the Handler type, and apply the same naming pattern consistently in the related subscriptions tests; use the existing handleRoom test cases and the TestHandleRoom_* symbols as the targets for the rename.
Source: Coding guidelines
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 2d369201-6ef1-4dfc-a804-44b3d455450e
📒 Files selected for processing (48)
- .gitignore
- data-migration/CDC_COVERAGE.md
- data-migration/SOURCE_DATA.md
- data-migration/oplog-collections-transformer/bootstrap.go
- data-migration/oplog-collections-transformer/classify.go
- data-migration/oplog-collections-transformer/classify_test.go
- data-migration/oplog-collections-transformer/config.go
- data-migration/oplog-collections-transformer/config_test.go
- data-migration/oplog-collections-transformer/deploy/Dockerfile
- data-migration/oplog-collections-transformer/deploy/azure-pipelines.yml
- data-migration/oplog-collections-transformer/deploy/docker-compose.yml
- data-migration/oplog-collections-transformer/handler.go
- data-migration/oplog-collections-transformer/handler_test.go
- data-migration/oplog-collections-transformer/inboxpublisher.go
- data-migration/oplog-collections-transformer/inboxpublisher_test.go
- data-migration/oplog-collections-transformer/integration_test.go
- data-migration/oplog-collections-transformer/main.go
- data-migration/oplog-collections-transformer/metrics.go
- data-migration/oplog-collections-transformer/metrics_test.go
- data-migration/oplog-collections-transformer/rooms.go
- data-migration/oplog-collections-transformer/rooms_test.go
- data-migration/oplog-collections-transformer/subscriptions.go
- data-migration/oplog-collections-transformer/subscriptions_test.go
- data-migration/oplog-collections-transformer/targetstore.go
- data-migration/oplog-collections-transformer/threadsubs.go
- data-migration/oplog-collections-transformer/threadsubs_test.go
- data-migration/oplog-collections-transformer/users.go
- data-migration/oplog-collections-transformer/users_test.go
- data-migration/oplog-transformer/errors.go
- data-migration/oplog-transformer/handler.go
- data-migration/oplog-transformer/handler_test.go
- data-migration/oplog-transformer/historyclient.go
- data-migration/oplog-transformer/historyclient_test.go
- data-migration/oplog-transformer/integration_test.go
- data-migration/oplog-transformer/main.go
- data-migration/oplog-transformer/processone_test.go
- docs/superpowers/plans/2026-06-18-inbox-worker-member-added-wait-for-user.md
- docs/superpowers/specs/2026-06-16-oplog-transformer-collections-design.md
- inbox-worker/handler.go
- inbox-worker/handler_test.go
- inbox-worker/integration_test.go
- inbox-worker/main.go
- inbox-worker/mock_store_test.go
- pkg/migration/disposition.go
- pkg/migration/disposition_test.go
- pkg/migration/sourcelookup.go
- pkg/model/event.go
- pkg/model/model_test.go
💤 Files with no reviewable changes (12)
- pkg/migration/disposition_test.go
- pkg/migration/disposition.go
- data-migration/oplog-transformer/errors.go
- inbox-worker/mock_store_test.go
- pkg/model/model_test.go
- pkg/model/event.go
- inbox-worker/integration_test.go
- pkg/migration/sourcelookup.go
- docs/superpowers/specs/2026-06-16-oplog-transformer-collections-design.md
- inbox-worker/handler_test.go
- inbox-worker/handler.go
- inbox-worker/main.go
✅ Files skipped from review due to trivial changes (3)
- .gitignore
- data-migration/oplog-transformer/integration_test.go
- data-migration/SOURCE_DATA.md
🚧 Files skipped from review as they are similar to previous changes (29)
- data-migration/oplog-collections-transformer/classify.go
- data-migration/oplog-transformer/processone_test.go
- data-migration/oplog-collections-transformer/classify_test.go
- data-migration/oplog-collections-transformer/deploy/Dockerfile
- data-migration/oplog-collections-transformer/deploy/docker-compose.yml
- data-migration/oplog-collections-transformer/metrics_test.go
- data-migration/oplog-transformer/historyclient.go
- data-migration/oplog-collections-transformer/inboxpublisher_test.go
- data-migration/oplog-collections-transformer/deploy/azure-pipelines.yml
- data-migration/oplog-collections-transformer/targetstore.go
- data-migration/oplog-transformer/historyclient_test.go
- data-migration/oplog-collections-transformer/metrics.go
- data-migration/oplog-collections-transformer/config.go
- data-migration/oplog-collections-transformer/inboxpublisher.go
- data-migration/oplog-collections-transformer/config_test.go
- data-migration/oplog-collections-transformer/bootstrap.go
- data-migration/oplog-collections-transformer/users_test.go
- data-migration/oplog-transformer/handler_test.go
- data-migration/oplog-collections-transformer/handler_test.go
- data-migration/oplog-collections-transformer/subscriptions.go
- data-migration/oplog-collections-transformer/threadsubs.go
- data-migration/oplog-collections-transformer/users.go
- data-migration/oplog-collections-transformer/rooms.go
- data-migration/oplog-collections-transformer/handler.go
- data-migration/oplog-collections-transformer/integration_test.go
- data-migration/oplog-transformer/main.go
- data-migration/oplog-transformer/handler.go
- data-migration/oplog-collections-transformer/main.go
- data-migration/oplog-collections-transformer/threadsubs_test.go
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
data-migration/oplog-collections-transformer/users_test.go (1)
83-99: 📐 Maintainability & Code Quality | 🟡 Minor | ⚡ Quick win
Assert the update still seeds absent users.
This only proves that no inbox event is published. It would still pass if `handleUser` started skipping updates entirely, even though the production path still calls `UpsertUserIfAbsent` on updates to seed previously unseen users. Add a target-store assertion here so the test covers both halves of the contract. As per coding guidelines, "Tests must cover: happy path, error paths, edge cases (empty collections, boundary conditions), and invalid input."
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@data-migration/oplog-collections-transformer/users_test.go` around lines 83 - 99, The TestHandleUser_PostSeedUpdate_NotPropagated case only verifies that no inbox event is published, but it does not confirm that updates still seed missing users through handleUser. Update this test to also assert the target store is seeded on the update path by checking the fakeTarget after calling handleUser, so the contract around UpsertUserIfAbsent remains covered along with fakePublisher behavior.
Source: Coding guidelines
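The two-sided assertion suggested above could look roughly like the sketch below. Everything here is a simplified stand-in: the real `fakeTarget`, `fakePublisher`, and `handleUser` in the repository have their own fields and signatures, which are not shown in this review, so the shapes used here are assumptions made only to illustrate checking both halves of the contract.

```go
package main

import "fmt"

// fakeTarget records which accounts were seeded. Hypothetical shape; the
// repository's fakeTarget may differ.
type fakeTarget struct{ upserted []string }

func (f *fakeTarget) UpsertUserIfAbsent(account string) {
	f.upserted = append(f.upserted, account)
}

// fakePublisher records published inbox event types. Hypothetical shape.
type fakePublisher struct{ published []string }

func (f *fakePublisher) Publish(eventType string) {
	f.published = append(f.published, eventType)
}

// handleUserUpdate is a simplified stand-in for the update path of
// handleUser: it seeds the user and publishes nothing.
func handleUserUpdate(t *fakeTarget, p *fakePublisher, account string) {
	t.UpsertUserIfAbsent(account)
}

// checkPostSeedUpdate verifies both halves of the contract: no inbox event
// was published, and the target store was still seeded for the account.
func checkPostSeedUpdate(t *fakeTarget, p *fakePublisher, account string) error {
	if len(p.published) != 0 {
		return fmt.Errorf("expected no inbox events, got %v", p.published)
	}
	for _, a := range t.upserted {
		if a == account {
			return nil
		}
	}
	return fmt.Errorf("expected %q to be seeded via UpsertUserIfAbsent", account)
}
```

A test that only inspected `fakePublisher` would keep passing if the update path stopped calling `UpsertUserIfAbsent`; the second check is what catches that regression.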
🧹 Nitpick comments (1)
inbox-worker/integration_test.go (1)
1444-1453: 📐 Maintainability & Code Quality | 🔵 Trivial | ⚡ Quick win
Subtest depends on mutable state from the previous subtest.
The `text-only update` subtest asserts `StatusIsShow == false`, but that value is only set by the preceding `updates text and isShow` subtest (the comment acknowledges this). Running it in isolation (e.g. `-run .../text-only`) or reordering subtests would break the assertion. Seed the precondition within this subtest so it is self-contained.
As per coding guidelines: "Each test must be fully independent — no shared mutable state between tests, never rely on test execution order, set up and tear down all state within each test (or subtest)."
♻️ Make the subtest self-contained
 t.Run("text-only update leaves stored isShow untouched", func(t *testing.T) {
-	// Stored isShow is currently false from the previous subtest; a nil
-	// isShow must not clobber it.
+	// Seed a known isShow=false so a nil isShow must not clobber it,
+	// independent of subtest ordering.
+	hide := false
+	require.NoError(t, store.UpdateUserStatus(ctx, "alice", "seed", &hide))
 	require.NoError(t, store.UpdateUserStatus(ctx, "alice", "heads down", nil))
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@inbox-worker/integration_test.go` around lines 1444 - 1453, The “text-only update leaves stored isShow untouched” subtest in integration_test.go depends on the previous “updates text and isShow” subtest to seed StatusIsShow=false. Make this subtest self-contained by setting up the alice user’s initial isShow state within the same subtest before calling store.UpdateUserStatus, then assert against the updated record using store.userCol.FindOne and model.User. Keep the existing behavior check that a nil isShow does not overwrite the stored value, but remove any reliance on subtest execution order.
Source: Coding guidelines
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 577aca95-e85d-49c4-bbf5-105321574919
📒 Files selected for processing (13)
- data-migration/oplog-collections-transformer/config.go
- data-migration/oplog-collections-transformer/handler.go
- data-migration/oplog-collections-transformer/handler_test.go
- data-migration/oplog-collections-transformer/main.go
- data-migration/oplog-collections-transformer/subscriptions.go
- data-migration/oplog-collections-transformer/subscriptions_test.go
- data-migration/oplog-collections-transformer/users.go
- data-migration/oplog-collections-transformer/users_test.go
- inbox-worker/handler.go
- inbox-worker/handler_test.go
- inbox-worker/integration_test.go
- inbox-worker/main.go
- inbox-worker/mock_store_test.go
💤 Files with no reviewable changes (4)
- data-migration/oplog-collections-transformer/config.go
- data-migration/oplog-collections-transformer/handler.go
- data-migration/oplog-collections-transformer/main.go
- data-migration/oplog-collections-transformer/handler_test.go
✅ Files skipped from review due to trivial changes (1)
- inbox-worker/mock_store_test.go
🚧 Files skipped from review as they are similar to previous changes (2)
- inbox-worker/handler_test.go
- inbox-worker/main.go
…ation
Collapses all changes made in response to review (Nitish, CodeRabbit, and a deep self-review) into one commit on top of the feature commit, so the review-response delta reads as a single diff.
Subscriptions / inbox-worker (review fixes, kept):
- Missing-subscription mute/favorite/read now Nak (like roles) instead of a silent no-op, so a field event that races ahead of member_added redelivers until the sub lands (extends Nitish's roles observation to the siblings).
- Mute/favorite/role events stamp the source _updatedAt as their high-water mark, so a redelivered insert snapshot can't out-rank a newer update; a cleared roles set now propagates.
Transformer correctness (CodeRabbit):
- Degraded insert/replace recovers the doc via a source lookup instead of poisoning; a combined name+ro update emits both room_renamed and room_restricted; zero-time guards keep absent source timestamps from becoming year-0001 high-water values; required config scalars are trimmed and validated non-empty; SOURCE_DATA read timestamp = max(ls, lr); bare returns wrapped with context.
Spec-conformance revert (the design spec + CDC_COVERAGE are decision-complete):
- Subscription true-delete is un-actionable -> skip + metric (spec 4.0/4.3, coverage row 8). Removed the out-of-spec subscription_deleted machinery: InboxSubscriptionDeleted, SubscriptionDeletedEvent, MemberAddEvent.SubID, handleSubscriptionDeleted, DeleteSubscriptionByID, and the source-_id adoption in handleMemberAdded (back to generated UUIDv7).
- Post-seed user updates are NOT propagated (spec 4.1/9, coverage row 15). Removed the user_status_updated fan-out: publishUserStatus, the statusText update path, ALL_SITE_IDS config + wiring, and the empty-sites startup warning. inbox-worker UpdateUserStatus reverted to its base last-write-wins form (drops the statusUpdatedAt guard, which only supported the fan-out).
This keeps inbox-worker at the single spec-sanctioned change (handleMemberAdded skip->error) and removes the subscription_deleted membership-lane ordering hazard at its root.
Build, full unit suite, lint, integration-compile, and gosec all green.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_012X9qhQT4NwmCHjdndwNtFD
74c5689 to 1a99639
…igration
Restores the user_status_updated fan-out that the earlier spec-conformance revert removed. Per reviewer feedback (ngangwar962): statusText is chat-originated, not part of the HR dataset, so no company-wide sync carries it; dropping it would silently lose every legacy statusText change during the migration window. HR fields stay insert-if-absent-only (unchanged); statusText is the one propagated post-seed user field.
Transformer:
- handleUser fans user_status_updated (reused event type) to all ALL_SITE_IDS on a statusText change; empty ALL_SITE_IDS -> warn + Ack-skip per event (startup hard-fail deferred). Restores publishUserStatus, the allSiteIDs config/field/wiring, and the startup warn.
inbox-worker:
- UpdateUserStatus applies under a statusUpdatedAt high-water guard so an out-of-order or duplicate fan-out delivery can't regress the status; keyed by account (a missing user on a site is a logged no-op). CountDocuments failure after the committed write is swallowed (logging aid only, must not Nak).
Docs amended to make statusText an explicit chat-owned exception to the "post-seed user updates not propagated" rule: design spec §4.1/§4.1a/§8.1/§9, CDC_COVERAGE.md (row 15 split + 15a + handler table), SOURCE_DATA.md §6.
subscription_deleted (divergence A) stays reverted; it is unrelated and genuinely out of scope.
Build, race unit tests, lint (0), integration-compile, gosec all green.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_012X9qhQT4NwmCHjdndwNtFD
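The fan-out rule in the Transformer item can be sketched as below. This is an illustration under assumptions: `userStatusEvent` and `fanOutStatus` are hypothetical names and field layouts, not the PR's `publishUserStatus` implementation. The only behaviour taken from the commit message is "one `user_status_updated` per configured site, and an empty site list means warn and skip rather than retry".

```go
package main

// userStatusEvent is a hypothetical, simplified payload for the sketch.
type userStatusEvent struct {
	SiteID     string
	Account    string
	StatusText string
	Timestamp  int64
}

// fanOutStatus builds one event per destination site. The boolean is false
// when there is nothing to publish because no sites are configured; the
// caller is then expected to log a warning and Ack the source event, since
// redelivery cannot make the site list non-empty.
func fanOutStatus(allSiteIDs []string, account, text string, ts int64) ([]userStatusEvent, bool) {
	if len(allSiteIDs) == 0 {
		return nil, false
	}
	events := make([]userStatusEvent, 0, len(allSiteIDs))
	for _, site := range allSiteIDs {
		events = append(events, userStatusEvent{
			SiteID:     site,
			Account:    account,
			StatusText: text,
			Timestamp:  ts,
		})
	}
	return events, true
}
```

Every event in one fan-out carries the same timestamp, which is what lets a per-site high-water guard discard duplicates and out-of-order deliveries independently on each site.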
Subscription: model.Subscription{
	User: model.SubscriptionUser{ID: ss.U.ID, Account: ss.U.Username},
	RoomID: ss.RID,
	Roles: mapSubscriptionRoles(ss.Roles),
mapSubscriptionRoles(ss.Roles) returns nil when ss.Roles is empty (a demoted user in Rocket.Chat). That nil makes it through to inbox-worker, which permanently drops any role_updated event with empty roles at inbox-worker/handler.go:233:
if len(roles) == 0 {
return errcode.Permanent(errcode.BadRequest("role_updated event has empty roles"))
}
The live path avoids this because room-service always ensures at least ["member"] after a demotion. The transformer should follow the same rule:
roles := mapSubscriptionRoles(ss.Roles)
if len(roles) == 0 {
roles = []model.Role{model.RoleMember} // mirrors room-service's SetOwnerRole guarantee
}
Without this, every demotion event from Rocket.Chat is silently lost and the user stays "owner" on the destination site forever.
// subUpdatedAtMillis is the source subscription's _updatedAt, the high-water mark the field-update
// guards (mute/favorite/roles) stamp on their events, stable across redelivery so a re-delivered
// inline insert snapshot can't out-rank a newer update. Falls back to now() when the source omits it.
func (h *handler) subUpdatedAtMillis(ss *sourceSubscription) int64 {
The destination MongoDB has five guard fields that inbox-worker uses as high-water marks to reject stale or replayed events. The historical data migration already established the correct mapping for each; the CDC migration needs to match those same rules exactly, or a subscription written by the historical migration could end up with a guard value that blocks valid live events.
Required mapping (from the historical migration):
| Guard field | Source value | When non-zero |
|---|---|---|
| rolesUpdatedAt | _updatedAt | always (✓ current code is correct) |
| muteUpdatedAt | _updatedAt | only when disableNotifications=true; zero when false |
| restrictUpdatedAt | _updatedAt | only when restricted=true; zero when false (handled by room events, out of scope for this file) |
| nameUpdatedAt | _updatedAt | always (handled by room_renamed, out of scope for this file) |
| favouriteUpdatedAt | favoritedAt (not _updatedAt) | only when f=true; zero when false |
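To make the consequence of the mapping concrete, here is a minimal sketch of a high-water check together with the mute and favourite rows of the table. It rests on an assumption that is not confirmed by this thread: that inbox-worker accepts an event only when its timestamp is strictly newer than the stored guard. The function names are hypothetical.

```go
package main

// acceptsEvent reports whether an event may be applied over a stored guard.
// Assumption for this sketch: strictly newer timestamps win.
func acceptsEvent(guard, eventTs int64) bool {
	return eventTs > guard
}

// muteGuardTs follows the muteUpdatedAt row: _updatedAt only when the
// subscription is muted, zero otherwise.
func muteGuardTs(disableNotifications bool, updatedAtMillis int64) int64 {
	if disableNotifications {
		return updatedAtMillis
	}
	return 0
}

// favouriteGuardTs follows the favouriteUpdatedAt row: favoritedAt (not
// _updatedAt) only when f is true, zero otherwise.
func favouriteGuardTs(f bool, favoritedAtMillis int64) int64 {
	if f {
		return favoritedAtMillis
	}
	return 0
}
```

For an unmuted subscription with `_updatedAt` = 2000, stamping the guard unconditionally would leave it at 2000 and reject a mute event stamped 1500, whereas `muteGuardTs(false, 2000)` leaves the guard at 0 and the same event is accepted.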
Two problems in this PR:
1. muteEvent (line 267): timestamp is _updatedAt regardless of muted state
Timestamp: h.subUpdatedAtMillis(ss) always sends _updatedAt. When DisableNotifications=false it should send 0 so that the muteUpdatedAt guard in inbox-worker stays at zero and never blocks a future mute event.
var muteTs int64
if ss.DisableNotifications {
	muteTs = h.subUpdatedAtMillis(ss)
}
// then: Timestamp: muteTs
2. favoriteEvent (line 277) + sourceSubscription struct (line 30): wrong source field, and favoritedAt not decoded
Two changes needed together:
- sourceSubscription is missing the Rocket.Chat favoritedAt field: the struct only has F bool `bson:"f"` but never decodes favoritedAt.
- Timestamp: h.subUpdatedAtMillis(ss) uses _updatedAt instead of favoritedAt, and sends a non-zero value even when F=false.
// Add to sourceSubscription:
FavoritedAt time.Time `bson:"favoritedAt"`
// In favoriteEvent:
var favTs int64
if ss.F && !ss.FavoritedAt.IsZero() {
	favTs = ss.FavoritedAt.UTC().UnixMilli()
}
// then: Timestamp: favTs
Generated by Claude Code
  RoomID: ss.RID,
  Favorite: ss.F,
- Timestamp: h.nowMillis(),
+ Timestamp: h.subUpdatedAtMillis(ss),
We cannot use the _updatedAt timestamp for all of the different guard fields (muteUpdatedAt, rolesUpdatedAt, restrictUpdatedAt, nameUpdatedAt, favoriteUpdatedAt). Each one has its own computation logic. Please take a look at the collection fields mapping on TKMS.
//
//nolint:gocritic // ev passed by value to mirror handle's signature; off the hot path.
func (h *handler) roomEvents(ev oplogEvent, room *model.Room) ([]model.InboxEvent, error) {
	if ev.Op != "update" {
insert and replace both hit the != "update" branch and return only room_sync. For insert that is correct — it is a new room, nothing to rename or change visibility on. For replace it is wrong.
replace is a full-document replacement with no UpdateDescription delta — there is no way to check which fields changed. The safe approach is to emit all field-level events conservatively, since name and ro could both have changed:
if ev.Op == "insert" {
return []model.InboxEvent{h.roomSyncEvent(room)}, nil
}
if ev.Op != "update" {
// replace: no UpdateDescription delta available; emit all field events conservatively
// so a room rename or visibility change is never silently dropped.
return []model.InboxEvent{
h.roomRenamedEvent(room),
h.roomRestrictedEvent(room),
h.roomSyncEvent(room),
}, nil
}
room_renamed and room_restricted are idempotent on inbox-worker's side (they write the current value and advance the guard timestamp), so a false positive on an unchanged replace is harmless.
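Putting the three operations side by side, the selection rule can be sketched as a pure function over event-type names. This is an illustration only: `roomEventTypes` is a hypothetical helper that works on strings rather than `model.InboxEvent`, and it assumes that an update always ends with `room_sync` and that `name`/`fname` and `ro` are the delta keys that matter, as described elsewhere in this PR.

```go
package main

// roomEventTypes returns the inbox event types to emit for a rooms change.
// updatedFields holds the top-level keys of the update delta and is ignored
// for insert and replace. Hypothetical helper for illustration.
func roomEventTypes(op string, updatedFields []string) []string {
	switch op {
	case "insert":
		// A new room has nothing to rename or restrict yet.
		return []string{"room_sync"}
	case "update":
		var renamed, restricted bool
		for _, f := range updatedFields {
			switch f {
			case "name", "fname":
				renamed = true
			case "ro":
				restricted = true
			}
		}
		var events []string
		if renamed {
			events = append(events, "room_renamed")
		}
		if restricted {
			events = append(events, "room_restricted")
		}
		return append(events, "room_sync")
	case "replace":
		// No delta is available, so emit every field event conservatively.
		return []string{"room_renamed", "room_restricted", "room_sync"}
	default:
		return nil
	}
}
```

Because the field events are described as idempotent on the inbox-worker side, the extra events on an unchanged replace cost a redundant write at most.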
What
The next data-migration increment: migrating the operational collections (`rocketchat_rooms`, `rocketchat_subscriptions`, `tsmc_thread_subscriptions`, `users`) from the legacy source Mongo into the new-stack per-site Mongo, via a live CDC tail. Counterpart to the message path (#331); this handles non-message data.
How it works
Per-site, the existing `oplog-connector` tails these collections to `MIGRATION_OPLOG`. A new consumer, `oplog-collections-transformer`, maps each change event to either:
- `inbox-worker` (rooms → `room_sync`/`room_renamed`/`room_restricted`; subscriptions → `member_added`/`member_removed`/`role_updated`/`subscription_mute_toggled`/`subscription_favorite_toggled`/`subscription_read`; users → `user_status_updated` on a `statusText` change, fanned to all sites), or
- `users` insert-if-absent by account (seeds the FK target).
Scope is the live CDC tail from a handed-off checkpoint; the bulk/initial state sync is a separate owner's job.
Design: `docs/superpowers/specs/2026-06-16-oplog-transformer-collections-design.md`. Coverage matrix: `data-migration/CDC_COVERAGE.md`. Source assumptions: `data-migration/SOURCE_DATA.md`. The implementation tracks these docs exactly.
Key design decisions
- `insert`/`replace` use the inline full doc; `update` re-reads the full source doc by `_id` (the connector forwards only the delta).
- `open:false` → `member_removed` (deletes the row; membership is binary). A subscription true row-delete is un-actionable (only the source `_id`, which doesn't map to the destination's generated `UUIDv7`) → skip + metric (spec §4.0/§4.3).
- `federation.origin` first-label (absent/`"local"` → deployment site).
- `statusText` is the one exception: it's chat-originated and unowned by any other sync, so a `statusText` change fans `user_status_updated` to all sites (global-visibility), applied under a `statusUpdatedAt` high-water guard so out-of-order delivery can't regress it (spec §4.1a/§9).
- `LastSeenAt = max(ls, lr)`; thread-subs resolve `thread_room` (by parentMessageId) + user (by account) against target Mongo, Nak until both land.
- `d>2`), room/user deletes, thread unfollows: all documented.
Changes
- `pkg/migration` (new): shared JetStream disposition policy + Mongo source-lookup, extracted from the message transformer.
- `data-migration/oplog-collections-transformer` (new service): config, room-type classification, four mappers, JetStream inbox publisher, Mongo target store, runtime wiring, integration tests, `deploy/`.
- `inbox-worker`: `handleMemberAdded` Naks a `member_added` for an unsynced user instead of dropping it (§5/§7); missing-subscription mute/favorite/read updates Nak (like roles); `UpdateUserStatus` gains the `statusUpdatedAt` high-water guard for the `statusText` fan-out.
Refinements on this branch
- `statusText` propagation restored as a documented chat-owned exception to the "no post-seed user updates" rule (per reviewer feedback: nothing else carries legacy `statusText` into the new stack during migration). Empty `ALL_SITE_IDS` → warn + Ack-skip (no retry storm).
- `parentMessage._id`/`username` Term instead of Nak-storming to the cap.
- `room_renamed` and `room_restricted` when name + `ro` change together.
- `_updatedAt` so a redelivered insert snapshot can't out-rank a newer update.
- `FindThreadRoom`/`FindUserID` use explicit projections (CLAUDE.md §MongoDB).
- `collection` label added to the disposition metrics.
Testing
- `-race`), full `make lint` 0 issues, `make build` clean, gosec clean (govulncheck/semgrep blocked by network only).
- `handle`: they compile (`go vet -tags=integration` clean) and run in CI (Docker).
Pending (external / deferred)
- `SOURCE_DATA.md`/`CDC_COVERAGE.md`: the room field backing `ExternalAccess` (defaulted `false`), whether `replace` occurs, employee-id location.
- `ALL_SITE_IDS` startup hard-fail (currently per-event warn+skip); resolver caches; collection `drop`/`rename`; `botDM` detection refinement (DMs classify as `dm`).
🤖 Generated with Claude Code
https://claude.ai/code/session_012X9qhQT4NwmCHjdndwNtFD
Summary by CodeRabbit