Skip to content

Commit 9445fe3

Browse files
committed
[FLINK-39166] Add bucket-level configuration support to Native S3 FileSystem
1 parent 2f6743b commit 9445fe3

File tree

7 files changed

+832
-50
lines changed

7 files changed

+832
-50
lines changed

flink-filesystems/flink-s3-fs-native/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ input.sinkTo(FileSink.forRowFormat(new Path("s3://my-bucket/output"),
7272
| s3.bulk-copy.enabled | true | Enable bulk copy operations |
7373
| s3.async.enabled | true | Enable async read/write with TransferManager |
7474
| s3.read.buffer.size | 262144 (256KB) | Read buffer size per stream (64KB - 4MB) |
75+
| s3.client.connection-timeout | 60s | Connection timeout for establishing connections to S3 |
76+
| s3.client.socket-timeout | 300s | Socket (read) timeout for S3 operations. Increase for large state uploads to avoid "Read timed out" errors |
7577

7678
### Server-Side Encryption (SSE)
7779

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.fs.s3native;
20+
21+
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.configuration.Configuration;
23+
24+
import org.slf4j.Logger;
25+
import org.slf4j.LoggerFactory;
26+
27+
import javax.annotation.Nullable;
28+
29+
import java.util.HashMap;
30+
import java.util.Map;
31+
32+
/**
33+
* Provides bucket-specific S3 configurations from Flink config. Format:
34+
* s3.bucket.<bucket-name>.<property> (e.g. s3.bucket.my-bucket.path-style-access: true)
35+
*/
36+
@Internal
37+
public class BucketConfigProvider {
38+
39+
private static final Logger LOG = LoggerFactory.getLogger(BucketConfigProvider.class);
40+
41+
private static final String BUCKET_CONFIG_PREFIX = "s3.bucket.";
42+
43+
private static final String[] KNOWN_PROPERTIES =
44+
new String[] {
45+
"assume-role.external-id",
46+
"assume-role.arn",
47+
"sse.kms-key-id",
48+
"path-style-access",
49+
"sse.type",
50+
"access-key",
51+
"secret-key",
52+
"endpoint",
53+
"region"
54+
};
55+
56+
private final Map<String, S3BucketConfig> bucketConfigs = new HashMap<>();
57+
58+
public BucketConfigProvider(Configuration flinkConfig) {
59+
parseBucketConfigs(flinkConfig);
60+
}
61+
62+
private void parseBucketConfigs(Configuration flinkConfig) {
63+
Map<String, Map<String, String>> bucketConfigMap = new HashMap<>();
64+
65+
for (String key : flinkConfig.keySet()) {
66+
if (key.startsWith(BUCKET_CONFIG_PREFIX)) {
67+
String suffix = key.substring(BUCKET_CONFIG_PREFIX.length());
68+
String value = flinkConfig.getString(key, null);
69+
if (value == null) {
70+
continue;
71+
}
72+
for (String prop : KNOWN_PROPERTIES) {
73+
if (suffix.endsWith("." + prop)) {
74+
String bucketName =
75+
suffix.substring(0, suffix.length() - prop.length() - 1);
76+
if (!bucketName.isEmpty()) {
77+
bucketConfigMap
78+
.computeIfAbsent(bucketName, k -> new HashMap<>())
79+
.put(prop, value);
80+
}
81+
break;
82+
}
83+
}
84+
}
85+
}
86+
87+
for (Map.Entry<String, Map<String, String>> entry : bucketConfigMap.entrySet()) {
88+
String bucketName = entry.getKey();
89+
Map<String, String> configMap = entry.getValue();
90+
91+
S3BucketConfig.Builder builder = S3BucketConfig.builder(bucketName);
92+
93+
if (configMap.containsKey("path-style-access")) {
94+
builder.pathStyleAccess(Boolean.parseBoolean(configMap.get("path-style-access")));
95+
}
96+
97+
if (configMap.containsKey("endpoint")) {
98+
builder.endpoint(configMap.get("endpoint"));
99+
}
100+
101+
if (configMap.containsKey("region")) {
102+
builder.region(configMap.get("region"));
103+
}
104+
105+
if (configMap.containsKey("access-key")) {
106+
builder.accessKey(configMap.get("access-key"));
107+
}
108+
109+
if (configMap.containsKey("secret-key")) {
110+
builder.secretKey(configMap.get("secret-key"));
111+
}
112+
113+
if (configMap.containsKey("sse.type")) {
114+
builder.sseType(configMap.get("sse.type"));
115+
}
116+
117+
if (configMap.containsKey("sse.kms-key-id")) {
118+
builder.sseKmsKeyId(configMap.get("sse.kms-key-id"));
119+
}
120+
121+
if (configMap.containsKey("assume-role.arn")) {
122+
builder.assumeRoleArn(configMap.get("assume-role.arn"));
123+
}
124+
125+
if (configMap.containsKey("assume-role.external-id")) {
126+
builder.assumeRoleExternalId(configMap.get("assume-role.external-id"));
127+
}
128+
129+
S3BucketConfig bucketConfig = builder.build();
130+
bucketConfigs.put(bucketName, bucketConfig);
131+
132+
LOG.info("Registered bucket-specific configuration for bucket: {}", bucketName);
133+
}
134+
}
135+
136+
/** Returns bucket config if defined, null otherwise. */
137+
@Nullable
138+
public S3BucketConfig getBucketConfig(String bucketName) {
139+
return bucketConfigs.get(bucketName);
140+
}
141+
142+
public boolean hasBucketConfig(String bucketName) {
143+
return bucketConfigs.containsKey(bucketName);
144+
}
145+
}

0 commit comments

Comments
 (0)