Skip to content

Commit f6b1e1f

Browse files
Merge branch 'staging' into feat/kafka-driver-change
2 parents 6dce626 + 01d807c commit f6b1e1f

1 file changed

Lines changed: 34 additions & 11 deletions

File tree

  • drivers/mongodb/internal

drivers/mongodb/internal/cdc.go

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,12 @@ const (
2929
var ErrIdleTermination = errors.New("change stream terminated due to idle timeout")
3030

3131
type CDCDocument struct {
32-
OperationType string `json:"operationType"`
33-
FullDocument map[string]any `json:"fullDocument"`
34-
ClusterTime primitive.Timestamp `json:"clusterTime"`
35-
WallTime primitive.DateTime `json:"wallTime"`
36-
DocumentKey map[string]any `json:"documentKey"`
32+
OperationType string `json:"operationType"`
33+
FullDocument map[string]any `json:"fullDocument"`
34+
FullDocumentBeforeChange map[string]any `json:"fullDocumentBeforeChange"`
35+
ClusterTime primitive.Timestamp `json:"clusterTime"`
36+
WallTime primitive.DateTime `json:"wallTime"`
37+
DocumentKey map[string]any `json:"documentKey"`
3738
}
3839

3940
func (m *Mongo) ChangeStreamConfig() (bool, bool, bool) {
@@ -45,7 +46,7 @@ func (m *Mongo) PreCDC(cdcCtx context.Context, streams []types.StreamInterface)
4546
collection := m.client.Database(stream.Namespace(), options.Database().SetReadConcern(readconcern.Majority())).Collection(stream.Name())
4647
pipeline := mongo.Pipeline{
4748
{{Key: "$match", Value: bson.D{
48-
{Key: "operationType", Value: bson.D{{Key: "$in", Value: bson.A{"insert", "update", "delete"}}}},
49+
{Key: "operationType", Value: bson.D{{Key: "$in", Value: bson.A{"insert", "update", "replace", "delete"}}}},
4950
}}},
5051
}
5152

@@ -95,10 +96,10 @@ func (m *Mongo) StreamChanges(ctx context.Context, streamIndex int, metadataStat
9596
}
9697
pipeline := mongo.Pipeline{
9798
{{Key: "$match", Value: bson.D{
98-
{Key: "operationType", Value: bson.D{{Key: "$in", Value: bson.A{"insert", "update", "delete"}}}},
99+
{Key: "operationType", Value: bson.D{{Key: "$in", Value: bson.A{"insert", "update", "replace", "delete"}}}},
99100
}}},
100101
}
101-
changeStreamOpts := options.ChangeStream().SetFullDocument(options.UpdateLookup).SetMaxAwaitTime(maxAwait)
102+
changeStreamOpts := options.ChangeStream().SetFullDocument(options.UpdateLookup).SetFullDocumentBeforeChange(options.WhenAvailable).SetMaxAwaitTime(maxAwait)
102103
collection := m.client.Database(stream.Namespace(), options.Database().SetReadConcern(readconcern.Majority())).Collection(stream.Name())
103104

104105
changeStreamOpts = changeStreamOpts.SetResumeAfter(map[string]any{cdcCursorField: prevResumeToken})
@@ -168,9 +169,19 @@ func (m *Mongo) handleChangeDoc(ctx context.Context, cursor *mongo.ChangeStream,
168169
return fmt.Errorf("error while decoding: %s", err)
169170
}
170171

171-
if record.OperationType == "delete" {
172-
// replace full document(null) with documentKey
173-
record.FullDocument = record.DocumentKey
172+
record.OperationType = normalizeOperationType(record.OperationType)
173+
174+
switch record.OperationType {
175+
case "delete":
176+
if record.FullDocumentBeforeChange != nil {
177+
record.FullDocument = record.FullDocumentBeforeChange
178+
} else {
179+
record.FullDocument = record.DocumentKey
180+
}
181+
case "update":
182+
if record.FullDocument == nil && record.FullDocumentBeforeChange != nil {
183+
record.FullDocument = record.FullDocumentBeforeChange
184+
}
174185
}
175186

176187
filterMongoObject(record.FullDocument)
@@ -298,3 +309,15 @@ func GetResumeToken(cursor *mongo.ChangeStream, streamID string) (string, error)
298309

299310
return token, nil
300311
}
312+
313+
// normalizeOperationType maps MongoDB-specific operation types to the standard
314+
// set understood by the abstract CDC layer (insert, update, delete).
315+
// "replace" swaps the full document but keeps _id unchanged, so it is treated as an update.
316+
func normalizeOperationType(opType string) string {
317+
switch opType {
318+
case "replace":
319+
return "update"
320+
default:
321+
return opType
322+
}
323+
}

0 commit comments

Comments
 (0)