27
27
import org .apache .seatunnel .connectors .seatunnel .cdc .mongodb .source .offset .ChangeStreamDescriptor ;
28
28
import org .apache .seatunnel .connectors .seatunnel .cdc .mongodb .source .offset .ChangeStreamOffset ;
29
29
import org .apache .seatunnel .connectors .seatunnel .cdc .mongodb .utils .MongodbRecordUtils ;
30
+ import org .apache .seatunnel .connectors .seatunnel .cdc .mongodb .utils .MongodbUtils ;
30
31
31
32
import org .apache .kafka .common .utils .SystemTime ;
32
33
import org .apache .kafka .common .utils .Time ;
46
47
import com .mongodb .client .ChangeStreamIterable ;
47
48
import com .mongodb .client .MongoChangeStreamCursor ;
48
49
import com .mongodb .client .MongoClient ;
50
+ import com .mongodb .client .model .changestream .OperationType ;
49
51
import com .mongodb .kafka .connect .source .heartbeat .HeartbeatManager ;
50
52
import io .debezium .connector .base .ChangeEventQueue ;
51
53
import io .debezium .pipeline .DataChangeEvent ;
68
70
import static org .apache .seatunnel .connectors .seatunnel .cdc .mongodb .config .MongodbSourceOptions .ID_FIELD ;
69
71
import static org .apache .seatunnel .connectors .seatunnel .cdc .mongodb .config .MongodbSourceOptions .ILLEGAL_OPERATION_ERROR ;
70
72
import static org .apache .seatunnel .connectors .seatunnel .cdc .mongodb .config .MongodbSourceOptions .NS_FIELD ;
73
+ import static org .apache .seatunnel .connectors .seatunnel .cdc .mongodb .config .MongodbSourceOptions .OPERATION_TYPE ;
71
74
import static org .apache .seatunnel .connectors .seatunnel .cdc .mongodb .config .MongodbSourceOptions .SNAPSHOT_FIELD ;
72
75
import static org .apache .seatunnel .connectors .seatunnel .cdc .mongodb .config .MongodbSourceOptions .SOURCE_FIELD ;
73
76
import static org .apache .seatunnel .connectors .seatunnel .cdc .mongodb .config .MongodbSourceOptions .TS_MS_FIELD ;
@@ -117,7 +120,23 @@ public void execute(Context context) {
117
120
this .taskRunning = true ;
118
121
try {
119
122
while (taskRunning ) {
120
- Optional <BsonDocument > next = Optional .ofNullable (changeStreamCursor .tryNext ());
123
+ Optional <BsonDocument > next ;
124
+ try {
125
+ next = Optional .ofNullable (changeStreamCursor .tryNext ());
126
+ } catch (MongoCommandException e ) {
127
+ if (MongodbUtils .checkIfChangeStreamCursorExpires (e )) {
128
+ log .warn ("Change stream cursor has expired, trying to recreate cursor" );
129
+ boolean resumeTokenExpires = MongodbUtils .checkIfResumeTokenExpires (e );
130
+ if (resumeTokenExpires ) {
131
+ log .warn (
132
+ "Resume token has expired, fallback to timestamp restart mode" );
133
+ }
134
+ changeStreamCursor = openChangeStreamCursor (descriptor , resumeTokenExpires );
135
+ next = Optional .ofNullable (changeStreamCursor .tryNext ());
136
+ } else {
137
+ throw e ;
138
+ }
139
+ }
121
140
SourceRecord changeRecord = null ;
122
141
if (!next .isPresent ()) {
123
142
long untilNext = nextUpdate - time .milliseconds ();
@@ -138,34 +157,51 @@ public void execute(Context context) {
138
157
nextUpdate = time .milliseconds () + sourceConfig .getPollAwaitTimeMillis ();
139
158
} else {
140
159
BsonDocument changeStreamDocument = next .get ();
141
- MongoNamespace namespace = getMongoNamespace (changeStreamDocument );
142
-
143
- BsonDocument resumeToken = changeStreamDocument .getDocument (ID_FIELD );
144
- BsonDocument valueDocument =
145
- normalizeChangeStreamDocument (changeStreamDocument );
146
-
147
- log .trace ("Adding {} to {}" , valueDocument , namespace .getFullName ());
148
-
149
- changeRecord =
150
- MongodbRecordUtils .buildSourceRecord (
151
- createPartitionMap (
152
- sourceConfig .getHosts (),
153
- namespace .getDatabaseName (),
154
- namespace .getCollectionName ()),
155
- createSourceOffsetMap (resumeToken , false ),
156
- namespace .getFullName (),
157
- changeStreamDocument .getDocument (ID_FIELD ),
158
- valueDocument );
160
+ OperationType operationType = getOperationType (changeStreamDocument );
161
+
162
+ switch (operationType ) {
163
+ case INSERT :
164
+ case UPDATE :
165
+ case REPLACE :
166
+ case DELETE :
167
+ MongoNamespace namespace = getMongoNamespace (changeStreamDocument );
168
+
169
+ BsonDocument resumeToken = changeStreamDocument .getDocument (ID_FIELD );
170
+ BsonDocument valueDocument =
171
+ normalizeChangeStreamDocument (changeStreamDocument );
172
+
173
+ log .trace ("Adding {} to {}" , valueDocument , namespace .getFullName ());
174
+
175
+ changeRecord =
176
+ MongodbRecordUtils .buildSourceRecord (
177
+ createPartitionMap (
178
+ sourceConfig .getHosts (),
179
+ namespace .getDatabaseName (),
180
+ namespace .getCollectionName ()),
181
+ createSourceOffsetMap (resumeToken , false ),
182
+ namespace .getFullName (),
183
+ changeStreamDocument .getDocument (ID_FIELD ),
184
+ valueDocument );
185
+ break ;
186
+ default :
187
+ // Ignore drop、drop_database、rename and other record to prevent
188
+ // documentKey from being empty.
189
+ log .info ("Ignored {} record: {}" , operationType , changeStreamDocument );
190
+ }
159
191
}
160
192
161
- if (changeRecord != null ) {
193
+ if (changeRecord != null && ! isBoundedRead () ) {
162
194
queue .enqueue (new DataChangeEvent (changeRecord ));
163
195
}
164
196
165
197
if (isBoundedRead ()) {
166
198
ChangeStreamOffset currentOffset ;
167
199
if (changeRecord != null ) {
168
200
currentOffset = new ChangeStreamOffset (getResumeToken (changeRecord ));
201
+ // The log after the high watermark won't emit.
202
+ if (currentOffset .isAtOrBefore (streamSplit .getStopOffset ())) {
203
+ queue .enqueue (new DataChangeEvent (changeRecord ));
204
+ }
169
205
} else {
170
206
// Heartbeat is not turned on or there is no update event
171
207
currentOffset = new ChangeStreamOffset (getCurrentClusterTime (mongoClient ));
@@ -215,6 +251,11 @@ public IncrementalSplit getSplit() {
215
251
216
252
private MongoChangeStreamCursor <BsonDocument > openChangeStreamCursor (
217
253
ChangeStreamDescriptor changeStreamDescriptor ) {
254
+ return openChangeStreamCursor (changeStreamDescriptor , false );
255
+ }
256
+
257
+ private MongoChangeStreamCursor <BsonDocument > openChangeStreamCursor (
258
+ ChangeStreamDescriptor changeStreamDescriptor , boolean forceTimestampStartup ) {
218
259
ChangeStreamOffset offset =
219
260
new ChangeStreamOffset (streamSplit .getStartupOffset ().getOffset ());
220
261
@@ -224,7 +265,7 @@ private MongoChangeStreamCursor<BsonDocument> openChangeStreamCursor(
224
265
BsonDocument resumeToken = offset .getResumeToken ();
225
266
BsonTimestamp timestamp = offset .getTimestamp ();
226
267
227
- if (resumeToken != null ) {
268
+ if (resumeToken != null && ! forceTimestampStartup ) {
228
269
if (supportsStartAfter ) {
229
270
log .info ("Open the change stream after the previous offset: {}" , resumeToken );
230
271
changeStreamIterable .startAfter (resumeToken );
@@ -238,6 +279,11 @@ private MongoChangeStreamCursor<BsonDocument> openChangeStreamCursor(
238
279
if (supportsStartAtOperationTime ) {
239
280
log .info ("Open the change stream at the timestamp: {}" , timestamp );
240
281
changeStreamIterable .startAtOperationTime (timestamp );
282
+ } else if (forceTimestampStartup ) {
283
+ log .error ("Open change stream failed. Unable to resume from timestamp" );
284
+ throw new MongodbConnectorException (
285
+ ILLEGAL_ARGUMENT ,
286
+ "Open change stream failed. Unable to resume from timestamp" );
241
287
} else {
242
288
log .warn ("Open the change stream of the latest offset" );
243
289
}
@@ -273,6 +319,9 @@ private MongoChangeStreamCursor<BsonDocument> openChangeStreamCursor(
273
319
"Unauthorized $changeStream operation: %s %s" ,
274
320
e .getErrorMessage (), e .getErrorCode ()));
275
321
322
+ } else if (!forceTimestampStartup && MongodbUtils .checkIfResumeTokenExpires (e )) {
323
+ log .info ("Failed to open cursor with resume token, fallback to timestamp startup" );
324
+ return openChangeStreamCursor (changeStreamDescriptor , true );
276
325
} else {
277
326
throw new MongodbConnectorException (ILLEGAL_ARGUMENT , "Open change stream failed" );
278
327
}
@@ -353,6 +402,10 @@ private MongoNamespace getMongoNamespace(@Nonnull BsonDocument changeStreamDocum
353
402
ns .getString (DB_FIELD ).getValue (), ns .getString (COLL_FIELD ).getValue ());
354
403
}
355
404
405
+ private OperationType getOperationType (BsonDocument changeStreamDocument ) {
406
+ return OperationType .fromString (changeStreamDocument .getString (OPERATION_TYPE ).getValue ());
407
+ }
408
+
356
409
private boolean isBoundedRead () {
357
410
return !NO_STOPPING_OFFSET .equals (streamSplit .getStopOffset ());
358
411
}
0 commit comments