Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
/*
* Copyright 2022-2025 Google LLC
* Copyright 2013-2021 CompilerWorks
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.edwmigration.dumper.application.dumper;

import static java.nio.file.Files.newBufferedWriter;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.google.edwmigration.dumper.application.dumper.metrics.*;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.util.ArrayDeque;
import java.util.Queue;
import net.harawata.appdirs.AppDirs;
import net.harawata.appdirs.AppDirsFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Strategy implementation that writes telemetry data. This replaces the behavior when shouldWrite =
* true.
*/
public class DiskTelemetryWriteStrategy implements TelemetryWriteStrategy {
private static final Logger logger = LoggerFactory.getLogger(DiskTelemetryWriteStrategy.class);
private static final String ALL_DUMPER_RUN_METRICS = "all-dumper-telemetry.jsonl";
private static final String DUMPER_RUN_METRICS = "dumper-telemetry.jsonl";
private final Path telemetryOsCachePath;
private final ObjectMapper MAPPER;
private final Queue<ClientTelemetry> bufferOs;
private final Queue<ClientTelemetry> bufferZip;
private FileSystem fileSystem;
private boolean telemetryOsCacheIsAvailable = true;

public DiskTelemetryWriteStrategy() {
bufferOs = new ArrayDeque<>();
bufferZip = new ArrayDeque<>();
MAPPER = createObjectMapper();
telemetryOsCachePath = Paths.get(createTelemetryOsDirIfNotExists(), ALL_DUMPER_RUN_METRICS);
}

private static ObjectMapper createObjectMapper() {
ObjectMapper mapper = new ObjectMapper();

mapper.setSerializationInclusion(JsonInclude.Include.NON_EMPTY);
mapper.registerModule(new JavaTimeModule());
mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);

return mapper;
}

public void setZipFilePath(FileSystem fileSystem) {
this.fileSystem = fileSystem;
if (copyOsCacheToZip() && telemetryOsCacheIsAvailable) {
// these events were already registered in OsCache
bufferZip.clear();
}
flush();
}

@Override
public void process(ClientTelemetry clientTelemetry) {
logger.debug(
"Processing telemetry data with {} payload items", clientTelemetry.getPayload().size());

if (telemetryOsCacheIsAvailable) {
bufferOs.add(clientTelemetry);
}
bufferZip.add(clientTelemetry);

flush();
}

@Override
public void flush() {
// this implementation uses buffer until zip file is created afterwords it is flushed per
// process
flushOsCache();
flushZip();
}

private void flushOsCache() {
while (telemetryOsCacheIsAvailable && !bufferOs.isEmpty()) {
try {
writeOnDisk(telemetryOsCachePath, MAPPER.writeValueAsString(bufferOs.poll()));
} catch (JsonProcessingException e) {
logger.warn("Failed to serialize telemetry to write in Os Cache", e);
}
}
}

private void flushZip() {
while (fileSystem != null && !bufferZip.isEmpty()) {
try {
writeOnDisk(
fileSystem.getPath(DUMPER_RUN_METRICS), MAPPER.writeValueAsString(bufferZip.poll()));
} catch (JsonProcessingException e) {
logger.warn("Failed to serialize telemetry to write in Zip", e);
}
}
}

private String createTelemetryOsDirIfNotExists() {
AppDirs appDirs = AppDirsFactory.getInstance();

String appName = "DWH-Dumper";
String appVersion = ""; // All versions are accumulated in the same file
String appAuthor = "google"; // Optional, can be null
String cacheDir = appDirs.getUserCacheDir(appName, appVersion, appAuthor);
Path applicationCacheDirPath = Paths.get(cacheDir);
if (java.nio.file.Files.notExists(applicationCacheDirPath)) {
try {
java.nio.file.Files.createDirectories(applicationCacheDirPath);
logger.info("Created application telemetry cache directory: {}", applicationCacheDirPath);
} catch (IOException e) {
disableOsCache();
logger.warn(
"Unable to create application telemetry cache directory : {}", applicationCacheDirPath);
}
}

return cacheDir;
}

private boolean copyOsCacheToZip() {
Path snapshotInZipPath = fileSystem.getPath(DUMPER_RUN_METRICS);
try {
Path parentInZip = snapshotInZipPath.getParent();
if (parentInZip != null && java.nio.file.Files.notExists(parentInZip)) {
java.nio.file.Files.createDirectories(parentInZip);
}
java.nio.file.Files.copy(
telemetryOsCachePath, snapshotInZipPath, StandardCopyOption.REPLACE_EXISTING);
logger.debug(
"Copied Cached {} telemetry to zip file {}.", telemetryOsCachePath, snapshotInZipPath);
return true;
} catch (IOException e) {
logger.warn(
"Failed to copy cached telemetry from {} to ZIP at {}",
telemetryOsCachePath,
snapshotInZipPath,
e);
return false;
}
}

