import org.apache.beam.runners.dataflow.worker.windmill.WindmillConnection;
import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints;
import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints.Endpoint;
+import org.apache.beam.runners.dataflow.worker.windmill.WindmillEndpoints.EndpointType;
import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress;
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream;
import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream;
@@ -121,6 +122,9 @@ public final class FanOutStreamingEngineWorkerHarness implements StreamingWorker
  @GuardedBy("metadataLock")
  private long pendingMetadataVersion;

+  @GuardedBy("metadataLock")
+  private EndpointType activeEndpointType = EndpointType.DIRECTPATH;
+
  @GuardedBy("this")
  private boolean started;

@@ -201,9 +205,13 @@ public static FanOutStreamingEngineWorkerHarness create(
        workCommitterFactory,
        getDataMetricTracker,
        Executors.newSingleThreadExecutor(
-            new ThreadFactoryBuilder()
-                .setNameFormat(WORKER_METADATA_CONSUMER_THREAD_NAME)
-                .build()));
+            new ThreadFactoryBuilder().setNameFormat(WORKER_METADATA_CONSUMER_THREAD_NAME).build()),
+        streamPoolWorkCommitter,
+        streamPoolGetDataClient,
+        streamPoolHeartbeatSender,
+        streamingWorkScheduler,
+        waitForResources,
+        computationStateFetcher);
  }

  @VisibleForTesting
@@ -216,7 +224,13 @@ static FanOutStreamingEngineWorkerHarness forTesting(
      GetWorkBudgetDistributor getWorkBudgetDistributor,
      GrpcDispatcherClient dispatcherClient,
      Function<WindmillStream.CommitWorkStream, WorkCommitter> workCommitterFactory,
-      ThrottlingGetDataMetricTracker getDataMetricTracker) {
+      ThrottlingGetDataMetricTracker getDataMetricTracker,
+      WorkCommitter streamPoolWorkCommitter,
+      GetDataClient streamPoolGetDataClient,
+      HeartbeatSender streamPoolHeartbeatSender,
+      StreamingWorkScheduler streamingWorkScheduler,
+      Runnable waitForResources,
+      Function<String, Optional<ComputationState>> computationStateFetcher) {
    FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkProvider =
        new FanOutStreamingEngineWorkerHarness(
            jobHeader,
@@ -233,10 +247,32 @@ static FanOutStreamingEngineWorkerHarness forTesting(
            // blocked by the consumeWorkerMetadata() task. Test suites run in different
            // environments and non-determinism has lead to past flakiness. See
            // https://github.com/apache/beam/issues/28957.
-            MoreExecutors.newDirectExecutorService());
+            MoreExecutors.newDirectExecutorService(),
+            streamPoolWorkCommitter,
+            streamPoolGetDataClient,
+            streamPoolHeartbeatSender,
+            streamingWorkScheduler,
+            waitForResources,
+            computationStateFetcher);
    fanOutStreamingEngineWorkProvider.start();
    return fanOutStreamingEngineWorkProvider;
  }
+  /*
+  @Override
+  public synchronized void start() {
+    Preconditions.checkState(!started, "FanOutStreamingEngineWorkerHarness cannot start twice.");
+    GetWorkerMetadataStream localGetWorkerMetadataStream = streamFactory.createGetWorkerMetadataStream(
+        dispatcherClient::getWindmillMetadataServiceStubBlocking,
+        getWorkerMetadataThrottleTimer,
+        this::consumeWorkerMetadata);
+
+    if (localGetWorkerMetadataStream != null) {
+      localGetWorkerMetadataStream.start();
+    }
+    getWorkerMetadataStream = localGetWorkerMetadataStream;
+    started = true;
+  }
+  */

  @Override
  public synchronized void start() {
@@ -301,10 +337,14 @@ public synchronized void shutdown() {
  }

  private void consumeWorkerMetadata(WindmillEndpoints windmillEndpoints) {
+    LOG.info("DEBUG LOG: consumeWorkerMetadata called with endpoints: {}", windmillEndpoints);
    synchronized (metadataLock) {
      // Only process versions greater than what we currently have to prevent double processing of
      // metadata. workerMetadataConsumer is single-threaded so we maintain ordering.
-      if (windmillEndpoints.version() > pendingMetadataVersion) {
+      // Also process the endpoints if their endpoint type differs from the currently active
+      // endpoint type.
+      if (windmillEndpoints.version() > pendingMetadataVersion
+          || windmillEndpoints.endpointType() != activeEndpointType) {
        pendingMetadataVersion = windmillEndpoints.version();
        workerMetadataConsumer.execute(() -> consumeWindmillWorkerEndpoints(windmillEndpoints));
      }
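The gating above can be read as a small predicate: worker metadata is handed to the single-threaded consumer when its version advances, or when its endpoint type no longer matches the type of the endpoints currently in use. A minimal sketch of that rule, with shouldConsume as a hypothetical helper name that does not exist in this change:

    // Hypothetical helper restating the condition used in consumeWorkerMetadata():
    // consume newer metadata, and also consume same-or-older metadata whose endpoint
    // type differs from the active one (e.g. a switch away from DIRECTPATH).
    private static boolean shouldConsume(
        WindmillEndpoints incoming, long pendingMetadataVersion, EndpointType activeEndpointType) {
      return incoming.version() > pendingMetadataVersion
          || incoming.endpointType() != activeEndpointType;
    }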
@@ -314,21 +354,32 @@ private void consumeWorkerMetadata(WindmillEndpoints windmillEndpoints) {
  private synchronized void consumeWindmillWorkerEndpoints(WindmillEndpoints newWindmillEndpoints) {
    // Since this is run on a single threaded executor, multiple versions of the metadata maybe
    // queued up while a previous version of the windmillEndpoints were being consumed. Only consume
-    // the endpoints if they are the most current version.
+    // the endpoints if they are the most current version, or if their endpoint type differs from
+    // the currently active endpoint type.
    synchronized (metadataLock) {
-      if (newWindmillEndpoints.version() < pendingMetadataVersion) {
+      if (newWindmillEndpoints.version() < pendingMetadataVersion
+          && newWindmillEndpoints.endpointType() == activeEndpointType) {
        return;
      }
+      activeEndpointType = newWindmillEndpoints.endpointType();
    }

    LOG.debug(
        "Consuming new endpoints: {}. previous metadata version: {}, current metadata version: {}",
        newWindmillEndpoints,
        activeMetadataVersion,
        newWindmillEndpoints.version());
+    LOG.info(
+        "Consuming new endpoints of type: {}. previous metadata version: {}, current metadata version: {}",
+        newWindmillEndpoints.endpointType(),
+        activeMetadataVersion,
+        newWindmillEndpoints.version());
    closeStreamsNotIn(newWindmillEndpoints);
    ImmutableMap<Endpoint, WindmillStreamSender> newStreams =
-        createAndStartNewStreams(newWindmillEndpoints.windmillEndpoints()).join();
+        createAndStartNewStreams(
+                newWindmillEndpoints.windmillEndpoints(), newWindmillEndpoints.endpointType())
+            .join();
+
    StreamingEngineBackends newBackends =
        StreamingEngineBackends.builder()
            .setWindmillStreams(newStreams)
@@ -354,7 +405,7 @@ private CompletableFuture<Void> closeStreamsNotIn(WindmillEndpoints newWindmillE
            .map(
                entry ->
                    CompletableFuture.runAsync(
-                        () -> closeStreamSender(entry.getKey(), entry.getValue()),
+                        () -> closeStreamSender(entry.getKey(), (StreamSender) entry.getValue()),
                        windmillStreamManager));

    Set<Endpoint> newGlobalDataEndpoints =
@@ -365,7 +416,8 @@ private CompletableFuture<Void> closeStreamsNotIn(WindmillEndpoints newWindmillE
            .map(
                sender ->
                    CompletableFuture.runAsync(
-                        () -> closeStreamSender(sender.endpoint(), sender), windmillStreamManager));
+                        () -> closeStreamSender(sender.endpoint(), (StreamSender) sender),
+                        windmillStreamManager));

    return CompletableFuture.allOf(
        Streams.concat(closeStreamFutures, closeGlobalDataStreamFutures)
@@ -384,11 +436,15 @@ private void closeStreamSender(Endpoint endpoint, StreamSender sender) {
  }

  private synchronized CompletableFuture<ImmutableMap<Endpoint, WindmillStreamSender>>
-      createAndStartNewStreams(ImmutableSet<Endpoint> newWindmillEndpoints) {
+      createAndStartNewStreams(
+          ImmutableSet<Endpoint> newWindmillEndpoints, EndpointType endpointType) {
    ImmutableMap<Endpoint, WindmillStreamSender> currentStreams = backends.get().windmillStreams();
    return MoreFutures.allAsList(
            newWindmillEndpoints.stream()
-                .map(endpoint -> getOrCreateWindmillStreamSenderFuture(endpoint, currentStreams))
+                .map(
+                    endpoint ->
+                        getOrCreateWindmillStreamSenderFuture(
+                            endpoint, currentStreams, endpointType))
                .collect(Collectors.toList()))
        .thenApply(
            backends -> backends.stream().collect(toImmutableMap(Pair::getLeft, Pair::getRight)))
@@ -397,13 +453,18 @@ private void closeStreamSender(Endpoint endpoint, StreamSender sender) {

  private CompletionStage<Pair<Endpoint, WindmillStreamSender>>
      getOrCreateWindmillStreamSenderFuture(
-          Endpoint endpoint, ImmutableMap<Endpoint, WindmillStreamSender> currentStreams) {
+          Endpoint endpoint,
+          ImmutableMap<Endpoint, WindmillStreamSender> currentStreams,
+          EndpointType endpointType) {
    return Optional.ofNullable(currentStreams.get(endpoint))
        .map(backend -> CompletableFuture.completedFuture(Pair.of(endpoint, backend)))
        .orElseGet(
            () ->
                MoreFutures.supplyAsync(
-                        () -> Pair.of(endpoint, createAndStartWindmillStreamSender(endpoint)),
+                        () ->
+                            Pair.of(
+                                endpoint,
+                                createAndStartWindmillStreamSender(endpoint, endpointType)),
                        windmillStreamManager)
                    .toCompletableFuture());
  }
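The future-based chain above implements a reuse-or-create rule: an existing sender for the endpoint is returned as an already-completed future, otherwise creation and start-up run asynchronously on the stream manager executor, with the endpoint type now threaded through so the right sender flavor is built. A plainer sketch of the same rule, with reuseOrCreate as a hypothetical name not present in this change:

    // Hypothetical restatement of getOrCreateWindmillStreamSenderFuture() without Optional:
    // reuse the sender already serving this endpoint, or build and start a new one off-thread.
    private CompletionStage<Pair<Endpoint, WindmillStreamSender>> reuseOrCreate(
        Endpoint endpoint,
        ImmutableMap<Endpoint, WindmillStreamSender> currentStreams,
        EndpointType endpointType) {
      WindmillStreamSender existing = currentStreams.get(endpoint);
      if (existing != null) {
        return CompletableFuture.completedFuture(Pair.of(endpoint, existing));
      }
      return CompletableFuture.supplyAsync(
          () -> Pair.of(endpoint, createAndStartWindmillStreamSender(endpoint, endpointType)),
          windmillStreamManager);
    }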
@@ -452,23 +513,44 @@ private GlobalDataStreamSender getOrCreateGlobalDataSteam(
            keyedEndpoint.getValue()));
  }

-  private WindmillStreamSender createAndStartWindmillStreamSender(Endpoint endpoint) {
-    WindmillStreamSender windmillStreamSender =
-        WindmillStreamSender.create(
-            WindmillConnection.from(endpoint, this::createWindmillStub),
-            GetWorkRequest.newBuilder()
-                .setClientId(jobHeader.getClientId())
-                .setJobId(jobHeader.getJobId())
-                .setProjectId(jobHeader.getProjectId())
-                .setWorkerId(jobHeader.getWorkerId())
-                .build(),
-            GetWorkBudget.noBudget(),
-            streamFactory,
-            workItemScheduler,
-            getDataStream ->
-                StreamGetDataClient.create(
-                    getDataStream, this::getGlobalDataStream, getDataMetricTracker),
-            workCommitterFactory);
+  private WindmillStreamSender createAndStartWindmillStreamSender(
+      Endpoint endpoint, EndpointType endpointType) {
+    WindmillStreamSender windmillStreamSender;
+    windmillStreamSender =
+        endpointType == EndpointType.DIRECTPATH
+            ? WindmillDirectStreamSender.create(
+                WindmillConnection.from(endpoint, this::createWindmillStub),
+                GetWorkRequest.newBuilder()
+                    .setClientId(jobHeader.getClientId())
+                    .setJobId(jobHeader.getJobId())
+                    .setProjectId(jobHeader.getProjectId())
+                    .setWorkerId(jobHeader.getWorkerId())
+                    .build(),
+                GetWorkBudget.noBudget(),
+                streamFactory,
+                workItemScheduler,
+                getDataStream ->
+                    StreamGetDataClient.create(
+                        getDataStream, this::getGlobalDataStream, getDataMetricTracker),
+                workCommitterFactory)
+            : WindmillStreamPoolSender.create(
+                WindmillConnection.from(endpoint, this::createWindmillStub),
+                GetWorkRequest.newBuilder()
+                    .setClientId(jobHeader.getClientId())
+                    .setJobId(jobHeader.getJobId())
+                    .setProjectId(jobHeader.getProjectId())
+                    .setWorkerId(jobHeader.getWorkerId())
+                    .setMaxItems(totalGetWorkBudget.items())
+                    .setMaxBytes(totalGetWorkBudget.bytes())
+                    .build(),
+                GetWorkBudget.noBudget(),
+                streamFactory,
+                streamPoolWorkCommitter,
+                streamPoolGetDataClient,
+                streamPoolHeartbeatSender,
+                streamingWorkScheduler,
+                waitForResources,
+                computationStateFetcher);
    windmillStreamSender.start();
    return windmillStreamSender;
  }
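Both branches of the ternary above build the same GetWorkRequest header fields; only the stream-pool branch adds a max items/bytes budget from totalGetWorkBudget. A minimal sketch of how that duplication could be factored out, with baseGetWorkRequest as a hypothetical helper not present in this change:

    // Hypothetical helper: the GetWorkRequest fields common to both sender flavors.
    private GetWorkRequest.Builder baseGetWorkRequest() {
      return GetWorkRequest.newBuilder()
          .setClientId(jobHeader.getClientId())
          .setJobId(jobHeader.getJobId())
          .setProjectId(jobHeader.getProjectId())
          .setWorkerId(jobHeader.getWorkerId());
    }

The direct-path branch would then pass baseGetWorkRequest().build(), and the stream-pool branch baseGetWorkRequest().setMaxItems(totalGetWorkBudget.items()).setMaxBytes(totalGetWorkBudget.bytes()).build().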