Commit e24954d

[Fix][Connector-V2] Fix Hive client thread-safety issue
1 parent 8617014 commit e24954d

File tree: 3 files changed, +119 −114 lines

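This commit removes the JVM-wide HiveMetaStoreProxy singleton. Previously, getInstance() handed every caller the same HiveMetaStoreClient, and close() both closed that shared Thrift client and reset the static INSTANCE, so with several Hive sinks or parallel aggregated commits in one JVM, one committer could close the client while another was still using it. The proxy is now an ordinary per-caller object: HiveSinkAggregatedCommitter creates its own proxy in its constructor and releases it in a new close() override, and the proxy builds its client lazily under synchronization.

A minimal, self-contained sketch of the old hazard; SharedProxy and the two committer threads are illustrative stand-ins, not connector code:

    import java.util.concurrent.CountDownLatch;

    // Illustrative stand-in for the old singleton-style proxy: every caller in
    // the JVM shares one instance, and close() tears it down for all of them.
    class SharedProxy {
        private static volatile SharedProxy INSTANCE;
        private boolean closed;

        static SharedProxy getInstance() {
            if (INSTANCE == null) {
                synchronized (SharedProxy.class) {
                    if (INSTANCE == null) {
                        INSTANCE = new SharedProxy();
                    }
                }
            }
            return INSTANCE;
        }

        synchronized void addPartitions() {
            if (closed) {
                throw new IllegalStateException("client was closed by another committer");
            }
        }

        synchronized void close() {
            closed = true;
            // The next getInstance() builds a fresh proxy, but in-flight
            // holders of the old reference are stranded with a closed client.
            INSTANCE = null;
        }
    }

    public class SingletonHazard {
        public static void main(String[] args) throws InterruptedException {
            SharedProxy shared = SharedProxy.getInstance(); // committer B's reference
            CountDownLatch aDone = new CountDownLatch(1);

            Thread committerA = new Thread(() -> {
                SharedProxy.getInstance().close(); // A commits and closes "its" proxy
                aDone.countDown();
            });
            committerA.start();
            aDone.await();

            shared.addPartitions(); // B's call fails: IllegalStateException
        }
    }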

seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/commit/HiveSinkAggregatedCommitter.java (+23 −19)

@@ -40,11 +40,13 @@ public class HiveSinkAggregatedCommitter extends FileSinkAggregatedCommitter {
     private final boolean abortDropPartitionMetadata;
 
     private final ReadonlyConfig readonlyConfig;
+    private final HiveMetaStoreProxy hiveMetaStore;
 
     public HiveSinkAggregatedCommitter(
             ReadonlyConfig readonlyConfig, String dbName, String tableName, HadoopConf hadoopConf) {
         super(hadoopConf);
         this.readonlyConfig = readonlyConfig;
+        this.hiveMetaStore = new HiveMetaStoreProxy(readonlyConfig);
         this.dbName = dbName;
         this.tableName = tableName;
         this.abortDropPartitionMetadata =
@@ -57,25 +59,20 @@ public List<FileAggregatedCommitInfo> commit(
 
         List<FileAggregatedCommitInfo> errorCommitInfos = super.commit(aggregatedCommitInfos);
         if (errorCommitInfos.isEmpty()) {
-            HiveMetaStoreProxy hiveMetaStore = HiveMetaStoreProxy.getInstance(readonlyConfig);
-            try {
-                for (FileAggregatedCommitInfo aggregatedCommitInfo : aggregatedCommitInfos) {
-                    Map<String, List<String>> partitionDirAndValuesMap =
-                            aggregatedCommitInfo.getPartitionDirAndValuesMap();
-                    List<String> partitions =
-                            partitionDirAndValuesMap.keySet().stream()
-                                    .map(partition -> partition.replaceAll("\\\\", "/"))
-                                    .collect(Collectors.toList());
-                    try {
-                        hiveMetaStore.addPartitions(dbName, tableName, partitions);
-                        log.info("Add these partitions {}", partitions);
-                    } catch (TException e) {
-                        log.error("Failed to add these partitions {}", partitions, e);
-                        errorCommitInfos.add(aggregatedCommitInfo);
-                    }
+            for (FileAggregatedCommitInfo aggregatedCommitInfo : aggregatedCommitInfos) {
+                Map<String, List<String>> partitionDirAndValuesMap =
+                        aggregatedCommitInfo.getPartitionDirAndValuesMap();
+                List<String> partitions =
+                        partitionDirAndValuesMap.keySet().stream()
+                                .map(partition -> partition.replaceAll("\\\\", "/"))
+                                .collect(Collectors.toList());
+                try {
+                    hiveMetaStore.addPartitions(dbName, tableName, partitions);
+                    log.info("Add these partitions {}", partitions);
+                } catch (TException e) {
+                    log.error("Failed to add these partitions {}", partitions, e);
+                    errorCommitInfos.add(aggregatedCommitInfo);
                 }
-            } finally {
-                hiveMetaStore.close();
             }
         }
         return errorCommitInfos;
@@ -85,7 +82,6 @@ public List<FileAggregatedCommitInfo> commit(
     public void abort(List<FileAggregatedCommitInfo> aggregatedCommitInfos) throws Exception {
         super.abort(aggregatedCommitInfos);
         if (abortDropPartitionMetadata) {
-            HiveMetaStoreProxy hiveMetaStore = HiveMetaStoreProxy.getInstance(readonlyConfig);
             for (FileAggregatedCommitInfo aggregatedCommitInfo : aggregatedCommitInfos) {
                 Map<String, List<String>> partitionDirAndValuesMap =
                         aggregatedCommitInfo.getPartitionDirAndValuesMap();
@@ -100,7 +96,15 @@ public void abort(List<FileAggregatedCommitInfo> aggregatedCommitInfos) throws Exception
                     log.error("Failed to remove these partitions {}", partitions, e);
                 }
             }
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        try {
             hiveMetaStore.close();
+        } finally {
+            super.close();
         }
     }
 }
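The committer now owns the proxy for its whole lifetime: one instance is created in the constructor, reused by every commit() and abort() call, and released exactly once in the new close() override. The try/finally ordering there means super.close() still runs even if closing the metastore client throws. A compact sketch of this per-instance ownership pattern; MetaStoreClient and Committer are illustrative stand-ins, not the connector's classes:

    import java.io.Closeable;
    import java.io.IOException;

    // Each committer owns one client per instance instead of sharing a
    // JVM-wide singleton, so closing one committer cannot break another.
    class MetaStoreClient implements Closeable {
        void addPartitions(String table) {
            System.out.println("add partitions to " + table);
        }

        @Override
        public void close() {
            System.out.println("client closed");
        }
    }

    class Committer implements Closeable {
        private final MetaStoreClient client = new MetaStoreClient(); // per committer

        void commit(String table) {
            client.addPartitions(table);
        }

        @Override
        public void close() throws IOException {
            try {
                client.close();         // release our own resource first ...
            } finally {
                releaseBaseResources(); // ... but guarantee base cleanup even if it throws
            }
        }

        private void releaseBaseResources() {
            System.out.println("base resources released");
        }
    }

    public class PerCommitterOwnership {
        public static void main(String[] args) throws IOException {
            try (Committer a = new Committer(); Committer b = new Committer()) {
                a.commit("db.table_a");
                b.commit("db.table_b"); // b is unaffected by a's client lifecycle
            }
        }
    }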

seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveMetaStoreProxy.java (+95 −91)

@@ -32,125 +32,129 @@
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
-import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.thrift.TException;
 
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
 
+import java.io.Closeable;
 import java.io.File;
 import java.io.IOException;
+import java.io.Serializable;
 import java.net.MalformedURLException;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.util.List;
 import java.util.Objects;
 
 @Slf4j
-public class HiveMetaStoreProxy {
-    private HiveMetaStoreClient hiveMetaStoreClient;
-    private static volatile HiveMetaStoreProxy INSTANCE = null;
+public class HiveMetaStoreProxy implements Closeable, Serializable {
     private static final List<String> HADOOP_CONF_FILES = ImmutableList.of("hive-site.xml");
 
-    private HiveMetaStoreProxy(ReadonlyConfig readonlyConfig) {
-        String metastoreUri = readonlyConfig.get(HiveOptions.METASTORE_URI);
-        String hiveHadoopConfigPath = readonlyConfig.get(HiveConfig.HADOOP_CONF_PATH);
-        String hiveSitePath = readonlyConfig.get(HiveConfig.HIVE_SITE_PATH);
-        HiveConf hiveConf = new HiveConf();
-        hiveConf.set("hive.metastore.uris", metastoreUri);
-        try {
-            if (StringUtils.isNotBlank(hiveHadoopConfigPath)) {
-                HADOOP_CONF_FILES.forEach(
-                        confFile -> {
-                            java.nio.file.Path path = Paths.get(hiveHadoopConfigPath, confFile);
-                            if (Files.exists(path)) {
-                                try {
-                                    hiveConf.addResource(path.toUri().toURL());
-                                } catch (IOException e) {
-                                    log.warn(
-                                            "Error adding Hadoop resource {}, resource was not added",
-                                            path,
-                                            e);
-                                }
-                            }
-                        });
-            }
+    private final String metastoreUri;
+    private final String hadoopConfDir;
+    private final String hiveSitePath;
+    private final boolean kerberosEnabled;
+    private final boolean remoteUserEnabled;
 
-            if (StringUtils.isNotBlank(hiveSitePath)) {
-                hiveConf.addResource(new File(hiveSitePath).toURI().toURL());
-            }
+    private final String krb5Path;
+    private final String principal;
+    private final String keytabPath;
+    private final String remoteUser;
 
-            log.info("hive client conf:{}", hiveConf);
-            if (HiveMetaStoreProxyUtils.enableKerberos(readonlyConfig)) {
-                // login Kerberos
-                Configuration authConf = new Configuration();
-                authConf.set("hadoop.security.authentication", "kerberos");
-                this.hiveMetaStoreClient =
-                        HadoopLoginFactory.loginWithKerberos(
-                                authConf,
-                                readonlyConfig.get(HdfsSourceConfigOptions.KRB5_PATH),
-                                readonlyConfig.get(HdfsSourceConfigOptions.KERBEROS_PRINCIPAL),
-                                readonlyConfig.get(HdfsSourceConfigOptions.KERBEROS_KEYTAB_PATH),
-                                (conf, userGroupInformation) -> {
-                                    return new HiveMetaStoreClient(hiveConf);
-                                });
-                return;
+    private HiveMetaStoreClient hiveClient;
+
+    public HiveMetaStoreProxy(ReadonlyConfig config) {
+        this.metastoreUri = config.get(HiveOptions.METASTORE_URI);
+        this.hadoopConfDir = config.get(HiveConfig.HADOOP_CONF_PATH);
+        this.hiveSitePath = config.get(HiveConfig.HIVE_SITE_PATH);
+        this.kerberosEnabled = HiveMetaStoreProxyUtils.enableKerberos(config);
+        this.remoteUserEnabled = HiveMetaStoreProxyUtils.enableRemoteUser(config);
+        this.krb5Path = config.get(HdfsSourceConfigOptions.KRB5_PATH);
+        this.principal = config.get(HdfsSourceConfigOptions.KERBEROS_PRINCIPAL);
+        this.keytabPath = config.get(HdfsSourceConfigOptions.KERBEROS_KEYTAB_PATH);
+        this.remoteUser = config.get(HdfsSourceConfigOptions.REMOTE_USER);
+    }
+
+    private synchronized HiveMetaStoreClient getClient() {
+        if (hiveClient == null) {
+            hiveClient = initializeClient();
+        }
+        return hiveClient;
+    }
+
+    private HiveMetaStoreClient initializeClient() {
+        HiveConf hiveConf = buildHiveConf();
+        try {
+            if (kerberosEnabled) {
+                return loginWithKerberos(hiveConf);
             }
-            if (HiveMetaStoreProxyUtils.enableRemoteUser(readonlyConfig)) {
-                this.hiveMetaStoreClient =
-                        HadoopLoginFactory.loginWithRemoteUser(
-                                new Configuration(),
-                                readonlyConfig.get(HdfsSourceConfigOptions.REMOTE_USER),
-                                (conf, userGroupInformation) -> {
-                                    return new HiveMetaStoreClient(hiveConf);
-                                });
-                return;
+            if (remoteUserEnabled) {
+                return loginWithRemoteUser(hiveConf);
             }
-            this.hiveMetaStoreClient = new HiveMetaStoreClient(hiveConf);
-        } catch (MetaException e) {
-            String errorMsg =
-                    String.format(
-                            "Using this hive uris [%s] to initialize "
-                                    + "hive metastore client instance failed",
-                            metastoreUri);
-            throw new HiveConnectorException(
-                    HiveConnectorErrorCode.INITIALIZE_HIVE_METASTORE_CLIENT_FAILED, errorMsg, e);
-        } catch (MalformedURLException e) {
-            String errorMsg =
-                    String.format(
-                            "Using this hive uris [%s], hive conf [%s] to initialize "
-                                    + "hive metastore client instance failed",
-                            metastoreUri, readonlyConfig.get(HiveOptions.HIVE_SITE_PATH));
-            throw new HiveConnectorException(
-                    HiveConnectorErrorCode.INITIALIZE_HIVE_METASTORE_CLIENT_FAILED, errorMsg, e);
+            return new HiveMetaStoreClient(hiveConf);
         } catch (Exception e) {
+            String errMsg =
+                    String.format(
+                            "Failed to initialize HiveMetaStoreClient [uris=%s, hiveSite=%s]",
+                            metastoreUri, hiveSitePath);
             throw new HiveConnectorException(
-                    HiveConnectorErrorCode.INITIALIZE_HIVE_METASTORE_CLIENT_FAILED,
-                    "Login form kerberos failed",
-                    e);
+                    HiveConnectorErrorCode.INITIALIZE_HIVE_METASTORE_CLIENT_FAILED, errMsg, e);
        }
    }
 
-    public static HiveMetaStoreProxy getInstance(ReadonlyConfig readonlyConfig) {
-        if (INSTANCE == null) {
-            synchronized (HiveMetaStoreProxy.class) {
-                if (INSTANCE == null) {
-                    INSTANCE = new HiveMetaStoreProxy(readonlyConfig);
+    private HiveConf buildHiveConf() {
+        HiveConf hiveConf = new HiveConf();
+        hiveConf.set("hive.metastore.uris", metastoreUri);
+
+        if (StringUtils.isNotBlank(hadoopConfDir)) {
+            for (String fileName : HADOOP_CONF_FILES) {
+                Path path = Paths.get(hadoopConfDir, fileName);
+                if (Files.exists(path)) {
+                    try {
+                        hiveConf.addResource(path.toUri().toURL());
+                    } catch (IOException e) {
+                        log.warn("Error adding Hadoop config {}", path, e);
+                    }
                 }
             }
         }
-        return INSTANCE;
+        if (StringUtils.isNotBlank(hiveSitePath)) {
+            try {
+                hiveConf.addResource(new File(hiveSitePath).toURI().toURL());
+            } catch (MalformedURLException e) {
+                log.warn("Invalid hiveSitePath {}", hiveSitePath, e);
+            }
+        }
+        log.info("Hive client configuration: {}", hiveConf);
+        return hiveConf;
+    }
+
+    private HiveMetaStoreClient loginWithKerberos(HiveConf hiveConf) throws Exception {
+        Configuration authConf = new Configuration();
+        authConf.set("hadoop.security.authentication", "kerberos");
+        return HadoopLoginFactory.loginWithKerberos(
+                authConf,
+                krb5Path,
+                principal,
+                keytabPath,
+                (conf, ugi) -> new HiveMetaStoreClient(hiveConf));
+    }
+
+    private HiveMetaStoreClient loginWithRemoteUser(HiveConf hiveConf) throws Exception {
+        return HadoopLoginFactory.loginWithRemoteUser(
+                new Configuration(), remoteUser, (conf, ugi) -> new HiveMetaStoreClient(hiveConf));
     }
 
     public Table getTable(@NonNull String dbName, @NonNull String tableName) {
         try {
-            return hiveMetaStoreClient.getTable(dbName, tableName);
+            return getClient().getTable(dbName, tableName);
         } catch (TException e) {
-            String errorMsg =
-                    String.format("Get table [%s.%s] information failed", dbName, tableName);
+            String msg = String.format("Failed to get table %s.%s", dbName, tableName);
             throw new HiveConnectorException(
-                    HiveConnectorErrorCode.GET_HIVE_TABLE_INFORMATION_FAILED, errorMsg, e);
+                    HiveConnectorErrorCode.GET_HIVE_TABLE_INFORMATION_FAILED, msg, e);
         }
     }
 
@@ -159,9 +163,9 @@ public void addPartitions(
             throws TException {
         for (String partition : partitions) {
             try {
-                hiveMetaStoreClient.appendPartition(dbName, tableName, partition);
-            } catch (AlreadyExistsException e) {
-                log.warn("The partition {} are already exists", partition);
+                getClient().appendPartition(dbName, tableName, partition);
+            } catch (AlreadyExistsException ae) {
+                log.warn("Partition {} already exists", partition);
             }
         }
     }
@@ -170,14 +174,14 @@ public void dropPartitions(
             @NonNull String dbName, @NonNull String tableName, List<String> partitions)
             throws TException {
         for (String partition : partitions) {
-            hiveMetaStoreClient.dropPartition(dbName, tableName, partition, false);
+            getClient().dropPartition(dbName, tableName, partition, false);
         }
     }
 
+    @Override
     public synchronized void close() {
-        if (Objects.nonNull(hiveMetaStoreClient)) {
-            hiveMetaStoreClient.close();
-            HiveMetaStoreProxy.INSTANCE = null;
+        if (Objects.nonNull(hiveClient)) {
+            hiveClient.close();
         }
     }
 }
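The proxy is no longer a singleton: the static volatile INSTANCE, the double-checked getInstance(), and the close() that reset it are gone. Construction now only records configuration values; the HiveMetaStoreClient is built on first use inside the synchronized getClient(), so each proxy owns at most one client and concurrent callers cannot race its creation. Implementing Closeable enables try-with-resources at call sites, and Serializable lets the proxy be captured in committer state, since its configured fields are plain strings and booleans. A minimal sketch of this lazy, synchronized handle pattern; LazyHandle and Connection are illustrative, not the connector's classes (marking the connection transient is the sketch's own choice, not something the diff does):

    import java.io.Closeable;
    import java.io.Serializable;

    // A cheap, serializable handle that builds its heavy resource on demand.
    class LazyHandle implements Closeable, Serializable {
        private final String uri;                 // cheap, serializable configuration
        private transient Connection connection;  // heavy resource, created on demand

        LazyHandle(String uri) {
            this.uri = uri;
        }

        // Synchronized lazy init: at most one Connection per handle, even if
        // several threads call through the handle concurrently.
        private synchronized Connection get() {
            if (connection == null) {
                connection = new Connection(uri);
            }
            return connection;
        }

        void ping() {
            get().send("ping");
        }

        @Override
        public synchronized void close() {
            if (connection != null) {
                connection.stop();
            }
        }

        static class Connection {
            Connection(String uri) {
                System.out.println("connected to " + uri);
            }

            void send(String msg) {
                System.out.println("sent " + msg);
            }

            void stop() {
                System.out.println("closed");
            }
        }
    }

    public class LazyHandleDemo {
        public static void main(String[] args) {
            try (LazyHandle handle = new LazyHandle("thrift://metastore:9083")) {
                handle.ping(); // the connection is created here, on first use
            }
        }
    }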

seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/utils/HiveTableUtils.java (+1 −4)

@@ -38,12 +38,9 @@ public static Table getTableInfo(ReadonlyConfig readonlyConfig) {
             throw new SeaTunnelRuntimeException(
                     HiveConnectorErrorCode.HIVE_TABLE_NAME_ERROR, "Current table name is " + table);
         }
-        HiveMetaStoreProxy hiveMetaStoreProxy = HiveMetaStoreProxy.getInstance(readonlyConfig);
-        try {
+        try (HiveMetaStoreProxy hiveMetaStoreProxy = new HiveMetaStoreProxy(readonlyConfig)) {
             return hiveMetaStoreProxy.getTable(
                     tablePath.getDatabaseName(), tablePath.getTableName());
-        } finally {
-            hiveMetaStoreProxy.close();
         }
     }
4946
