diff --git a/.run/CService.run.xml b/.run/CService.run.xml new file mode 100644 index 0000000000..7edf734a7a --- /dev/null +++ b/.run/CService.run.xml @@ -0,0 +1,15 @@ + + + + \ No newline at end of file diff --git a/pom.xml b/pom.xml index f627ff07eb..dd60fd5d2d 100644 --- a/pom.xml +++ b/pom.xml @@ -373,6 +373,28 @@ + + software.amazon.awssdk + s3 + ${aws-sdk2-version} + + + jackson-annotations + com.fasterxml.jackson.core + + + + + software.amazon.awssdk + auth + ${aws-sdk2-version} + + + jackson-annotations + com.fasterxml.jackson.core + + + software.amazon.awssdk rds @@ -384,21 +406,32 @@ - - - aws-java-sdk-core + software.amazon.awssdk + secretsmanager + ${aws-sdk2-version} + + + jackson-annotations + com.fasterxml.jackson.core + + + + + software.amazon.awssdk + apache-client + ${aws-sdk2-version} jackson-annotations com.fasterxml.jackson.core - com.amazonaws - ${aws-sdk-version} + + - aws-java-sdk-s3 + aws-java-sdk-core jackson-annotations diff --git a/xyz-connectors/pom.xml b/xyz-connectors/pom.xml index 2ebce34e9e..d5a02a2e85 100644 --- a/xyz-connectors/pom.xml +++ b/xyz-connectors/pom.xml @@ -62,10 +62,6 @@ aws-java-sdk-lambda com.amazonaws - - aws-java-sdk-s3 - com.amazonaws - diff --git a/xyz-connectors/src/main/java/com/here/xyz/connectors/RelocationClient.java b/xyz-connectors/src/main/java/com/here/xyz/connectors/RelocationClient.java index defd455e9b..c08517b5fc 100644 --- a/xyz-connectors/src/main/java/com/here/xyz/connectors/RelocationClient.java +++ b/xyz-connectors/src/main/java/com/here/xyz/connectors/RelocationClient.java @@ -16,23 +16,26 @@ * SPDX-License-Identifier: Apache-2.0 * License-Filename: LICENSE */ - package com.here.xyz.connectors; -import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.AmazonS3ClientBuilder; -import com.amazonaws.services.s3.AmazonS3URI; -import com.amazonaws.services.s3.model.ObjectMetadata; import com.here.xyz.events.RelocatedEvent; -import com.here.xyz.responses.XyzError; -import java.io.ByteArrayInputStream; import java.io.InputStream; +import java.net.URI; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; + +import com.here.xyz.responses.XyzError; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3ClientBuilder; +import software.amazon.awssdk.services.s3.S3Uri; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; @SuppressWarnings("WeakerAccess") public class RelocationClient { @@ -40,37 +43,39 @@ public class RelocationClient { private static final Logger logger = LogManager.getLogger(); private final static String S3_PATH = "tmp/"; - private AmazonS3 defaultS3Client; - private Map s3clients = new ConcurrentHashMap<>(); + private S3Client defaultS3Client; + private Map s3clients = new ConcurrentHashMap<>(); private final String bucket; public RelocationClient(String bucket) { this.bucket = bucket; } - private AmazonS3 getS3Client() { + private S3Client getS3Client() { return getS3Client(null); } - private AmazonS3 getS3Client(String region) { - if (region == null) { - if (defaultS3Client == null) + private S3Client getS3Client(String region) { + if (region == null || region.isEmpty()) { + if (defaultS3Client == null) { defaultS3Client = getS3ClientBuilder() - .build(); + .build(); + } return defaultS3Client; + } else { + if (!s3clients.containsKey(region)) { + S3Client client = getS3ClientBuilder() + .region(Region.of(region)) + .build(); + s3clients.put(region, client); + } + return s3clients.get(region); } - if (s3clients.get(region) == null) { - s3clients.put(region, getS3ClientBuilder() - .withRegion(region) - .build()); - } - return s3clients.get(region); } - private AmazonS3ClientBuilder getS3ClientBuilder() { - return AmazonS3ClientBuilder - .standard() - .withCredentials(new DefaultAWSCredentialsProviderChain()); + private S3ClientBuilder getS3ClientBuilder() { + return S3Client.builder() + .credentialsProvider(DefaultCredentialsProvider.create()); } /** @@ -89,13 +94,17 @@ public byte[] relocate(String streamId, byte[] bytes) { else { //Keep backward compatibility. event - .withLocation(name) - .withURI(createS3Uri(bucket, S3_PATH + name)) - .withRegion(System.getenv("AWS_REGION")); + .withLocation(name) + .withURI(createS3Uri(bucket, S3_PATH + name)) + .withRegion(System.getenv("AWS_REGION")); } logger.debug("{} - Relocating data to: {}", streamId, event.getURI()); - uploadToS3(new AmazonS3URI(event.getURI()), bytes); + S3Uri s3Uri = S3Uri.builder() + .bucket(bucket) + .uri(URI.create(event.getURI())) + .build(); + uploadToS3(s3Uri, bytes); return event.toString().getBytes(); } @@ -123,34 +132,41 @@ public InputStream processRelocatedEvent(RelocatedEvent event, String region) th if (event.getURI() == null && event.getLocation() != null) { event.setURI(createS3Uri(bucket, S3_PATH + event.getLocation())); logger.warn("{}, the RelocatedEvent returned by the connector still uses the deprecated \"location\" field." - + "The connector should use the field \"URI\" instead."); + + "The connector should use the field \"URI\" instead."); } if (event.getRegion() != null && !event.getRegion().isEmpty()) region = event.getRegion(); logger.debug("{}, Found relocation event, loading original event from '{}'", event.getStreamId(), event.getURI()); if (event.getURI().startsWith("s3://") || event.getURI().startsWith("http")) { - return downloadFromS3(new AmazonS3URI(event.getURI()), region); + S3Uri s3Uri = S3Uri.builder() + .uri(URI.create(event.getURI())) + .build(); + return downloadFromS3(s3Uri, region); } - throw new ErrorResponseException(event.getStreamId(), XyzError.ILLEGAL_ARGUMENT, "Unsupported URI type"); } /** * Downloads the file from S3. */ - public InputStream downloadFromS3(AmazonS3URI amazonS3URI, String region) { - String downloadRegion = region != null ? region : amazonS3URI.getRegion(); - return getS3Client(downloadRegion).getObject(amazonS3URI.getBucket(), amazonS3URI.getKey()).getObjectContent(); + public InputStream downloadFromS3(S3Uri amazonS3URI, String region) { + GetObjectRequest getRequest = GetObjectRequest.builder() + .bucket(amazonS3URI.bucket().orElseThrow(() -> new IllegalStateException("Unrecognized bucket"))) + .key(amazonS3URI.key().orElseThrow(() -> new IllegalStateException("Unrecognized key"))) + .build(); + return getS3Client(region).getObject(getRequest); } /** * Uploads the data, which should be relocated to S3. */ - private void uploadToS3(AmazonS3URI amazonS3URI, byte[] content) { - ObjectMetadata metaData = new ObjectMetadata(); - metaData.setContentLength(content.length); - this.getS3Client().putObject(amazonS3URI.getBucket(), amazonS3URI.getKey(), new ByteArrayInputStream(content), metaData); + private void uploadToS3(S3Uri amazonS3URI, byte[] content) { + PutObjectRequest putRequest = PutObjectRequest.builder() + .bucket(amazonS3URI.bucket().orElseThrow(() -> new IllegalStateException("Unrecognized bucket"))) + .key(amazonS3URI.key().orElseThrow(() -> new IllegalStateException("Unrecognized key"))) + .build(); + getS3Client(null).putObject(putRequest, RequestBody.fromBytes(content)); } private String createS3Uri(String bucket, String key) { @@ -166,4 +182,4 @@ private String createS3Uri(String region, String bucket, String key) { private static final boolean runsAsConnectorWithRelocation() { return System.getenv("S3_BUCKET") != null; } -} +} \ No newline at end of file diff --git a/xyz-hub-service/pom.xml b/xyz-hub-service/pom.xml index 87d9ea8b00..7097139b59 100644 --- a/xyz-hub-service/pom.xml +++ b/xyz-hub-service/pom.xml @@ -263,10 +263,6 @@ - - aws-java-sdk-s3 - com.amazonaws - aws-java-sdk-lambda com.amazonaws @@ -283,10 +279,6 @@ aws-java-sdk-sns com.amazonaws - - software.amazon.awssdk - sns - com.amazonaws aws-java-sdk-cloudwatch @@ -296,6 +288,24 @@ aws-java-sdk-secretsmanager + + + software.amazon.awssdk + sns + + + software.amazon.awssdk + s3 + + + software.amazon.awssdk + auth + + + software.amazon.awssdk + secretsmanager + + aws-java-sdk-emrserverless diff --git a/xyz-hub-service/src/main/java/com/here/xyz/httpconnector/config/AwsS3Client.java b/xyz-hub-service/src/main/java/com/here/xyz/httpconnector/config/AwsS3Client.java index 1f5cd09b7f..73f68d2bfb 100644 --- a/xyz-hub-service/src/main/java/com/here/xyz/httpconnector/config/AwsS3Client.java +++ b/xyz-hub-service/src/main/java/com/here/xyz/httpconnector/config/AwsS3Client.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2017-2023 HERE Europe B.V. + * Copyright (C) 2017-2025 HERE Europe B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,113 +16,143 @@ * SPDX-License-Identifier: Apache-2.0 * License-Filename: LICENSE */ - package com.here.xyz.httpconnector.config; -import com.amazonaws.HttpMethod; -import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.client.builder.AwsClientBuilder; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.AmazonS3ClientBuilder; -import com.amazonaws.services.s3.model.GeneratePresignedUrlRequest; -import com.amazonaws.services.s3.model.ListObjectsRequest; -import com.amazonaws.services.s3.model.ObjectListing; -import com.amazonaws.services.s3.model.S3ObjectSummary; +import com.here.xyz.jobs.util.S3ClientHelper; +import com.here.xyz.util.service.aws.S3ObjectSummary; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3ClientBuilder; +import software.amazon.awssdk.services.s3.S3Configuration; +import software.amazon.awssdk.services.s3.model.*; +import software.amazon.awssdk.services.s3.presigner.S3Presigner; + import com.here.xyz.httpconnector.CService; import com.here.xyz.util.service.aws.SecretManagerCredentialsProvider; -import java.net.URL; -import java.util.Date; -import java.util.LinkedList; -import java.util.List; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import java.net.URL; +import java.net.URI; +import java.time.Duration; +import java.util.List; +import java.util.stream.Collectors; /** * A client for reading and writing from and to S3 */ public class AwsS3Client { private static final Logger logger = LogManager.getLogger(); - protected static final int PRESIGNED_URL_EXPIRATION_SECONDS = 7 * 24 * 60 * 60; + protected static final Duration PRESIGNED_URL_EXPIRATION = Duration.ofDays(7); - protected final AmazonS3 client; + protected final S3Client client; + protected final S3Presigner presigner; public AwsS3Client() { - final AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard(); - - if (isLocal()) { - builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration( - CService.configuration.LOCALSTACK_ENDPOINT, CService.configuration.JOBS_REGION)) - .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("localstack", "localstack"))) - .withPathStyleAccessEnabled(true); - } - else { - final String region = CService.configuration != null ? CService.configuration.JOBS_REGION : "eu-west-1"; - builder.setRegion(region); - } - - if (CService.configuration != null && CService.configuration.JOB_BOT_SECRET_ARN != null) { - synchronized (AwsS3Client.class) { - builder.setCredentials(new SecretManagerCredentialsProvider(CService.configuration.JOBS_REGION, - CService.configuration.LOCALSTACK_ENDPOINT, CService.configuration.JOB_BOT_SECRET_ARN)); + S3ClientBuilder builder = S3Client.builder(); + + AwsCredentialsProvider credentialsProvider; + Region region = Region.of(CService.configuration != null ? CService.configuration.JOBS_REGION : "eu-west-1"); + + boolean local = isLocal(); + + if (local) { + builder.endpointOverride(URI.create(CService.configuration.LOCALSTACK_ENDPOINT)) + .serviceConfiguration(S3Configuration.builder().pathStyleAccessEnabled(true).build()); + credentialsProvider = StaticCredentialsProvider.create( + AwsBasicCredentials.create("localstack", "localstack")); + } else { + credentialsProvider = DefaultCredentialsProvider.create(); + + if (CService.configuration != null && CService.configuration.JOB_BOT_SECRET_ARN != null) { + synchronized (AwsS3Client.class) { + credentialsProvider = new SecretManagerCredentialsProvider( + region.toString(), + CService.configuration.LOCALSTACK_ENDPOINT, + CService.configuration.JOB_BOT_SECRET_ARN); + } } } + + builder.region(region); + builder.credentialsProvider(credentialsProvider); client = builder.build(); + + S3Presigner.Builder presignerBuilder = S3Presigner.builder() + .region(region) + .credentialsProvider(credentialsProvider); + + if (local) { + presignerBuilder + .endpointOverride(URI.create(CService.configuration.LOCALSTACK_ENDPOINT)) + .serviceConfiguration(S3Configuration.builder().pathStyleAccessEnabled(true).build()); + } + + presigner = presignerBuilder.build(); } public URL generateDownloadURL(String bucketName, String key) { - return generatePresignedUrl(bucketName, key, HttpMethod.GET); + return S3ClientHelper.generateDownloadURL(presigner, bucketName, key, PRESIGNED_URL_EXPIRATION); } public URL generateUploadURL(String bucketName, String key) { - return generatePresignedUrl(bucketName, key, HttpMethod.PUT); + return S3ClientHelper.generateUploadURL(presigner, bucketName, key, PRESIGNED_URL_EXPIRATION); } - public URL generatePresignedUrl(String bucketName, String key, HttpMethod method) { - GeneratePresignedUrlRequest generatePresignedUrlRequest = - new GeneratePresignedUrlRequest(bucketName, key) - .withMethod(method) - .withExpiration(new Date(System.currentTimeMillis() + PRESIGNED_URL_EXPIRATION_SECONDS * 1000)); + public void deleteS3Folder(String bucketName, String folderPath) { + try { + List objectsToDelete = scanFolder(bucketName, folderPath); - return client.generatePresignedUrl(generatePresignedUrlRequest); - } + if (!objectsToDelete.isEmpty()) { + List toDelete = objectsToDelete.stream() + .map(obj -> ObjectIdentifier.builder().key(obj.key()).build()) + .collect(Collectors.toList()); - public void deleteS3Folder(String bucketName, String folderPath) { - for (S3ObjectSummary file : client.listObjects(bucketName, folderPath).getObjectSummaries()){ - client.deleteObject(bucketName, file.getKey()); + DeleteObjectsRequest deleteObjectsRequest = DeleteObjectsRequest.builder() + .bucket(bucketName) + .delete(Delete.builder().objects(toDelete).build()) + .build(); + + client.deleteObjects(deleteObjectsRequest); + } + } catch (Exception e) { + logger.error("Failed to delete folder '{}' in bucket '{}': {}", folderPath, bucketName, e.getMessage(), e); } } public void copyFolder(String bucketName, String sourceFolderPath, String targetFolderPath) { - for (S3ObjectSummary summary : scanFolder(bucketName, sourceFolderPath)) { - String objectPath = summary.getKey(); - String targetObjectPath = objectPath.replace(sourceFolderPath, targetFolderPath); - client.copyObject(bucketName, objectPath, bucketName, targetObjectPath); - } - } + try { + List sourceObjects = scanFolder(bucketName, sourceFolderPath); - public List scanFolder(String bucketName, String folderPath) { - logger.info("Scanning folder for bucket {} and path {} ...", bucketName, folderPath); + for (S3ObjectSummary s3ObjectSummary : sourceObjects) { + String sourceKey = s3ObjectSummary.key(); + String targetKey = sourceKey.replace(sourceFolderPath, targetFolderPath); - ListObjectsRequest listObjects = new ListObjectsRequest() - .withPrefix(folderPath) - .withBucketName(bucketName); + CopyObjectRequest copyRequest = CopyObjectRequest.builder() + .copySource(bucketName + "/" + sourceKey) + .destinationBucket(bucketName) + .destinationKey(targetKey) + .build(); - ObjectListing objectListing = client.listObjects(listObjects); - List summaries = new LinkedList<>(objectListing.getObjectSummaries()); - while (objectListing.isTruncated()) { - objectListing = client.listNextBatchOfObjects(objectListing); - summaries.addAll(objectListing.getObjectSummaries()); + client.copyObject(copyRequest); + } + } catch (Exception e) { + logger.error("Failed to copy folder from '{}' to '{}' in bucket '{}': {}", sourceFolderPath, targetFolderPath, bucketName, e.getMessage(), e); } + } - return summaries; + public List scanFolder(String bucketName, String folderPath) { + return S3ClientHelper.scanFolder(client, bucketName, folderPath); } public boolean isLocal() { - if(CService.configuration.HUB_ENDPOINT.contains("localhost") || + if (CService.configuration.HUB_ENDPOINT.contains("localhost") || CService.configuration.HUB_ENDPOINT.contains("xyz-hub:8080")) return true; return false; } -} +} \ No newline at end of file diff --git a/xyz-hub-service/src/main/java/com/here/xyz/httpconnector/config/JobS3Client.java b/xyz-hub-service/src/main/java/com/here/xyz/httpconnector/config/JobS3Client.java index d47c233eac..b35df04dc5 100644 --- a/xyz-hub-service/src/main/java/com/here/xyz/httpconnector/config/JobS3Client.java +++ b/xyz-hub-service/src/main/java/com/here/xyz/httpconnector/config/JobS3Client.java @@ -18,12 +18,6 @@ */ package com.here.xyz.httpconnector.config; -import com.amazonaws.AmazonServiceException; -import com.amazonaws.services.s3.model.GetObjectRequest; -import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.S3Object; -import com.amazonaws.services.s3.model.S3ObjectInputStream; -import com.amazonaws.services.s3.model.S3ObjectSummary; import com.google.common.cache.Cache; import com.google.common.cache.CacheBuilder; import com.here.xyz.httpconnector.CService; @@ -33,12 +27,11 @@ import com.here.xyz.httpconnector.util.jobs.ImportObject; import com.here.xyz.httpconnector.util.jobs.Job; import com.here.xyz.httpconnector.util.jobs.validate.Validator; +import com.here.xyz.jobs.util.S3ClientHelper; +import com.here.xyz.util.service.aws.S3ObjectSummary; import io.vertx.core.Future; -import java.io.BufferedReader; -import java.io.EOFException; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.UnsupportedEncodingException; + +import java.io.*; import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.HashMap; @@ -47,22 +40,32 @@ import java.util.concurrent.TimeUnit; import java.util.zip.GZIPInputStream; import java.util.zip.ZipException; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import software.amazon.awssdk.core.ResponseInputStream; +import software.amazon.awssdk.core.exception.SdkException; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.HeadObjectRequest; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; public class JobS3Client extends AwsS3Client { - private static final Logger logger = LogManager.getLogger(); - - private static final int VALIDATE_LINE_KB_STEPS = 512 * 1024; - private static final int VALIDATE_LINE_MAX_LINE_SIZE_BYTES = 4 * 1024 * 1024; - - protected static final String IMPORT_MANUAL_UPLOAD_FOLDER = "manual"; - protected static final String IMPORT_UPLOAD_FOLDER = "imports"; public static final String EXPORT_DOWNLOAD_FOLDER = "exports"; public static final String EXPORT_PERSIST_FOLDER = "persistent"; + protected static final String IMPORT_MANUAL_UPLOAD_FOLDER = "manual"; + protected static final String IMPORT_UPLOAD_FOLDER = "imports"; + private static final Logger logger = LogManager.getLogger(); + private static final int VALIDATE_LINE_KB_STEPS = 512 * 1024; + private static final int VALIDATE_LINE_MAX_LINE_SIZE_BYTES = 4 * 1024 * 1024; + private static Cache> s3ScanningCache = CacheBuilder + .newBuilder() + .maximumSize(100) + .expireAfterWrite(10, TimeUnit.MINUTES) + .build(); - public static String getImportPath(String jobId, String part){ - return IMPORT_UPLOAD_FOLDER +"/"+ jobId+"/"+part; + public static String getImportPath(String jobId, String part) { + return IMPORT_UPLOAD_FOLDER + "/" + jobId + "/" + part; } public ImportObject generateUploadURL(Import job) throws IOException { @@ -75,43 +78,41 @@ public ImportObject generateUploadURL(String bucketName, Import job) throws IOEx int currentPart = job.getImportObjects().size(); String key = "${uploadFolder}/${jobId}/part_${currentPart}.${extension}" - .replace("${uploadFolder}",IMPORT_UPLOAD_FOLDER) - .replace("${jobId}",job.getId()) - .replace("${currentPart}",Integer.toString(currentPart)) - .replace("${extension}",extension); + .replace("${uploadFolder}", IMPORT_UPLOAD_FOLDER) + .replace("${jobId}", job.getId()) + .replace("${currentPart}", Integer.toString(currentPart)) + .replace("${extension}", extension); URL url = generateUploadURL(bucketName, key); - return new ImportObject(key,url); + return new ImportObject(key, url); } - public Map scanImportPath(Import job, Job.CSVFormat csvFormat){ + public Map scanImportPath(Import job, Job.CSVFormat csvFormat) { /** if we cant find a upload url read from IMPORT_MANUAL_UPLOAD_FOLDER */ String firstKey = (String) job.getImportObjects().keySet().toArray()[0]; String path = getS3Path(job); /** manual uploaded files are not allowed to be named as part_*.csv */ - if(!firstKey.matches("part_\\d*.csv")) - path = IMPORT_MANUAL_UPLOAD_FOLDER +"/"+ path; + if (!firstKey.matches("part_\\d*.csv")) + path = IMPORT_MANUAL_UPLOAD_FOLDER + "/" + path; return scanImportPath(path, CService.configuration.JOBS_S3_BUCKET, csvFormat); } - public Map scanImportPath(String prefix, String bucketName, Job.CSVFormat csvFormat){ + public Map scanImportPath(String prefix, String bucketName, Job.CSVFormat csvFormat) { Map importObjectList = new HashMap<>(); - for (S3ObjectSummary objectSummary : scanFolder(bucketName, prefix)) { - /** localstack does not set the bucket name */ - if(objectSummary.getBucketName() == null) - objectSummary.setBucketName(bucketName); - ObjectMetadata objectMetadata = client.getObjectMetadata(bucketName, objectSummary.getKey()); - ImportObject importObject = checkFile(objectSummary, objectMetadata, csvFormat); - importObjectList.put(importObject.getFilename(), importObject ); + for (S3ObjectSummary s3ObjectSummary : scanFolder(bucketName, prefix)) { + + HeadObjectResponse metadata = S3ClientHelper.loadMetadata(client, bucketName, s3ObjectSummary.key()); + ImportObject importObject = checkFile(s3ObjectSummary, metadata, csvFormat); + importObjectList.put(importObject.getFilename(), importObject); } return importObjectList; } - private ImportObject checkFile(S3ObjectSummary s3ObjectSummary, ObjectMetadata objectMetadata, Job.CSVFormat csvFormat){ + private ImportObject checkFile(S3ObjectSummary s3ObjectSummary, HeadObjectResponse objectMetadata, Job.CSVFormat csvFormat) { //skip validation till refactoring is done. ImportObject io = new ImportObject(s3ObjectSummary, objectMetadata); io.setStatus(ImportObject.Status.waiting); @@ -119,24 +120,24 @@ private ImportObject checkFile(S3ObjectSummary s3ObjectSummary, ObjectMetadata o return io; } - private ImportObject checkFileBak(S3ObjectSummary s3ObjectSummary, ObjectMetadata objectMetadata, Job.CSVFormat csvFormat){ + private ImportObject checkFileBak(S3ObjectSummary s3ObjectSummary, HeadObjectResponse objectMetadata, Job.CSVFormat csvFormat) { ImportObject io = new ImportObject(s3ObjectSummary, objectMetadata); try { - if(objectMetadata.getContentEncoding() != null && - objectMetadata.getContentEncoding().equalsIgnoreCase("gzip")){ - validateFirstZippedCSVLine(io.getS3Key(), s3ObjectSummary.getBucketName(), csvFormat, "", 0); - }else{ - validateFirstCSVLine(io.getS3Key(), s3ObjectSummary.getBucketName(),csvFormat, "", 0); + if (objectMetadata.contentEncoding() != null && + objectMetadata.contentEncoding().equalsIgnoreCase("gzip")) { + validateFirstZippedCSVLine(io.getS3Key(), s3ObjectSummary.bucket(), csvFormat, "", 0); + } else { + validateFirstCSVLine(io.getS3Key(), s3ObjectSummary.bucket(), csvFormat, "", 0); } io.setStatus(ImportObject.Status.waiting); io.setValid(true); } catch (Exception e) { - if(e instanceof UnsupportedEncodingException){ + if (e instanceof UnsupportedEncodingException) { logger.info("CSV Format is not valid: {}", io.getS3Key()); - }else if(e instanceof ZipException){ + } else if (e instanceof ZipException) { logger.info("Wrong content-encoding: [}", io.getS3Key()); - }else + } else logger.warn("checkFile error {} {}", io.getS3Key(), e); io.setValid(false); } @@ -144,81 +145,82 @@ private ImportObject checkFileBak(S3ObjectSummary s3ObjectSummary, ObjectMetadat return io; } - private void validateFirstCSVLine(String key_name, String bucket_name, Job.CSVFormat csvFormat, String line, int fromKB) throws AmazonServiceException, IOException { - int toKB = fromKB + VALIDATE_LINE_KB_STEPS; + private void validateFirstCSVLine(String keyName, String bucketName, Job.CSVFormat csvFormat, String line, int fromKB) throws IOException { + int toKB = fromKB + VALIDATE_LINE_KB_STEPS; GetObjectRequest gor; - S3Object o = null; - S3ObjectInputStream s3is = null; + InputStream is = null; BufferedReader reader = null; try { - gor = new GetObjectRequest(bucket_name,key_name).withRange(fromKB, toKB ); - o = client.getObject(gor); + gor = GetObjectRequest.builder() + .bucket(bucketName) + .key(keyName) + .range("bytes=" + fromKB + "-" + toKB) + .build(); - s3is = o.getObjectContent(); - reader = new BufferedReader(new InputStreamReader(s3is, StandardCharsets.UTF_8)); + ResponseInputStream objectContent = client.getObject(gor); + is = objectContent; + reader = new BufferedReader(new InputStreamReader(is, StandardCharsets.UTF_8)); int val; - while((val = reader.read()) != -1) { - char c = (char)val; + while ((val = reader.read()) != -1) { + char c = (char) val; line += c; - if(c == '\n' || c == '\r'){ + if (c == '\n' || c == '\r') { Validator.validateCSVLine(line, csvFormat); return; } } - }catch (AmazonServiceException e){ + } catch (SdkException e) { /** Did not find a lineBreak - maybe CSV with 1LOC */ - if(e.getErrorCode().equalsIgnoreCase("InvalidRange")){ - logger.info("Invalid Range found for s3Key {}", key_name); + if (e.getMessage().contains("InvalidRange")) { + logger.info("Invalid Range found for s3Key {}", keyName); Validator.validateCSVLine(line, csvFormat); return; } throw e; - }finally { - if(s3is !=null) { - s3is.abort(); - s3is.close(); + } finally { + if (is != null) { + is.close(); } - if(o != null) - o.close(); - if(reader != null) + if (reader != null) reader.close(); } /** not found a line break */ - - if(toKB <= VALIDATE_LINE_MAX_LINE_SIZE_BYTES) { + if (toKB <= VALIDATE_LINE_MAX_LINE_SIZE_BYTES) { /** not found a line break till now - search further */ fromKB = toKB + 1; - validateFirstCSVLine(key_name, bucket_name, csvFormat, line, fromKB); - } - else + validateFirstCSVLine(keyName, bucketName, csvFormat, line, fromKB); + } else throw new UnsupportedEncodingException("Not able to find EOL!"); } - private void validateFirstZippedCSVLine(String key_name, String bucket_name, Job.CSVFormat csvFormat, String line, int toKB) throws AmazonServiceException, IOException { - if(toKB == 0) + private void validateFirstZippedCSVLine(String keyName, String bucketName, Job.CSVFormat csvFormat, String line, int toKB) throws IOException { + if (toKB == 0) toKB = VALIDATE_LINE_KB_STEPS; GetObjectRequest gor; - S3Object o = null; - S3ObjectInputStream s3is = null; + InputStream is = null; BufferedReader reader = null; int val; try { - gor = new GetObjectRequest(bucket_name,key_name).withRange(0, toKB ); + gor = GetObjectRequest.builder() + .bucket(bucketName) + .key(keyName) + .range("bytes=0-" + toKB) + .build(); - o = client.getObject(gor); - s3is = o.getObjectContent(); + ResponseInputStream objectContent = client.getObject(gor); + is = objectContent; reader = new BufferedReader(new InputStreamReader( - new GZIPInputStream(s3is))); + new GZIPInputStream(is))); while ((val = reader.read()) != -1) { char c = (char) val; @@ -229,55 +231,45 @@ private void validateFirstZippedCSVLine(String key_name, String bucket_name, Job return; } } - }catch (EOFException e) { + } catch (EOFException e) { /** Ignore incomplete stream */ - }finally { - if(s3is !=null) { - s3is.abort(); - s3is.close(); + } finally { + if (is != null) { + is.close(); } - if(o != null) - o.close(); - if(reader != null) + if (reader != null) reader.close(); } - if(toKB <= VALIDATE_LINE_MAX_LINE_SIZE_BYTES) { + if (toKB <= VALIDATE_LINE_MAX_LINE_SIZE_BYTES) { /** not found a line break till now - search further */ toKB = toKB + VALIDATE_LINE_KB_STEPS; - validateFirstZippedCSVLine(key_name, bucket_name, csvFormat, line, toKB); - } - else + validateFirstZippedCSVLine(keyName, bucketName, csvFormat, line, toKB); + } else throw new UnsupportedEncodingException("Not able to find EOL!"); } - private static Cache> s3ScanningCache = CacheBuilder - .newBuilder() - .maximumSize(100) - .expireAfterWrite(10, TimeUnit.MINUTES) - .build(); - - public Map scanExportPathCached(String prefix) { + public Map scanExportPathCached(String prefix) { try { return s3ScanningCache.get(prefix, () -> scanExportPath(prefix)); - } - catch (ExecutionException e) { + } catch (ExecutionException e) { throw new RuntimeException(e.getCause()); } } - public Map scanExportPath(String prefix) { + public Map scanExportPath(String prefix) { String bucketName = CService.configuration.JOBS_S3_BUCKET; Map exportObjects = new HashMap<>(); for (S3ObjectSummary objectSummary : scanFolder(bucketName, prefix)) { //Skip empty files - if (objectSummary.getSize() == 0) + if (objectSummary.isEmpty()) continue; - ExportObject eo = new ExportObject(objectSummary.getKey(), objectSummary.getSize()); + ExportObject eo = new ExportObject(objectSummary.key(), objectSummary.size()); if (eo.getFilename().equalsIgnoreCase("manifest.json")) - continue;; + continue; + ; exportObjects.put(eo.getFilename(prefix), eo); } @@ -291,18 +283,18 @@ public String getS3Path(Job job) { //Decide if persistent or not. String subFolder = job instanceof Export export && export.readPersistExport() - ? CService.jobS3Client.EXPORT_PERSIST_FOLDER - : CService.jobS3Client.EXPORT_DOWNLOAD_FOLDER; + ? CService.jobS3Client.EXPORT_PERSIST_FOLDER + : CService.jobS3Client.EXPORT_DOWNLOAD_FOLDER; String jobId = job.getId(); return String.join("/", new String[]{ subFolder, jobId - }); + }); } - public Future cleanJobData(Job job){ + public Future cleanJobData(Job job) { String path = getS3Path(job); if (job instanceof Export export && export.getSuperId() != null) diff --git a/xyz-hub-service/src/main/java/com/here/xyz/httpconnector/util/jobs/ImportObject.java b/xyz-hub-service/src/main/java/com/here/xyz/httpconnector/util/jobs/ImportObject.java index e1b9afdee6..7322a8f769 100644 --- a/xyz-hub-service/src/main/java/com/here/xyz/httpconnector/util/jobs/ImportObject.java +++ b/xyz-hub-service/src/main/java/com/here/xyz/httpconnector/util/jobs/ImportObject.java @@ -18,12 +18,12 @@ */ package com.here.xyz.httpconnector.util.jobs; -import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.S3ObjectSummary; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonInclude; import com.here.xyz.httpconnector.config.JobS3Client; +import com.here.xyz.util.service.aws.S3ObjectSummary; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; import java.net.URL; @@ -65,12 +65,12 @@ public ImportObject(String s3Key, URL uploadUrl) { } - public ImportObject(S3ObjectSummary s3ObjectSummary, ObjectMetadata objectMetadata) { - this.s3Key = s3ObjectSummary.getKey(); - this.filesize = s3ObjectSummary.getSize(); + public ImportObject(S3ObjectSummary s3ObjectSummary, HeadObjectResponse objectMetadata) { + this.s3Key = s3ObjectSummary.key(); + this.filesize = s3ObjectSummary.size(); - if(objectMetadata.getContentEncoding() != null && - objectMetadata.getContentEncoding().equalsIgnoreCase("gzip")) + if(objectMetadata.contentEncoding() != null && + objectMetadata.contentEncoding().equalsIgnoreCase("gzip")) this.compressed = true; } diff --git a/xyz-hub-service/src/main/java/com/here/xyz/hub/Service.java b/xyz-hub-service/src/main/java/com/here/xyz/hub/Service.java index 5f9dd60a92..5668a52c24 100644 --- a/xyz-hub-service/src/main/java/com/here/xyz/hub/Service.java +++ b/xyz-hub-service/src/main/java/com/here/xyz/hub/Service.java @@ -19,11 +19,7 @@ package com.here.xyz.hub; -import com.here.xyz.hub.cache.CacheClient; -import com.here.xyz.hub.cache.InMemoryCacheClient; -import com.here.xyz.hub.cache.MultiLevelCacheClient; -import com.here.xyz.hub.cache.RedisCacheClient; -import com.here.xyz.hub.cache.S3CacheClient; +import com.here.xyz.hub.cache.*; import com.here.xyz.hub.config.ConnectorConfigClient; import com.here.xyz.hub.config.SettingsConfigClient; import com.here.xyz.hub.config.SpaceConfigClient; diff --git a/xyz-hub-service/src/main/java/com/here/xyz/hub/cache/S3CacheClient.java b/xyz-hub-service/src/main/java/com/here/xyz/hub/cache/S3CacheClient.java index 3939710963..23e32f710b 100644 --- a/xyz-hub-service/src/main/java/com/here/xyz/hub/cache/S3CacheClient.java +++ b/xyz-hub-service/src/main/java/com/here/xyz/hub/cache/S3CacheClient.java @@ -16,143 +16,178 @@ * SPDX-License-Identifier: Apache-2.0 * License-Filename: LICENSE */ - package com.here.xyz.hub.cache; -import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; -import com.amazonaws.client.builder.AwsClientBuilder; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.AmazonS3ClientBuilder; -import com.amazonaws.services.s3.model.CopyObjectRequest; -import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.S3Object; -import com.google.common.collect.ImmutableMap; -import com.google.common.io.ByteStreams; +import com.here.xyz.jobs.util.S3ClientHelper; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.S3ClientBuilder; +import software.amazon.awssdk.services.s3.model.CopyObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectRequest; +import software.amazon.awssdk.services.s3.model.GetObjectResponse; +import software.amazon.awssdk.services.s3.model.PutObjectRequest; +import software.amazon.awssdk.core.sync.ResponseTransformer; import com.here.xyz.hub.Service; import com.here.xyz.util.service.Core; import io.vertx.core.Future; -import java.io.ByteArrayInputStream; -import java.io.IOException; + +import java.net.URI; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.TimeUnit; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; public class S3CacheClient implements CacheClient { - private static final String EXPIRES_AT = "expiresAt"; - private static final String LAST_ACCESSED_AT = "lastAccessedAt"; - private static final long ACCESS_UPDATE_TIME_THRESHOLD = TimeUnit.DAYS.toMillis(1); - private static CacheClient instance; - private static final Logger logger = LogManager.getLogger(); - private volatile AmazonS3 s3client; - private String bucket; - private static final String prefix = "xyz-hub-cache/"; - - - private S3CacheClient() { - if (Service.configuration.XYZ_HUB_S3_BUCKET == null) - throw new RuntimeException("No S3 bucket defined. S3CacheClient can not be used."); - bucket = Service.configuration.XYZ_HUB_S3_BUCKET; - initS3Client(); - } - - private void initS3Client() { - AmazonS3ClientBuilder builder = AmazonS3ClientBuilder - .standard() - .withCredentials(new DefaultAWSCredentialsProviderChain()); - - if (Service.configuration.LOCALSTACK_ENDPOINT != null) { - builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration( - Service.configuration.LOCALSTACK_ENDPOINT, Service.configuration.AWS_REGION)) - .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("localstack", "localstack"))) - .withPathStyleAccessEnabled(true); + private static final String EXPIRES_AT = "expiresAt"; + private static final String LAST_ACCESSED_AT = "lastAccessedAt"; + private static final long ACCESS_UPDATE_TIME_THRESHOLD = TimeUnit.DAYS.toMillis(1); + private static final Logger logger = LogManager.getLogger(); + private static final String prefix = "xyz-hub-cache/"; + private static CacheClient instance; + private volatile S3Client s3client; + private String bucket; + + private S3CacheClient() { + if (Service.configuration.XYZ_HUB_S3_BUCKET == null) + throw new RuntimeException("No S3 bucket defined. S3CacheClient can not be used."); + bucket = Service.configuration.XYZ_HUB_S3_BUCKET; + initS3Client(); } - s3client = builder.build(); - } + public static synchronized CacheClient getInstance() { + try { + if (instance == null) + instance = new S3CacheClient(); + } catch (Exception e) { + logger.error("Error when trying to create the S3 client.", e); + instance = new NoopCacheClient(); + } + return instance; + } - public static synchronized CacheClient getInstance() { - try { - if (instance == null) - instance = new S3CacheClient(); + private void initS3Client() { + S3ClientBuilder builder = S3Client.builder() + .credentialsProvider(DefaultCredentialsProvider.create()); + + if (Service.configuration.LOCALSTACK_ENDPOINT != null) { + builder + .region(Region.EU_WEST_1) + .endpointOverride(URI.create(Service.configuration.LOCALSTACK_ENDPOINT)) + .credentialsProvider( + StaticCredentialsProvider.create( + AwsBasicCredentials.create("localstack", "localstack"))) + .forcePathStyle(true); + } + + if (Service.configuration.AWS_REGION != null && !Service.configuration.AWS_REGION.isEmpty()) { + builder.region(Region.of(Service.configuration.AWS_REGION)); + } + + s3client = builder.build(); } - catch (Exception e) { - logger.error("Error when trying to create the S3 client.", e); - instance = new NoopCacheClient(); + + @Override + public Future get(String key) { + return Core.vertx.executeBlocking(promise -> { + try { + GetObjectRequest request = GetObjectRequest.builder() + .bucket(bucket) + .key(prefix + key) + .build(); + + byte[] content = s3client.getObject(request, ResponseTransformer.toBytes()).asByteArray(); + + GetObjectResponse response = s3client.getObject(request, ResponseTransformer.toBytes()).response(); + Map metadata = response.metadata(); + String lastAccessedAt = metadata.get(LAST_ACCESSED_AT.toLowerCase()); + + if (lastAccessedAt != null) { + // Update the "lastAccessedAt" metadata field asynchronously + updateLastAccessedAt(key, metadata, Core.currentTimeMillis()); + } + + promise.complete(content); + } catch (Exception e) { + logger.error("Exception trying to read S3 object with key {}.", key, e); + promise.complete(null); + } + }); + } + + @Override + public void set(String key, byte[] value, long ttl) { + Core.vertx.executeBlocking(promise -> { + try { + final long now = Core.currentTimeMillis(); + Map metadata = new HashMap<>(); + metadata.put(EXPIRES_AT.toLowerCase(), "" + (now + TimeUnit.SECONDS.toMillis(ttl))); + metadata.put(LAST_ACCESSED_AT.toLowerCase(), "" + now); + + PutObjectRequest request = PutObjectRequest.builder() + .bucket(bucket) + .key(prefix + key) + .contentLength((long) value.length) + .metadata(metadata) + .build(); + + s3client.putObject(request, RequestBody.fromBytes(value)); + promise.complete(); + } catch (Exception e) { + logger.error("Exception trying to write S3 object with key {}.", key, e); + promise.fail(e); + } + }, false); + } + + private void updateLastAccessedAt(String key, Map existingMetadata, long lastAccessedAt) { + // Only perform the update if the last update was not done too recently (to save requests) + String oldAccessedAtStr = existingMetadata.get(LAST_ACCESSED_AT.toLowerCase()); + if (oldAccessedAtStr == null) return; + + long oldAccessedAt = Long.parseLong(oldAccessedAtStr) + ACCESS_UPDATE_TIME_THRESHOLD; + if (lastAccessedAt - ACCESS_UPDATE_TIME_THRESHOLD < oldAccessedAt) + return; + + Core.vertx.executeBlocking(promise -> { + try { + Map newMetadata = new HashMap<>(existingMetadata); + newMetadata.put(LAST_ACCESSED_AT.toLowerCase(), "" + lastAccessedAt); + + CopyObjectRequest copyRequest = CopyObjectRequest.builder() + .sourceBucket(bucket) + .sourceKey(prefix + key) + .destinationBucket(bucket) + .destinationKey(prefix + key) + .metadata(newMetadata) + .metadataDirective(software.amazon.awssdk.services.s3.model.MetadataDirective.REPLACE) + .build(); + + s3client.copyObject(copyRequest); + promise.complete(); + } catch (Exception e) { + logger.error("Exception trying to update metadata for S3 object with key {}.", key, e); + promise.fail(e); + } + }, false); + } + + @Override + public void remove(String key) { + Core.vertx.executeBlocking(promise -> { + S3ClientHelper.deleteObject(s3client, bucket, prefix + key); + promise.complete(); + }, false); + } + + @Override + public void shutdown() { + instance = null; + s3client.close(); } - return instance; - } - - @Override - public Future get(String key) { - return Core.vertx.executeBlocking(promise -> { - S3Object object = s3client.getObject(bucket, prefix + key); - try { - promise.complete(ByteStreams.toByteArray(object.getObjectContent())); - //Update the "lastAccessedAt" metadata field asynchronously - updateLastAccessedAt(key, object.getObjectMetadata(), Core.currentTimeMillis()); - } - catch (IOException e) { - logger.error("Exception trying to read S3 object with key {}.", key, e); - promise.complete(null); - } - }); - } - - @Override - public void set(String key, byte[] value, long ttl) { - Core.vertx.executeBlocking(promise -> { - final long now = Core.currentTimeMillis(); - s3client.putObject(bucket, prefix + key, new ByteArrayInputStream(value), - getMetadata(now + TimeUnit.SECONDS.toMillis(ttl), now, value.length)); - promise.complete(); - }, false); - } - - private static ObjectMetadata getMetadata(long expiresAt, long lastAccessedAt, long contentLength) { - return getMetadata("" + expiresAt, lastAccessedAt, contentLength); - } - - private static ObjectMetadata getMetadata(String expiresAt, long lastAccessedAt, long contentLength) { - ObjectMetadata metaData = new ObjectMetadata(); - metaData.setContentLength(contentLength); - metaData.setUserMetadata(ImmutableMap.of( - EXPIRES_AT, "" + expiresAt, - LAST_ACCESSED_AT, "" + lastAccessedAt - )); - return metaData; - } - - private void updateLastAccessedAt(String key, ObjectMetadata existingMetadata, long lastAccessedAt) { - //Only perform the update if the last update was not done too recently (to save requests) - long oldAccessedAt = Long.parseLong(existingMetadata.getUserMetadata().get(LAST_ACCESSED_AT)) + ACCESS_UPDATE_TIME_THRESHOLD; - if (lastAccessedAt - ACCESS_UPDATE_TIME_THRESHOLD < oldAccessedAt) - return; - Core.vertx.executeBlocking(promise -> { - s3client.copyObject(new CopyObjectRequest() - .withSourceBucketName(bucket) - .withSourceKey(key) - .withDestinationBucketName(bucket) - .withDestinationKey(key) - .withNewObjectMetadata(getMetadata(existingMetadata.getUserMetadata().get(EXPIRES_AT), lastAccessedAt, - existingMetadata.getContentLength())) - ); - promise.complete(); - }, false); - } - - @Override - public void remove(String key) { - Core.vertx.executeBlocking(promise -> { - s3client.deleteObject(bucket, prefix + key); - promise.complete(); - }, false); - } - - @Override - public void shutdown() { - instance = null; - s3client.shutdown(); - } -} +} \ No newline at end of file diff --git a/xyz-hub-test/README.md b/xyz-hub-test/README.md new file mode 100644 index 0000000000..2d9eebb975 --- /dev/null +++ b/xyz-hub-test/README.md @@ -0,0 +1,33 @@ +# XYZ Hub Tests + +## Steps to prepare local env to run tests + +1. Start all dependent containers: +```shell + docker compose --file ../docker-compose-dynamodb.yml up -d --build --force-recreate postgres redis dynamodb aws-localstack +``` +2. Build & deploy the Job Step Lambda into the localstack by running the run-config `xyz-job-steps [install]` +3. Start the XYZ Hub Service by running the run-config `HubService` +4. Start the XYZ Job Service by running the run-config `JobService` +5. Start the XYZ Connector service by running the run-config `CService` + +## Troubleshooting + +If you experience **500** or **403** errors when running tests in the master branch, it might be due to issues with your Docker volumes. Follow the steps below to reset your Docker environment: + +1. **Shut down all running Docker containers associated with your compose files:** + +```shell + docker compose --file ../docker-compose-dynamodb.yml down +``` + +2. **Delete all Docker volumes:** + +> **Warning:** This step will remove all Docker volumes on your system. Ensure that you do not need any of the data stored in these volumes before running this command. + +```shell + docker volume prune -f +``` + +After completing these steps, try setting up your env from scratch. +This process ensures any corrupted or outdated volume data is removed, giving you a clean state for your Docker environment. diff --git a/xyz-jobs/xyz-job-service/pom.xml b/xyz-jobs/xyz-job-service/pom.xml index c26bc9d1aa..1872a334fc 100644 --- a/xyz-jobs/xyz-job-service/pom.xml +++ b/xyz-jobs/xyz-job-service/pom.xml @@ -66,16 +66,6 @@ - - aws-java-sdk-s3 - com.amazonaws - - - jackson-annotations - com.fasterxml.jackson.core - - - aws-java-sdk-secretsmanager com.amazonaws diff --git a/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/util/AsyncS3Client.java b/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/util/AsyncS3Client.java index d4105ccbe1..b3b3a660fa 100644 --- a/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/util/AsyncS3Client.java +++ b/xyz-jobs/xyz-job-service/src/main/java/com/here/xyz/jobs/util/AsyncS3Client.java @@ -19,9 +19,9 @@ package com.here.xyz.jobs.util; -import com.amazonaws.services.s3.model.S3ObjectSummary; import com.here.xyz.jobs.service.Config; import com.here.xyz.util.Async; +import com.here.xyz.util.service.aws.S3ObjectSummary; import io.vertx.core.Future; import java.util.List; import java.util.Map; diff --git a/xyz-jobs/xyz-job-steps/pom.xml b/xyz-jobs/xyz-job-steps/pom.xml index 4d86913963..356ce22cc3 100644 --- a/xyz-jobs/xyz-job-steps/pom.xml +++ b/xyz-jobs/xyz-job-steps/pom.xml @@ -84,17 +84,6 @@ - - - aws-java-sdk-s3 - com.amazonaws - - - jackson-annotations - com.fasterxml.jackson.core - - - aws-java-sdk-rds diff --git a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/Step.java b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/Step.java index 38b991715d..327561306d 100644 --- a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/Step.java +++ b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/Step.java @@ -226,12 +226,12 @@ private List loadOutputs(Set s3Prefixes, boolean modelBased) { //TODO: Scan the different folders in parallel .flatMap(s3Prefix -> S3Client.getInstance().scanFolder(s3Prefix) .stream() - .filter(s3ObjectSummary -> s3ObjectSummary.getSize() > 0) + .filter(s3ObjectSummary -> s3ObjectSummary.size() > 0) .map(s3ObjectSummary -> modelBased - ? ModelBasedOutput.load(s3ObjectSummary.getKey(), outputMetadata) + ? ModelBasedOutput.load(s3ObjectSummary.key(), outputMetadata) : new DownloadUrl() - .withS3Key(s3ObjectSummary.getKey()) - .withByteSize(s3ObjectSummary.getSize()) + .withS3Key(s3ObjectSummary.key()) + .withByteSize(s3ObjectSummary.size()) .withMetadata(outputMetadata))) .collect(Collectors.toList()); } diff --git a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/execution/RunEmrJob.java b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/execution/RunEmrJob.java index 0a596be22f..ed1cbd92fd 100644 --- a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/execution/RunEmrJob.java +++ b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/execution/RunEmrJob.java @@ -22,8 +22,6 @@ import static com.here.xyz.jobs.steps.execution.LambdaBasedStep.ExecutionMode.SYNC; import static java.util.regex.Matcher.quoteReplacement; -import com.amazonaws.services.s3.model.AmazonS3Exception; -import com.amazonaws.services.s3.model.S3ObjectSummary; import com.fasterxml.jackson.annotation.JsonIgnore; import com.here.xyz.jobs.steps.StepExecution; import com.here.xyz.jobs.steps.inputs.Input; @@ -49,8 +47,11 @@ import java.util.Objects; import java.util.function.Function; import java.util.regex.Pattern; + +import com.here.xyz.util.service.aws.S3ObjectSummary; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import software.amazon.awssdk.services.s3.model.S3Exception; public class RunEmrJob extends LambdaBasedStep { public static final String EMR_JOB_NAME_PREFIX = "step:"; @@ -337,7 +338,7 @@ private String copyFileFromS3ToLocal(String s3Path) { catch (FileAlreadyExistsException e) { logger.info("[EMR-local] File: '{}' already exists locally - skip download.", s3Path); } - catch (AmazonS3Exception e) { + catch (S3Exception e) { throw new RuntimeException("[EMR-local] Can't download File: '" + s3Path + "' for local copy!", e); } catch (IOException e) { @@ -355,8 +356,8 @@ private String copyFolderFromS3ToLocal(String s3Path) { List s3ObjectSummaries = S3Client.getInstance().scanFolder(s3Path); for (S3ObjectSummary s3ObjectSummary : s3ObjectSummaries) { - if (!s3ObjectSummary.getKey().contains("modelBased")) - copyFileFromS3ToLocal(s3ObjectSummary.getKey()); + if (!s3ObjectSummary.key().contains("modelBased")) + copyFileFromS3ToLocal(s3ObjectSummary.key()); } return getLocalTmpPath(s3Path); } diff --git a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/inputs/Input.java b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/inputs/Input.java index 9605c5bb02..9277065243 100644 --- a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/inputs/Input.java +++ b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/steps/inputs/Input.java @@ -23,9 +23,6 @@ import static com.here.xyz.jobs.util.S3Client.getKeyFromS3Uri; import com.amazonaws.AmazonServiceException; -import com.amazonaws.services.s3.model.AmazonS3Exception; -import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.S3ObjectSummary; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonSubTypes; @@ -47,8 +44,12 @@ import java.util.concurrent.ForkJoinPool; import java.util.stream.Collectors; import java.util.stream.Stream; + +import com.here.xyz.util.service.aws.S3ObjectSummary; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import software.amazon.awssdk.services.s3.model.HeadObjectResponse; +import software.amazon.awssdk.services.s3.model.S3Exception; @JsonSubTypes({ @JsonSubTypes.Type(value = UploadUrl.class, name = "UploadUrl"), @@ -172,7 +173,7 @@ private static List loadInputsAndWriteMetadata(String jobId return (maxReturnSize > 0 ? inputs.unordered().limit(maxReturnSize) : inputs).toList(); } - catch (IOException | AmazonS3Exception ignore) {} + catch (IOException | S3Exception ignore) {} final List inputs = loadInputsInParallel(defaultBucket(), inputS3Prefix(jobId, setName), maxReturnSize, inputType); //Only write metadata of jobs which are submitted already @@ -191,7 +192,7 @@ public static final S3Uri loadResolvedUserInputPrefixUri(String jobId, String se static List loadAllInputSetNames(String jobId) { return S3Client.getInstance().scanFolder(inputMetaS3Prefix(jobId)).stream() - .map(s3ObjectSummary -> s3ObjectSummary.getKey().substring(0, s3ObjectSummary.getKey().lastIndexOf(".json"))) + .map(s3ObjectSummary -> s3ObjectSummary.key().substring(0, s3ObjectSummary.key().lastIndexOf(".json"))) .toList(); } @@ -199,12 +200,12 @@ private static Optional loadMetadataIfExists(String jobId, Strin try { return Optional.of(loadMetadata(jobId, setName)); } - catch (IOException | AmazonS3Exception e) { + catch (IOException | S3Exception e) { return Optional.empty(); } } - static final InputsMetadata loadMetadata(String jobId, String setName) throws IOException, AmazonS3Exception { + static final InputsMetadata loadMetadata(String jobId, String setName) throws IOException, S3Exception { InputsMetadata metadata = getFromMetadataCache(jobId, setName); if (metadata != null) return metadata; @@ -221,7 +222,7 @@ static final InputsMetadata loadMetadata(String jobId, String setName) throws IO } static final void addInputReferences(String referencedJobId, String referencingJobId, String setName) throws IOException, - AmazonS3Exception { + S3Exception { InputsMetadata referencedMetadata = loadMetadata(referencedJobId, setName); //Add the referencing job to the list of jobs referencing the metadata referencedMetadata.referencingJobs().add(referencingJobId); @@ -293,8 +294,8 @@ public static List loadInputsSample(String jobId, String se private static List loadAndTransformInputs(String bucketName, String inputS3Prefix, int maxReturnSize, Class inputType) { Stream inputsStream = S3Client.getInstance(bucketName).scanFolder(inputS3Prefix) .parallelStream() - .map(s3ObjectSummary -> createInput(defaultBucket().equals(bucketName) ? null : bucketName, s3ObjectSummary.getKey(), - s3ObjectSummary.getSize(), inputIsCompressed(s3ObjectSummary))) + .map(s3ObjectSummary -> createInput(defaultBucket().equals(bucketName) ? null : bucketName, s3ObjectSummary.key(), + s3ObjectSummary.size(), inputIsCompressed(s3ObjectSummary))) .filter(input -> input.getByteSize() > 0 && inputType.isAssignableFrom(input.getClass())); if (maxReturnSize > 0) @@ -325,7 +326,7 @@ private static void deleteInputs(String owningJobId, String referencingJob, Stri metadata = loadMetadata(owningJobId, setName); metadata.referencingJobs().remove(referencingJob); } - catch (AmazonS3Exception | IOException ignore) {} + catch (S3Exception | IOException ignore) {} //Only delete the inputs if no other job is referencing them anymore if (metadata == null || metadata.referencingJobs().isEmpty()) { @@ -352,9 +353,9 @@ private static Input createInput(String s3Bucket, String s3Key, long byteSize, b } private static boolean inputIsCompressed(S3ObjectSummary objectSummary) { - if (objectSummary.getKey().endsWith(".gz")) + if (objectSummary.key().endsWith(".gz")) return true; - if (!objectSummary.getBucketName().equals(defaultBucket())) + if (!objectSummary.bucket().equals(defaultBucket())) return false; /* NOTE: @@ -362,8 +363,8 @@ private static boolean inputIsCompressed(S3ObjectSummary objectSummary) { the metadata still has to be loaded for now. */ // - ObjectMetadata metadata = S3Client.getInstance(objectSummary.getBucketName()).loadMetadata(objectSummary.getKey()); - return metadata.getContentEncoding() != null && metadata.getContentEncoding().equalsIgnoreCase("gzip"); + HeadObjectResponse metadata = S3Client.getInstance(objectSummary.bucket()).loadMetadata(objectSummary.key()); + return metadata.contentEncoding() != null && metadata.contentEncoding().equalsIgnoreCase("gzip"); } public static void activateInputsCache(String jobId) { diff --git a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/util/S3Client.java b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/util/S3Client.java index ffa1846671..6559b99afc 100644 --- a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/util/S3Client.java +++ b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/util/S3Client.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2017-2024 HERE Europe B.V. + * Copyright (C) 2017-2025 HERE Europe B.V. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,307 +16,250 @@ * SPDX-License-Identifier: Apache-2.0 * License-Filename: LICENSE */ - package com.here.xyz.jobs.util; -import static com.amazonaws.HttpMethod.GET; -import static com.amazonaws.HttpMethod.PUT; - -import com.amazonaws.HttpMethod; -import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.client.builder.AwsClientBuilder; -import com.amazonaws.services.s3.AmazonS3; -import com.amazonaws.services.s3.AmazonS3ClientBuilder; -import com.amazonaws.services.s3.model.GeneratePresignedUrlRequest; -import com.amazonaws.services.s3.model.GetObjectRequest; -import com.amazonaws.services.s3.model.ListObjectsRequest; -import com.amazonaws.services.s3.model.ListObjectsV2Request; -import com.amazonaws.services.s3.model.ListObjectsV2Result; -import com.amazonaws.services.s3.model.ObjectListing; -import com.amazonaws.services.s3.model.ObjectMetadata; -import com.amazonaws.services.s3.model.PutObjectRequest; -import com.amazonaws.services.s3.model.S3ObjectSummary; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonValue; import com.here.xyz.jobs.steps.Config; +import com.here.xyz.util.service.aws.S3ObjectSummary; import com.here.xyz.util.service.aws.SecretManagerCredentialsProvider; -import java.io.ByteArrayInputStream; +import software.amazon.awssdk.core.sync.RequestBody; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.s3.S3ClientBuilder; +import software.amazon.awssdk.services.s3.S3Configuration; +import software.amazon.awssdk.services.s3.model.*; +import software.amazon.awssdk.services.s3.presigner.S3Presigner; + import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.net.URI; import java.net.URL; -import java.util.ArrayList; -import java.util.Date; -import java.util.LinkedList; +import java.time.Duration; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.zip.GZIPOutputStream; +import java.util.stream.Collectors; public class S3Client { - private static Map instances = new ConcurrentHashMap<>(); - private final String bucketName; - protected static final int PRESIGNED_URL_EXPIRATION_SECONDS = 7 * 24 * 60 * 60; - - //TODO: Switch to AWS SDK2 - - protected final AmazonS3 client; - - protected S3Client(String bucketName) { - this.bucketName = bucketName; - - final AmazonS3ClientBuilder builder = AmazonS3ClientBuilder.standard(); + protected static final int PRESIGNED_URL_EXPIRATION_SECONDS = 7 * 24 * 60 * 60; + private static Map instances = new ConcurrentHashMap<>(); + protected final software.amazon.awssdk.services.s3.S3Client client; + protected final S3Presigner presigner; + private final String bucketName; + + protected S3Client(String bucketName) { + this.bucketName = bucketName; + final String defaultRegion = "eu-west-1"; + S3ClientBuilder builder = software.amazon.awssdk.services.s3.S3Client.builder(); + S3Presigner.Builder presignerBuilder = S3Presigner.builder(); + + if (Config.instance != null && Config.instance.LOCALSTACK_ENDPOINT != null) { + + builder + .credentialsProvider( + software.amazon.awssdk.auth.credentials.StaticCredentialsProvider.create( + software.amazon.awssdk.auth.credentials.AwsBasicCredentials.create("localstack", "localstack") + ) + ) + .endpointOverride(Config.instance.LOCALSTACK_ENDPOINT) + .region(Region.of(defaultRegion)) + .forcePathStyle(true); + presignerBuilder + .endpointOverride(Config.instance.LOCALSTACK_ENDPOINT) + .serviceConfiguration(S3Configuration.builder().pathStyleAccessEnabled(true).build()) + .region(Region.of(defaultRegion)) + .credentialsProvider( + software.amazon.awssdk.auth.credentials.StaticCredentialsProvider.create( + software.amazon.awssdk.auth.credentials.AwsBasicCredentials.create("localstack", "localstack") + )); + } else if (Config.instance != null && Config.instance.JOBS_S3_BUCKET.equals(bucketName)) { + final String region = Config.instance != null ? Config.instance.AWS_REGION : defaultRegion; //TODO: Remove default value + builder.region(Region.of(region)); + presignerBuilder.region(Region.of(region)); + } else { + GetBucketLocationResponse bucketLocation = getInstance().client.getBucketLocation(GetBucketLocationRequest.builder().bucket(bucketName).build()); + String bucketRegion = bucketLocation.locationConstraintAsString(); + if (Config.instance.forbiddenSourceRegions().contains(bucketRegion)) + throw new IllegalArgumentException("Source bucket region " + bucketRegion + " is not allowed."); + builder.region(Region.of(bucketRegion)); + presignerBuilder.region(Region.of(bucketRegion)); + } + + if (Config.instance != null && Config.instance.JOB_BOT_SECRET_ARN != null) { + synchronized (S3Client.class) { + builder.credentialsProvider(new SecretManagerCredentialsProvider(Config.instance.AWS_REGION, + Config.instance.LOCALSTACK_ENDPOINT == null ? null : Config.instance.LOCALSTACK_ENDPOINT.toString(), + Config.instance.JOB_BOT_SECRET_ARN)); + presignerBuilder.credentialsProvider(new SecretManagerCredentialsProvider(Config.instance.AWS_REGION, + Config.instance.LOCALSTACK_ENDPOINT == null ? null : Config.instance.LOCALSTACK_ENDPOINT.toString(), + Config.instance.JOB_BOT_SECRET_ARN)); + } + } + + this.client = builder.build(); + this.presigner = presignerBuilder.build(); + } - if (Config.instance != null && Config.instance.LOCALSTACK_ENDPOINT != null) { - builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration( - Config.instance.LOCALSTACK_ENDPOINT.toString(), Config.instance.AWS_REGION)) - .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("localstack", "localstack"))) - .withPathStyleAccessEnabled(true); + public static S3Client getInstance() { + return getInstance(Config.instance.JOBS_S3_BUCKET); } - else if (Config.instance.JOBS_S3_BUCKET.equals(bucketName)) { - final String region = Config.instance != null ? Config.instance.AWS_REGION : "eu-west-1"; //TODO: Remove default value - builder.setRegion(region); + + public static S3Client getInstance(String bucketName) { + if (!instances.containsKey(bucketName)) + instances.put(bucketName, new S3Client(bucketName)); + return instances.get(bucketName); } - else { - String bucketRegion = getInstance().client.getBucketLocation(bucketName); - if (Config.instance.forbiddenSourceRegions().contains(bucketRegion)) - throw new IllegalArgumentException("Source bucket region " + bucketRegion + " is not allowed."); - builder.setRegion(bucketRegion); + + public static String getBucketFromS3Uri(String s3Uri) { + if (!s3Uri.startsWith("s3://")) + return null; + return s3Uri.substring(5, s3Uri.substring(5).indexOf("/") + 5); } - if (Config.instance != null && Config.instance.JOB_BOT_SECRET_ARN != null) { - synchronized (S3Client.class) { - builder.setCredentials(new SecretManagerCredentialsProvider(Config.instance.AWS_REGION, - Config.instance.LOCALSTACK_ENDPOINT == null ? null : Config.instance.LOCALSTACK_ENDPOINT.toString(), - Config.instance.JOB_BOT_SECRET_ARN)); - } + public static String getKeyFromS3Uri(String s3Uri) { + if (!s3Uri.startsWith("s3://")) + return null; + return s3Uri.substring(s3Uri.substring(5).indexOf("/") + 5 + 1); } - client = builder.build(); - } - - public static S3Client getInstance() { - return getInstance(Config.instance.JOBS_S3_BUCKET); - } - - public static S3Client getInstance(String bucketName) { - if (!instances.containsKey(bucketName)) - instances.put(bucketName, new S3Client(bucketName)); - return instances.get(bucketName); - } - - private URL generatePresignedUrl(String key, HttpMethod method) { - GeneratePresignedUrlRequest generatePresignedUrlRequest = - new GeneratePresignedUrlRequest(bucketName, key) - .withMethod(method) - .withExpiration(new Date(System.currentTimeMillis() + PRESIGNED_URL_EXPIRATION_SECONDS * 1000)); - - return client.generatePresignedUrl(generatePresignedUrlRequest); - } - - public URL generateDownloadURL(String key) { - return generatePresignedUrl(key, GET); - } - - public URL generateUploadURL(String key) { - return generatePresignedUrl(key, PUT); - } - - public List scanFolder(String folderPath) { - ListObjectsRequest listObjects = new ListObjectsRequest() - .withPrefix(folderPath) - .withBucketName(bucketName); - - ObjectListing objectListing = client.listObjects(listObjects); - List summaries = new LinkedList<>(objectListing.getObjectSummaries()); - while (objectListing.isTruncated()) { - objectListing = client.listNextBatchOfObjects(objectListing); - summaries.addAll(objectListing.getObjectSummaries()); + + public URL generateDownloadURL(String key) { + return S3ClientHelper.generateDownloadURL(presigner, bucketName, key, Duration.ofSeconds(PRESIGNED_URL_EXPIRATION_SECONDS)); } - return summaries; - } - - public byte[] loadObjectContent(String s3Key) throws IOException { - return loadObjectContent(s3Key, -1, -1); - } - - public byte[] loadObjectContent(String s3Key, long offset, long length) throws IOException { - return streamObjectContent(s3Key, offset, length).readAllBytes(); - } - - public InputStream streamObjectContent(String s3Key) { - return streamObjectContent(s3Key, -1, -1); - } - - public InputStream streamObjectContent(String s3Key, long offset, long length) { - GetObjectRequest getObjectRequest = new GetObjectRequest(bucketName, s3Key); - - if (offset >= 0 && length >= 0) - getObjectRequest.setRange(offset, length); - else if (offset >= 0) - getObjectRequest.setRange(offset); - return client.getObject(getObjectRequest).getObjectContent(); - } - - public void putObject(String s3Key, String contentType, String content) throws IOException { - putObject(s3Key, contentType, content.getBytes()); - } - public void putObject(String s3Key, String contentType, byte[] content) throws IOException { - putObject(s3Key, contentType, content,false); - } - - public void putObject(String s3Key, String contentType, byte[] content, boolean gzip) throws IOException { - ObjectMetadata metadata = new ObjectMetadata(); - metadata.setContentType(contentType); - - if (gzip) { - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - try (GZIPOutputStream gzipOutputStream = new GZIPOutputStream(baos)) { - gzipOutputStream.write(content); - } - - metadata.setContentEncoding("gzip"); - metadata.setContentLength(baos.size()); - - client.putObject(new PutObjectRequest(bucketName, s3Key, - new ByteArrayInputStream(baos.toByteArray()), metadata)); + public URL generateUploadURL(String key) { + return S3ClientHelper.generateUploadURL(presigner, bucketName, key, Duration.ofSeconds(PRESIGNED_URL_EXPIRATION_SECONDS)); } - else { - metadata.setContentLength(content.length); - client.putObject(new PutObjectRequest(bucketName, s3Key, new ByteArrayInputStream(content), metadata)); + + public List scanFolder(String folderPath) { + return S3ClientHelper.scanFolder(client, bucketName, folderPath); } - } - - public ObjectMetadata loadMetadata(String key) { - return client.getObjectMetadata(bucketName, key); - } - - public void deleteFolder(String folderPath) { - //TODO: Run partially in parallel in multiple threads - for (S3ObjectSummary file : scanFolder(folderPath)) - //TODO: Delete multiple objects (batches of 1000) with one request instead - client.deleteObject(bucketName, file.getKey()); - } - - public static String getBucketFromS3Uri(String s3Uri) { - if (!s3Uri.startsWith("s3://")) - return null; - return s3Uri.substring(5, s3Uri.substring(5).indexOf("/") + 5); - } - - public static String getKeyFromS3Uri(String s3Uri) { - if (!s3Uri.startsWith("s3://")) - return null; - return s3Uri.substring(s3Uri.substring(5).indexOf("/") + 5 + 1); - } - - /** - * Checks if the provided S3 key is a folder. - * A key is considered a folder if it has other objects under it - * - * @return True if the key is a folder, otherwise false. - */ - public boolean isFolder(String s3Key) { - // enforce prefix formatting for "folders" - if (!s3Key.endsWith("/")) { - s3Key += "/"; + + public byte[] loadObjectContent(String s3Key) throws IOException { + return loadObjectContent(s3Key, -1, -1); } - ListObjectsV2Request request = new ListObjectsV2Request() - .withBucketName(bucketName) - .withPrefix(s3Key) - .withMaxKeys(2); // fetch up to 2 to differentiate a single self object and siblings + public byte[] loadObjectContent(String s3Key, long offset, long length) throws IOException { + return streamObjectContent(s3Key, offset, length).readAllBytes(); + } - ListObjectsV2Result result = client.listObjectsV2(request); + public InputStream streamObjectContent(String s3Key) { + return streamObjectContent(s3Key, -1, -1); + } - // more than one object means it's a folder - if (result.getKeyCount() > 1) { - return true; + public InputStream streamObjectContent(String s3Key, long offset, long length) { + return S3ClientHelper.streamObjectContent(client, bucketName, s3Key, offset, length); } - // exactly one object - check if it matches the key itself - if (result.getKeyCount() == 1) { - String onlyKey = result.getObjectSummaries().get(0).getKey(); - return onlyKey.equals(s3Key); + public void putObject(String s3Key, String contentType, String content) throws IOException { + putObject(s3Key, contentType, content.getBytes()); } - return false; - } - - /** - * Lists all objects starting with the specified prefix (recursively). - * Useful for traversing folders in S3. - * - * @param prefix The prefix or "folder path" to list objects for. - * @return A list of object keys under the specified prefix. - */ - public List listObjects(String prefix) { - List objectKeys = new ArrayList<>(); - String continuationToken = null; - - do { - ListObjectsV2Request request = new ListObjectsV2Request() - .withBucketName(bucketName) - .withPrefix(prefix) - .withContinuationToken(continuationToken); - - ListObjectsV2Result result = client.listObjectsV2(request); - - for (S3ObjectSummary s3Object : result.getObjectSummaries()) { - objectKeys.add(s3Object.getKey()); - } - - continuationToken = result.getNextContinuationToken(); - } while (continuationToken != null); - - return objectKeys; - } - - public static class S3Uri { - private String bucket; - private String key; - private URI uri; - - public S3Uri(URI uri) { - this.uri = uri; + public void putObject(String s3Key, String contentType, byte[] content) throws IOException { + putObject(s3Key, contentType, content, false); } - @JsonCreator - public S3Uri(String uri) { - this(URI.create(uri)); + public void putObject(String s3Key, String contentType, byte[] content, boolean gzip) throws IOException { + byte[] finalContent = content; + if (gzip) { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(content.length); + try (java.util.zip.GZIPOutputStream gzipOutputStream = new java.util.zip.GZIPOutputStream(byteArrayOutputStream)) { + gzipOutputStream.write(content); + } + finalContent = byteArrayOutputStream.toByteArray(); + } + + PutObjectRequest.Builder requestBuilder = PutObjectRequest.builder() + .bucket(bucketName) + .key(s3Key) + .contentLength((long) finalContent.length) + .contentType(contentType); + + if (gzip) { + requestBuilder.contentEncoding("gzip"); + } + + client.putObject(requestBuilder.build(), RequestBody.fromBytes(finalContent)); } - public S3Uri(String bucket, String key) { - assert bucket != null; - assert key != null; - this.bucket = bucket; - this.key = key; + public HeadObjectResponse loadMetadata(String key) { + return S3ClientHelper.loadMetadata(client, bucketName, key); } - public String bucket() { - if (bucket == null) - bucket = uri.getHost(); - return bucket; + public void deleteFolder(String folderPath) { + listObjects(folderPath) + .stream() + .parallel() + .forEach((key) -> S3ClientHelper.deleteObject(client, bucketName, key)); } - public String key() { - if (key == null) - key = uri.getPath().startsWith("/") ? uri.getPath().substring(1) : uri.getPath(); - return key; + /** + * Checks if the provided S3 key is a folder. + * A key is considered a folder if it has other objects under it + * + * @return True if the key is a folder, otherwise false. + */ + public boolean isFolder(String s3Key) { + return S3ClientHelper.checkIsFolder(client, bucketName, s3Key); } - public String uri() { - if (uri == null) - uri = URI.create("s3://" + bucket + "/" + key); - return uri.toString(); + /** + * Lists all objects starting with the specified prefix (recursively). + * Useful for traversing folders in S3. + * + * @param prefix The prefix or "folder path" to list objects for. + * @return A list of object keys under the specified prefix. + */ + public List listObjects(String prefix) { + List objects = scanFolder(prefix); + return objects.stream() + .map(S3ObjectSummary::key) + .collect(Collectors.toList()); } - @JsonValue - @Override - public String toString() { - return uri().toString(); + public static class S3Uri { + private String bucket; + private String key; + private URI uri; + + public S3Uri(URI uri) { + this.uri = uri; + } + + @JsonCreator + public S3Uri(String uri) { + this(URI.create(uri)); + } + + public S3Uri(String bucket, String key) { + assert bucket != null; + assert key != null; + this.bucket = bucket; + this.key = key; + } + + public String bucket() { + if (bucket == null) + bucket = uri.getHost(); + return bucket; + } + + public String key() { + if (key == null) + key = uri.getPath().startsWith("/") ? uri.getPath().substring(1) : uri.getPath(); + return key; + } + + public String uri() { + if (uri == null) + uri = URI.create("s3://" + bucket + "/" + key); + return uri.toString(); + } + + @JsonValue + @Override + public String toString() { + return uri().toString(); + } } - } } diff --git a/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/util/S3ClientHelper.java b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/util/S3ClientHelper.java new file mode 100644 index 0000000000..482d0e343a --- /dev/null +++ b/xyz-jobs/xyz-job-steps/src/main/java/com/here/xyz/jobs/util/S3ClientHelper.java @@ -0,0 +1,124 @@ +/* + * Copyright (C) 2017-2025 HERE Europe B.V. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + * License-Filename: LICENSE + */ +package com.here.xyz.jobs.util; + +import com.here.xyz.util.service.aws.S3ObjectSummary; +import software.amazon.awssdk.services.s3.S3Client; +import software.amazon.awssdk.services.s3.model.*; +import software.amazon.awssdk.services.s3.presigner.S3Presigner; +import software.amazon.awssdk.services.s3.presigner.model.GetObjectPresignRequest; +import software.amazon.awssdk.services.s3.presigner.model.PutObjectPresignRequest; + +import java.io.InputStream; +import java.net.URL; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; + +public class S3ClientHelper { + + public static URL generateDownloadURL(S3Presigner presigner, String bucketName, String key, Duration duration) { + GetObjectRequest getObjectRequest = GetObjectRequest.builder() + .bucket(bucketName) + .key(key) + .build(); + return presigner.presignGetObject(GetObjectPresignRequest.builder() + .signatureDuration(duration) + .getObjectRequest(getObjectRequest) + .build()).url(); + } + + public static URL generateUploadURL(S3Presigner presigner, String bucketName, String key, Duration duration) { + PutObjectRequest putObjectRequest = PutObjectRequest.builder() + .bucket(bucketName) + .key(key) + .build(); + return presigner.presignPutObject(PutObjectPresignRequest.builder() + .signatureDuration(duration) + .putObjectRequest(putObjectRequest) + .build()).url(); + } + + public static List scanFolder(S3Client client, String bucketName, String folderPath) { + + List summaries = new ArrayList<>(); + ListObjectsV2Request listObjectsV2Request = ListObjectsV2Request.builder() + .bucket(bucketName) + .prefix(folderPath) + .build(); + + ListObjectsV2Response listResponse; + + do { + listResponse = client.listObjectsV2(listObjectsV2Request); + summaries.addAll(listResponse.contents()); + + listObjectsV2Request = listObjectsV2Request.toBuilder() + .continuationToken(listResponse.nextContinuationToken()) + .build(); + + } while (listResponse.isTruncated()); + + return summaries.stream().map((it) -> S3ObjectSummary.fromS3Object(it, bucketName)).collect(Collectors.toList()); + } + + public static void deleteObject(S3Client client, String bucketName, String key) { + DeleteObjectRequest deleteObjectRequest = DeleteObjectRequest.builder() + .bucket(bucketName) + .key(key) + .build(); + + client.deleteObject(deleteObjectRequest); + } + + public static InputStream streamObjectContent(S3Client client, String bucketName, String s3Key, long offset, long length) { + GetObjectRequest.Builder builder = GetObjectRequest.builder() + .bucket(bucketName) + .key(s3Key); + + if (offset > 0 && length > 0) { + builder.range("bytes=" + offset + "-" + (offset + length - 1)); + } + + return client.getObject(builder.build()); + } + + public static boolean checkIsFolder(S3Client client, String bucketName, String s3Key) { + String normalizedKey = s3Key.endsWith("/") ? s3Key : s3Key + "/"; + + ListObjectsV2Request listRequest = ListObjectsV2Request.builder() + .bucket(bucketName) + .prefix(normalizedKey) + .maxKeys(1) + .build(); + + ListObjectsV2Response response = client.listObjectsV2(listRequest); + return !response.contents().isEmpty(); + } + + public static HeadObjectResponse loadMetadata(S3Client client, String bucketName, String key) { + HeadObjectRequest headObjectRequest = HeadObjectRequest.builder() + .bucket(bucketName) + .key(key) + .build(); + + return client.headObject(headObjectRequest); + } +} diff --git a/xyz-util/pom.xml b/xyz-util/pom.xml index 4914dcb6ee..ab704e667d 100644 --- a/xyz-util/pom.xml +++ b/xyz-util/pom.xml @@ -102,6 +102,24 @@ log4j-jcl + + + software.amazon.awssdk + secretsmanager + + + software.amazon.awssdk + lambda + + + software.amazon.awssdk + apache-client + + + software.amazon.awssdk + s3 + + com.amazonaws @@ -119,10 +137,6 @@ aws-lambda-java-core com.amazonaws - - software.amazon.awssdk - lambda - diff --git a/xyz-util/src/main/java/com/here/xyz/util/service/aws/AwsSecretManagerClient.java b/xyz-util/src/main/java/com/here/xyz/util/service/aws/AwsSecretManagerClient.java index 84de391e27..899a9c6fee 100644 --- a/xyz-util/src/main/java/com/here/xyz/util/service/aws/AwsSecretManagerClient.java +++ b/xyz-util/src/main/java/com/here/xyz/util/service/aws/AwsSecretManagerClient.java @@ -1,19 +1,23 @@ package com.here.xyz.util.service.aws; -import com.amazonaws.auth.AWSCredentials; -import com.amazonaws.auth.AWSStaticCredentialsProvider; -import com.amazonaws.auth.BasicAWSCredentials; -import com.amazonaws.client.builder.AwsClientBuilder; -import com.amazonaws.services.secretsmanager.AWSSecretsManager; -import com.amazonaws.services.secretsmanager.AWSSecretsManagerClientBuilder; -import com.amazonaws.services.secretsmanager.model.GetSecretValueRequest; -import com.amazonaws.services.secretsmanager.model.GetSecretValueResult; import io.vertx.core.json.JsonObject; +import software.amazon.awssdk.auth.credentials.AwsBasicCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentials; +import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider; +import software.amazon.awssdk.http.SdkHttpClient; +import software.amazon.awssdk.http.apache.ApacheHttpClient; +import software.amazon.awssdk.regions.Region; +import software.amazon.awssdk.services.secretsmanager.SecretsManagerClient; +import software.amazon.awssdk.services.secretsmanager.SecretsManagerClientBuilder; +import software.amazon.awssdk.services.secretsmanager.model.GetSecretValueRequest; +import software.amazon.awssdk.services.secretsmanager.model.GetSecretValueResponse; + +import java.net.URI; public class AwsSecretManagerClient { private final String region; private final String endpointOverride; - private final AWSSecretsManager client; + private final SecretsManagerClient client; public AwsSecretManagerClient(String region) { this(region, null); @@ -23,18 +27,27 @@ public AwsSecretManagerClient(String region, String endpointOverride) { this.region = region; this.endpointOverride = endpointOverride; - AWSSecretsManagerClientBuilder builder = AWSSecretsManagerClientBuilder.standard(); + SdkHttpClient httpClient = ApacheHttpClient.builder().build(); + + SecretsManagerClientBuilder builder = SecretsManagerClient.builder() + .region(Region.of(region)) + .httpClient(httpClient); - if (endpointOverride != null) - builder.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(endpointOverride, region)) - .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("localstack", "localstack"))); + if (endpointOverride != null) { + builder.endpointOverride(URI.create(endpointOverride)) + .credentialsProvider(StaticCredentialsProvider.create( + AwsBasicCredentials.create("localstack", "localstack"))); + } this.client = builder.build(); } private String getSecret(String secretArn) { - GetSecretValueResult result = client.getSecretValue(new GetSecretValueRequest().withSecretId(secretArn)); - return result.getSecretString(); + GetSecretValueRequest request = GetSecretValueRequest.builder() + .secretId(secretArn) + .build(); + GetSecretValueResponse result = client.getSecretValue(request); + return result.secretString(); } /** @@ -47,8 +60,8 @@ private String getSecret(String secretArn) { * @return the AWSCredenitials created from the secret * */ - public AWSCredentials getCredentialsFromSecret(String secretArn) { + public AwsCredentials getCredentialsFromSecret(String secretArn) { JsonObject secretJson = new JsonObject(getSecret(secretArn)); - return new BasicAWSCredentials(secretJson.getString("accessKey"), secretJson.getString("secretKey")); + return AwsBasicCredentials.create(secretJson.getString("accessKey"), secretJson.getString("secretKey")); } } diff --git a/xyz-util/src/main/java/com/here/xyz/util/service/aws/S3ObjectSummary.java b/xyz-util/src/main/java/com/here/xyz/util/service/aws/S3ObjectSummary.java new file mode 100644 index 0000000000..8ca828a1a4 --- /dev/null +++ b/xyz-util/src/main/java/com/here/xyz/util/service/aws/S3ObjectSummary.java @@ -0,0 +1,12 @@ +package com.here.xyz.util.service.aws; + +import software.amazon.awssdk.services.s3.model.S3Object; + +public record S3ObjectSummary(String key, String bucket, long size) { + public boolean isEmpty() { + return size == 0; + } + public static S3ObjectSummary fromS3Object(S3Object s3Object, String bucketName) { + return new S3ObjectSummary(s3Object.key(), bucketName, s3Object.size()); + } +} diff --git a/xyz-util/src/main/java/com/here/xyz/util/service/aws/SecretManagerCredentialsProvider.java b/xyz-util/src/main/java/com/here/xyz/util/service/aws/SecretManagerCredentialsProvider.java index 9bb9939512..35b2edab5c 100644 --- a/xyz-util/src/main/java/com/here/xyz/util/service/aws/SecretManagerCredentialsProvider.java +++ b/xyz-util/src/main/java/com/here/xyz/util/service/aws/SecretManagerCredentialsProvider.java @@ -1,19 +1,19 @@ package com.here.xyz.util.service.aws; -import com.amazonaws.auth.AWSCredentials; -import com.amazonaws.auth.AWSCredentialsProvider; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import software.amazon.awssdk.auth.credentials.AwsCredentials; +import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider; -public class SecretManagerCredentialsProvider implements AWSCredentialsProvider { +public class SecretManagerCredentialsProvider implements AwsCredentialsProvider { private static final Logger logger = LogManager.getLogger(); private static final int DEFAULT_REFRESH_INTERVAL_SECONDS = 3600; - private final AtomicReference credentialsRef; + private final AtomicReference credentialsRef; private final ScheduledExecutorService scheduler; private String secretArn; @@ -25,7 +25,6 @@ public class SecretManagerCredentialsProvider implements AWSCredentialsProvider public SecretManagerCredentialsProvider(String region, String endpointOverride, String secretArn) { this(region, endpointOverride, secretArn, DEFAULT_REFRESH_INTERVAL_SECONDS); - } public SecretManagerCredentialsProvider(String region, String endpointOverride, String secretArn, long refreshInterval) { @@ -44,9 +43,8 @@ private void scheduleCredentialsRefresh(long refreshInterval) { } @Override - public AWSCredentials getCredentials() { - - AWSCredentials currentCredentials = credentialsRef.get(); + public AwsCredentials resolveCredentials() { + AwsCredentials currentCredentials = credentialsRef.get(); if(currentCredentials == null) { refresh(); @@ -54,13 +52,11 @@ public AWSCredentials getCredentials() { } return currentCredentials; - } - @Override public void refresh() { try { - AWSCredentials newCredentials = jobSecretClient.getCredentialsFromSecret(secretArn); + AwsCredentials newCredentials = jobSecretClient.getCredentialsFromSecret(secretArn); credentialsRef.set(newCredentials); } catch (Exception e) { logger.error("Failed to refresh credentials from secret manager! ", e); diff --git a/xyz-util/src/main/java/com/here/xyz/util/service/aws/dynamo/DynamoClient.java b/xyz-util/src/main/java/com/here/xyz/util/service/aws/dynamo/DynamoClient.java index c4cda6d694..d79bbe1bf1 100644 --- a/xyz-util/src/main/java/com/here/xyz/util/service/aws/dynamo/DynamoClient.java +++ b/xyz-util/src/main/java/com/here/xyz/util/service/aws/dynamo/DynamoClient.java @@ -40,8 +40,8 @@ import com.amazonaws.services.dynamodbv2.model.ResourceInUseException; import com.amazonaws.services.dynamodbv2.model.TimeToLiveSpecification; import com.amazonaws.services.dynamodbv2.model.UpdateTimeToLiveRequest; -import com.amazonaws.services.s3.model.Region; import com.amazonaws.util.CollectionUtils; +import software.amazon.awssdk.regions.Region; import com.here.xyz.util.ARN; import com.here.xyz.util.service.Core; import io.vertx.core.Future; @@ -135,7 +135,7 @@ public void createTable(String tableName, String attributes, String keys, List r.toAWSRegion().getName().equals(arn.getRegion())); + return Region.regions().stream().noneMatch(r -> r.id().equals(arn.getRegion())); } public Future>> executeStatement(ExecuteStatementRequest request) {