Skip to content

Commit 7d75581

Browse files
committed
Add StorageCredential to ResolvingFileIO
1 parent 4b74d6e commit 7d75581

File tree

11 files changed

+323
-39
lines changed

aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java

+3-2
Original file line number | Diff line number | Diff line change
@@ -567,7 +567,8 @@ private boolean recoverObject(ObjectVersion version, String bucket) {
567567
@Override
568568
public void setCredentials(List<StorageCredential> credentials) {
569569
Preconditions.checkArgument(credentials != null, "Invalid storage credentials: null");
570-
this.storageCredentials = credentials;
570+
// copy credentials into a modifiable collection for Kryo serde
571+
this.storageCredentials = Lists.newArrayList(credentials);
571572
}
572573

573574
@Override
@@ -584,6 +585,6 @@ private Map<String, String> storageCredentialConfig() {
584585
Preconditions.checkState(
585586
s3Credentials.size() <= 1, "Invalid S3 Credentials: only one S3 credential should exist");
586587

587-
return s3Credentials.isEmpty() ? ImmutableMap.of() : s3Credentials.get(0).config();
588+
return s3Credentials.isEmpty() ? Map.of() : s3Credentials.get(0).config();
588589
}
589590
}

aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java

+72
Original file line number | Diff line number | Diff line change
@@ -459,6 +459,29 @@ public void testS3FileIOWithEmptyPropsKryoSerialization() throws IOException {
459459
assertThat(roundTripSerializedFileIO.properties()).isEqualTo(testS3FileIO.properties());
460460
}
461461

