Skip to content

Commit e153903

Browse files
committed
Core: Pass storage credentials from LoadTableResponse to FileIO
1 parent 2e1577e commit e153903

File tree

10 files changed

+454
-27
lines changed

10 files changed

+454
-27
lines changed

aws/src/main/java/org/apache/iceberg/aws/s3/S3FileIO.java

+41-2
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,14 @@
3939
import org.apache.iceberg.io.FileInfo;
4040
import org.apache.iceberg.io.InputFile;
4141
import org.apache.iceberg.io.OutputFile;
42+
import org.apache.iceberg.io.StorageCredential;
4243
import org.apache.iceberg.io.SupportsRecoveryOperations;
44+
import org.apache.iceberg.io.SupportsStorageCredentials;
4345
import org.apache.iceberg.metrics.MetricsContext;
4446
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
47+
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
48+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
49+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
4550
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
4651
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
4752
import org.apache.iceberg.relocated.com.google.common.collect.Multimaps;
@@ -80,7 +85,11 @@
8085
* schemes s3a, s3n, https are also treated as s3 file paths. Using this FileIO with other schemes
8186
* will result in {@link org.apache.iceberg.exceptions.ValidationException}.
8287
*/
83-
public class S3FileIO implements CredentialSupplier, DelegateFileIO, SupportsRecoveryOperations {
88+
public class S3FileIO
89+
implements CredentialSupplier,
90+
DelegateFileIO,
91+
SupportsRecoveryOperations,
92+
SupportsStorageCredentials {
8493
private static final Logger LOG = LoggerFactory.getLogger(S3FileIO.class);
8594
private static final String DEFAULT_METRICS_IMPL =
8695
"org.apache.iceberg.hadoop.HadoopMetricsContext";
@@ -96,6 +105,7 @@ public class S3FileIO implements CredentialSupplier, DelegateFileIO, SupportsRec
96105
private MetricsContext metrics = MetricsContext.nullMetrics();
97106
private final AtomicBoolean isResourceClosed = new AtomicBoolean(false);
98107
private transient StackTraceElement[] createStack;
108+
private List<? extends StorageCredential> storageCredentials = ImmutableList.of();
99109

100110
/**
101111
* No-arg constructor to load the FileIO dynamically.
@@ -422,7 +432,13 @@ public String getCredential() {
422432
@Override
423433
public void initialize(Map<String, String> props) {
424434
this.properties = SerializableMap.copyOf(props);
425-
this.s3FileIOProperties = new S3FileIOProperties(properties);
435+
Map<String, String> propertiesWithCredentials =
436+
ImmutableMap.<String, String>builder()
437+
.putAll(properties)
438+
.putAll(storageCredentialConfig())
439+
.buildKeepingLast();
440+
441+
this.s3FileIOProperties = new S3FileIOProperties(propertiesWithCredentials);
426442
this.createStack =
427443
PropertyUtil.propertyAsBoolean(props, "init-creation-stacktrace", true)
428444
? Thread.currentThread().getStackTrace()
@@ -547,4 +563,27 @@ private boolean recoverObject(ObjectVersion version, String bucket) {
547563

548564
return true;
549565
}
566+
567+
@Override
568+
public void setCredentials(List<? extends StorageCredential> credentials) {
569+
Preconditions.checkArgument(credentials != null, "Invalid storage credentials: null");
570+
this.storageCredentials = credentials;
571+
}
572+
573+
@Override
574+
public List<? extends StorageCredential> credentials() {
575+
return ImmutableList.copyOf(storageCredentials);
576+
}
577+
578+
private Map<String, String> storageCredentialConfig() {
579+
List<StorageCredential> s3Credentials =
580+
storageCredentials.stream()
581+
.filter(c -> c.prefix().startsWith("s3"))
582+
.collect(Collectors.toList());
583+
584+
Preconditions.checkState(
585+
s3Credentials.size() <= 1, "Invalid S3 Credentials: only one S3 credential should exist");
586+
587+
return s3Credentials.isEmpty() ? ImmutableMap.of() : s3Credentials.get(0).config();
588+
}
550589
}

aws/src/test/java/org/apache/iceberg/aws/s3/TestS3FileIO.java

+113
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,11 @@
6666
import org.apache.iceberg.io.FileIOParser;
6767
import org.apache.iceberg.io.FileInfo;
6868
import org.apache.iceberg.io.IOUtil;
69+
import org.apache.iceberg.io.ImmutableStorageCredential;
6970
import org.apache.iceberg.io.InputFile;
7071
import org.apache.iceberg.io.OutputFile;
7172
import org.apache.iceberg.io.ResolvingFileIO;
73+
import org.apache.iceberg.io.StorageCredential;
7274
import org.apache.iceberg.jdbc.JdbcCatalog;
7375
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
7476
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -77,6 +79,8 @@
7779
import org.apache.iceberg.relocated.com.google.common.collect.Streams;
7880
import org.apache.iceberg.types.Types;
7981
import org.apache.iceberg.util.SerializableSupplier;
82+
import org.assertj.core.api.InstanceOfAssertFactories;
83+
import org.assertj.core.api.ObjectAssert;
8084
import org.junit.jupiter.api.AfterEach;
8185
import org.junit.jupiter.api.BeforeEach;
8286
import org.junit.jupiter.api.Disabled;
@@ -539,6 +543,115 @@ public void testInputFileWithManifest() throws IOException {
539543
verify(s3mock, never()).headObject(any(HeadObjectRequest.class));
540544
}
541545

546+
@Test
547+
public void noStorageCredentialConfigured() {
548+
S3FileIO fileIO = new S3FileIO();
549+
fileIO.setCredentials(ImmutableList.of());
550+
fileIO.initialize(
551+
ImmutableMap.of(
552+
"s3.access-key-id",
553+
"keyIdFromProperties",
554+
"s3.secret-access-key",
555+
"accessKeyFromProperties",
556+
"s3.session-token",
557+
"sessionTokenFromProperties"));
558+
559+
ObjectAssert<S3FileIOProperties> s3FileIOProperties =
560+
assertThat(fileIO)
561+
.extracting("s3FileIOProperties")
562+
.asInstanceOf(InstanceOfAssertFactories.type(S3FileIOProperties.class));
563+
s3FileIOProperties.extracting(S3FileIOProperties::accessKeyId).isEqualTo("keyIdFromProperties");
564+
s3FileIOProperties
565+
.extracting(S3FileIOProperties::secretAccessKey)
566+
.isEqualTo("accessKeyFromProperties");
567+
s3FileIOProperties
568+
.extracting(S3FileIOProperties::sessionToken)
569+
.isEqualTo("sessionTokenFromProperties");
570+
}
571+
572+
@Test
573+
public void singleStorageCredentialConfigured() {
574+
StorageCredential s3Credential =
575+
ImmutableStorageCredential.builder()
576+
.prefix("s3://custom-uri")
577+
.config(
578+
ImmutableMap.of(
579+
"s3.access-key-id",
580+
"keyIdFromCredential",
581+
"s3.secret-access-key",
582+
"accessKeyFromCredential",
583+
"s3.session-token",
584+
"sessionTokenFromCredential"))
585+
.build();
586+
587+
S3FileIO fileIO = new S3FileIO();
588+
fileIO.setCredentials(ImmutableList.of(s3Credential));
589+
fileIO.initialize(
590+
ImmutableMap.of(
591+
"s3.access-key-id",
592+
"keyIdFromProperties",
593+
"s3.secret-access-key",
594+
"accessKeyFromProperties",
595+
"s3.session-token",
596+
"sessionTokenFromProperties"));
597+
598+
ObjectAssert<S3FileIOProperties> s3FileIOProperties =
599+
assertThat(fileIO)
600+
.extracting("s3FileIOProperties")
601+
.asInstanceOf(InstanceOfAssertFactories.type(S3FileIOProperties.class));
602+
s3FileIOProperties.extracting(S3FileIOProperties::accessKeyId).isEqualTo("keyIdFromCredential");
603+
s3FileIOProperties
604+
.extracting(S3FileIOProperties::secretAccessKey)
605+
.isEqualTo("accessKeyFromCredential");
606+
s3FileIOProperties
607+
.extracting(S3FileIOProperties::sessionToken)
608+
.isEqualTo("sessionTokenFromCredential");
609+
}
610+
611+
@Test
612+
public void multipleStorageCredentialsConfigured() {
613+
StorageCredential s3Credential1 =
614+
ImmutableStorageCredential.builder()
615+
.prefix("s3://custom-uri/1")
616+
.config(
617+
ImmutableMap.of(
618+
"s3.access-key-id",
619+
"keyIdFromCredential1",
620+
"s3.secret-access-key",
621+
"accessKeyFromCredential1",
622+
"s3.session-token",
623+
"sessionTokenFromCredential1"))
624+
.build();
625+
626+
StorageCredential s3Credential2 =
627+
ImmutableStorageCredential.builder()
628+
.prefix("s3://custom-uri/2")
629+
.config(
630+
ImmutableMap.of(
631+
"s3.access-key-id",
632+
"keyIdFromCredential2",
633+
"s3.secret-access-key",
634+
"accessKeyFromCredential2",
635+
"s3.session-token",
636+
"sessionTokenFromCredential2"))
637+
.build();
638+
639+
S3FileIO fileIO = new S3FileIO();
640+
fileIO.setCredentials(ImmutableList.of(s3Credential1, s3Credential2));
641+
assertThatThrownBy(
642+
() ->
643+
fileIO.initialize(
644+
ImmutableMap.of(
645+
"s3.access-key-id",
646+
"keyIdFromProperties",
647+
"s3.secret-access-key",
648+
"accessKeyFromProperties",
649+
"s3.session-token",
650+
"sessionTokenFromProperties")))
651+
.isInstanceOf(IllegalStateException.class)
652+
.hasMessage("Invalid S3 Credentials: only one S3 credential should exist");
653+
}
654+
542655
private void createRandomObjects(String prefix, int count) {
543656
S3URI s3URI = new S3URI(prefix);
544657

core/src/main/java/org/apache/iceberg/CatalogUtil.java

+33
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,14 @@
3535
import org.apache.iceberg.exceptions.ValidationException;
3636
import org.apache.iceberg.hadoop.Configurable;
3737
import org.apache.iceberg.io.FileIO;
38+
import org.apache.iceberg.io.StorageCredential;
3839
import org.apache.iceberg.io.SupportsBulkOperations;
40+
import org.apache.iceberg.io.SupportsStorageCredentials;
3941
import org.apache.iceberg.metrics.LoggingMetricsReporter;
4042
import org.apache.iceberg.metrics.MetricsReporter;
4143
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
4244
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
45+
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
4346
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
4447
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
4548
import org.apache.iceberg.relocated.com.google.common.collect.MapMaker;
@@ -343,6 +346,33 @@ public static Catalog buildIcebergCatalog(String name, Map<String, String> optio
343346
* loaded class cannot be cast to the given interface type
344347
*/
345348
public static FileIO loadFileIO(String impl, Map<String, String> properties, Object hadoopConf) {
349+
return loadFileIO(impl, properties, hadoopConf, ImmutableList.of());
350+
}
351+
352+
/**
353+
* Load a custom {@link FileIO} implementation.
354+
*
355+
* <p>The implementation must have a no-arg constructor. If the class implements Configurable, a
356+
* Hadoop config will be passed using Configurable.setConf. If the class implements {@link
357+
* SupportsStorageCredentials}, the storage credentials will be passed using {@link
358+
* SupportsStorageCredentials#setCredentials(List)}. {@link FileIO#initialize(Map properties)} is
359+
* called to complete the initialization.
360+
*
361+
* @param impl full class name of a custom FileIO implementation
362+
* @param properties used to initialize the FileIO implementation
363+
* @param hadoopConf a hadoop Configuration
364+
* @param storageCredentials the storage credentials to configure if the FileIO implementation
365+
* implements {@link SupportsStorageCredentials}
366+
* @return FileIO class
367+
* @throws IllegalArgumentException if class path not found or right constructor not found or the
368+
* loaded class cannot be cast to the given interface type
369+
*/
370+
@SuppressWarnings("unchecked")
371+
public static FileIO loadFileIO(
372+
String impl,
373+
Map<String, String> properties,
374+
Object hadoopConf,
375+
List<? extends StorageCredential> storageCredentials) {
346376
LOG.info("Loading custom FileIO implementation: {}", impl);
347377
DynConstructors.Ctor<FileIO> ctor;
348378
try {
@@ -365,6 +395,9 @@ public static FileIO loadFileIO(String impl, Map<String, String> properties, Obj
365395
}
366396

367397
configureHadoopConf(fileIO, hadoopConf);
398+
if (fileIO instanceof SupportsStorageCredentials) {
399+
((SupportsStorageCredentials) fileIO).setCredentials(storageCredentials);
400+
}
368401

369402
fileIO.initialize(properties);
370403
return fileIO;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.io;
20+
21+
import java.util.Map;
22+
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
23+
import org.immutables.value.Value;
24+
25+
@Value.Immutable
26+
public interface StorageCredential {
27+
String prefix();
28+
29+
Map<String, String> config();
30+
31+
@Value.Check
32+
default void validate() {
33+
Preconditions.checkArgument(!prefix().isEmpty(), "Invalid prefix: must be non-empty");
34+
Preconditions.checkArgument(!config().isEmpty(), "Invalid config: must be non-empty");
35+
}
36+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.iceberg.io;
20+
21+
import java.util.List;
22+
23+
public interface SupportsStorageCredentials {
24+
25+
void setCredentials(List<? extends StorageCredential> credentials);
26+
27+
List<? extends StorageCredential> credentials();
28+
}

0 commit comments

Comments
 (0)