diff --git a/dumper/app/src/test/java/com/google/edwmigration/dumper/application/dumper/connector/hive/HiveMetadataConnectorTest.java b/dumper/app/src/test/java/com/google/edwmigration/dumper/application/dumper/connector/hive/HiveMetadataConnectorTest.java
index f72312159..c3064e705 100644
--- a/dumper/app/src/test/java/com/google/edwmigration/dumper/application/dumper/connector/hive/HiveMetadataConnectorTest.java
+++ b/dumper/app/src/test/java/com/google/edwmigration/dumper/application/dumper/connector/hive/HiveMetadataConnectorTest.java
@@ -20,51 +20,22 @@ import static org.junit.Assert.assertTrue;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Lists;
 import com.google.edwmigration.dumper.application.dumper.ConnectorArguments;
-import com.google.edwmigration.dumper.application.dumper.Main;
 import com.google.edwmigration.dumper.application.dumper.connector.AbstractConnectorTest;
-import com.google.edwmigration.dumper.application.dumper.connector.hive.support.HiveServerSupport;
-import com.google.edwmigration.dumper.application.dumper.connector.hive.support.HiveTestSchemaBuilder;
 import com.google.edwmigration.dumper.application.dumper.task.Task;
-import com.google.edwmigration.dumper.plugin.ext.jdk.progress.ConcurrentProgressMonitor;
-import com.google.edwmigration.dumper.plugin.ext.jdk.progress.ConcurrentRecordProgressMonitor;
-import java.io.File;
 import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.stream.Collectors;
-import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
-import org.apache.commons.compress.archivers.zip.ZipFile;
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.SystemUtils;
-import org.junit.Assert;
-import org.junit.Assume;
 import org.junit.Test;
 import org.junit.experimental.theories.DataPoints;
 import org.junit.experimental.theories.FromDataPoints;
 import org.junit.experimental.theories.Theories;
 import org.junit.experimental.theories.Theory;
 import org.junit.runner.RunWith;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 @RunWith(Theories.class)
 public class HiveMetadataConnectorTest extends AbstractConnectorTest {
 
-  private static final Logger logger = LoggerFactory.getLogger(HiveMetadataConnectorTest.class);
-  private static final boolean debug = false;
-
   private final HiveMetadataConnector connector = new HiveMetadataConnector();
 
   @Test
@@ -141,109 +112,4 @@ public void addTasksTo_migrationMetadataTaskExists_success(
         "Task names must contain '" + taskName + "'. Actual tasks: " + taskNames,
         taskNames.contains(taskName));
   }
