Skip to content

Commit 2146020

Browse files
authored
INS-34515-2 (#18)
* INS-34515-2 * INS-34515-2 * Log error message from provider * Bugfix typo * Fixes for getting to talk from ontap * Add enablePathStyle option * Bugfix * Regression bugfixes * Rename classes after testing * Extra logging to figure out validating errors * Version bump * Codereview unnecessary check * Rename files
1 parent 11e8c58 commit 2146020

File tree

5 files changed

+31
-18
lines changed

5 files changed

+31
-18
lines changed

distribution.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,13 @@
1212

1313
<artifactId>distribution</artifactId>
1414
<packaging>pom</packaging>
15-
<version>0.1.4</version>
15+
<version>0.1.5</version>
1616

1717
<dependencies>
1818
<dependency>
1919
<groupId>com.instaclustr.kafkaconnect</groupId>
2020
<artifactId>instaclustr-s3-connector</artifactId>
21-
<version>0.1.4</version>
21+
<version>0.1.5</version>
2222
</dependency>
2323
<dependency>
2424
<groupId>com.instaclustr.kafkaconnect</groupId>

s3/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
<modelVersion>4.0.0</modelVersion>
1111

1212
<artifactId>instaclustr-s3-connector</artifactId>
13-
<version>0.1.4</version>
13+
<version>0.1.5</version>
1414
<packaging>jar</packaging>
1515

1616
<dependencies>

s3/src/main/java/com/instaclustr/kafka/connect/s3/AwsStorageConnectorCommonConfig.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,13 @@
55
import com.amazonaws.services.s3.model.AmazonS3Exception;
66
import com.amazonaws.services.s3.model.Region;
77
import com.amazonaws.services.securitytoken.model.AWSSecurityTokenServiceException;
8+
import org.apache.commons.lang3.StringUtils;
89
import org.apache.kafka.common.config.Config;
910
import org.apache.kafka.common.config.ConfigDef;
1011
import org.apache.kafka.common.config.ConfigValue;
1112
import org.apache.kafka.connect.errors.ConnectException;
13+
import org.slf4j.Logger;
14+
import org.slf4j.LoggerFactory;
1215

1316
import java.util.Map;
1417
import java.util.regex.Pattern;
@@ -19,18 +22,16 @@
1922

2023
public class AwsStorageConnectorCommonConfig {
2124
public static final String BUCKET = "aws.s3.bucket";
22-
2325
public static final String AWS_REGION = "aws.region";
24-
2526
public static final String S3_KEY_PREFIX = "prefix";
26-
2727
public static final String AWS_SECRET_KEY = "aws.secretKey";
28-
2928
public static final String AWS_ACCESS_KEY_ID = "aws.accessKeyId";
30-
29+
public static final String S3_ENDPOINT = "s3.endpoint";
3130
public static final String AWS_IAM_ROLE_ARN = "aws.role.arn";
31+
public static final String S3_ENABLE_PATH_STYLE = "s3.enablePathStyle";
3232

3333
public static final String DEFAULT_AWS_REGION = Regions.DEFAULT_REGION.getName();
34+
private static final Logger logger = LoggerFactory.getLogger(AwsStorageConnectorCommonConfig.class);
3435

3536
private AwsStorageConnectorCommonConfig() {}
3637

@@ -42,7 +43,9 @@ public static ConfigDef conf() {
4243
.define(AWS_ACCESS_KEY_ID, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "AWS access key id")
4344
.define(AWS_SECRET_KEY, ConfigDef.Type.PASSWORD, ConfigDef.Importance.HIGH, "AWS access secret key")
4445
.define(AWS_REGION, ConfigDef.Type.STRING, DEFAULT_AWS_REGION, ConfigDef.Importance.MEDIUM, String.format("AWS client region, if not set will use %s", DEFAULT_AWS_REGION))
45-
.define(AWS_IAM_ROLE_ARN, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, "");
46+
.define(AWS_IAM_ROLE_ARN, ConfigDef.Type.STRING, "", ConfigDef.Importance.HIGH, "")
47+
.define(S3_ENDPOINT, ConfigDef.Type.STRING, "", ConfigDef.Importance.MEDIUM, "Optional, S3 endpoint URL used to make it compatible with certain storage endpoints")
48+
.define(S3_ENABLE_PATH_STYLE, ConfigDef.Type.BOOLEAN, false, ConfigDef.Importance.MEDIUM, "Optional, ensures the bucket name is in the URL path, making it compatible with certain storage endpoints");
4649
return configDef;
4750
}
4851

@@ -61,7 +64,7 @@ public static void verifyS3CredentialsAndBucketInfo(final Map<String, String> se
6164
String awsRegion = sentConfigMap.get(AWS_REGION);
6265
AmazonS3 s3Client = TransferManagerProvider.getS3ClientBuilderWithRegionAndCredentials(sentConfigMap).build();
6366
if (s3Client.doesBucketExistV2(s3BucketName)) {
64-
if (awsRegion != null) {
67+
if (StringUtils.isBlank(sentConfigMap.get(S3_ENDPOINT)) && awsRegion != null) {
6568
String bucketRegion = Region.fromValue(s3Client.getBucketLocation(s3BucketName)).toAWSRegion().getName();
6669
if (!bucketRegion.equals(awsRegion)) {
6770
addErrorMessageToConfigObject(configObject, AWS_REGION, String.format("Defined region(%s) is not the same as the bucket region(%s)", awsRegion, bucketRegion));
@@ -95,7 +98,8 @@ public static void verifyS3CredentialsAndBucketInfo(final Map<String, String> se
9598
throw new ConnectException(String.format("Unknown Amazon S3 exception while validating config, %s", e.getErrorCode()), e);
9699
}
97100
} catch (IllegalArgumentException e) {
98-
addErrorMessageToConfigObject(configObject, AWS_REGION, "The defined aws.region is invalid");
101+
logger.info("Error whilst validating configurations, {}", e.getMessage());
102+
addErrorMessageToConfigObject(configObject, AWS_REGION, String.format("The defined aws.region is invalid %s", e.getMessage()));
99103
}
100104
}
101105

s3/src/main/java/com/instaclustr/kafka/connect/s3/TransferManagerProvider.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import com.amazonaws.auth.AWSStaticCredentialsProvider;
66
import com.amazonaws.auth.BasicAWSCredentials;
77
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
8+
import com.amazonaws.client.builder.AwsClientBuilder;
89
import com.amazonaws.regions.Regions;
910
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
1011
import com.amazonaws.services.s3.transfer.TransferManager;
@@ -17,7 +18,6 @@
1718

1819
import java.util.Map;
1920
import java.util.UUID;
20-
import java.util.concurrent.TimeUnit;
2121

2222
public class TransferManagerProvider {
2323
private TransferManager transferManager;
@@ -34,10 +34,11 @@ public TransferManagerProvider(final Map<String, String> config) {
3434
}
3535

3636
public static AmazonS3ClientBuilder getS3ClientBuilderWithRegionAndCredentials(final Map<String, String> config) {
37-
String accessKey = getFromConfigOrEnvironment(config, AwsStorageConnectorCommonConfig.AWS_ACCESS_KEY_ID);
38-
String secret = getFromConfigOrEnvironment(config, AwsStorageConnectorCommonConfig.AWS_SECRET_KEY);
37+
final String accessKey = getFromConfigOrEnvironment(config, AwsStorageConnectorCommonConfig.AWS_ACCESS_KEY_ID);
38+
final String secret = getFromConfigOrEnvironment(config, AwsStorageConnectorCommonConfig.AWS_SECRET_KEY);
3939
String region = getFromConfigOrEnvironment(config, AwsStorageConnectorCommonConfig.AWS_REGION);
40-
String roleArn = getFromConfigOrEnvironment(config, AwsStorageConnectorCommonConfig.AWS_IAM_ROLE_ARN);
40+
final String roleArn = getFromConfigOrEnvironment(config, AwsStorageConnectorCommonConfig.AWS_IAM_ROLE_ARN);
41+
final String endpoint = getFromConfigOrEnvironment(config, AwsStorageConnectorCommonConfig.S3_ENDPOINT);
4142

4243
AWSStaticCredentialsProvider awsStaticCredentialsProvider = new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secret));
4344
AWSCredentialsProvider awsCredentialsProvider;
@@ -64,12 +65,21 @@ public static AmazonS3ClientBuilder getS3ClientBuilderWithRegionAndCredentials(f
6465
AmazonS3ClientBuilder clientBuilder = AmazonS3ClientBuilder.standard()
6566
.withCredentials(awsCredentialsProvider);
6667

67-
if (region == null) {
68+
if (region == null && StringUtils.isBlank(endpoint)) {
6869
region = AwsStorageConnectorCommonConfig.DEFAULT_AWS_REGION;
6970
clientBuilder.enableForceGlobalBucketAccess();
7071
log.info("No region defined. Using {} and force global bucket access", AwsStorageConnectorCommonConfig.DEFAULT_AWS_REGION);
7172
}
72-
clientBuilder.withRegion(Regions.fromName(region).getName()); //using fromName to validate the region value
73+
74+
if (StringUtils.isNotBlank(endpoint)) {
75+
final boolean isPathStyleAccessEnabled = Boolean.parseBoolean(getFromConfigOrEnvironment(config, AwsStorageConnectorCommonConfig.S3_ENABLE_PATH_STYLE));
76+
77+
AwsClientBuilder.EndpointConfiguration endpointConfiguration = new AwsClientBuilder.EndpointConfiguration(endpoint, region);
78+
clientBuilder.withEndpointConfiguration(endpointConfiguration).withPathStyleAccessEnabled(isPathStyleAccessEnabled);
79+
} else {
80+
clientBuilder.withRegion(Regions.fromName(region).getName()); //using fromName to validate the region value
81+
}
82+
7383
return clientBuilder;
7484
}
7585

s3/src/main/java/com/instaclustr/kafka/connect/s3/source/AwsStorageSourceTask.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import com.amazonaws.AmazonClientException;
44
import com.google.common.util.concurrent.RateLimiter;
55
import com.google.common.util.concurrent.UncheckedExecutionException;
6-
import com.google.common.util.concurrent.UncheckedTimeoutException;
76
import com.instaclustr.kafka.connect.s3.AwsConnectorStringFormats;
87
import com.instaclustr.kafka.connect.s3.AwsStorageConnectorCommonConfig;
98
import com.instaclustr.kafka.connect.s3.TransferManagerProvider;

0 commit comments

Comments
 (0)