1919import com .google .common .base .Preconditions ;
2020import com .google .common .io .ByteSink ;
2121import com .google .common .util .concurrent .MoreExecutors ;
22+ import com .google .edwmigration .dumper .application .dumper .ConnectorArguments ;
23+ import com .google .edwmigration .dumper .application .dumper .annotations .RespectsInput ;
24+ import com .google .edwmigration .dumper .application .dumper .connector .AbstractConnector ;
25+ import com .google .edwmigration .dumper .application .dumper .handle .AbstractHandle ;
26+ import com .google .edwmigration .dumper .application .dumper .handle .Handle ;
27+ import com .google .edwmigration .dumper .application .dumper .task .AbstractTask ;
28+ import com .google .edwmigration .dumper .application .dumper .task .TaskRunContext ;
29+ import com .google .edwmigration .dumper .ext .hive .metastore .HiveMetastoreThriftClient ;
30+ import com .google .edwmigration .dumper .plugin .ext .jdk .concurrent .ExecutorManager ;
2231import com .google .errorprone .annotations .ForOverride ;
23- import java .io .IOException ;
2432import java .io .Writer ;
2533import java .nio .charset .StandardCharsets ;
2634import java .util .ArrayList ;
3341import javax .annotation .concurrent .ThreadSafe ;
3442import org .apache .thrift .transport .TTransportException ;
3543import org .checkerframework .checker .nullness .qual .NonNull ;
36- import com .google .edwmigration .dumper .application .dumper .ConnectorArguments ;
37- import com .google .edwmigration .dumper .application .dumper .annotations .RespectsInput ;
38- import com .google .edwmigration .dumper .application .dumper .connector .AbstractConnector ;
39- import com .google .edwmigration .dumper .application .dumper .handle .AbstractHandle ;
40- import com .google .edwmigration .dumper .application .dumper .handle .Handle ;
41- import com .google .edwmigration .dumper .application .dumper .task .AbstractTask ;
42- import com .google .edwmigration .dumper .application .dumper .task .TaskRunContext ;
43- import com .google .edwmigration .dumper .ext .hive .metastore .HiveMetastoreThriftClient ;
44- import com .google .edwmigration .dumper .plugin .ext .jdk .concurrent .ExecutorManager ;
4544import org .slf4j .Logger ;
4645import org .slf4j .LoggerFactory ;
4746
@@ -64,21 +63,28 @@ public abstract class AbstractHiveConnector extends AbstractConnector {
6463 @ ThreadSafe
6564 public static class ThriftClientPool implements AutoCloseable {
6665
66+ public interface ThriftClientConsumer {
67+ void accept (HiveMetastoreThriftClient thriftClient ) throws Exception ;
68+ }
69+
6770 @ Nonnull
6871 private final String name ;
6972 @ Nonnull
7073 private final ThreadLocal <? extends HiveMetastoreThriftClient > threadLocalThriftClient ;
7174 @ Nonnull
75+ private final ExecutorManager executorManager ;
76+ @ Nonnull
7277 private final ExecutorService executorService ;
7378 @ Nonnull
7479 private final Object lock = new Object ();
7580 @ GuardedBy ("lock" )
7681 @ Nonnull
7782 private final List <@ NonNull HiveMetastoreThriftClient > builtClients = new ArrayList <>();
7883
79- public ThriftClientPool (@ Nonnull String name , @ Nonnull HiveMetastoreThriftClient .Builder thriftClientBuilder , @ Nonnull ExecutorService executorService ) {
84+ public ThriftClientPool (@ Nonnull String name , @ Nonnull HiveMetastoreThriftClient .Builder thriftClientBuilder , int threadPoolSize ) {
8085 this .name = Preconditions .checkNotNull (name , "name was null." );
81- this .executorService = Preconditions .checkNotNull (executorService , "executorService was null." );
86+ this .executorService = ExecutorManager .newExecutorServiceWithBackpressure (name , threadPoolSize );
87+ this .executorManager = new ExecutorManager (executorService );
8288 this .threadLocalThriftClient = ThreadLocal .withInitial (() -> {
8389 String threadName = Thread .currentThread ().getName ();
8490 LOG .debug ("Creating new thread-local Thrift client '{}' owned by pooled client '{}'." , threadName , name );
@@ -94,22 +100,23 @@ public ThriftClientPool(@Nonnull String name, @Nonnull HiveMetastoreThriftClient
94100 });
95101 }
96102
97- @ Nonnull
98- public ExecutorService getExecutorService () {
99- return executorService ;
100- }
103+ public void execute (ThriftClientConsumer consumer ) {
104+ executorManager .execute (() -> {
105+ consumer .accept (getThreadLocalThriftClient ().get ());
106+ return null ;
107+ });
108+ }
101109
102110 @ Nonnull
103- public ThreadLocal <@ NonNull ? extends HiveMetastoreThriftClient > getThreadLocalThriftClient () {
111+ private ThreadLocal <@ NonNull ? extends HiveMetastoreThriftClient > getThreadLocalThriftClient () {
104112 return threadLocalThriftClient ;
105113 }
106114
107115 @ Override
108- public void close () throws IOException {
109- LOG .debug ("Closing pooled Thrift client '{}'." , name );
110- final int TIMEOUT = 30 ;
111- LOG .debug ("Shutting down thread pool backing pooled Thrift client '{}'; will wait up to {} seconds" , name , TIMEOUT );
112- MoreExecutors .shutdownAndAwaitTermination (executorService , TIMEOUT , TimeUnit .SECONDS );
116+ public void close () throws Exception {
117+ LOG .debug ("Shutting down thread pool backing pooled Thrift client '{}'" , name );
118+ executorManager .close ();
119+ MoreExecutors .shutdownAndAwaitTermination (executorService , 30 , TimeUnit .SECONDS );
113120 synchronized (lock ) {
114121 for (HiveMetastoreThriftClient client : builtClients ) {
115122 try {
@@ -147,7 +154,7 @@ public HiveMetastoreThriftClient newClient(@Nonnull String name) throws TTranspo
147154 @ Nonnull
148155 public ThriftClientPool newMultiThreadedThriftClientPool (@ Nonnull String name ) {
149156 LOG .debug ("Creating a new multi-threaded pooled Thrift client named '{}' backed by a thread pool of size {}." , name , threadPoolSize );
150- return new ThriftClientPool (name , thriftClientBuilder , ExecutorManager . newExecutorServiceWithBackpressure ( name , threadPoolSize ) );
157+ return new ThriftClientPool (name , thriftClientBuilder , threadPoolSize );
151158 }
152159 }
153160
0 commit comments