Skip to content

Commit 58dee20

Browse files
committed
fix(oplog-collections-transformer): address CodeRabbit findings on PR #363
- handler: recover degraded insert/replace (empty fullDocument + Degraded) via source lookup instead of poisoning — mirrors oplog-transformer, stops dropping documents. - rooms: a single update changing both name/fname AND ro now emits room_renamed AND room_restricted (plus room_sync), not just the first; wrap bare returns with context. - subscriptions: zero-guard ls/lr (→0) and ts (→now) so absent source times don't leak negative year-0001 timestamps; emit role_updated even when roles are cleared on an update; wrap resolver/documentKey returns with context. - config: trim + non-empty-validate SITE_ID/NATS_URL/SOURCE_MONGO_URI/TARGET_MONGO_URI so a whitespace-only required var fails at startup, not later. - main: warn once at startup when ALL_SITE_IDS is empty (status fan-out disabled) — the config half of the ALL_SITE_IDS handling. - tests: combined name+ro and degraded-recovery room tests; zero-timestamp and role-clear subscription tests; whitespace-config tests; tighten dispatch/error/fixture assertions and restore the global meter provider in metrics_test. - docs(SOURCE_DATA): read timestamp is max(ls,lr) (resolved), not an open question. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_012X9qhQT4NwmCHjdndwNtFD
1 parent e1cce38 commit 58dee20

12 files changed

Lines changed: 207 additions & 50 deletions

File tree

data-migration/SOURCE_DATA.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ One row per (user, room). ✅ Unique index `{ rid:1, 'u._id':1 }`.
131131
| `ts` | date | Join time (set once, stable across re-joins) ||
132132
| `roles[]` | string[] | `owner`/`moderator`/`leader`/`user` (role-based ownership) ||
133133
| `ls` | date | Last **seen** (scrolled cursor) ||
134-
| `lr` | date | Last **read** (explicit mark) *we plan to use this* ||
134+
| `lr` | date | Last **read** (explicit mark) ||
135135
| `alert` | bool | True if **any** unread content (not just mentions) ||
136136
| `userMentions` / `groupMentions` | int | Unread `@user` / `@all`,`@here` counts ||
137137
| `tunread[]` | string[] | Parent-message ids (`tmid`) of threads with any unread ||
@@ -142,7 +142,7 @@ One row per (user, room). ✅ Unique index `{ rid:1, 'u._id':1 }`.
142142
| `name` / `fname` | string | Machine name / friendly display name ||
143143
| `federation.origin` | string (opt) | Origin site (assumed consistent with room) | ✅ ❓ |
144144

145-
Derived: "has mention" = `userMentions>0 || groupMentions>0`; "muted" = `disableNotifications`. **Open:** read timestamp = `lr` or `ls`? (we lean `lr`).
145+
Derived: "has mention" = `userMentions>0 || groupMentions>0`; "muted" = `disableNotifications`; **read timestamp (`lastSeenAt`) = `max(ls, lr)`** (resolved per design D1 — the furthest point consumed by either the scrolled cursor or the explicit mark-read).
146146

147147
## 5. `tsmc_thread_subscriptions`
148148