-
-  /**
-   * Run this with:
-   *
-   * <pre>
-   * ./gradlew :dumper:app:{cleanTest,test} --tests HiveMetadataConnectorTest
-   * -Dtest.verbose=true -Dorg.gradle.java.home=/usr/lib/jvm/java-1.8.0-openjdk-amd64
-   * </pre>
-   */
-  @Test
-  public void testLoadedHive312() throws Exception {
-
-    // Hive 3.1.2 requires java 1.8
-    Assume.assumeTrue(SystemUtils.IS_JAVA_1_8);
-
-    // with 5/5/10/1000 -> ~2 minutes
-    int NUM_SCHEMAS = 5;
-    int NUM_TABLES = 5;
-    int NUM_COLUMNS = 10;
-    int NUM_PARTITIONS = 1000;
-
-    int NUM_DUMPER_RUNS = 5; // 5 -> ~4 minutes
-    int BATCH_SIZE = 25;
-
-    List<List<String>> setupStatements =
-        HiveTestSchemaBuilder.getStatements(NUM_SCHEMAS, NUM_TABLES, NUM_COLUMNS, NUM_PARTITIONS);
-
-    try (HiveServerSupport instanceSupport = new HiveServerSupport().start()) {
-
-      // Populate Hive Metastore
-      long total = setupStatements.stream().mapToLong(Collection::size).sum();
-      try (ConcurrentProgressMonitor monitor =
-          new ConcurrentRecordProgressMonitor("Populating Hive instance.", total)) {
-        ExecutorService executor = Executors.newFixedThreadPool(HiveServerSupport.CONCURRENCY);
-
-        for (List<String> statementList : setupStatements) {
-          executor
-              .invokeAll(
-                  Lists.partition(statementList, BATCH_SIZE).stream()
-                      .map(l -> getCallable(instanceSupport, l, monitor))
-                      .collect(Collectors.toList()))
-              .forEach(this::assertNoException);
-        }
-      }
-
-      // Run dumper many times and assert all tables are there (1 table = 1 line, JSONL)
-      for (int i = 0; i < NUM_DUMPER_RUNS; i++) {
-        String tmpPrefix = String.format("dumper-test-iteration-%d-", i);
-        Path tmpPath = Files.createTempFile(tmpPrefix, ".zip");
-        File tmpFile = tmpPath.toFile();
-
-        try {
-          Main.main(
-              "--connector",
-              "hiveql",
-              "--port",
-              "" + instanceSupport.getMetastoreThriftPort(),
-              "--output",
-              tmpFile.getAbsolutePath());
-
-          assertTrue(tmpFile.exists());
-
-          try (ZipFile zipFile = new ZipFile(tmpFile)) {
-            List<ZipArchiveEntry> entries = Collections.list(zipFile.getEntries());
-
-            entries.forEach(e -> Assert.assertFalse(e.getName().contains("exception.txt")));
-
-            List<String> tables =
-                IOUtils.readLines(
-                    zipFile.getInputStream(zipFile.getEntry("tables.jsonl")),
-                    StandardCharsets.UTF_8);
-
-            Assert.assertEquals(
-                "All tables should be present.", NUM_SCHEMAS * NUM_TABLES, tables.size());
-
-            logger.info("Dump verified.");
-          }
-
-        } catch (Exception e) {
-          throw new AssertionError(e);
-        } finally {
-          if (!debug) {
-            FileUtils.forceDelete(tmpFile);
-          }
-        }
-      }
-    }
-  }
-
-  private void assertNoException(Future<?> f) {
-    try {
-      f.get();
-    } catch (Exception e) {
-      Assert.fail("Exception during setup");
-    }
-  }
-
-  private Callable<Void> getCallable(
-      HiveServerSupport support, List<String> batch, ConcurrentProgressMonitor monitor) {
-    return () -> {
-      support.execute(batch.toArray(new String[] {}));
-      monitor.count(batch.size());
-      return null;
-    };
-  }
 }
