Skip to content

Commit 5f0a76a

Browse files
committed
add filter to watermark query
1 parent 63193fe commit 5f0a76a

File tree

4 files changed

+59
-17
lines changed

4 files changed

+59
-17
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/AsyncWatermarkCache.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,9 @@
1818
package org.apache.beam.sdk.io.gcp.spanner.changestreams.cache;
1919

2020
import com.google.cloud.Timestamp;
21-
import java.util.Optional;
2221
import java.util.concurrent.ExecutionException;
22+
import java.util.concurrent.locks.Lock;
23+
import java.util.concurrent.locks.ReentrantLock;
2324
import java.util.concurrent.Executors;
2425
import javax.annotation.Nullable;
2526
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
@@ -40,23 +41,47 @@ public class AsyncWatermarkCache implements WatermarkCache {
4041

4142
private static final String THREAD_NAME_FORMAT = "watermark_loading_thread_%d";
4243
private static final Object MIN_WATERMARK_KEY = new Object();
43-
private final LoadingCache<Object, Optional<Timestamp>> cache;
44+
private final LoadingCache<Object, Timestamp> cache;
45+
46+
// This is to cache the result of getUnfinishedMinWatermark query and filter the query in the next
47+
// run. For the initial query, the value of this cache is min timestamp. If there is no partition
48+
// in the metadata table, then this cache will not be updated. If the getUnfinishedMinWatermark
49+
// query fails or times out, then this cache will not be updated.
50+
// Note that, all the reload operations on this key are serialized due to use of the single
51+
// threaded async reloading executor.
52+
private Timestamp lastCachedMinWatermark = Timestamp.MIN_VALUE;
53+
54+
private final Lock lock = new ReentrantLock();
4455

4556
public AsyncWatermarkCache(PartitionMetadataDao dao, Duration refreshRate) {
4657
this.cache =
4758
CacheBuilder.newBuilder()
4859
.refreshAfterWrite(java.time.Duration.ofMillis(refreshRate.getMillis()))
4960
.build(
5061
CacheLoader.asyncReloading(
51-
CacheLoader.from(key -> Optional.ofNullable(dao.getUnfinishedMinWatermark())),
62+
CacheLoader.from(
63+
key -> {
64+
Timestamp unfinishedMinTimes =
65+
dao.getUnfinishedMinWatermarkFrom(lastCachedMinWatermark);
66+
if (unfinishedMinTimes != null
67+
&& lastCachedMinWatermark.compareTo(unfinishedMinTimes) < 0) {
68+
lock.lock();
69+
try {
70+
lastCachedMinWatermark = unfinishedMinTimes;
71+
} finally {
72+
lock.unlock();
73+
}
74+
}
75+
return unfinishedMinTimes;
76+
}),
5277
Executors.newSingleThreadExecutor(
5378
new ThreadFactoryBuilder().setNameFormat(THREAD_NAME_FORMAT).build())));
5479
}
5580

5681
@Override
5782
public @Nullable Timestamp getUnfinishedMinWatermark() {
5883
try {
59-
return cache.get(MIN_WATERMARK_KEY).orElse(null);
84+
return cache.get(MIN_WATERMARK_KEY);
6085
} catch (ExecutionException e) {
6186
throw new RuntimeException(e);
6287
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/NoOpWatermarkCache.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,6 @@ public NoOpWatermarkCache(PartitionMetadataDao dao) {
3535

3636
@Override
3737
public @Nullable Timestamp getUnfinishedMinWatermark() {
38-
return dao.getUnfinishedMinWatermark();
38+
return dao.getUnfinishedMinWatermarkFrom(Timestamp.MIN_VALUE);
3939
}
4040
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -178,47 +178,56 @@ public List<String> findAllTableIndexes() {
178178
*
179179
* @return the earliest partition watermark which is not in a {@link State#FINISHED} state.
180180
*/
181-
public @Nullable Timestamp getUnfinishedMinWatermark() {
181+
public @Nullable Timestamp getUnfinishedMinWatermarkFrom(Timestamp sinceTimestamp) {
182182
Statement statement;
183+
final String minWatermark = "min_watermark";
183184
if (this.isPostgres()) {
184185
statement =
185186
Statement.newBuilder(
186-
"SELECT \""
187+
"SELECT MIN(\""
187188
+ COLUMN_WATERMARK
188-
+ "\" FROM \""
189+
+ "\") as "
190+
+ minWatermark
191+
+ " FROM \""
189192
+ metadataTableName
190193
+ "\" WHERE \""
191194
+ COLUMN_STATE
192195
+ "\" != $1"
193-
+ " ORDER BY \""
196+
+ " AND \""
194197
+ COLUMN_WATERMARK
195-
+ "\" ASC LIMIT 1")
198+
+ "\" >= $2")
196199
.bind("p1")
197200
.to(State.FINISHED.name())
201+
.bind("p2")
202+
.to(sinceTimestamp)
198203
.build();
199204
} else {
200205
statement =
201206
Statement.newBuilder(
202-
"SELECT "
207+
"SELECT MIN("
203208
+ COLUMN_WATERMARK
209+
+ ") as "
210+
+ minWatermark
204211
+ " FROM "
205212
+ metadataTableName
206213
+ " WHERE "
207214
+ COLUMN_STATE
208215
+ " != @state"
209-
+ " ORDER BY "
216+
+ " AND "
210217
+ COLUMN_WATERMARK
211-
+ " ASC LIMIT 1")
218+
+ " >= @since;")
212219
.bind("state")
213220
.to(State.FINISHED.name())
221+
.bind("since")
222+
.to(sinceTimestamp)
214223
.build();
215224
}
216225
try (ResultSet resultSet =
217226
databaseClient
218227
.singleUse()
219-
.executeQuery(statement, Options.tag("query=getUnfinishedMinWatermark"))) {
228+
.executeQuery(statement, Options.tag("query=getUnfinishedMinWatermarkFrom"))) {
220229
if (resultSet.next()) {
221-
return resultSet.getTimestamp(COLUMN_WATERMARK);
230+
return resultSet.getTimestamp(minWatermark);
222231
}
223232
return null;
224233
}

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangeStreamErrorTest.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -460,19 +460,27 @@ private void mockGetPartitionsAfter(Timestamp timestamp, ResultSet getPartitionR
460460
}
461461

462462
private void mockGetWatermark(Timestamp watermark) {
463+
final String minWatermark = "min_watermark";
464+
// The query needs to sync with getUnfinishedMinWatermark() in PartitionMetadataDao file.
463465
Statement watermarkStatement =
464466
Statement.newBuilder(
465-
"SELECT Watermark FROM my-metadata-table WHERE State != @state ORDER BY Watermark ASC LIMIT 1")
467+
"SELECT MIN(Watermark) as "
468+
+ minWatermark
469+
+ " FROM my-metadata-table "
470+
+ "WHERE State != @state "
471+
+ "AND Watermark >= @since;")
466472
.bind("state")
467473
.to(State.FINISHED.name())
474+
.bind("since")
475+
.to(Timestamp.MIN_VALUE)
468476
.build();
469477
ResultSetMetadata watermarkResultSetMetadata =
470478
ResultSetMetadata.newBuilder()
471479
.setRowType(
472480
StructType.newBuilder()
473481
.addFields(
474482
Field.newBuilder()
475-
.setName("Watermark")
483+
.setName(minWatermark)
476484
.setType(Type.newBuilder().setCode(TypeCode.TIMESTAMP).build())
477485
.build())
478486
.build())

0 commit comments

Comments
 (0)