
Commit 4c2de13

created RedshiftHandle

1 parent: 5498183

10 files changed: +235 -122 lines

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/redshift/AbstractAwsApiTask.java

Lines changed: 1 addition & 18 deletions
@@ -21,8 +21,6 @@
 import com.amazonaws.auth.BasicAWSCredentials;
 import com.amazonaws.auth.BasicSessionCredentials;
 import com.amazonaws.auth.profile.ProfileCredentialsProvider;
-import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
-import com.amazonaws.services.redshift.AmazonRedshift;
 import com.google.common.io.ByteSink;
 import com.google.edwmigration.dumper.application.dumper.ConnectorArguments;
 import com.google.edwmigration.dumper.application.dumper.task.AbstractTask;
@@ -39,27 +37,12 @@
 public abstract class AbstractAwsApiTask extends AbstractTask<Void> {

   Class<? extends Enum<?>> headerEnum;
-  AmazonRedshift redshiftClient;
-  AmazonCloudWatch cloudWatchClient;

-  public AbstractAwsApiTask(
-      AmazonRedshift redshiftClient, String zipEntryName, Class<? extends Enum<?>> headerEnum) {
+  public AbstractAwsApiTask(String zipEntryName, Class<? extends Enum<?>> headerEnum) {
     super(zipEntryName);
-    this.redshiftClient = redshiftClient;
     this.headerEnum = headerEnum;
   }

-  public AbstractAwsApiTask(
-      AmazonRedshift redshiftClient,
-      AmazonCloudWatch amazonCloudWatch,
-      String zipEntryName,
-      Class<? extends Enum<?>> headerEnum) {
-    super(zipEntryName);
-    this.headerEnum = headerEnum;
-    this.redshiftClient = redshiftClient;
-    this.cloudWatchClient = amazonCloudWatch;
-  }
-
   static class CsvRecordWriter implements AutoCloseable {
     private final CSVPrinter printer;
     private final RecordProgressMonitor monitor;
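
Net effect of this hunk: the base task no longer owns AmazonRedshift or AmazonCloudWatch clients, and the CloudWatch-aware constructor is deleted outright. Subclasses now obtain clients from the Handle passed into doRun, as the two task diffs below show.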

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/redshift/AbstractRedshiftConnector.java

Lines changed: 2 additions & 2 deletions
@@ -26,7 +26,7 @@
 import com.google.edwmigration.dumper.application.dumper.annotations.RespectsInput;
 import com.google.edwmigration.dumper.application.dumper.connector.AbstractJdbcConnector;
 import com.google.edwmigration.dumper.application.dumper.handle.Handle;
-import com.google.edwmigration.dumper.application.dumper.handle.JdbcHandle;
+import com.google.edwmigration.dumper.application.dumper.handle.RedshiftHandle;
 import java.sql.Driver;
 import java.sql.SQLException;
 import java.time.ZoneOffset;
@@ -172,7 +172,7 @@ public Handle open(@Nonnull ConnectorArguments arguments) throws Exception {
     DataSource dataSource =
         new SimpleDriverDataSource(driver, url, arguments.getUser(), password.orElse(null));

-    return JdbcHandle.newPooledJdbcHandle(dataSource, arguments.getThreadPoolSize());
+    return RedshiftHandle.getInstance(dataSource, arguments);
   }

   private static void logDriverInfo(@Nonnull Driver driver) throws SQLException {
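
The RedshiftHandle class itself is one of the new files in this commit, but its diff is not reproduced in this excerpt. From the call sites that are shown — RedshiftHandle.getInstance(dataSource, arguments) here, and the Optional-returning getRedshiftClient()/getCloudWatchClient() accessors used by the tasks below — a minimal sketch might look like the following. Everything beyond those three public members (the constructor, field layout, and the JdbcHandle superclass wiring) is an assumption, not part of the shown diff:

    import com.amazonaws.auth.AWSCredentialsProvider;
    import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
    import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient;
    import com.amazonaws.services.redshift.AmazonRedshift;
    import com.amazonaws.services.redshift.AmazonRedshiftClient;
    import com.google.edwmigration.dumper.application.dumper.ConnectorArguments;
    import com.google.edwmigration.dumper.application.dumper.connector.redshift.AbstractAwsApiTask;
    import java.util.Optional;
    import javax.sql.DataSource;

    /** Minimal sketch, not the committed source; only the three public members are confirmed. */
    public class RedshiftHandle extends JdbcHandle {

      private final Optional<AmazonRedshift> redshiftClient;
      private final Optional<AmazonCloudWatch> cloudWatchClient;

      private RedshiftHandle(DataSource dataSource, Optional<AWSCredentialsProvider> credentials) {
        super(dataSource); // assumption: reuses the pooled JDBC plumbing it replaces
        this.redshiftClient =
            credentials.map(c -> AmazonRedshiftClient.builder().withCredentials(c).build());
        this.cloudWatchClient =
            credentials.map(c -> AmazonCloudWatchClient.builder().withCredentials(c).build());
      }

      // Confirmed call site: AbstractRedshiftConnector.open() above.
      public static RedshiftHandle getInstance(DataSource dataSource, ConnectorArguments arguments) {
        return new RedshiftHandle(
            dataSource, AbstractAwsApiTask.createCredentialsProvider(arguments));
      }

      // Empty Optionals signal that the user supplied no AWS credentials.
      public Optional<AmazonRedshift> getRedshiftClient() {
        return redshiftClient;
      }

      public Optional<AmazonCloudWatch> getCloudWatchClient() {
        return cloudWatchClient;
      }
    }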

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/redshift/RedshiftClusterNodesTask.java

Lines changed: 35 additions & 13 deletions
@@ -24,6 +24,7 @@
 import com.amazonaws.services.redshift.model.DescribeClustersResult;
 import com.google.common.io.ByteSink;
 import com.google.edwmigration.dumper.application.dumper.handle.Handle;
+import com.google.edwmigration.dumper.application.dumper.handle.RedshiftHandle;
 import com.google.edwmigration.dumper.application.dumper.task.TaskRunContext;
 import com.google.edwmigration.dumper.plugin.lib.dumper.spi.RedshiftMetadataDumpFormat.ClusterNodes;
 import java.io.IOException;
@@ -33,28 +34,49 @@
 /** Extraction task to get information about Redshift Cluster nodes from AWS API. */
 public class RedshiftClusterNodesTask extends AbstractAwsApiTask {

-  public RedshiftClusterNodesTask(AmazonRedshift amazonRedshift) {
-    super(amazonRedshift, ClusterNodes.ZIP_ENTRY_NAME, ClusterNodes.Header.class);
+  public RedshiftClusterNodesTask() {
+    super(ClusterNodes.ZIP_ENTRY_NAME, ClusterNodes.Header.class);
   }

   @Override
   protected Void doRun(TaskRunContext context, @Nonnull ByteSink sink, Handle handle)
       throws IOException {
-    DescribeClustersRequest request = new DescribeClustersRequest();
-    DescribeClustersResult result = redshiftClient.describeClusters(request);
+    if (!(handle instanceof RedshiftHandle)) {
+      throw new IllegalStateException("Redshift handle was expected but got " + handle);
+    }
+    RedshiftHandle redshiftHandle = (RedshiftHandle) handle;
+
+    // If the user did not provide AWS credentials, the client will not be present.
+    // In this case, we can simply skip the execution of this task.
+    if (!redshiftHandle.getRedshiftClient().isPresent()) {
+      return null;
+    }
+    AmazonRedshift redshiftClient = redshiftHandle.getRedshiftClient().get();
+
+    dumpClusterNodes(redshiftClient, sink);
+    return null;
+  }

+  private void dumpClusterNodes(AmazonRedshift redshiftClient, @Nonnull ByteSink sink)
+      throws IOException {
     CSVFormat format = FORMAT.builder().setHeader(headerEnum).build();
     try (CsvRecordWriter writer = new CsvRecordWriter(sink, format, getName())) {
-      for (Cluster item : result.getClusters()) {
-        writer.handleRecord(
-            item.getClusterIdentifier(),
-            item.getEndpoint() != null ? item.getEndpoint().getAddress() : "",
-            item.getNumberOfNodes(),
-            item.getNodeType(),
-            item.getTotalStorageCapacityInMegaBytes());
-      }
+      DescribeClustersRequest request = new DescribeClustersRequest();
+      String marker = null;
+      do {
+        request.setMarker(marker);
+        DescribeClustersResult result = redshiftClient.describeClusters(request);
+        for (Cluster item : result.getClusters()) {
+          writer.handleRecord(
+              item.getClusterIdentifier(),
+              item.getEndpoint() != null ? item.getEndpoint().getAddress() : "",
+              item.getNumberOfNodes(),
+              item.getNodeType(),
+              item.getTotalStorageCapacityInMegaBytes());
+        }
+        marker = result.getMarker();
+      } while (marker != null);
     }
-    return null;
   }

   private String toCallDescription() {
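
This hunk changes behavior as well as wiring. The old code issued a single DescribeClusters call and read only the first page of results; the new do/while loop follows result.getMarker() until the API reports no further pages, so accounts with many clusters are now dumped completely. The task also becomes a silent no-op when the handle carries no Redshift client, i.e. when the user supplied no AWS credentials.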

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/redshift/RedshiftClusterUsageMetricsTask.java

Lines changed: 32 additions & 22 deletions
@@ -31,8 +31,8 @@
 import com.google.common.collect.ImmutableList;
 import com.google.common.io.ByteSink;
 import com.google.edwmigration.dumper.application.dumper.connector.ZonedInterval;
-import com.google.edwmigration.dumper.application.dumper.connector.redshift.RedshiftClusterUsageMetricsTask.MetricDataPoint;
 import com.google.edwmigration.dumper.application.dumper.handle.Handle;
+import com.google.edwmigration.dumper.application.dumper.handle.RedshiftHandle;
 import com.google.edwmigration.dumper.application.dumper.task.TaskRunContext;
 import com.google.edwmigration.dumper.plugin.lib.dumper.spi.RedshiftRawLogsDumpFormat;
 import java.io.IOException;
@@ -49,7 +49,6 @@
 /** Extraction task to get Redshift time series metrics from AWS CloudWatch API. */
 public class RedshiftClusterUsageMetricsTask extends AbstractAwsApiTask {
-
   protected static enum MetricName {
     CPUUtilization,
     PercentageDiskSpaceUsed
@@ -73,7 +72,6 @@ public static MetricConfig create(MetricName name, MetricType type) {
   @AutoValue
   protected abstract static class MetricDataPoint {
-
     public abstract Instant instant();

     public abstract Double value();
@@ -95,16 +93,8 @@ public static MetricDataPoint create(Instant instant, Double value, MetricConfig
   private final String zipEntryName;

   public RedshiftClusterUsageMetricsTask(
-      AmazonRedshift redshiftClient,
-      AmazonCloudWatch amazonCloudWatch,
-      ZonedDateTime currentTime,
-      ZonedInterval interval,
-      String zipEntryName) {
-    super(
-        redshiftClient,
-        amazonCloudWatch,
-        zipEntryName,
-        RedshiftRawLogsDumpFormat.ClusterUsageMetrics.Header.class);
+      ZonedDateTime currentTime, ZonedInterval interval, String zipEntryName) {
+    super(zipEntryName, RedshiftRawLogsDumpFormat.ClusterUsageMetrics.Header.class);
     this.interval = interval;
     this.currentTime = currentTime;
     this.zipEntryName = zipEntryName;
@@ -114,26 +104,44 @@ public RedshiftClusterUsageMetricsTask(
   protected Void doRun(TaskRunContext context, @Nonnull ByteSink sink, @Nonnull Handle handle)
       throws IOException {
     CSVFormat format = FORMAT.builder().setHeader(headerEnum).build();
+    RedshiftHandle redshiftHandle;
+    if (handle instanceof RedshiftHandle) {
+      redshiftHandle = (RedshiftHandle) handle;
+    } else {
+      throw new IllegalStateException("Redshift handle was expected but got " + handle);
+    }
+
+    // customer did not provide aws credentials;
+    if (!redshiftHandle.getRedshiftClient().isPresent()
+        || !redshiftHandle.getCloudWatchClient().isPresent()) {
+      return null;
+    }
+
+    AmazonRedshift redshiftClient = redshiftHandle.getRedshiftClient().get();
+
     try (CsvRecordWriter writer = new CsvRecordWriter(sink, format, getName())) {
       List<Cluster> clusters =
           redshiftClient.describeClusters(new DescribeClustersRequest()).getClusters();
       for (Cluster item : clusters) {
-        writeCluster(writer, item.getClusterIdentifier());
+        writeCluster(
+            writer, item.getClusterIdentifier(), redshiftHandle.getCloudWatchClient().get());
       }
     }
     return null;
   }

-  private void writeCluster(CsvRecordWriter writer, String clusterId) throws IOException {
+  private void writeCluster(
+      CsvRecordWriter writer, String clusterId, AmazonCloudWatch cloudWatchClient)
+      throws IOException {
     TreeMap<Instant, String> cpuPoints = new TreeMap<>();
     TreeMap<Instant, String> diskPoints = new TreeMap<>();

-    for (MetricDataPoint dataPoint : getCpuMetrics(clusterId)) {
+    for (MetricDataPoint dataPoint : getCpuMetrics(clusterId, cloudWatchClient)) {
       cpuPoints.put(dataPoint.instant(), dataPoint.value().toString());
       diskPoints.putIfAbsent(dataPoint.instant(), "");
     }

-    for (MetricDataPoint dataPoint : getDiskMetrics(clusterId)) {
+    for (MetricDataPoint dataPoint : getDiskMetrics(clusterId, cloudWatchClient)) {
       cpuPoints.putIfAbsent(dataPoint.instant(), "");
       diskPoints.put(dataPoint.instant(), dataPoint.value().toString());
     }
@@ -146,7 +154,7 @@ private void writeCluster(CsvRecordWriter writer, String clusterId) throws IOExc
   }

   private ImmutableList<MetricDataPoint> getMetricDataPoints(
-      String clusterId, MetricConfig metricConfig) {
+      String clusterId, MetricConfig metricConfig, AmazonCloudWatch cloudWatchClient) {
     GetMetricStatisticsRequest request =
         new GetMetricStatisticsRequest()
             .withMetricName(metricConfig.name().name())
@@ -177,15 +185,17 @@ private Double getDatapointValue(MetricConfig metricConfig, Datapoint datapoint)
     }
   }

-  private ImmutableList<MetricDataPoint> getDiskMetrics(String clusterId) {
+  private ImmutableList<MetricDataPoint> getDiskMetrics(
+      String clusterId, AmazonCloudWatch cloudWatchClient) {
     MetricConfig config =
         MetricConfig.create(MetricName.PercentageDiskSpaceUsed, MetricType.Average);
-    return getMetricDataPoints(clusterId, config);
+    return getMetricDataPoints(clusterId, config, cloudWatchClient);
   }

-  private ImmutableList<MetricDataPoint> getCpuMetrics(String clusterId) {
+  private ImmutableList<MetricDataPoint> getCpuMetrics(
+      String clusterId, AmazonCloudWatch cloudWatchClient) {
     MetricConfig config = MetricConfig.create(MetricName.CPUUtilization, MetricType.Average);
-    return getMetricDataPoints(clusterId, config);
+    return getMetricDataPoints(clusterId, config, cloudWatchClient);
   }

   /**
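
The metrics task follows the same pattern as RedshiftClusterNodesTask: verify the handle type, return early when either client Optional is empty, and thread the AmazonCloudWatch client down the call chain as an explicit parameter instead of reading it from a field on the base task.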

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/redshift/RedshiftMetadataConnector.java

Lines changed: 1 addition & 15 deletions
@@ -16,9 +16,6 @@
  */
 package com.google.edwmigration.dumper.application.dumper.connector.redshift;

-import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.services.redshift.AmazonRedshift;
-import com.amazonaws.services.redshift.AmazonRedshiftClient;
 import com.google.auto.service.AutoService;
 import com.google.edwmigration.dumper.application.dumper.ConnectorArguments;
 import com.google.edwmigration.dumper.application.dumper.annotations.RespectsInput;
@@ -32,7 +29,6 @@
 import com.google.edwmigration.dumper.plugin.ext.jdk.annotation.Description;
 import com.google.edwmigration.dumper.plugin.lib.dumper.spi.RedshiftMetadataDumpFormat;
 import java.util.List;
-import java.util.Optional;
 import javax.annotation.Nonnull;

 /** @author shevek */
@@ -104,10 +100,6 @@ public void addTasksTo(List<? super Task<?>> out, ConnectorArguments arguments)

     ParallelTaskGroup.Builder parallelTask = new ParallelTaskGroup.Builder(this.getName());

-    // AWS API tasks, enabled by default if IAM credentials are provided
-    Optional<AWSCredentialsProvider> awsCredentials =
-        RedshiftUrlUtil.createCredentialsProvider(arguments);
-
     parallelTask.addTask(
         new JdbcSelectTask(SvvColumnsFormat.ZIP_ENTRY_NAME, "SELECT * FROM SVV_COLUMNS"));
     selStar(parallelTask, "SVV_TABLES");
@@ -199,12 +191,6 @@ public void addTasksTo(List<? super Task<?>> out, ConnectorArguments arguments)
     out.add(new DumpMetadataTask(arguments, FORMAT_NAME));
     out.add(new FormatTask(FORMAT_NAME));
     out.add(new RedshiftEnvironmentYamlTask());
-
-    awsCredentials.ifPresent(
-        awsCreds -> {
-          AmazonRedshift redshiftClient =
-              AmazonRedshiftClient.builder().withCredentials(awsCreds).build();
-          out.add(new RedshiftClusterNodesTask(redshiftClient));
-        });
+    out.add(new RedshiftClusterNodesTask());
   }
 }
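
Because the credential check now lives inside RedshiftClusterNodesTask.doRun, the metadata connector no longer builds a client here and can register the task unconditionally.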

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/redshift/RedshiftRawLogsConnector.java

Lines changed: 1 addition & 11 deletions
@@ -18,10 +18,6 @@

 import static com.google.edwmigration.dumper.application.dumper.utils.ArchiveNameUtil.getEntryFileNameWithTimestamp;

-import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
-import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient;
-import com.amazonaws.services.redshift.AmazonRedshift;
-import com.amazonaws.services.redshift.AmazonRedshiftClient;
 import com.google.auto.service.AutoService;
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableList;
@@ -247,17 +243,11 @@ private void makeClusterMetricsTasks(
     AbstractAwsApiTask.createCredentialsProvider(arguments)
         .ifPresent(
             awsCredentials -> {
-              AmazonCloudWatch amazonCloudWatch =
-                  AmazonCloudWatchClient.builder().withCredentials(awsCredentials).build();
-              AmazonRedshift redshiftClient =
-                  AmazonRedshiftClient.builder().withCredentials(awsCredentials).build();
               for (ZonedInterval interval : intervals) {
                 String file =
                     getEntryFileNameWithTimestamp(
                         RedshiftRawLogsDumpFormat.ClusterUsageMetrics.ZIP_ENTRY_PREFIX, interval);
-                out.add(
-                    new RedshiftClusterUsageMetricsTask(
-                        redshiftClient, amazonCloudWatch, ZonedDateTime.now(), interval, file));
+                out.add(new RedshiftClusterUsageMetricsTask(ZonedDateTime.now(), interval, file));
               }
             });
   }

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/redshift/RedshiftUrlUtil.java

Lines changed: 0 additions & 28 deletions
@@ -16,16 +16,10 @@
  */
 package com.google.edwmigration.dumper.application.dumper.connector.redshift;

-import com.amazonaws.auth.AWSCredentialsProvider;
-import com.amazonaws.auth.AWSStaticCredentialsProvider;
-import com.amazonaws.auth.BasicAWSCredentials;
-import com.amazonaws.auth.BasicSessionCredentials;
-import com.amazonaws.auth.profile.ProfileCredentialsProvider;
 import com.google.common.collect.Iterables;
 import com.google.edwmigration.dumper.application.dumper.ConnectorArguments;
 import com.google.edwmigration.dumper.application.dumper.MetadataDumperUsageException;
 import java.io.UnsupportedEncodingException;
-import java.util.Optional;
 import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
@@ -110,26 +104,4 @@ private static String makeIamProperties(ConnectorArguments arguments)
     return "";
   }
 }
-
-  public static Optional<AWSCredentialsProvider> createCredentialsProvider(
-      ConnectorArguments arguments) {
-    String profileName = arguments.getIAMProfile();
-    if (profileName != null) {
-      return Optional.of(new ProfileCredentialsProvider(profileName));
-    }
-    String accessKeyId = arguments.getIAMAccessKeyID();
-    String secretAccessKey = arguments.getIAMSecretAccessKey();
-    String sessionToken = arguments.getIamSessionToken();
-    if (accessKeyId == null || secretAccessKey == null) {
-      return Optional.empty();
-    }
-
-    if (sessionToken != null) {
-      BasicSessionCredentials credentials =
-          new BasicSessionCredentials(accessKeyId, secretAccessKey, sessionToken);
-      return Optional.of(new AWSStaticCredentialsProvider(credentials));
-    }
-    BasicAWSCredentials credentials = new BasicAWSCredentials(accessKeyId, secretAccessKey);
-    return Optional.of(new AWSStaticCredentialsProvider(credentials));
-  }
 }
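
createCredentialsProvider does not disappear with this deletion: RedshiftRawLogsConnector above now calls AbstractAwsApiTask.createCredentialsProvider(arguments), and AbstractAwsApiTask retains its ProfileCredentialsProvider and session-credential imports, so the helper has evidently moved there. Its new home is not shown in this excerpt; assuming the body moved verbatim from the lines removed above, the relocated method reads:

    // Presumed new location: AbstractAwsApiTask. Body copied from the removed
    // RedshiftUrlUtil lines above; the relocation target is inferred, not shown.
    public static Optional<AWSCredentialsProvider> createCredentialsProvider(
        ConnectorArguments arguments) {
      String profileName = arguments.getIAMProfile();
      if (profileName != null) {
        return Optional.of(new ProfileCredentialsProvider(profileName));
      }
      String accessKeyId = arguments.getIAMAccessKeyID();
      String secretAccessKey = arguments.getIAMSecretAccessKey();
      String sessionToken = arguments.getIamSessionToken();
      if (accessKeyId == null || secretAccessKey == null) {
        return Optional.empty();
      }
      if (sessionToken != null) {
        BasicSessionCredentials credentials =
            new BasicSessionCredentials(accessKeyId, secretAccessKey, sessionToken);
        return Optional.of(new AWSStaticCredentialsProvider(credentials));
      }
      BasicAWSCredentials credentials = new BasicAWSCredentials(accessKeyId, secretAccessKey);
      return Optional.of(new AWSStaticCredentialsProvider(credentials));
    }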
