Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ public class ConnectorArguments extends DefaultArguments {
+ "\n";

public static final String OPT_CONNECTOR = "connector";
public static final String OPT_TELEMETRY = "telemetry";
public static final String OPT_DRIVER = "driver";
public static final String OPT_CLASS = "jdbcDriverClass";
public static final String OPT_URI = "url";
Expand Down Expand Up @@ -471,6 +472,13 @@ public class ConnectorArguments extends DefaultArguments {
.ofType(Integer.class)
.defaultsTo(OPT_THREAD_POOL_SIZE_DEFAULT);

private final OptionSpec<Boolean> optionTelemetry =
parser
.accepts(OPT_TELEMETRY, "Allows dumper telemetry to be turned on/off")
.withOptionalArg()
.ofType(Boolean.class)
.defaultsTo(true);

public final OptionSpec<String> optionHadoopHdfsSiteXml =
parser
.accepts(
Expand Down Expand Up @@ -1048,6 +1056,10 @@ public List<String> getQueryLogAlternates() {
return getOptions().valuesOf(optionQueryLogAlternates);
}

public boolean isTelemetryOn() {
return getOptions().valueOf(optionTelemetry);
}

public boolean isTestFlag(char c) {
String flags = getOptions().valueOf(optionFlags);
if (flags == null) {
Expand Down Expand Up @@ -1214,7 +1226,8 @@ public String toString() {
.add(OPT_QUERY_LOG_START, getQueryLogStart())
.add(OPT_QUERY_LOG_END, getQueryLogEnd())
.add(OPT_QUERY_LOG_ALTERNATES, getQueryLogAlternates())
.add(OPT_ASSESSMENT, isAssessment());
.add(OPT_ASSESSMENT, isAssessment())
.add(OPT_TELEMETRY, isTelemetryOn());
getConnectorProperties().getDefinitionMap().forEach(toStringHelper::add);
return toStringHelper.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,25 +21,13 @@
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import java.util.Objects;
import javax.annotation.Nonnull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** @author miguel */
public class Main {

private static final Logger logger = LoggerFactory.getLogger(Main.class);
private static final TelemetryProcessor telemetryProcessor = new TelemetryProcessor();

private final MetadataDumper metadataDumper;

public Main(MetadataDumper metadataDumper) {
this.metadataDumper = metadataDumper;
}

public boolean run(@Nonnull String... args) throws Exception {
return metadataDumper.run(args);
}

private static void printErrorMessages(Throwable e) {
new SummaryPrinter()
Expand All @@ -64,14 +52,14 @@ private static void printErrorMessages(Throwable e) {
public static void main(String... args) throws Exception {
try {
StartUpMetaInfoProcessor.printMetaInfo();
telemetryProcessor.setDumperMetadata(StartUpMetaInfoProcessor.getDumperMetadata());

Main main = new Main(new MetadataDumper(telemetryProcessor));

if (args.length == 0) {
args = new String[] {"--help"};
}
if (!main.run(args)) {

MetadataDumper metadataDumper = new MetadataDumper(args);

if (!metadataDumper.run()) {
System.exit(1);
}
} catch (MetadataDumperUsageException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,29 +61,21 @@ public class MetadataDumper {
private static final Pattern GCS_PATH_PATTERN =
Pattern.compile("gs://(?<bucket>[^/]+)/(?<path>.*)");

private final TelemetryProcessor telemetryProcessor;
private TelemetryProcessor telemetryProcessor;
private ConnectorArguments connectorArguments;

public MetadataDumper(TelemetryProcessor telemetryProcessor) {
this.telemetryProcessor = telemetryProcessor;
}

public boolean run(String... args) throws Exception {
ConnectorArguments arguments = new ConnectorArguments(JsonResponseFile.addResponseFiles(args));
try {
return run(arguments);
} finally {
if (arguments.saveResponseFile()) {
JsonResponseFile.save(arguments);
}
public MetadataDumper(String... args) throws Exception {
this.connectorArguments = new ConnectorArguments(JsonResponseFile.addResponseFiles(args));
telemetryProcessor =
new TelemetryProcessor(
TelemetryStrategyFactory.createStrategy(connectorArguments.isTelemetryOn()));
if (connectorArguments.saveResponseFile()) {
JsonResponseFile.save(connectorArguments);
}
}

public boolean run(@Nonnull ConnectorArguments arguments) throws Exception {
String connectorName = arguments.getConnectorName();
if (connectorName == null) {
logger.error("Target connector is required");
return false;
}
public boolean run() throws Exception {
String connectorName = connectorArguments.getConnectorName();

Connector connector = ConnectorRepository.getInstance().getByName(connectorName);
if (connector == null) {
Expand All @@ -93,64 +85,28 @@ public boolean run(@Nonnull ConnectorArguments arguments) throws Exception {
ConnectorRepository.getInstance().getAllNames());
return false;
}
connector.validate(arguments);
return run(connector, arguments);
connector.validate(connectorArguments);
return run(connector);
}

private void print(@Nonnull Task<?> task, int indent) {
System.out.println(repeat(' ', indent * 2) + task);
if (task instanceof TaskGroup) {
for (Task<?> subtask : ((TaskGroup) task).getTasks()) {
print(subtask, indent + 1);
}
}
}

private Path prepareOutputPath(
@Nonnull String fileName, @Nonnull Closer closer, @Nonnull ConnectorArguments arguments)
throws IOException {
Matcher matcher = GCS_PATH_PATTERN.matcher(fileName);
if (matcher.matches()) {
String bucket = matcher.group("bucket");
String path = matcher.group("path");
logger.debug(
"Setting up CloudStorageFileSystem with bucket '{}' and path '{}'.", bucket, path);
CloudStorageFileSystem cloudStorageFileSystem =
closer.register(CloudStorageFileSystem.forBucket(bucket));
return cloudStorageFileSystem.getPath(path);
} else {
Path path = Paths.get(fileName);
File file = path.toFile();
if (file.exists()) {
if (!arguments.isOutputContinue()) {
file.delete(); // It's a simple file, and we were asked to overwrite it.
}
} else {
Files.createParentDirs(file);
}
return path;
}
}

protected boolean run(@Nonnull Connector connector, @Nonnull ConnectorArguments arguments)
throws Exception {
protected boolean run(@Nonnull Connector connector) throws Exception {
List<Task<?>> tasks = new ArrayList<>();
tasks.add(new VersionTask());
tasks.add(new ArgumentsTask(arguments));
tasks.add(new ArgumentsTask(connectorArguments));
{
File sqlScript = arguments.getSqlScript();
File sqlScript = connectorArguments.getSqlScript();
if (sqlScript != null) {
tasks.add(new JdbcRunSQLScript(sqlScript));
}
}
connector.addTasksTo(tasks, arguments);
connector.addTasksTo(tasks, connectorArguments);

// The default output file is based on the connector.
// We had a customer request to base it on the database, but that isn't well-defined,
// as there may be 0 or N databases in a single file.
String outputFileLocation = getOutputFileLocation(connector, arguments);
String outputFileLocation = getOutputFileLocation(connector, connectorArguments);

if (arguments.isDryRun()) {
if (connectorArguments.isDryRun()) {
String title = "Dry run: Printing task list for " + connector.getName();
System.out.println(title);
System.out.println(repeat('=', title.length()));
Expand All @@ -169,7 +125,7 @@ protected boolean run(@Nonnull Connector connector, @Nonnull ConnectorArguments
boolean requiredTaskSucceeded = false;

try (Closer closer = Closer.create()) {
Path outputPath = prepareOutputPath(outputFileLocation, closer, arguments);
Path outputPath = prepareOutputPath(outputFileLocation, closer, connectorArguments);

URI outputUri = URI.create("jar:" + outputPath.toUri());

Expand All @@ -184,15 +140,21 @@ protected boolean run(@Nonnull Connector connector, @Nonnull ConnectorArguments
new FileSystemOutputHandleFactory(fileSystem, "/"); // It's required to be "/"
logger.debug("Target filesystem is [{}]", sinkFactory);

Handle handle = closer.register(connector.open(arguments));
Handle handle = closer.register(connector.open(connectorArguments));

new TasksRunner(sinkFactory, handle, arguments.getThreadPoolSize(), state, tasks, arguments)
new TasksRunner(
sinkFactory,
handle,
connectorArguments.getThreadPoolSize(),
state,
tasks,
connectorArguments)
.run();

requiredTaskSucceeded = checkRequiredTaskSuccess(summaryPrinter, state, outputFileLocation);

telemetryProcessor.addDumperRunMetricsToPayload(
arguments, state, stopwatch, requiredTaskSucceeded);
connectorArguments, state, stopwatch, requiredTaskSucceeded);
telemetryProcessor.processTelemetry(fileSystem);
} finally {
// We must do this in finally after the ZipFileSystem has been closed.
Expand All @@ -215,6 +177,41 @@ protected boolean run(@Nonnull Connector connector, @Nonnull ConnectorArguments
}
}

private void print(@Nonnull Task<?> task, int indent) {
System.out.println(repeat(' ', indent * 2) + task);
if (task instanceof TaskGroup) {
for (Task<?> subtask : ((TaskGroup) task).getTasks()) {
print(subtask, indent + 1);
}
}
}

private Path prepareOutputPath(
@Nonnull String fileName, @Nonnull Closer closer, @Nonnull ConnectorArguments arguments)
throws IOException {
Matcher matcher = GCS_PATH_PATTERN.matcher(fileName);
if (matcher.matches()) {
String bucket = matcher.group("bucket");
String path = matcher.group("path");
logger.debug(
"Setting up CloudStorageFileSystem with bucket '{}' and path '{}'.", bucket, path);
CloudStorageFileSystem cloudStorageFileSystem =
closer.register(CloudStorageFileSystem.forBucket(bucket));
return cloudStorageFileSystem.getPath(path);
} else {
Path path = Paths.get(fileName);
File file = path.toFile();
if (file.exists()) {
if (!arguments.isOutputContinue()) {
file.delete(); // It's a simple file, and we were asked to overwrite it.
}
} else {
Files.createParentDirs(file);
}
return path;
}
}

private String getOutputFileLocation(Connector connector, ConnectorArguments arguments) {
Clock clock = Clock.systemDefaultZone();
// The default output file is based on the connector.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 2022-2025 Google LLC
* Copyright 2013-2021 CompilerWorks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.edwmigration.dumper.application.dumper;

import com.google.common.base.Stopwatch;
import com.google.edwmigration.dumper.application.dumper.metrics.*;
import com.google.edwmigration.dumper.application.dumper.task.TaskSetState;
import java.nio.file.FileSystem;

/**
* Strategy implementation that does nothing (no-op). This replaces the behavior when shouldWrite =
* false.
*/
public class NoOpTelemetryStrategy implements TelemetryStrategy {

@Override
public void processDumperRunMetrics(
ClientTelemetry clientTelemetry,
ConnectorArguments arguments,
TaskSetState state,
Stopwatch stopwatch,
boolean success) {
// Do nothing - this is the no-op strategy
}

@Override
public void writeTelemetry(FileSystem fileSystem, ClientTelemetry clientTelemetry) {
// Do nothing - this is the no-op strategy
}
}
Loading
Loading