Skip to content

Commit 5719ce8

Browse files
committed
feat(mongodb): capture full document on delete via pre-images
1 parent 07f190f commit 5719ce8

1 file changed

Lines changed: 18 additions & 9 deletions

File tree

  • drivers/mongodb/internal

drivers/mongodb/internal/cdc.go

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,12 @@ const (
2929
var ErrIdleTermination = errors.New("change stream terminated due to idle timeout")
3030

3131
type CDCDocument struct {
32-
OperationType string `json:"operationType"`
33-
FullDocument map[string]any `json:"fullDocument"`
34-
ClusterTime primitive.Timestamp `json:"clusterTime"`
35-
WallTime primitive.DateTime `json:"wallTime"`
36-
DocumentKey map[string]any `json:"documentKey"`
32+
OperationType string `json:"operationType"`
33+
FullDocument map[string]any `json:"fullDocument"`
34+
FullDocumentBeforeChange map[string]any `json:"fullDocumentBeforeChange"`
35+
ClusterTime primitive.Timestamp `json:"clusterTime"`
36+
WallTime primitive.DateTime `json:"wallTime"`
37+
DocumentKey map[string]any `json:"documentKey"`
3738
}
3839

3940
func (m *Mongo) ChangeStreamConfig() (bool, bool, bool) {
@@ -98,7 +99,7 @@ func (m *Mongo) StreamChanges(ctx context.Context, streamIndex int, metadataStat
9899
{Key: "operationType", Value: bson.D{{Key: "$in", Value: bson.A{"insert", "update", "delete"}}}},
99100
}}},
100101
}
101-
changeStreamOpts := options.ChangeStream().SetFullDocument(options.UpdateLookup).SetMaxAwaitTime(maxAwait)
102+
changeStreamOpts := options.ChangeStream().SetFullDocument(options.UpdateLookup).SetFullDocumentBeforeChange(options.WhenAvailable).SetMaxAwaitTime(maxAwait)
102103
collection := m.client.Database(stream.Namespace(), options.Database().SetReadConcern(readconcern.Majority())).Collection(stream.Name())
103104

104105
changeStreamOpts = changeStreamOpts.SetResumeAfter(map[string]any{cdcCursorField: prevResumeToken})
@@ -168,9 +169,17 @@ func (m *Mongo) handleChangeDoc(ctx context.Context, cursor *mongo.ChangeStream,
168169
return fmt.Errorf("error while decoding: %s", err)
169170
}
170171

171-
if record.OperationType == "delete" {
172-
// replace full document(null) with documentKey
173-
record.FullDocument = record.DocumentKey
172+
switch record.OperationType {
173+
case "delete":
174+
if record.FullDocumentBeforeChange != nil {
175+
record.FullDocument = record.FullDocumentBeforeChange
176+
} else {
177+
record.FullDocument = record.DocumentKey
178+
}
179+
case "update", "replace":
180+
if record.FullDocument == nil && record.FullDocumentBeforeChange != nil {
181+
record.FullDocument = record.FullDocumentBeforeChange
182+
}
174183
}
175184

176185
filterMongoObject(record.FullDocument)

0 commit comments

Comments
 (0)