Skip to content

Commit a892fab

Browse files
committed
HIVE-29385: Use RetryingExecutor else where in HMS
1 parent 3d58586 commit a892fab

File tree

6 files changed

+170
-308
lines changed

6 files changed

+170
-308
lines changed

standalone-metastore/metastore-client/src/main/java/org/apache/hadoop/hive/metastore/RetryingMetaStoreClient.java

Lines changed: 48 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import java.io.IOException;
2222
import java.lang.annotation.Annotation;
2323
import java.lang.reflect.InvocationHandler;
24-
import java.lang.reflect.InvocationTargetException;
2524
import java.lang.reflect.Method;
2625
import java.lang.reflect.Proxy;
2726
import java.lang.reflect.UndeclaredThrowableException;
@@ -31,6 +30,8 @@
3130
import java.util.Iterator;
3231
import java.util.Map;
3332
import java.util.concurrent.TimeUnit;
33+
import java.util.concurrent.atomic.AtomicInteger;
34+
import java.util.function.Predicate;
3435
import java.util.function.Supplier;
3536

3637
import org.apache.hadoop.classification.InterfaceAudience;
@@ -39,13 +40,13 @@
3940
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
4041
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
4142
import org.apache.hadoop.hive.metastore.utils.JavaUtils;
43+
import org.apache.hadoop.hive.metastore.utils.RetryingExecutor;
4244
import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
4345
import org.slf4j.Logger;
4446
import org.slf4j.LoggerFactory;
4547
import org.apache.hadoop.hive.metastore.annotation.NoReconnect;
4648
import org.apache.hadoop.hive.metastore.api.MetaException;
4749
import org.apache.hadoop.security.UserGroupInformation;
48-
import org.apache.thrift.TException;
4950
import org.apache.thrift.transport.TTransportException;
5051

5152
import com.google.common.annotations.VisibleForTesting;
@@ -71,7 +72,6 @@ public class RetryingMetaStoreClient implements InvocationHandler {
7172
private long lastConnectionTime;
7273
private boolean localMetaStore;
7374

74-
7575
protected RetryingMetaStoreClient(Configuration conf, Class<?>[] constructorArgTypes,
7676
Object[] constructorArgs, Map<String, Long> metaCallTimeMap,
7777
Class<? extends IMetaStoreClient> msClientClass) throws MetaException {
@@ -174,11 +174,6 @@ private static IMetaStoreClient getProxy(Class<?>[] interfaces,
174174

175175
@Override
176176
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
177-
Object ret;
178-
int retriesMade = 0;
179-
TException caughtException;
180-
181-
boolean allowReconnect = ! method.isAnnotationPresent(NoReconnect.class);
182177
boolean allowRetry = true;
183178
Annotation[] directives = method.getDeclaredAnnotations();
184179
if(directives != null) {
@@ -188,73 +183,59 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
188183
}
189184
}
190185
}
186+
boolean finalAllowRetry = allowRetry;
187+
Predicate<Throwable> retryPolicy = ex -> finalAllowRetry && !base.isLocalMetaStore() &&
188+
TTransportException.class.isAssignableFrom(ex.getClass());
189+
AtomicInteger retriesMade = new AtomicInteger(0);
190+
return new RetryingExecutor<>(retryLimit, () -> {
191+
reconnect(method, retriesMade);
192+
Object ret;
193+
if (metaCallTimeMap == null) {
194+
ret = method.invoke(base, args);
195+
} else {
196+
// need to capture the timing
197+
long startTime = System.currentTimeMillis();
198+
ret = method.invoke(base, args);
199+
long timeTaken = System.currentTimeMillis() - startTime;
200+
addMethodTime(method, timeTaken);
201+
}
202+
return ret;
203+
}).sleepInterval(retryDelaySeconds * 1000)
204+
.onRetry(retryPolicy)
205+
.run();
206+
}
191207

