Skip to content

Commit 0f316b3

Browse files
authored
[b/382467945] move aws-api creation in handle (#992)
* moved responsibility of creating aws api to handle * created RedshiftHandle * style changes
1 parent 76912ad commit 0f316b3

File tree

10 files changed

+251
-198
lines changed

10 files changed

+251
-198
lines changed

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/MetadataDumper.java

Lines changed: 48 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -115,66 +115,66 @@ protected boolean run(@Nonnull Connector connector) throws Exception {
115115
print(task, 1);
116116
}
117117
return true;
118-
} else {
119-
Stopwatch stopwatch = Stopwatch.createStarted();
120-
long outputFileLength = 0;
121-
TaskSetState.Impl state = new TaskSetState.Impl();
118+
}
122119

123-
logger.info("Using connector: [{}]", connector);
124-
SummaryPrinter summaryPrinter = new SummaryPrinter();
125-
boolean requiredTaskSucceeded = false;
120+
Stopwatch stopwatch = Stopwatch.createStarted();
121+
long outputFileLength = 0;
122+
TaskSetState.Impl state = new TaskSetState.Impl();
126123

127-
try (Closer closer = Closer.create()) {
128-
Path outputPath = prepareOutputPath(outputFileLocation, closer, connectorArguments);
124+
logger.info("Using connector: [{}]", connector);
125+
SummaryPrinter summaryPrinter = new SummaryPrinter();
126+
boolean requiredTaskSucceeded = false;
129127

130-
URI outputUri = URI.create("jar:" + outputPath.toUri());
128+
try (Closer closer = Closer.create()) {
129+
Path outputPath = prepareOutputPath(outputFileLocation, closer, connectorArguments);
131130

132-
Map<String, Object> fileSystemProperties =
133-
ImmutableMap.<String, Object>builder()
134-
.put("create", "true")
135-
.put("useTempFile", Boolean.TRUE)
136-
.build();
137-
FileSystem fileSystem =
138-
closer.register(FileSystems.newFileSystem(outputUri, fileSystemProperties));
139-
OutputHandleFactory sinkFactory =
140-
new FileSystemOutputHandleFactory(fileSystem, "/"); // It's required to be "/"
141-
logger.debug("Target filesystem is [{}]", sinkFactory);
131+
URI outputUri = URI.create("jar:" + outputPath.toUri());
142132

143-
Handle handle = closer.register(connector.open(connectorArguments));
133+
Map<String, Object> fileSystemProperties =
134+
ImmutableMap.<String, Object>builder()
135+
.put("create", "true")
136+
.put("useTempFile", Boolean.TRUE)
137+
.build();
138+
FileSystem fileSystem =
139+
closer.register(FileSystems.newFileSystem(outputUri, fileSystemProperties));
140+
OutputHandleFactory sinkFactory =
141+
new FileSystemOutputHandleFactory(fileSystem, "/"); // It's required to be "/"
142+
logger.debug("Target filesystem is [{}]", sinkFactory);
144143

145-
new TasksRunner(
146-
sinkFactory,
147-
handle,
148-
connectorArguments.getThreadPoolSize(),
149-
state,
150-
tasks,
151-
connectorArguments)
152-
.run();
144+
Handle handle = closer.register(connector.open(connectorArguments));
153145

154-
requiredTaskSucceeded = checkRequiredTaskSuccess(summaryPrinter, state, outputFileLocation);
146+
new TasksRunner(
147+
sinkFactory,
148+
handle,
149+
connectorArguments.getThreadPoolSize(),
150+
state,
151+
tasks,
152+
connectorArguments)
153+
.run();
155154

156-
telemetryProcessor.addDumperRunMetricsToPayload(
157-
connectorArguments, state, stopwatch, requiredTaskSucceeded);
158-
telemetryProcessor.processTelemetry(fileSystem);
159-
} finally {
160-
// We must do this in finally after the ZipFileSystem has been closed.
161-
File outputFile = new File(outputFileLocation);
162-
if (outputFile.isFile()) {
163-
outputFileLength = outputFile.length();
164-
}
155+
requiredTaskSucceeded = checkRequiredTaskSuccess(summaryPrinter, state, outputFileLocation);
165156

166-
printTaskResults(summaryPrinter, state);
167-
logFinalSummary(
168-
summaryPrinter,
169-
state,
170-
outputFileLength,
171-
stopwatch,
172-
outputFileLocation,
173-
requiredTaskSucceeded);
157+
telemetryProcessor.addDumperRunMetricsToPayload(
158+
connectorArguments, state, stopwatch, requiredTaskSucceeded);
159+
telemetryProcessor.processTelemetry(fileSystem);
160+
} finally {
161+
// We must do this in finally after the ZipFileSystem has been closed.
162+
File outputFile = new File(outputFileLocation);
163+
if (outputFile.isFile()) {
164+
outputFileLength = outputFile.length();
174165
}
175166

176-
return requiredTaskSucceeded;
167+
printTaskResults(summaryPrinter, state);
168+
logFinalSummary(
169+
summaryPrinter,
170+
state,
171+
outputFileLength,
172+
stopwatch,
173+
outputFileLocation,
174+
requiredTaskSucceeded);
177175
}
176+
177+
return requiredTaskSucceeded;
178178
}
179179

