Skip to content

Commit a142d78

Browse files
authored
Fix FanOutStreamingEngineWorkerHarnessTest - testOnNewWorkerMetadata_redistributesBudget (#34531)
* Fix Resources weren't released in time issue in testOnNewWorkerMetadata_redistributesBudget * Import Assert.fail separately * Refactor FanOutStreamingEngineWorkerHarness related components and tests to avoid cleanUp hanging indefinitely * Run spotlessJavaApply
1 parent 4ead940 commit a142d78

File tree

3 files changed

+31
-4
lines changed

3 files changed

+31
-4
lines changed

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWi
298298
newWindmillEndpoints,
299299
activeMetadataVersion,
300300
newWindmillEndpoints.version());
301-
closeStreamsNotIn(newWindmillEndpoints);
301+
closeStreamsNotIn(newWindmillEndpoints).join();
302302
ImmutableMap<Endpoint, WindmillStreamSender> newStreams =
303303
createAndStartNewStreams(newWindmillEndpoints.windmillEndpoints()).join();
304304
StreamingEngineBackends newBackends =

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSender.java

+19-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.concurrent.CompletableFuture;
2323
import java.util.concurrent.ExecutorService;
2424
import java.util.concurrent.Executors;
25+
import java.util.concurrent.TimeUnit;
2526
import java.util.concurrent.atomic.AtomicBoolean;
2627
import java.util.concurrent.atomic.AtomicReference;
2728
import java.util.function.Function;
@@ -40,6 +41,7 @@
4041
import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetSpender;
4142
import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.FixedStreamHeartbeatSender;
4243
import org.apache.beam.sdk.annotations.Internal;
44+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
4345
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
4446

4547
/**
@@ -58,6 +60,7 @@
5860
@ThreadSafe
5961
final class WindmillStreamSender implements GetWorkBudgetSpender, StreamSender {
6062
private static final String STREAM_STARTER_THREAD_NAME = "StartWindmillStreamThread-%d";
63+
private static final int TERMINATION_TIMEOUT_SECONDS = 5;
6164
private final AtomicBoolean started;
6265
private final AtomicReference<GetWorkBudget> getWorkBudget;
6366
private final GetWorkStream getWorkStream;
@@ -142,11 +145,26 @@ synchronized void start() {
142145

143146
@Override
144147
public synchronized void close() {
145-
streamStarter.shutdownNow();
148+
streamStarter.shutdown();
146149
getWorkStream.shutdown();
147150
getDataStream.shutdown();
148151
workCommitter.stop();
149152
commitWorkStream.shutdown();
153+
try {
154+
if (!Preconditions.checkNotNull(streamStarter)
155+
.awaitTermination(TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
156+
streamStarter.shutdownNow();
157+
}
158+
Preconditions.checkNotNull(getWorkStream)
159+
.awaitTermination(TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
160+
Preconditions.checkNotNull(getDataStream)
161+
.awaitTermination(TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
162+
Preconditions.checkNotNull(commitWorkStream)
163+
.awaitTermination(TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS);
164+
} catch (InterruptedException e) {
165+
Thread.currentThread().interrupt();
166+
throw new RuntimeException("Interrupted while waiting for streams to terminate", e);
167+
}
150168
}
151169

152170
@Override

runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java

+11-2
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@
8080
@RunWith(JUnit4.class)
8181
public class FanOutStreamingEngineWorkerHarnessTest {
8282
private static final String CHANNEL_NAME = "FanOutStreamingEngineWorkerHarnessTest";
83+
private static final long WAIT_FOR_METADATA_INJECTIONS_SECONDS = 5;
84+
private static final long SERVER_SHUTDOWN_TIMEOUT_SECONDS = 30;
8385
private static final WindmillServiceAddress DEFAULT_WINDMILL_SERVICE_ADDRESS =
8486
WindmillServiceAddress.create(HostAndPort.fromParts(WindmillChannels.LOCALHOST, 443));
8587
private static final ImmutableMap<String, WorkerMetadataResponse.Endpoint> DEFAULT =
@@ -101,7 +103,7 @@ public class FanOutStreamingEngineWorkerHarnessTest {
101103
.build();
102104

103105
@Rule
104-
public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule().setTimeout(1, TimeUnit.MINUTES);
106+
public final GrpcCleanupRule grpcCleanup = new GrpcCleanupRule().setTimeout(3, TimeUnit.MINUTES);
105107

106108
private final GrpcWindmillStreamFactory streamFactory =
107109
spy(GrpcWindmillStreamFactory.of(JOB_HEADER).build());
@@ -167,10 +169,15 @@ public void setUp() throws IOException {
167169
}
168170

169171
@After
170-
public void cleanUp() {
172+
public void cleanUp() throws InterruptedException {
171173
Preconditions.checkNotNull(fanOutStreamingEngineWorkProvider).shutdown();
172174
stubFactory.shutdown();
173175
fakeStreamingEngineServer.shutdown();
176+
if (!Preconditions.checkNotNull(
177+
fakeStreamingEngineServer.awaitTermination(
178+
SERVER_SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS))) {
179+
fakeStreamingEngineServer.shutdownNow();
180+
}
174181
}
175182

176183
private FanOutStreamingEngineWorkerHarness newFanOutStreamingEngineWorkerHarness(
@@ -329,8 +336,10 @@ public void testOnNewWorkerMetadata_redistributesBudget() throws InterruptedExce
329336

330337
fakeGetWorkerMetadataStub.injectWorkerMetadata(firstWorkerMetadata);
331338
verify(getWorkBudgetDistributor, times(1)).distributeBudget(any(), any());
339+
TimeUnit.SECONDS.sleep(WAIT_FOR_METADATA_INJECTIONS_SECONDS);
332340
fakeGetWorkerMetadataStub.injectWorkerMetadata(secondWorkerMetadata);
333341
verify(getWorkBudgetDistributor, times(2)).distributeBudget(any(), any());
342+
TimeUnit.SECONDS.sleep(WAIT_FOR_METADATA_INJECTIONS_SECONDS);
334343
}
335344

336345
private static class WindmillServiceFakeStub

0 commit comments

Comments
 (0)