Skip to content

Commit 92b7bb5

Browse files
committed
make telemetry streaming again
1 parent 0d04095 commit 92b7bb5

File tree

13 files changed

+203
-619
lines changed

13 files changed

+203
-619
lines changed

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/DiskTelemetryWriteStrategy.java

Lines changed: 146 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,23 @@
1616
*/
1717
package com.google.edwmigration.dumper.application.dumper;
1818

19-
import com.google.common.base.Stopwatch;
19+
import static java.nio.file.Files.newBufferedWriter;
20+
21+
import com.fasterxml.jackson.annotation.JsonInclude;
22+
import com.fasterxml.jackson.core.JsonProcessingException;
23+
import com.fasterxml.jackson.databind.ObjectMapper;
24+
import com.fasterxml.jackson.databind.SerializationFeature;
25+
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
2026
import com.google.edwmigration.dumper.application.dumper.metrics.*;
21-
import com.google.edwmigration.dumper.application.dumper.task.TaskSetState;
22-
import java.nio.file.FileSystem;
23-
import java.time.Duration;
24-
import java.time.ZonedDateTime;
25-
import java.util.List;
26-
import java.util.UUID;
27-
import java.util.stream.Collectors;
27+
import java.io.BufferedWriter;
28+
import java.io.IOException;
29+
import java.io.PrintWriter;
30+
import java.nio.charset.StandardCharsets;
31+
import java.nio.file.*;
32+
import java.util.ArrayDeque;
33+
import java.util.Queue;
34+
import net.harawata.appdirs.AppDirs;
35+
import net.harawata.appdirs.AppDirsFactory;
2836
import org.slf4j.Logger;
2937
import org.slf4j.LoggerFactory;
3038

