Skip to content

Commit 351fa66

Browse files
authored
[Dataflow Streaming] do not block other streams from health checking if a stream is blocked (#34283)
1 parent 3132bff commit 351fa66

File tree

7 files changed

+175
-57
lines changed

7 files changed

+175
-57
lines changed

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java

+35-9
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.concurrent.Executors;
2727
import java.util.concurrent.RejectedExecutionException;
2828
import java.util.concurrent.TimeUnit;
29+
import java.util.concurrent.atomic.AtomicBoolean;
2930
import java.util.function.Function;
3031
import javax.annotation.concurrent.GuardedBy;
3132
import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.observers.StreamObserverCancelledException;
@@ -83,6 +84,7 @@ public abstract class AbstractWindmillStream<RequestT, ResponseT> implements Win
8384
private final String backendWorkerToken;
8485
private final ResettableThrowingStreamObserver<RequestT> requestObserver;
8586
private final StreamDebugMetrics debugMetrics;
87+
private final AtomicBoolean isHealthCheckScheduled;
8688

8789
@GuardedBy("this")
8890
protected boolean clientClosed;
@@ -115,6 +117,7 @@ protected AbstractWindmillStream(
115117
this.clientClosed = false;
116118
this.isShutdown = false;
117119
this.started = false;
120+
this.isHealthCheckScheduled = new AtomicBoolean(false);
118121
this.finishLatch = new CountDownLatch(1);
119122
this.logger = logger;
120123
this.requestObserver =
@@ -236,13 +239,35 @@ protected final void executeSafely(Runnable runnable) {
236239
}
237240
}
238241

239-
public final synchronized void maybeSendHealthCheck(Instant lastSendThreshold) {
240-
if (!clientClosed && debugMetrics.getLastSendTimeMs() < lastSendThreshold.getMillis()) {
241-
try {
242-
sendHealthCheck();
243-
} catch (Exception e) {
244-
logger.debug("Received exception sending health check.", e);
245-
}
242+
/**
243+
* Schedule an application level keep-alive health check to be sent on the stream.
244+
*
245+
* @implNote This is sent asynchronously via an executor to minimize blocking. Messages are sent
246+
* serially. If we recently sent a message before we attempt to schedule the health check, the
247+
* stream has been restarted/closed, there is a scheduled health check that hasn't completed
248+
* or there was a more recent send by the time we enter the synchronized block, we skip the
249+
* attempt to send the health check.
250+
*/
251+
public final void maybeScheduleHealthCheck(Instant lastSendThreshold) {
252+
if (debugMetrics.getLastSendTimeMs() < lastSendThreshold.getMillis()
253+
&& isHealthCheckScheduled.compareAndSet(false, true)) {
254+
// Don't block other streams when sending health check.
255+
executeSafely(
256+
() -> {
257+
synchronized (this) {
258+
try {
259+
if (!clientClosed
260+
&& debugMetrics.getLastSendTimeMs() < lastSendThreshold.getMillis()) {
261+
sendHealthCheck();
262+
}
263+
} catch (Exception e) {
264+
logger.debug("Received exception sending health check.", e);
265+
} finally {
266+
// Ready to send another health check after we attempt the scheduled health check.
267+
isHealthCheckScheduled.set(false);
268+
}
269+
}
270+
});
246271
}
247272
}
248273

@@ -261,11 +286,12 @@ public final void appendSummaryHtml(PrintWriter writer) {
261286
.ifPresent(
262287
metrics ->
263288
writer.format(
264-
", %d restarts, last restart reason [ %s ] at [%s], %d errors",
289+
", %d restarts, last restart reason [ %s ] at [%s], %d errors, isHealthCheckScheduled=[%s]",
265290
metrics.restartCount(),
266291
metrics.lastRestartReason(),
267292
metrics.lastRestartTime().orElse(null),
268-
metrics.errorCount()));
293+
metrics.errorCount(),
294+
isHealthCheckScheduled.get()));
269295

270296
if (summaryMetrics.isClientClosed()) {
271297
writer.write(", client closed");

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ protected boolean hasPendingRequests() {
150150
}
151151

152152
@Override
153-
public void sendHealthCheck() throws WindmillStreamShutdownException {
153+
protected void sendHealthCheck() throws WindmillStreamShutdownException {
154154
if (hasPendingRequests()) {
155155
StreamingCommitWorkRequest.Builder builder = StreamingCommitWorkRequest.newBuilder();
156156
builder.addCommitChunkBuilder().setRequestId(HEARTBEAT_REQUEST_ID);

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -238,7 +238,7 @@ public void appendSpecificHtml(PrintWriter writer) {
238238
}
239239

240240
@Override
241-
public void sendHealthCheck() throws WindmillStreamShutdownException {
241+
protected void sendHealthCheck() throws WindmillStreamShutdownException {
242242
trySend(HEALTH_CHECK_REQUEST);
243243
}
244244

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ public void onHeartbeatResponse(List<Windmill.ComputationHeartbeatResponse> resp
296296
}
297297

298298
@Override
299-
public void sendHealthCheck() throws WindmillStreamShutdownException {
299+
protected void sendHealthCheck() throws WindmillStreamShutdownException {
300300
if (hasPendingRequests()) {
301301
trySend(HEALTH_CHECK_REQUEST);
302302
}

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ public void appendSpecificHtml(PrintWriter writer) {
166166
}
167167

168168
@Override
169-
public void sendHealthCheck() throws WindmillStreamShutdownException {
169+
protected void sendHealthCheck() throws WindmillStreamShutdownException {
170170
trySend(HEALTH_CHECK);
171171
}
172172

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillStreamFactory.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ public void run() {
155155
Instant reportThreshold =
156156
Instant.now().minus(Duration.millis(healthCheckIntervalMillis));
157157
for (AbstractWindmillStream<?, ?> stream : streamFactory.streamRegistry) {
158-
stream.maybeSendHealthCheck(reportThreshold);
158+
stream.maybeScheduleHealthCheck(reportThreshold);
159159
}
160160
}
161161
},

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStreamTest.java

+135-43
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import java.io.PrintWriter;
2424
import java.util.Set;
2525
import java.util.concurrent.ConcurrentHashMap;
26-
import java.util.concurrent.CountDownLatch;
2726
import java.util.concurrent.ExecutionException;
2827
import java.util.concurrent.ExecutorService;
2928
import java.util.concurrent.Executors;
@@ -36,10 +35,13 @@
3635
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.CallStreamObserver;
3736
import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.StreamObserver;
3837
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Uninterruptibles;
38+
import org.joda.time.Duration;
39+
import org.joda.time.Instant;
3940
import org.junit.Before;
4041
import org.junit.Test;
4142
import org.junit.runner.RunWith;
4243
import org.junit.runners.JUnit4;
44+
import org.slf4j.Logger;
4345
import org.slf4j.LoggerFactory;
4446

4547
@RunWith(JUnit4.class)
@@ -61,61 +63,83 @@ private TestStream newStream(
6163

6264
@Test
6365
public void testShutdown_notBlockedBySend() throws InterruptedException, ExecutionException {
64-
CountDownLatch sendBlocker = new CountDownLatch(1);
66+
TestCallStreamObserver callStreamObserver = TestCallStreamObserver.notReady();
6567
Function<StreamObserver<Integer>, StreamObserver<Integer>> clientFactory =
66-
ignored ->
67-
new CallStreamObserver<Integer>() {
68-
@Override
69-
public void onNext(Integer integer) {
70-
try {
71-
sendBlocker.await();
72-
} catch (InterruptedException e) {
73-
throw new RuntimeException(e);
74-
}
75-
}
68+
ignored -> callStreamObserver;
7669

77-
@Override
78-
public void onError(Throwable throwable) {}
70+
TestStream testStream = newStream(clientFactory);
71+
testStream.start();
72+
ExecutorService sendExecutor = Executors.newSingleThreadExecutor();
73+
Future<WindmillStreamShutdownException> sendFuture =
74+
sendExecutor.submit(
75+
() -> {
76+
// Send a few times to trigger blocking in the CallStreamObserver.
77+
testStream.testSend();
78+
testStream.testSend();
79+
return assertThrows(WindmillStreamShutdownException.class, testStream::testSend);
80+
});
7981

80-
@Override
81-
public void onCompleted() {}
82+
// Wait for 1 send since it always goes through, the rest may buffer.
83+
callStreamObserver.waitForSends(1);
8284

83-
@Override
84-
public boolean isReady() {
85-
return false;
86-
}
85+
testStream.shutdown();
8786

88-
@Override
89-
public void setOnReadyHandler(Runnable runnable) {}
87+
assertThat(sendFuture.get()).isInstanceOf(WindmillStreamShutdownException.class);
88+
}
9089

91-
@Override
92-
public void disableAutoInboundFlowControl() {}
90+
@Test
91+
public void testMaybeScheduleHealthCheck() {
92+
TestCallStreamObserver callStreamObserver = TestCallStreamObserver.create();
93+
Function<StreamObserver<Integer>, StreamObserver<Integer>> clientFactory =
94+
ignored -> callStreamObserver;
9395

94-
@Override
95-
public void request(int i) {}
96+
TestStream testStream = newStream(clientFactory);
97+
testStream.start();
98+
Instant reportingThreshold = Instant.now().minus(Duration.millis(1));
9699

97-
@Override
98-
public void setMessageCompression(boolean b) {}
99-
};
100+
testStream.maybeScheduleHealthCheck(reportingThreshold);
101+
testStream.waitForHealthChecks(1);
102+
assertThat(testStream.numHealthChecks.get()).isEqualTo(1);
103+
testStream.shutdown();
104+
}
105+
106+
@Test
107+
public void testMaybeSendHealthCheck_doesNotSendIfLastScheduleLessThanThreshold() {
108+
TestCallStreamObserver callStreamObserver = TestCallStreamObserver.create();
109+
Function<StreamObserver<Integer>, StreamObserver<Integer>> clientFactory =
110+
ignored -> callStreamObserver;
100111

101112
TestStream testStream = newStream(clientFactory);
102113
testStream.start();
103-
ExecutorService sendExecutor = Executors.newSingleThreadExecutor();
104-
Future<WindmillStreamShutdownException> sendFuture =
105-
sendExecutor.submit(
106-
() ->
107-
assertThrows(WindmillStreamShutdownException.class, () -> testStream.testSend(1)));
108-
testStream.shutdown();
109114

110-
// Sleep a bit to give sendExecutor time to execute the send().
115+
try {
116+
testStream.trySend(1);
117+
} catch (WindmillStreamShutdownException e) {
118+
throw new RuntimeException(e);
119+
}
120+
121+
// Set a really long reporting threshold.
122+
Instant reportingThreshold = Instant.now().minus(Duration.standardHours(1));
123+
124+
// Should not send health checks since we just sent the above message.
125+
testStream.maybeScheduleHealthCheck(reportingThreshold);
126+
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
127+
testStream.maybeScheduleHealthCheck(reportingThreshold);
111128
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
112129

113-
sendBlocker.countDown();
114-
assertThat(sendFuture.get()).isInstanceOf(WindmillStreamShutdownException.class);
130+
callStreamObserver.waitForSends(1);
131+
// Sleep just to ensure an async health check doesn't show up
132+
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
133+
134+
assertThat(testStream.numHealthChecks.get()).isEqualTo(0);
135+
testStream.shutdown();
115136
}
116137

117138
private static class TestStream extends AbstractWindmillStream<Integer, Integer> {
139+
private static final Logger LOG = LoggerFactory.getLogger(AbstractWindmillStreamTest.class);
140+
118141
private final AtomicInteger numStarts = new AtomicInteger();
142+
private final AtomicInteger numHealthChecks = new AtomicInteger();
119143

120144
private TestStream(
121145
Function<StreamObserver<Integer>, StreamObserver<Integer>> clientFactory,
@@ -148,19 +172,87 @@ protected boolean hasPendingRequests() {
148172
@Override
149173
protected void startThrottleTimer() {}
150174

151-
public void testSend(Integer i)
152-
throws ResettableThrowingStreamObserver.StreamClosedException,
153-
WindmillStreamShutdownException {
154-
trySend(i);
175+
private void testSend() throws WindmillStreamShutdownException {
176+
trySend(1);
155177
}
156178

157179
@Override
158-
protected void sendHealthCheck() {}
180+
protected void sendHealthCheck() {
181+
numHealthChecks.incrementAndGet();
182+
}
183+
184+
private void waitForHealthChecks(int expectedHealthChecks) {
185+
int waitedMillis = 0;
186+
while (numHealthChecks.get() < expectedHealthChecks) {
187+
LOG.info(
188+
"Waited for {}ms for {} health checks. Current health check count is {}.",
189+
waitedMillis,
190+
numHealthChecks.get(),
191+
expectedHealthChecks);
192+
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
193+
}
194+
}
159195

160196
@Override
161197
protected void appendSpecificHtml(PrintWriter writer) {}
162198

163199
@Override
164200
protected void shutdownInternal() {}
165201
}
202+
203+
private static class TestCallStreamObserver extends CallStreamObserver<Integer> {
204+
private static final Logger LOG = LoggerFactory.getLogger(AbstractWindmillStreamTest.class);
205+
private final AtomicInteger numSends = new AtomicInteger();
206+
private final boolean isReady;
207+
208+
private TestCallStreamObserver(boolean isReady) {
209+
this.isReady = isReady;
210+
}
211+
212+
private static TestCallStreamObserver create() {
213+
return new TestCallStreamObserver(true);
214+
}
215+
216+
private static TestCallStreamObserver notReady() {
217+
return new TestCallStreamObserver(false);
218+
}
219+
220+
@Override
221+
public void onNext(Integer integer) {
222+
numSends.incrementAndGet();
223+
}
224+
225+
private void waitForSends(int expectedSends) {
226+
int millisWaited = 0;
227+
while (numSends.get() < expectedSends) {
228+
LOG.info(
229+
"Waited {}ms for {} sends, current sends: {}", millisWaited, expectedSends, numSends);
230+
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
231+
millisWaited += 100;
232+
}
233+
}
234+
235+
@Override
236+
public void onError(Throwable throwable) {}
237+
238+
@Override
239+
public void onCompleted() {}
240+
241+
@Override
242+
public boolean isReady() {
243+
return isReady;
244+
}
245+
246+
@Override
247+
public void setOnReadyHandler(Runnable runnable) {}
248+
249+
@Override
250+
public void disableAutoInboundFlowControl() {}
251+
252+
@Override
253+
public void request(int i) {}
254+
255+
@Override
256+
public void setMessageCompression(boolean b) {}
257+
}
166258
}

0 commit comments

Comments
 (0)