@@ -56,19 +56,16 @@ func (m *Mongo) PreCDC(cdcCtx context.Context, streams []types.StreamInterface)
5656 }
5757 m .cdcCursor .Store (stream .ID (), prevResumeToken )
5858 }
59+ return nil
60+ }
5961
60- // TODO:move it to stream change function
61- lastOplogTime , err := m .getClusterOpTime (cdcCtx , m .config .Database )
62+ func (m * Mongo ) StreamChanges (ctx context.Context , stream types.StreamInterface , OnMessage abstract.CDCMsgFn ) error {
63+ // lastOplogTime is the latest timestamp of any operation applied in the MongoDB cluster
64+ lastOplogTime , err := m .getClusterOpTime (ctx , m .config .Database )
6265 if err != nil {
6366 logger .Warnf ("Failed to get cluster op time: %s" , err )
6467 return err
6568 }
66- m .LastOplogTime = lastOplogTime
67-
68- return nil
69- }
70-
71- func (m * Mongo ) StreamChanges (ctx context.Context , stream types.StreamInterface , OnMessage abstract.CDCMsgFn ) error {
7269 pipeline := mongo.Pipeline {
7370 {{Key : "$match" , Value : bson.D {
7471 {Key : "operationType" , Value : bson.D {{Key : "$in" , Value : bson.A {"insert" , "update" , "delete" }}}},
@@ -92,32 +89,36 @@ func (m *Mongo) StreamChanges(ctx context.Context, stream types.StreamInterface,
9289 defer cursor .Close (ctx )
9390
9491 for {
95- if ! cursor .TryNext (ctx ) {
96- if err := cursor .Err (); err != nil {
97- return fmt .Errorf ("change stream error: %s" , err )
98- }
92+ hasNext := cursor .TryNext (ctx )
93+ if err := cursor .Err (); err != nil {
94+ return fmt .Errorf ("change stream error: %s" , err )
95+ }
9996
100- // PBRT checkpoint and termination check
101- if err := m .handleIdleCheckpoint (ctx , cursor , stream ); err != nil {
102- if errors .Is (err , ErrIdleTermination ) {
103- // graceful termination requested by helper
104- logger .Infof ("change stream %s caught up to cluster opTime; terminating gracefully" , stream .ID ())
105- return nil
106- }
97+ if hasNext {
98+ if err := m .handleChangeDoc (ctx , cursor , stream , OnMessage ); err != nil {
10799 return err
108100 }
109- // Wait before for a brief pause before the next iteration of the loop
110- time .Sleep (changeStreamRetryDelay )
111- continue
112101 }
113102
114- if err := m .handleChangeDoc (ctx , cursor , stream , OnMessage ); err != nil {
103+ // Check boundary AFTER emitting
104+ if err := m .handleStreamCatchup (ctx , cursor , stream , lastOplogTime ); err != nil {
105+ if errors .Is (err , ErrIdleTermination ) {
106+ // graceful termination requested by helper
107+ logger .Infof ("change stream %s caught up to cluster opTime; terminating gracefully" , stream .ID ())
108+ return nil
109+ }
115110 return err
116111 }
112+
113+ if ! hasNext {
114+ // Wait before for a brief pause before the next iteration of the loop
115+ // TryNext() method behaves differently when connecting through mongos vs. direct replica set connections:TryNext() is non-blocking and may return false immediately even when events exist
116+ time .Sleep (changeStreamRetryDelay )
117+ }
117118 }
118119}
119120
120- func (m * Mongo ) handleIdleCheckpoint (_ context.Context , cursor * mongo.ChangeStream , stream types.StreamInterface ) error {
121+ func (m * Mongo ) handleStreamCatchup (_ context.Context , cursor * mongo.ChangeStream , stream types.StreamInterface , lastOplogTime primitive. Timestamp ) error {
121122 finalToken := cursor .ResumeToken ()
122123 if finalToken == nil {
123124 return fmt .Errorf ("no resume token available for stream %s after TryNext" , stream .ID ())
@@ -138,7 +139,7 @@ func (m *Mongo) handleIdleCheckpoint(_ context.Context, cursor *mongo.ChangeStre
138139 }
139140
140141 // If stream is caught up -> request graceful termination
141- if ! m . LastOplogTime .After (streamOpTime ) {
142+ if ! lastOplogTime .After (streamOpTime ) {
142143 return ErrIdleTermination
143144 }
144145
0 commit comments