data-migration/oplog-collections-transformer/config.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,11 +60,22 @@ func parseConfig() (config, error) {
6060
if err != nil {
6161
return config{}, fmt.Errorf("parse config: %w", err)
6262
}
63+
// caarlos0/env `required` only rejects an unset var, not a whitespace-only one. Trim and
64+
// re-validate the required scalars too, so a value like " " fails here rather than breaking
65+
// subject building / connections later at runtime.
66+
cfg.SiteID = strings.TrimSpace(cfg.SiteID)
67+
cfg.NatsURL = strings.TrimSpace(cfg.NatsURL)
68+
cfg.SourceMongoURI = strings.TrimSpace(cfg.SourceMongoURI)
69+
cfg.TargetMongoURI = strings.TrimSpace(cfg.TargetMongoURI)
6370
cfg.RoomsCollection = strings.TrimSpace(cfg.RoomsCollection)
6471
cfg.SubscriptionsCollection = strings.TrimSpace(cfg.SubscriptionsCollection)
6572
cfg.ThreadSubsCollection = strings.TrimSpace(cfg.ThreadSubsCollection)
6673
cfg.UsersCollection = strings.TrimSpace(cfg.UsersCollection)
6774
for name, v := range map[string]string{
75+
"SITE_ID": cfg.SiteID,
76+
"NATS_URL": cfg.NatsURL,
77+
"SOURCE_MONGO_URI": cfg.SourceMongoURI,
78+
"TARGET_MONGO_URI": cfg.TargetMongoURI,
6879
"ROOMS_COLLECTION": cfg.RoomsCollection,
6980
"SUBSCRIPTIONS_COLLECTION": cfg.SubscriptionsCollection,
7081
"THREAD_SUBS_COLLECTION": cfg.ThreadSubsCollection,

data-migration/oplog-collections-transformer/config_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,3 +137,19 @@ func TestParseConfig_CustomCollectionNames(t *testing.T) {
137137
assert.Equal(t, "my_thread_subs", cfg.ThreadSubsCollection)
138138
assert.Equal(t, "my_users", cfg.UsersCollection)
139139
}
140+
141+
func TestParseConfig_WhitespaceSiteID_Errors(t *testing.T) {
142+
setRequiredEnv(t)
143+
t.Setenv("SITE_ID", " ")
144+
_, err := parseConfig()
145+
require.Error(t, err)
146+
assert.Contains(t, err.Error(), "SITE_ID must be non-empty")
147+
}
148+
149+
func TestParseConfig_WhitespaceSourceMongoURI_Errors(t *testing.T) {
150+
setRequiredEnv(t)
151+
t.Setenv("SOURCE_MONGO_URI", " ")
152+
_, err := parseConfig()
153+
require.Error(t, err)
154+
assert.Contains(t, err.Error(), "SOURCE_MONGO_URI must be non-empty")
155+
}

data-migration/oplog-collections-transformer/handler.go

Lines changed: 38 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -99,31 +99,20 @@ func (h *handler) resolveDoc(ctx context.Context, ev oplogEvent) (doc []byte, sk
9999
switch ev.Op {
100100
case "insert", "replace":
101101
if len(ev.FullDocument) == 0 {
102-
// The connector always carries the doc for insert/replace — a missing one is a
103-
// contract violation that can never succeed on redelivery. Poison.
104-
return nil, false, fmt.Errorf("%w: %s without fullDocument", migration.ErrPoison, ev.Op)
102+
if !ev.Degraded {
103+
// The connector always carries the doc for insert/replace — a non-degraded missing
104+
// one is a contract violation that can never succeed on redelivery. Poison.
105+
return nil, false, fmt.Errorf("%w: %s without fullDocument", migration.ErrPoison, ev.Op)
106+
}
107+
// Degraded: the connector couldn't encode fullDocument (left nil) but still published.
108+
// Recover the live source doc by _id rather than drop it — mirrors oplog-transformer.
109+
slog.Warn("recovering degraded insert/replace via source lookup",
110+
"eventId", ev.EventID, "reason", ev.DegradedReason, "request_id", natsutil.RequestIDFromContext(ctx))
111+
return h.resolveBySourceLookup(ctx, ev)
105112
}
106113
return ev.FullDocument, false, nil
107114
case "update":
108-
id, idErr := documentKeyID(ev.DocumentKey)
109-
if idErr != nil {
110-
return nil, false, idErr
111-
}
112-
lk := h.lookups[ev.Collection]
113-
if lk == nil {
114-
// An update for a collection we don't have a source lookup for is a misconfiguration
115-
// (filter subjects and the lookups map disagree) — it can never succeed. Poison.
116-
return nil, false, fmt.Errorf("%w: no source lookup for collection %q", migration.ErrPoison, ev.Collection)
117-
}
118-
got, lookupErr := lk.FindByID(ctx, id)
119-
if lookupErr != nil {
120-
return nil, false, fmt.Errorf("lookup %q: %w", id, lookupErr)
121-
}
122-
if got == nil {
123-
// Doc vanished from source between the change event and our re-read — nothing to apply.
124-
return nil, true, nil
125-
}
126-
return got, false, nil
115+
return h.resolveBySourceLookup(ctx, ev)
127116
case "delete":
128117
// Un-actionable: only documentKey._id, and the destination doesn't key by source _id.
129118
return nil, true, nil
@@ -133,6 +122,33 @@ func (h *handler) resolveDoc(ctx context.Context, ev oplogEvent) (doc []byte, sk
133122
}
134123
}
135124

125+
// resolveBySourceLookup re-reads the full current source doc by documentKey._id — used for updates
126+
// (the connector forwards only the delta) and degraded insert/replace (fullDocument couldn't encode).
127+
// skip=true when the doc vanished from source between the event and our re-read.
128+
//
129+
//nolint:gocritic // ev passed by value to mirror resolveDoc's signature; off the hot path.
130+
func (h *handler) resolveBySourceLookup(ctx context.Context, ev oplogEvent) (doc []byte, skip bool, err error) {
131+
id, idErr := documentKeyID(ev.DocumentKey)
132+
if idErr != nil {
133+
return nil, false, idErr
134+
}
135+
lk := h.lookups[ev.Collection]
136+
if lk == nil {
137+
// No source lookup for this collection is a misconfiguration (filter subjects and the
138+
// lookups map disagree) — it can never succeed. Poison.
139+
return nil, false, fmt.Errorf("%w: no source lookup for collection %q", migration.ErrPoison, ev.Collection)
140+
}
141+
got, lookupErr := lk.FindByID(ctx, id)
142+
if lookupErr != nil {
143+
return nil, false, fmt.Errorf("lookup %q: %w", id, lookupErr)
144+
}
145+
if got == nil {
146+
// Doc vanished from source between the change event and our re-read — nothing to apply.
147+
return nil, true, nil
148+
}
149+
return got, false, nil
150+
}
151+
136152
// documentKeyID decodes documentKey → _id (the common string case). Returns migration.ErrPoison
137153
// when missing/malformed — mirrors the message transformer's documentKeyID.
138154
func documentKeyID(documentKey json.RawMessage) (string, error) {

data-migration/oplog-collections-transformer/handler_test.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -119,11 +119,13 @@ func TestHandle_Dispatch(t *testing.T) {
119119
assert.NotEmpty(t, pub.events)
120120
})
121121

122-
t.Run("thread-subs delete routes to handleThreadSub and skips", func(t *testing.T) {
122+
t.Run("thread-subs routes to handleThreadSub (poison is branch-specific)", func(t *testing.T) {
123123
h := newTestHandler(&fakePublisher{}, &fakeTarget{}, &fakeLookup{})
124-
err := h.handle(context.Background(), oplogEvent{Op: "delete", Collection: threadSubColl,
125-
DocumentKey: json.RawMessage(`{"_id":"ts1"}`)})
126-
assert.ErrorIs(t, err, migration.ErrSkipped)
124+
// Empty parentMessage._id poisons ONLY on the thread-sub branch; the default-skip would
125+
// return ErrSkipped, so ErrPoison proves the event actually reached handleThreadSub.
126+
err := h.handle(context.Background(), oplogEvent{Op: "insert", Collection: threadSubColl,
127+
FullDocument: json.RawMessage(`{"_id":"ts1","u":{"username":"alice"},"parentMessage":{"_id":""}}`)})
128+
assert.ErrorIs(t, err, migration.ErrPoison)
127129
})
128130

129131
t.Run("unknown collection skipped", func(t *testing.T) {
@@ -157,6 +159,7 @@ func TestHandleRoom_PublishError(t *testing.T) {
157159
err := h.handleRoom(context.Background(), roomEv("insert", doc, ""))
158160
require.Error(t, err)
159161
assert.NotErrorIs(t, err, migration.ErrSkipped)
162+
assert.NotErrorIs(t, err, migration.ErrPoison, "a transient publish failure must Nak, not poison")
160163
}
161164

162165
func TestHandleUser_UpsertError(t *testing.T) {
@@ -165,6 +168,7 @@ func TestHandleUser_UpsertError(t *testing.T) {
165168
err := h.handleUser(context.Background(), userEv("insert", `{"_id":"u1","username":"alice"}`))
166169
require.Error(t, err)
167170
assert.NotErrorIs(t, err, migration.ErrSkipped)
171+
assert.NotErrorIs(t, err, migration.ErrPoison, "a transient upsert failure must Nak, not poison")
168172
}
169173

170174
func TestResolveDoc(t *testing.T) {

data-migration/oplog-collections-transformer/inboxpublisher_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,11 @@ func TestJetstreamPublisher_Publish(t *testing.T) {
2727
},
2828
}
2929

30+
// Distinct SiteID (home) vs DestSiteID (routing) so a source/dest mix-up in subject
31+
// routing would be caught — the subject must use DestSiteID.
3032
evt := model.InboxEvent{
3133
Type: "room_sync",
32-
SiteID: site,
34+
SiteID: "home-site",
3335
DestSiteID: site,
3436
Payload: []byte(`{"id":"r1"}`),
3537
Timestamp: 1700000000000,
@@ -39,8 +41,8 @@ func TestJetstreamPublisher_Publish(t *testing.T) {
3941
require.NoError(t, err)
4042
require.NotNil(t, captured)
4143

42-
wantSubject := subject.InboxExternal(site, "room_sync")
43-
assert.Equal(t, wantSubject, captured.Subject, "subject must be chat.inbox.s1.aggregate.room_sync")
44+
wantSubject := subject.InboxExternal(evt.DestSiteID, "room_sync")
45+
assert.Equal(t, wantSubject, captured.Subject, "subject must route on DestSiteID, not SiteID")
4446

4547
var got model.InboxEvent
4648
require.NoError(t, json.Unmarshal(captured.Data, &got))

data-migration/oplog-collections-transformer/main.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,13 @@ func main() {
3737
}
3838
slog.SetDefault(slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: parseLevel(cfg.LogLevel)})))
3939

40+
// Surface an empty ALL_SITE_IDS once at startup: user statusText changes won't propagate
41+
// (publishUserStatus skips with a per-event metric). Legitimate for a rooms/subs-only partial
42+
// deployment; a misconfig otherwise. (Future: promote to a hard-fail once the modes are known.)
43+
if !hasDestinationSite(cfg.AllSiteIDs) {
44+
slog.Warn("ALL_SITE_IDS is empty — user status fan-out is disabled (intentional only for a partial deployment)")
45+
}
46+
4047
ctx := context.Background()
4148

4249
tracerShutdown, err := otelutil.InitTracer(ctx, "oplog-collections-transformer")
@@ -252,6 +259,16 @@ func processOne(ctx context.Context, h *handler, m jetstream.Msg, mtr *metrics,
252259

253260
func nowMs() int64 { return time.Now().UTC().UnixMilli() }
254261

262+
// hasDestinationSite reports whether sites has at least one non-empty entry (a real fan-out target).
263+
func hasDestinationSite(sites []string) bool {
264+
for _, s := range sites {
265+
if s != "" {
266+
return true
267+
}
268+
}
269+
return false
270+
}
271+
255272
// streamWaitTimeout bounds how long startup waits for the connector to bootstrap MIGRATION_OPLOG.
256273
const streamWaitTimeout = 60 * time.Second
257274

data-migration/oplog-collections-transformer/metrics_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ func TestMetrics_NilSafe(t *testing.T) {
4343
// TestMetrics_DispositionCountersCarryCollection verifies the disposition counters are labelled
4444
// by both op and collection, so ops can see which source collection is stuck/poisoning.
4545
func TestMetrics_DispositionCountersCarryCollection(t *testing.T) {
46+
// Restore the global meter provider so this test doesn't leak its manual reader into siblings.
47+
prev := otel.GetMeterProvider()
48+
t.Cleanup(func() { otel.SetMeterProvider(prev) })
4649
reader := sdkmetric.NewManualReader()
4750
otel.SetMeterProvider(sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)))
4851

data-migration/oplog-collections-transformer/rooms.go

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func (h *handler) handleRoom(ctx context.Context, ev oplogEvent) error {
6969

7070
doc, skip, err := h.resolveDoc(ctx, ev)
7171
if err != nil {
72-
return err
72+
return fmt.Errorf("resolve room doc: %w", err)
7373
}
7474
if skip {
7575
h.metrics.onSkipped(ctx, ev.Op+"_skip")
@@ -120,18 +120,18 @@ func (h *handler) handleRoom(ctx context.Context, ev oplogEvent) error {
120120

121121
evts, err := h.roomEvents(ev, &room)
122122
if err != nil {
123-
return err
123+
return fmt.Errorf("build room events: %w", err)
124124
}
125125
for _, evt := range evts {
126126
if err := h.pub.Publish(ctx, evt); err != nil {
127-
return err
127+
return fmt.Errorf("publish room event %q: %w", evt.Type, err)
128128
}
129129
}
130130
return nil
131131
}
132132

133-
// roomEvents builds the InboxEvents for a room change: always room_sync, preceded by
134-
// room_renamed (name/fname changed) or room_restricted (ro changed) on the matching update.
133+
// roomEvents builds the InboxEvents for a room change: always room_sync, preceded by room_renamed
134+
// (name/fname changed) and/or room_restricted (ro changed) — both when a single update changes both.
135135
//
136136
//nolint:gocritic // ev passed by value to mirror handle's signature; off the hot path.
137137
func (h *handler) roomEvents(ev oplogEvent, room *model.Room) ([]model.InboxEvent, error) {
@@ -146,14 +146,18 @@ func (h *handler) roomEvents(ev oplogEvent, room *model.Room) ([]model.InboxEven
146146
}
147147
}
148148

149-
switch {
150-
case changed(desc, "name") || changed(desc, "fname"):
151-
return []model.InboxEvent{h.roomRenamedEvent(room), h.roomSyncEvent(room)}, nil
152-
case changed(desc, "ro"):
153-
return []model.InboxEvent{h.roomRestrictedEvent(room), h.roomSyncEvent(room)}, nil
154-
default:
155-
return []model.InboxEvent{h.roomSyncEvent(room)}, nil
149+
// A single update delta can change name/fname AND ro together — emit every matching event,
150+
// not just the first, so a combined rename+restrict doesn't drop the visibility change.
151+
var evts []model.InboxEvent
152+
if changed(desc, "name") || changed(desc, "fname") {
153+
evts = append(evts, h.roomRenamedEvent(room))
154+
}
155+
if changed(desc, "ro") {
156+
evts = append(evts, h.roomRestrictedEvent(room))
156157
}
158+
// room_sync always trails so the room doc itself converges alongside the subscription-side events.
159+
evts = append(evts, h.roomSyncEvent(room))
160+
return evts, nil
157161
}
158162

159163
// changed reports whether the named field appears in the update delta (set or removed).

data-migration/oplog-collections-transformer/rooms_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,3 +291,40 @@ func TestHandleRoom_UpdateOtherFieldEmitsOnlyRoomSync(t *testing.T) {
291291
require.Len(t, pub.events, 1, "unrelated field update must emit exactly one room_sync")
292292
assert.Equal(t, model.InboxEventType("room_sync"), pub.events[0].Type)
293293
}
294+
295+
func TestHandleRoom_UpdateNameAndRo_EmitsRenamedRestrictedAndSync(t *testing.T) {
296+
pub := &fakePublisher{}
297+
full := `{"_id":"r1","t":"c","fname":"New","ro":true,"uids":["u1"],"_updatedAt":{"$date":"2024-05-01T12:00:00.000Z"}}`
298+
h := newTestHandler(pub, &fakeTarget{}, &fakeLookup{doc: json.RawMessage(full)})
299+
300+
err := h.handleRoom(context.Background(), roomEv("update", "", `{"updatedFields":{"fname":"New","ro":true}}`))
301+
require.NoError(t, err)
302+
303+
require.Len(t, pub.events, 3, "a combined name+ro update must emit room_renamed, room_restricted, and room_sync")
304+
types := []model.InboxEventType{pub.events[0].Type, pub.events[1].Type, pub.events[2].Type}
305+
assert.Contains(t, types, model.InboxRoomRenamed)
306+
assert.Contains(t, types, model.InboxRoomRestricted)
307+
assert.Contains(t, types, model.InboxEventType("room_sync"))
308+
}
309+
310+
func TestHandleRoom_DegradedInsertRecoversViaSourceLookup(t *testing.T) {
311+
pub := &fakePublisher{}
312+
full := `{"_id":"r1","t":"c","fname":"Recovered","uids":["u1"]}`
313+
h := newTestHandler(pub, &fakeTarget{}, &fakeLookup{doc: json.RawMessage(full)})
314+
315+
// Degraded insert: the connector couldn't encode fullDocument (empty) but flagged Degraded.
316+
ev := oplogEvent{Op: "insert", Collection: roomsColl, EventID: "e1",
317+
DocumentKey: json.RawMessage(`{"_id":"r1"}`), Degraded: true, DegradedReason: "fullDocument encode failed"}
318+
require.NoError(t, h.handleRoom(context.Background(), ev))
319+
320+
require.Len(t, pub.events, 1)
321+
var room model.Room
322+
require.NoError(t, json.Unmarshal(pub.events[0].Payload, &room))
323+
assert.Equal(t, "Recovered", room.Name)
324+
}
325+
326+
func TestHandleRoom_NonDegradedInsertWithoutFullDocument_Poisons(t *testing.T) {
327+
h := newTestHandler(&fakePublisher{}, &fakeTarget{}, &fakeLookup{})
328+
ev := oplogEvent{Op: "insert", Collection: roomsColl, DocumentKey: json.RawMessage(`{"_id":"r1"}`)}
329+
assert.ErrorIs(t, h.handleRoom(context.Background(), ev), migration.ErrPoison)
330+
}

0 commit comments

Comments
 (0)