/** Appends the given summary lines for the current run to the external log file. */
private static void writeOnDisk(Path path, String summaryLines) {
try (BufferedWriter writer =
newBufferedWriter(
path,
StandardCharsets.UTF_8,
StandardOpenOption.CREATE,
StandardOpenOption.APPEND);
PrintWriter printer = new PrintWriter(writer)) {

printer.println(summaryLines);
printer.flush();
} catch (IOException e) {
logger.warn("Failed to append to external cumulative summary log: {}", path, e);
}
}

private void disableOsCache() {
telemetryOsCacheIsAvailable = false;
bufferOs.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import com.google.edwmigration.dumper.application.dumper.handle.Handle;
import com.google.edwmigration.dumper.application.dumper.io.FileSystemOutputHandleFactory;
import com.google.edwmigration.dumper.application.dumper.io.OutputHandleFactory;
import com.google.edwmigration.dumper.application.dumper.metrics.ClientTelemetry;
import com.google.edwmigration.dumper.application.dumper.metrics.EventType;
import com.google.edwmigration.dumper.application.dumper.task.ArgumentsTask;
import com.google.edwmigration.dumper.application.dumper.task.JdbcRunSQLScript;
import com.google.edwmigration.dumper.application.dumper.task.Task;
Expand All @@ -44,6 +46,7 @@
import java.nio.file.Paths;
import java.time.Clock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
Expand All @@ -61,14 +64,22 @@ public class MetadataDumper {
private static final Pattern GCS_PATH_PATTERN =
Pattern.compile("gs://(?<bucket>[^/]+)/(?<path>.*)");

private TelemetryProcessor telemetryProcessor;
private ConnectorArguments connectorArguments;
private final TelemetryProcessor telemetryProcessor;
private final ConnectorArguments connectorArguments;

public MetadataDumper(String... args) throws Exception {
this.connectorArguments = new ConnectorArguments(JsonResponseFile.addResponseFiles(args));
telemetryProcessor =
new TelemetryProcessor(
TelemetryStrategyFactory.createStrategy(connectorArguments.isTelemetryOn()));
telemetryProcessor.process(
ClientTelemetry.builder().setEventType(EventType.DUMPER_RUN_START).build());
telemetryProcessor.process(
ClientTelemetry.builder()
.setEventType(EventType.METADATA)
.setPayload(Collections.singletonList(StartUpMetaInfoProcessor.getDumperMetadata()))
.build());

if (connectorArguments.saveResponseFile()) {
JsonResponseFile.save(connectorArguments);
}
Expand Down Expand Up @@ -140,6 +151,8 @@ protected boolean run(@Nonnull Connector connector) throws Exception {
new FileSystemOutputHandleFactory(fileSystem, "/"); // It's required to be "/"
logger.debug("Target filesystem is [{}]", sinkFactory);

telemetryProcessor.setZipFilePathForDiskWriteStrategy(fileSystem);

Handle handle = closer.register(connector.open(connectorArguments));

new TasksRunner(
Expand All @@ -148,14 +161,15 @@ protected boolean run(@Nonnull Connector connector) throws Exception {
connectorArguments.getThreadPoolSize(),
state,
tasks,
connectorArguments)
connectorArguments,
telemetryProcessor)
.run();

requiredTaskSucceeded = checkRequiredTaskSuccess(summaryPrinter, state, outputFileLocation);

telemetryProcessor.addDumperRunMetricsToPayload(
connectorArguments, state, stopwatch, requiredTaskSucceeded);
telemetryProcessor.processTelemetry(fileSystem);
telemetryProcessor.process(
ClientTelemetry.builder().setEventType(EventType.DUMPER_RUN_END).build());
telemetryProcessor.flush();
} finally {
// We must do this in finally after the ZipFileSystem has been closed.
File outputFile = new File(outputFileLocation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,29 +16,20 @@
*/
package com.google.edwmigration.dumper.application.dumper;

import com.google.common.base.Stopwatch;
import com.google.edwmigration.dumper.application.dumper.metrics.*;
import com.google.edwmigration.dumper.application.dumper.task.TaskSetState;
import java.nio.file.FileSystem;

/**
* Strategy implementation that does nothing (no-op). This replaces the behavior when shouldWrite =
* false.
*/
public class NoOpTelemetryStrategy implements TelemetryStrategy {

public class NoOpTelemetryWriteStrategy implements TelemetryWriteStrategy {
@Override
public void processDumperRunMetrics(
ClientTelemetry clientTelemetry,
ConnectorArguments arguments,
TaskSetState state,
Stopwatch stopwatch,
boolean success) {
public void process(ClientTelemetry clientTelemetry) {
// Do nothing - this is the no-op strategy
}

@Override
public void writeTelemetry(FileSystem fileSystem, ClientTelemetry clientTelemetry) {
public void flush() {
// Do nothing - this is the no-op strategy
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import com.google.edwmigration.dumper.application.dumper.io.OutputHandle;
import com.google.edwmigration.dumper.application.dumper.io.OutputHandle.WriteMode;
import com.google.edwmigration.dumper.application.dumper.io.OutputHandleFactory;
import com.google.edwmigration.dumper.application.dumper.metrics.ClientTelemetry;
import com.google.edwmigration.dumper.application.dumper.metrics.EventType;
import com.google.edwmigration.dumper.application.dumper.metrics.TaskRunMetrics;
import com.google.edwmigration.dumper.application.dumper.task.Task;
import com.google.edwmigration.dumper.application.dumper.task.TaskGroup;
import com.google.edwmigration.dumper.application.dumper.task.TaskRunContext;
Expand All @@ -35,8 +38,7 @@
import java.nio.charset.StandardCharsets;
import java.sql.SQLException;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.CheckForNull;
import javax.annotation.Nonnull;
Expand All @@ -56,17 +58,21 @@ public class TasksRunner implements TaskRunContextOps {
private final TaskRunContext context;
private final TaskSetState.Impl state;
private final List<Task<?>> tasks;
private final TelemetryProcessor telemetryProcessor;
private final HashMap<Task<?>, String> metricToErrorMap = new HashMap<>();

public TasksRunner(
OutputHandleFactory sinkFactory,
Handle handle,
int threadPoolSize,
@Nonnull TaskSetState.Impl state,
List<Task<?>> tasks,
ConnectorArguments arguments) {
ConnectorArguments arguments,
TelemetryProcessor telemetryProcessor) {
context = createContext(sinkFactory, handle, threadPoolSize, arguments);
this.state = state;
this.tasks = tasks;
this.telemetryProcessor = telemetryProcessor;
totalNumberOfTasks = countTasks(tasks);
stopwatch = Stopwatch.createStarted();
numberOfCompletedTasks = new AtomicInteger();
Expand All @@ -93,10 +99,41 @@ public <T> T runChildTask(@Nonnull Task<T> task) throws MetadataDumperUsageExcep

public void run() throws MetadataDumperUsageException {
for (Task<?> task : tasks) {
String taskEventId = UUID.randomUUID().toString();
processTaskStartTelemetry(task, taskEventId);

handleTask(task);

processTaskEndTelemetry(
task, getTaskState(task), metricToErrorMap.getOrDefault(task, null), taskEventId);
}
}

private void processTaskStartTelemetry(Task<?> task, String taskEventId) {
TaskRunMetrics taskRunMetrics =
new TaskRunMetrics(task.getName(), task.getClass().toString(), null, null);

telemetryProcessor.process(
ClientTelemetry.builder()
.setEventId(taskEventId)
.setEventType(EventType.TASK_RUN_START)
.setPayload(Collections.singletonList(taskRunMetrics))
.build());
}

private void processTaskEndTelemetry(
Task<?> task, TaskState taskStatus, String error, String taskEventId) {
TaskRunMetrics taskMetrics =
new TaskRunMetrics(task.getName(), task.getClass().toString(), taskStatus.name(), error);

telemetryProcessor.process(
ClientTelemetry.builder()
.setEventId(taskEventId)
.setEventType(EventType.TASK_RUN_END)
.setPayload(Collections.singletonList(taskMetrics))
.build());
}

@CheckForNull
private <T> T handleTask(Task<T> task) throws MetadataDumperUsageException {
T t = runTask(task);
Expand Down Expand Up @@ -168,6 +205,7 @@ private <T> T runTask(Task<T> task) throws MetadataDumperUsageException {
else if (!task.handleException(e))
logger.warn("Task failed: {}: {}", task, e.getMessage(), e);
state.setTaskException(task, TaskState.FAILED, e);
metricToErrorMap.put(task, e.getMessage());
try {
OutputHandle sink = context.newOutputFileHandle(task.getTargetPath() + ".exception.txt");
sink.asCharSink(StandardCharsets.UTF_8, WriteMode.CREATE_TRUNCATE)
Expand All @@ -178,6 +216,7 @@ else if (!task.handleException(e))
String.valueOf(new DumperDiagnosticQuery(e).call())));
} catch (Exception f) {
logger.warn("Exception-recorder failed: {}", f.getMessage(), f);
metricToErrorMap.put(task, f.getMessage());
}
}
return null;
Expand Down
Loading