Skip to content

Commit 420f3b0

Browse files
committed
Fallback implementation - contd.
1 parent 784060e commit 420f3b0

File tree

9 files changed

+393
-94
lines changed

9 files changed

+393
-94
lines changed

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java

+35-29
Original file line numberDiff line numberDiff line change
@@ -243,10 +243,30 @@ private StreamingDataflowWorker(
243243
@Nullable ChannelzServlet channelzServlet = null;
244244
Consumer<PrintWriter> getDataStatusProvider;
245245
Supplier<Long> currentActiveCommitBytesProvider;
246-
247-
if (options.isEnableStreamingEngine() && options.getIsWindmillServiceDirectPathEnabled()) {
248-
// Direct path pipelines.
249-
WeightedSemaphore<Commit> maxCommitByteSemaphore = Commits.maxCommitByteSemaphore();
246+
GetDataClient getDataClient;
247+
HeartbeatSender heartbeatSender;
248+
WorkCommitter workCommitter;
249+
WeightedSemaphore<Commit> maxCommitByteSemaphore = Commits.maxCommitByteSemaphore();
250+
WindmillStreamPool<GetDataStream> getDataStreamPool =
251+
WindmillStreamPool.create(
252+
Math.max(1, options.getWindmillGetDataStreamCount()),
253+
GET_DATA_STREAM_TIMEOUT,
254+
windmillServer::getDataStream);
255+
getDataClient = new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool);
256+
heartbeatSender =
257+
createStreamingEngineHeartbeatSender(
258+
options, windmillServer, getDataStreamPool, configFetcher.getGlobalConfigHandle());
259+
workCommitter =
260+
StreamingEngineWorkCommitter.builder()
261+
.setCommitWorkStreamFactory(
262+
WindmillStreamPool.create(
263+
numCommitThreads, COMMIT_STREAM_TIMEOUT, windmillServer::commitWorkStream)
264+
::getCloseableStream)
265+
.setCommitByteSemaphore(maxCommitByteSemaphore)
266+
.setNumCommitSenders(numCommitThreads)
267+
.setOnCommitComplete(this::onCompleteCommit)
268+
.build();
269+
if (isDirectPathPipeline(options)) {
250270
FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness =
251271
FanOutStreamingEngineWorkerHarness.create(
252272
createJobHeader(options, clientId),
@@ -286,7 +306,13 @@ private StreamingDataflowWorker(
286306
.setCommitWorkStreamFactory(
287307
() -> CloseableStream.create(commitWorkStream, () -> {}))
288308
.build(),
289-
getDataMetricTracker);
309+
getDataMetricTracker,
310+
workCommitter,
311+
getDataClient,
312+
heartbeatSender,
313+
streamingWorkScheduler,
314+
() -> memoryMonitor.waitForResources("GetWork"),
315+
computationStateCache::get);
290316
getDataStatusProvider = getDataMetricTracker::printHtml;
291317
currentActiveCommitBytesProvider =
292318
fanOutStreamingEngineWorkerHarness::currentActiveCommitBytes;
@@ -302,34 +328,10 @@ private StreamingDataflowWorker(
302328
.setMaxItems(chooseMaxBundlesOutstanding(options))
303329
.setMaxBytes(MAX_GET_WORK_FETCH_BYTES)
304330
.build();
305-
GetDataClient getDataClient;
306-
HeartbeatSender heartbeatSender;
307-
WorkCommitter workCommitter;
308331
GetWorkSender getWorkSender;
309332
if (options.isEnableStreamingEngine()) {
310-
WindmillStreamPool<GetDataStream> getDataStreamPool =
311-
WindmillStreamPool.create(
312-
Math.max(1, options.getWindmillGetDataStreamCount()),
313-
GET_DATA_STREAM_TIMEOUT,
314-
windmillServer::getDataStream);
315-
getDataClient = new StreamPoolGetDataClient(getDataMetricTracker, getDataStreamPool);
316-
heartbeatSender =
317-
createStreamingEngineHeartbeatSender(
318-
options, windmillServer, getDataStreamPool, configFetcher.getGlobalConfigHandle());
319333
channelzServlet =
320334
createChannelzServlet(options, windmillServer::getWindmillServiceEndpoints);
321-
workCommitter =
322-
StreamingEngineWorkCommitter.builder()
323-
.setCommitWorkStreamFactory(
324-
WindmillStreamPool.create(
325-
numCommitThreads,
326-
COMMIT_STREAM_TIMEOUT,
327-
windmillServer::commitWorkStream)
328-
::getCloseableStream)
329-
.setCommitByteSemaphore(Commits.maxCommitByteSemaphore())
330-
.setNumCommitSenders(numCommitThreads)
331-
.setOnCommitComplete(this::onCompleteCommit)
332-
.build();
333335
getWorkSender =
334336
GetWorkSender.forStreamingEngine(
335337
receiver -> windmillServer.getWorkStream(request, receiver));
@@ -608,6 +610,10 @@ private static StreamingApplianceComputationConfigFetcher createApplianceComputa
608610
new FixedGlobalConfigHandle(StreamingGlobalConfig.builder().build()));
609611
}
610612

613+
private static boolean isDirectPathPipeline(DataflowWorkerHarnessOptions options) {
614+
return options.isEnableStreamingEngine() && options.getIsWindmillServiceDirectPathEnabled();
615+
}
616+
611617
private static void validateWorkerOptions(DataflowWorkerHarnessOptions options) {
612618
Preconditions.checkArgument(
613619
options.isStreaming(),

runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarness.java

+114-32
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.beam.runners.dataflow.worker.windmill.WindmillConnection;
4848
import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints;
4949
import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints.Endpoint;
50+
import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints.EndpointType;
5051
import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
5152
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream;
5253
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
@@ -121,6 +122,9 @@ public final class FanOutStreamingEngineWorkerHarness implements StreamingWorker
121122
@GuardedBy("metadataLock")
122123
private long pendingMetadataVersion;
123124

125+
@GuardedBy("metadataLock")
126+
private EndpointType activeEndpointType = EndpointType.DIRECTPATH;
127+
124128
@GuardedBy("this")
125129
private boolean started;
126130

@@ -201,9 +205,13 @@ public static FanOutStreamingEngineWorkerHarness create(
201205
workCommitterFactory,
202206
getDataMetricTracker,
203207
Executors.newSingleThreadExecutor(
204-
new ThreadFactoryBuilder()
205-
.setNameFormat(WORKER_METADATA_CONSUMER_THREAD_NAME)
206-
.build()));
208+
new ThreadFactoryBuilder().setNameFormat(WORKER_METADATA_CONSUMER_THREAD_NAME).build()),
209+
streamPoolWorkCommitter,
210+
streamPoolGetDataClient,
211+
streamPoolHeartbeatSender,
212+
streamingWorkScheduler,
213+
waitForResources,
214+
computationStateFetcher);
207215
}
208216

209217
@VisibleForTesting
@@ -216,7 +224,13 @@ static FanOutStreamingEngineWorkerHarness forTesting(
216224
GetWorkBudgetDistributor getWorkBudgetDistributor,
217225
GrpcDispatcherClient dispatcherClient,
218226
Function<WindmillStream.CommitWorkStream, WorkCommitter> workCommitterFactory,
219-
ThrottlingGetDataMetricTracker getDataMetricTracker) {
227+
ThrottlingGetDataMetricTracker getDataMetricTracker,
228+
WorkCommitter streamPoolWorkCommitter,
229+
GetDataClient streamPoolGetDataClient,
230+
HeartbeatSender streamPoolHeartbeatSender,
231+
StreamingWorkScheduler streamingWorkScheduler,
232+
Runnable waitForResources,
233+
Function<String, Optional<ComputationState>> computationStateFetcher) {
220234
FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkProvider =
221235
new FanOutStreamingEngineWorkerHarness(
222236
jobHeader,
@@ -233,10 +247,32 @@ static FanOutStreamingEngineWorkerHarness forTesting(
233247
// blocked by the consumeWorkerMetadata() task. Test suites run in different
234248
// environments and non-determinism has led to past flakiness. See
235249
// https://github.com/apache/beam/issues/28957.
236-
MoreExecutors.newDirectExecutorService());
250+
MoreExecutors.newDirectExecutorService(),
251+
streamPoolWorkCommitter,
252+
streamPoolGetDataClient,
253+
streamPoolHeartbeatSender,
254+
streamingWorkScheduler,
255+
waitForResources,
256+
computationStateFetcher);
237257
fanOutStreamingEngineWorkProvider.start();
238258
return fanOutStreamingEngineWorkProvider;
239259
}
260+
/*
261+
@Override
262+
public synchronized void start() {
263+
Preconditions.checkState(!started, "FanOutStreamingEngineWorkerHarness cannot start twice.");
264+
GetWorkerMetadataStream localGetWorkerMetadataStream = streamFactory.createGetWorkerMetadataStream(
265+
dispatcherClient::getWindmillMetadataServiceStubBlocking,
266+
getWorkerMetadataThrottleTimer,
267+
this::consumeWorkerMetadata);
268+
269+
if (localGetWorkerMetadataStream != null) {
270+
localGetWorkerMetadataStream.start();
271+
}
272+
getWorkerMetadataStream = localGetWorkerMetadataStream;
273+
started = true;
274+
}
275+
*/
240276

241277
@Override
242278
public synchronized void start() {
@@ -301,10 +337,14 @@ public synchronized void shutdown() {
301337
}
302338

303339
private void consumeWorkerMetadata(WindmillEndpoints windmillEndpoints) {
340+
LOG.info("DEBUG LOG: consumeWorkerMetadata called with endpoints: {}", windmillEndpoints);
304341
synchronized (metadataLock) {
305342
// Only process versions greater than what we currently have to prevent double processing of
306343
// metadata. workerMetadataConsumer is single-threaded so we maintain ordering.
307-
if (windmillEndpoints.version() > pendingMetadataVersion) {
344+
// However, if the endpoint type in the worker metadata differs from the active endpoint
345+
// type, process those endpoints as well, regardless of their version.
346+
if (windmillEndpoints.version() > pendingMetadataVersion
347+
|| windmillEndpoints.endpointType() != activeEndpointType) {
308348
pendingMetadataVersion = windmillEndpoints.version();
309349
workerMetadataConsumer.execute(() -> consumeWindmillWorkerEndpoints(windmillEndpoints));
310350
}
@@ -314,21 +354,32 @@ private void consumeWorkerMetadata(WindmillEndpoints windmillEndpoints) {
314354
private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) {
315355
// Since this is run on a single-threaded executor, multiple versions of the metadata may be
316356
// queued up while a previous version of the windmillEndpoints was being consumed. Only consume
317-
// the endpoints if they are the most current version.
357+
// the endpoints if they are the most current version, or if the endpoint type is different
358+
// from currently active endpoints.
318359
synchronized (metadataLock) {
319-
if (newWindmillEndpoints.version() < pendingMetadataVersion) {
360+
if (newWindmillEndpoints.version() < pendingMetadataVersion
361+
&& newWindmillEndpoints.endpointType() == activeEndpointType) {
320362
return;
321363
}
364+
activeEndpointType = newWindmillEndpoints.endpointType();
322365
}
323366

324367
LOG.debug(
325368
"Consuming new endpoints: {}. previous metadata version: {}, current metadata version: {}",
326369
newWindmillEndpoints,
327370
activeMetadataVersion,
328371
newWindmillEndpoints.version());
372+
LOG.info(
373+
"Consuming new endpoints of type: {}. previous metadata version: {}, current metadata version: {}",
374+
newWindmillEndpoints.endpointType(),
375+
activeMetadataVersion,
376+
newWindmillEndpoints.version());
329377
closeStreamsNotIn(newWindmillEndpoints);
330378
ImmutableMap<Endpoint, WindmillStreamSender> newStreams =
331-
createAndStartNewStreams(newWindmillEndpoints.windmillEndpoints()).join();
379+
createAndStartNewStreams(
380+
newWindmillEndpoints.windmillEndpoints(), newWindmillEndpoints.endpointType())
381+
.join();
382+
332383
StreamingEngineBackends newBackends =
333384
StreamingEngineBackends.builder()
334385
.setWindmillStreams(newStreams)
@@ -354,7 +405,7 @@ private CompletableFuture<Void> closeStreamsNotIn(WindmillEndpoints newWindmillE
354405
.map(
355406
entry ->
356407
CompletableFuture.runAsync(
357-
() -> closeStreamSender(entry.getKey(), entry.getValue()),
408+
() -> closeStreamSender(entry.getKey(), (StreamSender) entry.getValue()),
358409
windmillStreamManager));
359410

360411
Set<Endpoint> newGlobalDataEndpoints =
@@ -365,7 +416,8 @@ private CompletableFuture<Void> closeStreamsNotIn(WindmillEndpoints newWindmillE
365416
.map(
366417
sender ->
367418
CompletableFuture.runAsync(
368-
() -> closeStreamSender(sender.endpoint(), sender), windmillStreamManager));
419+
() -> closeStreamSender(sender.endpoint(), (StreamSender) sender),
420+
windmillStreamManager));
369421

370422
return CompletableFuture.allOf(
371423
Streams.concat(closeStreamFutures, closeGlobalDataStreamFutures)
@@ -384,11 +436,15 @@ private void closeStreamSender(Endpoint endpoint, StreamSender sender) {
384436
}
385437

386438
private synchronized CompletableFuture<ImmutableMap<Endpoint, WindmillStreamSender>>
387-
createAndStartNewStreams(ImmutableSet<Endpoint> newWindmillEndpoints) {
439+
createAndStartNewStreams(
440+
ImmutableSet<Endpoint> newWindmillEndpoints, EndpointType endpointType) {
388441
ImmutableMap<Endpoint, WindmillStreamSender> currentStreams = backends.get().windmillStreams();
389442
return MoreFutures.allAsList(
390443
newWindmillEndpoints.stream()
391-
.map(endpoint -> getOrCreateWindmillStreamSenderFuture(endpoint, currentStreams))
444+
.map(
445+
endpoint ->
446+
getOrCreateWindmillStreamSenderFuture(
447+
endpoint, currentStreams, endpointType))
392448
.collect(Collectors.toList()))
393449
.thenApply(
394450
backends -> backends.stream().collect(toImmutableMap(Pair::getLeft, Pair::getRight)))
@@ -397,13 +453,18 @@ private void closeStreamSender(Endpoint endpoint, StreamSender sender) {
397453

398454
private CompletionStage<Pair<Endpoint, WindmillStreamSender>>
399455
getOrCreateWindmillStreamSenderFuture(
400-
Endpoint endpoint, ImmutableMap<Endpoint, WindmillStreamSender> currentStreams) {
456+
Endpoint endpoint,
457+
ImmutableMap<Endpoint, WindmillStreamSender> currentStreams,
458+
EndpointType endpointType) {
401459
return Optional.ofNullable(currentStreams.get(endpoint))
402460
.map(backend -> CompletableFuture.completedFuture(Pair.of(endpoint, backend)))
403461
.orElseGet(
404462
() ->
405463
MoreFutures.supplyAsync(
406-
() -> Pair.of(endpoint, createAndStartWindmillStreamSender(endpoint)),
464+
() ->
465+
Pair.of(
466+
endpoint,
467+
createAndStartWindmillStreamSender(endpoint, endpointType)),
407468
windmillStreamManager)
408469
.toCompletableFuture());
409470
}
@@ -452,23 +513,44 @@ private GlobalDataStreamSender getOrCreateGlobalDataSteam(
452513
keyedEndpoint.getValue()));
453514
}
454515

455-
private WindmillStreamSender createAndStartWindmillStreamSender(Endpoint endpoint) {
456-
WindmillStreamSender windmillStreamSender =
457-
WindmillStreamSender.create(
458-
WindmillConnection.from(endpoint, this::createWindmillStub),
459-
GetWorkRequest.newBuilder()
460-
.setClientId(jobHeader.getClientId())
461-
.setJobId(jobHeader.getJobId())
462-
.setProjectId(jobHeader.getProjectId())
463-
.setWorkerId(jobHeader.getWorkerId())
464-
.build(),
465-
GetWorkBudget.noBudget(),
466-
streamFactory,
467-
workItemScheduler,
468-
getDataStream ->
469-
StreamGetDataClient.create(
470-
getDataStream, this::getGlobalDataStream, getDataMetricTracker),
471-
workCommitterFactory);
516+
private WindmillStreamSender createAndStartWindmillStreamSender(
517+
Endpoint endpoint, EndpointType enpointType) {
518+
WindmillStreamSender windmillStreamSender;
519+
windmillStreamSender =
520+
enpointType == EndpointType.DIRECTPATH
521+
? WindmillDirectStreamSender.create(
522+
WindmillConnection.from(endpoint, this::createWindmillStub),
523+
GetWorkRequest.newBuilder()
524+
.setClientId(jobHeader.getClientId())
525+
.setJobId(jobHeader.getJobId())
526+
.setProjectId(jobHeader.getProjectId())
527+
.setWorkerId(jobHeader.getWorkerId())
528+
.build(),
529+
GetWorkBudget.noBudget(),
530+
streamFactory,
531+
workItemScheduler,
532+
getDataStream ->
533+
StreamGetDataClient.create(
534+
getDataStream, this::getGlobalDataStream, getDataMetricTracker),
535+
workCommitterFactory)
536+
: WindmillStreamPoolSender.create(
537+
WindmillConnection.from(endpoint, this::createWindmillStub),
538+
GetWorkRequest.newBuilder()
539+
.setClientId(jobHeader.getClientId())
540+
.setJobId(jobHeader.getJobId())
541+
.setProjectId(jobHeader.getProjectId())
542+
.setWorkerId(jobHeader.getWorkerId())
543+
.setMaxItems(totalGetWorkBudget.items())
544+
.setMaxBytes(totalGetWorkBudget.bytes())
545+
.build(),
546+
GetWorkBudget.noBudget(),
547+
streamFactory,
548+
streamPoolWorkCommitter,
549+
streamPoolGetDataClient,
550+
streamPoolHeartbeatSender,
551+
streamingWorkScheduler,
552+
waitForResources,
553+
computationStateFetcher);
472554
windmillStreamSender.start();
473555
return windmillStreamSender;
474556
}

0 commit comments

Comments
 (0)