Skip to content

Commit 3a33f4d

Browse files
committed
fix testShutdown_notBlockedBySend test
1 parent c9b2db9 commit 3a33f4d

File tree

1 file changed

+24
-37
lines changed

1 file changed

+24
-37
lines changed

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStreamTest.java

+24-37
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import java.io.PrintWriter;
2424
import java.util.Set;
2525
import java.util.concurrent.ConcurrentHashMap;
26-
import java.util.concurrent.CountDownLatch;
2726
import java.util.concurrent.ExecutionException;
2827
import java.util.concurrent.ExecutorService;
2928
import java.util.concurrent.Executors;
@@ -64,7 +63,7 @@ private TestStream newStream(
6463

6564
@Test
6665
public void testShutdown_notBlockedBySend() throws InterruptedException, ExecutionException {
67-
TestCallStreamObserver callStreamObserver = new TestCallStreamObserver(/* waitForSend= */ true);
66+
TestCallStreamObserver callStreamObserver = TestCallStreamObserver.notReady();
6867
Function<StreamObserver<Integer>, StreamObserver<Integer>> clientFactory =
6968
ignored -> callStreamObserver;
7069

@@ -73,21 +72,24 @@ public void testShutdown_notBlockedBySend() throws InterruptedException, Executi
7372
ExecutorService sendExecutor = Executors.newSingleThreadExecutor();
7473
Future<WindmillStreamShutdownException> sendFuture =
7574
sendExecutor.submit(
76-
() ->
77-
assertThrows(WindmillStreamShutdownException.class, () -> testStream.testSend(1)));
78-
testStream.shutdown();
75+
() -> {
76+
// Send a few times to trigger blocking in the CallStreamObserver.
77+
testStream.testSend();
78+
testStream.testSend();
79+
return assertThrows(WindmillStreamShutdownException.class, testStream::testSend);
80+
});
81+
82+
// Wait for 1 send since it always goes through, the rest may buffer.
83+
callStreamObserver.waitForSends(1);
7984

80-
// Sleep a bit to give sendExecutor time to execute the send().
81-
Uninterruptibles.sleepUninterruptibly(100, TimeUnit.MILLISECONDS);
85+
testStream.shutdown();
8286

83-
callStreamObserver.unblockSend();
8487
assertThat(sendFuture.get()).isInstanceOf(WindmillStreamShutdownException.class);
8588
}
8689

8790
@Test
8891
public void testMaybeScheduleHealthCheck() {
89-
TestCallStreamObserver callStreamObserver =
90-
new TestCallStreamObserver(/* waitForSend= */ false);
92+
TestCallStreamObserver callStreamObserver = TestCallStreamObserver.create();
9193
Function<StreamObserver<Integer>, StreamObserver<Integer>> clientFactory =
9294
ignored -> callStreamObserver;
9395

@@ -103,8 +105,7 @@ public void testMaybeScheduleHealthCheck() {
103105

104106
@Test
105107
public void testMaybeSendHealthCheck_doesNotSendIfLastScheduleLessThanThreshold() {
106-
TestCallStreamObserver callStreamObserver =
107-
new TestCallStreamObserver(/* waitForSend= */ false);
108+
TestCallStreamObserver callStreamObserver = TestCallStreamObserver.create();
108109
Function<StreamObserver<Integer>, StreamObserver<Integer>> clientFactory =
109110
ignored -> callStreamObserver;
110111

@@ -171,8 +172,8 @@ protected boolean hasPendingRequests() {
171172
@Override
172173
protected void startThrottleTimer() {}
173174

174-
public void testSend(Integer i) throws WindmillStreamShutdownException {
175-
trySend(i);
175+
private void testSend() throws WindmillStreamShutdownException {
176+
trySend(1);
176177
}
177178

178179
@Override
@@ -201,38 +202,24 @@ protected void shutdownInternal() {}
201202

202203
private static class TestCallStreamObserver extends CallStreamObserver<Integer> {
203204
private static final Logger LOG = LoggerFactory.getLogger(AbstractWindmillStreamTest.class);
204-
private final CountDownLatch sendBlocker = new CountDownLatch(1);
205205
private final AtomicInteger numSends = new AtomicInteger();
206+
private final boolean isReady;
206207

207-
private final boolean waitForSend;
208-
209-
private TestCallStreamObserver(boolean waitForSend) {
210-
this.waitForSend = waitForSend;
208+
private TestCallStreamObserver(boolean isReady) {
209+
this.isReady = isReady;
211210
}
212211

213-
private void unblockSend() {
214-
sendBlocker.countDown();
212+
private static TestCallStreamObserver create() {
213+
return new TestCallStreamObserver(true);
215214
}
216215

217-
private void waitForSendUnblocked() {
218-
try {
219-
int waitedMillis = 0;
220-
while (!sendBlocker.await(100, TimeUnit.MILLISECONDS)) {
221-
waitedMillis += 100;
222-
LOG.info("Waiting from send to be unblocked for {}ms", waitedMillis);
223-
}
224-
} catch (InterruptedException e) {
225-
LOG.error("Interrupted waiting for send().");
226-
}
216+
private static TestCallStreamObserver notReady() {
217+
return new TestCallStreamObserver(false);
227218
}
228219

229220
@Override
230221
public void onNext(Integer integer) {
231-
if (waitForSend) {
232-
waitForSendUnblocked();
233-
} else {
234-
numSends.incrementAndGet();
235-
}
222+
numSends.incrementAndGet();
236223
}
237224

238225
private void waitForSends(int expectedSends) {
@@ -253,7 +240,7 @@ public void onCompleted() {}
253240

254241
@Override
255242
public boolean isReady() {
256-
return true;
243+
return isReady;
257244
}
258245

259246
@Override

0 commit comments

Comments
 (0)