Skip to content
Open
2 changes: 1 addition & 1 deletion cmd/tools/integration/packages.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
{"path": "./internal/impl/memcached"},
{"path": "./internal/impl/mssqlserver", "timeout": "10m"},
{"path": "./internal/impl/mongodb"},
{"path": "./internal/impl/mongodb/cdc"},
{"path": "./internal/impl/mongodb/cdc", "timeout": "15m"},
{"path": "./internal/impl/mqtt"},
{"path": "./internal/impl/mysql"},
{"path": "./internal/impl/nanomsg"},
Expand Down
17 changes: 16 additions & 1 deletion internal/impl/mongodb/cdc/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,6 +716,9 @@ func (m *mongoCDC) readSnapshotRange(
case m.readChan <- b:
case <-ctx.Done():
_ = b.ackFn(ctx, nil)
case <-m.shutsig.SoftStopChan():
_ = b.ackFn(ctx, nil)
return context.Canceled
}
mb = nil
}
Expand Down Expand Up @@ -875,6 +878,17 @@ func (m *mongoCDC) readFromStream(ctx context.Context, cp *checkpoint.Capped[bso
if !afterOk {
return fmt.Errorf("%s event did not have fullDocument", opType)
}
if afterDoc == nil {
// In update_lookup mode, fullDocument is null when the document
// is deleted before the post-update lookup completes. Fall back
// to documentKey so we still emit the event.
if opType == "update" {
doc = data["documentKey"]
keyOnly = true
break
}
return fmt.Errorf("%s event had null fullDocument", opType)
}
doc = afterDoc
case "delete":
doc = data["fullDocumentBeforeChange"]
Expand Down Expand Up @@ -917,7 +931,7 @@ func (m *mongoCDC) readFromStream(ctx context.Context, cp *checkpoint.Capped[bso
}
m.resumeTokenMu.Lock()
defer m.resumeTokenMu.Unlock()
m.resumeToken = stream.ResumeToken()
m.resumeToken = *resumeToken
if m.checkpointFlusher == nil {
return m.checkpoint.Store(ctx, m.resumeToken)
}
Expand All @@ -926,6 +940,7 @@ func (m *mongoCDC) readFromStream(ctx context.Context, cp *checkpoint.Capped[bso
select {
case m.readChan <- mongoBatch{mb, ackFn}:
case <-ctx.Done():
case <-m.shutsig.SoftStopChan():
}
mb = nil
}
Expand Down
41 changes: 19 additions & 22 deletions internal/impl/mongodb/cdc/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,16 +53,10 @@ func (s *streamHelper) RunAsync(t *testing.T) func() {
stream := s.makeStream(t)
var wg sync.WaitGroup
wg.Go(func() {
require.NoError(t, stream.Run(t.Context()))
})
return wg.Wait
}

func (s *streamHelper) RunAsyncWithErrors(t *testing.T) func() {
stream := s.makeStream(t)
var wg sync.WaitGroup
wg.Go(func() {
require.Error(t, stream.Run(t.Context()))
err := stream.Run(t.Context())
if err != nil && !errors.Is(err, context.Canceled) {
require.NoError(t, err)
}
})
return wg.Wait
}
Expand Down Expand Up @@ -341,7 +335,11 @@ func setup(t *testing.T, template string, opts ...setupOption) (*streamHelper, *
ApplyURI(uri).
SetDirect(true))
require.NoError(t, err)
require.NoError(t, mongoClient.Ping(t.Context(), nil))
// The replica set can take a moment after container readiness before it
// accepts client connections through the mapped port, so retry the ping.
require.EventuallyWithT(t, func(c *assert.CollectT) {
assert.NoError(c, mongoClient.Ping(t.Context(), nil))
}, 60*time.Second, time.Second)
for _, opt := range opts {
require.NoError(t, opt(mongoClient))
}
Expand Down Expand Up @@ -393,6 +391,9 @@ mongodb_cdc:
db.UpdateOne(t, "foo", "1", bson.M{
"$set": bson.M{"foo": "hello!"},
})
// Sleep so the update_lookup post-image fetch completes before the
// delete removes the document, avoiding a null fullDocument race.
time.Sleep(500 * time.Millisecond)
db.DeleteByID(t, "foo", "1")
time.Sleep(3 * time.Second)
stream.StopWithin(t, 10*time.Second)
Expand Down Expand Up @@ -489,8 +490,9 @@ read_until:
snapshot_auto_bucket_sharding: `+strconv.FormatBool(autoBuckets))

db.CreateCollection(t, "foo")
// Write a million messages
for batch := range 1_000 {
// Write 100k messages — enough to exercise parallel snapshot with 8 workers
// while keeping the total package runtime well within the 15m integration timeout.
for batch := range 100 {
idRangeStart := batch * 1_000
batch := []any{}
for id := range 1_000 {
Expand All @@ -500,7 +502,7 @@ read_until:
}
stream.Run(t)
expected := map[any]bool{}
for i := range 1_000_000 {
for i := range 100_000 {
expected[strconv.Itoa(i+1)] = true
}
seen := map[any]bool{}
Expand Down Expand Up @@ -569,12 +571,7 @@ mongodb_cdc:
db.CreateCollection(t, "foo")
db.InsertOne(t, "foo", bson.M{"_id": 1, "data": "hello"})
output.NackAll()
// For some reason the stream's Run doesn't exit until the context is cancelled.
// I'm not sure why that doesn't work, but for this test we can just cancel and
// let the cancellation happen after the test is done.
//
// Ideally wait would return immediately after StopNow is called...
wait := stream.RunAsyncWithErrors(t)
wait := stream.RunAsync(t)
t.Cleanup(wait)
time.Sleep(time.Second)
stream.StopNow(t)
Expand Down Expand Up @@ -801,12 +798,12 @@ mongodb_cdc:
db.InsertOne(t, "foo", bson.M{"_id": 1, "data": "hello"})
db.InsertOne(t, "foo", bson.M{"_id": 2, "data": "hello"})
wait := stream.RunAsync(t)
time.Sleep(5 * time.Second)
require.Eventually(t, func() bool { return len(output.Messages(t)) == 2 }, 10*time.Second, 100*time.Millisecond)
stream.Stop(t)
wait()
require.JSONEq(t, `[{"_id":1,"data":"hello"}, {"_id":2,"data":"hello"}]`, output.MessagesJSON(t))
wait = stream.RunAsync(t)
time.Sleep(5 * time.Second)
time.Sleep(2 * time.Second)
stream.Stop(t)
wait()
require.JSONEq(t, `[{"_id":1,"data":"hello"}, {"_id":2,"data":"hello"}]`, output.MessagesJSON(t))
Expand Down
Loading