Skip to content

Commit ab976df

Browse files
committed
add filter to watermark query
1 parent 6e0fc5a commit ab976df

File tree

4 files changed

+43
-13
lines changed

4 files changed

+43
-13
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/AsyncWatermarkCache.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,13 +42,23 @@ public class AsyncWatermarkCache implements WatermarkCache {
4242
private static final Object MIN_WATERMARK_KEY = new Object();
4343
private final LoadingCache<Object, Optional<Timestamp>> cache;
4444

45+
private Timestamp lastCachedMinWatermark = Timestamp.MIN_VALUE;
46+
4547
public AsyncWatermarkCache(PartitionMetadataDao dao, Duration refreshRate) {
4648
this.cache =
4749
CacheBuilder.newBuilder()
4850
.refreshAfterWrite(java.time.Duration.ofMillis(refreshRate.getMillis()))
4951
.build(
5052
CacheLoader.asyncReloading(
51-
CacheLoader.from(key -> Optional.ofNullable(dao.getUnfinishedMinWatermark())),
53+
CacheLoader.from(
54+
key -> {
55+
Timestamp unfinishedMinTimes =
56+
dao.getUnfinishedMinWatermark(Optional.of(lastCachedMinWatermark));
57+
if (unfinishedMinTimes != null) {
58+
lastCachedMinWatermark = unfinishedMinTimes;
59+
}
60+
return Optional.ofNullable(unfinishedMinTimes);
61+
}),
5262
Executors.newSingleThreadExecutor(
5363
new ThreadFactoryBuilder().setNameFormat(THREAD_NAME_FORMAT).build())));
5464
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/NoOpWatermarkCache.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.beam.sdk.io.gcp.spanner.changestreams.cache;
1919

2020
import com.google.cloud.Timestamp;
21+
import java.util.Optional;
2122
import javax.annotation.Nullable;
2223
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
2324

@@ -35,6 +36,6 @@ public NoOpWatermarkCache(PartitionMetadataDao dao) {
3536

3637
@Override
3738
public @Nullable Timestamp getUnfinishedMinWatermark() {
38-
return dao.getUnfinishedMinWatermark();
39+
return dao.getUnfinishedMinWatermark(Optional.empty());
3940
}
4041
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java

Lines changed: 20 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.util.HashSet;
4747
import java.util.List;
4848
import java.util.Map;
49+
import java.util.Optional;
4950
import java.util.function.Function;
5051
import java.util.stream.Collectors;
5152
import javax.annotation.Nullable;
@@ -178,47 +179,57 @@ public List<String> findAllTableIndexes() {
178179
*
179180
* @return the earliest partition watermark which is not in a {@link State#FINISHED} state.
180181
*/
181-
public @Nullable Timestamp getUnfinishedMinWatermark() {
182+
public @Nullable Timestamp getUnfinishedMinWatermark(Optional<Timestamp> since) {
183+
Timestamp sinceTimestamp = since.orElse(Timestamp.MIN_VALUE);
182184
Statement statement;
185+
final String minWatermark = "min_watermark";
183186
if (this.isPostgres()) {
184187
statement =
185188
Statement.newBuilder(
186-
"SELECT \""
189+
"SELECT min(\""
187190
+ COLUMN_WATERMARK
188-
+ "\" FROM \""
191+
+ "\") as "
192+
+ minWatermark
193+
+ " FROM \""
189194
+ metadataTableName
190195
+ "\" WHERE \""
191196
+ COLUMN_STATE
192197
+ "\" != $1"
193-
+ " ORDER BY \""
198+
+ " AND \""
194199
+ COLUMN_WATERMARK
195-
+ "\" ASC LIMIT 1")
200+
+ "\" >= $2")
196201
.bind("p1")
197202
.to(State.FINISHED.name())
203+
.bind("p2")
204+
.to(sinceTimestamp)
198205
.build();
199206
} else {
200207
statement =
201208
Statement.newBuilder(
202-
"SELECT "
209+
"SELECT min("
203210
+ COLUMN_WATERMARK
211+
+ ") as "
212+
+ minWatermark
204213
+ " FROM "
205214
+ metadataTableName
206215
+ " WHERE "
207216
+ COLUMN_STATE
208217
+ " != @state"
209-
+ " ORDER BY "
218+
+ " AND "
210219
+ COLUMN_WATERMARK
211-
+ " ASC LIMIT 1")
220+
+ " >= @since;")
212221
.bind("state")
213222
.to(State.FINISHED.name())
223+
.bind("since")
224+
.to(sinceTimestamp)
214225
.build();
215226
}
216227
try (ResultSet resultSet =
217228
databaseClient
218229
.singleUse()
219230
.executeQuery(statement, Options.tag("query=getUnfinishedMinWatermark"))) {
220231
if (resultSet.next()) {
221-
return resultSet.getTimestamp(COLUMN_WATERMARK);
232+
return resultSet.getTimestamp(minWatermark);
222233
}
223234
return null;
224235
}

sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/SpannerChangeStreamErrorTest.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -460,19 +460,27 @@ private void mockGetPartitionsAfter(Timestamp timestamp, ResultSet getPartitionR
460460
}
461461

462462
private void mockGetWatermark(Timestamp watermark) {
463+
final String minWatermark = "min_watermark";
464+
// The query needs to sync with getUnfinishedMinWatermark() in PartitionMetadataDao file.
463465
Statement watermarkStatement =
464466
Statement.newBuilder(
465-
"SELECT Watermark FROM my-metadata-table WHERE State != @state ORDER BY Watermark ASC LIMIT 1")
467+
"SELECT min(Watermark) as "
468+
+ minWatermark
469+
+ " FROM my-metadata-table "
470+
+ "WHERE State != @state "
471+
+ "AND Watermark >= @since;")
466472
.bind("state")
467473
.to(State.FINISHED.name())
474+
.bind("since")
475+
.to(Timestamp.MIN_VALUE)
468476
.build();
469477
ResultSetMetadata watermarkResultSetMetadata =
470478
ResultSetMetadata.newBuilder()
471479
.setRowType(
472480
StructType.newBuilder()
473481
.addFields(
474482
Field.newBuilder()
475-
.setName("Watermark")
483+
.setName(minWatermark)
476484
.setType(Type.newBuilder().setCode(TypeCode.TIMESTAMP).build())
477485
.build())
478486
.build())

0 commit comments

Comments
 (0)