Skip to content

Commit 63416f7

Browse files
authored
Fix NPE in STS client usage (#510)
<!-- FLEET-5859 --> Fixes #509 Adds additional tests.
1 parent a49cd0c commit 63416f7

6 files changed

Lines changed: 211 additions & 15 deletions

File tree

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
version=3.5.0-SNAPSHOT
1+
version=3.4.1-SNAPSHOT
22

33
sonatypeUsername=<fill>
44
sonatypePassword=<fill>

s3-commons/src/main/java/io/aiven/kafka/connect/config/s3/S3ConfigFragment.java

Lines changed: 29 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.kafka.common.config.AbstractConfig;
2424
import org.apache.kafka.common.config.ConfigDef;
2525
import org.apache.kafka.common.config.ConfigException;
26+
import org.apache.kafka.common.utils.Utils;
2627

2728
import io.aiven.kafka.connect.common.config.ConfigFragment;
2829
import io.aiven.kafka.connect.common.config.validators.FileCompressionTypeValidator;
@@ -43,6 +44,7 @@
4344
import org.slf4j.LoggerFactory;
4445
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
4546
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
47+
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
4648

4749
/**
4850
* The configuration fragment that defines the S3 specific characteristics.
@@ -94,6 +96,12 @@ public final class S3ConfigFragment extends ConfigFragment {
9496
public static final String AWS_ACCESS_KEY_ID_CONFIG = "aws.access.key.id";
9597
public static final String AWS_SECRET_ACCESS_KEY_CONFIG = "aws.secret.access.key";
9698
public static final String AWS_CREDENTIALS_PROVIDER_CONFIG = "aws.credentials.provider";
99+
/**
100+
* not used in codebase
101+
*
102+
* @deprecated to be removed
103+
*/
104+
@Deprecated
97105
public static final String AWS_CREDENTIAL_PROVIDER_DEFAULT = "com.amazonaws.auth.DefaultAWSCredentialsProviderChain";
98106
public static final String AWS_S3_BUCKET_NAME_CONFIG = "aws.s3.bucket.name";
99107
public static final String AWS_S3_SSE_ALGORITHM_CONFIG = "aws.s3.sse.algorithm";
@@ -189,13 +197,10 @@ static void addAwsConfigGroup(final ConfigDef configDef) {
189197
ConfigDef.Importance.MEDIUM, "AWS Secret Access Key", GROUP_AWS, awsGroupCounter++,
190198
ConfigDef.Width.NONE, AWS_SECRET_ACCESS_KEY_CONFIG);
191199

192-
configDef.define(AWS_CREDENTIALS_PROVIDER_CONFIG, ConfigDef.Type.CLASS, AWS_CREDENTIAL_PROVIDER_DEFAULT,
193-
ConfigDef.Importance.MEDIUM,
194-
"When you initialize a new " + "service client without supplying any arguments, "
195-
+ "the AWS SDK for Java attempts to find temporary "
196-
+ "credentials by using the default credential " + "provider chain implemented by the "
197-
+ "DefaultAWSCredentialsProviderChain class.",
198-
200+
configDef.define(AWS_CREDENTIALS_PROVIDER_CONFIG, ConfigDef.Type.CLASS, null, ConfigDef.Importance.MEDIUM,
201+
"When you initialize a new service client without supplying any arguments "
202+
+ "the AWS SDK for Java attempts to find temporary credentials by using the default credential "
203+
+ "provider chain.",
199204
GROUP_AWS, awsGroupCounter++, ConfigDef.Width.NONE, AWS_CREDENTIALS_PROVIDER_CONFIG);
200205

201206
configDef.define(AWS_S3_BUCKET_NAME_CONFIG, ConfigDef.Type.STRING, null, new BucketNameValidator(),
@@ -355,7 +360,7 @@ public void validateCredentials() {
355360
final AwsBasicCredentials awsCredentialsV2 = getAwsCredentialsV2();
356361
if (awsCredentials == null && awsCredentialsV2 == null) {
357362
LOGGER.info(
358-
"Connector use {} as credential Provider, "
363+
"Connector uses {} as credential Provider, "
359364
+ "when configuration for {{}, {}} OR {{}, {}} are absent",
360365
AWS_CREDENTIALS_PROVIDER_CONFIG, AWS_ACCESS_KEY_ID_CONFIG, AWS_SECRET_ACCESS_KEY_CONFIG,
361366
AWS_STS_ROLE_ARN, AWS_STS_ROLE_SESSION_NAME);
@@ -541,12 +546,26 @@ public int getS3RetryBackoffMaxRetries() {
541546
return cfg.getInt(AWS_S3_RETRY_BACKOFF_MAX_RETRIES_CONFIG);
542547
}
543548

549+
/**
550+
* @return a V1 credentials provider
551+
* @deprecated use {@link #getAwsCredentialsV2()}
552+
*/
553+
@Deprecated
544554
public AWSCredentialsProvider getCustomCredentialsProvider() {
545-
return cfg.getConfiguredInstance(AWS_CREDENTIALS_PROVIDER_CONFIG, AWSCredentialsProvider.class);
555+
final AWSCredentialsProvider result = cfg.getConfiguredInstance(AWS_CREDENTIALS_PROVIDER_CONFIG,
556+
AWSCredentialsProvider.class);
557+
return result != null ? result : Utils.newInstance(com.amazonaws.auth.DefaultAWSCredentialsProviderChain.class);
546558
}
547559

560+
/**
561+
* Gets the Aws Credentials provider.
562+
*
563+
* @return the Aws Credentials provider.
564+
*/
548565
public AwsCredentialsProvider getCustomCredentialsProviderV2() {
549-
return cfg.getConfiguredInstance(AWS_CREDENTIALS_PROVIDER_CONFIG, AwsCredentialsProvider.class);
566+
final AwsCredentialsProvider result = cfg.getConfiguredInstance(AWS_CREDENTIALS_PROVIDER_CONFIG,
567+
AwsCredentialsProvider.class);
568+
return result != null ? result : DefaultCredentialsProvider.builder().build();
550569
}
551570

552571
public int getFetchPageSize() {

s3-commons/src/main/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactory.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,19 @@
2929
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
3030
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
3131
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
32+
import software.amazon.awssdk.services.sts.StsClient;
3233
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
3334
import software.amazon.awssdk.services.sts.model.AssumeRoleRequest;
3435

36+
/**
37+
* Creates AwsCredentialProviders.
38+
*/
3539
public class AwsCredentialProviderFactory {
3640

41+
/**
42+
* @deprecated use {@link #getAwsV2Provider(S3ConfigFragment)}
43+
*/
44+
@Deprecated
3745
public AWSCredentialsProvider getProvider(final S3ConfigFragment config) {
3846
if (config.hasAwsStsRole()) {
3947
return getStsProvider(config);
@@ -45,6 +53,10 @@ public AWSCredentialsProvider getProvider(final S3ConfigFragment config) {
4553
return new AWSStaticCredentialsProvider(awsCredentials);
4654
}
4755

56+
/**
57+
* @deprecated use {@link #getV2StsProvider(S3ConfigFragment)}
58+
*/
59+
@Deprecated
4860
private AWSCredentialsProvider getStsProvider(final S3ConfigFragment config) {
4961
final AwsStsRole awsstsRole = config.getStsRole();
5062
final AWSSecurityTokenService sts = securityTokenService(config);
@@ -55,6 +67,7 @@ private AWSCredentialsProvider getStsProvider(final S3ConfigFragment config) {
5567
.build();
5668
}
5769

70+
@Deprecated
5871
private AWSSecurityTokenService securityTokenService(final S3ConfigFragment config) {
5972
if (config.hasStsEndpointConfig()) {
6073
final AWSSecurityTokenServiceClientBuilder stsBuilder = AWSSecurityTokenServiceClientBuilder.standard();
@@ -64,6 +77,13 @@ private AWSSecurityTokenService securityTokenService(final S3ConfigFragment conf
6477
return AWSSecurityTokenServiceClientBuilder.defaultClient();
6578
}
6679

80+
/**
81+
* Gets an AWS V2 credential provider
82+
*
83+
* @param config
84+
* the S3Configuration fragment.
85+
* @return an AwsCredentialsProvider
86+
*/
6787
public AwsCredentialsProvider getAwsV2Provider(final S3ConfigFragment config) {
6888

6989
if (config.hasAwsStsRole()) {
@@ -74,9 +94,15 @@ public AwsCredentialsProvider getAwsV2Provider(final S3ConfigFragment config) {
7494
return config.getCustomCredentialsProviderV2();
7595
}
7696
return StaticCredentialsProvider.create(awsCredentials);
77-
7897
}
7998

99+
/**
100+
* Gets a V2 STS Provider.
101+
*
102+
* @param config
103+
* the S3Configuration fragment.
104+
* @return an StsAssumeRoleCredentialsProvider
105+
*/
80106
private StsAssumeRoleCredentialsProvider getV2StsProvider(final S3ConfigFragment config) {
81107
if (config.hasAwsStsRole()) {
82108
return StsAssumeRoleCredentialsProvider.builder()
@@ -85,11 +111,10 @@ private StsAssumeRoleCredentialsProvider getV2StsProvider(final S3ConfigFragment
85111
// Maker this a unique identifier
86112
.roleSessionName("AwsV2SDKConnectorSession")
87113
.build())
114+
.stsClient(StsClient.builder().region(config.getAwsS3RegionV2()).build())
88115
.build();
89116
}
90-
91117
return StsAssumeRoleCredentialsProvider.builder().build();
92-
93118
}
94119

95120
}

s3-commons/src/test/java/io/aiven/kafka/connect/iam/AwsCredentialProviderFactoryTest.java

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,23 @@
2121
import java.util.HashMap;
2222
import java.util.Map;
2323

24+
import org.apache.kafka.common.Configurable;
25+
2426
import io.aiven.kafka.connect.config.s3.S3ConfigFragment;
2527
import io.aiven.kafka.connect.tools.AwsCredentialBaseConfig;
2628

29+
import com.amazonaws.auth.AWSCredentials;
30+
import com.amazonaws.auth.AWSCredentialsProvider;
2731
import com.amazonaws.auth.AWSStaticCredentialsProvider;
2832
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
2933
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
3034
import com.amazonaws.regions.Regions;
3135
import org.junit.jupiter.api.BeforeEach;
3236
import org.junit.jupiter.api.Test;
3337

38+
/**
39+
* Tests the Credential provider factory generation of V1 credentials
40+
*/
3441
final class AwsCredentialProviderFactoryTest {
3542
private AwsCredentialProviderFactory factory;
3643
private Map<String, String> props;
@@ -77,4 +84,36 @@ void createDefaultCredentialsWhenNoCredentialsSpecified() {
7784
assertThat(credentialProvider).isInstanceOf(DefaultAWSCredentialsProviderChain.class);
7885
}
7986

87+
@Test
88+
void customCredentialProviderTest() {
89+
props.put(S3ConfigFragment.AWS_CREDENTIALS_PROVIDER_CONFIG, DummyCredentialsProvider.class.getName());
90+
final var config = new AwsCredentialBaseConfig(props);
91+
92+
final var credentialProvider = factory.getProvider(new S3ConfigFragment(config));
93+
assertThat(credentialProvider).isInstanceOf(DummyCredentialsProvider.class);
94+
assertThat(((DummyCredentialsProvider) credentialProvider).configured).isTrue();
95+
}
96+
97+
/**
98+
* A custom V1 credential provider for testing.
99+
*/
100+
public static class DummyCredentialsProvider implements AWSCredentialsProvider, Configurable {
101+
boolean configured;
102+
103+
@Override
104+
public AWSCredentials getCredentials() {
105+
return null;
106+
}
107+
108+
@Override
109+
public void refresh() {
110+
111+
}
112+
113+
@Override
114+
public void configure(final Map<String, ?> map) {
115+
configured = true;
116+
}
117+
}
118+
80119
}
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Copyright 2024 Aiven Oy
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package io.aiven.kafka.connect.iam;
18+
19+
import static org.assertj.core.api.Assertions.assertThat;
20+
21+
import java.util.HashMap;
22+
import java.util.Map;
23+
24+
import org.apache.kafka.common.Configurable;
25+
26+
import io.aiven.kafka.connect.config.s3.S3ConfigFragment;
27+
import io.aiven.kafka.connect.tools.AwsCredentialBaseConfig;
28+
29+
import org.junit.jupiter.api.BeforeEach;
30+
import org.junit.jupiter.api.Test;
31+
import software.amazon.awssdk.auth.credentials.AwsCredentials;
32+
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
33+
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
34+
import software.amazon.awssdk.regions.Region;
35+
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;
36+
37+
/**
38+
* Tests the Credential provider factory generation of V2 credentials
39+
*/
40+
final class AwsCredentialV2ProviderFactoryTest {
41+
private AwsCredentialProviderFactory factory;
42+
private Map<String, String> props;
43+
44+
@BeforeEach
45+
public void setUp() {
46+
factory = new AwsCredentialProviderFactory();
47+
props = new HashMap<>();
48+
props.put(S3ConfigFragment.AWS_S3_BUCKET_NAME_CONFIG, "any-bucket");
49+
}
50+
51+
@Test
52+
void createsStsCredentialProviderIfSpecified() {
53+
54+
props.put(S3ConfigFragment.AWS_ACCESS_KEY_ID_CONFIG, "blah-blah-blah");
55+
props.put(S3ConfigFragment.AWS_SECRET_ACCESS_KEY_CONFIG, "blah-blah-blah");
56+
props.put(S3ConfigFragment.AWS_STS_ROLE_ARN, "arn:aws:iam::12345678910:role/S3SinkTask");
57+
props.put(S3ConfigFragment.AWS_STS_ROLE_SESSION_NAME, "SESSION_NAME");
58+
props.put(S3ConfigFragment.AWS_S3_REGION_CONFIG, Region.US_EAST_1.id());
59+
props.put(S3ConfigFragment.AWS_STS_CONFIG_ENDPOINT, "https://sts.us-east-1.amazonaws.com");
60+
61+
final var config = new AwsCredentialBaseConfig(props);
62+
63+
final var credentialProvider = factory.getAwsV2Provider(new S3ConfigFragment(config));
64+
assertThat(credentialProvider).isInstanceOf(StsAssumeRoleCredentialsProvider.class);
65+
}
66+
67+
@Test
68+
void createStaticCredentialProviderByDefault() {
69+
props.put(S3ConfigFragment.AWS_ACCESS_KEY_ID_CONFIG, "blah-blah-blah");
70+
props.put(S3ConfigFragment.AWS_SECRET_ACCESS_KEY_CONFIG, "blah-blah-blah");
71+
72+
final var config = new AwsCredentialBaseConfig(props);
73+
74+
final var credentialProvider = factory.getAwsV2Provider(new S3ConfigFragment(config));
75+
assertThat(credentialProvider).isInstanceOf(StaticCredentialsProvider.class);
76+
}
77+
78+
@Test
79+
void createDefaultCredentialsWhenNoCredentialsSpecified() {
80+
final var config = new AwsCredentialBaseConfig(props);
81+
82+
final var credentialProvider = factory.getAwsV2Provider(new S3ConfigFragment(config));
83+
assertThat(credentialProvider).isInstanceOf(AwsCredentialsProvider.class);
84+
}
85+
86+
@Test
87+
void customCredentialProviderTest() {
88+
props.put(S3ConfigFragment.AWS_CREDENTIALS_PROVIDER_CONFIG, DummyCredentialsProvider.class.getName());
89+
final var config = new AwsCredentialBaseConfig(props);
90+
91+
final var credentialProvider = factory.getAwsV2Provider(new S3ConfigFragment(config));
92+
assertThat(credentialProvider).isInstanceOf(DummyCredentialsProvider.class);
93+
assertThat(((DummyCredentialsProvider) credentialProvider).configured).isTrue();
94+
}
95+
96+
/**
97+
* A custom V2 credential provider for testing.
98+
*/
99+
public static class DummyCredentialsProvider implements AwsCredentialsProvider, Configurable {
100+
boolean configured;
101+
102+
@Override
103+
public void configure(final Map<String, ?> map) {
104+
configured = true;
105+
}
106+
107+
@Override
108+
public AwsCredentials resolveCredentials() {
109+
return null;
110+
}
111+
}
112+
113+
}

s3-source-connector/src/main/java/io/aiven/kafka/connect/s3/source/config/S3ClientFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ public S3Client createAmazonS3Client(final S3SourceConfig config) {
4747
.credentialsProvider(config.getAwsV2Provider())
4848
.build();
4949
} else {
50-
// TODO This is definitely used for testing but not sure if customers use it.
50+
// TODO This is definitely used for testing but not sure if customers use it. customers use it!
5151
return S3Client.builder()
5252
.overrideConfiguration(clientOverrideConfiguration)
5353
.region(config.getAwsS3Region())

0 commit comments

Comments
 (0)