Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package com.facebook.drift.client;

import com.facebook.airlift.concurrent.BoundedExecutor;
import com.facebook.drift.client.address.AddressSelector;
import com.facebook.drift.client.stats.MethodInvocationStat;
import com.facebook.drift.client.stats.MethodInvocationStatsFactory;
Expand All @@ -39,16 +38,14 @@
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.facebook.drift.client.ExceptionClassifier.NORMAL_RESULT;
import static com.facebook.drift.client.FilteredMethodInvoker.createFilteredMethodInvoker;
import static com.facebook.drift.transport.MethodMetadata.toMethodMetadata;
import static com.google.common.reflect.Reflection.newProxy;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newCachedThreadPool;

public class DriftClientFactory
{
Expand All @@ -58,41 +55,47 @@ public class DriftClientFactory
private final ExceptionClassifier exceptionClassifier;
private final ConcurrentMap<Class<?>, ThriftServiceMetadata> serviceMetadataCache = new ConcurrentHashMap<>();
private final MethodInvocationStatsFactory methodInvocationStatsFactory;
private final Executor retryService;

public DriftClientFactory(
ThriftCodecManager codecManager,
Supplier<MethodInvoker> methodInvokerSupplier,
AddressSelector<? extends Address> addressSelector,
ExceptionClassifier exceptionClassifier,
MethodInvocationStatsFactory methodInvocationStatsFactory)
MethodInvocationStatsFactory methodInvocationStatsFactory,
Executor retryService)
{
this.codecManager = requireNonNull(codecManager, "codecManager is null");
this.methodInvokerSupplier = requireNonNull(methodInvokerSupplier, "methodInvokerSupplier is null");
this.addressSelector = requireNonNull(addressSelector, "addressSelector is null");
this.exceptionClassifier = exceptionClassifier;
this.methodInvocationStatsFactory = requireNonNull(methodInvocationStatsFactory, "methodInvocationStatsFactory is null");
this.retryService = requireNonNull(retryService, "retryService is null");
}

public DriftClientFactory(
ThriftCodecManager codecManager,
MethodInvokerFactory<?> invokerFactory,
AddressSelector<? extends Address> addressSelector,
ExceptionClassifier exceptionClassifier)
ExceptionClassifier exceptionClassifier,
Executor retryService)
{
this(
codecManager,
() -> invokerFactory.createMethodInvoker(null),
addressSelector,
exceptionClassifier,
new NullMethodInvocationStatsFactory());
new NullMethodInvocationStatsFactory(),
retryService);
}

public DriftClientFactory(
ThriftCodecManager codecManager,
MethodInvokerFactory<?> invokerFactory,
AddressSelector<? extends Address> addressSelector)
AddressSelector<? extends Address> addressSelector,
Executor retryService)
{
this(codecManager, invokerFactory, addressSelector, NORMAL_RESULT);
this(codecManager, invokerFactory, addressSelector, NORMAL_RESULT, retryService);
}