diff --git a/dumper/app/src/test/java/com/google/edwmigration/dumper/application/dumper/connector/hive/support/HiveServerSupport.java b/dumper/app/src/test/java/com/google/edwmigration/dumper/application/dumper/connector/hive/support/HiveServerSupport.java
deleted file mode 100644
index a365aceea..000000000
--- a/dumper/app/src/test/java/com/google/edwmigration/dumper/application/dumper/connector/hive/support/HiveServerSupport.java
+++ /dev/null
@@ -1,263 +0,0 @@
-/*
- * Copyright 2022 Google LLC
- * Copyright 2013-2021 CompilerWorks
- * Copyright (C) 2015-2021 Expedia, Inc. and Klarna AB.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.edwmigration.dumper.application.dumper.connector.hive.support;
-
-import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTORE_FASTPATH;
-
-import com.google.common.util.concurrent.MoreExecutors;
-import java.io.IOException;
-import java.io.UncheckedIOException;
-import java.net.ServerSocket;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
-import org.apache.commons.io.FileUtils;
-import org.apache.derby.jdbc.EmbeddedDriver;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.HiveMetaStore;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
-import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge;
-import org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge23;
-import org.apache.hive.service.Service.STATE;
-import org.apache.hive.service.server.HiveServer2;
-import org.checkerframework.checker.index.qual.Positive;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Support class to run a Hive Metastore listening on thrift.
- */
-// This class contains some code sourced from and inspired by HiveRunner and BeeJU, specifically
-// https://github.com/klarna/HiveRunner/blob/fb00a98f37abdb779547c1c98ef6fbe54d373e0c/src/main/java/com/klarna/hiverunner/StandaloneHiveServerContext.java
-// https://github.com/ExpediaGroup/beeju/blob/a3e821b9bdb70f0e603cccb6408c319b241df66c/src/main/java/com/hotels/beeju/core/BeejuCore.java
-public class HiveServerSupport implements AutoCloseable {
-
-  public static final int CONCURRENCY = 32; // Default is min:5 max:500
-  private static final Logger logger = LoggerFactory.getLogger(HiveServerSupport.class);
-
-  // "user" conflicts with USER db and the metastore_db can't be created.
-  private static final String METASTORE_DB_USER = "db_user";
-  private static final String METASTORE_DB_PASSWORD = "db_password";
-
-  private final HiveConf conf = new HiveConf();
-  private final int thriftPortMetastore;
-  private final int thriftPortServer;
-
-  private Path hiveTestDir;
-  private HiveServer2 hiveServer;
-  private ExecutorService metastoreExecutor;
-
-  public HiveServerSupport() throws IOException {
-    thriftPortServer = getFreePort();
-    thriftPortMetastore = getFreePort();
-    configure();
-  }
-
-  @Positive
-  public int getMetastoreThriftPort() {
-    return thriftPortMetastore;
-  }
-
-  @SuppressWarnings("deprecation")
-  public void configure() {
-    try {
-      hiveTestDir = Files.createTempDirectory("dumper-hive-test-");
-
-      createAndSetFolderProperty(HiveConf.ConfVars.SCRATCHDIR, "scratchdir");
-      createAndSetFolderProperty(HiveConf.ConfVars.LOCALSCRATCHDIR, "localscratchdir");
-      createAndSetFolderProperty(HiveConf.ConfVars.HIVEHISTORYFILELOC, "hivehistoryfileloc");
-
-      Path derbyHome = Files.createTempDirectory(hiveTestDir, "derby-home-");
-      System.setProperty("derby.system.home", derbyHome.toString());
-
-      String derbyLog = Files.createTempFile(hiveTestDir, "derby", ".log").toString();
-      System.setProperty("derby.stream.error.file", derbyLog);
-
-      Path warehouseDir = Files.createTempDirectory(hiveTestDir, "hive-warehouse-");
-      setHiveVar(HiveConf.ConfVars.METASTOREWAREHOUSE, warehouseDir.toString());
-
-      String driverClassName = EmbeddedDriver.class.getName();
-      conf.setBoolean("hcatalog.hive.client.cache.disabled", true);
-      String connectionURL = "jdbc:derby:memory:" + UUID.randomUUID() + ";create=true";
-
-      setMetastoreAndSystemProperty(ConfVars.CONNECT_URL_KEY, connectionURL);
-      setMetastoreAndSystemProperty(ConfVars.CONNECTION_DRIVER, driverClassName);
-      setMetastoreAndSystemProperty(ConfVars.CONNECTION_USER_NAME, METASTORE_DB_USER);
-      setMetastoreAndSystemProperty(ConfVars.PWD, METASTORE_DB_PASSWORD);
-
-      conf.setVar(HiveConf.ConfVars.METASTORE_CONNECTION_POOLING_TYPE, "NONE");
-      conf.setBoolVar(HiveConf.ConfVars.HMSHANDLERFORCERELOADCONF, true);
-
-      setMetastoreAndSystemProperty(ConfVars.AUTO_CREATE_ALL, "true");
-      setMetastoreAndSystemProperty(ConfVars.SCHEMA_VERIFICATION, "false");
-
-      conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_WEBUI_PORT, 0); // disable
-      conf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
-      conf.setBoolVar(HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED, false);
-      setMetastoreAndSystemProperty(ConfVars.EVENT_DB_NOTIFICATION_API_AUTH, "false");
-      conf.setTimeVar(HiveConf.ConfVars.HIVE_NOTFICATION_EVENT_POLL_INTERVAL, 0,
-          TimeUnit.MILLISECONDS);
-      conf.set(HiveConf.ConfVars.HIVE_SERVER2_MATERIALIZED_VIEWS_REGISTRY_IMPL.varname, "DUMMY");
-      System.setProperty(HiveConf.ConfVars.HIVE_SERVER2_MATERIALIZED_VIEWS_REGISTRY_IMPL.varname,
-          "DUMMY");
-
-      conf.setBoolVar(HiveConf.ConfVars.HIVESESSIONSILENT, true); // Do not print "OK"
-      setHiveVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + thriftPortMetastore);
-      setHiveIntVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_PORT, thriftPortServer);
-
-    } catch (IOException e) {
-      throw new UncheckedIOException("Error during configuration.", e);
-    }
-  }
-
-  public void startMetastore() throws InterruptedException {
-    metastoreExecutor = Executors.newFixedThreadPool(CONCURRENCY);
-
-    final Lock startLock = new ReentrantLock();
-    final Condition startCondition = startLock.newCondition();
-    final AtomicBoolean startedServing = new AtomicBoolean();
-
-    final HiveConf hiveConf = new HiveConf(conf, HiveMetaStoreClient.class);
-    metastoreExecutor.execute(() -> {
-      try {
-        HadoopThriftAuthBridge bridge = HadoopThriftAuthBridge23.getBridge();
-        HiveMetaStore.startMetaStore(thriftPortMetastore, bridge, hiveConf, startLock,
-            startCondition, startedServing);
-      } catch (Throwable t) {
-        logger.error("Unable to start Hive Metastore", t);
-      }
-    });
-
-    wait(startLock, startCondition);
-
-  }
-
-  public void startServer() throws InterruptedException {
-    hiveServer = new HiveServer2();
-    hiveServer.init(conf);
-    hiveServer.start();
-    waitForState(hiveServer, STATE.STARTED);
-
-  }
-
-  public HiveServerSupport start() throws InterruptedException {
-    logger.info("Starting Hive Metastore on port {}, HiveServer2 on port {} ...",
-        thriftPortMetastore, thriftPortServer);
-    conf.setBoolVar(METASTORE_FASTPATH, true);
-    startMetastore();
-    startServer();
-    logger.info("Started.");
-    return this;
-  }
-
-  public void execute(String... sqlScripts) throws SQLException {
-    try (Connection connection = getJdbcConnection(); Statement statement = connection.createStatement()) {
-      for (String script : sqlScripts) {
-        statement.execute(script);
-      }
-    }
-  }
-
-  public Connection getJdbcConnection() throws SQLException {
-    String jdbcConnectionUrl = "jdbc:hive2://localhost:" + thriftPortServer;
-    return DriverManager.getConnection(jdbcConnectionUrl);
-  }
-
-  private int getFreePort() throws IOException {
-    try (ServerSocket socket = new ServerSocket(0)) {
-      return socket.getLocalPort();
-    }
-  }
-
-  private void waitForState(HiveServer2 hiveServer, STATE targetState) throws InterruptedException {
-    int retries = 0;
-    int maxRetries = 5;
-    while (hiveServer.getServiceState() != targetState && retries < maxRetries) {
-      TimeUnit.SECONDS.sleep(1);
-      retries++;
-    }
-    if (retries >= maxRetries) {
-      throw new RuntimeException("HiveServer2 did not start in a reasonable time");
-    }
-  }
-
-  private void wait(Lock startLock, Condition startCondition) throws InterruptedException {
-    for (int j = 0; j < 3; j++) {
-      startLock.lock();
-      try {
-        if (startCondition.await(1, TimeUnit.MINUTES)) {
-          return;
-        }
-      } finally {
-        startLock.unlock();
-      }
-    }
-    throw new RuntimeException(
-        "Maximum number of tries reached whilst waiting for Thrift server to be ready");
-  }
-
-  private void setMetastoreAndSystemProperty(ConfVars key, String value) {
-    conf.set(key.getVarname(), value);
-    conf.set(key.getHiveName(), value);
-
-    System.setProperty(key.getVarname(), value);
-    System.setProperty(key.getHiveName(), value);
-  }
-
-  private void createAndSetFolderProperty(HiveConf.ConfVars var, String childFolderName)
-      throws IOException {
-    String folderPath = newFolder(hiveTestDir, childFolderName).toAbsolutePath().toString();
-    conf.setVar(var, folderPath);
-  }
-
-  private Path newFolder(Path basedir, String folder) throws IOException {
-    Path newFolder = Files.createTempDirectory(basedir, folder);
-    FileUtil.setPermission(newFolder.toFile(), FsPermission.getDirDefault());
-    return newFolder;
-  }
-
-  public void setHiveVar(HiveConf.ConfVars variable, String value) {
-    conf.setVar(variable, value);
-  }
-
-  public void setHiveIntVar(HiveConf.ConfVars variable, int value) {
-    conf.setIntVar(variable, value);
-  }
-
-  @Override
-  public void close() throws Exception {
-    hiveServer.stop();
-    waitForState(hiveServer, STATE.STOPPED);
-    MoreExecutors.shutdownAndAwaitTermination(metastoreExecutor, 30, TimeUnit.SECONDS);
-    FileUtils.deleteDirectory(hiveTestDir.toFile());
-  }
-}
diff --git a/dumper/app/src/test/java/com/google/edwmigration/dumper/application/dumper/connector/hive/support/HiveTestSchemaBuilder.java b/dumper/app/src/test/java/com/google/edwmigration/dumper/application/dumper/connector/hive/support/HiveTestSchemaBuilder.java
deleted file mode 100644
index ef9b54669..000000000
--- a/dumper/app/src/test/java/com/google/edwmigration/dumper/application/dumper/connector/hive/support/HiveTestSchemaBuilder.java
+++ /dev/null
@@ -1,115 +0,0 @@
-/*
- * Copyright 2022-2025 Google LLC
- * Copyright 2013-2021 CompilerWorks
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.google.edwmigration.dumper.application.dumper.connector.hive.support;
-
-import com.google.common.collect.Lists;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-import org.apache.commons.lang3.StringUtils;
-
-public class HiveTestSchemaBuilder {
-
-  public static final String DATABASE_PREFIX = "test_db_";
-  public static final String TABLE_PREFIX = "test_tbl_";
-  public static final List<String> PARTITION_NAMES = Lists.newArrayList("test_partition");
-  public static final int PARTITION_SPLIT = 50;
-
-  public static List<List<String>> getStatements(
-      int dbCount, int tblCount, int colCount, int pCount) {
-    List<List<String>> result = new ArrayList<>();
-    result.add(createDatabases(dbCount));
-    result.add(createTables(dbCount, tblCount, colCount));
-    result.add(createPartitions(dbCount, tblCount, pCount));
-    return result;
-  }
-
-  public static List<String> createPartitions(int dbCount, int tblCount, int pCount) {
-    List<String> result = new ArrayList<>();
-    for (int db = 0; db < dbCount; db++) {
-      for (int tbl = 0; tbl < tblCount; tbl++) {
-        result.addAll(createPartitions(db, tbl, pCount, PARTITION_SPLIT));
-      }
-    }
-    return result;
-  }
-
-  public static List<String> createTables(int dbCount, int tblCount, int colCount) {
-    List<String> result = new ArrayList<>();
-    for (int db = 0; db < dbCount; db++) {
-      for (int tbl = 0; tbl < tblCount; tbl++) {
-        result.add(createTable(db, tbl, colCount));
-      }
-    }
-    return result;
-  }
-
-  public static List<String> createDatabases(int dbCount) {
-    return IntStream.range(0, dbCount)
-        .mapToObj(HiveTestSchemaBuilder::createDatabase)
-        .collect(Collectors.toList());
-  }
-
-  public static List<String> createPartitions(
-      int dbIndex, int tblIndex, int partitionCount, int partitionSplit) {
-    List<String> partitions =
-        IntStream.range(0, partitionCount)
-            .mapToObj(HiveTestSchemaBuilder::getPartitionValue)
-            .collect(Collectors.toList());
-
-    return Lists.partition(partitions, partitionSplit).stream()
-        .map(l -> StringUtils.join(l, " "))
-        .map(
-            p ->
-                String.format(
-                    "alter table %s%d.%s%d add %s",
-                    DATABASE_PREFIX, dbIndex, TABLE_PREFIX, tblIndex, p))
-        .collect(Collectors.toList());
-  }
-
-  public static String createTable(int dbIndex, int tblIndex, int colCount) {
-    return String.format(
-        "create table %s%d.%s%d (%s) partitioned by (%s)",
-        DATABASE_PREFIX,
-        dbIndex,
-        TABLE_PREFIX,
-        tblIndex,
-        getColumns(colCount),
-        getPartitionsDefinition());
-  }
-
-  private static String getColumns(int colCount) {
-    return IntStream.range(0, colCount)
-        .mapToObj(c -> "col_" + c + " int")
-        .collect(Collectors.joining(", "));
-  }
-
-  public static String createDatabase(int dbIndex) {
-    return String.format("create database %s%d", DATABASE_PREFIX, dbIndex);
-  }
-
-  private static String getPartitionsDefinition() {
-    return PARTITION_NAMES.stream().map(p -> p + " int").collect(Collectors.joining(", "));
-  }
-
-  private static String getPartitionValue(int value) {
-    return PARTITION_NAMES.stream()
-        .map(p -> String.format("partition (%s = %s)", p, value))
-        .collect(Collectors.joining(", "));
-  }
-}