Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .run/CService.run.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
<component name="ProjectRunConfigurationManager">
<configuration default="false" name="CService" type="Application" factoryName="Application">
<option name="MAIN_CLASS_NAME" value="com.here.xyz.httpconnector.CService" />
<module name="xyz-hub-service" />
<extension name="coverage">
<pattern>
<option name="PATTERN" value="com.here.xyz.httpconnector.*" />
<option name="ENABLED" value="true" />
</pattern>
</extension>
<method v="2">
<option name="Make" enabled="true" />
</method>
</configuration>
</component>
45 changes: 39 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,28 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<version>${aws-sdk2-version}</version>
<exclusions>
<exclusion>
<artifactId>jackson-annotations</artifactId>
<groupId>com.fasterxml.jackson.core</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>auth</artifactId>
<version>${aws-sdk2-version}</version>
<exclusions>
<exclusion>
<artifactId>jackson-annotations</artifactId>
<groupId>com.fasterxml.jackson.core</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>rds</artifactId>
Expand All @@ -384,21 +406,32 @@
</exclusion>
</exclusions>
</dependency>

<!-- AWS SDK 1.x -->
<dependency>
<artifactId>aws-java-sdk-core</artifactId>
<groupId>software.amazon.awssdk</groupId>
<artifactId>secretsmanager</artifactId>
<version>${aws-sdk2-version}</version>
<exclusions>
<exclusion>
<artifactId>jackson-annotations</artifactId>
<groupId>com.fasterxml.jackson.core</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>apache-client</artifactId>
<version>${aws-sdk2-version}</version>
<exclusions>
<exclusion>
<artifactId>jackson-annotations</artifactId>
<groupId>com.fasterxml.jackson.core</groupId>
</exclusion>
</exclusions>
<groupId>com.amazonaws</groupId>
<version>${aws-sdk-version}</version>
</dependency>

<!-- AWS SDK 1.x -->
<dependency>
<artifactId>aws-java-sdk-s3</artifactId>
<artifactId>aws-java-sdk-core</artifactId>
<exclusions>
<exclusion>
<artifactId>jackson-annotations</artifactId>
Expand Down
4 changes: 0 additions & 4 deletions xyz-connectors/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,6 @@
<artifactId>aws-java-sdk-lambda</artifactId>
<groupId>com.amazonaws</groupId>
</dependency>
<dependency>
<artifactId>aws-java-sdk-s3</artifactId>
<groupId>com.amazonaws</groupId>
</dependency>

<!-- PSQL JDBC driver -->
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,61 +16,66 @@
* SPDX-License-Identifier: Apache-2.0
* License-Filename: LICENSE
*/

package com.here.xyz.connectors;

import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.s3.AmazonS3URI;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.here.xyz.events.RelocatedEvent;
import com.here.xyz.responses.XyzError;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.net.URI;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

import com.here.xyz.responses.XyzError;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.S3ClientBuilder;
import software.amazon.awssdk.services.s3.S3Uri;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