@@ -34,80 +42,146 @@
3442
*/
3543
public class DiskTelemetryWriteStrategy implements TelemetryWriteStrategy {
3644
private static final Logger logger = LoggerFactory.getLogger(DiskTelemetryWriteStrategy.class);
45+
private static final String ALL_DUMPER_RUN_METRICS = "all-dumper-telemetry.jsonl";
46+
private static final String DUMPER_RUN_METRICS = "dumper-telemetry.jsonl";
47+
private final Path telemetryOsCachePath;
48+
private final ObjectMapper MAPPER;
49+
private final Queue<ClientTelemetry> bufferOs;
50+
private final Queue<ClientTelemetry> bufferZip;
51+
private FileSystem fileSystem;
52+
private boolean telemetryOsCacheIsAvailable = true;
3753

38-
// @Override
39-
// public void processDumperRunMetrics(
40-
// ClientTelemetry clientTelemetry,
41-
// ConnectorArguments arguments,
42-
// TaskSetState state,
43-
// Stopwatch stopwatch,
44-
// boolean success) {
45-
46-
// try {
47-
// clientTelemetry.setEventType(EventType.DUMPER_RUN_METRICS);
48-
49-
// List<TaskExecutionSummary> taskExecutionSummaries =
50-
// state.getTasksReports().stream()
51-
// .map(
52-
// tasksReport ->
53-
// new TaskExecutionSummary(tasksReport.count(), tasksReport.state().name()))
54-
// .collect(Collectors.toList());
55-
56-
// List<TaskDetailedSummary> taskDetailedSummaries =
57-
// state.getTaskResultSummaries().stream()
58-
// .map(
59-
// item ->
60-
// new TaskDetailedSummary(
61-
// item.getTask().getName(),
62-
// item.getTask().getCategory().name(),
63-
// item.getTaskState().name(),
64-
// item.getThrowable().isPresent()
65-
// ? item.getThrowable().get().getMessage()
66-
// : null))
67-
// .collect(Collectors.toList());
68-
69-
// Duration elapsed = stopwatch.elapsed();
70-
71-
// DumperRunMetrics dumperRunMetrics =
72-
// DumperRunMetrics.builder()
73-
// .setId(UUID.randomUUID().toString())
74-
// .setMeasureStartTime(ZonedDateTime.now().minus(elapsed))
75-
// .setRunDurationInMinutes(elapsed.getSeconds() / 60)
76-
// .setOverallStatus(success ? "SUCCESS" : "FAILURE")
77-
// .setTaskExecutionSummary(taskExecutionSummaries)
78-
// .setTaskDetailedSummary(taskDetailedSummaries)
79-
// .setArguments(arguments)
80-
// .build();
81-
// clientTelemetry.addToPayload(dumperRunMetrics);
82-
// } catch (Exception e) {
83-
// logger.warn("Failed to generate dumperRunMetrics and add it to payload", e);
84-
// }
85-
// }
54+
public DiskTelemetryWriteStrategy() {
55+
bufferOs = new ArrayDeque<>();
56+
bufferZip = new ArrayDeque<>();
57+
MAPPER = createObjectMapper();
58+
telemetryOsCachePath = Paths.get(createTelemetryOsDirIfNotExists(), ALL_DUMPER_RUN_METRICS);
59+
}
8660

87-
@Override
88-
public void process(ClientTelemetry clientTelemetry) {
89-
// For disk strategy, processing means preparing the telemetry for writing
90-
// The actual processing is done in processDumperRunMetrics
91-
logger.debug("Processing telemetry data with {} payload items",
92-
clientTelemetry.getPayload().size());
61+
private static ObjectMapper createObjectMapper() {
62+
ObjectMapper mapper = new ObjectMapper();
9363

94-
flush(null, clientTelemetry);
64+
mapper.setSerializationInclusion(JsonInclude.Include.NON_EMPTY);
65+
mapper.registerModule(new JavaTimeModule());
66+
mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
67+
68+
return mapper;
9569
}
9670

71+
public void setZipFilePath(FileSystem fileSystem) {
72+
this.fileSystem = fileSystem;
73+
if (copyOsCacheToZip() && telemetryOsCacheIsAvailable) {
74+
// these events were already registered in OsCache
75+
bufferZip.clear();
76+
}
77+
flush();
78+
}
9779

9880
@Override
99-
public void flush(FileSystem fileSystem) {
100-
// this implementation does not a buffer. It always flushes on process
81+
public void process(ClientTelemetry clientTelemetry) {
82+
logger.debug(
83+
"Processing telemetry data with {} payload items", clientTelemetry.getPayload().size());
84+
85+
if (telemetryOsCacheIsAvailable) {
86+
bufferOs.add(clientTelemetry);
87+
}
88+
bufferZip.add(clientTelemetry);
89+
90+
flush();
10191
}
10292

10393
@Override
104-
public void flush(FileSystem fileSystem, ClientTelemetry clientTelemetry) {
105-
try {
106-
// Write immediately to disk
107-
TelemetryWriter.write(fileSystem, clientTelemetry);
108-
logger.debug("Flushed telemetry data immediately to disk");
109-
} catch (Exception e) {
110-
logger.warn("Failed to flush telemetry data", e);
94+
public void flush() {
95+
// this implementation uses buffer until zip file is created afterwords it is flushed per
96+
// process
97+
flushOsCache();
98+
flushZip();
99+
}
100+
101+
private void flushOsCache() {
102+
while (telemetryOsCacheIsAvailable && !bufferOs.isEmpty()) {
103+
try {
104+
writeOnDisk(telemetryOsCachePath, MAPPER.writeValueAsString(bufferOs.poll()));
105+
} catch (JsonProcessingException e) {
106+
logger.warn("Failed to serialize telemetry to write in Os Cache", e);
107+
}
108+
}
109+
}
110+
111+
private void flushZip() {
112+
while (fileSystem != null && !bufferZip.isEmpty()) {
113+
try {
114+
writeOnDisk(
115+
fileSystem.getPath(DUMPER_RUN_METRICS), MAPPER.writeValueAsString(bufferZip.poll()));
116+
} catch (JsonProcessingException e) {
117+
logger.warn("Failed to serialize telemetry to write in Zip", e);
118+
}
119+
}
120+
}
121+
122+
private String createTelemetryOsDirIfNotExists() {
123+
AppDirs appDirs = AppDirsFactory.getInstance();
124+
125+
String appName = "DWH-Dumper";
126+
String appVersion = ""; // All versions are accumulated in the same file
127+
String appAuthor = "google"; // Optional, can be null
128+
String cacheDir = appDirs.getUserCacheDir(appName, appVersion, appAuthor);
129+
Path applicationCacheDirPath = Paths.get(cacheDir);
130+
if (java.nio.file.Files.notExists(applicationCacheDirPath)) {
131+
try {
132+
java.nio.file.Files.createDirectories(applicationCacheDirPath);
133+
logger.info("Created application telemetry cache directory: {}", applicationCacheDirPath);
134+
} catch (IOException e) {
135+
disableOsCache();
136+
logger.warn(
137+
"Unable to create application telemetry cache directory : {}", applicationCacheDirPath);
138+
}
111139
}
140+
141+
return cacheDir;
142+
}
143+
144+
private boolean copyOsCacheToZip() {
145+
Path snapshotInZipPath = fileSystem.getPath(DUMPER_RUN_METRICS);
146+
try {
147+
Path parentInZip = snapshotInZipPath.getParent();
148+
if (parentInZip != null && java.nio.file.Files.notExists(parentInZip)) {
149+
java.nio.file.Files.createDirectories(parentInZip);
150+
}
151+
java.nio.file.Files.copy(
152+
telemetryOsCachePath, snapshotInZipPath, StandardCopyOption.REPLACE_EXISTING);
153+
logger.debug(
154+
"Copied Cached {} telemetry to zip file {}.", telemetryOsCachePath, snapshotInZipPath);
155+
return true;
156+
} catch (IOException e) {
157+
logger.warn(
158+
"Failed to copy cached telemetry from {} to ZIP at {}",
159+
telemetryOsCachePath,
160+
snapshotInZipPath,
161+
e);
162+
return false;
163+
}
164+
}
165+
166+
/** Appends the given summary lines for the current run to the external log file. */
167+
private static void writeOnDisk(Path path, String summaryLines) {
168+
try (BufferedWriter writer =
169+
newBufferedWriter(
170+
path,
171+
StandardCharsets.UTF_8,
172+
StandardOpenOption.CREATE,
173+
StandardOpenOption.APPEND);
174+
PrintWriter printer = new PrintWriter(writer)) {
175+
176+
printer.println(summaryLines);
177+
printer.flush();
178+
} catch (IOException e) {
179+
logger.warn("Failed to append to external cumulative summary log: {}", path, e);
180+
}
181+
}
182+
183+
private void disableOsCache() {
184+
telemetryOsCacheIsAvailable = false;
185+
bufferOs.clear();
112186
}
113187
}

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/MetadataDumper.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import java.nio.file.Paths;
4747
import java.time.Clock;
4848
import java.util.ArrayList;
49+
import java.util.Collections;
4950
import java.util.List;
5051
import java.util.Map;
5152
import java.util.regex.Matcher;
@@ -63,14 +64,22 @@ public class MetadataDumper {
6364
private static final Pattern GCS_PATH_PATTERN =
6465
Pattern.compile("gs://(?<bucket>[^/]+)/(?<path>.*)");
6566

66-
private TelemetryProcessor telemetryProcessor;
67-
private ConnectorArguments connectorArguments;
67+
private final TelemetryProcessor telemetryProcessor;
68+
private final ConnectorArguments connectorArguments;
6869

6970
public MetadataDumper(String... args) throws Exception {
7071
this.connectorArguments = new ConnectorArguments(JsonResponseFile.addResponseFiles(args));
7172
telemetryProcessor =
7273
new TelemetryProcessor(
7374
TelemetryStrategyFactory.createStrategy(connectorArguments.isTelemetryOn()));
75+
telemetryProcessor.process(
76+
ClientTelemetry.builder().setEventType(EventType.DUMPER_RUN_START).build());
77+
telemetryProcessor.process(
78+
ClientTelemetry.builder()
79+
.setEventType(EventType.METADATA)
80+
.setPayload(Collections.singletonList(StartUpMetaInfoProcessor.getDumperMetadata()))
81+
.build());
82+
7483
if (connectorArguments.saveResponseFile()) {
7584
JsonResponseFile.save(connectorArguments);
7685
}
@@ -142,6 +151,8 @@ protected boolean run(@Nonnull Connector connector) throws Exception {
142151
new FileSystemOutputHandleFactory(fileSystem, "/"); // It's required to be "/"
143152
logger.debug("Target filesystem is [{}]", sinkFactory);
144153

154+
telemetryProcessor.setZipFilePathForDiskWriteStrategy(fileSystem);
155+
145156
Handle handle = closer.register(connector.open(connectorArguments));
146157

147158
new TasksRunner(
@@ -156,11 +167,9 @@ protected boolean run(@Nonnull Connector connector) throws Exception {
156167

157168
requiredTaskSucceeded = checkRequiredTaskSuccess(summaryPrinter, state, outputFileLocation);
158169

159-
telemetryProcessor
160-
.process(ClientTelemetry.builder()
161-
.setEventType(EventType.DUMPER_RUN_END)
162-
.build());
163-
telemetryProcessor.flush(fileSystem);
170+
telemetryProcessor.process(
171+
ClientTelemetry.builder().setEventType(EventType.DUMPER_RUN_END).build());
172+
telemetryProcessor.flush();
164173
} finally {
165174
// We must do this in finally after the ZipFileSystem has been closed.
166175
File outputFile = new File(outputFileLocation);

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/NoOpTelemetryWriteStrategy.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,7 @@
1616
*/
1717
package com.google.edwmigration.dumper.application.dumper;
1818

19-
import com.google.common.base.Stopwatch;
2019
import com.google.edwmigration.dumper.application.dumper.metrics.*;
21-
import com.google.edwmigration.dumper.application.dumper.task.TaskSetState;
22-
import java.nio.file.FileSystem;
2320

2421
/**
2522
* Strategy implementation that does nothing (no-op). This replaces the behavior when shouldWrite =
@@ -32,7 +29,7 @@ public void process(ClientTelemetry clientTelemetry) {
3229
}
3330

3431
@Override
35-
public void flush(FileSystem fileSystem, ClientTelemetry clientTelemetry) {
32+
public void flush() {
3633
// Do nothing - this is the no-op strategy
3734
}
3835
}

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/TasksRunner.java

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -38,10 +38,7 @@
3838
import java.nio.charset.StandardCharsets;
3939
import java.sql.SQLException;
4040
import java.time.Duration;
41-
import java.util.Arrays;
42-
import java.util.HashMap;
43-
import java.util.List;
44-
import java.util.UUID;
41+
import java.util.*;
4542
import java.util.concurrent.atomic.AtomicInteger;
4643
import javax.annotation.CheckForNull;
4744
import javax.annotation.Nonnull;
@@ -107,30 +104,34 @@ public void run() throws MetadataDumperUsageException {
107104

108105
handleTask(task);
109106

110-
processTaskEndTelemetry(task, getTaskState(task), metricToErrorMap.getOrDefault(task, null), taskEventId);
107+
processTaskEndTelemetry(
108+
task, getTaskState(task), metricToErrorMap.getOrDefault(task, null), taskEventId);
111109
}
112110
}
113111

114112
private void processTaskStartTelemetry(Task<?> task, String taskEventId) {
115-
TaskRunMetrics taskRunMetrics = new TaskRunMetrics(task.getName(), task.getClass().toString(), null, null);
116-
113+
TaskRunMetrics taskRunMetrics =
114+
new TaskRunMetrics(task.getName(), task.getClass().toString(), null, null);
115+
117116
telemetryProcessor.process(
118-
ClientTelemetry.builder()
119-
.setEventId(taskEventId)
120-
.setEventType(EventType.TASK_RUN_START)
121-
.setPayload(Arrays.asList(taskRunMetrics))
122-
.build());
117+
ClientTelemetry.builder()
118+
.setEventId(taskEventId)
119+
.setEventType(EventType.TASK_RUN_START)
120+
.setPayload(Collections.singletonList(taskRunMetrics))
121+
.build());
123122
}
124123

125-
private void processTaskEndTelemetry(Task<?> task, TaskState taskStatus, String error, String taskEventId) {
126-
TaskRunMetrics taskMetrics = new TaskRunMetrics(task.getName(), task.getClass().toString(), taskStatus.name(), error);
127-
124+
private void processTaskEndTelemetry(
125+
Task<?> task, TaskState taskStatus, String error, String taskEventId) {
126+
TaskRunMetrics taskMetrics =
127+
new TaskRunMetrics(task.getName(), task.getClass().toString(), taskStatus.name(), error);
128+
128129
telemetryProcessor.process(
129-
ClientTelemetry.builder()
130-
.setEventId(taskEventId)
131-
.setEventType(EventType.TASK_RUN_START)
132-
.setPayload(Arrays.asList(taskMetrics))
133-
.build());
130+
ClientTelemetry.builder()
131+
.setEventId(taskEventId)
132+
.setEventType(EventType.TASK_RUN_END)
133+
.setPayload(Collections.singletonList(taskMetrics))
134+
.build());
134135
}
135136

136137
@CheckForNull

0 commit comments

Comments
 (0)