@@ -20,51 +20,22 @@
import static org.junit.Assert.assertTrue;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.edwmigration.dumper.application.dumper.ConnectorArguments;
import com.google.edwmigration.dumper.application.dumper.Main;
import com.google.edwmigration.dumper.application.dumper.connector.AbstractConnectorTest;
import com.google.edwmigration.dumper.application.dumper.connector.hive.support.HiveServerSupport;
import com.google.edwmigration.dumper.application.dumper.connector.hive.support.HiveTestSchemaBuilder;
import com.google.edwmigration.dumper.application.dumper.task.Task;
import com.google.edwmigration.dumper.plugin.ext.jdk.progress.ConcurrentProgressMonitor;
import com.google.edwmigration.dumper.plugin.ext.jdk.progress.ConcurrentRecordProgressMonitor;
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.commons.compress.archivers.zip.ZipArchiveEntry;
import org.apache.commons.compress.archivers.zip.ZipFile;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.SystemUtils;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.junit.experimental.theories.DataPoints;
import org.junit.experimental.theories.FromDataPoints;
import org.junit.experimental.theories.Theories;
import org.junit.experimental.theories.Theory;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Theories.class)
public class HiveMetadataConnectorTest extends AbstractConnectorTest {

private static final Logger logger = LoggerFactory.getLogger(HiveMetadataConnectorTest.class);
private static final boolean debug = false;

private final HiveMetadataConnector connector = new HiveMetadataConnector();

@Test
@@ -141,109 +112,4 @@ public void addTasksTo_migrationMetadataTaskExists_success(
"Task names must contain '" + taskName + "'. Actual tasks: " + taskNames,
taskNames.contains(taskName));
}

/**
* Run this with:
*
* <pre>
* ./gradlew :dumper:app:{cleanTest,test} --tests HiveMetadataConnectorTest \
* -Dtest.verbose=true -Dorg.gradle.java.home=/usr/lib/jvm/java-1.8.0-openjdk-amd64
* </pre>
*/
@Test
public void testLoadedHive312() throws Exception {

// Hive 3.1.2 requires Java 1.8.
Assume.assumeTrue(SystemUtils.IS_JAVA_1_8);

// With 5 schemas / 5 tables / 10 columns / 1000 partitions, setup takes ~2 minutes.
int NUM_SCHEMAS = 5;
int NUM_TABLES = 5;
int NUM_COLUMNS = 10;
int NUM_PARTITIONS = 1000;

int NUM_DUMPER_RUNS = 5; // 5 runs take ~4 minutes.
int BATCH_SIZE = 25;

List<List<String>> setupStatements =
HiveTestSchemaBuilder.getStatements(NUM_SCHEMAS, NUM_TABLES, NUM_COLUMNS, NUM_PARTITIONS);

try (HiveServerSupport instanceSupport = new HiveServerSupport().start()) {

// Populate Hive Metastore
long total = setupStatements.stream().mapToLong(Collection::size).sum();
try (ConcurrentProgressMonitor monitor =
new ConcurrentRecordProgressMonitor("Populating Hive instance.", total)) {
ExecutorService executor = Executors.newFixedThreadPool(HiveServerSupport.CONCURRENCY);
try {
  // Execute each schema's statements in concurrent batches of BATCH_SIZE.
  for (List<String> statementList : setupStatements) {
    executor
        .invokeAll(
            Lists.partition(statementList, BATCH_SIZE).stream()
                .map(l -> getCallable(instanceSupport, l, monitor))
                .collect(Collectors.toList()))
        .forEach(this::assertNoException);
  }
} finally {
  executor.shutdown();
}
}

// Run dumper many times and assert all tables are there (1 table = 1 line, JSONL)
for (int i = 0; i < NUM_DUMPER_RUNS; i++) {
String tmpPrefix = String.format("dumper-test-iteration-%d-", i);
Path tmpPath = Files.createTempFile(tmpPrefix, ".zip");
File tmpFile = tmpPath.toFile();

try {
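// Run the dumper CLI against the embedded metastore, writing the dump zip to tmpFile.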
Main.main(
"--connector",
"hiveql",
"--port",
"" + instanceSupport.getMetastoreThriftPort(),
"--output",
tmpFile.getAbsolutePath());

assertTrue(tmpFile.exists());

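// Inspect the dump: no task may have written an exception report, and
// tables.jsonl must contain one JSONL line per created table.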
try (ZipFile zipFile = new ZipFile(tmpFile)) {
List<ZipArchiveEntry> entries = Collections.list(zipFile.getEntries());

entries.forEach(
    e -> Assert.assertFalse(
        "Dump contains an exception report: " + e.getName(),
        e.getName().contains("exception.txt")));

List<String> tables =
IOUtils.readLines(
zipFile.getInputStream(zipFile.getEntry("tables.jsonl")),
StandardCharsets.UTF_8);

Assert.assertEquals(
"All tables should be present.", NUM_SCHEMAS * NUM_TABLES, tables.size());

logger.info("Dump verified.");
}

} catch (Exception e) {
throw new AssertionError(e);
} finally {
if (!debug) {
FileUtils.forceDelete(tmpFile);
}
}
}
}
}

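/** Fails the test if the given setup future completed exceptionally. */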
private void assertNoException(Future<Void> f) {
try {
f.get();
} catch (Exception e) {
Assert.fail("Exception during setup: " + e);
}
}

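/**
 * Wraps a batch of setup statements into a task that executes them against the Hive instance
 * and advances the progress monitor by the batch size.
 */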
private Callable<Void> getCallable(
HiveServerSupport support, List<String> batch, ConcurrentProgressMonitor monitor) {
return () -> {
support.execute(batch.toArray(new String[0]));
monitor.count(batch.size());
return null;
};
}
}