@SuppressWarnings("WeakerAccess")
public class RelocationClient {

private static final Logger logger = LogManager.getLogger();

private final static String S3_PATH = "tmp/";
private AmazonS3 defaultS3Client;
private Map<String, AmazonS3> s3clients = new ConcurrentHashMap<>();
private S3Client defaultS3Client;
private Map<String, S3Client> s3clients = new ConcurrentHashMap<>();
private final String bucket;

public RelocationClient(String bucket) {
this.bucket = bucket;
}

private AmazonS3 getS3Client() {
private S3Client getS3Client() {
return getS3Client(null);
}

private AmazonS3 getS3Client(String region) {
if (region == null) {
if (defaultS3Client == null)
private S3Client getS3Client(String region) {
if (region == null || region.isEmpty()) {
if (defaultS3Client == null) {
defaultS3Client = getS3ClientBuilder()
.build();
.build();
}
return defaultS3Client;
} else {
if (!s3clients.containsKey(region)) {
S3Client client = getS3ClientBuilder()
.region(Region.of(region))
.build();
s3clients.put(region, client);
}
return s3clients.get(region);
}
if (s3clients.get(region) == null) {
s3clients.put(region, getS3ClientBuilder()
.withRegion(region)
.build());
}
return s3clients.get(region);
}

private AmazonS3ClientBuilder getS3ClientBuilder() {
return AmazonS3ClientBuilder
.standard()
.withCredentials(new DefaultAWSCredentialsProviderChain());
private S3ClientBuilder getS3ClientBuilder() {
return S3Client.builder()
.credentialsProvider(DefaultCredentialsProvider.create());
}

/**
Expand All @@ -89,13 +94,17 @@ public byte[] relocate(String streamId, byte[] bytes) {
else {
//Keep backward compatibility.
event
.withLocation(name)
.withURI(createS3Uri(bucket, S3_PATH + name))
.withRegion(System.getenv("AWS_REGION"));
.withLocation(name)
.withURI(createS3Uri(bucket, S3_PATH + name))
.withRegion(System.getenv("AWS_REGION"));
}

logger.debug("{} - Relocating data to: {}", streamId, event.getURI());
uploadToS3(new AmazonS3URI(event.getURI()), bytes);
S3Uri s3Uri = S3Uri.builder()
.bucket(bucket)
.uri(URI.create(event.getURI()))
.build();
uploadToS3(s3Uri, bytes);

return event.toString().getBytes();
}
Expand Down Expand Up @@ -123,34 +132,41 @@ public InputStream processRelocatedEvent(RelocatedEvent event, String region) th
if (event.getURI() == null && event.getLocation() != null) {
event.setURI(createS3Uri(bucket, S3_PATH + event.getLocation()));
logger.warn("{}, the RelocatedEvent returned by the connector still uses the deprecated \"location\" field."
+ "The connector should use the field \"URI\" instead.");
+ "The connector should use the field \"URI\" instead.");
}
if (event.getRegion() != null && !event.getRegion().isEmpty())
region = event.getRegion();
logger.debug("{}, Found relocation event, loading original event from '{}'", event.getStreamId(), event.getURI());

if (event.getURI().startsWith("s3://") || event.getURI().startsWith("http")) {
return downloadFromS3(new AmazonS3URI(event.getURI()), region);
S3Uri s3Uri = S3Uri.builder()
.uri(URI.create(event.getURI()))
.build();
return downloadFromS3(s3Uri, region);
}

throw new ErrorResponseException(event.getStreamId(), XyzError.ILLEGAL_ARGUMENT, "Unsupported URI type");
}

/**
* Downloads the file from S3.
*/
public InputStream downloadFromS3(AmazonS3URI amazonS3URI, String region) {
String downloadRegion = region != null ? region : amazonS3URI.getRegion();
return getS3Client(downloadRegion).getObject(amazonS3URI.getBucket(), amazonS3URI.getKey()).getObjectContent();
public InputStream downloadFromS3(S3Uri amazonS3URI, String region) {
GetObjectRequest getRequest = GetObjectRequest.builder()
.bucket(amazonS3URI.bucket().orElseThrow(() -> new IllegalStateException("Unrecognized bucket")))
.key(amazonS3URI.key().orElseThrow(() -> new IllegalStateException("Unrecognized key")))
.build();
return getS3Client(region).getObject(getRequest);
}

/**
* Uploads the data, which should be relocated to S3.
*/
private void uploadToS3(AmazonS3URI amazonS3URI, byte[] content) {
ObjectMetadata metaData = new ObjectMetadata();
metaData.setContentLength(content.length);
this.getS3Client().putObject(amazonS3URI.getBucket(), amazonS3URI.getKey(), new ByteArrayInputStream(content), metaData);
private void uploadToS3(S3Uri amazonS3URI, byte[] content) {
PutObjectRequest putRequest = PutObjectRequest.builder()
.bucket(amazonS3URI.bucket().orElseThrow(() -> new IllegalStateException("Unrecognized bucket")))
.key(amazonS3URI.key().orElseThrow(() -> new IllegalStateException("Unrecognized key")))
.build();
getS3Client(null).putObject(putRequest, RequestBody.fromBytes(content));
}

private String createS3Uri(String bucket, String key) {
Expand All @@ -166,4 +182,4 @@ private String createS3Uri(String region, String bucket, String key) {
private static final boolean runsAsConnectorWithRelocation() {
return System.getenv("S3_BUCKET") != null;
}
}
}
26 changes: 18 additions & 8 deletions xyz-hub-service/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -263,10 +263,6 @@
</dependency>

<!-- AWS SDK -->
<dependency>
<artifactId>aws-java-sdk-s3</artifactId>
<groupId>com.amazonaws</groupId>
</dependency>
<dependency>
<artifactId>aws-java-sdk-lambda</artifactId>
<groupId>com.amazonaws</groupId>
Expand All @@ -283,10 +279,6 @@
<artifactId>aws-java-sdk-sns</artifactId>
<groupId>com.amazonaws</groupId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sns</artifactId>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-cloudwatch</artifactId>
Expand All @@ -296,6 +288,24 @@
<artifactId>aws-java-sdk-secretsmanager</artifactId>
</dependency>

<!-- AWS SDK V2 -->
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sns</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>auth</artifactId>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>secretsmanager</artifactId>
</dependency>

<!-- For Job API only -->
<dependency>
<artifactId>aws-java-sdk-emrserverless</artifactId>
Expand Down
Loading