diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
index 17cfdce079cf..c8bb8324214f 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java
@@ -1815,8 +1815,8 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
     final Duration watermarkRefreshRate =
         MoreObjects.firstNonNull(getWatermarkRefreshRate(), DEFAULT_WATERMARK_REFRESH_RATE);
+    LOG.info("changliiu 0 - " + watermarkRefreshRate.getStandardSeconds());
     final CacheFactory cacheFactory = new CacheFactory(daoFactory, watermarkRefreshRate);
-
     final InitializeDoFn initializeDoFn =
         new InitializeDoFn(daoFactory, mapperFactory, startTimestamp, endTimestamp);
     final DetectNewPartitionsDoFn detectNewPartitionsDoFn =
@@ -1853,7 +1853,7 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
     final BytesThroughputEstimator<DataChangeRecord> throughputEstimator =
         new BytesThroughputEstimator<>(THROUGHPUT_WINDOW_SECONDS, dataChangeRecordSizeEstimator);
     readChangeStreamPartitionDoFn.setThroughputEstimator(throughputEstimator);
-
+    LOG.info("changliiu 4 - v4");
     impulseOut
         .apply(WithTimestamps.of(e -> GlobalWindow.INSTANCE.maxTimestamp()))
         .apply(Wait.on(dataChangeRecordsOut))
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java
index 40160de7b958..a9250b92b1b9 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java
@@ -99,12 +99,15 @@ public ProcessContinuation run(
       RestrictionTracker<TimestampRange, Timestamp> tracker,
       OutputReceiver<PartitionMetadata> receiver,
       ManualWatermarkEstimator<Instant> watermarkEstimator) {
+    LOG.info("changliiu DetectNewPartitionsAction 1");
+    System.out.println("changliiu - 1");
     final Timestamp readTimestamp = tracker.currentRestriction().getFrom();
     // Updates the current watermark as the min of the watermarks from all existing partitions
     final Timestamp minWatermark = cache.getUnfinishedMinWatermark();
-
+    LOG.info("changliiu DetectNewPartitionsAction 2, ");
     if (minWatermark != null) {
+      LOG.info("changliiu DetectNewPartitionsAction 3");
       return processPartitions(tracker, receiver, watermarkEstimator, minWatermark, readTimestamp);
     } else {
       return terminate(tracker);
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
index 6edbd544a37c..03e6b5b8dcc4 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java
@@ -161,7 +161,7 @@ public ProcessContinuation run(
     final String token = partition.getPartitionToken();
     final Timestamp startTimestamp = tracker.currentRestriction().getFrom();
     final Timestamp endTimestamp = partition.getEndTimestamp();
-
+    LOG.info("changliiu QueryChangeStreamAction 1");
     // TODO: Potentially we can avoid this fetch, by enriching the runningAt timestamp when the
     // ReadChangeStreamPartitionDoFn#processElement is called
     final PartitionMetadata updatedPartition =
@@ -175,7 +175,7 @@ public ProcessContinuation run(
     // Interrupter with soft timeout to commit the work if any records have been processed.
     RestrictionInterrupter<Timestamp> interrupter =
         RestrictionInterrupter.withSoftTimeout(RESTRICTION_TRACKER_TIMEOUT);
-
+    LOG.info("changliiu QueryChangeStreamAction 2");
     try (ChangeStreamResultSet resultSet =
         changeStreamDao.changeStreamQuery(
            token, startTimestamp, endTimestamp, partition.getHeartbeatMillis())) {
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/AsyncWatermarkCache.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/AsyncWatermarkCache.java
index 827397fe6fc8..7529b2a33e98 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/AsyncWatermarkCache.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/AsyncWatermarkCache.java
@@ -28,6 +28,8 @@
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Asynchronously compute the earliest partition watermark and stores it in memory. The value will
@@ -39,16 +41,38 @@ public class AsyncWatermarkCache implements WatermarkCache {
 
   private static final String THREAD_NAME_FORMAT = "watermark_loading_thread_%d";
+
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncWatermarkCache.class);
   private static final Object MIN_WATERMARK_KEY = new Object();
   private final LoadingCache<Object, Optional<Timestamp>> cache;
 
+  private Timestamp lastCachedMinWatermark = Timestamp.MIN_VALUE;
+
   public AsyncWatermarkCache(PartitionMetadataDao dao, Duration refreshRate) {
+    LOG.info("changliiu AsyncWatermarkCache 1");
     this.cache =
         CacheBuilder.newBuilder()
             .refreshAfterWrite(java.time.Duration.ofMillis(refreshRate.getMillis()))
             .build(
                 CacheLoader.asyncReloading(
-                    CacheLoader.from(key -> Optional.ofNullable(dao.getUnfinishedMinWatermark())),
+                    CacheLoader.from(
+                        key -> {
+                          LOG.info(
+                              "changliiu AsyncWatermarkCache 2 - refresh cache, lastCachedMinWatermark:"
+                                  + lastCachedMinWatermark.toString());
+                          Timestamp unfinishedMinTimes =
+                              dao.getUnfinishedMinWatermark(Optional.of(lastCachedMinWatermark));
+                          if (unfinishedMinTimes != null) {
+                            lastCachedMinWatermark = unfinishedMinTimes;
+                          }
+                          LOG.info(
+                              "changliiu AsyncWatermarkCache 3 - get min watermark, now lastCachedMinWatermark:"
+                                  + lastCachedMinWatermark.toString());
+                          System.out.println("changliiu AsyncWatermarkCache 3 --");
+                          return Optional.ofNullable(unfinishedMinTimes);
+                        }),
+                    // CacheLoader.from(key ->
+                    // Optional.ofNullable(dao.getUnfinishedMinWatermark())),
                     Executors.newSingleThreadExecutor(
                         new ThreadFactoryBuilder().setNameFormat(THREAD_NAME_FORMAT).build())));
   }
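
For readers who want the caching pattern above in isolation: AsyncWatermarkCache keeps a single-entry Guava LoadingCache whose value is reloaded asynchronously once the refresh interval elapses, and with this patch it also remembers the last minimum watermark it observed so the next DAO query can be bounded from below. The following is a minimal, standalone sketch of that pattern only, not the Beam class itself; the names (MinWatermarkCacheSketch, queryMinSince), the java.time types, the unvendored Guava imports, and the plain single-thread executor are illustrative assumptions.

```java
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.function.Function;

/** Standalone sketch of an asynchronously refreshed "minimum unfinished watermark" cache. */
public class MinWatermarkCacheSketch {
  private static final Object KEY = new Object();

  private final LoadingCache<Object, Optional<Instant>> cache;
  // Last non-null minimum returned by the backing query; used as the lower bound next time.
  private Instant lastSeen = Instant.EPOCH;

  public MinWatermarkCacheSketch(Function<Instant, Instant> queryMinSince, Duration refreshRate) {
    this.cache =
        CacheBuilder.newBuilder()
            // After refreshRate, the next read triggers an asynchronous reload; readers keep
            // getting the stale value until the reload completes.
            .refreshAfterWrite(refreshRate)
            .build(
                CacheLoader.asyncReloading(
                    CacheLoader.from(
                        key -> {
                          Instant min = queryMinSince.apply(lastSeen);
                          if (min != null) {
                            lastSeen = min; // watermarks never regress, so the bound only tightens
                          }
                          return Optional.ofNullable(min);
                        }),
                    Executors.newSingleThreadExecutor()));
  }

  /** Returns the cached minimum watermark, or null if the query found no unfinished rows. */
  public Instant getUnfinishedMinWatermark() {
    try {
      return cache.get(KEY).orElse(null);
    } catch (ExecutionException e) {
      throw new RuntimeException(e);
    }
  }
}
```

As in the patch, the value is wrapped in Optional because a LoadingCache cannot store null, and the remembered lower bound only moves forward because partition watermarks are non-decreasing.
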
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/CacheFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/CacheFactory.java
index de5ddbbf3a97..c10a73f16a20 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/CacheFactory.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/CacheFactory.java
@@ -43,6 +43,8 @@ public CacheFactory(DaoFactory daoFactory, Duration watermarkRefreshRate) {
   }
 
   public WatermarkCache getWatermarkCache() {
+    System.out.println("changliiu - getWatermarkCache");
+
     return WATERMARK_CACHE.computeIfAbsent(
         cacheId,
         key ->
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/NoOpWatermarkCache.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/NoOpWatermarkCache.java
index 17275ae89834..54cb08aa4dd6 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/NoOpWatermarkCache.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/NoOpWatermarkCache.java
@@ -18,8 +18,11 @@
 package org.apache.beam.sdk.io.gcp.spanner.changestreams.cache;
 
 import com.google.cloud.Timestamp;
+import java.util.Optional;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Synchronously compute the earliest partition watermark, by delegating the call to {@link
@@ -29,12 +32,17 @@ public class NoOpWatermarkCache implements WatermarkCache {
 
   private final PartitionMetadataDao dao;
 
+  private static final Logger LOG = LoggerFactory.getLogger(NoOpWatermarkCache.class);
+
   public NoOpWatermarkCache(PartitionMetadataDao dao) {
+    LOG.info("changliiu NoOpWatermarkCache");
     this.dao = dao;
   }
 
   @Override
   public @Nullable Timestamp getUnfinishedMinWatermark() {
-    return dao.getUnfinishedMinWatermark();
+    return dao.getUnfinishedMinWatermark(Optional.empty());
+    // return dao.getUnfinishedMinWatermark();
+    // changliiu remove
   }
 }
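
Two details of the wiring above are easy to miss. CacheFactory#getWatermarkCache resolves the cache through WATERMARK_CACHE.computeIfAbsent(cacheId, ...), so callers that end up with the same cacheId appear to share one cache instance (and therefore one lastCachedMinWatermark). NoOpWatermarkCache, by contrast, now passes Optional.empty(), which keeps the previous full-scan behaviour for the synchronous path. Below is a simplified sketch of that registry pattern under those assumptions; the names (CacheRegistrySketch, the Supplier-based construction) are illustrative, and Object stands in for WatermarkCache.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

/** Sketch of the "one shared cache per factory id" pattern suggested by CacheFactory. */
public class CacheRegistrySketch {
  // Static registry: every lookup with the same id resolves to the same cache object,
  // so the asynchronous refresh work is not duplicated per DoFn instance.
  private static final Map<Long, Object> WATERMARK_CACHE = new ConcurrentHashMap<>();
  private static final AtomicLong CACHE_ID_GENERATOR = new AtomicLong();

  private final long cacheId = CACHE_ID_GENERATOR.getAndIncrement();

  public Object getWatermarkCache(Supplier<Object> newCache) {
    // Create the cache lazily on first use; later calls with the same id reuse it.
    return WATERMARK_CACHE.computeIfAbsent(cacheId, key -> newCache.get());
  }
}
```
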
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
index 654fd946663c..d0e506b204cc 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java
@@ -46,6 +46,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
@@ -62,6 +63,8 @@ public class PartitionMetadataDao {
   private final DatabaseClient databaseClient;
   private final Dialect dialect;
 
+  private static final Logger LOG = LoggerFactory.getLogger(PartitionMetadataDao.class);
+
   /**
    * Constructs a partition metadata dao object given the generated name of the tables.
    *
@@ -178,47 +181,93 @@ public List findAllTableIndexes() {
    *
    * @return the earliest partition watermark which is not in a {@link State#FINISHED} state.
    */
-  public @Nullable Timestamp getUnfinishedMinWatermark() {
+  public @Nullable Timestamp getUnfinishedMinWatermark(Optional<Timestamp> since) {
+    // public @Nullable Timestamp getUnfinishedMinWatermark() {
+    Timestamp sinceTimestamp = since.orElse(Timestamp.MIN_VALUE);
+    final String minWatermark = "min_watermark";
     Statement statement;
     if (this.isPostgres()) {
       statement =
           Statement.newBuilder(
-                  "SELECT \""
+                  "SELECT min(\""
                       + COLUMN_WATERMARK
-                      + "\" FROM \""
+                      + "\") as "
+                      + minWatermark
+                      + " FROM \""
                       + metadataTableName
                       + "\" WHERE \""
                       + COLUMN_STATE
                       + "\" != $1"
-                      + " ORDER BY \""
+                      + " AND \""
                       + COLUMN_WATERMARK
-                      + "\" ASC LIMIT 1")
+                      + "\" >= $2")
               .bind("p1")
               .to(State.FINISHED.name())
+              .bind("p2")
+              .to(sinceTimestamp)
               .build();
     } else {
       statement =
           Statement.newBuilder(
-                  "SELECT "
+                  "SELECT min("
                       + COLUMN_WATERMARK
+                      + ") as "
+                      + minWatermark
                       + " FROM "
                       + metadataTableName
                       + " WHERE "
                       + COLUMN_STATE
                       + " != @state"
-                      + " ORDER BY "
+                      + " AND "
                       + COLUMN_WATERMARK
-                      + " ASC LIMIT 1")
+                      + " >= @since;")
               .bind("state")
               .to(State.FINISHED.name())
+              .bind("since")
+              .to(sinceTimestamp)
               .build();
     }
+    // if (this.isPostgres()) {
+    //   statement =
+    //       Statement.newBuilder(
+    //               "SELECT \""
+    //                   + COLUMN_WATERMARK
+    //                   + "\" FROM \""
+    //                   + metadataTableName
+    //                   + "\" WHERE \""
+    //                   + COLUMN_STATE
+    //                   + "\" != $1"
+    //                   + " ORDER BY \""
+    //                   + COLUMN_WATERMARK
+    //                   + "\" ASC LIMIT 1")
+    //           .bind("p1")
+    //           .to(State.FINISHED.name())
+    //           .build();
+    // } else {
+    //   statement =
+    //       Statement.newBuilder(
+    //               "SELECT "
+    //                   + COLUMN_WATERMARK
+    //                   + " FROM "
+    //                   + metadataTableName
+    //                   + " WHERE "
+    //                   + COLUMN_STATE
+    //                   + " != @state"
+    //                   + " ORDER BY "
+    //                   + COLUMN_WATERMARK
+    //                   + " ASC LIMIT 1")
+    //           .bind("state")
+    //           .to(State.FINISHED.name())
+    //           .build();
+    // }
+    LOG.info("changliiu unfinishedWaterMarkQuery:\n" + statement.toString());
     try (ResultSet resultSet =
         databaseClient
             .singleUse()
             .executeQuery(statement, Options.tag("query=getUnfinishedMinWatermark"))) {
       if (resultSet.next()) {
-        return resultSet.getTimestamp(COLUMN_WATERMARK);
+        return resultSet.getTimestamp(minWatermark);
+        // return resultSet.getTimestamp(COLUMN_WATERMARK);
       }
       return null;
     }
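
The intent of the query rewrite above, as I read it: the old statement ordered all unfinished rows by Watermark and took the first, so every refresh scanned from the start of the index, while the new statement computes min(Watermark) over rows whose Watermark is at or above the previously observed minimum. Because partition watermarks never move backwards, that lower bound cannot exclude the true minimum, and passing Timestamp.MIN_VALUE (which is what Optional.empty() falls back to) reproduces the old result exactly. Below is a runnable sketch of how the GoogleSQL variant is assembled with the Spanner client library; the literal table and column names are taken from the test mock in this patch and are assumptions, not the generated metadata table name used in production.

```java
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.Statement;
import java.util.Optional;

/** Sketch: building the rewritten min-watermark query (GoogleSQL dialect, assumed names). */
public class MinWatermarkQuerySketch {
  public static void main(String[] args) {
    String metadataTableName = "my-metadata-table";
    String watermarkColumn = "Watermark";
    String stateColumn = "State";

    // Empty bound behaves like the old query: scan from the earliest possible timestamp.
    Optional<Timestamp> since = Optional.empty();
    Timestamp sinceTimestamp = since.orElse(Timestamp.MIN_VALUE);

    Statement statement =
        Statement.newBuilder(
                "SELECT min(" + watermarkColumn + ") as min_watermark"
                    + " FROM " + metadataTableName
                    + " WHERE " + stateColumn + " != @state"
                    + " AND " + watermarkColumn + " >= @since")
            .bind("state")
            .to("FINISHED")
            .bind("since")
            .to(sinceTimestamp)
            .build();

    // Prints the SQL text and bound parameters; in the DAO this statement is executed
    // with a single-use read and a "query=getUnfinishedMinWatermark" request tag.
    System.out.println(statement);
  }
}
```
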
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/DetectNewPartitionsDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/DetectNewPartitionsDoFn.java
index 841ab61e0f5d..3b2ca857a1f7 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/DetectNewPartitionsDoFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/DetectNewPartitionsDoFn.java
@@ -163,7 +163,7 @@ public ProcessContinuation processElement(
       RestrictionTracker<TimestampRange, Timestamp> tracker,
       OutputReceiver<PartitionMetadata> receiver,
       ManualWatermarkEstimator<Instant> watermarkEstimator) {
-
+    LOG.info("changliiu DetectNewPartitionsDoFn 1");
     return detectNewPartitionsAction.run(tracker, receiver, watermarkEstimator);
   }
diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java
index 60eb96ca3387..7583480d3101 100644
--- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java
+++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java
@@ -26,6 +26,8 @@
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A DoFn responsible for initializing the change stream Connector. It handles the creation of the
@@ -42,6 +44,7 @@ public class InitializeDoFn extends DoFn implements S
   // a change stream query might get stuck.
   private static final long DEFAULT_HEARTBEAT_MILLIS = 2000;
 
+  private static final Logger LOG = LoggerFactory.getLogger(InitializeDoFn.class);
   private final DaoFactory daoFactory;
   private final MapperFactory mapperFactory;
   // The change streams query start time
@@ -62,6 +65,7 @@ public InitializeDoFn(
 
   @ProcessElement
   public void processElement(OutputReceiver<PartitionMetadata> receiver) {
+    LOG.info("changliiu InitializeDoFn 1");
     PartitionMetadataDao partitionMetadataDao = daoFactory.getPartitionMetadataDao();
     if (!partitionMetadataDao.tableExists()) {
       // Creates partition metadata table and associated indexes
@@ -73,6 +77,7 @@ public void processElement(OutputReceiver receiver) {
         .map(mapperFactory.partitionMetadataMapper()::from)
         .orElseThrow(
             () -> new IllegalStateException("Initial partition not found in metadata table."));
+    LOG.info("changliiu InitializeDoFn complete");
     receiver.output(initialPartition);
   }
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangeStreamErrorTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangeStreamErrorTest.java
index c753eb7da5f2..f349047b6405 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangeStreamErrorTest.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangeStreamErrorTest.java
@@ -459,12 +459,51 @@ private void mockGetPartitionsAfter(Timestamp timestamp, ResultSet getPartitionR
         StatementResult.query(getPartitionsAfterStatement, getPartitionResultSet));
   }
 
+  // private void mockGetWatermark(Timestamp watermark) {
+  //   Statement watermarkStatement =
+  //       Statement.newBuilder(
+  //               "SELECT Watermark FROM my-metadata-table WHERE State != @state ORDER BY Watermark
+  // ASC LIMIT 1")
+  //           .bind("state")
+  //           .to(State.FINISHED.name())
+  //           .build();
+  //   ResultSetMetadata watermarkResultSetMetadata =
+  //       ResultSetMetadata.newBuilder()
+  //           .setRowType(
+  //               StructType.newBuilder()
+  //                   .addFields(
+  //                       Field.newBuilder()
+  //                           .setName("Watermark")
+  //                           .setType(Type.newBuilder().setCode(TypeCode.TIMESTAMP).build())
+  //                           .build())
+  //                   .build())
+  //           .build();
+  //   ResultSet watermarkResultSet =
+  //       ResultSet.newBuilder()
+  //           .addRows(
+  //               ListValue.newBuilder()
+  //                   .addValues(Value.newBuilder().setStringValue(watermark.toString()).build())
+  //                   .build())
+  //           .setMetadata(watermarkResultSetMetadata)
+  //           .build();
+  //   mockSpannerService.putStatementResult(
+  //       StatementResult.query(watermarkStatement, watermarkResultSet));
+  // }
+
   private void mockGetWatermark(Timestamp watermark) {
+    final String minWatermark = "min_watermark";
+    // This query needs to stay in sync with getUnfinishedMinWatermark() in PartitionMetadataDao.
     Statement watermarkStatement =
         Statement.newBuilder(
-                "SELECT Watermark FROM my-metadata-table WHERE State != @state ORDER BY Watermark ASC LIMIT 1")
+                "SELECT min(Watermark) as "
+                    + minWatermark
+                    + " FROM my-metadata-table "
+                    + "WHERE State != @state "
+                    + "AND Watermark >= @since;")
             .bind("state")
             .to(State.FINISHED.name())
+            .bind("since")
+            .to(Timestamp.MIN_VALUE)
             .build();
     ResultSetMetadata watermarkResultSetMetadata =
         ResultSetMetadata.newBuilder()
@@ -472,7 +511,7 @@ private void mockGetWatermark(Timestamp watermark) {
             StructType.newBuilder()
                 .addFields(
                     Field.newBuilder()
-                        .setName("Watermark")
+                        .setName(minWatermark)
                         .setType(Type.newBuilder().setCode(TypeCode.TIMESTAMP).build())
                         .build())
                 .build())