
Commit 844ae9f

Extend Redshift raw logs with CPU and Storage metrics from AWS CloudWatch API (#335)
Add the com.amazonaws:aws-java-sdk-cloudwatch library. Extend the Redshift raw logs with cluster usage metrics (average CPU, average disk space used). Create new cluster_usage_metrics_{DATE}.csv files in the dwh-migration-redshift-raw-logs* ZIP file. Automatically determine the metric period (60 / 300 / 3600 sec) based on the interval time.
1 parent 4e83572 commit 844ae9f

File tree

8 files changed, +439 -0 lines changed


dumper/app/build.gradle

Lines changed: 4 additions & 0 deletions
@@ -61,6 +61,7 @@ dependencies {
     implementation libs.google.cloud.kms
     implementation libs.httpclient5
     implementation libs.aws.java.sdk.redshift
+    implementation libs.aws.java.sdk.cloudwatch

     runtimeOnly libs.logback.classic
     runtimeOnly libs.jcl.over.slf4j
@@ -129,6 +130,9 @@ licenseReport {
     "com.amazonaws:aws-java-sdk-core": [
         licenseUrl: "https://raw.githubusercontent.com/aws/aws-sdk-java/master/LICENSE.txt",
     ],
+    "com.amazonaws:aws-java-sdk-cloudwatch": [
+        licenseUrl: "https://raw.githubusercontent.com/aws/aws-sdk-java/master/LICENSE.txt",
+    ],
     "com.amazonaws:aws-java-sdk-redshift": [
         licenseUrl: "https://raw.githubusercontent.com/aws/aws-sdk-java/master/LICENSE.txt",
     ],

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/redshift/AbstractAwsApiTask.java

Lines changed: 15 additions & 0 deletions
@@ -20,6 +20,8 @@
 import com.amazonaws.auth.AWSStaticCredentialsProvider;
 import com.amazonaws.auth.BasicAWSCredentials;
 import com.amazonaws.auth.profile.ProfileCredentialsProvider;
+import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
+import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient;
 import com.amazonaws.services.redshift.AmazonRedshift;
 import com.amazonaws.services.redshift.AmazonRedshiftClient;
 import com.google.common.io.ByteSink;
@@ -41,6 +43,7 @@ public abstract class AbstractAwsApiTask extends AbstractTask<Void> {
   AWSCredentialsProvider credentialsProvider;
   Class<? extends Enum<?>> headerEnum;
   Optional<AmazonRedshift> redshiftClient;
+  Optional<AmazonCloudWatch> cloudWatchClient;

   public AbstractAwsApiTask(
       AWSCredentialsProvider credentialsProvider,
@@ -49,6 +52,7 @@ public AbstractAwsApiTask(
     super(zipEntryName);
     this.headerEnum = headerEnum;
     this.redshiftClient = Optional.empty();
+    this.cloudWatchClient = Optional.empty();
     this.credentialsProvider = credentialsProvider;
   }

@@ -58,11 +62,22 @@ public AbstractAwsApiTask withRedshiftApiClient(AmazonRedshift redshiftClient) {
     return this;
   }

+  @Nonnull
+  public AbstractAwsApiTask withCloudWatchApiClient(AmazonCloudWatch cloudWatchClient) {
+    this.cloudWatchClient = Optional.of(cloudWatchClient);
+    return this;
+  }
+
   public AmazonRedshift redshiftApiClient() {
     return redshiftClient.orElseGet(
         () -> AmazonRedshiftClient.builder().withCredentials(credentialsProvider).build());
   }

+  public AmazonCloudWatch cloudWatchApiClient() {
+    return cloudWatchClient.orElseGet(
+        () -> AmazonCloudWatchClient.builder().withCredentials(credentialsProvider).build());
+  }
+
   public void writeRecordsCsv(@Nonnull ByteSink sink, List<Object[]> records) throws IOException {
     CSVFormat format = FORMAT.builder().setHeader(headerEnum).build();
     try (RecordProgressMonitor monitor = new RecordProgressMonitor(getName());
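
Note: the new withCloudWatchApiClient setter mirrors withRedshiftApiClient; it lets a caller (typically a test) inject a pre-built client, while cloudWatchApiClient() falls back to building one from the task's credentials provider. A minimal sketch of that injection, assuming a Mockito-based test; the helper class and stubbing below are hypothetical and not part of this commit:

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.cloudwatch.model.GetMetricStatisticsResult;

class CloudWatchClientInjectionSketch {

  // Hypothetical test helper: stubs the CloudWatch client on a task so it can run
  // without reaching the real AWS API. Without this injection, cloudWatchApiClient()
  // builds a real client from the credentials provider.
  static AbstractAwsApiTask withStubbedCloudWatch(AbstractAwsApiTask task) {
    AmazonCloudWatch stub = mock(AmazonCloudWatch.class);
    when(stub.getMetricStatistics(any())).thenReturn(new GetMetricStatisticsResult());
    return task.withCloudWatchApiClient(stub);
  }
}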
dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/redshift/RedshiftClusterUsageMetricsTask.java

Lines changed: 206 additions & 0 deletions

@@ -0,0 +1,206 @@
/*
 * Copyright 2022-2023 Google LLC
 * Copyright 2013-2021 CompilerWorks
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.google.edwmigration.dumper.application.dumper.connector.redshift;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toMap;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.cloudwatch.model.Datapoint;
import com.amazonaws.services.cloudwatch.model.Dimension;
import com.amazonaws.services.cloudwatch.model.GetMetricStatisticsRequest;
import com.amazonaws.services.cloudwatch.model.GetMetricStatisticsResult;
import com.amazonaws.services.redshift.AmazonRedshift;
import com.amazonaws.services.redshift.model.Cluster;
import com.amazonaws.services.redshift.model.DescribeClustersRequest;
import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSortedMap;
import com.google.common.io.ByteSink;
import com.google.edwmigration.dumper.application.dumper.connector.ZonedInterval;
import com.google.edwmigration.dumper.application.dumper.connector.redshift.RedshiftClusterUsageMetricsTask.MetricDataPoint;
import com.google.edwmigration.dumper.application.dumper.handle.Handle;
import com.google.edwmigration.dumper.application.dumper.task.TaskRunContext;
import com.google.edwmigration.dumper.plugin.lib.dumper.spi.RedshiftRawLogsDumpFormat;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import javax.annotation.Nonnull;

/** Extraction task to get Redshift time series metrics from AWS CloudWatch API. */
public class RedshiftClusterUsageMetricsTask extends AbstractAwsApiTask {

  protected static enum MetricName {
    CPUUtilization,
    PercentageDiskSpaceUsed
  }

  protected static enum MetricType {
    Average
  }

  @AutoValue
  protected abstract static class MetricConfig {

    public abstract MetricName name();

    public abstract MetricType type();

    public static MetricConfig create(MetricName name, MetricType type) {
      return new AutoValue_RedshiftClusterUsageMetricsTask_MetricConfig(name, type);
    }
  }

  @AutoValue
  protected abstract static class MetricDataPoint {

    public abstract Instant instant();

    public abstract Double value();

    public abstract MetricConfig metricConfig();

    public static MetricDataPoint create(Instant instant, Double value, MetricConfig metricConfig) {
      return new AutoValue_RedshiftClusterUsageMetricsTask_MetricDataPoint(
          instant, value, metricConfig);
    }
  }

  private static final String REDSHIFT_NAMESPACE = "AWS/Redshift";
  private static final DateTimeFormatter DATE_FORMAT =
      DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneOffset.UTC);

  private final ZonedDateTime currentTime;
  private final ZonedInterval interval;
  private final List<MetricConfig> metrics;

  public RedshiftClusterUsageMetricsTask(
      AWSCredentialsProvider credentialsProvider,
      ZonedDateTime currentTime,
      ZonedInterval interval,
      String zipEntryName,
      ImmutableList<MetricConfig> metrics) {
    super(
        credentialsProvider,
        zipEntryName,
        RedshiftRawLogsDumpFormat.ClusterUsageMetrics.Header.class);
    this.interval = interval;
    this.metrics = metrics;
    this.currentTime = currentTime;
  }

  @Override
  protected Void doRun(TaskRunContext context, @Nonnull ByteSink sink, Handle handle)
      throws IOException {
    writeRecordsCsv(
        sink,
        listClusters().stream()
            .flatMap(
                cluster ->
                    getClusterMetrics(cluster.getClusterIdentifier()).entrySet().stream()
                        .map(
                            metricEntry ->
                                serializeCsvRow(
                                    cluster.getClusterIdentifier(),
                                    metricEntry.getKey(),
                                    metricEntry.getValue())))
            .collect(toList()));
    return null;
  }

  private Object[] serializeCsvRow(
      String clusterId, Instant instant, List<MetricDataPoint> dataPoints) {
    Map<MetricConfig, Double> values =
        dataPoints.stream().collect(toMap(MetricDataPoint::metricConfig, MetricDataPoint::value));
    return Stream.concat(
            Stream.of(new Object[] {clusterId, DATE_FORMAT.format(instant)}),
            metrics.stream().map(metric -> values.get(metric)))
        .toArray();
  }

  private ImmutableList<Cluster> listClusters() {
    AmazonRedshift client = redshiftApiClient();
    return ImmutableList.copyOf(
        client.describeClusters(new DescribeClustersRequest()).getClusters());
  }

  private ImmutableList<MetricDataPoint> getMetricDataPoints(
      String clusterId, MetricConfig metricConfig) {
    AmazonCloudWatch client = cloudWatchApiClient();
    GetMetricStatisticsRequest request =
        new GetMetricStatisticsRequest()
            .withMetricName(metricConfig.name().name())
            .withStatistics(metricConfig.type().name())
            .withNamespace(REDSHIFT_NAMESPACE)
            .withDimensions(new Dimension().withName("ClusterIdentifier").withValue(clusterId))
            .withStartTime(Date.from(interval.getStartUTC().toInstant()))
            .withEndTime(Date.from(interval.getEndExclusiveUTC().toInstant()))
            .withPeriod((int) metricDataPeriod().getSeconds());

    GetMetricStatisticsResult result = client.getMetricStatistics(request);
    return result.getDatapoints().stream()
        .map(
            datapoint ->
                MetricDataPoint.create(
                    datapoint.getTimestamp().toInstant(),
                    getDatapointValue(metricConfig, datapoint),
                    metricConfig))
        .collect(toImmutableList());
  }

  private Double getDatapointValue(MetricConfig metricConfig, Datapoint datapoint) {
    switch (metricConfig.type()) {
      case Average:
        return datapoint.getAverage();
      default:
        return null;
    }
  }

  private ImmutableSortedMap<Instant, List<MetricDataPoint>> getClusterMetrics(
      String clusterIdentifier) {
    return ImmutableSortedMap.copyOf(
        metrics.stream()
            .flatMap(metricConfig -> getMetricDataPoints(clusterIdentifier, metricConfig).stream())
            .collect(groupingBy(MetricDataPoint::instant)));
  }

  /**
   * Returns available metric period based on the interval time.
   * https://docs.aws.amazon.com/AmazonCloudWatch/latest/monitoring/cloudwatch_concepts.html
   */
  private Duration metricDataPeriod() {
    ZonedDateTime start = interval.getStartUTC();
    if (start.isAfter(currentTime.minusDays(14))) {
      return Duration.ofMinutes(1);
    }
    if (start.isAfter(currentTime.minusDays(62))) {
      return Duration.ofMinutes(5);
    }
    return Duration.ofHours(1);
  }
}
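
Note: serializeCsvRow emits one row per cluster and timestamp: the cluster identifier, the UTC timestamp formatted with DATE_FORMAT, then one value column per configured metric (null when a metric has no datapoint at that instant). A small illustrative sketch of that row layout; the cluster name and values are hypothetical, and the real column headers come from RedshiftRawLogsDumpFormat.ClusterUsageMetrics.Header:

import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.Arrays;

class ClusterUsageMetricsRowSketch {

  public static void main(String[] args) {
    // Same pattern as DATE_FORMAT in RedshiftClusterUsageMetricsTask.
    DateTimeFormatter dateFormat =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneOffset.UTC);
    Object[] row = {
      "my-redshift-cluster",                                      // cluster identifier (hypothetical)
      dateFormat.format(Instant.parse("2023-05-01T00:05:00Z")),   // "2023-05-01 00:05:00.000"
      37.5,                                                       // CPUUtilization, Average (hypothetical)
      12.3                                                        // PercentageDiskSpaceUsed, Average (hypothetical)
    };
    System.out.println(Arrays.toString(row));
  }
}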

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/redshift/RedshiftRawLogsConnector.java

Lines changed: 41 additions & 0 deletions
@@ -16,6 +16,10 @@
  */
 package com.google.edwmigration.dumper.application.dumper.connector.redshift;

+import static com.google.edwmigration.dumper.application.dumper.connector.redshift.RedshiftClusterUsageMetricsTask.MetricConfig;
+import static com.google.edwmigration.dumper.application.dumper.connector.redshift.RedshiftClusterUsageMetricsTask.MetricName;
+import static com.google.edwmigration.dumper.application.dumper.connector.redshift.RedshiftClusterUsageMetricsTask.MetricType;
+
 import com.google.auto.service.AutoService;
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
@@ -33,6 +37,9 @@
 import com.google.edwmigration.dumper.application.dumper.connector.ZonedInterval;
 import com.google.edwmigration.dumper.application.dumper.connector.ZonedIntervalIterable;
 import com.google.edwmigration.dumper.application.dumper.connector.ZonedIntervalIterableGenerator;
+import com.google.edwmigration.dumper.application.dumper.connector.redshift.RedshiftClusterUsageMetricsTask.MetricConfig;
+import com.google.edwmigration.dumper.application.dumper.connector.redshift.RedshiftClusterUsageMetricsTask.MetricName;
+import com.google.edwmigration.dumper.application.dumper.connector.redshift.RedshiftClusterUsageMetricsTask.MetricType;
 import com.google.edwmigration.dumper.application.dumper.task.DumpMetadataTask;
 import com.google.edwmigration.dumper.application.dumper.task.FormatTask;
 import com.google.edwmigration.dumper.application.dumper.task.JdbcSelectIntervalTask;
@@ -43,6 +50,8 @@
 import com.google.edwmigration.dumper.plugin.lib.dumper.spi.RedshiftMetadataDumpFormat;
 import com.google.edwmigration.dumper.plugin.lib.dumper.spi.RedshiftRawLogsDumpFormat;
 import java.time.Duration;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.List;
@@ -193,6 +202,14 @@ public void addTasksTo(List<? super Task<?>> out, ConnectorArguments arguments)
           wlmQueryTemplateQuery,
           "service_class_start_time",
           parallelTask);
+
+      makeClusterMetricsTasks(
+          arguments,
+          intervals,
+          ImmutableList.of(
+              MetricConfig.create(MetricName.CPUUtilization, MetricType.Average),
+              MetricConfig.create(MetricName.PercentageDiskSpaceUsed, MetricType.Average)),
+          out);
     }
   }

@@ -233,4 +250,28 @@ private void makeTasks(
       out.addTask(new JdbcSelectIntervalTask(file, query, interval));
     }
   }
+
+  /** Creates tasks to get Redshift cluster metrics from AWS CloudWatch API. */
+  private void makeClusterMetricsTasks(
+      ConnectorArguments arguments,
+      ZonedIntervalIterable intervals,
+      ImmutableList<MetricConfig> metrics,
+      List<? super Task<?>> out) {
+    DateTimeFormatter dateFormat =
+        DateTimeFormatter.ofPattern("yyyy-MM-dd_HHmmss").withZone(ZoneOffset.UTC);
+
+    AbstractAwsApiTask.createCredentialsProvider(arguments)
+        .ifPresent(
+            awsCredentials -> {
+              for (ZonedInterval interval : intervals) {
+                String file =
+                    RedshiftRawLogsDumpFormat.ClusterUsageMetrics.ZIP_ENTRY_PREFIX
+                        + dateFormat.format(interval.getStartUTC())
+                        + RedshiftRawLogsDumpFormat.ZIP_ENTRY_SUFFIX;
+                out.add(
+                    new RedshiftClusterUsageMetricsTask(
+                        awsCredentials, ZonedDateTime.now(), interval, file, metrics));
+              }
+            });
+  }
 }
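
Note: makeClusterMetricsTasks adds one task, and therefore one ZIP entry, per interval, and only when AWS credentials can be created from the connector arguments; without credentials, no CloudWatch tasks are scheduled. The entry name is derived from the interval start in UTC. A quick sketch of the naming, assuming ZIP_ENTRY_PREFIX and ZIP_ENTRY_SUFFIX resolve to "cluster_usage_metrics_" and ".csv" as the commit message indicates (the actual constants live in RedshiftRawLogsDumpFormat):

import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

class ClusterUsageMetricsEntryNameSketch {

  public static void main(String[] args) {
    // Same pattern as the dateFormat used in makeClusterMetricsTasks.
    DateTimeFormatter dateFormat =
        DateTimeFormatter.ofPattern("yyyy-MM-dd_HHmmss").withZone(ZoneOffset.UTC);
    ZonedDateTime intervalStart = ZonedDateTime.of(2023, 5, 1, 0, 0, 0, 0, ZoneOffset.UTC);
    String entryName = "cluster_usage_metrics_" + dateFormat.format(intervalStart) + ".csv";
    System.out.println(entryName); // cluster_usage_metrics_2023-05-01_000000.csv
  }
}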

dumper/app/src/main/resources/logback.xml

Lines changed: 2 additions & 0 deletions
@@ -12,6 +12,8 @@
   <logger name="org.springframework" level="info"/>
   <logger name="com.zaxxer.hikari" level="info"/>
   <logger name="org.apache.hc.client5.http" level="info"/>
+  <logger name="org.apache.http" level="info"/>
+  <logger name="com.amazonaws" level="info"/>

   <root level="${LOG_LEVEL:-debug}">
     <appender-ref ref="CONSOLE" />