Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public class ConnectorArguments extends DefaultArguments {
+ "\n";

public static final String OPT_CONNECTOR = "connector";
public static final String OPT_DIAGNOSTICS = "diagnostics";
public static final String OPT_DRIVER = "driver";
public static final String OPT_CLASS = "jdbcDriverClass";
public static final String OPT_URI = "url";
Expand Down Expand Up @@ -471,6 +472,13 @@ public class ConnectorArguments extends DefaultArguments {
.ofType(Integer.class)
.defaultsTo(OPT_THREAD_POOL_SIZE_DEFAULT);

private final OptionSpec<Boolean> optionDiagnostics =
parser
.accepts(OPT_DIAGNOSTICS, "Allows dumper diagnostics to be turned on/off")
.withOptionalArg()
.ofType(Boolean.class)
.defaultsTo(true);

public final OptionSpec<String> optionHadoopHdfsSiteXml =
parser
.accepts(
Expand Down Expand Up @@ -1048,6 +1056,10 @@ public List<String> getQueryLogAlternates() {
return getOptions().valuesOf(optionQueryLogAlternates);
}

public boolean isDiagnosticsOn() {
return getOptions().valueOf(optionDiagnostics);
}

public boolean isTestFlag(char c) {
String flags = getOptions().valueOf(optionFlags);
if (flags == null) {
Expand Down Expand Up @@ -1214,7 +1226,8 @@ public String toString() {
.add(OPT_QUERY_LOG_START, getQueryLogStart())
.add(OPT_QUERY_LOG_END, getQueryLogEnd())
.add(OPT_QUERY_LOG_ALTERNATES, getQueryLogAlternates())
.add(OPT_ASSESSMENT, isAssessment());
.add(OPT_ASSESSMENT, isAssessment())
.add(OPT_DIAGNOSTICS, isDiagnosticsOn());
getConnectorProperties().getDefinitionMap().forEach(toStringHelper::add);
return toStringHelper.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,13 @@
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import java.util.Objects;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** @author miguel */
public class Main {

private static final Logger logger = LoggerFactory.getLogger(Main.class);
private static final TelemetryProcessor telemetryProcessor = new TelemetryProcessor();

private final MetadataDumper metadataDumper;

public Main(MetadataDumper metadataDumper) {
this.metadataDumper = metadataDumper;
}

public boolean run(@Nonnull String... args) throws Exception {
return metadataDumper.run(args);
}

private static void printErrorMessages(Throwable e) {
new SummaryPrinter()
Expand All @@ -64,14 +52,14 @@ private static void printErrorMessages(Throwable e) {
public static void main(String... args) throws Exception {
try {
StartUpMetaInfoProcessor.printMetaInfo();
telemetryProcessor.setDumperMetadata(StartUpMetaInfoProcessor.getDumperMetadata());

Main main = new Main(new MetadataDumper(telemetryProcessor));

if (args.length == 0) {
args = new String[] {"--help"};
}
if (!main.run(args)) {

MetadataDumper metadataDumper = new MetadataDumper(args);

if (!metadataDumper.run()) {
System.exit(1);
}
} catch (MetadataDumperUsageException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,128 @@ public class MetadataDumper {
private static final Pattern GCS_PATH_PATTERN =
Pattern.compile("gs://(?<bucket>[^/]+)/(?<path>.*)");

private final TelemetryProcessor telemetryProcessor;
private TelemetryProcessor telemetryProcessor;
private ConnectorArguments connectorArguments;

public MetadataDumper(TelemetryProcessor telemetryProcessor) {
this.telemetryProcessor = telemetryProcessor;
public MetadataDumper() {}

public MetadataDumper(String... args) throws Exception {
this.connectorArguments = new ConnectorArguments(JsonResponseFile.addResponseFiles(args));
telemetryProcessor = new TelemetryProcessor(connectorArguments.isDiagnosticsOn());
if (connectorArguments.saveResponseFile()) {
JsonResponseFile.save(connectorArguments);
}
}

public boolean run() throws Exception {
String connectorName = connectorArguments.getConnectorName();

Connector connector = ConnectorRepository.getInstance().getByName(connectorName);
if (connector == null) {
logger.error(
"Target connector '{}' not supported; available are {}.",
connectorName,
ConnectorRepository.getInstance().getAllNames());
return false;
}
connector.validate(connectorArguments);
return run(connector);
}

protected boolean run(@Nonnull Connector connector) throws Exception {
List<Task<?>> tasks = new ArrayList<>();
tasks.add(new VersionTask());
tasks.add(new ArgumentsTask(connectorArguments));
{
File sqlScript = connectorArguments.getSqlScript();
if (sqlScript != null) {
tasks.add(new JdbcRunSQLScript(sqlScript));
}
}
connector.addTasksTo(tasks, connectorArguments);

// The default output file is based on the connector.
// We had a customer request to base it on the database, but that isn't well-defined,
// as there may be 0 or N databases in a single file.
String outputFileLocation = getOutputFileLocation(connector, connectorArguments);

if (connectorArguments.isDryRun()) {
String title = "Dry run: Printing task list for " + connector.getName();
System.out.println(title);
System.out.println(repeat('=', title.length()));
System.out.println("Writing to " + outputFileLocation);
for (Task<?> task : tasks) {
print(task, 1);
}
return true;
} else {
Stopwatch stopwatch = Stopwatch.createStarted();
long outputFileLength = 0;
TaskSetState.Impl state = new TaskSetState.Impl();

logger.info("Using connector: [{}]", connector);
SummaryPrinter summaryPrinter = new SummaryPrinter();
boolean requiredTaskSucceeded = false;

try (Closer closer = Closer.create()) {
Path outputPath = prepareOutputPath(outputFileLocation, closer, connectorArguments);

URI outputUri = URI.create("jar:" + outputPath.toUri());

Map<String, Object> fileSystemProperties =
ImmutableMap.<String, Object>builder()
.put("create", "true")
.put("useTempFile", Boolean.TRUE)
.build();
FileSystem fileSystem =
closer.register(FileSystems.newFileSystem(outputUri, fileSystemProperties));
OutputHandleFactory sinkFactory =
new FileSystemOutputHandleFactory(fileSystem, "/"); // It's required to be "/"
logger.debug("Target filesystem is [{}]", sinkFactory);

Handle handle = closer.register(connector.open(connectorArguments));

new TasksRunner(
sinkFactory,
handle,
connectorArguments.getThreadPoolSize(),
state,
tasks,
connectorArguments)
.run();

requiredTaskSucceeded = checkRequiredTaskSuccess(summaryPrinter, state, outputFileLocation);

telemetryProcessor.addDumperRunMetricsToPayload(
connectorArguments, state, stopwatch, requiredTaskSucceeded);
telemetryProcessor.processTelemetry(fileSystem);
} finally {
// We must do this in finally after the ZipFileSystem has been closed.
File outputFile = new File(outputFileLocation);
if (outputFile.isFile()) {
outputFileLength = outputFile.length();
}

printTaskResults(summaryPrinter, state);
logFinalSummary(
summaryPrinter,
state,
outputFileLength,
stopwatch,
outputFileLocation,
requiredTaskSucceeded);
}

return requiredTaskSucceeded;
}
}

/** @deprecated Use {@link #run()} instead. Should be deleted after refactoring the tests */
@Deprecated
public boolean run(String... args) throws Exception {
ConnectorArguments arguments = new ConnectorArguments(JsonResponseFile.addResponseFiles(args));
telemetryProcessor = new TelemetryProcessor(arguments.isDiagnosticsOn());

try {
return run(arguments);
} finally {
Expand All @@ -78,6 +192,8 @@ public boolean run(String... args) throws Exception {
}
}

/** @deprecated Use {@link #run()} instead. Should be deleted after refactoring the tests */
@Deprecated
public boolean run(@Nonnull ConnectorArguments arguments) throws Exception {
String connectorName = arguments.getConnectorName();
if (connectorName == null) {
Expand Down Expand Up @@ -132,6 +248,8 @@ private Path prepareOutputPath(
}
}

/** @deprecated Use {@link #run()} instead. Should be deleted after refactoring the tests */
@Deprecated
protected boolean run(@Nonnull Connector connector, @Nonnull ConnectorArguments arguments)
throws Exception {
List<Task<?>> tasks = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ public class TelemetryProcessor {
private static final Logger logger = LoggerFactory.getLogger(TelemetryProcessor.class);

private final ClientTelemetry clientTelemetry;
private final boolean shouldWrite;

public TelemetryProcessor() {
public TelemetryProcessor(boolean shouldWrite) {
clientTelemetry = new ClientTelemetry();
}
clientTelemetry.setDumperMetadata(StartUpMetaInfoProcessor.getDumperMetadata());

public void setDumperMetadata(DumperMetadata dumperMetadata) {
clientTelemetry.setDumperMetadata(dumperMetadata);
this.shouldWrite = shouldWrite;
}

/**
Expand All @@ -47,6 +47,9 @@ public void setDumperMetadata(DumperMetadata dumperMetadata) {
*/
public void addDumperRunMetricsToPayload(
ConnectorArguments arguments, TaskSetState state, Stopwatch stopwatch, boolean success) {
if (!shouldWrite) {
return;
}
try {
clientTelemetry.setEventType(EventType.DUMPER_RUN_METRICS);

Expand Down Expand Up @@ -89,6 +92,9 @@ public void addDumperRunMetricsToPayload(
}

public void processTelemetry(FileSystem fileSystem) {
if (!shouldWrite) {
return;
}
try {
TelemetryWriter.write(fileSystem, clientTelemetry);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.google.cloud.kms.v1.KeyManagementServiceClient;
import com.google.edwmigration.dumper.application.dumper.MetadataDumper;
import com.google.edwmigration.dumper.application.dumper.MetadataDumperUsageException;
import com.google.edwmigration.dumper.application.dumper.TelemetryProcessor;
import com.google.gson.Gson;
import com.google.protobuf.ByteString;
import java.io.IOException;
Expand Down Expand Up @@ -100,7 +99,7 @@ public static void main(String... args) throws Exception {
/* maxRetries= */ 3, /* defaultRetryInterval= */ TimeValue.ofSeconds(1L)))
.build()) {
new Main(
() -> new MetadataDumper(new TelemetryProcessor()),
() -> new MetadataDumper(),
new HttpClientMetadataRetriever(httpClient),
DriverRetriever.create(httpClient, Files.createTempDirectory("clouddumper")))
.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ private enum TopLevelTestFile {
}
}

private Main dumper = new Main(new MetadataDumper(new TelemetryProcessor()));
private final MetadataDumper dumper = new MetadataDumper();
private final Connector connector = new TestConnector();

@Before
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
import com.google.common.base.MoreObjects;
import com.google.common.base.Predicates;
import com.google.common.io.ByteSource;
import com.google.edwmigration.dumper.application.dumper.Main;
import com.google.edwmigration.dumper.application.dumper.MetadataDumper;
import com.google.edwmigration.dumper.application.dumper.TelemetryProcessor;
import com.google.edwmigration.dumper.application.dumper.task.AbstractTask;
import com.google.edwmigration.dumper.common.io.ZipArchiveEntryByteSource;
import com.google.edwmigration.dumper.plugin.lib.dumper.spi.CoreMetadataDumpFormat;
Expand Down Expand Up @@ -115,8 +113,7 @@ protected static List<String> findJdbcDrivers(@Nonnull String baseName) throws I
}

public void runDumper(@Nonnull String... args) throws Exception {
Main dumper = new Main(new MetadataDumper(new TelemetryProcessor()));
dumper.run(args);
new MetadataDumper().run(args);
}

protected static class ZipEntryValidator<E extends Enum<E>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.sql.DataSource;
import org.apache.commons.lang3.ArrayUtils;
import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -272,15 +271,14 @@ public Void doInConnection(Connection connection)
// "--database", "db_0"
);

MetadataDumper dumper = new MetadataDumper(new TelemetryProcessor());

dumper.run(new ConnectorArguments(args.toArray(ArrayUtils.EMPTY_STRING_ARRAY)));
MetadataDumper dumper = new MetadataDumper(new String[] {});
dumper.run();

CONTINUE:
{
// Prove that --continue is doing its thing.
Stopwatch stopwatch = Stopwatch.createStarted();
dumper.run(new ConnectorArguments(args.toArray(ArrayUtils.EMPTY_STRING_ARRAY)));
dumper.run();
assertTrue(
"Second run of dumper was too slow.",
stopwatch.elapsed(TimeUnit.SECONDS)
Expand Down
Loading