Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public class ConnectorArguments extends DefaultArguments {
+ "\n";

public static final String OPT_CONNECTOR = "connector";
public static final String OPT_DIAGNOSTICS = "diagnostics";
public static final String OPT_DRIVER = "driver";
public static final String OPT_CLASS = "jdbcDriverClass";
public static final String OPT_URI = "url";
Expand Down Expand Up @@ -471,6 +472,13 @@ public class ConnectorArguments extends DefaultArguments {
.ofType(Integer.class)
.defaultsTo(OPT_THREAD_POOL_SIZE_DEFAULT);

private final OptionSpec<Boolean> optionDiagnostics =
parser
.accepts(OPT_DIAGNOSTICS, "Allows dumper diagnostics to be turned on/off")
.withOptionalArg()
.ofType(Boolean.class)
.defaultsTo(true);

public final OptionSpec<String> optionHadoopHdfsSiteXml =
parser
.accepts(
Expand Down Expand Up @@ -1048,6 +1056,10 @@ public List<String> getQueryLogAlternates() {
return getOptions().valuesOf(optionQueryLogAlternates);
}

public boolean isDiagnosticsOn() {
return getOptions().valueOf(optionDiagnostics);
}

public boolean isTestFlag(char c) {
String flags = getOptions().valueOf(optionFlags);
if (flags == null) {
Expand Down Expand Up @@ -1214,7 +1226,8 @@ public String toString() {
.add(OPT_QUERY_LOG_START, getQueryLogStart())
.add(OPT_QUERY_LOG_END, getQueryLogEnd())
.add(OPT_QUERY_LOG_ALTERNATES, getQueryLogAlternates())
.add(OPT_ASSESSMENT, isAssessment());
.add(OPT_ASSESSMENT, isAssessment())
.add(OPT_DIAGNOSTICS, isDiagnosticsOn());
getConnectorProperties().getDefinitionMap().forEach(toStringHelper::add);
return toStringHelper.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,13 @@
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import java.util.Objects;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** @author miguel */
public class Main {

private static final Logger logger = LoggerFactory.getLogger(Main.class);
private static final TelemetryProcessor telemetryProcessor = new TelemetryProcessor();

private final MetadataDumper metadataDumper;

public Main(MetadataDumper metadataDumper) {
this.metadataDumper = metadataDumper;
}

public boolean run(@Nonnull String... args) throws Exception {
return metadataDumper.run(args);
}

private static void printErrorMessages(Throwable e) {
new SummaryPrinter()
Expand All @@ -64,14 +52,14 @@ private static void printErrorMessages(Throwable e) {
public static void main(String... args) throws Exception {
try {
StartUpMetaInfoProcessor.printMetaInfo();
telemetryProcessor.setDumperMetadata(StartUpMetaInfoProcessor.getDumperMetadata());

Main main = new Main(new MetadataDumper(telemetryProcessor));

if (args.length == 0) {
args = new String[] {"--help"};
}
if (!main.run(args)) {

MetadataDumper metadataDumper = new MetadataDumper(args);

if (!metadataDumper.run()) {
System.exit(1);
}
} catch (MetadataDumperUsageException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,29 +61,19 @@ public class MetadataDumper {
private static final Pattern GCS_PATH_PATTERN =
Pattern.compile("gs://(?<bucket>[^/]+)/(?<path>.*)");

private final TelemetryProcessor telemetryProcessor;
private TelemetryProcessor telemetryProcessor;
private ConnectorArguments connectorArguments;

public MetadataDumper(TelemetryProcessor telemetryProcessor) {
this.telemetryProcessor = telemetryProcessor;
}

public boolean run(String... args) throws Exception {
ConnectorArguments arguments = new ConnectorArguments(JsonResponseFile.addResponseFiles(args));
try {
return run(arguments);
} finally {
if (arguments.saveResponseFile()) {
JsonResponseFile.save(arguments);
}
public MetadataDumper(String... args) throws Exception {
this.connectorArguments = new ConnectorArguments(JsonResponseFile.addResponseFiles(args));
telemetryProcessor = new TelemetryProcessor(connectorArguments.isDiagnosticsOn());
if (connectorArguments.saveResponseFile()) {
JsonResponseFile.save(connectorArguments);
}
}

public boolean run(@Nonnull ConnectorArguments arguments) throws Exception {
String connectorName = arguments.getConnectorName();
if (connectorName == null) {
logger.error("Target connector is required");
return false;
}
public boolean run() throws Exception {
String connectorName = connectorArguments.getConnectorName();

Connector connector = ConnectorRepository.getInstance().getByName(connectorName);
if (connector == null) {
Expand All @@ -93,64 +83,28 @@ public boolean run(@Nonnull ConnectorArguments arguments) throws Exception {
ConnectorRepository.getInstance().getAllNames());
return false;
}
connector.validate(arguments);
return run(connector, arguments);
connector.validate(connectorArguments);
return run(connector);
}

private void print(@Nonnull Task<?> task, int indent) {
System.out.println(repeat(' ', indent * 2) + task);
if (task instanceof TaskGroup) {
for (Task<?> subtask : ((TaskGroup) task).getTasks()) {
print(subtask, indent + 1);
}
}
}

private Path prepareOutputPath(
@Nonnull String fileName, @Nonnull Closer closer, @Nonnull ConnectorArguments arguments)
throws IOException {
Matcher matcher = GCS_PATH_PATTERN.matcher(fileName);
if (matcher.matches()) {
String bucket = matcher.group("bucket");
String path = matcher.group("path");
logger.debug(
"Setting up CloudStorageFileSystem with bucket '{}' and path '{}'.", bucket, path);
CloudStorageFileSystem cloudStorageFileSystem =
closer.register(CloudStorageFileSystem.forBucket(bucket));
return cloudStorageFileSystem.getPath(path);
} else {
Path path = Paths.get(fileName);
File file = path.toFile();
if (file.exists()) {
if (!arguments.isOutputContinue()) {
file.delete(); // It's a simple file, and we were asked to overwrite it.
}
} else {
Files.createParentDirs(file);
}
return path;
}
}

protected boolean run(@Nonnull Connector connector, @Nonnull ConnectorArguments arguments)
throws Exception {
protected boolean run(@Nonnull Connector connector) throws Exception {
List<Task<?>> tasks = new ArrayList<>();
tasks.add(new VersionTask());
tasks.add(new ArgumentsTask(arguments));
tasks.add(new ArgumentsTask(connectorArguments));
{
File sqlScript = arguments.getSqlScript();
File sqlScript = connectorArguments.getSqlScript();
if (sqlScript != null) {
tasks.add(new JdbcRunSQLScript(sqlScript));
}
}
connector.addTasksTo(tasks, arguments);
connector.addTasksTo(tasks, connectorArguments);

// The default output file is based on the connector.
// We had a customer request to base it on the database, but that isn't well-defined,
// as there may be 0 or N databases in a single file.
String outputFileLocation = getOutputFileLocation(connector, arguments);
String outputFileLocation = getOutputFileLocation(connector, connectorArguments);

if (arguments.isDryRun()) {
if (connectorArguments.isDryRun()) {
String title = "Dry run: Printing task list for " + connector.getName();
System.out.println(title);
System.out.println(repeat('=', title.length()));
Expand All @@ -169,7 +123,7 @@ protected boolean run(@Nonnull Connector connector, @Nonnull ConnectorArguments
boolean requiredTaskSucceeded = false;

try (Closer closer = Closer.create()) {
Path outputPath = prepareOutputPath(outputFileLocation, closer, arguments);
Path outputPath = prepareOutputPath(outputFileLocation, closer, connectorArguments);

URI outputUri = URI.create("jar:" + outputPath.toUri());

Expand All @@ -184,15 +138,21 @@ protected boolean run(@Nonnull Connector connector, @Nonnull ConnectorArguments
new FileSystemOutputHandleFactory(fileSystem, "/"); // It's required to be "/"
logger.debug("Target filesystem is [{}]", sinkFactory);

Handle handle = closer.register(connector.open(arguments));
Handle handle = closer.register(connector.open(connectorArguments));

new TasksRunner(sinkFactory, handle, arguments.getThreadPoolSize(), state, tasks, arguments)
new TasksRunner(
sinkFactory,
handle,
connectorArguments.getThreadPoolSize(),
state,
tasks,
connectorArguments)
.run();

requiredTaskSucceeded = checkRequiredTaskSuccess(summaryPrinter, state, outputFileLocation);

telemetryProcessor.addDumperRunMetricsToPayload(
arguments, state, stopwatch, requiredTaskSucceeded);
connectorArguments, state, stopwatch, requiredTaskSucceeded);
telemetryProcessor.processTelemetry(fileSystem);
} finally {
// We must do this in finally after the ZipFileSystem has been closed.
Expand All @@ -215,6 +175,41 @@ protected boolean run(@Nonnull Connector connector, @Nonnull ConnectorArguments
}
}

private void print(@Nonnull Task<?> task, int indent) {
System.out.println(repeat(' ', indent * 2) + task);
if (task instanceof TaskGroup) {
for (Task<?> subtask : ((TaskGroup) task).getTasks()) {
print(subtask, indent + 1);
}
}
}

private Path prepareOutputPath(
@Nonnull String fileName, @Nonnull Closer closer, @Nonnull ConnectorArguments arguments)
throws IOException {
Matcher matcher = GCS_PATH_PATTERN.matcher(fileName);
if (matcher.matches()) {
String bucket = matcher.group("bucket");
String path = matcher.group("path");
logger.debug(
"Setting up CloudStorageFileSystem with bucket '{}' and path '{}'.", bucket, path);
CloudStorageFileSystem cloudStorageFileSystem =
closer.register(CloudStorageFileSystem.forBucket(bucket));
return cloudStorageFileSystem.getPath(path);
} else {
Path path = Paths.get(fileName);
File file = path.toFile();
if (file.exists()) {
if (!arguments.isOutputContinue()) {
file.delete(); // It's a simple file, and we were asked to overwrite it.
}
} else {
Files.createParentDirs(file);
}
return path;
}
}

private String getOutputFileLocation(Connector connector, ConnectorArguments arguments) {
Clock clock = Clock.systemDefaultZone();
// The default output file is based on the connector.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ public class TelemetryProcessor {
private static final Logger logger = LoggerFactory.getLogger(TelemetryProcessor.class);

private final ClientTelemetry clientTelemetry;
private final boolean shouldWrite;

public TelemetryProcessor() {
public TelemetryProcessor(boolean shouldWrite) {
clientTelemetry = new ClientTelemetry();
}
clientTelemetry.setDumperMetadata(StartUpMetaInfoProcessor.getDumperMetadata());

public void setDumperMetadata(DumperMetadata dumperMetadata) {
clientTelemetry.setDumperMetadata(dumperMetadata);
this.shouldWrite = shouldWrite;
}

/**
Expand All @@ -47,6 +47,9 @@ public void setDumperMetadata(DumperMetadata dumperMetadata) {
*/
public void addDumperRunMetricsToPayload(
ConnectorArguments arguments, TaskSetState state, Stopwatch stopwatch, boolean success) {
if (!shouldWrite) {
return;
}
try {
clientTelemetry.setEventType(EventType.DUMPER_RUN_METRICS);

Expand Down Expand Up @@ -89,6 +92,9 @@ public void addDumperRunMetricsToPayload(
}

public void processTelemetry(FileSystem fileSystem) {
if (!shouldWrite) {
return;
}
try {
TelemetryWriter.write(fileSystem, clientTelemetry);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.google.cloud.kms.v1.KeyManagementServiceClient;
import com.google.edwmigration.dumper.application.dumper.MetadataDumper;
import com.google.edwmigration.dumper.application.dumper.MetadataDumperUsageException;
import com.google.edwmigration.dumper.application.dumper.TelemetryProcessor;
import com.google.gson.Gson;
import com.google.protobuf.ByteString;
import java.io.IOException;
Expand Down Expand Up @@ -88,7 +87,7 @@ void run() throws Exception {
args.add(driverPath.toString());
});
args.addAll(connectorConfiguration.args);
metadataDumperSupplier.get().run(args.toArray(new String[args.size()]));
metadataDumperSupplier.get().run();
}
}

Expand All @@ -100,7 +99,13 @@ public static void main(String... args) throws Exception {
/* maxRetries= */ 3, /* defaultRetryInterval= */ TimeValue.ofSeconds(1L)))
.build()) {
new Main(
() -> new MetadataDumper(new TelemetryProcessor()),
() -> {
try {
return new MetadataDumper(args);
} catch (Exception e) {
throw new RuntimeException(e);
}
},
new HttpClientMetadataRetriever(httpClient),
DriverRetriever.create(httpClient, Files.createTempDirectory("clouddumper")))
.run();
Expand Down
Loading
Loading