192-
while (true) {
193-
try {
194-
SecurityUtils.reloginExpiringKeytabUser();
195-
196-
if (allowReconnect) {
197-
if (retriesMade > 0 || hasConnectionLifeTimeReached(method)) {
198-
if (this.ugi != null) {
199-
// Perform reconnect with the proper user context
200-
try {
201-
LOG.info("RetryingMetaStoreClient trying reconnect as " + this.ugi);
202-
203-
this.ugi.doAs(
204-
new PrivilegedExceptionAction<Object> () {
205-
@Override
206-
public Object run() throws MetaException {
207-
base.reconnect();
208-
return null;
209-
}
210-
});
211-
} catch (UndeclaredThrowableException e) {
212-
Throwable te = e.getCause();
213-
if (te instanceof PrivilegedActionException) {
214-
throw te.getCause();
215-
} else {
216-
throw te;
217-
}
218-
}
219-
lastConnectionTime = System.currentTimeMillis();
208+
private void reconnect(Method method, AtomicInteger retriesMade) throws Exception {
209+
SecurityUtils.reloginExpiringKeytabUser();
210+
boolean allowReconnect = ! method.isAnnotationPresent(NoReconnect.class);
211+
if (allowReconnect) {
212+
if (retriesMade.getAndIncrement() > 0 || hasConnectionLifeTimeReached(method)) {
213+
if (this.ugi != null) {
214+
// Perform reconnect with the proper user context
215+
try {
216+
LOG.info("RetryingMetaStoreClient trying reconnect as " + this.ugi);
217+
218+
this.ugi.doAs((PrivilegedExceptionAction<Object>) () -> {
219+
base.reconnect();
220+
return null;
221+
});
222+
} catch (UndeclaredThrowableException e) {
223+
Throwable te = e.getCause();
224+
if (te instanceof PrivilegedActionException pe) {
225+
throw pe;
226+
} else if (te instanceof Exception ex){
227+
throw ex;
220228
} else {
221-
LOG.warn("RetryingMetaStoreClient unable to reconnect. No UGI information.");
222-
throw new MetaException("UGI information unavailable. Will not attempt a reconnect.");
229+
throw new RuntimeException(te);
223230
}
224231
}
225-
}
226-
227-
if (metaCallTimeMap == null) {
228-
ret = method.invoke(base, args);
232+
lastConnectionTime = System.currentTimeMillis();
229233
} else {
230-
// need to capture the timing
231-
long startTime = System.currentTimeMillis();
232-
ret = method.invoke(base, args);
233-
long timeTaken = System.currentTimeMillis() - startTime;
234-
addMethodTime(method, timeTaken);
234+
LOG.warn("RetryingMetaStoreClient unable to reconnect. No UGI information.");
235+
throw new MetaException("UGI information unavailable. Will not attempt a reconnect.");
235236
}
236-
break;
237-
} catch (UndeclaredThrowableException e) {
238-
throw e.getCause();
239-
} catch (InvocationTargetException e) {
240-
Throwable t = e.getCause();
241-
// Metastore client needs retry for only TTransportException.
242-
if (TTransportException.class.isAssignableFrom(t.getClass())) {
243-
caughtException = (TTransportException) t;
244-
} else {
245-
throw t;
246-
}
247-
}
248-
249-
if (retriesMade >= retryLimit || base.isLocalMetaStore() || !allowRetry) {
250-
throw caughtException;
251237
}
252-
retriesMade++;
253-
LOG.warn("MetaStoreClient lost connection. Attempting to reconnect (" + retriesMade + " of " +
254-
retryLimit + ") after " + retryDelaySeconds + "s. " + method.getName(), caughtException);
255-
Thread.sleep(retryDelaySeconds * 1000);
256238
}
257-
return ret;
258239
}
259240

260241
/**

standalone-metastore/metastore-client/src/main/java/org/apache/hadoop/hive/metastore/client/ThriftHiveMetaStoreClient.java

Lines changed: 58 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import com.google.common.annotations.VisibleForTesting;
2222
import com.google.common.base.Preconditions;
23+
import com.google.common.base.Predicate;
2324
import com.google.common.collect.Lists;
2425
import org.apache.commons.lang3.tuple.Pair;
2526
import org.apache.hadoop.classification.InterfaceAudience;
@@ -44,6 +45,7 @@
4445
import org.apache.hadoop.hive.metastore.utils.FilterUtils;
4546
import org.apache.hadoop.hive.metastore.utils.JavaUtils;
4647
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
48+
import org.apache.hadoop.hive.metastore.utils.RetryingExecutor;
4749
import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
4850
import org.apache.hadoop.security.UserGroupInformation;
4951
import org.apache.hadoop.util.ReflectionUtils;
@@ -92,6 +94,7 @@
9294
import java.util.Random;
9395
import java.util.concurrent.TimeUnit;
9496
import java.util.concurrent.atomic.AtomicInteger;
97+
import java.util.concurrent.atomic.AtomicReference;
9598

9699
import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.createThriftPartitionsReq;
97100
import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
@@ -676,8 +679,6 @@ private TTransport createBinaryClient(URI store, boolean useSSL) throws TTranspo
676679

677680
private void open() throws MetaException {
678681
isConnected = false;
679-
TTransportException tte = null;
680-
MetaException recentME = null;
681682
boolean useSSL = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.USE_SSL);
682683
boolean useSasl = MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.USE_THRIFT_SASL);
683684
String clientAuthMode = MetastoreConf.getVar(conf, MetastoreConf.ConfVars.METASTORE_CLIENT_AUTH_MODE);
@@ -689,91 +690,61 @@ private void open() throws MetaException {
689690
if (clientAuthMode != null) {
690691
usePasswordAuth = "PLAIN".equalsIgnoreCase(clientAuthMode);
691692
}
692-
693-
for (int attempt = 0; !isConnected && attempt < retries; ++attempt) {
694-
for (URI store : metastoreUris) {
695-
LOG.info("Trying to connect to metastore with URI ({}) in {} transport mode", store,
696-
transportMode);
697-
try {
698-
try {
699-
if (isHttpTransportMode) {
700-
transport = createHttpClient(store, useSSL);
701-
} else {
702-
transport = createBinaryClient(store, useSSL);
703-
}
704-
} catch (TTransportException te) {
705-
tte = te;
706-
throw new MetaException(te.toString());
707-
}
708-
709-
final TProtocol protocol;
710-
if (useCompactProtocol) {
711-
protocol = new TCompactProtocol(transport);
712-
} else {
713-
protocol = new TBinaryProtocol(transport);
714-
}
715-
client = new ThriftHiveMetastore.Client(protocol);
716-
try {
717-
if (!transport.isOpen()) {
718-
transport.open();
719-
final int newCount = connCount.incrementAndGet();
720-
if (useSSL) {
721-
LOG.info(
722-
"Opened an SSL connection to metastore, current connections: {}",
723-
newCount);
724-
if (LOG.isTraceEnabled()) {
725-
LOG.trace("METASTORE SSL CONNECTION TRACE - open [{}]",
726-
System.identityHashCode(this), new Exception());
727-
}
728-
} else {
729-
LOG.info("Opened a connection to metastore, URI ({}) "
730-
+ "current connections: {}", store, newCount);
731-
if (LOG.isTraceEnabled()) {
732-
LOG.trace("METASTORE CONNECTION TRACE - open [{}]",
733-
System.identityHashCode(this), new Exception());
734-
}
735-
}
736-
}
737-
isConnected = true;
738-
} catch (TTransportException e) {
739-
tte = e;
740-
String errMsg = String.format("Failed to connect to the MetaStore Server URI (%s) in %s "
741-
+ "transport mode", store, transportMode);
742-
LOG.warn(errMsg);
743-
LOG.debug(errMsg, e);
693+
AtomicReference<Throwable> recentEx = new AtomicReference<>();
694+
Predicate<Throwable> retryPolicy = ex -> {
695+
recentEx.set(ex);
696+
return !isConnected && (ex instanceof MetaException || ex instanceof TTransportException);
697+
};
698+
AtomicInteger inx = new AtomicInteger(0);
699+
isConnected = new RetryingExecutor<>(retries * metastoreUris.length, () -> {
700+
URI store = metastoreUris[inx.getAndIncrement() % metastoreUris.length];
701+
LOG.info("Trying to connect to metastore with URI ({}) in {} transport mode", store, transportMode);
702+
if (isHttpTransportMode) {
703+
transport = createHttpClient(store, useSSL);
704+
} else {
705+
transport = createBinaryClient(store, useSSL);
706+
}
707+
final TProtocol protocol;
708+
if (useCompactProtocol) {
709+
protocol = new TCompactProtocol(transport);
710+
} else {
711+
protocol = new TBinaryProtocol(transport);
712+
}
713+
client = new ThriftHiveMetastore.Client(protocol);
714+
if (!transport.isOpen()) {
715+
transport.open();
716+
final int newCount = connCount.incrementAndGet();
717+
if (useSSL) {
718+
LOG.info(
719+
"Opened an SSL connection to metastore, current connections: {}",
720+
newCount);
721+
if (LOG.isTraceEnabled()) {
722+
LOG.trace("METASTORE SSL CONNECTION TRACE - open [{}]",
723+
System.identityHashCode(this), new Exception());
744724
}
745-
746-
if (isConnected && !useSasl && !usePasswordAuth && !isHttpTransportMode &&
747-
MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.EXECUTE_SET_UGI)) {
748-
// Call set_ugi, only in unsecure mode.
749-
try {
750-
UserGroupInformation ugi = SecurityUtils.getUGI();
751-
client.set_ugi(ugi.getUserName(), Arrays.asList(ugi.getGroupNames()));
752-
} catch (IOException e) {
753-
LOG.warn("Failed to find ugi of client set_ugi() is not successful, Continuing without it.", e);
754-
} catch (TException e) {
755-
LOG.warn("set_ugi() not successful, Likely cause: new client talking to old server. "
756-
+ "Continuing without it.", e);
757-
}
725+
} else {
726+
LOG.info("Opened a connection to metastore, URI ({}) "
727+
+ "current connections: {}", store, newCount);
728+
if (LOG.isTraceEnabled()) {
729+
LOG.trace("METASTORE CONNECTION TRACE - open [{}]",
730+
System.identityHashCode(this), new Exception());
758731
}
759-
} catch (MetaException e) {
760-
recentME = e;
761-
String errMsg = "Failed to connect to metastore with URI (" + store
762-
+ ") transport mode:" + transportMode + " in attempt " + attempt;
763-
LOG.error(errMsg, e);
764-
}
765-
if (isConnected) {
766-
// Set the beeline session modified metaConfVars for new HMS connection
767-
overlaySessionModifiedMetaConf();
768-
break;
769732
}
770733
}
771-
// Wait before launching the next round of connection retries.
772-
if (!isConnected && retryDelaySeconds > 0) {
773-
try {
774-
LOG.info("Waiting " + retryDelaySeconds + " seconds before next connection attempt.");
775-
Thread.sleep(retryDelaySeconds * 1000);
776-
} catch (InterruptedException ignore) {}
734+
return true;
735+
}).sleepInterval(retryDelaySeconds * 1000).onRetry(retryPolicy).run();
736+
737+
if (!useSasl && !usePasswordAuth && !isHttpTransportMode &&
738+
MetastoreConf.getBoolVar(conf, MetastoreConf.ConfVars.EXECUTE_SET_UGI)) {
739+
// Call set_ugi, only in unsecure mode.
740+
try {
741+
UserGroupInformation ugi = SecurityUtils.getUGI();
742+
client.set_ugi(ugi.getUserName(), Arrays.asList(ugi.getGroupNames()));
743+
} catch (IOException e) {
744+
LOG.warn("Failed to find ugi of client set_ugi() is not successful, Continuing without it.", e);
745+
} catch (TException e) {
746+
LOG.warn("set_ugi() not successful, Likely cause: new client talking to old server. "
747+
+ "Continuing without it.", e);
777748
}
778749
}
779750

@@ -782,15 +753,14 @@ private void open() throws MetaException {
782753
// be null. When MetaException wraps TTransportException, tte will be set so stringify that
783754
// directly.
784755
String exceptionString = "Unknown exception";
785-
if (tte != null) {
786-
exceptionString = StringUtils.stringifyException(tte);
787-
} else if (recentME != null) {
788-
exceptionString = StringUtils.stringifyException(recentME);
756+
if (recentEx.get() != null) {
757+
exceptionString = StringUtils.stringifyException(recentEx.get());
789758
}
790759
throw new MetaException("Could not connect to meta store using any of the URIs provided." +
791760
" Most recent failure: " + exceptionString);
792761
}
793-
762+
// Set the beeline session modified metaConfVars for new HMS connection
763+
overlaySessionModifiedMetaConf();
794764
snapshotActiveConf();
795765
}
796766

standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/RetryingExecutor.java renamed to standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/utils/RetryingExecutor.java

File renamed without changes.

standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/RawStoreProxy.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,9 +100,7 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
100100
Deadline.stopTimer();
101101
}
102102
}
103-
} catch (UndeclaredThrowableException e) {
104-
throw e.getCause();
105-
} catch (InvocationTargetException e) {
103+
} catch (UndeclaredThrowableException | InvocationTargetException e) {
106104
throw e.getCause();
107105
}
108106
}

0 commit comments

Comments
 (0)