Skip to content

Use now + 2mins as the end timestamp for change stream read API if the connector endTimestamp is omitted #34967

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1792,8 +1792,8 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
.orElse(PartitionMetadataTableNames.generateRandom(partitionMetadataDatabaseId));
final String changeStreamName = getChangeStreamName();
final Timestamp startTimestamp = getInclusiveStartAt();
// Uses (Timestamp.MAX - 1ns) at max for end timestamp, because we add 1ns to transform the
// interval into a closed-open in the read change stream restriction (prevents overflow)
// Uses (Timestamp.MAX - 1ns) at max for end timestamp to indicate this connector is expected
// to run forever.
final Timestamp endTimestamp =
getInclusiveEndAt().compareTo(MAX_INCLUSIVE_END_AT) > 0
? MAX_INCLUSIVE_END_AT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.io.gcp.spanner.changestreams.action;

import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.MAX_INCLUSIVE_END_AT;

import com.google.cloud.Timestamp;
import com.google.cloud.spanner.ErrorCode;
import com.google.cloud.spanner.SpannerException;
Expand Down Expand Up @@ -160,7 +162,10 @@ public ProcessContinuation run(
BundleFinalizer bundleFinalizer) {
final String token = partition.getPartitionToken();
final Timestamp startTimestamp = tracker.currentRestriction().getFrom();
final Timestamp endTimestamp = partition.getEndTimestamp();
final Timestamp changeStreamQueryEndTimestamp =
partition.getEndTimestamp().equals(MAX_INCLUSIVE_END_AT)
? getNextReadChangeStreamEndTimestamp()
: partition.getEndTimestamp();

// TODO: Potentially we can avoid this fetch, by enriching the runningAt timestamp when the
// ReadChangeStreamPartitionDoFn#processElement is called
Expand All @@ -178,7 +183,7 @@ public ProcessContinuation run(

try (ChangeStreamResultSet resultSet =
changeStreamDao.changeStreamQuery(
token, startTimestamp, endTimestamp, partition.getHeartbeatMillis())) {
token, startTimestamp, changeStreamQueryEndTimestamp, partition.getHeartbeatMillis())) {

metrics.incQueryCounter();
while (resultSet.next()) {
Expand Down Expand Up @@ -243,7 +248,7 @@ public ProcessContinuation run(
"[{}] query change stream is out of range for {} to {}, finishing stream.",
token,
startTimestamp,
endTimestamp,
changeStreamQueryEndTimestamp,
e);
} else {
throw e;
Expand All @@ -253,13 +258,13 @@ public ProcessContinuation run(
"[{}] query change stream had exception processing range {} to {}.",
token,
startTimestamp,
endTimestamp,
changeStreamQueryEndTimestamp,
e);
throw e;
}

LOG.debug("[{}] change stream completed successfully", token);
if (tracker.tryClaim(endTimestamp)) {
if (tracker.tryClaim(changeStreamQueryEndTimestamp)) {
LOG.debug("[{}] Finishing partition", token);
partitionMetadataDao.updateToFinished(token);
metrics.decActivePartitionReadCounter();
Expand Down Expand Up @@ -292,4 +297,12 @@ private boolean isTimestampOutOfRange(SpannerException e) {
&& e.getMessage() != null
&& e.getMessage().contains(OUT_OF_RANGE_ERROR_MESSAGE);
}

// Return (now + 2 mins) as the end timestamp for reading change streams. This is only used if
// users want to run the connector forever. This approach works because Apache beam checkpoints
// every 5s or 5MB output provided and the change stream query has deadline for 1 min.
private Timestamp getNextReadChangeStreamEndTimestamp() {
final Timestamp current = Timestamp.now();
return Timestamp.ofTimeSecondsAndNanos(current.getSeconds() + 2 * 60, current.getNanos());
}
}
Loading