diff --git a/drift-client/src/main/java/com/facebook/drift/client/DriftClientFactory.java b/drift-client/src/main/java/com/facebook/drift/client/DriftClientFactory.java index f439e6c10..5f8fd837b 100644 --- a/drift-client/src/main/java/com/facebook/drift/client/DriftClientFactory.java +++ b/drift-client/src/main/java/com/facebook/drift/client/DriftClientFactory.java @@ -15,7 +15,6 @@ */ package com.facebook.drift.client; -import com.facebook.airlift.concurrent.BoundedExecutor; import com.facebook.drift.client.address.AddressSelector; import com.facebook.drift.client.stats.MethodInvocationStat; import com.facebook.drift.client.stats.MethodInvocationStatsFactory; @@ -39,16 +38,14 @@ import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executor; import java.util.function.Supplier; -import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed; import static com.facebook.drift.client.ExceptionClassifier.NORMAL_RESULT; import static com.facebook.drift.client.FilteredMethodInvoker.createFilteredMethodInvoker; import static com.facebook.drift.transport.MethodMetadata.toMethodMetadata; import static com.google.common.reflect.Reflection.newProxy; import static java.util.Objects.requireNonNull; -import static java.util.concurrent.Executors.newCachedThreadPool; public class DriftClientFactory { @@ -58,41 +55,47 @@ public class DriftClientFactory private final ExceptionClassifier exceptionClassifier; private final ConcurrentMap, ThriftServiceMetadata> serviceMetadataCache = new ConcurrentHashMap<>(); private final MethodInvocationStatsFactory methodInvocationStatsFactory; + private final Executor retryService; public DriftClientFactory( ThriftCodecManager codecManager, Supplier methodInvokerSupplier, AddressSelector addressSelector, ExceptionClassifier exceptionClassifier, - MethodInvocationStatsFactory methodInvocationStatsFactory) + MethodInvocationStatsFactory methodInvocationStatsFactory, + Executor retryService) { this.codecManager = requireNonNull(codecManager, "codecManager is null"); this.methodInvokerSupplier = requireNonNull(methodInvokerSupplier, "methodInvokerSupplier is null"); this.addressSelector = requireNonNull(addressSelector, "addressSelector is null"); this.exceptionClassifier = exceptionClassifier; this.methodInvocationStatsFactory = requireNonNull(methodInvocationStatsFactory, "methodInvocationStatsFactory is null"); + this.retryService = requireNonNull(retryService, "retryService is null"); } public DriftClientFactory( ThriftCodecManager codecManager, MethodInvokerFactory invokerFactory, AddressSelector addressSelector, - ExceptionClassifier exceptionClassifier) + ExceptionClassifier exceptionClassifier, + Executor retryService) { this( codecManager, () -> invokerFactory.createMethodInvoker(null), addressSelector, exceptionClassifier, - new NullMethodInvocationStatsFactory()); + new NullMethodInvocationStatsFactory(), + retryService); } public DriftClientFactory( ThriftCodecManager codecManager, MethodInvokerFactory invokerFactory, - AddressSelector addressSelector) + AddressSelector addressSelector, + Executor retryService) { - this(codecManager, invokerFactory, addressSelector, NORMAL_RESULT); + this(codecManager, invokerFactory, addressSelector, NORMAL_RESULT, retryService); } public DriftClient createDriftClient(Class clientInterface) @@ -114,10 +117,6 @@ public DriftClient createDriftClient( Optional qualifier = qualifierAnnotation.map(Class::getSimpleName); - // Create a bounded executor with a pool size at 4x number of processors - ExecutorService coreExecutor = newCachedThreadPool(daemonThreadsNamed(clientInterface != null ? clientInterface.getName() : "" + "-retry-service-%s")); - BoundedExecutor retryService = new BoundedExecutor(coreExecutor, 4 * Runtime.getRuntime().availableProcessors()); - ImmutableMap.Builder builder = ImmutableMap.builder(); for (ThriftMethodMetadata method : serviceMetadata.getMethods()) { MethodMetadata metadata = toMethodMetadata(codecManager, method); diff --git a/drift-client/src/main/java/com/facebook/drift/client/DriftClientFactoryConfig.java b/drift-client/src/main/java/com/facebook/drift/client/DriftClientFactoryConfig.java new file mode 100644 index 000000000..4d99b5f10 --- /dev/null +++ b/drift-client/src/main/java/com/facebook/drift/client/DriftClientFactoryConfig.java @@ -0,0 +1,40 @@ +/* + * Copyright (C) 2013 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.drift.client; + +import com.facebook.airlift.configuration.Config; +import com.facebook.airlift.configuration.ConfigDescription; + +import javax.validation.constraints.Min; + +public class DriftClientFactoryConfig +{ + private int numRetryServiceThreadCount = 2 * Runtime.getRuntime().availableProcessors(); + + @Min(1) + public int getNumRetryServiceThreadCount() + { + return numRetryServiceThreadCount; + } + + @Config("thrift.client.num-retry-service-thread-count") + @ConfigDescription("Number of threads of the executor which handles part of thrift request retries") + public DriftClientFactoryConfig setNumRetryServiceThreadCount(int numRetryServiceThreadCount) + { + this.numRetryServiceThreadCount = numRetryServiceThreadCount; + return this; + } +} diff --git a/drift-client/src/main/java/com/facebook/drift/client/DriftClientFactoryManager.java b/drift-client/src/main/java/com/facebook/drift/client/DriftClientFactoryManager.java index aa7da3277..d4692187b 100644 --- a/drift-client/src/main/java/com/facebook/drift/client/DriftClientFactoryManager.java +++ b/drift-client/src/main/java/com/facebook/drift/client/DriftClientFactoryManager.java @@ -21,6 +21,8 @@ import com.facebook.drift.codec.ThriftCodecManager; import com.facebook.drift.transport.client.MethodInvokerFactory; +import java.util.concurrent.Executor; + import static java.util.Objects.requireNonNull; public class DriftClientFactoryManager @@ -28,19 +30,22 @@ public class DriftClientFactoryManager private final ThriftCodecManager codecManager; private final MethodInvokerFactory methodInvokerFactory; private final MethodInvocationStatsFactory methodInvocationStatsFactory; + private final Executor retryService; - public DriftClientFactoryManager(ThriftCodecManager codecManager, MethodInvokerFactory methodInvokerFactory) + public DriftClientFactoryManager(ThriftCodecManager codecManager, MethodInvokerFactory methodInvokerFactory, Executor retryService) { - this(codecManager, methodInvokerFactory, new NullMethodInvocationStatsFactory()); + this(codecManager, methodInvokerFactory, new NullMethodInvocationStatsFactory(), retryService); } public DriftClientFactoryManager(ThriftCodecManager codecManager, MethodInvokerFactory methodInvokerFactory, - MethodInvocationStatsFactory methodInvocationStatsFactory) + MethodInvocationStatsFactory methodInvocationStatsFactory, + Executor retryService) { this.codecManager = requireNonNull(codecManager, "codecManager is null"); this.methodInvokerFactory = requireNonNull(methodInvokerFactory, "methodInvokerFactory is null"); this.methodInvocationStatsFactory = methodInvocationStatsFactory; + this.retryService = requireNonNull(retryService, "retryService is null"); } public DriftClientFactory createDriftClientFactory(I clientIdentity, AddressSelector addressSelector, ExceptionClassifier exceptionClassifier) @@ -50,6 +55,7 @@ public DriftClientFactory createDriftClientFactory(I clientIdentity, AddressSele () -> methodInvokerFactory.createMethodInvoker(clientIdentity), addressSelector, exceptionClassifier, - methodInvocationStatsFactory); + methodInvocationStatsFactory, + retryService); } } diff --git a/drift-client/src/main/java/com/facebook/drift/client/DriftMethodHandler.java b/drift-client/src/main/java/com/facebook/drift/client/DriftMethodHandler.java index 484f236d8..6037469a1 100644 --- a/drift-client/src/main/java/com/facebook/drift/client/DriftMethodHandler.java +++ b/drift-client/src/main/java/com/facebook/drift/client/DriftMethodHandler.java @@ -15,7 +15,6 @@ */ package com.facebook.drift.client; -import com.facebook.airlift.concurrent.BoundedExecutor; import com.facebook.drift.client.address.AddressSelector; import com.facebook.drift.client.stats.MethodInvocationStat; import com.facebook.drift.codec.metadata.ThriftHeaderParameter; @@ -32,6 +31,7 @@ import java.util.Map.Entry; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Executor; import static com.facebook.drift.client.DriftMethodInvocation.createDriftMethodInvocation; import static com.google.common.collect.ImmutableMap.toImmutableMap; @@ -47,7 +47,7 @@ class DriftMethodHandler private final AddressSelector addressSelector; private final RetryPolicy retryPolicy; private final MethodInvocationStat stat; - private final BoundedExecutor retryService; + private final Executor retryService; public DriftMethodHandler( MethodMetadata metadata, @@ -57,7 +57,7 @@ public DriftMethodHandler( AddressSelector addressSelector, RetryPolicy retryPolicy, MethodInvocationStat stat, - BoundedExecutor retryService) + Executor retryService) { this.metadata = requireNonNull(metadata, "metadata is null"); this.headerParameters = requireNonNull(headersParameters, "headersParameters is null").stream() diff --git a/drift-client/src/main/java/com/facebook/drift/client/DriftMethodInvocation.java b/drift-client/src/main/java/com/facebook/drift/client/DriftMethodInvocation.java index 68dd8362e..7fb5ab171 100644 --- a/drift-client/src/main/java/com/facebook/drift/client/DriftMethodInvocation.java +++ b/drift-client/src/main/java/com/facebook/drift/client/DriftMethodInvocation.java @@ -15,7 +15,6 @@ */ package com.facebook.drift.client; -import com.facebook.airlift.concurrent.BoundedExecutor; import com.facebook.airlift.log.Logger; import com.facebook.drift.TException; import com.facebook.drift.client.address.AddressSelector; @@ -44,6 +43,7 @@ import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Executor; import static com.facebook.drift.client.ExceptionClassification.HostStatus.DOWN; import static com.facebook.drift.client.ExceptionClassification.HostStatus.NORMAL; @@ -69,7 +69,7 @@ class DriftMethodInvocation private final Optional addressSelectionContext; private final MethodInvocationStat stat; private final Ticker ticker; - private final BoundedExecutor retryService; + private final Executor retryService; private final long startTime; @GuardedBy("this") @@ -98,7 +98,7 @@ static DriftMethodInvocation createDriftMethodInvocation( Optional addressSelectionContext, MethodInvocationStat stat, Ticker ticker, - BoundedExecutor retryService) + Executor retryService) { DriftMethodInvocation invocation = new DriftMethodInvocation<>( invoker, @@ -126,7 +126,7 @@ private DriftMethodInvocation( Optional addressSelectionContext, MethodInvocationStat stat, Ticker ticker, - BoundedExecutor retryService) + Executor retryService) { this.invoker = requireNonNull(invoker, "methodHandler is null"); this.metadata = requireNonNull(metadata, "metadata is null"); diff --git a/drift-client/src/main/java/com/facebook/drift/client/ForRetryService.java b/drift-client/src/main/java/com/facebook/drift/client/ForRetryService.java new file mode 100644 index 000000000..9c35d7db6 --- /dev/null +++ b/drift-client/src/main/java/com/facebook/drift/client/ForRetryService.java @@ -0,0 +1,20 @@ +/* + * Copyright (C) 2013 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.drift.client; + +public @interface ForRetryService +{ +} diff --git a/drift-client/src/main/java/com/facebook/drift/client/guice/DriftClientBinder.java b/drift-client/src/main/java/com/facebook/drift/client/guice/DriftClientBinder.java index b70d20c54..6b78d1941 100644 --- a/drift-client/src/main/java/com/facebook/drift/client/guice/DriftClientBinder.java +++ b/drift-client/src/main/java/com/facebook/drift/client/guice/DriftClientBinder.java @@ -15,11 +15,14 @@ */ package com.facebook.drift.client.guice; +import com.facebook.airlift.concurrent.BoundedExecutor; import com.facebook.airlift.configuration.ConfigDefaults; import com.facebook.drift.client.DriftClient; import com.facebook.drift.client.DriftClientFactory; +import com.facebook.drift.client.DriftClientFactoryConfig; import com.facebook.drift.client.DriftClientFactoryManager; import com.facebook.drift.client.ExceptionClassifier; +import com.facebook.drift.client.ForRetryService; import com.facebook.drift.client.MethodInvocationFilter; import com.facebook.drift.client.address.AddressSelector; import com.facebook.drift.client.stats.JmxMethodInvocationStatsFactory; @@ -50,7 +53,9 @@ import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.concurrent.Executor; +import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed; import static com.facebook.airlift.configuration.ConfigBinder.configBinder; import static com.facebook.drift.client.ExceptionClassifier.mergeExceptionClassifiers; import static com.facebook.drift.client.guice.DriftClientAnnotationFactory.extractDriftClientBindingAnnotation; @@ -59,6 +64,7 @@ import static com.google.inject.multibindings.Multibinder.newSetBinder; import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder; import static java.util.Objects.requireNonNull; +import static java.util.concurrent.Executors.newCachedThreadPool; public class DriftClientBinder { @@ -233,6 +239,7 @@ private static class DriftClientBinderModule public void configure(Binder binder) { newSetBinder(binder, ExceptionClassifier.class); + configBinder(binder).bindConfig(DriftClientFactoryConfig.class); newOptionalBinder(binder, MBeanExporter.class); newOptionalBinder(binder, MethodInvocationStatsFactory.class) .setDefault() @@ -240,14 +247,23 @@ public void configure(Binder binder) .in(Scopes.SINGLETON); } + @Provides + @Singleton + @ForRetryService + private static Executor getRetryService(DriftClientFactoryConfig config) + { + return new BoundedExecutor(newCachedThreadPool(daemonThreadsNamed("drift-client-retry-service-%s")), config.getNumRetryServiceThreadCount()); + } + @Provides @Singleton private static DriftClientFactoryManager getDriftClientFactory( ThriftCodecManager codecManager, MethodInvokerFactory methodInvokerFactory, - MethodInvocationStatsFactory methodInvocationStatsFactory) + MethodInvocationStatsFactory methodInvocationStatsFactory, + @ForRetryService Executor retryService) { - return new DriftClientFactoryManager<>(codecManager, methodInvokerFactory, methodInvocationStatsFactory); + return new DriftClientFactoryManager<>(codecManager, methodInvokerFactory, methodInvocationStatsFactory, retryService); } @Override diff --git a/drift-client/src/test/java/com/facebook/drift/client/TestDriftClient.java b/drift-client/src/test/java/com/facebook/drift/client/TestDriftClient.java index c18de7fba..e2d78bed0 100644 --- a/drift-client/src/test/java/com/facebook/drift/client/TestDriftClient.java +++ b/drift-client/src/test/java/com/facebook/drift/client/TestDriftClient.java @@ -61,6 +61,7 @@ import static com.facebook.drift.client.guice.MethodInvocationFilterBinder.staticFilterBinder; import static com.google.common.base.Throwables.getCausalChain; import static com.google.common.base.Throwables.getRootCause; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static com.google.inject.multibindings.Multibinder.newSetBinder; import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder; import static java.lang.annotation.ElementType.FIELD; @@ -92,7 +93,7 @@ public void testInvoker() TestingMethodInvocationStatsFactory statsFactory = new TestingMethodInvocationStatsFactory(); List classifiers = ImmutableList.of(new TestingExceptionClassifier(), new TestingExceptionClassifier(), new TestingExceptionClassifier()); - DriftClientFactoryManager clientFactoryManager = new DriftClientFactoryManager<>(codecManager, methodInvokerFactory, statsFactory); + DriftClientFactoryManager clientFactoryManager = new DriftClientFactoryManager<>(codecManager, methodInvokerFactory, statsFactory, directExecutor()); DriftClientFactory driftClientFactory = clientFactoryManager.createDriftClientFactory("clientIdentity", new MockAddressSelector(), mergeExceptionClassifiers(classifiers)); DriftClient driftClient = driftClientFactory.createDriftClient(Client.class, Optional.empty(), ImmutableList.of(), new DriftClientConfig()); @@ -114,7 +115,7 @@ public void testFilter() TestingMethodInvocationStatsFactory statsFactory = new TestingMethodInvocationStatsFactory(); List classifiers = ImmutableList.of(new TestingExceptionClassifier(), new TestingExceptionClassifier(), new TestingExceptionClassifier()); - DriftClientFactoryManager clientFactoryManager = new DriftClientFactoryManager<>(codecManager, invokerFactory, statsFactory); + DriftClientFactoryManager clientFactoryManager = new DriftClientFactoryManager<>(codecManager, invokerFactory, statsFactory, directExecutor()); DriftClientFactory driftClientFactory = clientFactoryManager.createDriftClientFactory("clientIdentity", new MockAddressSelector(), mergeExceptionClassifiers(classifiers)); DriftClient driftClient = driftClientFactory.createDriftClient( diff --git a/drift-client/src/test/java/com/facebook/drift/client/TestDriftClientFactoryConfig.java b/drift-client/src/test/java/com/facebook/drift/client/TestDriftClientFactoryConfig.java new file mode 100644 index 000000000..27653b5f4 --- /dev/null +++ b/drift-client/src/test/java/com/facebook/drift/client/TestDriftClientFactoryConfig.java @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2013 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.drift.client; + +import com.google.common.collect.ImmutableMap; +import org.testng.annotations.Test; + +import java.util.Map; + +import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertFullMapping; +import static com.facebook.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults; +import static com.facebook.airlift.configuration.testing.ConfigAssertions.recordDefaults; + +public class TestDriftClientFactoryConfig +{ + @Test + public void testDefaults() + { + assertRecordedDefaults(recordDefaults(DriftClientFactoryConfig.class) + .setNumRetryServiceThreadCount(Runtime.getRuntime().availableProcessors() * 2)); + } + + @Test + public void testExplicitPropertyMappings() + { + Map properties = new ImmutableMap.Builder() + .put("thrift.client.num-retry-service-thread-count", "99") + .build(); + + DriftClientFactoryConfig expected = new DriftClientFactoryConfig() + .setNumRetryServiceThreadCount(99); + + assertFullMapping(properties, expected); + } +} diff --git a/drift-integration-tests/src/test/java/com/facebook/drift/integration/ApacheThriftTesterUtil.java b/drift-integration-tests/src/test/java/com/facebook/drift/integration/ApacheThriftTesterUtil.java index 3ad79e313..396924cb1 100644 --- a/drift-integration-tests/src/test/java/com/facebook/drift/integration/ApacheThriftTesterUtil.java +++ b/drift-integration-tests/src/test/java/com/facebook/drift/integration/ApacheThriftTesterUtil.java @@ -45,6 +45,7 @@ import static com.facebook.drift.transport.apache.client.ApacheThriftMethodInvokerFactory.createStaticApacheThriftMethodInvokerFactory; import static com.facebook.drift.transport.netty.codec.Protocol.FB_COMPACT; import static com.facebook.drift.transport.netty.codec.Transport.HEADER; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static org.testng.Assert.assertEquals; final class ApacheThriftTesterUtil @@ -81,7 +82,7 @@ private static int logApacheThriftDriftClient( .setSslEnabled(secure); ApacheThriftConnectionFactoryConfig factoryConfig = new ApacheThriftConnectionFactoryConfig(); try (ApacheThriftMethodInvokerFactory methodInvokerFactory = new ApacheThriftMethodInvokerFactory<>(factoryConfig, clientIdentity -> config)) { - DriftClientFactoryManager clientFactoryManager = new DriftClientFactoryManager<>(CODEC_MANAGER, methodInvokerFactory); + DriftClientFactoryManager clientFactoryManager = new DriftClientFactoryManager<>(CODEC_MANAGER, methodInvokerFactory, directExecutor()); DriftClientFactory proxyFactory = clientFactoryManager.createDriftClientFactory("clientIdentity", addressSelector, NORMAL_RESULT); DriftScribe scribe = proxyFactory.createDriftClient(DriftScribe.class, Optional.empty(), filters, new DriftClientConfig()).get(); @@ -115,7 +116,7 @@ private static int logApacheThriftStaticDriftClient( .setSslEnabled(secure); try (ApacheThriftMethodInvokerFactory methodInvokerFactory = createStaticApacheThriftMethodInvokerFactory(config)) { - DriftClientFactory proxyFactory = new DriftClientFactory(CODEC_MANAGER, methodInvokerFactory, addressSelector); + DriftClientFactory proxyFactory = new DriftClientFactory(CODEC_MANAGER, methodInvokerFactory, addressSelector, directExecutor()); DriftScribe scribe = proxyFactory.createDriftClient(DriftScribe.class, Optional.empty(), filters, new DriftClientConfig()).get(); @@ -148,7 +149,7 @@ private static int logApacheThriftDriftClientAsync( .setSslEnabled(secure); ApacheThriftConnectionFactoryConfig factoryConfig = new ApacheThriftConnectionFactoryConfig(); try (ApacheThriftMethodInvokerFactory methodInvokerFactory = new ApacheThriftMethodInvokerFactory<>(factoryConfig, clientIdentity -> config)) { - DriftClientFactoryManager proxyFactoryManager = new DriftClientFactoryManager<>(CODEC_MANAGER, methodInvokerFactory); + DriftClientFactoryManager proxyFactoryManager = new DriftClientFactoryManager<>(CODEC_MANAGER, methodInvokerFactory, directExecutor()); DriftClientFactory proxyFactory = proxyFactoryManager.createDriftClientFactory("myFactory", addressSelector, NORMAL_RESULT); DriftAsyncScribe scribe = proxyFactory.createDriftClient(DriftAsyncScribe.class, Optional.empty(), filters, new DriftClientConfig()).get(); diff --git a/drift-integration-tests/src/test/java/com/facebook/drift/integration/DriftNettyTesterUtil.java b/drift-integration-tests/src/test/java/com/facebook/drift/integration/DriftNettyTesterUtil.java index 3f36effdf..7ee8e0cc3 100644 --- a/drift-integration-tests/src/test/java/com/facebook/drift/integration/DriftNettyTesterUtil.java +++ b/drift-integration-tests/src/test/java/com/facebook/drift/integration/DriftNettyTesterUtil.java @@ -46,6 +46,7 @@ import static com.facebook.drift.transport.netty.client.DriftNettyMethodInvokerFactory.createStaticDriftNettyMethodInvokerFactory; import static com.facebook.drift.transport.netty.codec.Protocol.COMPACT; import static com.facebook.drift.transport.netty.codec.Transport.HEADER; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; import static org.testng.Assert.assertEquals; final class DriftNettyTesterUtil @@ -86,7 +87,7 @@ private static int logNettyDriftClient( new DriftNettyConnectionFactoryConfig(), clientIdentity -> config, testingAllocator)) { - DriftClientFactoryManager clientFactoryManager = new DriftClientFactoryManager<>(CODEC_MANAGER, methodInvokerFactory); + DriftClientFactoryManager clientFactoryManager = new DriftClientFactoryManager<>(CODEC_MANAGER, methodInvokerFactory, directExecutor()); DriftClientFactory proxyFactory = clientFactoryManager.createDriftClientFactory("clientIdentity", addressSelector, NORMAL_RESULT); DriftScribe scribe = proxyFactory.createDriftClient(DriftScribe.class, Optional.empty(), filters, new DriftClientConfig()).get(); @@ -121,7 +122,7 @@ private static int logNettyStaticDriftClient( try (TestingPooledByteBufAllocator testingAllocator = new TestingPooledByteBufAllocator(); DriftNettyMethodInvokerFactory methodInvokerFactory = createStaticDriftNettyMethodInvokerFactory(config, testingAllocator)) { - DriftClientFactory proxyFactory = new DriftClientFactory(CODEC_MANAGER, methodInvokerFactory, addressSelector); + DriftClientFactory proxyFactory = new DriftClientFactory(CODEC_MANAGER, methodInvokerFactory, addressSelector, directExecutor()); DriftScribe scribe = proxyFactory.createDriftClient(DriftScribe.class, Optional.empty(), filters, new DriftClientConfig()).get(); @@ -158,7 +159,7 @@ private static int logNettyDriftClientAsync( new DriftNettyConnectionFactoryConfig(), clientIdentity -> config, testingAllocator)) { - DriftClientFactoryManager proxyFactoryManager = new DriftClientFactoryManager<>(CODEC_MANAGER, methodInvokerFactory); + DriftClientFactoryManager proxyFactoryManager = new DriftClientFactoryManager<>(CODEC_MANAGER, methodInvokerFactory, directExecutor()); DriftClientFactory proxyFactory = proxyFactoryManager.createDriftClientFactory("myFactory", addressSelector, NORMAL_RESULT); DriftAsyncScribe scribe = proxyFactory.createDriftClient(DriftAsyncScribe.class, Optional.empty(), filters, new DriftClientConfig()).get();