Skip to content

Commit c9b2db9

Browse files
committed
address PR comments, fix tests
1 parent 184223b commit c9b2db9

File tree

2 files changed

+29
-19
lines changed

2 files changed

+29
-19
lines changed

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java

+12-11
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ protected final void executeSafely(Runnable runnable) {
246246
* serially. If we recently sent a message before we attempt to schedule the health check, the
247247
* stream has been restarted/closed, there is a scheduled health check that hasn't completed
248248
* or there was a more recent send by the time we enter the synchronized block, we skip the
249-
* attempt to send scheduled the health check.
249+
* attempt to send the health check.
250250
*/
251251
public final void maybeScheduleHealthCheck(Instant lastSendThreshold) {
252252
if (debugMetrics.getLastSendTimeMs() < lastSendThreshold.getMillis()
@@ -255,17 +255,17 @@ public final void maybeScheduleHealthCheck(Instant lastSendThreshold) {
255255
executeSafely(
256256
() -> {
257257
synchronized (this) {
258-
if (!clientClosed
259-
&& debugMetrics.getLastSendTimeMs() < lastSendThreshold.getMillis()) {
260-
try {
258+
try {
259+
if (!clientClosed
260+
&& debugMetrics.getLastSendTimeMs() < lastSendThreshold.getMillis()) {
261261
sendHealthCheck();
262-
} catch (Exception e) {
263-
logger.debug("Received exception sending health check.", e);
264262
}
263+
} catch (Exception e) {
264+
logger.debug("Received exception sending health check.", e);
265+
} finally {
266+
// Ready to send another health check after we attempt the scheduled health check.
267+
isHealthCheckScheduled.set(false);
265268
}
266-
267-
// Ready to send another health check after we attempt the scheduled health check.
268-
isHealthCheckScheduled.set(false);
269269
}
270270
});
271271
}
@@ -286,11 +286,12 @@ public final void appendSummaryHtml(PrintWriter writer) {
286286
.ifPresent(
287287
metrics ->
288288
writer.format(
289-
", %d restarts, last restart reason [ %s ] at [%s], %d errors",
289+
", %d restarts, last restart reason [ %s ] at [%s], %d errors, isHealthCheckScheduled=[%s]",
290290
metrics.restartCount(),
291291
metrics.lastRestartReason(),
292292
metrics.lastRestartTime().orElse(null),
293-
metrics.errorCount()));
293+
metrics.errorCount(),
294+
isHealthCheckScheduled.get()));
294295

295296
if (summaryMetrics.isClientClosed()) {
296297
writer.write(", client closed");

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStreamTest.java

+17-8
Original file line numberDiff line numberDiff line change
@@ -117,9 +117,6 @@ public void testMaybeSendHealthCheck_doesNotSendIfLastScheduleLessThanThreshold(
117117
throw new RuntimeException(e);
118118
}
119119

120-
// Sleep a bit to give sendExecutor time to execute the send().
121-
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
122-
123120
// Set a really long reporting threshold.
124121
Instant reportingThreshold = Instant.now().minus(Duration.standardHours(1));
125122

@@ -129,7 +126,8 @@ public void testMaybeSendHealthCheck_doesNotSendIfLastScheduleLessThanThreshold(
129126
testStream.maybeScheduleHealthCheck(reportingThreshold);
130127
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
131128

132-
callStreamObserver.waitForSend();
129+
callStreamObserver.waitForSends(1);
130+
// Sleep just to ensure an async health check doesn't show up
133131
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
134132

135133
assertThat(testStream.numHealthChecks.get()).isEqualTo(0);
@@ -204,6 +202,7 @@ protected void shutdownInternal() {}
204202
private static class TestCallStreamObserver extends CallStreamObserver<Integer> {
205203
private static final Logger LOG = LoggerFactory.getLogger(AbstractWindmillStreamTest.class);
206204
private final CountDownLatch sendBlocker = new CountDownLatch(1);
205+
private final AtomicInteger numSends = new AtomicInteger();
207206

208207
private final boolean waitForSend;
209208

@@ -215,12 +214,12 @@ private void unblockSend() {
215214
sendBlocker.countDown();
216215
}
217216

218-
private void waitForSend() {
217+
private void waitForSendUnblocked() {
219218
try {
220219
int waitedMillis = 0;
221220
while (!sendBlocker.await(100, TimeUnit.MILLISECONDS)) {
222221
waitedMillis += 100;
223-
LOG.info("Waiting from send for {}ms", waitedMillis);
222+
LOG.info("Waiting from send to be unblocked for {}ms", waitedMillis);
224223
}
225224
} catch (InterruptedException e) {
226225
LOG.error("Interrupted waiting for send().");
@@ -230,9 +229,19 @@ private void waitForSend() {
230229
@Override
231230
public void onNext(Integer integer) {
232231
if (waitForSend) {
233-
waitForSend();
232+
waitForSendUnblocked();
234233
} else {
235-
sendBlocker.countDown();
234+
numSends.incrementAndGet();
235+
}
236+
}
237+
238+
private void waitForSends(int expectedSends) {
239+
int millisWaited = 0;
240+
while (numSends.get() < expectedSends) {
241+
LOG.info(
242+
"Waited {}ms for {} sends, current sends: {}", millisWaited, expectedSends, numSends);
243+
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
244+
millisWaited += 100;
236245
}
237246
}
238247

0 commit comments

Comments
 (0)