diff --git a/flink-filesystems/flink-s3-fs-native/README.md b/flink-filesystems/flink-s3-fs-native/README.md index 4e2bb103ebab5..95b8fa62571a3 100644 --- a/flink-filesystems/flink-s3-fs-native/README.md +++ b/flink-filesystems/flink-s3-fs-native/README.md @@ -72,6 +72,8 @@ input.sinkTo(FileSink.forRowFormat(new Path("s3://my-bucket/output"), | s3.bulk-copy.enabled | true | Enable bulk copy operations | | s3.async.enabled | true | Enable async read/write with TransferManager | | s3.read.buffer.size | 262144 (256KB) | Read buffer size per stream (64KB - 4MB) | +| s3.client.connection-timeout | 60s | Connection timeout for establishing connections to S3 | +| s3.client.socket-timeout | 300s | Socket (read) timeout for S3 operations. Increase for large state uploads to avoid "Read timed out" errors | ### Server-Side Encryption (SSE) diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/BucketConfigProvider.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/BucketConfigProvider.java new file mode 100644 index 0000000000000..94abfa8f67ce6 --- /dev/null +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/BucketConfigProvider.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3native; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.Configuration; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.HashMap; +import java.util.Map; + +/** + * Provides bucket-specific S3 configurations from Flink config. Format: + * s3.bucket.<bucket-name>.<property> (e.g. s3.bucket.my-bucket.path-style-access: true) + */ +@Internal +public class BucketConfigProvider { + + private static final Logger LOG = LoggerFactory.getLogger(BucketConfigProvider.class); + + private static final String BUCKET_CONFIG_PREFIX = "s3.bucket."; + + private static final String[] KNOWN_PROPERTIES = + new String[] { + "assume-role.external-id", + "assume-role.arn", + "sse.kms-key-id", + "path-style-access", + "sse.type", + "access-key", + "secret-key", + "endpoint", + "region" + }; + + private final Map bucketConfigs = new HashMap<>(); + + public BucketConfigProvider(Configuration flinkConfig) { + parseBucketConfigs(flinkConfig); + } + + private void parseBucketConfigs(Configuration flinkConfig) { + Map> bucketConfigMap = new HashMap<>(); + + for (String key : flinkConfig.keySet()) { + if (key.startsWith(BUCKET_CONFIG_PREFIX)) { + String suffix = key.substring(BUCKET_CONFIG_PREFIX.length()); + String value = flinkConfig.getString(key, null); + if (value == null) { + continue; + } + for (String prop : KNOWN_PROPERTIES) { + if (suffix.endsWith("." + prop)) { + String bucketName = + suffix.substring(0, suffix.length() - prop.length() - 1); + if (!bucketName.isEmpty()) { + bucketConfigMap + .computeIfAbsent(bucketName, k -> new HashMap<>()) + .put(prop, value); + } + break; + } + } + } + } + + for (Map.Entry> entry : bucketConfigMap.entrySet()) { + String bucketName = entry.getKey(); + Map configMap = entry.getValue(); + + S3BucketConfig.Builder builder = S3BucketConfig.builder(bucketName); + + if (configMap.containsKey("path-style-access")) { + builder.pathStyleAccess(Boolean.parseBoolean(configMap.get("path-style-access"))); + } + + if (configMap.containsKey("endpoint")) { + builder.endpoint(configMap.get("endpoint")); + } + + if (configMap.containsKey("region")) { + builder.region(configMap.get("region")); + } + + if (configMap.containsKey("access-key")) { + builder.accessKey(configMap.get("access-key")); + } + + if (configMap.containsKey("secret-key")) { + builder.secretKey(configMap.get("secret-key")); + } + + if (configMap.containsKey("sse.type")) { + builder.sseType(configMap.get("sse.type")); + } + + if (configMap.containsKey("sse.kms-key-id")) { + builder.sseKmsKeyId(configMap.get("sse.kms-key-id")); + } + + if (configMap.containsKey("assume-role.arn")) { + builder.assumeRoleArn(configMap.get("assume-role.arn")); + } + + if (configMap.containsKey("assume-role.external-id")) { + builder.assumeRoleExternalId(configMap.get("assume-role.external-id")); + } + + S3BucketConfig bucketConfig = builder.build(); + bucketConfigs.put(bucketName, bucketConfig); + + LOG.info("Registered bucket-specific configuration for bucket: {}", bucketName); + } + } + + /** Returns bucket config if defined, null otherwise. */ + @Nullable + public S3BucketConfig getBucketConfig(String bucketName) { + return bucketConfigs.get(bucketName); + } + + public boolean hasBucketConfig(String bucketName) { + return bucketConfigs.containsKey(bucketName); + } +} diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java index 81e5075ba445b..11f9c2a6410f5 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystem.java @@ -51,6 +51,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; @@ -99,6 +100,7 @@ class NativeS3FileSystem extends FileSystem private static final long CLOSE_TIMEOUT_SECONDS = 60; private final S3ClientProvider clientProvider; + private final S3ClientProvider effectiveClientProvider; private final URI uri; private final String bucketName; @@ -115,6 +117,13 @@ class NativeS3FileSystem extends FileSystem private final int readBufferSize; private final AtomicBoolean closed = new AtomicBoolean(false); + @Nullable private final BucketConfigProvider bucketConfigProvider; + + private final Duration connectionTimeout; + private final Duration socketTimeout; + + private final S3ClientProviderCache clientProviderCache; + public NativeS3FileSystem( S3ClientProvider clientProvider, URI uri, @@ -126,9 +135,78 @@ public NativeS3FileSystem( @Nullable NativeS3BulkCopyHelper bulkCopyHelper, boolean useAsyncOperations, int readBufferSize) { + this( + clientProvider, + null, + NativeS3FileSystemFactory.CONNECTION_TIMEOUT.defaultValue(), + NativeS3FileSystemFactory.SOCKET_TIMEOUT.defaultValue(), + uri, + entropyInjectionKey, + entropyLength, + localTmpDir, + s3uploadPartSize, + maxConcurrentUploadsPerStream, + bulkCopyHelper, + false, + 0, + useAsyncOperations, + readBufferSize); + } + + public NativeS3FileSystem( + S3ClientProvider clientProvider, + @Nullable BucketConfigProvider bucketConfigProvider, + Duration connectionTimeout, + Duration socketTimeout, + URI uri, + @Nullable String entropyInjectionKey, + int entropyLength, + String localTmpDir, + long s3uploadPartSize, + int maxConcurrentUploadsPerStream, + @Nullable NativeS3BulkCopyHelper bulkCopyHelper, + boolean useAsyncOperations, + int readBufferSize) { + this( + clientProvider, + bucketConfigProvider, + connectionTimeout, + socketTimeout, + uri, + entropyInjectionKey, + entropyLength, + localTmpDir, + s3uploadPartSize, + maxConcurrentUploadsPerStream, + bulkCopyHelper, + false, + 0, + useAsyncOperations, + readBufferSize); + } + + public NativeS3FileSystem( + S3ClientProvider clientProvider, + @Nullable BucketConfigProvider bucketConfigProvider, + Duration connectionTimeout, + Duration socketTimeout, + URI uri, + @Nullable String entropyInjectionKey, + int entropyLength, + String localTmpDir, + long s3uploadPartSize, + int maxConcurrentUploadsPerStream, + @Nullable NativeS3BulkCopyHelper bulkCopyHelper, + boolean bulkCopyEnabled, + int bulkCopyMaxConcurrent, + boolean useAsyncOperations, + int readBufferSize) { this.clientProvider = clientProvider; this.uri = uri; this.bucketName = uri.getHost(); + if (bucketName == null || bucketName.isEmpty()) { + throw new IllegalArgumentException("S3 URI must contain a bucket name: " + uri); + } this.entropyInjectionKey = entropyInjectionKey; this.entropyLength = entropyLength; this.localTmpDir = localTmpDir; @@ -136,14 +214,26 @@ public NativeS3FileSystem( this.maxConcurrentUploadsPerStream = maxConcurrentUploadsPerStream; this.useAsyncOperations = useAsyncOperations; this.readBufferSize = readBufferSize; + this.bucketConfigProvider = bucketConfigProvider; + this.connectionTimeout = connectionTimeout; + this.socketTimeout = socketTimeout; + this.clientProviderCache = new S3ClientProviderCache(); + this.effectiveClientProvider = getClientProviderForBucket(bucketName); this.s3AccessHelper = new NativeS3AccessHelper( - clientProvider.getS3Client(), - clientProvider.getTransferManager(), + effectiveClientProvider.getS3Client(), + effectiveClientProvider.getTransferManager(), bucketName, useAsyncOperations, - clientProvider.getEncryptionConfig()); - this.bulkCopyHelper = bulkCopyHelper; + effectiveClientProvider.getEncryptionConfig()); + this.bulkCopyHelper = + bulkCopyHelper != null + ? bulkCopyHelper + : (bulkCopyEnabled + ? new NativeS3BulkCopyHelper( + effectiveClientProvider.getTransferManager(), + bulkCopyMaxConcurrent) + : null); if (entropyInjectionKey != null && entropyLength <= 0) { throw new IllegalArgumentException( @@ -177,7 +267,7 @@ public Path getHomeDirectory() { public FileStatus getFileStatus(Path path) throws IOException { checkNotClosed(); final String key = NativeS3AccessHelper.extractKey(path); - final S3Client s3Client = clientProvider.getS3Client(); + final S3Client s3Client = effectiveClientProvider.getS3Client(); LOG.debug("Getting file status for s3://{}/{}", bucketName, key); @@ -276,7 +366,7 @@ public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long l public FSDataInputStream open(Path path, int bufferSize) throws IOException { checkNotClosed(); final String key = NativeS3AccessHelper.extractKey(path); - final S3Client s3Client = clientProvider.getS3Client(); + final S3Client s3Client = effectiveClientProvider.getS3Client(); final long fileSize = getFileStatus(path).getLen(); return new NativeS3InputStream(s3Client, bucketName, key, fileSize, bufferSize); } @@ -285,7 +375,7 @@ public FSDataInputStream open(Path path, int bufferSize) throws IOException { public FSDataInputStream open(Path path) throws IOException { checkNotClosed(); final String key = NativeS3AccessHelper.extractKey(path); - final S3Client s3Client = clientProvider.getS3Client(); + final S3Client s3Client = effectiveClientProvider.getS3Client(); final long fileSize = getFileStatus(path).getLen(); return new NativeS3InputStream(s3Client, bucketName, key, fileSize, readBufferSize); } @@ -313,7 +403,7 @@ public FileStatus[] listStatus(Path path) throws IOException { key = key + "/"; } - final S3Client s3Client = clientProvider.getS3Client(); + final S3Client s3Client = effectiveClientProvider.getS3Client(); final List results = new ArrayList<>(); String continuationToken = null; @@ -356,7 +446,7 @@ public FileStatus[] listStatus(Path path) throws IOException { public boolean delete(Path path, boolean recursive) throws IOException { checkNotClosed(); final String key = NativeS3AccessHelper.extractKey(path); - final S3Client s3Client = clientProvider.getS3Client(); + final S3Client s3Client = effectiveClientProvider.getS3Client(); try { final FileStatus status = getFileStatus(path); @@ -428,11 +518,11 @@ public FSDataOutputStream create(Path path, WriteMode overwriteMode) throws IOEx final String key = NativeS3AccessHelper.extractKey(path); return new NativeS3OutputStream( - clientProvider.getS3Client(), + effectiveClientProvider.getS3Client(), bucketName, key, localTmpDir, - clientProvider.getEncryptionConfig()); + effectiveClientProvider.getEncryptionConfig()); } /** @@ -447,7 +537,7 @@ public boolean rename(Path src, Path dst) throws IOException { checkNotClosed(); final String srcKey = NativeS3AccessHelper.extractKey(src); final String dstKey = NativeS3AccessHelper.extractKey(dst); - final S3Client s3Client = clientProvider.getS3Client(); + final S3Client s3Client = effectiveClientProvider.getS3Client(); final FileStatus srcStatus = getFileStatus(src); if (srcStatus.isDir()) { @@ -532,31 +622,18 @@ public CompletableFuture closeAsync() { "Native S3 FileSystem closed for bucket: {}", bucketName)) .thenCompose( - ignored -> { - if (clientProvider != null) { - return clientProvider - .closeAsync() - .whenComplete( - (result, error) -> { - if (error != null) { - LOG.warn( - "Error closing S3 client provider", - error); - } else { - LOG.debug( - "S3 client provider closed"); - } - }); - } - return CompletableFuture.completedFuture(null); - }) + ignored -> + FutureUtils.waitForAll( + java.util.Arrays.asList( + clientProvider.closeAsync(), + clientProviderCache.closeAsync()))) + .thenApply(v -> (Void) null) .orTimeout(CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS) .whenComplete( (result, error) -> { if (error != null) { LOG.error( - "FileSystem close timed out after {} seconds for bucket: {}", - CLOSE_TIMEOUT_SECONDS, + "Error closing FileSystem for bucket: {}", bucketName, error); } @@ -565,6 +642,51 @@ public CompletableFuture closeAsync() { return closeFuture; } + /** Returns bucket-specific provider if configured, otherwise default. */ + private S3ClientProvider getClientProviderForBucket(String targetBucket) { + if (bucketConfigProvider == null) { + return clientProvider; + } + + S3BucketConfig bucketConfig = bucketConfigProvider.getBucketConfig(targetBucket); + if (bucketConfig == null) { + return clientProvider; + } + + return clientProviderCache.getOrCreateProvider( + targetBucket, + bucketConfig, + clientProvider, + (config) -> createClientProviderForBucketConfig(config)); + } + + private S3ClientProvider createClientProviderForBucketConfig(S3BucketConfig bucketConfig) { + String accessKey = bucketConfig.getAccessKey(); + String secretKey = bucketConfig.getSecretKey(); + String region = bucketConfig.getRegion(); + String endpoint = bucketConfig.getEndpoint(); + boolean pathStyleAccess = bucketConfig.isPathStyleAccess(); + + S3EncryptionConfig encryptionConfig = + S3EncryptionConfig.fromConfig( + bucketConfig.getSseType(), bucketConfig.getSseKmsKeyId()); + + S3ClientProvider.Builder builder = + S3ClientProvider.builder() + .accessKey(accessKey) + .secretKey(secretKey) + .region(region) + .endpoint(endpoint) + .pathStyleAccess(pathStyleAccess) + .assumeRoleArn(bucketConfig.getAssumeRoleArn()) + .assumeRoleExternalId(bucketConfig.getAssumeRoleExternalId()) + .encryptionConfig(encryptionConfig); + + builder.connectionTimeout(connectionTimeout).socketTimeout(socketTimeout); + + return builder.build(); + } + /** * Verifies that the filesystem has not been closed. * diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java index f93b007822ef5..e556e9b4938da 100644 --- a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/NativeS3FileSystemFactory.java @@ -32,6 +32,7 @@ import java.io.IOException; import java.net.URI; +import java.time.Duration; public class NativeS3FileSystemFactory implements FileSystemFactory { @@ -191,6 +192,23 @@ public class NativeS3FileSystemFactory implements FileSystemFactory { + "Uses the AWS SDK's default retry strategy (exponential backoff with jitter). " + "Set to 0 to disable retries."); + public static final ConfigOption CONNECTION_TIMEOUT = + ConfigOptions.key("s3.client.connection-timeout") + .durationType() + .defaultValue(Duration.ofSeconds(60)) + .withDescription( + "Connection timeout for establishing connections to S3. " + + "Default: 60 seconds."); + + public static final ConfigOption SOCKET_TIMEOUT = + ConfigOptions.key("s3.client.socket-timeout") + .durationType() + .defaultValue(Duration.ofSeconds(300)) + .withDescription( + "Socket (read) timeout for S3 operations. Increase this for large state " + + "uploads (e.g. checkpoints/savepoints) to avoid 'Read timed out' errors. " + + "Default: 300 seconds (5 minutes)."); + public static final ConfigOption AWS_CREDENTIALS_PROVIDER = ConfigOptions.key("fs.s3.aws.credentials.provider") .stringType() @@ -206,6 +224,7 @@ public class NativeS3FileSystemFactory implements FileSystemFactory { + "static credentials (if configured) -> DefaultCredentialsProvider."); private Configuration flinkConfig; + private BucketConfigProvider bucketConfigProvider; @Override public String getScheme() { @@ -221,6 +240,7 @@ public int getPriority() { @Override public void configure(Configuration config) { this.flinkConfig = config; + this.bucketConfigProvider = new BucketConfigProvider(config); } @Override @@ -305,6 +325,8 @@ public FileSystem create(URI fsUri) throws IOException { .region(region) .endpoint(endpoint) .pathStyleAccess(pathStyleAccess) + .connectionTimeout(config.get(CONNECTION_TIMEOUT)) + .socketTimeout(config.get(SOCKET_TIMEOUT)) .assumeRoleArn(config.get(ASSUME_ROLE_ARN)) .assumeRoleExternalId(config.get(ASSUME_ROLE_EXTERNAL_ID)) .assumeRoleSessionName(config.get(ASSUME_ROLE_SESSION_NAME)) @@ -315,24 +337,33 @@ public FileSystem create(URI fsUri) throws IOException { .encryptionConfig(encryptionConfig) .build(); - NativeS3BulkCopyHelper bulkCopyHelper = null; - if (config.get(BULK_COPY_ENABLED)) { - bulkCopyHelper = - new NativeS3BulkCopyHelper( - clientProvider.getTransferManager(), - config.get(BULK_COPY_MAX_CONCURRENT)); + boolean bulkCopyEnabled = config.get(BULK_COPY_ENABLED); + int bulkCopyMaxConcurrent = config.get(BULK_COPY_MAX_CONCURRENT); + + try { + return new NativeS3FileSystem( + clientProvider, + bucketConfigProvider, + config.get(CONNECTION_TIMEOUT), + config.get(SOCKET_TIMEOUT), + fsUri, + entropyInjectionKey, + numEntropyChars, + localTmpDirectory, + s3minPartSize, + maxConcurrentUploads, + null, + bulkCopyEnabled, + bulkCopyMaxConcurrent, + useAsyncOperations, + readBufferSize); + } catch (Throwable t) { + try { + clientProvider.closeAsync().join(); + } catch (Throwable e) { + t.addSuppressed(e); + } + throw t; } - - return new NativeS3FileSystem( - clientProvider, - fsUri, - entropyInjectionKey, - numEntropyChars, - localTmpDirectory, - s3minPartSize, - maxConcurrentUploads, - bulkCopyHelper, - useAsyncOperations, - readBufferSize); } } diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3BucketConfig.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3BucketConfig.java new file mode 100644 index 0000000000000..446fe1e6c506d --- /dev/null +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3BucketConfig.java @@ -0,0 +1,243 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3native; + +import org.apache.flink.annotation.Internal; + +import javax.annotation.Nullable; + +import java.util.Objects; + +/** Bucket-level S3 config (endpoint, path-style, credentials, SSE, assume-role). */ +@Internal +class S3BucketConfig { + + private final String bucketName; + + @Nullable private final String endpoint; + + private final boolean pathStyleAccess; + + @Nullable private final String region; + + @Nullable private final String accessKey; + + @Nullable private final String secretKey; + + @Nullable private final String sseType; + + @Nullable private final String sseKmsKeyId; + + @Nullable private final String assumeRoleArn; + + @Nullable private final String assumeRoleExternalId; + + private S3BucketConfig(Builder builder) { + this.bucketName = Objects.requireNonNull(builder.bucketName, "bucketName is required"); + this.endpoint = builder.endpoint; + this.pathStyleAccess = builder.pathStyleAccess; + this.region = builder.region; + this.accessKey = builder.accessKey; + this.secretKey = builder.secretKey; + this.sseType = builder.sseType; + this.sseKmsKeyId = builder.sseKmsKeyId; + this.assumeRoleArn = builder.assumeRoleArn; + this.assumeRoleExternalId = builder.assumeRoleExternalId; + } + + public String getBucketName() { + return bucketName; + } + + @Nullable + public String getEndpoint() { + return endpoint; + } + + public boolean isPathStyleAccess() { + return pathStyleAccess; + } + + @Nullable + public String getRegion() { + return region; + } + + @Nullable + public String getAccessKey() { + return accessKey; + } + + @Nullable + public String getSecretKey() { + return secretKey; + } + + @Nullable + public String getSseType() { + return sseType; + } + + @Nullable + public String getSseKmsKeyId() { + return sseKmsKeyId; + } + + @Nullable + public String getAssumeRoleArn() { + return assumeRoleArn; + } + + @Nullable + public String getAssumeRoleExternalId() { + return assumeRoleExternalId; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof S3BucketConfig)) { + return false; + } + S3BucketConfig that = (S3BucketConfig) o; + return pathStyleAccess == that.pathStyleAccess + && bucketName.equals(that.bucketName) + && Objects.equals(endpoint, that.endpoint) + && Objects.equals(region, that.region) + && Objects.equals(accessKey, that.accessKey) + && Objects.equals(secretKey, that.secretKey) + && Objects.equals(sseType, that.sseType) + && Objects.equals(sseKmsKeyId, that.sseKmsKeyId) + && Objects.equals(assumeRoleArn, that.assumeRoleArn) + && Objects.equals(assumeRoleExternalId, that.assumeRoleExternalId); + } + + @Override + public int hashCode() { + return Objects.hash( + bucketName, + endpoint, + pathStyleAccess, + region, + accessKey, + secretKey, + sseType, + sseKmsKeyId, + assumeRoleArn, + assumeRoleExternalId); + } + + @Override + public String toString() { + return "S3BucketConfig{" + + "bucketName='" + + bucketName + + '\'' + + ", endpoint='" + + endpoint + + '\'' + + ", pathStyleAccess=" + + pathStyleAccess + + ", region='" + + region + + '\'' + + '}'; + } + + public static Builder builder(String bucketName) { + return new Builder(bucketName); + } + + public static class Builder { + private final String bucketName; + + @Nullable private String endpoint; + + private boolean pathStyleAccess = false; + + @Nullable private String region; + + @Nullable private String accessKey; + + @Nullable private String secretKey; + + @Nullable private String sseType; + + @Nullable private String sseKmsKeyId; + + @Nullable private String assumeRoleArn; + + @Nullable private String assumeRoleExternalId; + + public Builder(String bucketName) { + this.bucketName = bucketName; + } + + public Builder endpoint(@Nullable String endpoint) { + this.endpoint = endpoint; + return this; + } + + public Builder pathStyleAccess(boolean pathStyleAccess) { + this.pathStyleAccess = pathStyleAccess; + return this; + } + + public Builder region(@Nullable String region) { + this.region = region; + return this; + } + + public Builder accessKey(@Nullable String accessKey) { + this.accessKey = accessKey; + return this; + } + + public Builder secretKey(@Nullable String secretKey) { + this.secretKey = secretKey; + return this; + } + + public Builder sseType(@Nullable String sseType) { + this.sseType = sseType; + return this; + } + + public Builder sseKmsKeyId(@Nullable String sseKmsKeyId) { + this.sseKmsKeyId = sseKmsKeyId; + return this; + } + + public Builder assumeRoleArn(@Nullable String assumeRoleArn) { + this.assumeRoleArn = assumeRoleArn; + return this; + } + + public Builder assumeRoleExternalId(@Nullable String assumeRoleExternalId) { + this.assumeRoleExternalId = assumeRoleExternalId; + return this; + } + + public S3BucketConfig build() { + return new S3BucketConfig(this); + } + } +} diff --git a/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProviderCache.java b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProviderCache.java new file mode 100644 index 0000000000000..37332864e90f0 --- /dev/null +++ b/flink-filesystems/flink-s3-fs-native/src/main/java/org/apache/flink/fs/s3native/S3ClientProviderCache.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3native; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.concurrent.FutureUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + +import static java.util.concurrent.TimeUnit.SECONDS; + +/** Cache for S3 client providers per bucket config. Thread-safe. */ +@Internal +class S3ClientProviderCache { + + private static final Logger LOG = LoggerFactory.getLogger(S3ClientProviderCache.class); + + private static final long CLIENT_CACHE_CLOSE_TIMEOUT_SECONDS = 30; + + private final Map clientCache = new HashMap<>(); + private final Object cacheLock = new Object(); + private boolean closed = false; + + /** + * Returns cached or newly created provider for bucket; falls back to defaultProvider if no + * bucket config. + */ + public S3ClientProvider getOrCreateProvider( + String bucketName, + @Nullable S3BucketConfig bucketConfig, + S3ClientProvider defaultProvider, + S3ClientProviderFactory providerFactory) { + + if (bucketConfig == null) { + return defaultProvider; + } + + synchronized (cacheLock) { + if (closed) { + throw new IllegalStateException("S3ClientProviderCache is closed"); + } + + return clientCache.computeIfAbsent( + bucketName, + k -> { + LOG.debug("Creating S3ClientProvider for bucket: {}", bucketName); + return providerFactory.createProvider(bucketConfig); + }); + } + } + + /** Closes all cached providers. */ + public CompletableFuture closeAsync() { + CompletableFuture[] futures; + synchronized (cacheLock) { + closed = true; + futures = + clientCache.values().stream() + .map(S3ClientProvider::closeAsync) + .toArray(CompletableFuture[]::new); + } + + return FutureUtils.waitForAll(java.util.Arrays.asList(futures)) + .orTimeout(CLIENT_CACHE_CLOSE_TIMEOUT_SECONDS, SECONDS) + .thenRun(() -> {}) + .exceptionally( + ex -> { + LOG.error( + "S3ClientProviderCache close timed out after {} seconds", + CLIENT_CACHE_CLOSE_TIMEOUT_SECONDS, + ex); + return null; + }); + } + + @FunctionalInterface + interface S3ClientProviderFactory { + S3ClientProvider createProvider(S3BucketConfig bucketConfig); + } +} diff --git a/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/BucketConfigProviderTest.java b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/BucketConfigProviderTest.java new file mode 100644 index 0000000000000..63a5adf5f8015 --- /dev/null +++ b/flink-filesystems/flink-s3-fs-native/src/test/java/org/apache/flink/fs/s3native/BucketConfigProviderTest.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.fs.s3native; + +import org.apache.flink.configuration.Configuration; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for bucket-level S3 configuration support. */ +public class BucketConfigProviderTest { + + @Test + public void testParseBucketConfigs() { + Configuration config = new Configuration(); + + config.setString("s3.bucket.checkpoint-bucket.path-style-access", "true"); + config.setString("s3.bucket.checkpoint-bucket.access-key", "checkpointAccessKey"); + config.setString("s3.bucket.checkpoint-bucket.secret-key", "checkpointSecretKey"); + + config.setString("s3.bucket.savepoint-bucket.path-style-access", "false"); + config.setString("s3.bucket.savepoint-bucket.endpoint", "https://s3.example.com"); + + BucketConfigProvider provider = new BucketConfigProvider(config); + + assertThat(provider.hasBucketConfig("checkpoint-bucket")).isTrue(); + assertThat(provider.hasBucketConfig("savepoint-bucket")).isTrue(); + assertThat(provider.hasBucketConfig("unknown-bucket")).isFalse(); + + S3BucketConfig checkpointBucketConfig = provider.getBucketConfig("checkpoint-bucket"); + assertThat(checkpointBucketConfig).isNotNull(); + assertThat(checkpointBucketConfig.getBucketName()).isEqualTo("checkpoint-bucket"); + assertThat(checkpointBucketConfig.isPathStyleAccess()).isTrue(); + assertThat(checkpointBucketConfig.getAccessKey()).isEqualTo("checkpointAccessKey"); + assertThat(checkpointBucketConfig.getSecretKey()).isEqualTo("checkpointSecretKey"); + + S3BucketConfig savepointBucketConfig = provider.getBucketConfig("savepoint-bucket"); + assertThat(savepointBucketConfig).isNotNull(); + assertThat(savepointBucketConfig.getBucketName()).isEqualTo("savepoint-bucket"); + assertThat(savepointBucketConfig.isPathStyleAccess()).isFalse(); + assertThat(savepointBucketConfig.getEndpoint()).isEqualTo("https://s3.example.com"); + } + + @Test + public void testBucketConfigWithEncryption() { + Configuration config = new Configuration(); + + config.setString("s3.bucket.encrypted-bucket.sse.type", "sse-kms"); + config.setString( + "s3.bucket.encrypted-bucket.sse.kms-key-id", + "arn:aws:kms:us-east-1:123456789:key/12345678"); + + BucketConfigProvider provider = new BucketConfigProvider(config); + + S3BucketConfig bucketConfig = provider.getBucketConfig("encrypted-bucket"); + assertThat(bucketConfig).isNotNull(); + assertThat(bucketConfig.getSseType()).isEqualTo("sse-kms"); + assertThat(bucketConfig.getSseKmsKeyId()) + .isEqualTo("arn:aws:kms:us-east-1:123456789:key/12345678"); + } + + @Test + public void testBucketConfigWithAssumeRole() { + Configuration config = new Configuration(); + + config.setString( + "s3.bucket.cross-account-bucket.assume-role.arn", + "arn:aws:iam::123456789012:role/S3AccessRole"); + config.setString( + "s3.bucket.cross-account-bucket.assume-role.external-id", "external-id-value"); + + BucketConfigProvider provider = new BucketConfigProvider(config); + + S3BucketConfig bucketConfig = provider.getBucketConfig("cross-account-bucket"); + assertThat(bucketConfig).isNotNull(); + assertThat(bucketConfig.getAssumeRoleArn()) + .isEqualTo("arn:aws:iam::123456789012:role/S3AccessRole"); + assertThat(bucketConfig.getAssumeRoleExternalId()).isEqualTo("external-id-value"); + } + + @Test + public void testBucketNameWithDots() { + Configuration config = new Configuration(); + config.setString("s3.bucket.my.bucket.name.path-style-access", "true"); + config.setString("s3.bucket.my.bucket.name.endpoint", "https://s3.example.com"); + + BucketConfigProvider provider = new BucketConfigProvider(config); + + S3BucketConfig bucketConfig = provider.getBucketConfig("my.bucket.name"); + assertThat(bucketConfig).isNotNull(); + assertThat(bucketConfig.getBucketName()).isEqualTo("my.bucket.name"); + assertThat(bucketConfig.isPathStyleAccess()).isTrue(); + assertThat(bucketConfig.getEndpoint()).isEqualTo("https://s3.example.com"); + } + + @Test + public void testS3BucketConfigBuilder() { + S3BucketConfig config = + S3BucketConfig.builder("test-bucket") + .pathStyleAccess(true) + .endpoint("https://s3-compatible.example.com") + .region("us-west-2") + .accessKey("access-key") + .secretKey("secret-key") + .sseType("sse-s3") + .assumeRoleArn("arn:aws:iam::123456789012:role/S3AccessRole") + .build(); + + assertThat(config.getBucketName()).isEqualTo("test-bucket"); + assertThat(config.isPathStyleAccess()).isTrue(); + assertThat(config.getEndpoint()).isEqualTo("https://s3-compatible.example.com"); + assertThat(config.getRegion()).isEqualTo("us-west-2"); + assertThat(config.getAccessKey()).isEqualTo("access-key"); + assertThat(config.getSecretKey()).isEqualTo("secret-key"); + assertThat(config.getSseType()).isEqualTo("sse-s3"); + assertThat(config.getAssumeRoleArn()) + .isEqualTo("arn:aws:iam::123456789012:role/S3AccessRole"); + } +}