Skip to content

Commit e21a71c

Browse files
committed
update to hive-metastore 4
1 parent 301b5ae commit e21a71c

File tree

5 files changed

+1489
-293
lines changed

5 files changed

+1489
-293
lines changed

sdks/java/io/iceberg/build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ dependencies {
7474
// Hive catalog test dependencies
7575
testImplementation project(path: ":sdks:java:io:iceberg:hive")
7676
testImplementation "org.apache.iceberg:iceberg-common:$iceberg_version"
77-
testImplementation ("org.apache.iceberg:iceberg-hive-metastore:$iceberg_version")
77+
testImplementation ("org.apache.hive:hive-iceberg-catalog:$hive_version")
7878
testImplementation ("org.apache.hive:hive-metastore:$hive_version")
7979
testImplementation ("org.apache.hive:hive-standalone-metastore-server:$hive_version")
8080
testImplementation "org.assertj:assertj-core:3.11.1"

sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/catalog/hiveutils/TestHiveMetastore.java

+65-25
Original file line numberDiff line numberDiff line change
@@ -30,23 +30,28 @@
3030
import java.nio.charset.StandardCharsets;
3131
import java.sql.Connection;
3232
import java.sql.DriverManager;
33-
import java.sql.SQLException;
3433
import java.util.concurrent.ExecutorService;
3534
import java.util.concurrent.Executors;
36-
import java.util.concurrent.TimeUnit;
35+
import java.util.concurrent.Future;
3736
import org.apache.hadoop.conf.Configuration;
3837
import org.apache.hadoop.fs.FileStatus;
3938
import org.apache.hadoop.fs.FileSystem;
4039
import org.apache.hadoop.fs.Path;
4140
import org.apache.hadoop.hive.conf.HiveConf;
4241
import org.apache.hadoop.hive.metastore.HMSHandler;
42+
import org.apache.hadoop.hive.metastore.HMSHandlerProxyFactory;
4343
import org.apache.hadoop.hive.metastore.IHMSHandler;
44-
import org.apache.hadoop.hive.metastore.RetryingHMSHandler;
44+
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
4545
import org.apache.hadoop.hive.metastore.TSetIpAddressProcessor;
46+
import org.apache.hadoop.hive.metastore.api.GetTableRequest;
47+
import org.apache.hadoop.hive.metastore.api.Table;
48+
import org.apache.iceberg.ClientPool;
49+
import org.apache.iceberg.catalog.TableIdentifier;
4650
import org.apache.iceberg.common.DynConstructors;
4751
import org.apache.iceberg.common.DynMethods;
4852
import org.apache.iceberg.hadoop.Util;
4953
import org.apache.iceberg.hive.HiveClientPool;
54+
import org.apache.thrift.TException;
5055
import org.apache.thrift.protocol.TBinaryProtocol;
5156
import org.apache.thrift.server.TServer;
5257
import org.apache.thrift.server.TThreadPoolServer;
@@ -58,7 +63,7 @@
5863
* HiveMetastoreExtension} instead.
5964
*
6065
* <p>Copied over from <a
61-
* href="https://github.com/apache/iceberg/blob/main/hive-metastore/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java">Iceberg's
66+
* href="https://github.com/apache/hive/blob/branch-4.0/iceberg/iceberg-catalog/src/test/java/org/apache/iceberg/hive/TestHiveMetastore.java">Iceberg's
6267
* integration testing util</a>
6368
*/
6469
public class TestHiveMetastore {
@@ -76,10 +81,22 @@ public class TestHiveMetastore {
7681

7782
private static final DynMethods.StaticMethod GET_BASE_HMS_HANDLER =
7883
DynMethods.builder("getProxy")
79-
.impl(RetryingHMSHandler.class, Configuration.class, IHMSHandler.class, boolean.class)
80-
.impl(RetryingHMSHandler.class, HiveConf.class, IHMSHandler.class, boolean.class)
84+
.impl(HMSHandlerProxyFactory.class, Configuration.class, IHMSHandler.class, boolean.class)
85+
.impl(HMSHandlerProxyFactory.class, HiveConf.class, IHMSHandler.class, boolean.class)
8186
.buildStatic();
8287

88+
// Hive3 introduces background metastore tasks (MetastoreTaskThread) for performing various
89+
// cleanup duties. These
90+
// threads are scheduled and executed in a static thread pool
91+
// (org.apache.hadoop.hive.metastore.ThreadPool).
92+
// This thread pool is shut down normally as part of the JVM shutdown hook, but since we're
93+
// creating and tearing down
94+
// multiple metastore instances within the same JVM, we have to call this cleanup method manually,
95+
// otherwise
96+
// threads from our previous test suite will be stuck in the pool with stale config, and keep on
97+
// being scheduled.
98+
// This can lead to issues, e.g. accidental Persistence Manager closure by
99+
// ScheduledQueryExecutionsMaintTask.
83100
private static final DynMethods.StaticMethod METASTORE_THREADS_SHUTDOWN =
84101
DynMethods.builder("shutdown")
85102
.impl("org.apache.hadoop.hive.metastore.ThreadPool")
@@ -89,13 +106,15 @@ public class TestHiveMetastore {
89106
// It's tricky to clear all static fields in an HMS instance in order to switch derby root dir.
90107
// Therefore, we reuse the same derby root between tests and remove it after JVM exits.
91108
private static final File HIVE_LOCAL_DIR;
109+
private static final File HIVE_EXTERNAL_WAREHOUSE_DIR;
92110
private static final String DERBY_PATH;
93111

94112
static {
95113
try {
96114
HIVE_LOCAL_DIR =
97115
createTempDirectory("hive", asFileAttribute(fromString("rwxrwxrwx"))).toFile();
98-
DERBY_PATH = HIVE_LOCAL_DIR + "/metastore_db";
116+
DERBY_PATH = new File(HIVE_LOCAL_DIR, "metastore_db").getPath();
117+
HIVE_EXTERNAL_WAREHOUSE_DIR = new File(HIVE_LOCAL_DIR, "external");
99118
File derbyLogFile = new File(HIVE_LOCAL_DIR, "derby.log");
100119
System.setProperty("derby.stream.error.file", derbyLogFile.getAbsolutePath());
101120
setupMetastoreDB("jdbc:derby:" + DERBY_PATH + ";create=true");
@@ -127,9 +146,16 @@ public class TestHiveMetastore {
127146
TestHiveMetastore(String hiveWarehousePath) {
128147
this.hiveWarehousePath = hiveWarehousePath;
129148
}
149+
/**
150+
* Starts a TestHiveMetastore with the default connection pool size (5) and the default HiveConf.
151+
*/
152+
public void start() {
153+
start(new HiveConf(new Configuration(), TestHiveMetastore.class), DEFAULT_POOL_SIZE);
154+
}
130155

131156
/**
132-
* Starts a TestHiveMetastore with the default connection pool size with the provided HiveConf.
157+
* Starts a TestHiveMetastore with the default connection pool size (5) with the provided
158+
* HiveConf.
133159
*
134160
* @param conf The hive configuration to use
135161
*/
@@ -143,7 +169,6 @@ public void start(HiveConf conf) {
143169
* @param conf The hive configuration to use
144170
* @param poolSize The number of threads in the executor pool
145171
*/
146-
@SuppressWarnings("FutureReturnValueIgnored")
147172
public void start(HiveConf conf, int poolSize) {
148173
try {
149174
TServerSocket socket = new TServerSocket(0);
@@ -153,7 +178,14 @@ public void start(HiveConf conf, int poolSize) {
153178
this.hiveConf = conf;
154179
this.server = newThriftServer(socket, poolSize, hiveConf);
155180
this.executorService = Executors.newSingleThreadExecutor();
156-
this.executorService.submit(() -> server.serve());
181+
Future<?> ignored = this.executorService.submit(() -> server.serve());
182+
183+
// in Hive3, setting this as a system prop ensures that it will be picked up whenever a new
184+
// HiveConf is created
185+
System.setProperty(
186+
HiveConf.ConfVars.METASTORE_URIS.varname,
187+
hiveConf.getVar(HiveConf.ConfVars.METASTORE_URIS));
188+
157189
this.clientPool = new HiveClientPool(1, hiveConf);
158190
} catch (Exception e) {
159191
throw new RuntimeException("Cannot start TestHiveMetastore", e);
@@ -169,13 +201,7 @@ public void stop() throws Exception {
169201
server.stop();
170202
}
171203
if (executorService != null) {
172-
executorService.shutdownNow();
173-
try {
174-
// Give it a reasonable timeout
175-
executorService.awaitTermination(10, TimeUnit.SECONDS);
176-
} catch (InterruptedException e) {
177-
Thread.currentThread().interrupt();
178-
}
204+
executorService.shutdown();
179205
}
180206
if (baseHandler != null) {
181207
baseHandler.shutdown();
@@ -215,9 +241,6 @@ public void reset() throws Exception {
215241

216242
Path warehouseRoot = new Path(hiveWarehousePath);
217243
FileSystem fs = Util.getFs(warehouseRoot, hiveConf);
218-
if (!fs.exists(warehouseRoot)) {
219-
return;
220-
}
221244
for (FileStatus fileStatus : fs.listStatus(warehouseRoot)) {
222245
if (!fileStatus.getPath().getName().equals("derby.log")
223246
&& !fileStatus.getPath().getName().equals("metastore_db")) {
@@ -226,6 +249,19 @@ public void reset() throws Exception {
226249
}
227250
}
228251

252+
public Table getTable(String dbName, String tableName) throws TException, InterruptedException {
253+
return clientPool.run(client -> client.getTable(new GetTableRequest(dbName, tableName)));
254+
}
255+
256+
public Table getTable(TableIdentifier identifier) throws TException, InterruptedException {
257+
return getTable(identifier.namespace().toString(), identifier.name());
258+
}
259+
260+
public <R> R run(ClientPool.Action<R, IMetaStoreClient, TException> action)
261+
throws InterruptedException, TException {
262+
return clientPool.run(action, false);
263+
}
264+
229265
private TServer newThriftServer(TServerSocket socket, int poolSize, HiveConf conf)
230266
throws Exception {
231267
HiveConf serverConf = new HiveConf(conf);
@@ -249,20 +285,24 @@ private TServer newThriftServer(TServerSocket socket, int poolSize, HiveConf con
249285
private void initConf(HiveConf conf, int port) {
250286
conf.set(HiveConf.ConfVars.METASTORE_URIS.varname, "thrift://localhost:" + port);
251287
conf.set(HiveConf.ConfVars.METASTORE_WAREHOUSE.varname, hiveWarehousePath);
288+
conf.set(
289+
HiveConf.ConfVars.HIVE_METASTORE_WAREHOUSE_EXTERNAL.varname,
290+
"file:" + HIVE_EXTERNAL_WAREHOUSE_DIR.getAbsolutePath());
252291
conf.set(HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL.varname, "false");
253292
conf.set(HiveConf.ConfVars.METASTORE_DISALLOW_INCOMPATIBLE_COL_TYPE_CHANGES.varname, "false");
254293
conf.set("iceberg.hive.client-pool-size", "2");
255-
// Setting this to avoid thrift exception during running Iceberg tests outside Iceberg.
256-
conf.set(
257-
HiveConf.ConfVars.HIVE_IN_TEST.varname, HiveConf.ConfVars.HIVE_IN_TEST.getDefaultValue());
294+
// set to false so that TxnManager#checkLock does not throw exception when using UNSET data type
295+
// operation
296+
// in the requested lock component
297+
conf.setBoolVar(HiveConf.ConfVars.HIVE_IN_TEST, false);
258298
}
259299

260-
private static void setupMetastoreDB(String dbURL) throws SQLException, IOException {
300+
private static void setupMetastoreDB(String dbURL) throws Exception {
261301
Connection connection = DriverManager.getConnection(dbURL);
262302
ScriptRunner scriptRunner = new ScriptRunner(connection, true, true);
263303

264304
ClassLoader classLoader = ClassLoader.getSystemClassLoader();
265-
InputStream inputStream = classLoader.getResourceAsStream("hive-schema-3.1.0.derby.sql");
305+
InputStream inputStream = classLoader.getResourceAsStream("hive-schema-4.0.0.derby.sql");
266306
try (Reader reader = new InputStreamReader(inputStream, StandardCharsets.UTF_8)) {
267307
scriptRunner.runScript(reader);
268308
}

0 commit comments

Comments
 (0)