Skip to content

Commit d806ef6

Browse files
committed
try to incorporate changes from #4335 and add IcebergRESTVendedS3Provider.
1 parent cc2648a commit d806ef6

18 files changed

Lines changed: 1008 additions & 97 deletions

File tree

common/src/main/java/org/apache/comet/cloud/s3/CometS3CredentialDispatcher.java

Lines changed: 77 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
package org.apache.comet.cloud.s3;
2121

2222
import java.lang.reflect.InvocationTargetException;
23+
import java.util.Collections;
24+
import java.util.Map;
25+
import java.util.Objects;
2326
import java.util.concurrent.ConcurrentHashMap;
2427

2528
import org.slf4j.Logger;
@@ -28,51 +31,87 @@
2831
/**
2932
* JNI entry point invoked from native code to resolve a {@link CometS3CredentialProvider}.
3033
*
31-
* <p>Native code passes the FQCN named in {@code fs.s3a.comet.credential.provider.class} (or its
32-
* per-bucket / Iceberg-namespaced variants). Each named class is instantiated once via reflection
33-
* and cached, so a single executor JVM can serve multiple providers (e.g. one per bucket).
34+
* <p>Native code names a vendor class via the activation knob ({@code
35+
* fs.s3a.comet.credential.provider.class} for the Parquet path, {@code
36+
* s3.comet.credential.provider.class} on a Spark catalog property for the Iceberg path) and a
37+
* {@code dispatchKey} that scopes the instance: catalog name on the Iceberg path, bucket name on
38+
* the Parquet path. Each {@code (FQCN, dispatchKey)} key gets its own instance, so two catalogs
39+
* sharing one provider class get isolated state.
3440
*/
3541
public final class CometS3CredentialDispatcher {
3642

3743
private static final Logger LOG = LoggerFactory.getLogger(CometS3CredentialDispatcher.class);
3844

39-
private static final ConcurrentHashMap<String, CometS3CredentialProvider> INSTANCES =
45+
private static final ConcurrentHashMap<InstanceKey, CometS3CredentialProvider> INSTANCES =
4046
new ConcurrentHashMap<>();
4147
private static final CometS3AccessMode[] MODES = CometS3AccessMode.values();
4248

4349
private CometS3CredentialDispatcher() {}
4450

4551
/**
46-
* Invoked by native code. {@code mode} is the {@link CometS3AccessMode} ordinal.
47-
*
48-
* @param providerClassName FQCN configured in {@code fs.s3a.comet.credential.provider.class}
52+
* Reflects and initializes the named provider for {@code (FQCN, dispatchKey)} exactly once per
53+
* JVM. Subsequent calls with the same key are no-ops. Native code invokes this synchronously when
54+
* {@code CometS3CredentialBridge} is constructed at plan time, before any per-request {@link
55+
* #getCredentialsForPath} call. {@code catalogProperties} carries the unfiltered FileIO property
56+
* bag on the Iceberg path and is empty on the Parquet path.
57+
*/
58+
public static void ensureInitialized(
59+
String providerClassName, String dispatchKey, Map<String, String> catalogProperties) {
60+
if (providerClassName == null || providerClassName.isEmpty()) {
61+
throw new IllegalArgumentException(
62+
"providerClassName is empty; native side should not call without a configured class");
63+
}
64+
InstanceKey key = new InstanceKey(providerClassName, dispatchKey == null ? "" : dispatchKey);
65+
Map<String, String> props =
66+
catalogProperties == null ? Collections.emptyMap() : catalogProperties;
67+
INSTANCES.computeIfAbsent(
68+
key,
69+
k -> {
70+
CometS3CredentialProvider provider = instantiate(k.providerClassName);
71+
provider.initialize(props);
72+
return provider;
73+
});
74+
}
75+
76+
/**
77+
* Invoked by native code on every per-request credential fetch. The instance must have been
78+
* created by a prior {@link #ensureInitialized} call; otherwise this throws. {@code mode} is the
79+
* {@link CometS3AccessMode} ordinal.
4980
*/
5081
public static CometS3Credentials getCredentialsForPath(
51-
String providerClassName, String bucket, String path, int mode) throws Exception {
82+
String providerClassName, String dispatchKey, String bucket, String path, int mode)
83+
throws Exception {
5284
if (providerClassName == null || providerClassName.isEmpty()) {
5385
throw new IllegalArgumentException(
5486
"providerClassName is empty; native side should not call without a configured class");
5587
}
5688
if (mode < 0 || mode >= MODES.length) {
5789
throw new IllegalArgumentException("Invalid CometS3AccessMode ordinal: " + mode);
5890
}
59-
CometS3CredentialProvider provider = resolve(providerClassName);
91+
InstanceKey key = new InstanceKey(providerClassName, dispatchKey == null ? "" : dispatchKey);
92+
CometS3CredentialProvider provider = INSTANCES.get(key);
93+
if (provider == null) {
94+
throw new IllegalStateException(
95+
"CometS3CredentialProvider "
96+
+ providerClassName
97+
+ " (dispatchKey="
98+
+ key.dispatchKey
99+
+ ") was not initialized; ensureInitialized must be called before"
100+
+ " getCredentialsForPath");
101+
}
60102
CometS3AccessMode accessMode = MODES[mode];
61103
if (LOG.isDebugEnabled()) {
62104
LOG.debug(
63-
"Fetching credentials via {} for bucket={} path={} mode={}",
105+
"Fetching credentials via {} (dispatchKey={}) for bucket={} path={} mode={}",
64106
providerClassName,
107+
key.dispatchKey,
65108
bucket,
66109
path,
67110
accessMode);
68111
}
69112
return provider.getCredentialsForPath(bucket, path, accessMode);
70113
}
71114

72-
private static CometS3CredentialProvider resolve(String providerClassName) {
73-
return INSTANCES.computeIfAbsent(providerClassName, CometS3CredentialDispatcher::instantiate);
74-
}
75-
76115
private static CometS3CredentialProvider instantiate(String providerClassName) {
77116
Class<?> clazz;
78117
try {
@@ -100,4 +139,28 @@ private static CometS3CredentialProvider instantiate(String providerClassName) {
100139
"Failed to instantiate CometS3CredentialProvider " + providerClassName, e);
101140
}
102141
}
142+
143+
private static final class InstanceKey {
144+
final String providerClassName;
145+
final String dispatchKey;
146+
147+
InstanceKey(String providerClassName, String dispatchKey) {
148+
this.providerClassName = providerClassName;
149+
this.dispatchKey = dispatchKey;
150+
}
151+
152+
@Override
153+
public boolean equals(Object o) {
154+
if (this == o) return true;
155+
if (!(o instanceof InstanceKey)) return false;
156+
InstanceKey other = (InstanceKey) o;
157+
return providerClassName.equals(other.providerClassName)
158+
&& dispatchKey.equals(other.dispatchKey);
159+
}
160+
161+
@Override
162+
public int hashCode() {
163+
return Objects.hash(providerClassName, dispatchKey);
164+
}
165+
}
103166
}

