From 2909dc90c259ee620e306d905512f5927a9e3e82 Mon Sep 17 00:00:00 2001 From: Martin Trieu Date: Wed, 16 Apr 2025 17:30:08 -0700 Subject: [PATCH] clean up some WindmillStubFactoryFactory --- .../worker/StreamingDataflowWorker.java | 207 ++++++++++-------- .../harness/BackgroundMemoryMonitor.java | 54 +++++ .../client/grpc/GrpcDispatcherClient.java | 61 ++---- .../client/grpc/GrpcWindmillServer.java | 26 ++- .../grpc/stubs/ConfigAwareChannelFactory.java | 71 ++++++ .../stubs/WindmillStubFactoryFactory.java | 25 --- .../stubs/WindmillStubFactoryFactoryImpl.java | 56 ----- .../worker/StreamingDataflowWorkerTest.java | 128 +++-------- ...anOutStreamingEngineWorkerHarnessTest.java | 13 +- .../client/grpc/GrpcDispatcherClientTest.java | 141 ------------ .../client/grpc/GrpcWindmillServerTest.java | 22 +- .../stubs/ConfigAwareChannelFactoryTest.java | 166 ++++++++++++++ .../testing/FakeWindmillStubFactory.java | 4 + .../FakeWindmillStubFactoryFactory.java | 35 --- 14 files changed, 489 insertions(+), 520 deletions(-) create mode 100644 runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/BackgroundMemoryMonitor.java create mode 100644 runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ConfigAwareChannelFactory.java delete mode 100644 runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactory.java delete mode 100644 runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactoryImpl.java delete mode 100644 runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClientTest.java create mode 100644 runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ConfigAwareChannelFactoryTest.java delete mode 100644 runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/testing/FakeWindmillStubFactoryFactory.java diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index daca54fe1587..004b41daab41 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -28,7 +28,6 @@ import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -54,6 +53,7 @@ import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandle; import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfigHandleImpl; +import org.apache.beam.runners.dataflow.worker.streaming.harness.BackgroundMemoryMonitor; import org.apache.beam.runners.dataflow.worker.streaming.harness.FanOutStreamingEngineWorkerHarness; import org.apache.beam.runners.dataflow.worker.streaming.harness.SingleSourceWorkerHarness; import org.apache.beam.runners.dataflow.worker.streaming.harness.SingleSourceWorkerHarness.GetWorkSender; @@ -67,6 +67,7 @@ import org.apache.beam.runners.dataflow.worker.windmill.Windmill; import org.apache.beam.runners.dataflow.worker.windmill.Windmill.JobHeader; import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress; import org.apache.beam.runners.dataflow.worker.windmill.appliance.JniWindmillApplianceServer; import org.apache.beam.runners.dataflow.worker.windmill.client.CloseableStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; @@ -87,9 +88,9 @@ import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.GrpcWindmillStreamFactory; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCache; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingRemoteStubFactory; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingStubFactory; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ConfigAwareChannelFactory; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.IsolationChannel; -import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactory; -import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactoryImpl; import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.ThrottledTimeTracker; import org.apache.beam.runners.dataflow.worker.windmill.state.WindmillStateCache; import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget; @@ -201,7 +202,9 @@ private StreamingDataflowWorker( GrpcWindmillStreamFactory windmillStreamFactory, ScheduledExecutorService activeWorkRefreshExecutorFn, ConcurrentMap stageInfoMap, - @Nullable GrpcDispatcherClient dispatcherClient) { + @Nullable GrpcDispatcherClient dispatcherClient, + @Nullable ChannelCache channelCache, + ChannelCachingStubFactory stubFactory) { // Register standard file systems. FileSystems.setDefaultPipelineOptions(options); this.configFetcher = configFetcher; @@ -242,11 +245,9 @@ private StreamingDataflowWorker( @Nullable ChannelzServlet channelzServlet = null; Consumer getDataStatusProvider; Supplier currentActiveCommitBytesProvider; - ChannelCache channelCache = null; if (options.isEnableStreamingEngine() && options.getIsWindmillServiceDirectPathEnabled()) { // Direct path pipelines. WeightedSemaphore maxCommitByteSemaphore = Commits.maxCommitByteSemaphore(); - channelCache = createChannelCache(options, configFetcher); FanOutStreamingEngineWorkerHarness fanOutStreamingEngineWorkerHarness = FanOutStreamingEngineWorkerHarness.create( createJobHeader(options, clientId), @@ -273,7 +274,7 @@ private StreamingDataflowWorker( processingContext, getWorkStreamLatencies); }), - ChannelCachingRemoteStubFactory.create(options.getGcpCredential(), channelCache), + stubFactory, GetWorkBudgetDistributors.distributeEvenly(), Preconditions.checkNotNull(dispatcherClient), commitWorkStream -> @@ -456,23 +457,17 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder = createGrpcwindmillStreamFactoryBuilder(options, clientId); - ConfigFetcherComputationStateCacheAndWindmillClient - configFetcherComputationStateCacheAndWindmillClient = - createConfigFetcherComputationStateCacheAndWindmillClient( - options, - dataflowServiceClient, - windmillStreamFactoryBuilder, - configFetcher -> - ComputationStateCache.create( - configFetcher, - workExecutor, - windmillStateCache::forComputation, - ID_GENERATOR)); + Dependencies dependencies = + createDependencies( + options, + dataflowServiceClient, + windmillStreamFactoryBuilder, + configFetcher -> + ComputationStateCache.create( + configFetcher, workExecutor, windmillStateCache::forComputation, ID_GENERATOR)); - ComputationStateCache computationStateCache = - configFetcherComputationStateCacheAndWindmillClient.computationStateCache(); - WindmillServerStub windmillServer = - configFetcherComputationStateCacheAndWindmillClient.windmillServer(); + ComputationStateCache computationStateCache = dependencies.computationStateCache(); + WindmillServerStub windmillServer = dependencies.windmillServer(); FailureTracker failureTracker = options.isEnableStreamingEngine() @@ -509,7 +504,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o return new StreamingDataflowWorker( windmillServer, clientId, - configFetcherComputationStateCacheAndWindmillClient.configFetcher(), + dependencies.configFetcher(), computationStateCache, windmillStateCache, workExecutor, @@ -522,11 +517,46 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o workFailureProcessor, streamingCounters, memoryMonitor, - configFetcherComputationStateCacheAndWindmillClient.windmillStreamFactory(), + dependencies.windmillStreamFactory(), Executors.newSingleThreadScheduledExecutor( new ThreadFactoryBuilder().setNameFormat("RefreshWork").build()), stageInfo, - configFetcherComputationStateCacheAndWindmillClient.windmillDispatcherClient()); + dependencies.windmillDispatcherClient(), + dependencies.channelCache(), + dependencies.stubFactory()); + } + + private static Dependencies.Builder initialDependencies( + DataflowWorkerHarnessOptions options, ComputationConfig.Fetcher configFetcher) { + if (options.getUseWindmillIsolatedChannels() == null + || options.getIsWindmillServiceDirectPathEnabled()) { + ConfigAwareChannelFactory channelFactory = + new ConfigAwareChannelFactory(options.getWindmillServiceRpcChannelAliveTimeoutSec()); + ChannelCache channelCache = ChannelCache.create(channelFactory); + ChannelCachingRemoteStubFactory stubFactory = + ChannelCachingRemoteStubFactory.create(options.getGcpCredential(), channelCache); + GrpcDispatcherClient dispatcherClient = GrpcDispatcherClient.create(stubFactory); + configFetcher + .getGlobalConfigHandle() + .registerConfigObserver( + config -> { + if (channelFactory.tryConsumeJobConfig(config)) { + dispatcherClient.reloadDispatcherEndpoints(config.windmillServiceEndpoints()); + } + }); + return Dependencies.builder() + .setChannelCache(channelCache) + .setStubFactory(stubFactory) + .setWindmillDispatcherClient(dispatcherClient); + } else { + ChannelCache channelCache = createConfigAwareChannelCache(options, configFetcher); + ChannelCachingRemoteStubFactory stubFactory = + ChannelCachingRemoteStubFactory.create(options.getGcpCredential(), channelCache); + return Dependencies.builder() + .setChannelCache(channelCache) + .setStubFactory(stubFactory) + .setWindmillDispatcherClient(GrpcDispatcherClient.create(stubFactory)); + } } /** @@ -535,19 +565,16 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o * the underlying implementation. This method simplifies creating them and returns an object with * all of these dependencies initialized. */ - private static ConfigFetcherComputationStateCacheAndWindmillClient - createConfigFetcherComputationStateCacheAndWindmillClient( - DataflowWorkerHarnessOptions options, - WorkUnitClient dataflowServiceClient, - GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder, - Function computationStateCacheFactory) { + private static Dependencies createDependencies( + DataflowWorkerHarnessOptions options, + WorkUnitClient dataflowServiceClient, + GrpcWindmillStreamFactory.Builder windmillStreamFactoryBuilder, + Function computationStateCacheFactory) { if (options.isEnableStreamingEngine()) { - GrpcDispatcherClient dispatcherClient = - GrpcDispatcherClient.create(options, new WindmillStubFactoryFactoryImpl(options)); ComputationConfig.Fetcher configFetcher = StreamingEngineComputationConfigFetcher.create( options.getGlobalConfigRefreshPeriod().getMillis(), dataflowServiceClient); - configFetcher.getGlobalConfigHandle().registerConfigObserver(dispatcherClient::onJobConfig); + Dependencies.Builder dependencies = initialDependencies(options, configFetcher); ComputationStateCache computationStateCache = computationStateCacheFactory.apply(configFetcher); GrpcWindmillStreamFactory windmillStreamFactory = @@ -557,13 +584,14 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o .setHealthCheckIntervalMillis( options.getWindmillServiceStreamingRpcHealthCheckPeriodMs()) .build(); - return ConfigFetcherComputationStateCacheAndWindmillClient.builder() - .setWindmillDispatcherClient(dispatcherClient) + + return dependencies .setConfigFetcher(configFetcher) .setComputationStateCache(computationStateCache) .setWindmillStreamFactory(windmillStreamFactory) .setWindmillServer( - GrpcWindmillServer.create(options, windmillStreamFactory, dispatcherClient)) + GrpcWindmillServer.create( + options, windmillStreamFactory, dependencies.windmillDispatcherClient())) .build(); } @@ -571,7 +599,9 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o if (options.getWindmillServiceEndpoint() != null || options.getLocalWindmillHostport().startsWith("grpc:")) { GrpcDispatcherClient dispatcherClient = - GrpcDispatcherClient.create(options, new WindmillStubFactoryFactoryImpl(options)); + GrpcDispatcherClient.create( + ChannelCachingRemoteStubFactory.create( + options.getGcpCredential(), createChannelCache(options))); GrpcWindmillStreamFactory windmillStreamFactory = windmillStreamFactoryBuilder .setHealthCheckIntervalMillis( @@ -581,7 +611,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o GrpcWindmillServer.create(options, windmillStreamFactory, dispatcherClient); ComputationConfig.Fetcher configFetcher = createApplianceComputationConfigFetcher(windmillServer); - return ConfigFetcherComputationStateCacheAndWindmillClient.builder() + return Dependencies.builder() .setWindmillDispatcherClient(dispatcherClient) .setWindmillServer(windmillServer) .setWindmillStreamFactory(windmillStreamFactory) @@ -594,7 +624,7 @@ public static StreamingDataflowWorker fromOptions(DataflowWorkerHarnessOptions o new JniWindmillApplianceServer(options.getLocalWindmillHostport()); ComputationConfig.Fetcher configFetcher = createApplianceComputationConfigFetcher(windmillServer); - return ConfigFetcherComputationStateCacheAndWindmillClient.builder() + return Dependencies.builder() .setWindmillStreamFactory(windmillStreamFactoryBuilder.build()) .setWindmillServer(windmillServer) .setConfigFetcher(configFetcher) @@ -621,21 +651,9 @@ private static void validateWorkerOptions(DataflowWorkerHarnessOptions options) StreamingDataflowWorker.class.getSimpleName()); } - private static ChannelCache createChannelCache( + private static ChannelCache createConfigAwareChannelCache( DataflowWorkerHarnessOptions workerOptions, ComputationConfig.Fetcher configFetcher) { - ChannelCache channelCache = - ChannelCache.create( - (currentFlowControlSettings, serviceAddress) -> { - // IsolationChannel will create and manage separate RPC channels to the same - // serviceAddress. - return IsolationChannel.create( - () -> - remoteChannel( - serviceAddress, - workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec(), - currentFlowControlSettings), - currentFlowControlSettings.getOnReadyThresholdBytes()); - }); + ChannelCache channelCache = createChannelCache(workerOptions); configFetcher .getGlobalConfigHandle() .registerConfigObserver( @@ -645,6 +663,36 @@ private static ChannelCache createChannelCache( return channelCache; } + private static ChannelCache createChannelCache(DataflowWorkerHarnessOptions workerOptions) { + return ChannelCache.create( + (currentFlowControlSettings, serviceAddress) -> { + // IsolationChannel will create and manage separate RPC channels to the same + // serviceAddress. + if (serviceAddress.getKind() + == WindmillServiceAddress.Kind.AUTHENTICATED_GCP_SERVICE_ADDRESS) { + return IsolationChannel.create( + () -> + remoteChannel( + serviceAddress, + workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec(), + currentFlowControlSettings), + currentFlowControlSettings.getOnReadyThresholdBytes()); + } + return workerOptions.getUseWindmillIsolatedChannels() + ? IsolationChannel.create( + () -> + remoteChannel( + serviceAddress, + workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec(), + currentFlowControlSettings), + currentFlowControlSettings.getOnReadyThresholdBytes()) + : remoteChannel( + serviceAddress, + workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec(), + currentFlowControlSettings); + }); + } + @VisibleForTesting static StreamingDataflowWorker forTesting( Map prePopulatedStateNameMappings, @@ -660,7 +708,8 @@ static StreamingDataflowWorker forTesting( StreamingGlobalConfigHandleImpl globalConfigHandle, int localRetryTimeoutMs, StreamingCounters streamingCounters, - WindmillStubFactoryFactory stubFactory) { + ChannelCachingStubFactory stubFactory, + ChannelCache channelCache) { ConcurrentMap stageInfo = new ConcurrentHashMap<>(); BoundedQueueExecutor workExecutor = createWorkUnitExecutor(options); WindmillStateCache stateCache = @@ -765,7 +814,9 @@ static StreamingDataflowWorker forTesting( : windmillStreamFactory.build(), executorSupplier.apply("RefreshWork"), stageInfo, - GrpcDispatcherClient.create(options, stubFactory)); + GrpcDispatcherClient.create(stubFactory), + channelCache, + stubFactory); } private static GrpcWindmillStreamFactory.Builder createGrpcwindmillStreamFactoryBuilder( @@ -942,11 +993,10 @@ private interface StreamingWorkerStatusReporterFactory { } @AutoValue - abstract static class ConfigFetcherComputationStateCacheAndWindmillClient { + abstract static class Dependencies { private static Builder builder() { - return new AutoValue_StreamingDataflowWorker_ConfigFetcherComputationStateCacheAndWindmillClient - .Builder(); + return new AutoValue_StreamingDataflowWorker_Dependencies.Builder(); } abstract ComputationConfig.Fetcher configFetcher(); @@ -957,6 +1007,10 @@ private static Builder builder() { abstract GrpcWindmillStreamFactory windmillStreamFactory(); + abstract @Nullable ChannelCache channelCache(); + + abstract ChannelCachingRemoteStubFactory stubFactory(); + abstract @Nullable GrpcDispatcherClient windmillDispatcherClient(); @AutoValue.Builder @@ -971,38 +1025,13 @@ abstract static class Builder { abstract Builder setWindmillDispatcherClient(GrpcDispatcherClient value); - abstract ConfigFetcherComputationStateCacheAndWindmillClient build(); - } - } - - /** - * Monitors memory pressure on a background executor. May be used to throttle calls, blocking if - * there is memory pressure. - */ - @AutoValue - abstract static class BackgroundMemoryMonitor { - - private static BackgroundMemoryMonitor create(MemoryMonitor memoryMonitor) { - return new AutoValue_StreamingDataflowWorker_BackgroundMemoryMonitor( - memoryMonitor, - Executors.newSingleThreadScheduledExecutor( - new ThreadFactoryBuilder() - .setNameFormat("MemoryMonitor") - .setPriority(Thread.MIN_PRIORITY) - .build())); - } + abstract GrpcDispatcherClient windmillDispatcherClient(); - abstract MemoryMonitor memoryMonitor(); + abstract Builder setChannelCache(ChannelCache channelCache); - abstract ExecutorService executor(); - - private void start() { - executor().execute(memoryMonitor()); - } + abstract Builder setStubFactory(ChannelCachingRemoteStubFactory stubFactory); - private void shutdown() { - memoryMonitor().stop(); - executor().shutdown(); + abstract Dependencies build(); } } } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/BackgroundMemoryMonitor.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/BackgroundMemoryMonitor.java new file mode 100644 index 000000000000..93d5fcef50be --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/BackgroundMemoryMonitor.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.streaming.harness; + +import com.google.auto.value.AutoValue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * Monitors memory pressure on a background executor. May be used to throttle calls, blocking if + * there is memory pressure. + */ +@AutoValue +public abstract class BackgroundMemoryMonitor { + public static BackgroundMemoryMonitor create(MemoryMonitor memoryMonitor) { + return new AutoValue_BackgroundMemoryMonitor( + memoryMonitor, + Executors.newSingleThreadScheduledExecutor( + new ThreadFactoryBuilder() + .setNameFormat("MemoryMonitor") + .setPriority(Thread.MIN_PRIORITY) + .build())); + } + + abstract MemoryMonitor memoryMonitor(); + + abstract ExecutorService executor(); + + public void start() { + executor().execute(memoryMonitor()); + } + + public void shutdown() { + memoryMonitor().stop(); + executor().shutdown(); + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java index 3603bacf461a..ca18ef8f278a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClient.java @@ -26,19 +26,15 @@ import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.concurrent.GuardedBy; import javax.annotation.concurrent.ThreadSafe; -import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions; -import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig; import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc; import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc.CloudWindmillMetadataServiceV1Alpha1Stub; import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc; import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub; import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactory; -import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactory; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; @@ -64,43 +60,25 @@ public class GrpcDispatcherClient { @GuardedBy("this") private final Random rand; - private final WindmillStubFactoryFactory windmillStubFactoryFactory; - - private final AtomicReference windmillStubFactory = new AtomicReference<>(); - - private final AtomicBoolean useIsolatedChannels = new AtomicBoolean(); - private final boolean reactToIsolatedChannelsJobSetting; + private final WindmillStubFactory windmillStubFactory; private GrpcDispatcherClient( - DataflowWorkerHarnessOptions options, - WindmillStubFactoryFactory windmillStubFactoryFactory, + WindmillStubFactory windmillStubFactory, DispatcherStubs initialDispatcherStubs, Random rand) { - this.windmillStubFactoryFactory = windmillStubFactoryFactory; - if (options.getUseWindmillIsolatedChannels() != null) { - this.useIsolatedChannels.set(options.getUseWindmillIsolatedChannels()); - this.reactToIsolatedChannelsJobSetting = false; - } else { - this.useIsolatedChannels.set(false); - this.reactToIsolatedChannelsJobSetting = true; - } - this.windmillStubFactory.set( - windmillStubFactoryFactory.makeWindmillStubFactory(useIsolatedChannels.get())); + this.windmillStubFactory = windmillStubFactory; this.rand = rand; this.dispatcherStubs = new AtomicReference<>(initialDispatcherStubs); this.onInitializedEndpoints = new CountDownLatch(1); } - public static GrpcDispatcherClient create( - DataflowWorkerHarnessOptions options, WindmillStubFactoryFactory windmillStubFactoryFactory) { - return new GrpcDispatcherClient( - options, windmillStubFactoryFactory, DispatcherStubs.empty(), new Random()); + public static GrpcDispatcherClient create(WindmillStubFactory windmillStubFactory) { + return new GrpcDispatcherClient(windmillStubFactory, DispatcherStubs.empty(), new Random()); } @VisibleForTesting public static GrpcDispatcherClient forTesting( - DataflowWorkerHarnessOptions options, - WindmillStubFactoryFactory windmillStubFactoryFactory, + WindmillStubFactory windmillStubFactory, List windmillServiceStubs, List windmillMetadataServiceStubs, Set dispatcherEndpoints) { @@ -108,8 +86,7 @@ public static GrpcDispatcherClient forTesting( dispatcherEndpoints.size() == windmillServiceStubs.size() && windmillServiceStubs.size() == windmillMetadataServiceStubs.size()); return new GrpcDispatcherClient( - options, - windmillStubFactoryFactory, + windmillStubFactory, DispatcherStubs.create( dispatcherEndpoints, windmillServiceStubs, windmillMetadataServiceStubs), new Random()); @@ -164,28 +141,16 @@ private synchronized T randomlySelectNextStub(List stubs) { return stubs.get(rand.nextInt(stubs.size())); } - public void onJobConfig(StreamingGlobalConfig config) { - if (config.windmillServiceEndpoints().isEmpty()) { - LOG.warn("Dispatcher client received empty windmill service endpoints from global config"); - return; - } - boolean forceRecreateStubs = false; - if (reactToIsolatedChannelsJobSetting) { - boolean useIsolatedChannels = config.userWorkerJobSettings().getUseWindmillIsolatedChannels(); - if (this.useIsolatedChannels.getAndSet(useIsolatedChannels) != useIsolatedChannels) { - windmillStubFactory.set( - windmillStubFactoryFactory.makeWindmillStubFactory(useIsolatedChannels)); - forceRecreateStubs = true; - } - } - consumeWindmillDispatcherEndpoints(config.windmillServiceEndpoints(), forceRecreateStubs); - } - public synchronized void consumeWindmillDispatcherEndpoints( ImmutableSet dispatcherEndpoints) { consumeWindmillDispatcherEndpoints(dispatcherEndpoints, /* forceRecreateStubs= */ false); } + public synchronized void reloadDispatcherEndpoints( + ImmutableSet dispatcherEndpoints) { + consumeWindmillDispatcherEndpoints(dispatcherEndpoints, /* forceRecreateStubs= */ true); + } + private synchronized void consumeWindmillDispatcherEndpoints( ImmutableSet dispatcherEndpoints, boolean forceRecreateStubs) { ImmutableSet currentDispatcherEndpoints = @@ -204,7 +169,7 @@ private synchronized void consumeWindmillDispatcherEndpoints( } LOG.info("Initializing Streaming Engine GRPC client for endpoints: {}", dispatcherEndpoints); - dispatcherStubs.set(DispatcherStubs.create(dispatcherEndpoints, windmillStubFactory.get())); + dispatcherStubs.set(DispatcherStubs.create(dispatcherEndpoints, windmillStubFactory)); onInitializedEndpoints.countDown(); } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java index e4a7e1a2f92c..21e9b36d748f 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServer.java @@ -50,10 +50,11 @@ import org.apache.beam.runners.dataflow.worker.windmill.Windmill.ReportStatsResponse; import org.apache.beam.runners.dataflow.worker.windmill.WindmillApplianceGrpc; import org.apache.beam.runners.dataflow.worker.windmill.WindmillServerStub; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.CommitWorkStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetDataStream; import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStream.GetWorkStream; -import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactory; +import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactory; import org.apache.beam.runners.dataflow.worker.windmill.client.throttling.StreamingEngineThrottleTimers; import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemReceiver; import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; @@ -154,7 +155,7 @@ static GrpcWindmillServer newTestInstance( String name, List experiments, long clientId, - WindmillStubFactoryFactory windmillStubFactoryFactory) { + WindmillStubFactory windmillStubFactory) { ManagedChannel inProcessChannel = inProcessChannel(name); CloudWindmillServiceV1Alpha1Stub stub = CloudWindmillServiceV1Alpha1Grpc.newStub(inProcessChannel); @@ -170,8 +171,7 @@ static GrpcWindmillServer newTestInstance( Set dispatcherEndpoints = Sets.newHashSet(HostAndPort.fromHost(name)); GrpcDispatcherClient dispatcherClient = GrpcDispatcherClient.forTesting( - testOptions, - windmillStubFactoryFactory, + windmillStubFactory, windmillServiceStubs, windmillMetadataServiceStubs, dispatcherEndpoints); @@ -193,8 +193,7 @@ static GrpcWindmillServer newTestInstance( } @VisibleForTesting - static GrpcWindmillServer newApplianceTestInstance( - Channel channel, WindmillStubFactoryFactory windmillStubFactoryFactory) { + static GrpcWindmillServer newApplianceTestInstance(Channel channel) { DataflowWorkerHarnessOptions options = testOptions(/* enableStreamingEngine= */ false, new ArrayList<>()); GrpcWindmillServer testServer = @@ -202,7 +201,20 @@ static GrpcWindmillServer newApplianceTestInstance( options, GrpcWindmillStreamFactory.of(createJobHeader(options, 1)).build(), // No-op, Appliance does not use Dispatcher to call Streaming Engine. - GrpcDispatcherClient.create(options, windmillStubFactoryFactory)); + GrpcDispatcherClient.create( + new WindmillStubFactory() { + @Override + public CloudWindmillServiceV1Alpha1Stub createWindmillServiceStub( + WindmillServiceAddress serviceAddress) { + throw new UnsupportedOperationException(); + } + + @Override + public CloudWindmillMetadataServiceV1Alpha1Stub createWindmillMetadataServiceStub( + WindmillServiceAddress serviceAddress) { + throw new UnsupportedOperationException(); + } + })); testServer.syncApplianceStub = createWindmillApplianceStubWithDeadlineInterceptor(channel); return testServer; } diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ConfigAwareChannelFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ConfigAwareChannelFactory.java new file mode 100644 index 000000000000..60fd208c6784 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ConfigAwareChannelFactory.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs; + +import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannels.remoteChannel; + +import javax.annotation.concurrent.ThreadSafe; +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress; +import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Internal; +import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ManagedChannel; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; + +/** Creates gRPC channels based on the current {@link StreamingGlobalConfig}. */ +@Internal +@ThreadSafe +public final class ConfigAwareChannelFactory implements WindmillChannelFactory { + private final int windmillServiceRpcChannelAliveTimeoutSec; + @MonotonicNonNull private StreamingGlobalConfig currentConfig = null; + + public ConfigAwareChannelFactory(int windmillServiceRpcChannelAliveTimeoutSec) { + this.windmillServiceRpcChannelAliveTimeoutSec = windmillServiceRpcChannelAliveTimeoutSec; + } + + @Override + public synchronized ManagedChannel create( + Windmill.UserWorkerGrpcFlowControlSettings flowControlSettings, + WindmillServiceAddress serviceAddress) { + return currentConfig != null + && currentConfig.userWorkerJobSettings().getUseWindmillIsolatedChannels() + // IsolationChannel will create and manage separate RPC channels to the same + // serviceAddress via calling the channelFactory, else just directly return + // the RPC channel. + ? IsolationChannel.create( + () -> + remoteChannel( + serviceAddress.getServiceAddress(), + windmillServiceRpcChannelAliveTimeoutSec, + flowControlSettings)) + : remoteChannel( + serviceAddress.getServiceAddress(), + windmillServiceRpcChannelAliveTimeoutSec, + flowControlSettings); + } + + public synchronized boolean tryConsumeJobConfig(StreamingGlobalConfig config) { + if (currentConfig == null + || config.userWorkerJobSettings().getUseWindmillIsolatedChannels() + != currentConfig.userWorkerJobSettings().getUseWindmillIsolatedChannels()) { + currentConfig = config; + return true; + } + return false; + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactory.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactory.java deleted file mode 100644 index f7dd9a22b996..000000000000 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactory.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs; - -import org.apache.beam.sdk.annotations.Internal; - -@Internal -public interface WindmillStubFactoryFactory { - WindmillStubFactory makeWindmillStubFactory(boolean useIsolatedChannels); -} diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactoryImpl.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactoryImpl.java deleted file mode 100644 index 6de6b6337539..000000000000 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/WindmillStubFactoryFactoryImpl.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs; - -import static org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannels.remoteChannel; - -import com.google.auth.Credentials; -import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions; - -public class WindmillStubFactoryFactoryImpl implements WindmillStubFactoryFactory { - private final int windmillServiceRpcChannelAliveTimeoutSec; - private final Credentials gcpCredential; - - public WindmillStubFactoryFactoryImpl(DataflowWorkerHarnessOptions workerOptions) { - this.gcpCredential = workerOptions.getGcpCredential(); - this.windmillServiceRpcChannelAliveTimeoutSec = - workerOptions.getWindmillServiceRpcChannelAliveTimeoutSec(); - } - - @Override - public WindmillStubFactory makeWindmillStubFactory(boolean useIsolatedChannels) { - ChannelCache channelCache = - ChannelCache.create( - (flowControlSettings, serviceAddress) -> - // IsolationChannel will create and manage separate RPC channels to the same - // serviceAddress via calling the channelFactory, else just directly return the - // RPC channel. - useIsolatedChannels - ? IsolationChannel.create( - () -> - remoteChannel( - serviceAddress.getServiceAddress(), - windmillServiceRpcChannelAliveTimeoutSec, - flowControlSettings)) - : remoteChannel( - serviceAddress.getServiceAddress(), - windmillServiceRpcChannelAliveTimeoutSec, - flowControlSettings)); - return ChannelCachingRemoteStubFactory.create(gcpCredential, channelCache); - } -} diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java index c60228f336e9..5a0dd291add4 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java @@ -134,7 +134,6 @@ import org.apache.beam.runners.dataflow.worker.windmill.client.getdata.FakeGetDataClient; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannels; import org.apache.beam.runners.dataflow.worker.windmill.testing.FakeWindmillStubFactory; -import org.apache.beam.runners.dataflow.worker.windmill.testing.FakeWindmillStubFactoryFactory; import org.apache.beam.runners.dataflow.worker.windmill.work.refresh.HeartbeatSender; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.Context; @@ -196,7 +195,6 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ErrorCollector; @@ -313,28 +311,6 @@ private static CounterUpdate getCounter(Iterable counters, String return null; } - private Iterable buildCounters() { - return Iterables.concat( - streamingCounters - .pendingDeltaCounters() - .extractModifiedDeltaUpdates(DataflowCounterUpdateExtractor.INSTANCE), - streamingCounters - .pendingCumulativeCounters() - .extractUpdates(false, DataflowCounterUpdateExtractor.INSTANCE)); - } - - @Before - public void setUp() { - server.clearCommitsReceived(); - streamingCounters = StreamingCounters.create(); - } - - @After - public void cleanUp() { - Optional.ofNullable(computationStateCache) - .ifPresent(ComputationStateCache::closeAndInvalidateAll); - } - private static ExecutableWork createMockWork( ShardedKey shardedKey, long workToken, String computationId) { return createMockWork(shardedKey, workToken, computationId, ignored -> {}); @@ -364,6 +340,28 @@ computationId, new FakeGetDataClient(), ignored -> {}, mock(HeartbeatSender.clas processWorkFn); } + private Iterable buildCounters() { + return Iterables.concat( + streamingCounters + .pendingDeltaCounters() + .extractModifiedDeltaUpdates(DataflowCounterUpdateExtractor.INSTANCE), + streamingCounters + .pendingCumulativeCounters() + .extractUpdates(false, DataflowCounterUpdateExtractor.INSTANCE)); + } + + @Before + public void setUp() { + server.clearCommitsReceived(); + streamingCounters = StreamingCounters.create(); + } + + @After + public void cleanUp() { + Optional.ofNullable(computationStateCache) + .ifPresent(ComputationStateCache::closeAndInvalidateAll); + } + private byte[] intervalWindowBytes(IntervalWindow window) throws Exception { return CoderUtils.encodeToByteArray( DEFAULT_WINDOW_COLLECTION_CODER, Collections.singletonList(window)); @@ -864,6 +862,9 @@ private StreamingDataflowWorker makeWorker( StreamingDataflowWorkerTestParams streamingDataflowWorkerTestParams) { when(mockGlobalConfigHandle.getConfig()) .thenReturn(streamingDataflowWorkerTestParams.streamingGlobalConfig()); + FakeWindmillStubFactory stubFactory = + new FakeWindmillStubFactory( + () -> WindmillChannels.inProcessChannel("StreamingDataflowWorkerTestChannel")); StreamingDataflowWorker worker = StreamingDataflowWorker.forTesting( streamingDataflowWorkerTestParams.stateNameMappings(), @@ -880,10 +881,8 @@ private StreamingDataflowWorker makeWorker( mockGlobalConfigHandle, streamingDataflowWorkerTestParams.localRetryTimeoutMs(), streamingCounters, - new FakeWindmillStubFactoryFactory( - new FakeWindmillStubFactory( - () -> - WindmillChannels.inProcessChannel("StreamingDataflowWorkerTestChannel")))); + stubFactory, + stubFactory.getChannelCache()); this.computationStateCache = worker.getComputationStateCache(); return worker; } @@ -2929,63 +2928,6 @@ public void testActiveWorkForShardedKeys() { Mockito.verifyNoMoreInteractions(mockExecutor); } - @Test - @Ignore // Test is flaky on Jenkins (#27555) - public void testMaxThreadMetric() throws Exception { - int maxThreads = 2; - int threadExpiration = 60; - // setting up actual implementation of executor instead of mocking to keep track of - // active thread count. - BoundedQueueExecutor executor = - new BoundedQueueExecutor( - maxThreads, - threadExpiration, - TimeUnit.SECONDS, - maxThreads, - MAXIMUM_BYTES_OUTSTANDING, - new ThreadFactoryBuilder() - .setNameFormat("DataflowWorkUnits-%d") - .setDaemon(true) - .build()); - - ComputationState computationState = - new ComputationState( - "computation", - defaultMapTask(Collections.singletonList(makeSourceInstruction(StringUtf8Coder.of()))), - executor, - ImmutableMap.of(), - null); - - ShardedKey key1Shard1 = ShardedKey.create(ByteString.copyFromUtf8("key1"), 1); - - // overriding definition of MockWork to add sleep, which will help us keep track of how - // long each work item takes to process and therefore let us manipulate how long the time - // at which we're at max threads is. - Consumer sleepProcessWorkFn = - unused -> { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - }; - - ExecutableWork m2 = createMockWork(key1Shard1, 2, sleepProcessWorkFn); - ExecutableWork m3 = createMockWork(key1Shard1, 3, sleepProcessWorkFn); - - assertTrue(computationState.activateWork(m2)); - assertTrue(computationState.activateWork(m3)); - executor.execute(m2, m2.getWorkItem().getSerializedSize()); - - executor.execute(m3, m3.getWorkItem().getSerializedSize()); - - // Will get close to 1000ms that both work items are processing (sleeping, really) - // give or take a few ms. - long i = 990L; - assertTrue(executor.allThreadsActiveTime() >= i); - executor.shutdown(); - } - @Test public void testActiveThreadMetric() throws Exception { int maxThreads = 5; @@ -4076,14 +4018,6 @@ static class BlockingFn extends DoFn implements TestRule { public static AtomicReference counter = new AtomicReference<>(new Semaphore(0)); public static AtomicInteger callCounter = new AtomicInteger(0); - @ProcessElement - public void processElement(ProcessContext c) throws InterruptedException { - callCounter.incrementAndGet(); - counter().release(); - blocker().await(); - c.output(c.element()); - } - public static CountDownLatch blocker() { return blocker.get(); } @@ -4096,6 +4030,14 @@ public static Semaphore counter() { return counter.get(); } + @ProcessElement + public void processElement(ProcessContext c) throws InterruptedException { + callCounter.incrementAndGet(); + counter().release(); + blocker().await(); + c.output(c.element()); + } + @Override public Statement apply(final Statement base, final Description description) { return new Statement() { diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java index b3aa321b9808..ae941411fe17 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java @@ -29,14 +29,11 @@ import static org.mockito.Mockito.verify; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashSet; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import javax.annotation.Nullable; -import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions; import org.apache.beam.runners.dataflow.worker.util.MemoryMonitor; import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillMetadataServiceV1Alpha1Grpc; import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc; @@ -53,12 +50,10 @@ import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.ChannelCachingStubFactory; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannels; import org.apache.beam.runners.dataflow.worker.windmill.testing.FakeWindmillStubFactory; -import org.apache.beam.runners.dataflow.worker.windmill.testing.FakeWindmillStubFactoryFactory; import org.apache.beam.runners.dataflow.worker.windmill.work.WorkItemScheduler; import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudget; import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetDistributor; import org.apache.beam.runners.dataflow.worker.windmill.work.budget.GetWorkBudgetSpender; -import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Server; import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.inprocess.InProcessServerBuilder; import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.inprocess.InProcessSocketAddress; @@ -108,13 +103,7 @@ public class FanOutStreamingEngineWorkerHarnessTest { private final ChannelCachingStubFactory stubFactory = new FakeWindmillStubFactory( () -> grpcCleanup.register(WindmillChannels.inProcessChannel(CHANNEL_NAME))); - private final GrpcDispatcherClient dispatcherClient = - GrpcDispatcherClient.forTesting( - PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class), - new FakeWindmillStubFactoryFactory(stubFactory), - new ArrayList<>(), - new ArrayList<>(), - new HashSet<>()); + private final GrpcDispatcherClient dispatcherClient = GrpcDispatcherClient.create(stubFactory); @Rule public transient Timeout globalTimeout = Timeout.seconds(600); private Server fakeStreamingEngineServer; private CountDownLatch getWorkerMetadataReady; diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClientTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClientTest.java deleted file mode 100644 index c04456906ea2..000000000000 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDispatcherClientTest.java +++ /dev/null @@ -1,141 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.worker.windmill.client.grpc; - -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.not; -import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertSame; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import org.apache.beam.runners.dataflow.options.DataflowWorkerHarnessOptions; -import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig; -import org.apache.beam.runners.dataflow.worker.windmill.CloudWindmillServiceV1Alpha1Grpc.CloudWindmillServiceV1Alpha1Stub; -import org.apache.beam.runners.dataflow.worker.windmill.Windmill.UserWorkerRunnerV1Settings; -import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.IsolationChannel; -import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactoryImpl; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; -import org.hamcrest.Matcher; -import org.junit.Test; -import org.junit.experimental.runners.Enclosed; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.junit.runners.Parameterized; -import org.junit.runners.Parameterized.Parameter; -import org.junit.runners.Parameterized.Parameters; - -@RunWith(Enclosed.class) -public class GrpcDispatcherClientTest { - - @RunWith(JUnit4.class) - public static class RespectsJobSettingTest { - - @Test - public void createsNewStubWhenIsolatedChannelsConfigIsChanged() { - DataflowWorkerHarnessOptions options = - PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class); - GrpcDispatcherClient dispatcherClient = - GrpcDispatcherClient.create(options, new WindmillStubFactoryFactoryImpl(options)); - // Create first time with Isolated channels disabled - dispatcherClient.onJobConfig(getGlobalConfig(/*useWindmillIsolatedChannels=*/ false)); - CloudWindmillServiceV1Alpha1Stub stub1 = dispatcherClient.getWindmillServiceStub(); - CloudWindmillServiceV1Alpha1Stub stub2 = dispatcherClient.getWindmillServiceStub(); - assertSame(stub2, stub1); - assertThat(stub1.getChannel(), not(instanceOf(IsolationChannel.class))); - - // Enable Isolated channels - dispatcherClient.onJobConfig(getGlobalConfig(/*useWindmillIsolatedChannels=*/ true)); - CloudWindmillServiceV1Alpha1Stub stub3 = dispatcherClient.getWindmillServiceStub(); - assertNotSame(stub3, stub1); - - assertThat(stub3.getChannel(), instanceOf(IsolationChannel.class)); - CloudWindmillServiceV1Alpha1Stub stub4 = dispatcherClient.getWindmillServiceStub(); - assertSame(stub3, stub4); - - // Disable Isolated channels - dispatcherClient.onJobConfig(getGlobalConfig(/*useWindmillIsolatedChannels=*/ false)); - CloudWindmillServiceV1Alpha1Stub stub5 = dispatcherClient.getWindmillServiceStub(); - assertNotSame(stub4, stub5); - assertThat(stub5.getChannel(), not(instanceOf(IsolationChannel.class))); - } - } - - @RunWith(Parameterized.class) - public static class RespectsPipelineOptionsTest { - - @Parameters - public static Collection data() { - List list = new ArrayList<>(); - for (Boolean pipelineOption : new Boolean[] {true, false}) { - list.add(new Object[] {pipelineOption}); - } - return list; - } - - @Parameter(0) - public Boolean pipelineOption; - - @Test - public void ignoresIsolatedChannelsConfigWithPipelineOption() { - DataflowWorkerHarnessOptions options = - PipelineOptionsFactory.as(DataflowWorkerHarnessOptions.class); - options.setUseWindmillIsolatedChannels(pipelineOption); - GrpcDispatcherClient dispatcherClient = - GrpcDispatcherClient.create(options, new WindmillStubFactoryFactoryImpl(options)); - Matcher classMatcher = - pipelineOption - ? instanceOf(IsolationChannel.class) - : not(instanceOf(IsolationChannel.class)); - - // Job setting disabled, PipelineOption enabled - dispatcherClient.onJobConfig(getGlobalConfig(/*useWindmillIsolatedChannels=*/ false)); - CloudWindmillServiceV1Alpha1Stub stub1 = dispatcherClient.getWindmillServiceStub(); - CloudWindmillServiceV1Alpha1Stub stub2 = dispatcherClient.getWindmillServiceStub(); - assertSame(stub2, stub1); - assertThat(stub1.getChannel(), classMatcher); - - // Job setting enabled - dispatcherClient.onJobConfig(getGlobalConfig(/*useWindmillIsolatedChannels=*/ true)); - CloudWindmillServiceV1Alpha1Stub stub3 = dispatcherClient.getWindmillServiceStub(); - assertSame(stub3, stub1); - - CloudWindmillServiceV1Alpha1Stub stub4 = dispatcherClient.getWindmillServiceStub(); - assertSame(stub3, stub4); - - // Job setting disabled - dispatcherClient.onJobConfig(getGlobalConfig(/*useWindmillIsolatedChannels=*/ false)); - CloudWindmillServiceV1Alpha1Stub stub5 = dispatcherClient.getWindmillServiceStub(); - assertSame(stub4, stub5); - } - } - - static StreamingGlobalConfig getGlobalConfig(boolean useWindmillIsolatedChannels) { - return StreamingGlobalConfig.builder() - .setWindmillServiceEndpoints(ImmutableSet.of(HostAndPort.fromString("windmill:1234"))) - .setUserWorkerJobSettings( - UserWorkerRunnerV1Settings.newBuilder() - .setUseWindmillIsolatedChannels(useWindmillIsolatedChannels) - .build()) - .build(); - } -} diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java index 211e926a073c..18c2cf30ed64 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java @@ -74,7 +74,6 @@ import org.apache.beam.runners.dataflow.worker.windmill.client.WindmillStreamShutdownException; import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillChannels; import org.apache.beam.runners.dataflow.worker.windmill.testing.FakeWindmillStubFactory; -import org.apache.beam.runners.dataflow.worker.windmill.testing.FakeWindmillStubFactoryFactory; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.CallOptions; import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.Channel; @@ -152,14 +151,13 @@ private void startServerAndClient(List experiments) throws Exception { name, experiments, clientId, - new FakeWindmillStubFactoryFactory( - new FakeWindmillStubFactory( - () -> { - ManagedChannel channel = - grpcCleanup.register(WindmillChannels.inProcessChannel(name)); - openedChannels.add(channel); - return channel; - }))); + new FakeWindmillStubFactory( + () -> { + ManagedChannel channel = + grpcCleanup.register(WindmillChannels.inProcessChannel(name)); + openedChannels.add(channel); + return channel; + })); } private void maybeInjectError(Stream stream) { @@ -223,11 +221,7 @@ public ClientCall interceptCall( .build(), testInterceptor); - this.client = - GrpcWindmillServer.newApplianceTestInstance( - inprocessChannel, - new FakeWindmillStubFactoryFactory( - new FakeWindmillStubFactory(() -> (ManagedChannel) inprocessChannel))); + this.client = GrpcWindmillServer.newApplianceTestInstance(inprocessChannel); Windmill.GetWorkResponse response1 = client.getWork(GetWorkRequest.getDefaultInstance()); Windmill.GetWorkResponse response2 = client.getWork(GetWorkRequest.getDefaultInstance()); diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ConfigAwareChannelFactoryTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ConfigAwareChannelFactoryTest.java new file mode 100644 index 000000000000..239dd1734a60 --- /dev/null +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/stubs/ConfigAwareChannelFactoryTest.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.beam.runners.dataflow.worker.streaming.config.StreamingGlobalConfig; +import org.apache.beam.runners.dataflow.worker.windmill.Windmill; +import org.apache.beam.runners.dataflow.worker.windmill.WindmillServiceAddress; +import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.ManagedChannel; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.net.HostAndPort; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class ConfigAwareChannelFactoryTest { + private static final Windmill.UserWorkerGrpcFlowControlSettings DEFAULT_FLOW_CONTROL_SETTINGS = + Windmill.UserWorkerGrpcFlowControlSettings.getDefaultInstance(); + private static final WindmillServiceAddress DEFAULT_SERVICE_ADDRESS = + WindmillServiceAddress.create(HostAndPort.fromHost("www.google.com")); + private ConfigAwareChannelFactory channelFactory; + + @Before + public void setUp() { + channelFactory = new ConfigAwareChannelFactory(0); + } + + @Test + public void testCreate_noInternalConfig() { + ManagedChannel channel = + channelFactory.create(DEFAULT_FLOW_CONTROL_SETTINGS, DEFAULT_SERVICE_ADDRESS); + assertThat(channel).isNotInstanceOf(IsolationChannel.class); + } + + @Test + public void testCreate_isolationChannelEnabled() { + channelFactory.tryConsumeJobConfig( + StreamingGlobalConfig.builder() + .setUserWorkerJobSettings( + Windmill.UserWorkerRunnerV1Settings.newBuilder() + .setUseWindmillIsolatedChannels(true) + .build()) + .build()); + ManagedChannel channel = + channelFactory.create(DEFAULT_FLOW_CONTROL_SETTINGS, DEFAULT_SERVICE_ADDRESS); + assertThat(channel).isInstanceOf(IsolationChannel.class); + } + + @Test + public void testCreate_isolationChannelDisabled() { + channelFactory.tryConsumeJobConfig( + StreamingGlobalConfig.builder() + .setUserWorkerJobSettings( + Windmill.UserWorkerRunnerV1Settings.newBuilder() + .setUseWindmillIsolatedChannels(false) + .build()) + .build()); + ManagedChannel channel = + channelFactory.create(DEFAULT_FLOW_CONTROL_SETTINGS, DEFAULT_SERVICE_ADDRESS); + assertThat(channel).isNotInstanceOf(IsolationChannel.class); + } + + @Test + public void testCreate_afterNewJobConfig() { + ManagedChannel initialChannel = + channelFactory.create(DEFAULT_FLOW_CONTROL_SETTINGS, DEFAULT_SERVICE_ADDRESS); + assertThat(initialChannel).isNotInstanceOf(IsolationChannel.class); + + // Consume a job config useWindmillIsolatedChannels = true. + channelFactory.tryConsumeJobConfig( + StreamingGlobalConfig.builder() + .setUserWorkerJobSettings( + Windmill.UserWorkerRunnerV1Settings.newBuilder() + .setUseWindmillIsolatedChannels(true) + .build()) + .build()); + ManagedChannel isolationChannel = + channelFactory.create(DEFAULT_FLOW_CONTROL_SETTINGS, DEFAULT_SERVICE_ADDRESS); + assertThat(isolationChannel).isInstanceOf(IsolationChannel.class); + + // Consume a job config useWindmillIsolatedChannels = false. + channelFactory.tryConsumeJobConfig( + StreamingGlobalConfig.builder() + .setUserWorkerJobSettings( + Windmill.UserWorkerRunnerV1Settings.newBuilder() + .setUseWindmillIsolatedChannels(false) + .build()) + .build()); + ManagedChannel notIsolationChannel = + channelFactory.create(DEFAULT_FLOW_CONTROL_SETTINGS, DEFAULT_SERVICE_ADDRESS); + assertThat(notIsolationChannel).isNotInstanceOf(IsolationChannel.class); + } + + @Test + public void testTryConsumeJobConfig_initialJobConfig() { + assertTrue( + channelFactory.tryConsumeJobConfig( + StreamingGlobalConfig.builder() + .setUserWorkerJobSettings( + Windmill.UserWorkerRunnerV1Settings.newBuilder() + .setUseWindmillIsolatedChannels(false) + .build()) + .build())); + } + + @Test + public void testTryConsumeJobConfig_sameIsolationChannelSetting() { + assertTrue( + channelFactory.tryConsumeJobConfig( + StreamingGlobalConfig.builder() + .setUserWorkerJobSettings( + Windmill.UserWorkerRunnerV1Settings.newBuilder() + .setUseWindmillIsolatedChannels(false) + .build()) + .build())); + + assertFalse( + channelFactory.tryConsumeJobConfig( + StreamingGlobalConfig.builder() + .setUserWorkerJobSettings( + Windmill.UserWorkerRunnerV1Settings.newBuilder() + .setUseWindmillIsolatedChannels(false) + .build()) + .build())); + } + + @Test + public void testTryConsumeJobConfig_differentIsolationChannelSetting() { + assertTrue( + channelFactory.tryConsumeJobConfig( + StreamingGlobalConfig.builder() + .setUserWorkerJobSettings( + Windmill.UserWorkerRunnerV1Settings.newBuilder() + .setUseWindmillIsolatedChannels(false) + .build()) + .build())); + + assertTrue( + channelFactory.tryConsumeJobConfig( + StreamingGlobalConfig.builder() + .setUserWorkerJobSettings( + Windmill.UserWorkerRunnerV1Settings.newBuilder() + .setUseWindmillIsolatedChannels(true) + .build()) + .build())); + } +} diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/testing/FakeWindmillStubFactory.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/testing/FakeWindmillStubFactory.java index 79297e51a55f..5eb045ca4959 100644 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/testing/FakeWindmillStubFactory.java +++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/testing/FakeWindmillStubFactory.java @@ -57,4 +57,8 @@ public void remove(WindmillServiceAddress windmillServiceAddress) { public void shutdown() { channelCache.clear(); } + + public ChannelCache getChannelCache() { + return channelCache; + } } diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/testing/FakeWindmillStubFactoryFactory.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/testing/FakeWindmillStubFactoryFactory.java deleted file mode 100644 index 51f8b8e14320..000000000000 --- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/testing/FakeWindmillStubFactoryFactory.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.dataflow.worker.windmill.testing; - -import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactory; -import org.apache.beam.runners.dataflow.worker.windmill.client.grpc.stubs.WindmillStubFactoryFactory; - -public class FakeWindmillStubFactoryFactory implements WindmillStubFactoryFactory { - - private final WindmillStubFactory windmillStubFactory; - - public FakeWindmillStubFactoryFactory(WindmillStubFactory windmillStubFactory) { - this.windmillStubFactory = windmillStubFactory; - } - - @Override - public WindmillStubFactory makeWindmillStubFactory(boolean useIsolatedChannels) { - return windmillStubFactory; - } -}