Skip to content

Commit 11e8c58

Browse files
implement IAM role support for S3 connector (#9)
* implement IAM role support for S3 connector
1 parent 1611f45 commit 11e8c58

File tree

4 files changed

+53
-8
lines changed

4 files changed

+53
-8
lines changed

distribution.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,13 @@
1212

1313
<artifactId>distribution</artifactId>
1414
<packaging>pom</packaging>
15-
<version>0.1.3</version>
15+
<version>0.1.4</version>
1616

1717
<dependencies>
1818
<dependency>
1919
<groupId>com.instaclustr.kafkaconnect</groupId>
2020
<artifactId>instaclustr-s3-connector</artifactId>
21-
<version>0.1.3</version>
21+
<version>0.1.4</version>
2222
</dependency>
2323
<dependency>
2424
<groupId>com.instaclustr.kafkaconnect</groupId>

s3/pom.xml

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,19 @@
1010
<modelVersion>4.0.0</modelVersion>
1111

1212
<artifactId>instaclustr-s3-connector</artifactId>
13-
<version>0.1.3</version>
13+
<version>0.1.4</version>
1414
<packaging>jar</packaging>
1515

1616
<dependencies>
1717
<dependency>
1818
<groupId>com.amazonaws</groupId>
1919
<artifactId>aws-java-sdk-s3</artifactId>
20-
<version>1.11.725</version>
20+
<version>1.12.39</version>
21+
</dependency>
22+
<dependency>
23+
<groupId>com.amazonaws</groupId>
24+
<artifactId>aws-java-sdk-sts</artifactId>
25+
<version>1.12.39</version>
2126
</dependency>
2227
<dependency>
2328
<groupId>ch.qos.logback</groupId>
@@ -32,7 +37,7 @@
3237
<dependency>
3338
<groupId>com.google.guava</groupId>
3439
<artifactId>guava</artifactId>
35-
<version>24.1.1-jre</version>
40+
<version>29.0-jre</version>
3641
</dependency>
3742
</dependencies>
3843

s3/src/main/java/com/instaclustr/kafka/connect/s3/AwsStorageConnectorCommonConfig.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import com.amazonaws.services.s3.AmazonS3;
55
import com.amazonaws.services.s3.model.AmazonS3Exception;
66
import com.amazonaws.services.s3.model.Region;
7+
import com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceException;
78
import org.apache.kafka.common.config.Config;
89
import org.apache.kafka.common.config.ConfigDef;
910
import org.apache.kafka.common.config.ConfigValue;
@@ -27,6 +28,8 @@ public class AwsStorageConnectorCommonConfig {
2728

2829
public static final String AWS_ACCESS_KEY_ID = "aws.accessKeyId";
2930

31+
public static final String AWS_IAM_ROLE_ARN = "aws.role.arn";
32+
3033
public static final String DEFAULT_AWS_REGION = Regions.DEFAULT_REGION.getName();
3134

3235
private AwsStorageConnectorCommonConfig() {}
@@ -38,7 +41,8 @@ public static ConfigDef conf() {
3841
ConfigDef.Importance.HIGH, "Path prefix for the objects written into S3")
3942
.define(AWS_ACCESS_KEY_ID, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "AWS access key id")
4043
.define(AWS_SECRET_KEY, ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, "AWS access secret key")
41-
.define(AWS_REGION, ConfigDef.Type.STRING, DEFAULT_AWS_REGION, ConfigDef.Importance.MEDIUM, String.format("AWS client region, if not set will use %s", DEFAULT_AWS_REGION));
44+
.define(AWS_REGION, ConfigDef.Type.STRING, DEFAULT_AWS_REGION, ConfigDef.Importance.MEDIUM, String.format("AWS client region, if not set will use %s", DEFAULT_AWS_REGION))
45+
.define(AWS_IAM_ROLE_ARN, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, "");
4246
return configDef;
4347
}
4448

@@ -67,7 +71,7 @@ public static void verifyS3CredentialsAndBucketInfo(final Map<String, String> se
6771
addErrorMessageToConfigObject(configObject, BUCKET, "The defined bucket name does not exist");
6872
}
6973
s3Client.shutdown();
70-
} catch (AmazonS3Exception e) {
74+
} catch (AmazonS3Exception | AWSSecurityTokenServiceException e) {
7175
switch (e.getErrorCode()) {
7276
case "InvalidAccessKeyId":
7377
addErrorMessageToConfigObject(configObject, AWS_ACCESS_KEY_ID, "The defined aws.accessKeyId is invalid");
@@ -81,6 +85,12 @@ public static void verifyS3CredentialsAndBucketInfo(final Map<String, String> se
8185
case "IllegalLocationConstraintException":
8286
addErrorMessageToConfigObject(configObject, AWS_REGION, String.format("Defined region(%s) is not the same as the bucket region", sentConfigMap.get(AWS_REGION)));
8387
break;
88+
case "AccessDenied":
89+
addErrorMessageToConfigObject(configObject, AWS_IAM_ROLE_ARN, "The user and/or role hasn't been setup correctly with the required permissions");
90+
break;
91+
case "ValidationError":
92+
addErrorMessageToConfigObject(configObject, AWS_IAM_ROLE_ARN, "The defined aws.role.arn is invalid");
93+
break;
8494
default:
8595
throw new ConnectException(String.format("Unknown Amazon S3 exception while validating config, %s", e.getErrorCode()), e);
8696
}

s3/src/main/java/com/instaclustr/kafka/connect/s3/TransferManagerProvider.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,23 @@
11
package com.instaclustr.kafka.connect.s3;
22

33
import com.amazonaws.ClientConfiguration;
4+
import com.amazonaws.auth.AWSCredentialsProvider;
45
import com.amazonaws.auth.AWSStaticCredentialsProvider;
56
import com.amazonaws.auth.BasicAWSCredentials;
7+
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
68
import com.amazonaws.regions.Regions;
79
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
810
import com.amazonaws.services.s3.transfer.TransferManager;
911
import com.amazonaws.services.s3.transfer.TransferManagerBuilder;
12+
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
13+
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
14+
import org.apache.commons.lang3.StringUtils;
1015
import org.slf4j.Logger;
1116
import org.slf4j.LoggerFactory;
1217

1318
import java.util.Map;
19+
import java.util.UUID;
20+
import java.util.concurrent.TimeUnit;
1421

1522
public class TransferManagerProvider {
1623
private TransferManager transferManager;
@@ -30,9 +37,32 @@ public static AmazonS3ClientBuilder getS3ClientBuilderWithRegionAndCredentials(f
3037
String accessKey = getFromConfigOrEnvironment(config, AwsStorageConnectorCommonConfig.AWS_ACCESS_KEY_ID);
3138
String secret = getFromConfigOrEnvironment(config, AwsStorageConnectorCommonConfig.AWS_SECRET_KEY);
3239
String region = getFromConfigOrEnvironment(config, AwsStorageConnectorCommonConfig.AWS_REGION);
40+
String roleArn = getFromConfigOrEnvironment(config, AwsStorageConnectorCommonConfig.AWS_IAM_ROLE_ARN);
41+
42+
AWSStaticCredentialsProvider awsStaticCredentialsProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secret));
43+
AWSCredentialsProvider awsCredentialsProvider;
44+
45+
if (StringUtils.isBlank(roleArn)) {
46+
// when IAM user has direct access to the S3 bucket
47+
awsCredentialsProvider = awsStaticCredentialsProvider;
48+
} else {
49+
// when the IAM user needs to assume the role to access the S3 bucket
50+
AWSSecurityTokenService awsSecurityTokenService = AWSSecurityTokenServiceClientBuilder.standard()
51+
.withCredentials(awsStaticCredentialsProvider)
52+
.build();
53+
54+
STSAssumeRoleSessionCredentialsProvider.Builder assumeRoleBuilder =
55+
new STSAssumeRoleSessionCredentialsProvider.Builder(
56+
roleArn,
57+
UUID.randomUUID().toString().substring(0, 32));
58+
59+
awsCredentialsProvider = assumeRoleBuilder
60+
.withStsClient(awsSecurityTokenService)
61+
.build();
62+
}
3363

3464
AmazonS3ClientBuilder clientBuilder = AmazonS3ClientBuilder.standard()
35-
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secret)));
65+
.withCredentials(awsCredentialsProvider);
3666

3767
if (region == null) {
3868
region = AwsStorageConnectorCommonConfig.DEFAULT_AWS_REGION;

0 commit comments

Comments
 (0)