common/src/main/java/org/apache/comet/cloud/s3/CometS3CredentialProvider.java

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
package org.apache.comet.cloud.s3;
2121

22+
import java.util.Map;
23+
2224
/**
2325
* SPI for supplying AWS credentials to Comet's native S3 readers, which bypass Spark's Hadoop S3A
2426
* code path and cannot reach signer-based or path-aware credential mechanisms through the standard
@@ -47,16 +49,47 @@
4749
*
4850
* <p>Vendors register an implementation by setting {@code
4951
* spark.hadoop.fs.s3a.comet.credential.provider.class} (or the per-bucket form {@code
50-
* spark.hadoop.fs.s3a.bucket.<name>.comet.credential.provider.class}) to the implementing FQCN. The
51-
* class must have a public no-arg constructor. {@link #getCredentialsForPath} may be invoked
52-
* concurrently from many native tokio tasks, so implementations must be thread-safe.
52+
* spark.hadoop.fs.s3a.bucket.<name>.comet.credential.provider.class}) for the Parquet path, or
53+
* {@code spark.sql.catalog.<catalog>.s3.comet.credential.provider.class} for the Iceberg path. The
54+
* class must have a public no-arg constructor.
55+
*
56+
* <h2>Lifecycle</h2>
57+
*
58+
* <p>Comet keys provider instances by {@code (FQCN, dispatchKey)}, where {@code dispatchKey} is the
59+
* Spark V2 catalog name on the Iceberg path and the bucket on the Parquet path. The first time a
60+
* given key is seen on an executor, Comet reflects the class, calls {@link #initialize(Map)} once,
61+
* and caches the instance. Subsequent requests for the same key reuse it. Two catalogs that share
62+
* one FQCN therefore get isolated instances with their own {@code initialize} maps.
63+
*
64+
* <p>{@link #initialize(Map)} should be cheap and non-blocking; defer real credential fetches to
65+
* the first {@link #getCredentialsForPath} call. {@link #getCredentialsForPath} may be invoked
66+
* concurrently from many native tokio worker threads, so implementations must be thread-safe.
67+
*
68+
* <h2>Caching, refresh, and distribution are the vendor's job</h2>
69+
*
70+
* <p>Comet does not maintain a TTL cache, broadcast catalog state, or schedule refresh. Vendors
71+
* decide whether to cache (e.g. by wrapping {@code
72+
* org.apache.iceberg.aws.s3.VendedCredentialsProvider}'s {@code CachedSupplier}), when to refresh,
73+
* and how to distribute driver-side state to executors (typically by reading {@link #initialize}'s
74+
* {@code catalogProperties}, which Comet has already serialized through the native plan op).
5375
*
5476
* <p>Returns credentials or throws; there is no fall-through return value. A provider that is only
5577
* authoritative for some paths should resolve the default AWS chain itself for the rest. See the
5678
* user guide on cloud credential providers.
5779
*/
5880
public interface CometS3CredentialProvider {
5981

82+
/**
83+
* Called once per {@code (FQCN, dispatchKey)} on each executor before any {@link
84+
* #getCredentialsForPath} call. The {@code catalogProperties} map carries the full FileIO
85+
* property bag for the Iceberg path (including {@code credentials.uri}, OAuth tokens, vendor keys
86+
* like {@code tenant-id}) and is empty on the Parquet path. The default no-op keeps Parquet
87+
* vendors source-compatible.
88+
*
89+
* @param catalogProperties unfiltered FileIO/catalog properties; may contain secrets, do not log
90+
*/
91+
default void initialize(Map<String, String> catalogProperties) {
  // Intentionally empty: providers that need no catalog properties can leave this
  // method unimplemented and still satisfy the interface.
}
92+
6093
/**
6194
* @param bucket S3 bucket name (no scheme, no path)
6295
* @param path object key or prefix, leading slash included (matches the URL path component)

common/src/test/java/org/apache/comet/cloud/s3/CometS3CredentialDispatcherTest.java

Lines changed: 78 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@
1919

2020
package org.apache.comet.cloud.s3;
2121

22+
import java.util.Collections;
23+
import java.util.HashMap;
24+
import java.util.Map;
25+
2226
import org.junit.Before;
2327
import org.junit.Test;
2428

@@ -32,6 +36,7 @@
3236
public class CometS3CredentialDispatcherTest {
3337

3438
private static final String TEST_PROVIDER = TestCometS3CredentialProvider.class.getName();
39+
private static final String DK = "test-dispatch-key";
3540
private static final int READ = CometS3AccessMode.READ.ordinal();
3641
private static final int WRITE = CometS3AccessMode.WRITE.ordinal();
3742

@@ -40,11 +45,16 @@ public void resetTestProvider() {
4045
TestCometS3CredentialProvider.reset();
4146
}
4247

48+
private void init() {
49+
CometS3CredentialDispatcher.ensureInitialized(TEST_PROVIDER, DK, Collections.emptyMap());
50+
}
51+
4352
@Test
4453
public void getCredentialsRoundTripsThroughProvider() throws Exception {
54+
init();
4555
CometS3Credentials creds =
4656
CometS3CredentialDispatcher.getCredentialsForPath(
47-
TEST_PROVIDER, "my-bucket", "path/to/object", READ);
57+
TEST_PROVIDER, DK, "my-bucket", "path/to/object", READ);
4858

4959
assertNotNull(creds);
5060
assertEquals("AKIATEST", creds.getAccessKeyId());
@@ -60,25 +70,30 @@ public void getCredentialsRoundTripsThroughProvider() throws Exception {
6070

6171
@Test
6272
public void writeModeIsForwarded() throws Exception {
63-
CometS3CredentialDispatcher.getCredentialsForPath(TEST_PROVIDER, "b", "k", WRITE);
73+
init();
74+
CometS3CredentialDispatcher.getCredentialsForPath(TEST_PROVIDER, DK, "b", "k", WRITE);
6475
assertEquals(CometS3AccessMode.WRITE, TestCometS3CredentialProvider.lastMode);
6576
}
6677

6778
@Test
6879
public void unknownModeRejected() {
80+
init();
6981
assertThrows(
7082
IllegalArgumentException.class,
71-
() -> CometS3CredentialDispatcher.getCredentialsForPath(TEST_PROVIDER, "b", "k", 99));
83+
() -> CometS3CredentialDispatcher.getCredentialsForPath(TEST_PROVIDER, DK, "b", "k", 99));
7284
}
7385

7486
@Test
7587
public void emptyClassNameRejected() {
7688
assertThrows(
7789
IllegalArgumentException.class,
78-
() -> CometS3CredentialDispatcher.getCredentialsForPath("", "b", "k", READ));
90+
() -> CometS3CredentialDispatcher.ensureInitialized("", DK, Collections.emptyMap()));
91+
assertThrows(
92+
IllegalArgumentException.class,
93+
() -> CometS3CredentialDispatcher.ensureInitialized(null, DK, Collections.emptyMap()));
7994
assertThrows(
8095
IllegalArgumentException.class,
81-
() -> CometS3CredentialDispatcher.getCredentialsForPath(null, "b", "k", READ));
96+
() -> CometS3CredentialDispatcher.getCredentialsForPath("", DK, "b", "k", READ));
8297
}
8398

8499
@Test
@@ -87,8 +102,8 @@ public void missingClassReportsActionableError() {
87102
assertThrows(
88103
IllegalStateException.class,
89104
() ->
90-
CometS3CredentialDispatcher.getCredentialsForPath(
91-
"com.does.not.Exist", "b", "k", READ));
105+
CometS3CredentialDispatcher.ensureInitialized(
106+
"com.does.not.Exist", DK, Collections.emptyMap()));
92107
assertTrue(thrown.getMessage().contains("not found"));
93108
}
94109

@@ -98,8 +113,8 @@ public void classNotImplementingInterfaceRejected() {
98113
assertThrows(
99114
IllegalStateException.class,
100115
() ->
101-
CometS3CredentialDispatcher.getCredentialsForPath(
102-
NotACredentialProvider.class.getName(), "b", "k", READ));
116+
CometS3CredentialDispatcher.ensureInitialized(
117+
NotACredentialProvider.class.getName(), DK, Collections.emptyMap()));
103118
assertTrue(thrown.getMessage().contains("does not implement"));
104119
}
105120

@@ -109,38 +124,85 @@ public void classWithoutNoArgCtorRejected() {
109124
assertThrows(
110125
IllegalStateException.class,
111126
() ->
112-
CometS3CredentialDispatcher.getCredentialsForPath(
113-
NoNoArgCtorProvider.class.getName(), "b", "k", READ));
127+
CometS3CredentialDispatcher.ensureInitialized(
128+
NoNoArgCtorProvider.class.getName(), DK, Collections.emptyMap()));
114129
assertTrue(thrown.getMessage().contains("no-arg constructor"));
115130
}
116131

132+
@Test
133+
public void getWithoutEnsureInitializedThrows() {
134+
Exception thrown =
135+
assertThrows(
136+
IllegalStateException.class,
137+
() ->
138+
CometS3CredentialDispatcher.getCredentialsForPath(
139+
TEST_PROVIDER, "never-initialized", "b", "k", READ));
140+
assertTrue(thrown.getMessage().contains("not initialized"));
141+
}
142+
143+
@Test
144+
public void initializeCalledExactlyOncePerKey() {
145+
Map<String, String> props = new HashMap<>();
146+
props.put("tenant-id", "T1");
147+
148+
CometS3CredentialDispatcher.ensureInitialized(TEST_PROVIDER, "cat-a", props);
149+
CometS3CredentialDispatcher.ensureInitialized(TEST_PROVIDER, "cat-a", props);
150+
CometS3CredentialDispatcher.ensureInitialized(TEST_PROVIDER, "cat-a", props);
151+
152+
assertEquals(1, TestCometS3CredentialProvider.initCount.get());
153+
}
154+
155+
@Test
156+
public void distinctDispatchKeysIsolateInstances() throws Exception {
157+
Map<String, String> propsA = new HashMap<>();
158+
propsA.put("tenant-id", "T-A");
159+
Map<String, String> propsB = new HashMap<>();
160+
propsB.put("tenant-id", "T-B");
161+
162+
CometS3CredentialDispatcher.ensureInitialized(TEST_PROVIDER, "iso-cat-a", propsA);
163+
CometS3CredentialDispatcher.ensureInitialized(TEST_PROVIDER, "iso-cat-b", propsB);
164+
165+
assertEquals(2, TestCometS3CredentialProvider.initCount.get());
166+
167+
CometS3CredentialDispatcher.getCredentialsForPath(TEST_PROVIDER, "iso-cat-a", "b", "k", READ);
168+
assertEquals("T-A", TestCometS3CredentialProvider.lastTenantSeen);
169+
170+
CometS3CredentialDispatcher.getCredentialsForPath(TEST_PROVIDER, "iso-cat-b", "b", "k", READ);
171+
assertEquals("T-B", TestCometS3CredentialProvider.lastTenantSeen);
172+
}
173+
117174
@Test
118175
public void providerExceptionsPropagate() {
176+
init();
119177
IllegalStateException boom = new IllegalStateException("simulated STS failure");
120178
TestCometS3CredentialProvider.throwOnNext = boom;
121179

122180
Exception thrown =
123181
assertThrows(
124182
Exception.class,
125-
() -> CometS3CredentialDispatcher.getCredentialsForPath(TEST_PROVIDER, "b", "k", READ));
183+
() ->
184+
CometS3CredentialDispatcher.getCredentialsForPath(
185+
TEST_PROVIDER, DK, "b", "k", READ));
126186
assertSame(boom, thrown);
127187
}
128188

129189
@Test
130190
public void nullSessionTokenAllowed() throws Exception {
191+
init();
131192
TestCometS3CredentialProvider.nextResult = new CometS3Credentials("AKIA", "sec", null, 0L);
132193

133194
CometS3Credentials creds =
134-
CometS3CredentialDispatcher.getCredentialsForPath(TEST_PROVIDER, "b", "k", READ);
195+
CometS3CredentialDispatcher.getCredentialsForPath(TEST_PROVIDER, DK, "b", "k", READ);
135196

136197
assertNull(creds.getSessionToken());
137198
}
138199

139200
@Test
140201
public void providerReceivesEachCallSeparately() throws Exception {
141-
CometS3CredentialDispatcher.getCredentialsForPath(TEST_PROVIDER, "b1", "k1", READ);
142-
CometS3CredentialDispatcher.getCredentialsForPath(TEST_PROVIDER, "b2", "k2", READ);
143-
CometS3CredentialDispatcher.getCredentialsForPath(TEST_PROVIDER, "b3", "k3", READ);
202+
init();
203+
CometS3CredentialDispatcher.getCredentialsForPath(TEST_PROVIDER, DK, "b1", "k1", READ);
204+
CometS3CredentialDispatcher.getCredentialsForPath(TEST_PROVIDER, DK, "b2", "k2", READ);
205+
CometS3CredentialDispatcher.getCredentialsForPath(TEST_PROVIDER, DK, "b3", "k3", READ);
144206

145207
assertEquals(3, TestCometS3CredentialProvider.callCount.get());
146208
assertEquals("b3", TestCometS3CredentialProvider.lastBucket);

0 commit comments

Comments
 (0)