180180
private void print(@Nonnull Task<?> task, int indent) {

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/redshift/AbstractAwsApiTask.java

Lines changed: 1 addition & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -16,67 +16,23 @@
1616
*/
1717
package com.google.edwmigration.dumper.application.dumper.connector.redshift;
1818

19-
import com.amazonaws.auth.AWSCredentialsProvider;
20-
import com.amazonaws.auth.AWSStaticCredentialsProvider;
21-
import com.amazonaws.auth.BasicAWSCredentials;
22-
import com.amazonaws.auth.BasicSessionCredentials;
23-
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
24-
import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
25-
import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient;
26-
import com.amazonaws.services.redshift.AmazonRedshift;
27-
import com.amazonaws.services.redshift.AmazonRedshiftClient;
2819
import com.google.common.io.ByteSink;
29-
import com.google.edwmigration.dumper.application.dumper.ConnectorArguments;
3020
import com.google.edwmigration.dumper.application.dumper.task.AbstractTask;
3121
import com.google.edwmigration.dumper.plugin.ext.jdk.progress.RecordProgressMonitor;
3222
import java.io.IOException;
3323
import java.io.Writer;
3424
import java.nio.charset.StandardCharsets;
35-
import java.util.Optional;
36-
import javax.annotation.Nonnull;
37-
import javax.annotation.Nullable;
3825
import org.apache.commons.csv.CSVFormat;
3926
import org.apache.commons.csv.CSVPrinter;
4027

4128
/** Abstract class that provides methods for connecting with AWS API and writing results. */
4229
public abstract class AbstractAwsApiTask extends AbstractTask<Void> {
4330

44-
AWSCredentialsProvider credentialsProvider;
4531
Class<? extends Enum<?>> headerEnum;
46-
Optional<AmazonRedshift> redshiftClient;
47-
Optional<AmazonCloudWatch> cloudWatchClient;
4832

49-
public AbstractAwsApiTask(
50-
AWSCredentialsProvider credentialsProvider,
51-
String zipEntryName,
52-
Class<? extends Enum<?>> headerEnum) {
33+
public AbstractAwsApiTask(String zipEntryName, Class<? extends Enum<?>> headerEnum) {
5334
super(zipEntryName);
5435
this.headerEnum = headerEnum;
55-
this.redshiftClient = Optional.empty();
56-
this.cloudWatchClient = Optional.empty();
57-
this.credentialsProvider = credentialsProvider;
58-
}
59-
60-
@Nonnull
61-
public AbstractAwsApiTask withRedshiftApiClient(AmazonRedshift redshiftClient) {
62-
this.redshiftClient = Optional.of(redshiftClient);
63-
return this;
64-
}
65-
66-
@Nonnull
67-
public AbstractAwsApiTask withCloudWatchApiClient(AmazonCloudWatch cloudWatchClient) {
68-
this.cloudWatchClient = Optional.of(cloudWatchClient);
69-
return this;
70-
}
71-
72-
public AmazonRedshift redshiftApiClient() {
73-
return redshiftClient.orElseGet(
74-
() -> AmazonRedshiftClient.builder().withCredentials(credentialsProvider).build());
75-
}
76-
77-
public AmazonCloudWatch cloudWatchApiClient() {
78-
return cloudWatchClient.orElseGet(
79-
() -> AmazonCloudWatchClient.builder().withCredentials(credentialsProvider).build());
8036
}
8137

8238
static class CsvRecordWriter implements AutoCloseable {
@@ -102,31 +58,4 @@ public void close() throws IOException {
10258
writer.close();
10359
}
10460
}
105-
106-
public static Optional<AWSCredentialsProvider> createCredentialsProvider(
107-
ConnectorArguments arguments) {
108-
return Optional.ofNullable(doCreateProvider(arguments));
109-
}
110-
111-
@Nullable
112-
private static AWSCredentialsProvider doCreateProvider(ConnectorArguments arguments) {
113-
String profileName = arguments.getIAMProfile();
114-
if (profileName != null) {
115-
return new ProfileCredentialsProvider(profileName);
116-
}
117-
String accessKeyId = arguments.getIAMAccessKeyID();
118-
String secretAccessKey = arguments.getIAMSecretAccessKey();
119-
String sessionToken = arguments.getIamSessionToken();
120-
if (accessKeyId == null || secretAccessKey == null) {
121-
return null;
122-
}
123-
124-
if (sessionToken != null) {
125-
BasicSessionCredentials credentials =
126-
new BasicSessionCredentials(accessKeyId, secretAccessKey, sessionToken);
127-
return new AWSStaticCredentialsProvider(credentials);
128-
}
129-
BasicAWSCredentials credentials = new BasicAWSCredentials(accessKeyId, secretAccessKey);
130-
return new AWSStaticCredentialsProvider(credentials);
131-
}
13261
}

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/redshift/AbstractRedshiftConnector.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import com.google.edwmigration.dumper.application.dumper.annotations.RespectsInput;
2727
import com.google.edwmigration.dumper.application.dumper.connector.AbstractJdbcConnector;
2828
import com.google.edwmigration.dumper.application.dumper.handle.Handle;
29-
import com.google.edwmigration.dumper.application.dumper.handle.JdbcHandle;
29+
import com.google.edwmigration.dumper.application.dumper.handle.RedshiftHandle;
3030
import java.sql.Driver;
3131
import java.sql.SQLException;
3232
import java.time.ZoneOffset;
@@ -172,7 +172,7 @@ public Handle open(@Nonnull ConnectorArguments arguments) throws Exception {
172172
DataSource dataSource =
173173
new SimpleDriverDataSource(driver, url, arguments.getUser(), password.orElse(null));
174174

175-
return JdbcHandle.newPooledJdbcHandle(dataSource, arguments.getThreadPoolSize());
175+
return new RedshiftHandle(dataSource, arguments);
176176
}
177177

178178
private static void logDriverInfo(@Nonnull Driver driver) throws SQLException {

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/redshift/RedshiftClusterNodesTask.java

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,13 @@
1818

1919
import static com.google.edwmigration.dumper.application.dumper.SummaryPrinter.joinSummaryDoubleLine;
2020

21-
import com.amazonaws.auth.AWSCredentialsProvider;
2221
import com.amazonaws.services.redshift.AmazonRedshift;
2322
import com.amazonaws.services.redshift.model.Cluster;
2423
import com.amazonaws.services.redshift.model.DescribeClustersRequest;
2524
import com.amazonaws.services.redshift.model.DescribeClustersResult;
2625
import com.google.common.io.ByteSink;
27-
import com.google.edwmigration.dumper.application.dumper.connector.redshift.AbstractAwsApiTask.CsvRecordWriter;
2826
import com.google.edwmigration.dumper.application.dumper.handle.Handle;
27+
import com.google.edwmigration.dumper.application.dumper.handle.RedshiftHandle;
2928
import com.google.edwmigration.dumper.application.dumper.task.TaskRunContext;
3029
import com.google.edwmigration.dumper.plugin.lib.dumper.spi.RedshiftMetadataDumpFormat.ClusterNodes;
3130
import java.io.IOException;
@@ -35,29 +34,46 @@
3534
/** Extraction task to get information about Redshift Cluster nodes from AWS API. */
3635
public class RedshiftClusterNodesTask extends AbstractAwsApiTask {
3736

38-
public RedshiftClusterNodesTask(AWSCredentialsProvider credentialsProvider) {
39-
super(credentialsProvider, ClusterNodes.ZIP_ENTRY_NAME, ClusterNodes.Header.class);
37+
public RedshiftClusterNodesTask() {
38+
super(ClusterNodes.ZIP_ENTRY_NAME, ClusterNodes.Header.class);
4039
}
4140

4241
@Override
4342
protected Void doRun(TaskRunContext context, @Nonnull ByteSink sink, Handle handle)
4443
throws IOException {
45-
AmazonRedshift client = redshiftApiClient();
46-
DescribeClustersRequest request = new DescribeClustersRequest();
47-
DescribeClustersResult result = client.describeClusters(request);
4844

45+
RedshiftHandle redshiftHandle = (RedshiftHandle) handle;
46+
// customer did not provide aws credentials;
47+
if (!redshiftHandle.getRedshiftClient().isPresent()) {
48+
return null;
49+
}
50+
51+
AmazonRedshift redshiftClient = redshiftHandle.getRedshiftClient().get();
52+
53+
dumpClusterNodes(redshiftClient, sink);
54+
return null;
55+
}
56+
57+
private void dumpClusterNodes(AmazonRedshift redshiftClient, @Nonnull ByteSink sink)
58+
throws IOException {
4959
CSVFormat format = FORMAT.builder().setHeader(headerEnum).build();
5060
try (CsvRecordWriter writer = new CsvRecordWriter(sink, format, getName())) {
51-
for (Cluster item : result.getClusters()) {
52-
writer.handleRecord(
53-
item.getClusterIdentifier(),
54-
item.getEndpoint() != null ? item.getEndpoint().getAddress() : "",
55-
item.getNumberOfNodes(),
56-
item.getNodeType(),
57-
item.getTotalStorageCapacityInMegaBytes());
58-
}
61+
DescribeClustersRequest request = new DescribeClustersRequest();
62+
String marker = null;
63+
do {
64+
request.setMarker(marker);
65+
DescribeClustersResult result = redshiftClient.describeClusters(request);
66+
for (Cluster item : result.getClusters()) {
67+
writer.handleRecord(
68+
item.getClusterIdentifier(),
69+
item.getEndpoint() != null ? item.getEndpoint().getAddress() : "",
70+
item.getNumberOfNodes(),
71+
item.getNodeType(),
72+
item.getTotalStorageCapacityInMegaBytes());
73+
}
74+
marker = result.getMarker();
75+
} while (marker != null);
5976
}
60-
return null;
6177
}
6278

6379
private String toCallDescription() {

0 commit comments

Comments
 (0)