diff --git a/cmd/tools/integration/packages.json b/cmd/tools/integration/packages.json
index ad3af530dc..2199b51685 100644
--- a/cmd/tools/integration/packages.json
+++ b/cmd/tools/integration/packages.json
@@ -24,7 +24,7 @@
     {"path": "./internal/impl/memcached"},
     {"path": "./internal/impl/mssqlserver", "timeout": "10m"},
     {"path": "./internal/impl/mongodb"},
-    {"path": "./internal/impl/mongodb/cdc"},
+    {"path": "./internal/impl/mongodb/cdc", "timeout": "15m"},
     {"path": "./internal/impl/mqtt"},
     {"path": "./internal/impl/mysql"},
     {"path": "./internal/impl/nanomsg"},
diff --git a/internal/impl/mongodb/cdc/input.go b/internal/impl/mongodb/cdc/input.go
index 0b3f6fb38e..b569e589b0 100644
--- a/internal/impl/mongodb/cdc/input.go
+++ b/internal/impl/mongodb/cdc/input.go
@@ -716,6 +716,9 @@ func (m *mongoCDC) readSnapshotRange(
 		case m.readChan <- b:
 		case <-ctx.Done():
 			_ = b.ackFn(ctx, nil)
+		case <-m.shutsig.SoftStopChan():
+			_ = b.ackFn(ctx, nil)
+			return context.Canceled
 		}
 		mb = nil
 	}
@@ -875,6 +878,17 @@ func (m *mongoCDC) readFromStream(ctx context.Context, cp *checkpoint.Capped[bso
 			if !afterOk {
 				return fmt.Errorf("%s event did not have fullDocument", opType)
 			}
+			if afterDoc == nil {
+				// In update_lookup mode, fullDocument is null when the document
+				// is deleted before the post-update lookup completes. Fall back
+				// to documentKey so we still emit the event.
+				if opType == "update" {
+					doc = data["documentKey"]
+					keyOnly = true
+					break
+				}
+				return fmt.Errorf("%s event had null fullDocument", opType)
+			}
 			doc = afterDoc
 		case "delete":
 			doc = data["fullDocumentBeforeChange"]
@@ -917,7 +931,7 @@ func (m *mongoCDC) readFromStream(ctx context.Context, cp *checkpoint.Capped[bso
 		}
 		m.resumeTokenMu.Lock()
 		defer m.resumeTokenMu.Unlock()
-		m.resumeToken = stream.ResumeToken()
+		m.resumeToken = *resumeToken
 		if m.checkpointFlusher == nil {
 			return m.checkpoint.Store(ctx, m.resumeToken)
 		}
@@ -926,6 +940,7 @@ func (m *mongoCDC) readFromStream(ctx context.Context, cp *checkpoint.Capped[bso
 		select {
 		case m.readChan <- mongoBatch{mb, ackFn}:
 		case <-ctx.Done():
+		case <-m.shutsig.SoftStopChan():
 		}
 		mb = nil
 	}
diff --git a/internal/impl/mongodb/cdc/integration_test.go b/internal/impl/mongodb/cdc/integration_test.go
index 17cf92c0c3..80428e76db 100644
--- a/internal/impl/mongodb/cdc/integration_test.go
+++ b/internal/impl/mongodb/cdc/integration_test.go
@@ -53,16 +53,10 @@ func (s *streamHelper) RunAsync(t *testing.T) func() {
 	stream := s.makeStream(t)
 	var wg sync.WaitGroup
 	wg.Go(func() {
-		require.NoError(t, stream.Run(t.Context()))
-	})
-	return wg.Wait
-}
-
-func (s *streamHelper) RunAsyncWithErrors(t *testing.T) func() {
-	stream := s.makeStream(t)
-	var wg sync.WaitGroup
-	wg.Go(func() {
-		require.Error(t, stream.Run(t.Context()))
+		err := stream.Run(t.Context())
+		if err != nil && !errors.Is(err, context.Canceled) {
+			require.NoError(t, err)
+		}
 	})
 	return wg.Wait
 }
@@ -341,7 +335,11 @@ func setup(t *testing.T, template string, opts ...setupOption) (*streamHelper, *
 		ApplyURI(uri).
 		SetDirect(true))
 	require.NoError(t, err)
-	require.NoError(t, mongoClient.Ping(t.Context(), nil))
+	// The replica set can take a moment after container readiness before it
+	// accepts client connections through the mapped port, so retry the ping.
+	require.EventuallyWithT(t, func(c *assert.CollectT) {
+		assert.NoError(c, mongoClient.Ping(t.Context(), nil))
+	}, 60*time.Second, time.Second)
 	for _, opt := range opts {
 		require.NoError(t, opt(mongoClient))
 	}
@@ -393,6 +391,9 @@ mongodb_cdc:
 	db.UpdateOne(t, "foo", "1", bson.M{
 		"$set": bson.M{"foo": "hello!"},
 	})
+	// Sleep so the update_lookup post-image fetch completes before the
+	// delete removes the document, avoiding a null fullDocument race.
+	time.Sleep(500 * time.Millisecond)
 	db.DeleteByID(t, "foo", "1")
 	time.Sleep(3 * time.Second)
 	stream.StopWithin(t, 10*time.Second)
@@ -489,8 +490,9 @@ read_until:
   snapshot_auto_bucket_sharding: `+strconv.FormatBool(autoBuckets))
 	db.CreateCollection(t, "foo")
-	// Write a million messages
-	for batch := range 1_000 {
+	// Write 100k messages: enough to exercise the parallel snapshot with 8 workers
+	// while keeping the total package runtime under the 5-minute go test timeout.
+	for batch := range 100 {
 		idRangeStart := batch * 1_000
 		batch := []any{}
 		for id := range 1_000 {
@@ -500,7 +502,7 @@ read_until:
 	}
 	stream.Run(t)
 	expected := map[any]bool{}
-	for i := range 1_000_000 {
+	for i := range 100_000 {
 		expected[strconv.Itoa(i+1)] = true
 	}
 	seen := map[any]bool{}
@@ -569,12 +571,7 @@ mongodb_cdc:
 	db.CreateCollection(t, "foo")
 	db.InsertOne(t, "foo", bson.M{"_id": 1, "data": "hello"})
 	output.NackAll()
-	// For some reason the stream's Run doesn't exit until the context is cancelled.
-	// I'm not sure why that doesn't work, but for this test we can just cancel and
-	// let the cancellation happen after the test is done.
-	//
-	// Ideally wait would return immediately after StopNow is called...
-	wait := stream.RunAsyncWithErrors(t)
+	wait := stream.RunAsync(t)
 	t.Cleanup(wait)
 	time.Sleep(time.Second)
 	stream.StopNow(t)
@@ -801,12 +798,12 @@ mongodb_cdc:
 	db.InsertOne(t, "foo", bson.M{"_id": 1, "data": "hello"})
 	db.InsertOne(t, "foo", bson.M{"_id": 2, "data": "hello"})
 	wait := stream.RunAsync(t)
-	time.Sleep(5 * time.Second)
+	require.Eventually(t, func() bool { return len(output.Messages(t)) == 2 }, 10*time.Second, 100*time.Millisecond)
 	stream.Stop(t)
 	wait()
 	require.JSONEq(t, `[{"_id":1,"data":"hello"}, {"_id":2,"data":"hello"}]`, output.MessagesJSON(t))
 	wait = stream.RunAsync(t)
-	time.Sleep(5 * time.Second)
+	time.Sleep(2 * time.Second)
 	stream.Stop(t)
 	wait()
 	require.JSONEq(t, `[{"_id":1,"data":"hello"}, {"_id":2,"data":"hello"}]`, output.MessagesJSON(t))
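
Reviewer note on the null fullDocument fallback: the input.go hunk above relies on documented MongoDB change stream behaviour. With the update_lookup fullDocument option the post-image is fetched at read time, so a racing delete can leave fullDocument present but null in an update event. Below is a minimal, self-contained sketch of that fallback for reference; the extractDocument helper and the main harness are illustrative names, not code from this patch.

package main

import (
	"fmt"

	"go.mongodb.org/mongo-driver/bson"
)

// extractDocument mirrors the patched switch arm: update events normally
// carry the post-image in fullDocument, but under update_lookup that lookup
// can race a concurrent delete and come back null, so we degrade to the
// documentKey (the _id) instead of failing the stream.
func extractDocument(opType string, data bson.M) (doc any, keyOnly bool, err error) {
	after, ok := data["fullDocument"]
	if !ok {
		return nil, false, fmt.Errorf("%s event did not have fullDocument", opType)
	}
	if after == nil {
		if opType == "update" {
			// The document vanished between the oplog entry and the lookup:
			// emit the key alone so downstream still observes the event.
			return data["documentKey"], true, nil
		}
		return nil, false, fmt.Errorf("%s event had null fullDocument", opType)
	}
	return after, false, nil
}

func main() {
	// An update event as update_lookup can deliver it after a racing delete.
	doc, keyOnly, err := extractDocument("update", bson.M{
		"fullDocument": nil,
		"documentKey":  bson.M{"_id": "1"},
	})
	fmt.Println(doc, keyOnly, err) // map[_id:1] true <nil>
}

Emitting the documentKey alone keeps the resume token advancing on a transient race instead of failing the whole stream.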
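Reviewer note on the SoftStopChan select cases: both producer-side selects in input.go gain a soft-stop case so that a batch handoff cannot block forever once shutdown begins and the reader side has gone away. A minimal sketch of the pattern, with softStop standing in for shutsig.SoftStopChan() and the other names hypothetical:

package main

import (
	"context"
	"fmt"
	"time"
)

// send blocks until the batch is consumed, the context ends, or a soft stop
// is signalled. Without the third case a producer goroutine would stay
// blocked on readCh after shutdown once the reader has already exited.
func send(ctx context.Context, readCh chan<- string, softStop <-chan struct{}, batch string) error {
	select {
	case readCh <- batch:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	case <-softStop:
		// Treat a soft stop like cancellation so the run loop unwinds promptly.
		return context.Canceled
	}
}

func main() {
	softStop := make(chan struct{})
	readCh := make(chan string) // never read: simulates a reader that already stopped

	go func() {
		time.Sleep(100 * time.Millisecond)
		close(softStop) // StopWithin/StopNow would trigger this in the real input
	}()

	fmt.Println(send(context.Background(), readCh, softStop, "batch-1")) // context canceled
}

Returning context.Canceled on soft stop is also what lets the reworked RunAsync helper in the test file treat that error as a clean exit.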