Skip to content

Commit 7cc7fcc

Browse files
committed
debug timeout
1 parent ab976df commit 7cc7fcc

File tree

10 files changed

+151
-48
lines changed

10 files changed

+151
-48
lines changed

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1815,8 +1815,8 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
18151815

18161816
final Duration watermarkRefreshRate =
18171817
MoreObjects.firstNonNull(getWatermarkRefreshRate(), DEFAULT_WATERMARK_REFRESH_RATE);
1818+
LOG.info("changliiu 0 - " + watermarkRefreshRate.getStandardSeconds());
18181819
final CacheFactory cacheFactory = new CacheFactory(daoFactory, watermarkRefreshRate);
1819-
18201820
final InitializeDoFn initializeDoFn =
18211821
new InitializeDoFn(daoFactory, mapperFactory, startTimestamp, endTimestamp);
18221822
final DetectNewPartitionsDoFn detectNewPartitionsDoFn =
@@ -1853,7 +1853,7 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
18531853
final BytesThroughputEstimator<DataChangeRecord> throughputEstimator =
18541854
new BytesThroughputEstimator<>(THROUGHPUT_WINDOW_SECONDS, dataChangeRecordSizeEstimator);
18551855
readChangeStreamPartitionDoFn.setThroughputEstimator(throughputEstimator);
1856-
1856+
LOG.info("changliiu 4 - v2");
18571857
impulseOut
18581858
.apply(WithTimestamps.of(e -> GlobalWindow.INSTANCE.maxTimestamp()))
18591859
.apply(Wait.on(dataChangeRecordsOut))

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/DetectNewPartitionsAction.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,12 +99,15 @@ public ProcessContinuation run(
9999
RestrictionTracker<TimestampRange, Timestamp> tracker,
100100
OutputReceiver<PartitionMetadata> receiver,
101101
ManualWatermarkEstimator<Instant> watermarkEstimator) {
102+
LOG.info("changliiu DetectNewPartitionsAction 1");
103+
System.out.println("changliiu - 1");
102104

103105
final Timestamp readTimestamp = tracker.currentRestriction().getFrom();
104106
// Updates the current watermark as the min of the watermarks from all existing partitions
105107
final Timestamp minWatermark = cache.getUnfinishedMinWatermark();
106-
108+
LOG.info("changliiu DetectNewPartitionsAction 2, ");
107109
if (minWatermark != null) {
110+
LOG.info("changliiu DetectNewPartitionsAction 3");
108111
return processPartitions(tracker, receiver, watermarkEstimator, minWatermark, readTimestamp);
109112
} else {
110113
return terminate(tracker);

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/action/QueryChangeStreamAction.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ public ProcessContinuation run(
161161
final String token = partition.getPartitionToken();
162162
final Timestamp startTimestamp = tracker.currentRestriction().getFrom();
163163
final Timestamp endTimestamp = partition.getEndTimestamp();
164-
164+
LOG.info("changliiu QueryChangeStreamAction 1");
165165
// TODO: Potentially we can avoid this fetch, by enriching the runningAt timestamp when the
166166
// ReadChangeStreamPartitionDoFn#processElement is called
167167
final PartitionMetadata updatedPartition =
@@ -175,7 +175,7 @@ public ProcessContinuation run(
175175
// Interrupter with soft timeout to commit the work if any records have been processed.
176176
RestrictionInterrupter<Timestamp> interrupter =
177177
RestrictionInterrupter.withSoftTimeout(RESTRICTION_TRACKER_TIMEOUT);
178-
178+
LOG.info("changliiu QueryChangeStreamAction 2");
179179
try (ChangeStreamResultSet resultSet =
180180
changeStreamDao.changeStreamQuery(
181181
token, startTimestamp, endTimestamp, partition.getHeartbeatMillis())) {

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/AsyncWatermarkCache.java

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
2929
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
3030
import org.joda.time.Duration;
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
3133

3234
/**
3335
* Asynchronously compute the earliest partition watermark and stores it in memory. The value will
@@ -39,26 +41,40 @@
3941
public class AsyncWatermarkCache implements WatermarkCache {
4042

4143
private static final String THREAD_NAME_FORMAT = "watermark_loading_thread_%d";
44+
45+
private static final Logger LOG = LoggerFactory.getLogger(AsyncWatermarkCache.class);
4246
private static final Object MIN_WATERMARK_KEY = new Object();
4347
private final LoadingCache<Object, Optional<Timestamp>> cache;
4448

45-
private Timestamp lastCachedMinWatermark = Timestamp.MIN_VALUE;
49+
// private Timestamp lastCachedMinWatermark = Timestamp.MIN_VALUE;
4650

4751
public AsyncWatermarkCache(PartitionMetadataDao dao, Duration refreshRate) {
52+
LOG.info("changliiu AsyncWatermarkCache 1");
4853
this.cache =
4954
CacheBuilder.newBuilder()
5055
.refreshAfterWrite(java.time.Duration.ofMillis(refreshRate.getMillis()))
5156
.build(
5257
CacheLoader.asyncReloading(
53-
CacheLoader.from(
54-
key -> {
55-
Timestamp unfinishedMinTimes =
56-
dao.getUnfinishedMinWatermark(Optional.of(lastCachedMinWatermark));
57-
if (unfinishedMinTimes != null) {
58-
lastCachedMinWatermark = unfinishedMinTimes;
59-
}
60-
return Optional.ofNullable(unfinishedMinTimes);
61-
}),
58+
// CacheLoader.from(
59+
// key -> {
60+
// LOG.info(
61+
// "changliiu AsyncWatermarkCache 2 - refresh cache,
62+
// lastCachedMinWatermark:"
63+
// + lastCachedMinWatermark.toString());
64+
// Timestamp unfinishedMinTimes =
65+
// dao.getUnfinishedMinWatermark(Optional.of(lastCachedMinWatermark));
66+
// if (unfinishedMinTimes != null) {
67+
// lastCachedMinWatermark = unfinishedMinTimes;
68+
// }
69+
// LOG.info(
70+
// "changliiu AsyncWatermarkCache 3 - get min watermark, now
71+
// lastCachedMinWatermark:"
72+
// + lastCachedMinWatermark.toString());
73+
// System.out.println("changliiu AsyncWatermarkCache 3 --");
74+
// return Optional.ofNullable(unfinishedMinTimes);
75+
// }),
76+
CacheLoader.from(key -> Optional.ofNullable(dao.getUnfinishedMinWatermark())),
77+
// changliiu remove the above
6278
Executors.newSingleThreadExecutor(
6379
new ThreadFactoryBuilder().setNameFormat(THREAD_NAME_FORMAT).build())));
6480
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/CacheFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,8 @@ public CacheFactory(DaoFactory daoFactory, Duration watermarkRefreshRate) {
4343
}
4444

4545
public WatermarkCache getWatermarkCache() {
46+
System.out.println("changliiu - getWatermarkCache");
47+
4648
return WATERMARK_CACHE.computeIfAbsent(
4749
cacheId,
4850
key ->

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/cache/NoOpWatermarkCache.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,12 @@
1818
package org.apache.beam.sdk.io.gcp.spanner.changestreams.cache;
1919

2020
import com.google.cloud.Timestamp;
21-
import java.util.Optional;
2221
import javax.annotation.Nullable;
2322
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataDao;
23+
import org.slf4j.Logger;
24+
import org.slf4j.LoggerFactory;
25+
26+
// import java.util.Optional;
2427

2528
/**
2629
* Synchronously compute the earliest partition watermark, by delegating the call to {@link
@@ -30,12 +33,17 @@ public class NoOpWatermarkCache implements WatermarkCache {
3033

3134
private final PartitionMetadataDao dao;
3235

36+
private static final Logger LOG = LoggerFactory.getLogger(NoOpWatermarkCache.class);
37+
3338
public NoOpWatermarkCache(PartitionMetadataDao dao) {
39+
LOG.info("changliiu NoOpWatermarkCache");
3440
this.dao = dao;
3541
}
3642

3743
@Override
3844
public @Nullable Timestamp getUnfinishedMinWatermark() {
39-
return dao.getUnfinishedMinWatermark(Optional.empty());
45+
// return dao.getUnfinishedMinWatermark(Optional.empty());
46+
return dao.getUnfinishedMinWatermark();
47+
// changliiu remove
4048
}
4149
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java

Lines changed: 59 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
import java.util.HashSet;
4747
import java.util.List;
4848
import java.util.Map;
49-
import java.util.Optional;
5049
import java.util.function.Function;
5150
import java.util.stream.Collectors;
5251
import javax.annotation.Nullable;
@@ -56,13 +55,17 @@
5655
import org.slf4j.Logger;
5756
import org.slf4j.LoggerFactory;
5857

58+
// import java.util.Optional;
59+
5960
/** Data access object for the Connector metadata tables. */
6061
public class PartitionMetadataDao {
6162

6263
private final String metadataTableName;
6364
private final DatabaseClient databaseClient;
6465
private final Dialect dialect;
6566

67+
private static final Logger LOG = LoggerFactory.getLogger(PartitionMetadataDao.class);
68+
6669
/**
6770
* Constructs a partition metadata dao object given the generated name of the tables.
6871
*
@@ -179,57 +182,93 @@ public List<String> findAllTableIndexes() {
179182
*
180183
* @return the earliest partition watermark which is not in a {@link State#FINISHED} state.
181184
*/
182-
public @Nullable Timestamp getUnfinishedMinWatermark(Optional<Timestamp> since) {
183-
Timestamp sinceTimestamp = since.orElse(Timestamp.MIN_VALUE);
185+
// public @Nullable Timestamp getUnfinishedMinWatermark(Optional<Timestamp> since) {
186+
public @Nullable Timestamp getUnfinishedMinWatermark() {
187+
// Timestamp sinceTimestamp = since.orElse(Timestamp.MIN_VALUE);
188+
// final String minWatermark = "min_watermark";
184189
Statement statement;
185-
final String minWatermark = "min_watermark";
190+
// if (this.isPostgres()) {
191+
// statement =
192+
// Statement.newBuilder(
193+
// "SELECT min(\""
194+
// + COLUMN_WATERMARK
195+
// + "\") as "
196+
// + minWatermark
197+
// + " FROM \""
198+
// + metadataTableName
199+
// + "\" WHERE \""
200+
// + COLUMN_STATE
201+
// + "\" != $1"
202+
// + " AND \""
203+
// + COLUMN_WATERMARK
204+
// + "\" >= $2")
205+
// .bind("p1")
206+
// .to(State.FINISHED.name())
207+
// .bind("p2")
208+
// .to(sinceTimestamp)
209+
// .build();
210+
// } else {
211+
// statement =
212+
// Statement.newBuilder(
213+
// "SELECT min("
214+
// + COLUMN_WATERMARK
215+
// + ") as "
216+
// + minWatermark
217+
// + " FROM "
218+
// + metadataTableName
219+
// + " WHERE "
220+
// + COLUMN_STATE
221+
// + " != @state"
222+
// + " AND "
223+
// + COLUMN_WATERMARK
224+
// + " >= @since;")
225+
// .bind("state")
226+
// .to(State.FINISHED.name())
227+
// .bind("since")
228+
// .to(sinceTimestamp)
229+
// .build();
230+
// }
186231
if (this.isPostgres()) {
187232
statement =
188233
Statement.newBuilder(
189-
"SELECT min(\""
234+
"SELECT \""
190235
+ COLUMN_WATERMARK
191-
+ "\") as "
192-
+ minWatermark
193-
+ " FROM \""
236+
+ "\" FROM \""
194237
+ metadataTableName
195238
+ "\" WHERE \""
196239
+ COLUMN_STATE
197240
+ "\" != $1"
198-
+ " AND \""
241+
+ " ORDER BY \""
199242
+ COLUMN_WATERMARK
200-
+ "\" >= $2")
243+
+ "\" ASC LIMIT 1")
201244
.bind("p1")
202245
.to(State.FINISHED.name())
203-
.bind("p2")
204-
.to(sinceTimestamp)
205246
.build();
206247
} else {
207248
statement =
208249
Statement.newBuilder(
209-
"SELECT min("
250+
"SELECT "
210251
+ COLUMN_WATERMARK
211-
+ ") as "
212-
+ minWatermark
213252
+ " FROM "
214253
+ metadataTableName
215254
+ " WHERE "
216255
+ COLUMN_STATE
217256
+ " != @state"
218-
+ " AND "
257+
+ " ORDER BY "
219258
+ COLUMN_WATERMARK
220-
+ " >= @since;")
259+
+ " ASC LIMIT 1")
221260
.bind("state")
222261
.to(State.FINISHED.name())
223-
.bind("since")
224-
.to(sinceTimestamp)
225262
.build();
226263
}
264+
LOG.info("changliiu unfinishedWaterMarkQuery:\n" + statement.toString());
227265
try (ResultSet resultSet =
228266
databaseClient
229267
.singleUse()
230268
.executeQuery(statement, Options.tag("query=getUnfinishedMinWatermark"))) {
231269
if (resultSet.next()) {
232-
return resultSet.getTimestamp(minWatermark);
270+
// return resultSet.getTimestamp(minWatermark);
271+
return resultSet.getTimestamp(COLUMN_WATERMARK);
233272
}
234273
return null;
235274
}

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/DetectNewPartitionsDoFn.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ public ProcessContinuation processElement(
163163
RestrictionTracker<TimestampRange, com.google.cloud.Timestamp> tracker,
164164
OutputReceiver<PartitionMetadata> receiver,
165165
ManualWatermarkEstimator<Instant> watermarkEstimator) {
166-
166+
LOG.info("changliiu DetectNewPartitionsDoFn 1");
167167
return detectNewPartitionsAction.run(tracker, receiver, watermarkEstimator);
168168
}
169169

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata;
2727
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.PartitionMetadata.State;
2828
import org.apache.beam.sdk.transforms.DoFn;
29+
import org.slf4j.Logger;
30+
import org.slf4j.LoggerFactory;
2931

3032
/**
3133
* A DoFn responsible for initializing the change stream Connector. It handles the creation of the
@@ -42,6 +44,7 @@ public class InitializeDoFn extends DoFn<byte[], PartitionMetadata> implements S
4244
// a change stream query might get stuck.
4345
private static final long DEFAULT_HEARTBEAT_MILLIS = 2000;
4446

47+
private static final Logger LOG = LoggerFactory.getLogger(InitializeDoFn.class);
4548
private final DaoFactory daoFactory;
4649
private final MapperFactory mapperFactory;
4750
// The change streams query start time
@@ -62,6 +65,7 @@ public InitializeDoFn(
6265

6366
@ProcessElement
6467
public void processElement(OutputReceiver<PartitionMetadata> receiver) {
68+
LOG.info("changliiu InitializeDoFn 1");
6569
PartitionMetadataDao partitionMetadataDao = daoFactory.getPartitionMetadataDao();
6670
if (!partitionMetadataDao.tableExists()) {
6771
// Creates partition metadata table and associated indexes
@@ -73,6 +77,7 @@ public void processElement(OutputReceiver<PartitionMetadata> receiver) {
7377
.map(mapperFactory.partitionMetadataMapper()::from)
7478
.orElseThrow(
7579
() -> new IllegalStateException("Initial partition not found in metadata table."));
80+
LOG.info("changliiu InitializeDoFn complete");
7681
receiver.output(initialPartition);
7782
}
7883

0 commit comments

Comments
 (0)