Commit 39b955b

Remove writeRecordsCsv, refactor the usage metrics task for Redshift (#660)
* Remove writeRecordsCsv, which operated on the full list of a file's records
* Create CsvRecordWriter for writing records one at a time
* Replace Optional with Nullable to simplify a helper method
* Move the list of extracted cluster usage metric types into RedshiftClusterUsageMetricsTask
* Refactor record formatting in RedshiftClusterUsageMetricsTask
1 parent 2f29dbd commit 39b955b

File tree

5 files changed: +92 additions, -98 deletions

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/redshift/AbstractAwsApiTask.java

Lines changed: 35 additions & 22 deletions
@@ -16,8 +16,6 @@
  */
 package com.google.edwmigration.dumper.application.dumper.connector.redshift;
 
-import static com.google.edwmigration.dumper.application.dumper.utils.OptionalUtils.optionallyWhen;
-
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.auth.AWSStaticCredentialsProvider;
 import com.amazonaws.auth.BasicAWSCredentials;
@@ -33,9 +31,9 @@
 import java.io.IOException;
 import java.io.Writer;
 import java.nio.charset.StandardCharsets;
-import java.util.List;
 import java.util.Optional;
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import org.apache.commons.csv.CSVFormat;
 import org.apache.commons.csv.CSVPrinter;
@@ -80,33 +78,48 @@ public AmazonCloudWatch cloudWatchApiClient() {
         () -> AmazonCloudWatchClient.builder().withCredentials(credentialsProvider).build());
   }
 
-  public void writeRecordsCsv(@Nonnull ByteSink sink, List<Object[]> records) throws IOException {
-    CSVFormat format = FORMAT.builder().setHeader(headerEnum).build();
-    try (RecordProgressMonitor monitor = new RecordProgressMonitor(getName());
-        Writer writer = sink.asCharSink(StandardCharsets.UTF_8).openBufferedStream()) {
-      CSVPrinter printer = format.print(writer);
+  static class CsvRecordWriter implements AutoCloseable {
+    private final CSVPrinter printer;
+    private final RecordProgressMonitor monitor;
+    private final Writer writer;
+
+    CsvRecordWriter(ByteSink sink, CSVFormat format, String name) throws IOException {
+      monitor = new RecordProgressMonitor(name);
+      writer = sink.asCharSink(StandardCharsets.UTF_8).openBufferedStream();
+      printer = format.print(writer);
+    }
+
+    void handleRecord(Object... record) throws IOException {
+      monitor.count();
+      printer.printRecord(record);
+    }
 
-      for (Object[] record : records) {
-        monitor.count();
-        printer.printRecord(record);
-      }
+    @Override
+    public void close() throws IOException {
+      // close Monitor first, because closing Writer can throw a checked exception
+      monitor.close();
+      writer.close();
     }
   }
 
   public static Optional<AWSCredentialsProvider> createCredentialsProvider(
       ConnectorArguments arguments) {
+    return Optional.ofNullable(doCreateProvider(arguments));
+  }
+
+  @Nullable
+  private static AWSCredentialsProvider doCreateProvider(ConnectorArguments arguments) {
     String profileName = arguments.getIAMProfile();
     if (profileName != null) {
-      return Optional.of(new ProfileCredentialsProvider(profileName));
+      return new ProfileCredentialsProvider(profileName);
+    }
+    String accessKeyId = arguments.getIAMAccessKeyID();
+    String secretAccessKey = arguments.getIAMSecretAccessKey();
+    if (accessKeyId != null && secretAccessKey != null) {
+      BasicAWSCredentials credentials = new BasicAWSCredentials(accessKeyId, secretAccessKey);
+      return new AWSStaticCredentialsProvider(credentials);
+    } else {
+      return null;
     }
-
-    String iamAccessKey = arguments.getIAMAccessKeyID();
-    String iamSecretAccessKey = arguments.getIAMSecretAccessKey();
-
-    return optionallyWhen(
-        iamAccessKey != null && iamSecretAccessKey != null,
-        () ->
-            new AWSStaticCredentialsProvider(
-                new BasicAWSCredentials(iamAccessKey, iamSecretAccessKey)));
   }
 }
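The comment inside close() marks the one subtle point of the new class: Writer.close() can throw a checked IOException, so the monitor is closed first and its final count is reported even when the flush fails. Below is a standalone sketch of the same pattern using only JDK types; CountingCsvWriter and RowCounter are hypothetical stand-ins for CsvRecordWriter and RecordProgressMonitor, and the naive comma join stands in for Commons CSV.

import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch of the CsvRecordWriter pattern: one AutoCloseable owning a counter and
// a writer, counting each record as it is printed.
final class CountingCsvWriter implements AutoCloseable {
  private final RowCounter counter = new RowCounter();
  private final BufferedWriter writer;

  CountingCsvWriter(Path path) throws IOException {
    writer = Files.newBufferedWriter(path, StandardCharsets.UTF_8);
  }

  // Mirrors handleRecord: count the row, then print it.
  void handleRecord(Object... record) throws IOException {
    counter.count();
    StringBuilder line = new StringBuilder();
    for (int i = 0; i < record.length; i++) {
      if (i > 0) {
        line.append(',');
      }
      line.append(record[i] == null ? "" : record[i]);
    }
    writer.write(line.toString());
    writer.newLine();
  }

  @Override
  public void close() throws IOException {
    counter.close(); // declared not to throw, so the count is always reported
    writer.close(); // may throw IOException
  }

  private static final class RowCounter implements AutoCloseable {
    private long rows;

    void count() {
      rows++;
    }

    @Override
    public void close() {
      System.err.println("rows written: " + rows);
    }
  }
}

Callers use it the same way the tasks below use CsvRecordWriter: try (CountingCsvWriter writer = new CountingCsvWriter(path)) { writer.handleRecord("cluster-1", 2, "ra3.xlplus"); }.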

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/redshift/RedshiftClusterNodesTask.java

Lines changed: 14 additions & 14 deletions
@@ -17,18 +17,20 @@
 package com.google.edwmigration.dumper.application.dumper.connector.redshift;
 
 import static com.google.edwmigration.dumper.application.dumper.SummaryPrinter.joinSummaryDoubleLine;
-import static java.util.stream.Collectors.toList;
 
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.services.redshift.AmazonRedshift;
+import com.amazonaws.services.redshift.model.Cluster;
 import com.amazonaws.services.redshift.model.DescribeClustersRequest;
 import com.amazonaws.services.redshift.model.DescribeClustersResult;
 import com.google.common.io.ByteSink;
+import com.google.edwmigration.dumper.application.dumper.connector.redshift.AbstractAwsApiTask.CsvRecordWriter;
 import com.google.edwmigration.dumper.application.dumper.handle.Handle;
 import com.google.edwmigration.dumper.application.dumper.task.TaskRunContext;
 import com.google.edwmigration.dumper.plugin.lib.dumper.spi.RedshiftMetadataDumpFormat.ClusterNodes;
 import java.io.IOException;
 import javax.annotation.Nonnull;
+import org.apache.commons.csv.CSVFormat;
 
 /** Extraction task to get information about Redshift Cluster nodes from AWS API. */
 public class RedshiftClusterNodesTask extends AbstractAwsApiTask {
@@ -44,19 +46,17 @@ protected Void doRun(TaskRunContext context, @Nonnull ByteSink sink, Handle hand
     DescribeClustersRequest request = new DescribeClustersRequest();
     DescribeClustersResult result = client.describeClusters(request);
 
-    writeRecordsCsv(
-        sink,
-        result.getClusters().stream()
-            .map(
-                cluster ->
-                    new Object[] {
-                      cluster.getClusterIdentifier(),
-                      cluster.getEndpoint() != null ? cluster.getEndpoint().getAddress() : "",
-                      cluster.getNumberOfNodes(),
-                      cluster.getNodeType(),
-                      cluster.getTotalStorageCapacityInMegaBytes()
-                    })
-            .collect(toList()));
+    CSVFormat format = FORMAT.builder().setHeader(headerEnum).build();
+    try (CsvRecordWriter writer = new CsvRecordWriter(sink, format, getName())) {
+      for (Cluster item : result.getClusters()) {
+        writer.handleRecord(
+            item.getClusterIdentifier(),
+            item.getEndpoint() != null ? item.getEndpoint().getAddress() : "",
+            item.getNumberOfNodes(),
+            item.getNodeType(),
+            item.getTotalStorageCapacityInMegaBytes());
+      }
+    }
     return null;
   }

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/redshift/RedshiftClusterUsageMetricsTask.java

Lines changed: 39 additions & 43 deletions
@@ -18,9 +18,6 @@
 
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static com.google.edwmigration.dumper.application.dumper.SummaryPrinter.joinSummaryDoubleLine;
-import static java.util.stream.Collectors.groupingBy;
-import static java.util.stream.Collectors.toList;
-import static java.util.stream.Collectors.toMap;
 
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
@@ -33,7 +30,6 @@
 import com.amazonaws.services.redshift.model.DescribeClustersRequest;
 import com.google.auto.value.AutoValue;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSortedMap;
 import com.google.common.io.ByteSink;
 import com.google.edwmigration.dumper.application.dumper.connector.ZonedInterval;
 import com.google.edwmigration.dumper.application.dumper.connector.redshift.RedshiftClusterUsageMetricsTask.MetricDataPoint;
@@ -48,9 +44,9 @@
 import java.time.format.DateTimeFormatter;
 import java.util.Date;
 import java.util.List;
-import java.util.Map;
-import java.util.stream.Stream;
+import java.util.TreeMap;
 import javax.annotation.Nonnull;
+import org.apache.commons.csv.CSVFormat;
 
 /** Extraction task to get Redshift time series metrics from AWS CloudWatch API. */
 public class RedshiftClusterUsageMetricsTask extends AbstractAwsApiTask {
@@ -97,58 +93,55 @@ public static MetricDataPoint create(Instant instant, Double value, MetricConfig
 
   private final ZonedDateTime currentTime;
   private final ZonedInterval interval;
-  private final List<MetricConfig> metrics;
   private final String zipEntryName;
 
   public RedshiftClusterUsageMetricsTask(
       AWSCredentialsProvider credentialsProvider,
       ZonedDateTime currentTime,
      ZonedInterval interval,
-      String zipEntryName,
-      ImmutableList<MetricConfig> metrics) {
+      String zipEntryName) {
     super(
         credentialsProvider,
         zipEntryName,
         RedshiftRawLogsDumpFormat.ClusterUsageMetrics.Header.class);
     this.interval = interval;
-    this.metrics = metrics;
     this.currentTime = currentTime;
     this.zipEntryName = zipEntryName;
   }
 
   @Override
-  protected Void doRun(TaskRunContext context, @Nonnull ByteSink sink, Handle handle)
+  protected Void doRun(TaskRunContext context, @Nonnull ByteSink sink, @Nonnull Handle handle)
       throws IOException {
-    writeRecordsCsv(
-        sink,
-        listClusters().stream()
-            .flatMap(
-                cluster ->
-                    getClusterMetrics(cluster.getClusterIdentifier()).entrySet().stream()
-                        .map(
-                            metricEntry ->
-                                serializeCsvRow(
-                                    cluster.getClusterIdentifier(),
-                                    metricEntry.getKey(),
-                                    metricEntry.getValue())))
-            .collect(toList()));
+    CSVFormat format = FORMAT.builder().setHeader(headerEnum).build();
+    try (CsvRecordWriter writer = new CsvRecordWriter(sink, format, getName())) {
+      AmazonRedshift client = redshiftApiClient();
+      List<Cluster> clusters = client.describeClusters(new DescribeClustersRequest()).getClusters();
+      for (Cluster item : clusters) {
+        writeCluster(writer, item.getClusterIdentifier());
+      }
+    }
     return null;
   }
 
-  private Object[] serializeCsvRow(
-      String clusterId, Instant instant, List<MetricDataPoint> dataPoints) {
-    Map<MetricConfig, Double> values =
-        dataPoints.stream().collect(toMap(MetricDataPoint::metricConfig, MetricDataPoint::value));
-    return Stream.concat(
-            Stream.of(new Object[] {clusterId, DATE_FORMAT.format(instant)}),
-            metrics.stream().map(metric -> values.get(metric)))
-        .toArray();
-  }
+  private void writeCluster(CsvRecordWriter writer, String clusterId) throws IOException {
+    TreeMap<Instant, String> cpuPoints = new TreeMap<>();
+    TreeMap<Instant, String> diskPoints = new TreeMap<>();
 
-  private ImmutableList<Cluster> listClusters() {
-    AmazonRedshift client = redshiftApiClient();
-    return ImmutableList.copyOf(
-        client.describeClusters(new DescribeClustersRequest()).getClusters());
+    for (MetricDataPoint dataPoint : getCpuMetrics(clusterId)) {
+      cpuPoints.put(dataPoint.instant(), dataPoint.value().toString());
+      diskPoints.putIfAbsent(dataPoint.instant(), "");
+    }
+
+    for (MetricDataPoint dataPoint : getDiskMetrics(clusterId)) {
+      cpuPoints.putIfAbsent(dataPoint.instant(), "");
+      diskPoints.put(dataPoint.instant(), dataPoint.value().toString());
+    }
+
+    for (Instant key : cpuPoints.keySet()) {
+      String cpuValue = cpuPoints.get(key);
+      String diskValue = diskPoints.get(key);
+      writer.handleRecord(clusterId, DATE_FORMAT.format(key), cpuValue, diskValue);
+    }
   }
 
   private ImmutableList<MetricDataPoint> getMetricDataPoints(
@@ -184,12 +177,15 @@ private Double getDatapointValue(MetricConfig metricConfig, Datapoint datapoint)
     }
   }
 
-  private ImmutableSortedMap<Instant, List<MetricDataPoint>> getClusterMetrics(
-      String clusterIdentifier) {
-    return ImmutableSortedMap.copyOf(
-        metrics.stream()
-            .flatMap(metricConfig -> getMetricDataPoints(clusterIdentifier, metricConfig).stream())
-            .collect(groupingBy(MetricDataPoint::instant)));
+  private ImmutableList<MetricDataPoint> getDiskMetrics(String clusterId) {
+    MetricConfig config =
+        MetricConfig.create(MetricName.PercentageDiskSpaceUsed, MetricType.Average);
+    return getMetricDataPoints(clusterId, config);
+  }
+
+  private ImmutableList<MetricDataPoint> getCpuMetrics(String clusterId) {
+    MetricConfig config = MetricConfig.create(MetricName.CPUUtilization, MetricType.Average);
+    return getMetricDataPoints(clusterId, config);
   }
 
   /**
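The writeCluster helper replaces the old groupingBy/ImmutableSortedMap pipeline with two TreeMaps keyed by timestamp: each series fills its own column and pads the other with an empty string, so every row ends up with both cells, and TreeMap iteration keeps rows in chronological order. Below is a standalone sketch of just that alignment step; the class name, timestamps, and metric values are made up for illustration.

import java.time.Instant;
import java.util.TreeMap;

public final class AlignSeries {
  public static void main(String[] args) {
    TreeMap<Instant, String> cpuPoints = new TreeMap<>();
    TreeMap<Instant, String> diskPoints = new TreeMap<>();

    Instant t0 = Instant.parse("2024-01-01T00:00:00Z");
    Instant t1 = Instant.parse("2024-01-01T01:00:00Z");

    // CPU was reported at t0 and t1; disk only at t1.
    cpuPoints.put(t0, "42.0");
    diskPoints.putIfAbsent(t0, ""); // pad the missing disk cell
    cpuPoints.put(t1, "55.0");
    diskPoints.putIfAbsent(t1, "");

    cpuPoints.putIfAbsent(t1, ""); // no-op: CPU already has a value at t1
    diskPoints.put(t1, "87.5"); // overwrites the padding

    // Every key now exists in both maps, and iteration is chronological.
    for (Instant key : cpuPoints.keySet()) {
      System.out.printf("cluster-1,%s,%s,%s%n", key, cpuPoints.get(key), diskPoints.get(key));
    }
    // Prints:
    // cluster-1,2024-01-01T00:00:00Z,42.0,
    // cluster-1,2024-01-01T01:00:00Z,55.0,87.5
  }
}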

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/redshift/RedshiftRawLogsConnector.java

Lines changed: 3 additions & 18 deletions
@@ -16,9 +16,6 @@
  */
 package com.google.edwmigration.dumper.application.dumper.connector.redshift;
 
-import static com.google.edwmigration.dumper.application.dumper.connector.redshift.RedshiftClusterUsageMetricsTask.MetricConfig;
-import static com.google.edwmigration.dumper.application.dumper.connector.redshift.RedshiftClusterUsageMetricsTask.MetricName;
-import static com.google.edwmigration.dumper.application.dumper.connector.redshift.RedshiftClusterUsageMetricsTask.MetricType;
 import static com.google.edwmigration.dumper.application.dumper.utils.ArchiveNameUtil.getEntryFileNameWithTimestamp;
 
 import com.google.auto.service.AutoService;
@@ -38,9 +35,6 @@
 import com.google.edwmigration.dumper.application.dumper.connector.ZonedInterval;
 import com.google.edwmigration.dumper.application.dumper.connector.ZonedIntervalIterable;
 import com.google.edwmigration.dumper.application.dumper.connector.ZonedIntervalIterableGenerator;
-import com.google.edwmigration.dumper.application.dumper.connector.redshift.RedshiftClusterUsageMetricsTask.MetricConfig;
-import com.google.edwmigration.dumper.application.dumper.connector.redshift.RedshiftClusterUsageMetricsTask.MetricName;
-import com.google.edwmigration.dumper.application.dumper.connector.redshift.RedshiftClusterUsageMetricsTask.MetricType;
 import com.google.edwmigration.dumper.application.dumper.task.DumpMetadataTask;
 import com.google.edwmigration.dumper.application.dumper.task.FormatTask;
 import com.google.edwmigration.dumper.application.dumper.task.JdbcSelectIntervalTask;
@@ -202,13 +196,7 @@ public void addTasksTo(List<? super Task<?>> out, ConnectorArguments arguments)
           "service_class_start_time",
           parallelTask);
 
-      makeClusterMetricsTasks(
-          arguments,
-          intervals,
-          ImmutableList.of(
-              MetricConfig.create(MetricName.CPUUtilization, MetricType.Average),
-              MetricConfig.create(MetricName.PercentageDiskSpaceUsed, MetricType.Average)),
-          out);
+      makeClusterMetricsTasks(arguments, intervals, out);
     }
   }
 
@@ -250,10 +238,7 @@ private void makeTasks(
 
   /** Creates tasks to get Redshift cluster metrics from AWS CloudWatch API. */
   private void makeClusterMetricsTasks(
-      ConnectorArguments arguments,
-      ZonedIntervalIterable intervals,
-      ImmutableList<MetricConfig> metrics,
-      List<? super Task<?>> out) {
+      ConnectorArguments arguments, ZonedIntervalIterable intervals, List<? super Task<?>> out) {
     AbstractAwsApiTask.createCredentialsProvider(arguments)
         .ifPresent(
             awsCredentials -> {
@@ -263,7 +248,7 @@ private void makeClusterMetricsTasks(
                       RedshiftRawLogsDumpFormat.ClusterUsageMetrics.ZIP_ENTRY_PREFIX, interval);
               out.add(
                   new RedshiftClusterUsageMetricsTask(
-                      awsCredentials, ZonedDateTime.now(), interval, file, metrics));
+                      awsCredentials, ZonedDateTime.now(), interval, file));
               }
             });
   }
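Note that createCredentialsProvider keeps its Optional contract, so the ifPresent wiring above is unchanged: when neither an IAM profile nor an access-key pair is configured, the Optional is empty and no metrics task is added. The simplification from the commit is internal: Optional.ofNullable wraps a @Nullable helper with early returns, replacing the optionallyWhen combinator. A minimal sketch of that shape follows; ProfileLookup, findRegion, and the parameters are hypothetical, and javax.annotation.Nullable assumes the JSR-305 annotations already on the project's classpath.

import java.util.Optional;
import javax.annotation.Nullable;

// Hypothetical example of the same refactor shape: a public method keeps its
// Optional contract while a private @Nullable helper uses plain early returns.
final class ProfileLookup {
  static Optional<String> findRegion(@Nullable String override, @Nullable String defaultRegion) {
    return Optional.ofNullable(doFindRegion(override, defaultRegion));
  }

  @Nullable
  private static String doFindRegion(@Nullable String override, @Nullable String defaultRegion) {
    if (override != null) {
      return override;
    }
    return defaultRegion; // may be null; Optional.ofNullable absorbs it
  }
}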

dumper/app/src/test/java/com/google/edwmigration/dumper/application/dumper/connector/redshift/RedshiftClusterUsageMetricsTaskTest.java

Lines changed: 1 addition & 1 deletion
@@ -123,7 +123,7 @@ public void doRun_success() throws Exception {
 
     RedshiftClusterUsageMetricsTask task =
         new RedshiftClusterUsageMetricsTask(
-            null, CURR_DATE_TIME, TEST_INTERVAL, TEST_ZIP_ENTRY_NAME, testMetrics);
+            null, CURR_DATE_TIME, TEST_INTERVAL, TEST_ZIP_ENTRY_NAME);
     task.withRedshiftApiClient(redshiftClientMock);
     task.withCloudWatchApiClient(cloudWatchClientMock);
