17
17
*/
18
18
package org .apache .beam .sdk .io .gcp .spanner .changestreams .action ;
19
19
20
+ import static org .apache .beam .sdk .io .gcp .spanner .changestreams .ChangeStreamsConstants .MAX_INCLUSIVE_END_AT ;
21
+
20
22
import com .google .cloud .Timestamp ;
21
23
import com .google .cloud .spanner .ErrorCode ;
22
24
import com .google .cloud .spanner .SpannerException ;
@@ -160,7 +162,10 @@ public ProcessContinuation run(
160
162
BundleFinalizer bundleFinalizer ) {
161
163
final String token = partition .getPartitionToken ();
162
164
final Timestamp startTimestamp = tracker .currentRestriction ().getFrom ();
163
- final Timestamp endTimestamp = partition .getEndTimestamp ();
165
+ final Timestamp changeStreamQueryEndTimestamp =
166
+ partition .getEndTimestamp ().equals (MAX_INCLUSIVE_END_AT )
167
+ ? getNextReadChangeStreamEndTimestamp ()
168
+ : partition .getEndTimestamp ();
164
169
165
170
// TODO: Potentially we can avoid this fetch, by enriching the runningAt timestamp when the
166
171
// ReadChangeStreamPartitionDoFn#processElement is called
@@ -178,7 +183,7 @@ public ProcessContinuation run(
178
183
179
184
try (ChangeStreamResultSet resultSet =
180
185
changeStreamDao .changeStreamQuery (
181
- token , startTimestamp , endTimestamp , partition .getHeartbeatMillis ())) {
186
+ token , startTimestamp , changeStreamQueryEndTimestamp , partition .getHeartbeatMillis ())) {
182
187
183
188
metrics .incQueryCounter ();
184
189
while (resultSet .next ()) {
@@ -243,7 +248,7 @@ public ProcessContinuation run(
243
248
"[{}] query change stream is out of range for {} to {}, finishing stream." ,
244
249
token ,
245
250
startTimestamp ,
246
- endTimestamp ,
251
+ changeStreamQueryEndTimestamp ,
247
252
e );
248
253
} else {
249
254
throw e ;
@@ -253,13 +258,13 @@ public ProcessContinuation run(
253
258
"[{}] query change stream had exception processing range {} to {}." ,
254
259
token ,
255
260
startTimestamp ,
256
- endTimestamp ,
261
+ changeStreamQueryEndTimestamp ,
257
262
e );
258
263
throw e ;
259
264
}
260
265
261
266
LOG .debug ("[{}] change stream completed successfully" , token );
262
- if (tracker .tryClaim (endTimestamp )) {
267
+ if (tracker .tryClaim (changeStreamQueryEndTimestamp )) {
263
268
LOG .debug ("[{}] Finishing partition" , token );
264
269
partitionMetadataDao .updateToFinished (token );
265
270
metrics .decActivePartitionReadCounter ();
@@ -292,4 +297,12 @@ private boolean isTimestampOutOfRange(SpannerException e) {
292
297
&& e .getMessage () != null
293
298
&& e .getMessage ().contains (OUT_OF_RANGE_ERROR_MESSAGE );
294
299
}
300
+
301
+ // Return (now + 2 mins) as the end timestamp for reading change streams. This is only used if
302
+ // users want to run the connector forever. This approach works because Apache beam checkpoints
303
+ // every 5s or 5MB output provided and the change stream query has deadline for 1 min.
304
+ private Timestamp getNextReadChangeStreamEndTimestamp () {
305
+ final Timestamp current = Timestamp .now ();
306
+ return Timestamp .ofTimeSecondsAndNanos (current .getSeconds () + 2 * 60 , current .getNanos ());
307
+ }
295
308
}
0 commit comments