public <T> DriftClient<T> createDriftClient(Class<T> clientInterface)
Expand All @@ -114,10 +117,6 @@ public <T> DriftClient<T> createDriftClient(

Optional<String> qualifier = qualifierAnnotation.map(Class::getSimpleName);

// Create a bounded executor with a pool size at 4x number of processors
ExecutorService coreExecutor = newCachedThreadPool(daemonThreadsNamed(clientInterface != null ? clientInterface.getName() : "" + "-retry-service-%s"));
BoundedExecutor retryService = new BoundedExecutor(coreExecutor, 4 * Runtime.getRuntime().availableProcessors());

ImmutableMap.Builder<Method, DriftMethodHandler> builder = ImmutableMap.builder();
for (ThriftMethodMetadata method : serviceMetadata.getMethods()) {
MethodMetadata metadata = toMethodMetadata(codecManager, method);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (C) 2013 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.drift.client;

import com.facebook.airlift.configuration.Config;
import com.facebook.airlift.configuration.ConfigDescription;

import javax.validation.constraints.Min;

public class DriftClientFactoryConfig
{
private int numRetryServiceThreadCount = 2 * Runtime.getRuntime().availableProcessors();

@Min(1)
public int getNumRetryServiceThreadCount()
{
return numRetryServiceThreadCount;
}

@Config("thrift.client.num-retry-service-thread-count")
@ConfigDescription("Number of threads of the executor which handles part of thrift request retries")
public DriftClientFactoryConfig setNumRetryServiceThreadCount(int numRetryServiceThreadCount)
{
this.numRetryServiceThreadCount = numRetryServiceThreadCount;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,31 @@
import com.facebook.drift.codec.ThriftCodecManager;
import com.facebook.drift.transport.client.MethodInvokerFactory;

import java.util.concurrent.Executor;

import static java.util.Objects.requireNonNull;

public class DriftClientFactoryManager<I>
{
private final ThriftCodecManager codecManager;
private final MethodInvokerFactory<I> methodInvokerFactory;
private final MethodInvocationStatsFactory methodInvocationStatsFactory;
private final Executor retryService;

public DriftClientFactoryManager(ThriftCodecManager codecManager, MethodInvokerFactory<I> methodInvokerFactory)
public DriftClientFactoryManager(ThriftCodecManager codecManager, MethodInvokerFactory<I> methodInvokerFactory, Executor retryService)
{
this(codecManager, methodInvokerFactory, new NullMethodInvocationStatsFactory());
this(codecManager, methodInvokerFactory, new NullMethodInvocationStatsFactory(), retryService);
}

public DriftClientFactoryManager(ThriftCodecManager codecManager,
MethodInvokerFactory<I> methodInvokerFactory,
MethodInvocationStatsFactory methodInvocationStatsFactory)
MethodInvocationStatsFactory methodInvocationStatsFactory,
Executor retryService)
{
this.codecManager = requireNonNull(codecManager, "codecManager is null");
this.methodInvokerFactory = requireNonNull(methodInvokerFactory, "methodInvokerFactory is null");
this.methodInvocationStatsFactory = methodInvocationStatsFactory;
this.retryService = requireNonNull(retryService, "retryService is null");
}

public DriftClientFactory createDriftClientFactory(I clientIdentity, AddressSelector<?> addressSelector, ExceptionClassifier exceptionClassifier)
Expand All @@ -50,6 +55,7 @@ public DriftClientFactory createDriftClientFactory(I clientIdentity, AddressSele
() -> methodInvokerFactory.createMethodInvoker(clientIdentity),
addressSelector,
exceptionClassifier,
methodInvocationStatsFactory);
methodInvocationStatsFactory,
retryService);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package com.facebook.drift.client;

import com.facebook.airlift.concurrent.BoundedExecutor;
import com.facebook.drift.client.address.AddressSelector;
import com.facebook.drift.client.stats.MethodInvocationStat;
import com.facebook.drift.codec.metadata.ThriftHeaderParameter;
Expand All @@ -32,6 +31,7 @@
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;

import static com.facebook.drift.client.DriftMethodInvocation.createDriftMethodInvocation;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
Expand All @@ -47,7 +47,7 @@ class DriftMethodHandler
private final AddressSelector<? extends Address> addressSelector;
private final RetryPolicy retryPolicy;
private final MethodInvocationStat stat;
private final BoundedExecutor retryService;
private final Executor retryService;

public DriftMethodHandler(
MethodMetadata metadata,
Expand All @@ -57,7 +57,7 @@ public DriftMethodHandler(
AddressSelector<? extends Address> addressSelector,
RetryPolicy retryPolicy,
MethodInvocationStat stat,
BoundedExecutor retryService)
Executor retryService)
{
this.metadata = requireNonNull(metadata, "metadata is null");
this.headerParameters = requireNonNull(headersParameters, "headersParameters is null").stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package com.facebook.drift.client;

import com.facebook.airlift.concurrent.BoundedExecutor;
import com.facebook.airlift.log.Logger;
import com.facebook.drift.TException;
import com.facebook.drift.client.address.AddressSelector;
Expand Down Expand Up @@ -44,6 +43,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;

import static com.facebook.drift.client.ExceptionClassification.HostStatus.DOWN;
import static com.facebook.drift.client.ExceptionClassification.HostStatus.NORMAL;
Expand All @@ -69,7 +69,7 @@ class DriftMethodInvocation<A extends Address>
private final Optional<String> addressSelectionContext;
private final MethodInvocationStat stat;
private final Ticker ticker;
private final BoundedExecutor retryService;
private final Executor retryService;
private final long startTime;

@GuardedBy("this")
Expand Down Expand Up @@ -98,7 +98,7 @@ static <A extends Address> DriftMethodInvocation<A> createDriftMethodInvocation(
Optional<String> addressSelectionContext,
MethodInvocationStat stat,
Ticker ticker,
BoundedExecutor retryService)
Executor retryService)
{
DriftMethodInvocation<A> invocation = new DriftMethodInvocation<>(
invoker,
Expand Down Expand Up @@ -126,7 +126,7 @@ private DriftMethodInvocation(
Optional<String> addressSelectionContext,
MethodInvocationStat stat,
Ticker ticker,
BoundedExecutor retryService)
Executor retryService)
{
this.invoker = requireNonNull(invoker, "methodHandler is null");
this.metadata = requireNonNull(metadata, "metadata is null");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright (C) 2013 Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.drift.client;

public @interface ForRetryService
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,14 @@
*/
package com.facebook.drift.client.guice;

import com.facebook.airlift.concurrent.BoundedExecutor;
import com.facebook.airlift.configuration.ConfigDefaults;
import com.facebook.drift.client.DriftClient;
import com.facebook.drift.client.DriftClientFactory;
import com.facebook.drift.client.DriftClientFactoryConfig;
import com.facebook.drift.client.DriftClientFactoryManager;
import com.facebook.drift.client.ExceptionClassifier;
import com.facebook.drift.client.ForRetryService;
import com.facebook.drift.client.MethodInvocationFilter;
import com.facebook.drift.client.address.AddressSelector;
import com.facebook.drift.client.stats.JmxMethodInvocationStatsFactory;
Expand Down Expand Up @@ -50,7 +53,9 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;

import static com.facebook.airlift.concurrent.Threads.daemonThreadsNamed;
import static com.facebook.airlift.configuration.ConfigBinder.configBinder;
import static com.facebook.drift.client.ExceptionClassifier.mergeExceptionClassifiers;
import static com.facebook.drift.client.guice.DriftClientAnnotationFactory.extractDriftClientBindingAnnotation;
Expand All @@ -59,6 +64,7 @@
import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newCachedThreadPool;

public class DriftClientBinder
{
Expand Down Expand Up @@ -233,21 +239,31 @@ private static class DriftClientBinderModule
public void configure(Binder binder)
{
newSetBinder(binder, ExceptionClassifier.class);
configBinder(binder).bindConfig(DriftClientFactoryConfig.class);
newOptionalBinder(binder, MBeanExporter.class);
newOptionalBinder(binder, MethodInvocationStatsFactory.class)
.setDefault()
.toProvider(DefaultMethodInvocationStatsFactoryProvider.class)
.in(Scopes.SINGLETON);
}

@Provides
@Singleton
@ForRetryService
private static Executor getRetryService(DriftClientFactoryConfig config)
{
return new BoundedExecutor(newCachedThreadPool(daemonThreadsNamed("drift-client-retry-service-%s")), config.getNumRetryServiceThreadCount());
}

@Provides
@Singleton
private static DriftClientFactoryManager<Annotation> getDriftClientFactory(
ThriftCodecManager codecManager,
MethodInvokerFactory<Annotation> methodInvokerFactory,
MethodInvocationStatsFactory methodInvocationStatsFactory)
MethodInvocationStatsFactory methodInvocationStatsFactory,
@ForRetryService Executor retryService)
{
return new DriftClientFactoryManager<>(codecManager, methodInvokerFactory, methodInvocationStatsFactory);
return new DriftClientFactoryManager<>(codecManager, methodInvokerFactory, methodInvocationStatsFactory, retryService);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import static com.facebook.drift.client.guice.MethodInvocationFilterBinder.staticFilterBinder;
import static com.google.common.base.Throwables.getCausalChain;
import static com.google.common.base.Throwables.getRootCause;
import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static java.lang.annotation.ElementType.FIELD;
Expand Down Expand Up @@ -92,7 +93,7 @@ public void testInvoker()
TestingMethodInvocationStatsFactory statsFactory = new TestingMethodInvocationStatsFactory();
List<TestingExceptionClassifier> classifiers = ImmutableList.of(new TestingExceptionClassifier(), new TestingExceptionClassifier(), new TestingExceptionClassifier());

DriftClientFactoryManager<String> clientFactoryManager = new DriftClientFactoryManager<>(codecManager, methodInvokerFactory, statsFactory);
DriftClientFactoryManager<String> clientFactoryManager = new DriftClientFactoryManager<>(codecManager, methodInvokerFactory, statsFactory, directExecutor());
DriftClientFactory driftClientFactory = clientFactoryManager.createDriftClientFactory("clientIdentity", new MockAddressSelector(), mergeExceptionClassifiers(classifiers));

DriftClient<Client> driftClient = driftClientFactory.createDriftClient(Client.class, Optional.empty(), ImmutableList.of(), new DriftClientConfig());
Expand All @@ -114,7 +115,7 @@ public void testFilter()
TestingMethodInvocationStatsFactory statsFactory = new TestingMethodInvocationStatsFactory();
List<TestingExceptionClassifier> classifiers = ImmutableList.of(new TestingExceptionClassifier(), new TestingExceptionClassifier(), new TestingExceptionClassifier());

DriftClientFactoryManager<String> clientFactoryManager = new DriftClientFactoryManager<>(codecManager, invokerFactory, statsFactory);
DriftClientFactoryManager<String> clientFactoryManager = new DriftClientFactoryManager<>(codecManager, invokerFactory, statsFactory, directExecutor());
DriftClientFactory driftClientFactory = clientFactoryManager.createDriftClientFactory("clientIdentity", new MockAddressSelector(), mergeExceptionClassifiers(classifiers));

DriftClient<Client> driftClient = driftClientFactory.createDriftClient(
Expand Down
Loading