Skip to content

Commit 1a99639

Browse files
committed
fix(data-migration): post-review changes for the collections CDC migration
Collapses all changes made in response to review (Nitish, CodeRabbit, and a deep self-review) into one commit on top of the feature commit, so the review-response delta reads as a single diff. Subscriptions / inbox-worker (review fixes, kept): - Missing-subscription mute/favorite/read now Nak (like roles) instead of a silent no-op, so a field event that races ahead of member_added redelivers until the sub lands (extends Nitish's roles observation to the siblings). - Mute/favorite/role events stamp the source _updatedAt as their high-water mark, so a redelivered insert snapshot can't out-rank a newer update; a cleared roles set now propagates. Transformer correctness (CodeRabbit): - Degraded insert/replace recovers the doc via a source lookup instead of poisoning; a combined name+ro update emits both room_renamed and room_restricted; zero-time guards keep absent source timestamps from becoming year-0001 high-water values; required config scalars are trimmed and validated non-empty; SOURCE_DATA read timestamp = max(ls, lr); bare returns wrapped with context. Spec-conformance revert (the design spec + CDC_COVERAGE are decision-complete): - Subscription true-delete is un-actionable -> skip + metric (spec 4.0/4.3, coverage row 8). Removed the out-of-spec subscription_deleted machinery: InboxSubscriptionDeleted, SubscriptionDeletedEvent, MemberAddEvent.SubID, handleSubscriptionDeleted, DeleteSubscriptionByID, and the source-_id adoption in handleMemberAdded (back to generated UUIDv7). - Post-seed user updates are NOT propagated (spec 4.1/9, coverage row 15). Removed the user_status_updated fan-out: publishUserStatus, the statusText update path, ALL_SITE_IDS config + wiring, and the empty-sites startup warning. inbox-worker UpdateUserStatus reverted to its base last-write-wins form (drops the statusUpdatedAt guard, which only supported the fan-out). This keeps inbox-worker at the single spec-sanctioned change (handleMemberAdded skip->error) and removes the subscription_deleted membership-lane ordering hazard at its root. Build, full unit suite, lint, integration-compile, and gosec all green. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com> Claude-Session: https://claude.ai/code/session_012X9qhQT4NwmCHjdndwNtFD
1 parent fbde695 commit 1a99639

21 files changed

Lines changed: 352 additions & 541 deletions

data-migration/SOURCE_DATA.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ One row per (user, room). ✅ Unique index `{ rid:1, 'u._id':1 }`.
131131
| `ts` | date | Join time (set once, stable across re-joins) ||
132132
| `roles[]` | string[] | `owner`/`moderator`/`leader`/`user` (role-based ownership) ||
133133
| `ls` | date | Last **seen** (scrolled cursor) ||
134-
| `lr` | date | Last **read** (explicit mark) *we plan to use this* ||
134+
| `lr` | date | Last **read** (explicit mark) ||
135135
| `alert` | bool | True if **any** unread content (not just mentions) ||
136136
| `userMentions` / `groupMentions` | int | Unread `@user` / `@all`,`@here` counts ||
137137
| `tunread[]` | string[] | Parent-message ids (`tmid`) of threads with any unread ||
@@ -142,7 +142,7 @@ One row per (user, room). ✅ Unique index `{ rid:1, 'u._id':1 }`.
142142
| `name` / `fname` | string | Machine name / friendly display name ||
143143
| `federation.origin` | string (opt) | Origin site (assumed consistent with room) | ✅ ❓ |
144144

145-
Derived: "has mention" = `userMentions>0 || groupMentions>0`; "muted" = `disableNotifications`. **Open:** read timestamp = `lr` or `ls`? (we lean `lr`).
145+
Derived: "has mention" = `userMentions>0 || groupMentions>0`; "muted" = `disableNotifications`; **read timestamp (`lastSeenAt`) = `max(ls, lr)`** (resolved per design D1 — the furthest point consumed by either the scrolled cursor or the explicit mark-read).
146146

147147
## 5. `tsmc_thread_subscriptions`
148148

data-migration/oplog-collections-transformer/config.go

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,6 @@ import (
1212
type config struct {
1313
SiteID string `env:"SITE_ID,required"`
1414

15-
// AllSiteIDs is every federated site. statusText updates fan to all of them (incl.
16-
// ours) — status is global-visibility, not home-routed like rooms/subs.
17-
AllSiteIDs []string `env:"ALL_SITE_IDS" envDefault:"" envSeparator:","`
18-
1915
NatsURL string `env:"NATS_URL,required"`
2016
NatsCredsFile string `env:"NATS_CREDS_FILE" envDefault:""`
2117

@@ -60,11 +56,22 @@ func parseConfig() (config, error) {
6056
if err != nil {
6157
return config{}, fmt.Errorf("parse config: %w", err)
6258
}
59+
// caarlos0/env `required` only rejects an unset var, not a whitespace-only one. Trim and
60+
// re-validate the required scalars too, so a value like " " fails here rather than breaking
61+
// subject building / connections later at runtime.
62+
cfg.SiteID = strings.TrimSpace(cfg.SiteID)
63+
cfg.NatsURL = strings.TrimSpace(cfg.NatsURL)
64+
cfg.SourceMongoURI = strings.TrimSpace(cfg.SourceMongoURI)
65+
cfg.TargetMongoURI = strings.TrimSpace(cfg.TargetMongoURI)
6366
cfg.RoomsCollection = strings.TrimSpace(cfg.RoomsCollection)
6467
cfg.SubscriptionsCollection = strings.TrimSpace(cfg.SubscriptionsCollection)
6568
cfg.ThreadSubsCollection = strings.TrimSpace(cfg.ThreadSubsCollection)
6669
cfg.UsersCollection = strings.TrimSpace(cfg.UsersCollection)
6770
for name, v := range map[string]string{
71+
"SITE_ID": cfg.SiteID,
72+
"NATS_URL": cfg.NatsURL,
73+
"SOURCE_MONGO_URI": cfg.SourceMongoURI,
74+
"TARGET_MONGO_URI": cfg.TargetMongoURI,
6875
"ROOMS_COLLECTION": cfg.RoomsCollection,
6976
"SUBSCRIPTIONS_COLLECTION": cfg.SubscriptionsCollection,
7077
"THREAD_SUBS_COLLECTION": cfg.ThreadSubsCollection,

data-migration/oplog-collections-transformer/config_test.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,3 +137,19 @@ func TestParseConfig_CustomCollectionNames(t *testing.T) {
137137
assert.Equal(t, "my_thread_subs", cfg.ThreadSubsCollection)
138138
assert.Equal(t, "my_users", cfg.UsersCollection)
139139
}
140+
141+
func TestParseConfig_WhitespaceSiteID_Errors(t *testing.T) {
142+
setRequiredEnv(t)
143+
t.Setenv("SITE_ID", " ")
144+
_, err := parseConfig()
145+
require.Error(t, err)
146+
assert.Contains(t, err.Error(), "SITE_ID must be non-empty")
147+
}
148+
149+
func TestParseConfig_WhitespaceSourceMongoURI_Errors(t *testing.T) {
150+
setRequiredEnv(t)
151+
t.Setenv("SOURCE_MONGO_URI", " ")
152+
_, err := parseConfig()
153+
require.Error(t, err)
154+
assert.Contains(t, err.Error(), "SOURCE_MONGO_URI must be non-empty")
155+
}

data-migration/oplog-collections-transformer/handler.go

Lines changed: 38 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ type targetStore interface {
4747

4848
type handler struct {
4949
siteID string
50-
allSiteIDs []string
5150
roomsColl string
5251
subsColl string
5352
threadSubsColl string
@@ -99,31 +98,20 @@ func (h *handler) resolveDoc(ctx context.Context, ev oplogEvent) (doc []byte, sk
9998
switch ev.Op {
10099
case "insert", "replace":
101100
if len(ev.FullDocument) == 0 {
102-
// The connector always carries the doc for insert/replace — a missing one is a
103-
// contract violation that can never succeed on redelivery. Poison.
104-
return nil, false, fmt.Errorf("%w: %s without fullDocument", migration.ErrPoison, ev.Op)
101+
if !ev.Degraded {
102+
// The connector always carries the doc for insert/replace — a non-degraded missing
103+
// one is a contract violation that can never succeed on redelivery. Poison.
104+
return nil, false, fmt.Errorf("%w: %s without fullDocument", migration.ErrPoison, ev.Op)
105+
}
106+
// Degraded: the connector couldn't encode fullDocument (left nil) but still published.
107+
// Recover the live source doc by _id rather than drop it — mirrors oplog-transformer.
108+
slog.Warn("recovering degraded insert/replace via source lookup",
109+
"eventId", ev.EventID, "reason", ev.DegradedReason, "request_id", natsutil.RequestIDFromContext(ctx))
110+
return h.resolveBySourceLookup(ctx, ev)
105111
}
106112
return ev.FullDocument, false, nil
107113
case "update":
108-
id, idErr := documentKeyID(ev.DocumentKey)
109-
if idErr != nil {
110-
return nil, false, idErr
111-
}
112-
lk := h.lookups[ev.Collection]
113-
if lk == nil {
114-
// An update for a collection we don't have a source lookup for is a misconfiguration
115-
// (filter subjects and the lookups map disagree) — it can never succeed. Poison.
116-
return nil, false, fmt.Errorf("%w: no source lookup for collection %q", migration.ErrPoison, ev.Collection)
117-
}
118-
got, lookupErr := lk.FindByID(ctx, id)
119-
if lookupErr != nil {
120-
return nil, false, fmt.Errorf("lookup %q: %w", id, lookupErr)
121-
}
122-
if got == nil {
123-
// Doc vanished from source between the change event and our re-read — nothing to apply.
124-
return nil, true, nil
125-
}
126-
return got, false, nil
114+
return h.resolveBySourceLookup(ctx, ev)
127115
case "delete":
128116
// Un-actionable: only documentKey._id, and the destination doesn't key by source _id.
129117
return nil, true, nil
@@ -133,6 +121,33 @@ func (h *handler) resolveDoc(ctx context.Context, ev oplogEvent) (doc []byte, sk
133121
}
134122
}
135123

124+
// resolveBySourceLookup re-reads the full current source doc by documentKey._id — used for updates
125+
// (the connector forwards only the delta) and degraded insert/replace (fullDocument couldn't encode).
126+
// skip=true when the doc vanished from source between the event and our re-read.
127+
//
128+
//nolint:gocritic // ev passed by value to mirror resolveDoc's signature; off the hot path.
129+
func (h *handler) resolveBySourceLookup(ctx context.Context, ev oplogEvent) (doc []byte, skip bool, err error) {
130+
id, idErr := documentKeyID(ev.DocumentKey)
131+
if idErr != nil {
132+
return nil, false, idErr
133+
}
134+
lk := h.lookups[ev.Collection]
135+
if lk == nil {
136+
// No source lookup for this collection is a misconfiguration (filter subjects and the
137+
// lookups map disagree) — it can never succeed. Poison.
138+
return nil, false, fmt.Errorf("%w: no source lookup for collection %q", migration.ErrPoison, ev.Collection)
139+
}
140+
got, lookupErr := lk.FindByID(ctx, id)
141+
if lookupErr != nil {
142+
return nil, false, fmt.Errorf("lookup %q: %w", id, lookupErr)
143+
}
144+
if got == nil {
145+
// Doc vanished from source between the change event and our re-read — nothing to apply.
146+
return nil, true, nil
147+
}
148+
return got, false, nil
149+
}
150+
136151
// documentKeyID decodes documentKey → _id (the common string case). Returns migration.ErrPoison
137152
// when missing/malformed — mirrors the message transformer's documentKeyID.
138153
func documentKeyID(documentKey json.RawMessage) (string, error) {

data-migration/oplog-collections-transformer/handler_test.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,6 @@ const (
7373
func newTestHandler(pub inboxPublisher, target targetStore, lookup migration.SourceLookup) *handler {
7474
return &handler{
7575
siteID: testSiteID,
76-
allSiteIDs: []string{testSiteID, "s2"},
7776
roomsColl: roomsColl,
7877
subsColl: subsColl,
7978
threadSubsColl: threadSubColl,
@@ -119,11 +118,13 @@ func TestHandle_Dispatch(t *testing.T) {
119118
assert.NotEmpty(t, pub.events)
120119
})
121120

122-
t.Run("thread-subs delete routes to handleThreadSub and skips", func(t *testing.T) {
121+
t.Run("thread-subs routes to handleThreadSub (poison is branch-specific)", func(t *testing.T) {
123122
h := newTestHandler(&fakePublisher{}, &fakeTarget{}, &fakeLookup{})
124-
err := h.handle(context.Background(), oplogEvent{Op: "delete", Collection: threadSubColl,
125-
DocumentKey: json.RawMessage(`{"_id":"ts1"}`)})
126-
assert.ErrorIs(t, err, migration.ErrSkipped)
123+
// Empty parentMessage._id poisons ONLY on the thread-sub branch; the default-skip would
124+
// return ErrSkipped, so ErrPoison proves the event actually reached handleThreadSub.
125+
err := h.handle(context.Background(), oplogEvent{Op: "insert", Collection: threadSubColl,
126+
FullDocument: json.RawMessage(`{"_id":"ts1","u":{"username":"alice"},"parentMessage":{"_id":""}}`)})
127+
assert.ErrorIs(t, err, migration.ErrPoison)
127128
})
128129

129130
t.Run("unknown collection skipped", func(t *testing.T) {
@@ -157,6 +158,7 @@ func TestHandleRoom_PublishError(t *testing.T) {
157158
err := h.handleRoom(context.Background(), roomEv("insert", doc, ""))
158159
require.Error(t, err)
159160
assert.NotErrorIs(t, err, migration.ErrSkipped)
161+
assert.NotErrorIs(t, err, migration.ErrPoison, "a transient publish failure must Nak, not poison")
160162
}
161163

162164
func TestHandleUser_UpsertError(t *testing.T) {
@@ -165,6 +167,7 @@ func TestHandleUser_UpsertError(t *testing.T) {
165167
err := h.handleUser(context.Background(), userEv("insert", `{"_id":"u1","username":"alice"}`))
166168
require.Error(t, err)
167169
assert.NotErrorIs(t, err, migration.ErrSkipped)
170+
assert.NotErrorIs(t, err, migration.ErrPoison, "a transient upsert failure must Nak, not poison")
168171
}
169172

170173
func TestResolveDoc(t *testing.T) {

data-migration/oplog-collections-transformer/inboxpublisher_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,11 @@ func TestJetstreamPublisher_Publish(t *testing.T) {
2727
},
2828
}
2929

30+
// Distinct SiteID (home) vs DestSiteID (routing) so a source/dest mix-up in subject
31+
// routing would be caught — the subject must use DestSiteID.
3032
evt := model.InboxEvent{
3133
Type: "room_sync",
32-
SiteID: site,
34+
SiteID: "home-site",
3335
DestSiteID: site,
3436
Payload: []byte(`{"id":"r1"}`),
3537
Timestamp: 1700000000000,
@@ -39,8 +41,8 @@ func TestJetstreamPublisher_Publish(t *testing.T) {
3941
require.NoError(t, err)
4042
require.NotNil(t, captured)
4143

42-
wantSubject := subject.InboxExternal(site, "room_sync")
43-
assert.Equal(t, wantSubject, captured.Subject, "subject must be chat.inbox.s1.aggregate.room_sync")
44+
wantSubject := subject.InboxExternal(evt.DestSiteID, "room_sync")
45+
assert.Equal(t, wantSubject, captured.Subject, "subject must route on DestSiteID, not SiteID")
4446

4547
var got model.InboxEvent
4648
require.NoError(t, json.Unmarshal(captured.Data, &got))

data-migration/oplog-collections-transformer/main.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,6 @@ func main() {
131131

132132
h := &handler{
133133
siteID: cfg.SiteID,
134-
allSiteIDs: cfg.AllSiteIDs,
135134
roomsColl: cfg.RoomsCollection,
136135
subsColl: cfg.SubscriptionsCollection,
137136
threadSubsColl: cfg.ThreadSubsCollection,

data-migration/oplog-collections-transformer/metrics_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,9 @@ func TestMetrics_NilSafe(t *testing.T) {
4343
// TestMetrics_DispositionCountersCarryCollection verifies the disposition counters are labelled
4444
// by both op and collection, so ops can see which source collection is stuck/poisoning.
4545
func TestMetrics_DispositionCountersCarryCollection(t *testing.T) {
46+
// Restore the global meter provider so this test doesn't leak its manual reader into siblings.
47+
prev := otel.GetMeterProvider()
48+
t.Cleanup(func() { otel.SetMeterProvider(prev) })
4649
reader := sdkmetric.NewManualReader()
4750
otel.SetMeterProvider(sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)))
4851

data-migration/oplog-collections-transformer/rooms.go

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func (h *handler) handleRoom(ctx context.Context, ev oplogEvent) error {
6969

7070
doc, skip, err := h.resolveDoc(ctx, ev)
7171
if err != nil {
72-
return err
72+
return fmt.Errorf("resolve room doc: %w", err)
7373
}
7474
if skip {
7575
h.metrics.onSkipped(ctx, ev.Op+"_skip")
@@ -120,18 +120,18 @@ func (h *handler) handleRoom(ctx context.Context, ev oplogEvent) error {
120120

121121
evts, err := h.roomEvents(ev, &room)
122122
if err != nil {
123-
return err
123+
return fmt.Errorf("build room events: %w", err)
124124
}
125125
for _, evt := range evts {
126126
if err := h.pub.Publish(ctx, evt); err != nil {
127-
return err
127+
return fmt.Errorf("publish room event %q: %w", evt.Type, err)
128128
}
129129
}
130130
return nil
131131
}
132132

133-
// roomEvents builds the InboxEvents for a room change: always room_sync, preceded by
134-
// room_renamed (name/fname changed) or room_restricted (ro changed) on the matching update.
133+
// roomEvents builds the InboxEvents for a room change: always room_sync, preceded by room_renamed
134+
// (name/fname changed) and/or room_restricted (ro changed) — both when a single update changes both.
135135
//
136136
//nolint:gocritic // ev passed by value to mirror handle's signature; off the hot path.
137137
func (h *handler) roomEvents(ev oplogEvent, room *model.Room) ([]model.InboxEvent, error) {
@@ -146,14 +146,18 @@ func (h *handler) roomEvents(ev oplogEvent, room *model.Room) ([]model.InboxEven
146146
}
147147
}
148148

149-
switch {
150-
case changed(desc, "name") || changed(desc, "fname"):
151-
return []model.InboxEvent{h.roomRenamedEvent(room), h.roomSyncEvent(room)}, nil
152-
case changed(desc, "ro"):
153-
return []model.InboxEvent{h.roomRestrictedEvent(room), h.roomSyncEvent(room)}, nil
154-
default:
155-
return []model.InboxEvent{h.roomSyncEvent(room)}, nil
149+
// A single update delta can change name/fname AND ro together — emit every matching event,
150+
// not just the first, so a combined rename+restrict doesn't drop the visibility change.
151+
var evts []model.InboxEvent
152+
if changed(desc, "name") || changed(desc, "fname") {
153+
evts = append(evts, h.roomRenamedEvent(room))
154+
}
155+
if changed(desc, "ro") {
156+
evts = append(evts, h.roomRestrictedEvent(room))
156157
}
158+
// room_sync always trails so the room doc itself converges alongside the subscription-side events.
159+
evts = append(evts, h.roomSyncEvent(room))
160+
return evts, nil
157161
}
158162

159163
// changed reports whether the named field appears in the update delta (set or removed).

data-migration/oplog-collections-transformer/rooms_test.go

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,3 +291,40 @@ func TestHandleRoom_UpdateOtherFieldEmitsOnlyRoomSync(t *testing.T) {
291291
require.Len(t, pub.events, 1, "unrelated field update must emit exactly one room_sync")
292292
assert.Equal(t, model.InboxEventType("room_sync"), pub.events[0].Type)
293293
}
294+
295+
func TestHandleRoom_UpdateNameAndRo_EmitsRenamedRestrictedAndSync(t *testing.T) {
296+
pub := &fakePublisher{}
297+
full := `{"_id":"r1","t":"c","fname":"New","ro":true,"uids":["u1"],"_updatedAt":{"$date":"2024-05-01T12:00:00.000Z"}}`
298+
h := newTestHandler(pub, &fakeTarget{}, &fakeLookup{doc: json.RawMessage(full)})
299+
300+
err := h.handleRoom(context.Background(), roomEv("update", "", `{"updatedFields":{"fname":"New","ro":true}}`))
301+
require.NoError(t, err)
302+
303+
require.Len(t, pub.events, 3, "a combined name+ro update must emit room_renamed, room_restricted, and room_sync")
304+
types := []model.InboxEventType{pub.events[0].Type, pub.events[1].Type, pub.events[2].Type}
305+
assert.Contains(t, types, model.InboxRoomRenamed)
306+
assert.Contains(t, types, model.InboxRoomRestricted)
307+
assert.Contains(t, types, model.InboxEventType("room_sync"))
308+
}
309+
310+
func TestHandleRoom_DegradedInsertRecoversViaSourceLookup(t *testing.T) {
311+
pub := &fakePublisher{}
312+
full := `{"_id":"r1","t":"c","fname":"Recovered","uids":["u1"]}`
313+
h := newTestHandler(pub, &fakeTarget{}, &fakeLookup{doc: json.RawMessage(full)})
314+
315+
// Degraded insert: the connector couldn't encode fullDocument (empty) but flagged Degraded.
316+
ev := oplogEvent{Op: "insert", Collection: roomsColl, EventID: "e1",
317+
DocumentKey: json.RawMessage(`{"_id":"r1"}`), Degraded: true, DegradedReason: "fullDocument encode failed"}
318+
require.NoError(t, h.handleRoom(context.Background(), ev))
319+
320+
require.Len(t, pub.events, 1)
321+
var room model.Room
322+
require.NoError(t, json.Unmarshal(pub.events[0].Payload, &room))
323+
assert.Equal(t, "Recovered", room.Name)
324+
}
325+
326+
func TestHandleRoom_NonDegradedInsertWithoutFullDocument_Poisons(t *testing.T) {
327+
h := newTestHandler(&fakePublisher{}, &fakeTarget{}, &fakeLookup{})
328+
ev := oplogEvent{Op: "insert", Collection: roomsColl, DocumentKey: json.RawMessage(`{"_id":"r1"}`)}
329+
assert.ErrorIs(t, h.handleRoom(context.Background(), ev), migration.ErrPoison)
330+
}

0 commit comments

Comments
 (0)