Skip to content

Commit 5a5f4a5

Browse files
committed
add filter to watermark query
1 parent 63193fe commit 5a5f4a5

File tree

4 files changed

+50
-17
lines changed

4 files changed

+50
-17
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/AsyncWatermarkCache.java

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package org.apache.beam.sdk.io.gcp.spanner.changestreams.cache;
1919

2020
import com.google.cloud.Timestamp;
21-
import java.util.Optional;
2221
import java.util.concurrent.ExecutionException;
2322
import java.util.concurrent.Executors;
2423
import javax.annotation.Nullable;
@@ -40,23 +39,40 @@ public class AsyncWatermarkCache implements WatermarkCache {
4039

4140
private static final String THREAD_NAME_FORMAT = "watermark_loading_thread_%d";
4241
private static final Object MIN_WATERMARK_KEY = new Object();
43-
private final LoadingCache<Object, Optional<Timestamp>> cache;
42+
private final LoadingCache<Object, Timestamp> cache;
43+
44+
// This is to cache the result of getUnfinishedMinWatermark query and filter the query in the next
45+
// run. For the initial query, the value of this cache is min timestamp. If there is no partition
46+
// in the metadata table, then this cache will not be updated. If the getUnfinishedMinWatermark
47+
// query fails or times out, then this cache will not be updated.
48+
// Note that, all the reload operations on this key are serialized due to use of the single
49+
// threaded async reloading executor.
50+
private Timestamp lastCachedMinWatermark = Timestamp.MIN_VALUE;
4451

4552
public AsyncWatermarkCache(PartitionMetadataDao dao, Duration refreshRate) {
4653
this.cache =
4754
CacheBuilder.newBuilder()
4855
.refreshAfterWrite(java.time.Duration.ofMillis(refreshRate.getMillis()))
4956
.build(
5057
CacheLoader.asyncReloading(
51-
CacheLoader.from(key -> Optional.ofNullable(dao.getUnfinishedMinWatermark())),
58+
CacheLoader.from(
59+
key -> {
60+
Timestamp unfinishedMinTimes =
61+
dao.getUnfinishedMinWatermarkFrom(lastCachedMinWatermark);
62+
if (unfinishedMinTimes != null
63+
&& lastCachedMinWatermark.compareTo(unfinishedMinTimes) < 0) {
64+
lastCachedMinWatermark = unfinishedMinTimes;
65+
}
66+
return unfinishedMinTimes;
67+
}),
5268
Executors.newSingleThreadExecutor(
5369
new ThreadFactoryBuilder().setNameFormat(THREAD_NAME_FORMAT).build())));
5470
}
5571

5672
@Override
5773
public @Nullable Timestamp getUnfinishedMinWatermark() {
5874
try {
59-
return cache.get(MIN_WATERMARK_KEY).orElse(null);
75+
return cache.get(MIN_WATERMARK_KEY);
6076
} catch (ExecutionException e) {
6177
throw new RuntimeException(e);
6278
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/NoOpWatermarkCache.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,6 @@ public NoOpWatermarkCache(PartitionMetadataDao dao) {
3535

3636
@Override
3737
public @Nullable Timestamp getUnfinishedMinWatermark() {
38-
return dao.getUnfinishedMinWatermark();
38+
return dao.getUnfinishedMinWatermarkFrom(Timestamp.MIN_VALUE);
3939
}
4040
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -178,47 +178,56 @@ public List<String> findAllTableIndexes() {
178178
*
179179
* @return the earliest partition watermark which is not in a {@link State#FINISHED} state.
180180
*/
181-
public @Nullable Timestamp getUnfinishedMinWatermark() {
181+
public @Nullable Timestamp getUnfinishedMinWatermarkFrom(Timestamp sinceTimestamp) {
182182
Statement statement;
183+
final String minWatermark = "min_watermark";
183184
if (this.isPostgres()) {
184185
statement =
185186
Statement.newBuilder(
186-
"SELECT \""
187+
"SELECT MIN(\""
187188
+ COLUMN_WATERMARK
188-
+ "\" FROM \""
189+
+ "\") as "
190+
+ minWatermark
191+
+ " FROM \""
189192
+ metadataTableName
190193
+ "\" WHERE \""
191194
+ COLUMN_STATE
192195
+ "\" != $1"
193-
+ " ORDER BY \""
196+
+ " AND \""
194197
+ COLUMN_WATERMARK
195-
+ "\" ASC LIMIT 1")
198+
+ "\" >= $2")
196199
.bind("p1")
197200
.to(State.FINISHED.name())
201+
.bind("p2")
202+
.to(sinceTimestamp)
198203
.build();
199204
} else {
200205
statement =
201206
Statement.newBuilder(
202-
"SELECT "
207+
"SELECT MIN("
203208
+ COLUMN_WATERMARK
209+
+ ") as "
210+
+ minWatermark
204211
+ " FROM "
205212
+ metadataTableName
206213
+ " WHERE "
207214
+ COLUMN_STATE
208215
+ " != @state"
209-
+ " ORDER BY "
216+
+ " AND "
210217
+ COLUMN_WATERMARK
211-
+ " ASC LIMIT 1")
218+
+ " >= @since;")
212219
.bind("state")
213220
.to(State.FINISHED.name())
221+
.bind("since")
222+
.to(sinceTimestamp)
214223
.build();
215224
}
216225
try (ResultSet resultSet =
217226
databaseClient
218227
.singleUse()
219-
.executeQuery(statement, Options.tag("query=getUnfinishedMinWatermark"))) {
228+
.executeQuery(statement, Options.tag("query=getUnfinishedMinWatermarkFrom"))) {
220229
if (resultSet.next()) {
221-
return resultSet.getTimestamp(COLUMN_WATERMARK);
230+
return resultSet.getTimestamp(minWatermark);
222231
}
223232
return null;
224233
}

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangeStreamErrorTest.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -460,19 +460,27 @@ private void mockGetPartitionsAfter(Timestamp timestamp, ResultSet getPartitionR
460460
}
461461

462462
private void mockGetWatermark(Timestamp watermark) {
463+
final String minWatermark = "min_watermark";
464+
// The query needs to sync with getUnfinishedMinWatermark() in PartitionMetadataDao file.
463465
Statement watermarkStatement =
464466
Statement.newBuilder(
465-
"SELECT Watermark FROM my-metadata-table WHERE State != @state ORDER BY Watermark ASC LIMIT 1")
467+
"SELECT MIN(Watermark) as "
468+
+ minWatermark
469+
+ " FROM my-metadata-table "
470+
+ "WHERE State != @state "
471+
+ "AND Watermark >= @since;")
466472
.bind("state")
467473
.to(State.FINISHED.name())
474+
.bind("since")
475+
.to(Timestamp.MIN_VALUE)
468476
.build();
469477
ResultSetMetadata watermarkResultSetMetadata =
470478
ResultSetMetadata.newBuilder()
471479
.setRowType(
472480
StructType.newBuilder()
473481
.addFields(
474482
Field.newBuilder()
475-
.setName("Watermark")
483+
.setName(minWatermark)
476484
.setType(Type.newBuilder().setCode(TypeCode.TIMESTAMP).build())
477485
.build())
478486
.build())

0 commit comments

Comments
 (0)