Skip to content

Add filter to GetMinUnfinishedWatermark query #34965

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1815,8 +1815,8 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta

final Duration watermarkRefreshRate =
MoreObjects.firstNonNull(getWatermarkRefreshRate(), DEFAULT_WATERMARK_REFRESH_RATE);
LOG.info("changliiu 0 - " + watermarkRefreshRate.getStandardSeconds());
final CacheFactory cacheFactory = new CacheFactory(daoFactory, watermarkRefreshRate);

final InitializeDoFn initializeDoFn =
new InitializeDoFn(daoFactory, mapperFactory, startTimestamp, endTimestamp);
final DetectNewPartitionsDoFn detectNewPartitionsDoFn =
Expand Down Expand Up @@ -1853,7 +1853,7 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
final BytesThroughputEstimator<DataChangeRecord> throughputEstimator =
new BytesThroughputEstimator<>(THROUGHPUT_WINDOW_SECONDS, dataChangeRecordSizeEstimator);
readChangeStreamPartitionDoFn.setThroughputEstimator(throughputEstimator);

LOG.info("changliiu 4 - v4");
impulseOut
.apply(WithTimestamps.of(e -> GlobalWindow.INSTANCE.maxTimestamp()))
.apply(Wait.on(dataChangeRecordsOut))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,12 +99,15 @@ public ProcessContinuation run(
RestrictionTracker<TimestampRange, Timestamp> tracker,
OutputReceiver<PartitionMetadata> receiver,
ManualWatermarkEstimator<Instant> watermarkEstimator) {
LOG.info("changliiu DetectNewPartitionsAction 1");
System.out.println("changliiu - 1");

final Timestamp readTimestamp = tracker.currentRestriction().getFrom();
// Updates the current watermark as the min of the watermarks from all existing partitions
final Timestamp minWatermark = cache.getUnfinishedMinWatermark();

LOG.info("changliiu DetectNewPartitionsAction 2, ");
if (minWatermark != null) {
LOG.info("changliiu DetectNewPartitionsAction 3");
return processPartitions(tracker, receiver, watermarkEstimator, minWatermark, readTimestamp);
} else {
return terminate(tracker);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ public ProcessContinuation run(
final String token = partition.getPartitionToken();
final Timestamp startTimestamp = tracker.currentRestriction().getFrom();
final Timestamp endTimestamp = partition.getEndTimestamp();

LOG.info("changliiu QueryChangeStreamAction 1");
// TODO: Potentially we can avoid this fetch, by enriching the runningAt timestamp when the
// ReadChangeStreamPartitionDoFn#processElement is called
final PartitionMetadata updatedPartition =
Expand All @@ -175,7 +175,7 @@ public ProcessContinuation run(
// Interrupter with soft timeout to commit the work if any records have been processed.
RestrictionInterrupter<Timestamp> interrupter =
RestrictionInterrupter.withSoftTimeout(RESTRICTION_TRACKER_TIMEOUT);

LOG.info("changliiu QueryChangeStreamAction 2");
try (ChangeStreamResultSet resultSet =
changeStreamDao.changeStreamQuery(
token, startTimestamp, endTimestamp, partition.getHeartbeatMillis())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Asynchronously compute the earliest partition watermark and stores it in memory. The value will
Expand All @@ -39,16 +41,38 @@
public class AsyncWatermarkCache implements WatermarkCache {

private static final String THREAD_NAME_FORMAT = "watermark_loading_thread_%d";

private static final Logger LOG = LoggerFactory.getLogger(AsyncWatermarkCache.class);
private static final Object MIN_WATERMARK_KEY = new Object();
private final LoadingCache<Object, Optional<Timestamp>> cache;

private Timestamp lastCachedMinWatermark = Timestamp.MIN_VALUE;

public AsyncWatermarkCache(PartitionMetadataDao dao, Duration refreshRate) {
  // Loads the min unfinished watermark once synchronously on first access, then refreshes it
  // asynchronously on a dedicated single thread every refreshRate interval.
  // NOTE(review): lastCachedMinWatermark is written by the refresh thread and read by the
  // loading lambda without synchronization; the single-threaded reload executor serializes
  // writes, but the field should likely be volatile for visibility — TODO confirm.
  this.cache =
      CacheBuilder.newBuilder()
          .refreshAfterWrite(java.time.Duration.ofMillis(refreshRate.getMillis()))
          .build(
              CacheLoader.asyncReloading(
                  CacheLoader.from(
                      key -> {
                        // The unfinished min watermark never moves backwards, so the last
                        // observed value is a valid lower bound that narrows the query scan.
                        Timestamp unfinishedMinWatermark =
                            dao.getUnfinishedMinWatermark(Optional.of(lastCachedMinWatermark));
                        if (unfinishedMinWatermark != null) {
                          lastCachedMinWatermark = unfinishedMinWatermark;
                        }
                        // Null (no unfinished partitions) is cached as Optional.empty().
                        return Optional.ofNullable(unfinishedMinWatermark);
                      }),
                  Executors.newSingleThreadExecutor(
                      new ThreadFactoryBuilder().setNameFormat(THREAD_NAME_FORMAT).build())));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ public CacheFactory(DaoFactory daoFactory, Duration watermarkRefreshRate) {
}

public WatermarkCache getWatermarkCache() {
System.out.println("changliiu - getWatermarkCache");

return WATERMARK_CACHE.computeIfAbsent(
cacheId,
key ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
package org.apache.beam.sdk.io.gcp.spanner.changestreams.cache;

import com.google.cloud.Timestamp;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Synchronously compute the earliest partition watermark, by delegating the call to {@link
Expand All @@ -29,12 +32,17 @@ public class NoOpWatermarkCache implements WatermarkCache {

private final PartitionMetadataDao dao;

private static final Logger LOG = LoggerFactory.getLogger(NoOpWatermarkCache.class);

/**
 * Creates a pass-through cache that delegates every watermark lookup to the given {@link
 * PartitionMetadataDao}.
 */
public NoOpWatermarkCache(PartitionMetadataDao dao) {
  this.dao = dao;
}

@Override
public @Nullable Timestamp getUnfinishedMinWatermark() {
  // No caching here: query the metadata table directly, with no lower bound on the scan.
  return dao.getUnfinishedMinWatermark(Optional.empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
Expand All @@ -62,6 +63,8 @@ public class PartitionMetadataDao {
private final DatabaseClient databaseClient;
private final Dialect dialect;

private static final Logger LOG = LoggerFactory.getLogger(PartitionMetadataDao.class);

/**
* Constructs a partition metadata dao object given the generated name of the tables.
*
Expand Down Expand Up @@ -178,47 +181,93 @@ public List<String> findAllTableIndexes() {
*
* @return the earliest partition watermark which is not in a {@link State#FINISHED} state.
*/
public @Nullable Timestamp getUnfinishedMinWatermark() {
public @Nullable Timestamp getUnfinishedMinWatermark(Optional<Timestamp> since) {
// public @Nullable Timestamp getUnfinishedMinWatermark() {
Timestamp sinceTimestamp = since.orElse(Timestamp.MIN_VALUE);
final String minWatermark = "min_watermark";
Statement statement;
if (this.isPostgres()) {
statement =
Statement.newBuilder(
"SELECT \""
"SELECT min(\""
+ COLUMN_WATERMARK
+ "\" FROM \""
+ "\") as "
+ minWatermark
+ " FROM \""
+ metadataTableName
+ "\" WHERE \""
+ COLUMN_STATE
+ "\" != $1"
+ " ORDER BY \""
+ " AND \""
+ COLUMN_WATERMARK
+ "\" ASC LIMIT 1")
+ "\" >= $2")
.bind("p1")
.to(State.FINISHED.name())
.bind("p2")
.to(sinceTimestamp)
.build();
} else {
statement =
Statement.newBuilder(
"SELECT "
"SELECT min("
+ COLUMN_WATERMARK
+ ") as "
+ minWatermark
+ " FROM "
+ metadataTableName
+ " WHERE "
+ COLUMN_STATE
+ " != @state"
+ " ORDER BY "
+ " AND "
+ COLUMN_WATERMARK
+ " ASC LIMIT 1")
+ " >= @since;")
.bind("state")
.to(State.FINISHED.name())
.bind("since")
.to(sinceTimestamp)
.build();
}
// if (this.isPostgres()) {
// statement =
// Statement.newBuilder(
// "SELECT \""
// + COLUMN_WATERMARK
// + "\" FROM \""
// + metadataTableName
// + "\" WHERE \""
// + COLUMN_STATE
// + "\" != $1"
// + " ORDER BY \""
// + COLUMN_WATERMARK
// + "\" ASC LIMIT 1")
// .bind("p1")
// .to(State.FINISHED.name())
// .build();
// } else {
// statement =
// Statement.newBuilder(
// "SELECT "
// + COLUMN_WATERMARK
// + " FROM "
// + metadataTableName
// + " WHERE "
// + COLUMN_STATE
// + " != @state"
// + " ORDER BY "
// + COLUMN_WATERMARK
// + " ASC LIMIT 1")
// .bind("state")
// .to(State.FINISHED.name())
// .build();
// }
LOG.info("changliiu unfinishedWaterMarkQuery:\n" + statement.toString());
try (ResultSet resultSet =
databaseClient
.singleUse()
.executeQuery(statement, Options.tag("query=getUnfinishedMinWatermark"))) {
if (resultSet.next()) {
return resultSet.getTimestamp(COLUMN_WATERMARK);
return resultSet.getTimestamp(minWatermark);
// return resultSet.getTimestamp(COLUMN_WATERMARK);
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ public ProcessContinuation processElement(
RestrictionTracker<TimestampRange, com.google.cloud.Timestamp> tracker,
OutputReceiver<PartitionMetadata> receiver,
ManualWatermarkEstimator<Instant> watermarkEstimator) {

LOG.info("changliiu DetectNewPartitionsDoFn 1");
return detectNewPartitionsAction.run(tracker, receiver, watermarkEstimator);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State;
import org.apache.beam.sdk.transforms.DoFn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A DoFn responsible for initializing the change stream Connector. It handles the creation of the
Expand All @@ -42,6 +44,7 @@ public class InitializeDoFn extends DoFn<byte[], PartitionMetadata> implements S
// a change stream query might get stuck.
private static final long DEFAULT_HEARTBEAT_MILLIS = 2000;

private static final Logger LOG = LoggerFactory.getLogger(InitializeDoFn.class);
private final DaoFactory daoFactory;
private final MapperFactory mapperFactory;
// The change streams query start time
Expand All @@ -62,6 +65,7 @@ public InitializeDoFn(

@ProcessElement
public void processElement(OutputReceiver<PartitionMetadata> receiver) {
LOG.info("changliiu InitializeDoFn 1");
PartitionMetadataDao partitionMetadataDao = daoFactory.getPartitionMetadataDao();
if (!partitionMetadataDao.tableExists()) {
// Creates partition metadata table and associated indexes
Expand All @@ -73,6 +77,7 @@ public void processElement(OutputReceiver<PartitionMetadata> receiver) {
.map(mapperFactory.partitionMetadataMapper()::from)
.orElseThrow(
() -> new IllegalStateException("Initial partition not found in metadata table."));
LOG.info("changliiu InitializeDoFn complete");
receiver.output(initialPartition);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,20 +459,59 @@ private void mockGetPartitionsAfter(Timestamp timestamp, ResultSet getPartitionR
StatementResult.query(getPartitionsAfterStatement, getPartitionResultSet));
}

// private void mockGetWatermark(Timestamp watermark) {
// Statement watermarkStatement =
// Statement.newBuilder(
// "SELECT Watermark FROM my-metadata-table WHERE State != @state ORDER BY Watermark
// ASC LIMIT 1")
// .bind("state")
// .to(State.FINISHED.name())
// .build();
// ResultSetMetadata watermarkResultSetMetadata =
// ResultSetMetadata.newBuilder()
// .setRowType(
// StructType.newBuilder()
// .addFields(
// Field.newBuilder()
// .setName("Watermark")
// .setType(Type.newBuilder().setCode(TypeCode.TIMESTAMP).build())
// .build())
// .build())
// .build();
// ResultSet watermarkResultSet =
// ResultSet.newBuilder()
// .addRows(
// ListValue.newBuilder()
// .addValues(Value.newBuilder().setStringValue(watermark.toString()).build())
// .build())
// .setMetadata(watermarkResultSetMetadata)
// .build();
// mockSpannerService.putStatementResult(
// StatementResult.query(watermarkStatement, watermarkResultSet));
// }

private void mockGetWatermark(Timestamp watermark) {
final String minWatermark = "min_watermark";
// The query needs to sync with getUnfinishedMinWatermark() in PartitionMetadataDao file.
Statement watermarkStatement =
Statement.newBuilder(
"SELECT Watermark FROM my-metadata-table WHERE State != @state ORDER BY Watermark ASC LIMIT 1")
"SELECT min(Watermark) as "
+ minWatermark
+ " FROM my-metadata-table "
+ "WHERE State != @state "
+ "AND Watermark >= @since;")
.bind("state")
.to(State.FINISHED.name())
.bind("since")
.to(Timestamp.MIN_VALUE)
.build();
ResultSetMetadata watermarkResultSetMetadata =
ResultSetMetadata.newBuilder()
.setRowType(
StructType.newBuilder()
.addFields(
Field.newBuilder()
.setName("Watermark")
.setName(minWatermark)
.setType(Type.newBuilder().setCode(TypeCode.TIMESTAMP).build())
.build())
.build())
Expand Down
Loading