462+
@Test
463+
public void fileIOWithStorageCredentialsKryoSerialization() throws IOException {
464+
S3FileIO fileIO = new S3FileIO();
465+
fileIO.setCredentials(
466+
ImmutableList.of(StorageCredential.create("prefix", Map.of("key1", "val1"))));
467+
fileIO.initialize(Map.of());
468+
469+
assertThat(TestHelpers.KryoHelpers.roundTripSerialize(fileIO).credentials())
470+
.isEqualTo(fileIO.credentials());
471+
}
472+
473+
@Test
474+
public void fileIOWithStorageCredentialsJavaSerialization()
475+
throws IOException, ClassNotFoundException {
476+
S3FileIO fileIO = new S3FileIO();
477+
fileIO.setCredentials(
478+
ImmutableList.of(StorageCredential.create("prefix", Map.of("key1", "val1"))));
479+
fileIO.initialize(Map.of());
480+
481+
assertThat(TestHelpers.roundTripSerialize(fileIO).credentials())
482+
.isEqualTo(fileIO.credentials());
483+
}
484+
462485
@Test
463486
public void testS3FileIOJavaSerialization() throws IOException, ClassNotFoundException {
464487
FileIO testS3FileIO = new S3FileIO();
@@ -543,6 +566,55 @@ public void testInputFileWithManifest() throws IOException {
543566
verify(s3mock, never()).headObject(any(HeadObjectRequest.class));
544567
}
545568

569+
@Test
570+
public void resolvingFileIOLoadWithStorageCredentials()
571+
throws IOException, ClassNotFoundException {
572+
StorageCredential credential = StorageCredential.create("prefix", Map.of("key1", "val1"));
573+
List<StorageCredential> storageCredentials = ImmutableList.of(credential);
574+
ResolvingFileIO resolvingFileIO = new ResolvingFileIO();
575+
resolvingFileIO.setCredentials(storageCredentials);
576+
resolvingFileIO.initialize(ImmutableMap.of());
577+
578+
FileIO result =
579+
DynMethods.builder("io")
580+
.hiddenImpl(ResolvingFileIO.class, String.class)
581+
.build(resolvingFileIO)
582+
.invoke("s3://foo/bar");
583+
assertThat(result)
584+
.isInstanceOf(S3FileIO.class)
585+
.asInstanceOf(InstanceOfAssertFactories.type(S3FileIO.class))
586+
.extracting(S3FileIO::credentials)
587+
.isEqualTo(storageCredentials);
588+
589+
// make sure credentials are still present after kryo serde
590+
ResolvingFileIO io = TestHelpers.KryoHelpers.roundTripSerialize(resolvingFileIO);
591+
assertThat(io.credentials()).isEqualTo(storageCredentials);
592+
result =
593+
DynMethods.builder("io")
594+
.hiddenImpl(ResolvingFileIO.class, String.class)
595+
.build(io)
596+
.invoke("s3://foo/bar");
597+
assertThat(result)
598+
.isInstanceOf(S3FileIO.class)
599+
.asInstanceOf(InstanceOfAssertFactories.type(S3FileIO.class))
600+
.extracting(S3FileIO::credentials)
601+
.isEqualTo(storageCredentials);
602+
603+
// make sure credentials are still present after java serde
604+
io = TestHelpers.roundTripSerialize(resolvingFileIO);
605+
assertThat(io.credentials()).isEqualTo(storageCredentials);
606+
result =
607+
DynMethods.builder("io")
608+
.hiddenImpl(ResolvingFileIO.class, String.class)
609+
.build(io)
610+
.invoke("s3://foo/bar");
611+
assertThat(result)
612+
.isInstanceOf(S3FileIO.class)
613+
.asInstanceOf(InstanceOfAssertFactories.type(S3FileIO.class))
614+
.extracting(S3FileIO::credentials)
615+
.isEqualTo(storageCredentials);
616+
}
617+
546618
@Test
547619
public void noStorageCredentialConfigured() {
548620
S3FileIO fileIO = new S3FileIO();

core/src/main/java/org/apache/iceberg/io/ResolvingFileIO.java

+24-3
Original file line number | Diff line number | Diff line change
@@ -34,6 +34,7 @@
3434
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
3535
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
3636
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
37+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
3738
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
3839
import org.apache.iceberg.relocated.com.google.common.collect.Iterators;
3940
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
@@ -48,7 +49,8 @@
4849
* Delegate FileIO implementations must implement the {@link DelegateFileIO} mixin interface,
4950
* otherwise initialization will fail.
5051
*/
51-
public class ResolvingFileIO implements HadoopConfigurable, DelegateFileIO {
52+
public class ResolvingFileIO
53+
implements HadoopConfigurable, DelegateFileIO, SupportsStorageCredentials {
5254
private static final Logger LOG = LoggerFactory.getLogger(ResolvingFileIO.class);
5355
private static final int BATCH_SIZE = 100_000;
5456
private static final String FALLBACK_IMPL = "org.apache.iceberg.hadoop.HadoopFileIO";
@@ -71,6 +73,7 @@ public class ResolvingFileIO implements HadoopConfigurable, DelegateFileIO {
7173
private final transient StackTraceElement[] createStack;
7274
private SerializableMap<String, String> properties;
7375
private SerializableSupplier<Configuration> hadoopConf;
76+
private List<StorageCredential> storageCredentials = List.of();
7477

7578
/**
7679
* No-arg constructor to load the FileIO dynamically.
@@ -172,6 +175,11 @@ DelegateFileIO io(String location) {
172175
}
173176
}
174177

178+
if (io instanceof SupportsStorageCredentials
179+
&& !((SupportsStorageCredentials) io).credentials().equals(storageCredentials)) {
180+
((SupportsStorageCredentials) io).setCredentials(storageCredentials);
181+
}
182+
175183
return io;
176184
}
177185

@@ -186,7 +194,7 @@ DelegateFileIO io(String location) {
186194
// ResolvingFileIO is keeping track of the creation stacktrace, so no need to do the
187195
// same in S3FileIO.
188196
props.put("init-creation-stacktrace", "false");
189-
fileIO = CatalogUtil.loadFileIO(key, props, conf);
197+
fileIO = CatalogUtil.loadFileIO(key, props, conf, storageCredentials);
190198
} catch (IllegalArgumentException e) {
191199
if (key.equals(FALLBACK_IMPL)) {
192200
// no implementation to fall back to, throw the exception
@@ -199,7 +207,8 @@ DelegateFileIO io(String location) {
199207
FALLBACK_IMPL,
200208
e);
201209
try {
202-
fileIO = CatalogUtil.loadFileIO(FALLBACK_IMPL, properties, conf);
210+
fileIO =
211+
CatalogUtil.loadFileIO(FALLBACK_IMPL, properties, conf, storageCredentials);
203212
} catch (IllegalArgumentException suppressed) {
204213
LOG.warn(
205214
"Failed to load FileIO implementation: {} (fallback)",
@@ -268,4 +277,16 @@ public Iterable<FileInfo> listPrefix(String prefix) {
268277
public void deletePrefix(String prefix) {
269278
io(prefix).deletePrefix(prefix);
270279
}
280+
281+
@Override
282+
public void setCredentials(List<StorageCredential> credentials) {
283+
Preconditions.checkArgument(credentials != null, "Invalid storage credentials: null");
284+
// copy credentials into a modifiable collection for Kryo serde
285+
this.storageCredentials = Lists.newArrayList(credentials);
286+
}
287+
288+
@Override
289+
public List<StorageCredential> credentials() {
290+
return ImmutableList.copyOf(storageCredentials);
291+
}
271292
}

core/src/main/java/org/apache/iceberg/io/StorageCredential.java

+7-1
Original file line number | Diff line number | Diff line change
@@ -18,12 +18,14 @@
1818
*/
1919
package org.apache.iceberg.io;
2020

21+
import java.io.Serializable;
2122
import java.util.Map;
2223
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
2324
import org.immutables.value.Value;
2425

2526
@Value.Immutable
26-
public interface StorageCredential {
27+
public interface StorageCredential extends Serializable {
28+
2729
String prefix();
2830

2931
Map<String, String> config();
@@ -33,4 +35,8 @@ default void validate() {
3335
Preconditions.checkArgument(!prefix().isEmpty(), "Invalid prefix: must be non-empty");
3436
Preconditions.checkArgument(!config().isEmpty(), "Invalid config: must be non-empty");
3537
}
38+
39+
static StorageCredential create(String prefix, Map<String, String> config) {
40+
return ImmutableStorageCredential.builder().prefix(prefix).config(config).build();
41+
}
3642
}

core/src/main/java/org/apache/iceberg/io/SupportsStorageCredentials.java

+4
Original file line number | Diff line number | Diff line change
@@ -20,6 +20,10 @@
2020

2121
import java.util.List;
2222

23+
/**
24+
* This interface is intended as an extension for {@link FileIO} implementations to be able to
25+
* provide and retrieve storage credentials.
26+
*/
2327
public interface SupportsStorageCredentials {
2428

2529
void setCredentials(List<StorageCredential> credentials);

core/src/main/java/org/apache/iceberg/rest/RESTSessionCatalog.java

+6-10
Original file line number | Diff line number | Diff line change
@@ -55,7 +55,6 @@
5555
import org.apache.iceberg.io.CloseableGroup;
5656
import org.apache.iceberg.io.FileIO;
5757
import org.apache.iceberg.io.FileIOTracker;
58-
import org.apache.iceberg.io.ImmutableStorageCredential;
5958
import org.apache.iceberg.io.StorageCredential;
6059
import org.apache.iceberg.metrics.MetricsReporter;
6160
import org.apache.iceberg.metrics.MetricsReporters;
@@ -975,16 +974,13 @@ private FileIO newFileIO(
975974
return ioBuilder.apply(context, properties);
976975
} else {
977976
String ioImpl = properties.getOrDefault(CatalogProperties.FILE_IO_IMPL, DEFAULT_FILE_IO_IMPL);
978-
List<StorageCredential> credentials =
977+
return CatalogUtil.loadFileIO(
978+
ioImpl,
979+
properties,
980+
conf,
979981
storageCredentials.stream()
980-
.map(
981-
c ->
982-
ImmutableStorageCredential.builder()
983-
.prefix(c.prefix())
984-
.config(c.config())
985-
.build())
986-
.collect(Collectors.toList());
987-
return CatalogUtil.loadFileIO(ioImpl, properties, conf, credentials);
982+
.map(c -> StorageCredential.create(c.prefix(), c.config()))
983+
.collect(Collectors.toList()));
988984
}
989985
}
990986

core/src/test/java/org/apache/iceberg/TestCatalogUtil.java

+14-21
Original file line number | Diff line number | Diff line change
@@ -30,14 +30,12 @@
3030
import org.apache.iceberg.catalog.TableIdentifier;
3131
import org.apache.iceberg.hadoop.HadoopFileIO;
3232
import org.apache.iceberg.io.FileIO;
33-
import org.apache.iceberg.io.ImmutableStorageCredential;
3433
import org.apache.iceberg.io.InputFile;
3534
import org.apache.iceberg.io.OutputFile;
3635
import org.apache.iceberg.io.StorageCredential;
3736
import org.apache.iceberg.io.SupportsStorageCredentials;
3837
import org.apache.iceberg.metrics.MetricsReport;
3938
import org.apache.iceberg.metrics.MetricsReporter;
40-
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
4139
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
4240
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
4341
import org.junit.jupiter.api.Test;
@@ -164,26 +162,21 @@ public void loadCustomFileIO_configurable() {
164162
@Test
165163
public void loadCustomFileIOSupportingStorageCredentials() {
166164
StorageCredential gcsCredential =
167-
ImmutableStorageCredential.builder()
168-
.prefix("gs://custom-uri")
169-
.config(
170-
ImmutableMap.of(
171-
"gcs.oauth2.token", "gcsToken", "gcs.oauth2.token-expires-at", "1000"))
172-
.build();
165+
StorageCredential.create(
166+
"gs://custom-uri",
167+
Map.of("gcs.oauth2.token", "gcsToken", "gcs.oauth2.token-expires-at", "1000"));
173168
StorageCredential s3Credential =
174-
ImmutableStorageCredential.builder()
175-
.prefix("s3://custom-uri")
176-
.config(
177-
ImmutableMap.of(
178-
"s3.access-key-id",
179-
"keyId",
180-
"s3.secret-access-key",
181-
"accessKey",
182-
"s3.session-token",
183-
"sessionToken"))
184-
.build();
185-
186-
List<StorageCredential> storageCredentials = ImmutableList.of(gcsCredential, s3Credential);
169+
StorageCredential.create(
170+
"s3://custom-uri",
171+
Map.of(
172+
"s3.access-key-id",
173+
"keyId",
174+
"s3.secret-access-key",
175+
"accessKey",
176+
"s3.session-token",
177+
"sessionToken"));
178+
179+
List<StorageCredential> storageCredentials = List.of(gcsCredential, s3Credential);
187180
FileIO fileIO =
188181
CatalogUtil.loadFileIO(
189182
TestFileIOWithStorageCredentials.class.getName(),

core/src/test/java/org/apache/iceberg/io/TestResolvingIO.java

+38
Original file line number | Diff line number | Diff line change
@@ -29,14 +29,17 @@
2929

3030
import java.io.IOException;
3131
import java.util.List;
32+
import java.util.Map;
3233
import java.util.UUID;
3334
import java.util.stream.Collectors;
3435
import java.util.stream.IntStream;
3536
import org.apache.hadoop.conf.Configuration;
3637
import org.apache.hadoop.fs.FileSystem;
3738
import org.apache.hadoop.fs.Path;
39+
import org.apache.iceberg.CatalogUtil;
3840
import org.apache.iceberg.TestHelpers;
3941
import org.apache.iceberg.hadoop.HadoopFileIO;
42+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
4043
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
4144
import org.junit.jupiter.api.Test;
4245
import org.junit.jupiter.api.io.TempDir;
@@ -182,4 +185,39 @@ public void delegateFileIOWithAndWithoutMixins() {
182185
// being null is ok here as long as the code doesn't throw an exception
183186
assertThat(resolvingFileIO.newInputFile("/file")).isNull();
184187
}
188+
189+
@Test
190+
public void resolvingFileIOWithStorageCredentialsKryoSerialization() throws IOException {
191+
StorageCredential credential = StorageCredential.create("prefix", Map.of("key1", "val1"));
192+
List<StorageCredential> storageCredentials = ImmutableList.of(credential);
193+
ResolvingFileIO resolvingFileIO =
194+
(ResolvingFileIO)
195+
CatalogUtil.loadFileIO(
196+
ResolvingFileIO.class.getName(),
197+
ImmutableMap.of(),
198+
new Configuration(),
199+
storageCredentials);
200+
201+
assertThat(TestHelpers.KryoHelpers.roundTripSerialize(resolvingFileIO).credentials())
202+
.isEqualTo(storageCredentials)
203+
.isEqualTo(resolvingFileIO.credentials());
204+
}
205+
206+
@Test
207+
public void resolvingFileIOWithStorageCredentialsJavaSerialization()
208+
throws IOException, ClassNotFoundException {
209+
StorageCredential credential = StorageCredential.create("prefix", Map.of("key1", "val1"));
210+
List<StorageCredential> storageCredentials = ImmutableList.of(credential);
211+
ResolvingFileIO resolvingFileIO =
212+
(ResolvingFileIO)
213+
CatalogUtil.loadFileIO(
214+
ResolvingFileIO.class.getName(),
215+
ImmutableMap.of(),
216+
new Configuration(),
217+
storageCredentials);
218+
219+
assertThat(TestHelpers.roundTripSerialize(resolvingFileIO).credentials())
220+
.isEqualTo(storageCredentials)
221+
.isEqualTo(resolvingFileIO.credentials());
222+
}
185223
}

0 commit comments

Comments
 (0)