Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import java.net.URI;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
Expand All @@ -44,9 +43,6 @@

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.RegionUtils;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.internal.BucketNameUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -152,6 +148,7 @@ public final class S3ConfigFragment extends ConfigFragment {
// issues during delay calculation.
// in other words we can't use values greater than 30
public static final int S3_RETRY_BACKOFF_MAX_RETRIES_DEFAULT = 3;

/**
* Constructor.
*
Expand Down Expand Up @@ -447,8 +444,9 @@ public void validateBucket() {

// Custom Validators
protected static class AwsRegionValidator implements ConfigDef.Validator {
private static final String SUPPORTED_AWS_REGIONS = Arrays.stream(Regions.values())
.map(Regions::getName)
private static final String SUPPORTED_AWS_REGIONS = Region.regions()
.stream()
.map(Region::id)
.collect(Collectors.joining(", "));

@Override
Expand Down Expand Up @@ -504,16 +502,6 @@ public AwsStsEndpointConfig getStsEndpointConfig() {
return new AwsStsEndpointConfig(cfg.getString(AWS_STS_CONFIG_ENDPOINT), cfg.getString(AWS_S3_REGION_CONFIG));
}

/**
* @deprecated getAwsEndpointConfiguration uses the AWS SDK 1.X which is deprecated and out of maintenance in
* December 2025 After upgrading to use SDK 2.X this no longer is required.
*/
@Deprecated
public AwsClientBuilder.EndpointConfiguration getAwsEndpointConfiguration() {
final AwsStsEndpointConfig config = getStsEndpointConfig();
return new AwsClientBuilder.EndpointConfiguration(config.getServiceEndpoint(), config.getSigningRegion());
}

/**
* @deprecated Use {@link #getAwsCredentialsV2} instead getAwsCredentials uses the AWS SDK 1.X which is deprecated
* and out of maintenance in December 2025
Expand Down Expand Up @@ -544,6 +532,7 @@ public AwsBasicCredentials getAwsCredentialsV2() {
&& Objects.nonNull(cfg.getPassword(AWS_SECRET_ACCESS_KEY))) {
LOGGER.warn("Config options {} and {} are not supported for this Connector", AWS_ACCESS_KEY_ID,
AWS_SECRET_ACCESS_KEY);

}
return null;
}
Expand All @@ -554,23 +543,6 @@ public String getAwsS3EndPoint() {
: cfg.getString(AWS_S3_ENDPOINT);
}

/**
* @deprecated Use {@link #getAwsS3RegionV2} instead getAwsS3Region uses the AWS SDK 1.X which is deprecated and out
* of maintenance in December 2025
*/
@Deprecated
public com.amazonaws.regions.Region getAwsS3Region() {
// we have priority of properties if old one not set or both old and new one set
// the new property value will be selected
if (Objects.nonNull(cfg.getString(AWS_S3_REGION_CONFIG))) {
return RegionUtils.getRegion(cfg.getString(AWS_S3_REGION_CONFIG));
} else if (Objects.nonNull(cfg.getString(AWS_S3_REGION))) {
return RegionUtils.getRegion(cfg.getString(AWS_S3_REGION));
} else {
return RegionUtils.getRegion(Regions.US_EAST_1.getName());
}
}

public Region getAwsS3RegionV2() {
// we have priority of properties if old one not set or both old and new one set
// the new property value will be selected
Expand All @@ -579,7 +551,7 @@ public Region getAwsS3RegionV2() {
} else if (Objects.nonNull(cfg.getString(AWS_S3_REGION))) {
return Region.of(cfg.getString(AWS_S3_REGION));
} else {
return Region.of(Regions.US_EAST_1.getName());
return Region.of(Region.US_EAST_1.id());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,9 @@
import io.aiven.kafka.connect.iam.AwsStsEndpointConfig;
import io.aiven.kafka.connect.iam.AwsStsRole;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.regions.Region;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.regions.Region;
@SuppressWarnings({ "PMD.ExcessiveImports", "PMD.TooManyStaticImports" })
public class S3SinkBaseConfig extends SinkCommonConfig {
private final S3ConfigFragment s3ConfigFragment;
Expand Down Expand Up @@ -92,20 +91,16 @@ public AwsStsEndpointConfig getStsEndpointConfig() {
return s3ConfigFragment.getStsEndpointConfig();
}

public AwsClientBuilder.EndpointConfiguration getAwsEndpointConfiguration() {
return s3ConfigFragment.getAwsEndpointConfiguration();
}

public BasicAWSCredentials getAwsCredentials() {
return s3ConfigFragment.getAwsCredentials();
public AwsBasicCredentials getAwsCredentials() {
return s3ConfigFragment.getAwsCredentialsV2();
}

public String getAwsS3EndPoint() {
return s3ConfigFragment.getAwsS3EndPoint();
}

public Region getAwsS3Region() {
return s3ConfigFragment.getAwsS3Region();
return s3ConfigFragment.getAwsS3RegionV2();
}

public String getAwsS3BucketName() {
Expand Down Expand Up @@ -136,7 +131,7 @@ public int getS3RetryBackoffMaxRetries() {
return s3ConfigFragment.getS3RetryBackoffMaxRetries();
}

public AWSCredentialsProvider getCustomCredentialsProvider() {
return s3ConfigFragment.getCustomCredentialsProvider();
public AwsCredentialsProvider getCustomCredentialsProvider() {
return s3ConfigFragment.getCustomCredentialsProviderV2();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,6 @@

import io.aiven.kafka.connect.config.s3.S3ConfigFragment;

import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.amazonaws.services.securitytoken.AWSSecurityTokenService;
import com.amazonaws.services.securitytoken.AWSSecurityTokenServiceClientBuilder;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
Expand All @@ -38,45 +32,6 @@
*/
public class AwsCredentialProviderFactory {

/**
* @deprecated use {@link #getAwsV2Provider(S3ConfigFragment)}
*/
@Deprecated
public AWSCredentialsProvider getProvider(final S3ConfigFragment config) {
if (config.hasAwsStsRole()) {
return getStsProvider(config);
}
final BasicAWSCredentials awsCredentials = config.getAwsCredentials();
if (Objects.isNull(awsCredentials)) {
return config.getCustomCredentialsProvider();
}
return new AWSStaticCredentialsProvider(awsCredentials);
}

/**
* @deprecated use {@link #getV2StsProvider(S3ConfigFragment)}
*/
@Deprecated
private AWSCredentialsProvider getStsProvider(final S3ConfigFragment config) {
final AwsStsRole awsstsRole = config.getStsRole();
final AWSSecurityTokenService sts = securityTokenService(config);
return new STSAssumeRoleSessionCredentialsProvider.Builder(awsstsRole.getArn(), awsstsRole.getSessionName())
.withStsClient(sts)
.withExternalId(awsstsRole.getExternalId())
.withRoleSessionDurationSeconds(awsstsRole.getSessionDurationSeconds())
.build();
}

@Deprecated
private AWSSecurityTokenService securityTokenService(final S3ConfigFragment config) {
if (config.hasStsEndpointConfig()) {
final AWSSecurityTokenServiceClientBuilder stsBuilder = AWSSecurityTokenServiceClientBuilder.standard();
stsBuilder.setEndpointConfiguration(config.getAwsEndpointConfiguration());
return stsBuilder.build();
}
return AWSSecurityTokenServiceClientBuilder.defaultClient();
}

/**
* Gets an AWS V2 credential provider
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,23 @@

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

import org.apache.kafka.common.Configurable;

import io.aiven.kafka.connect.config.s3.S3ConfigFragment;
import io.aiven.kafka.connect.tools.AwsCredentialBaseConfig;

import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.DefaultAWSCredentialsProviderChain;
import com.amazonaws.auth.STSAssumeRoleSessionCredentialsProvider;
import com.amazonaws.regions.Regions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.auth.credentials.AwsCredentials;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.identity.spi.AwsCredentialsIdentity;
import software.amazon.awssdk.identity.spi.ResolveIdentityRequest;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sts.auth.StsAssumeRoleCredentialsProvider;

/**
* Tests the Credential provider factory generation of V1 credentials
Expand All @@ -56,13 +59,13 @@ void createsStsCredentialProviderIfSpecified() {
props.put(S3ConfigFragment.AWS_SECRET_ACCESS_KEY_CONFIG, "blah-blah-blah");
props.put(S3ConfigFragment.AWS_STS_ROLE_ARN, "arn:aws:iam::12345678910:role/S3SinkTask");
props.put(S3ConfigFragment.AWS_STS_ROLE_SESSION_NAME, "SESSION_NAME");
props.put(S3ConfigFragment.AWS_S3_REGION_CONFIG, Regions.US_EAST_1.getName());
props.put(S3ConfigFragment.AWS_S3_REGION_CONFIG, Region.US_EAST_1.id());
props.put(S3ConfigFragment.AWS_STS_CONFIG_ENDPOINT, "https://sts.us-east-1.amazonaws.com");

final var config = new AwsCredentialBaseConfig(props);

final var credentialProvider = factory.getProvider(new S3ConfigFragment(config));
assertThat(credentialProvider).isInstanceOf(STSAssumeRoleSessionCredentialsProvider.class);
final var credentialProvider = factory.getAwsV2Provider(new S3ConfigFragment(config));
assertThat(credentialProvider).isInstanceOf(StsAssumeRoleCredentialsProvider.class);
}

@Test
Expand All @@ -72,47 +75,52 @@ void createStaticCredentialProviderByDefault() {

final var config = new AwsCredentialBaseConfig(props);

final var credentialProvider = factory.getProvider(new S3ConfigFragment(config));
assertThat(credentialProvider).isInstanceOf(AWSStaticCredentialsProvider.class);
final var credentialProvider = factory.getAwsV2Provider(new S3ConfigFragment(config));
assertThat(credentialProvider).isInstanceOf(StaticCredentialsProvider.class);
}

@Test
void createDefaultCredentialsWhenNoCredentialsSpecified() {
final var config = new AwsCredentialBaseConfig(props);

final var credentialProvider = factory.getProvider(new S3ConfigFragment(config));
assertThat(credentialProvider).isInstanceOf(DefaultAWSCredentialsProviderChain.class);
final var credentialProvider = factory.getAwsV2Provider(new S3ConfigFragment(config));
assertThat(credentialProvider).isInstanceOf(DefaultCredentialsProvider.class);
}

@Test
void customCredentialProviderTest() {
props.put(S3ConfigFragment.AWS_CREDENTIALS_PROVIDER_CONFIG, DummyCredentialsProvider.class.getName());
final var config = new AwsCredentialBaseConfig(props);

final var credentialProvider = factory.getProvider(new S3ConfigFragment(config));
final var credentialProvider = factory.getAwsV2Provider(new S3ConfigFragment(config));
assertThat(credentialProvider).isInstanceOf(DummyCredentialsProvider.class);
assertThat(((DummyCredentialsProvider) credentialProvider).configured).isTrue();
}

/**
* A custom V1 credential provider for testing.
*/
public static class DummyCredentialsProvider implements AWSCredentialsProvider, Configurable {
public static class DummyCredentialsProvider implements AwsCredentialsProvider, Configurable {
boolean configured;

@Override
public AWSCredentials getCredentials() {
return null;
public void configure(final Map<String, ?> map) {
configured = true;
}

@Override
public void refresh() {
public AwsCredentials resolveCredentials() {
return null;
}

@Override
public Class<AwsCredentialsIdentity> identityType() {
return AwsCredentialsProvider.super.identityType();
}

@Override
public void configure(final Map<String, ?> map) {
configured = true;
public CompletableFuture<AwsCredentialsIdentity> resolveIdentity(final ResolveIdentityRequest request) {
return AwsCredentialsProvider.super.resolveIdentity(request);
}
}

Expand Down
4 changes: 2 additions & 2 deletions s3-sink-connector/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ dependencies {

implementation(tools.spotbugs.annotations)
implementation(logginglibs.slf4j)
implementation(amazonoldawssdk.s3)
implementation(amazonoldawssdk.sts)
implementation(amazonawssdk.s3)
implementation(amazonawssdk.sts)

testImplementation(apache.commons.io)
testImplementation(testFixtures(project(":commons")))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,6 @@
import io.aiven.kafka.connect.s3.AivenKafkaConnectS3SinkConnector;
import io.aiven.kafka.connect.s3.testutils.BucketAccessor;

import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -44,6 +39,10 @@
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;

@Testcontainers
@SuppressWarnings("PMD.MutableStaticState")
Expand Down Expand Up @@ -76,13 +75,12 @@ static LocalStackContainer createS3Container() {
.withServices(LocalStackContainer.Service.S3);
}

static AmazonS3 createS3Client(final LocalStackContainer localStackContainer) {
return AmazonS3ClientBuilder.standard()
.withEndpointConfiguration(new AwsClientBuilder.EndpointConfiguration(
localStackContainer.getEndpointOverride(LocalStackContainer.Service.S3).toString(),
localStackContainer.getRegion()))
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(
localStackContainer.getAccessKey(), localStackContainer.getSecretKey())))
static S3Client createS3Client(final LocalStackContainer localStackContainer) {
return S3Client.builder()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove the use of the aws sdk 1.X client for integration tests.

.endpointOverride(localStackContainer.getEndpointOverride(LocalStackContainer.Service.S3))
.region(Region.of(localStackContainer.getRegion()))
.credentialsProvider(StaticCredentialsProvider.create(AwsBasicCredentials
.create(localStackContainer.getAccessKey(), localStackContainer.getSecretKey())))
.build();
}

Expand All @@ -94,9 +92,10 @@ static String topicName(final TestInfo testInfo) {
static void setUpAll() {
s3Prefix = COMMON_PREFIX + ZonedDateTime.now().format(DateTimeFormatter.ISO_LOCAL_DATE_TIME) + "/";

final AmazonS3 s3Client = createS3Client(LOCALSTACK);
s3Endpoint = LOCALSTACK.getEndpoint().toString();
testBucketAccessor = new BucketAccessor(s3Client, TEST_BUCKET_NAME);
try (S3Client s3Client = createS3Client(LOCALSTACK)) {
s3Endpoint = LOCALSTACK.getEndpoint().toString();
testBucketAccessor = new BucketAccessor(s3Client, TEST_BUCKET_NAME);
}
}

@BeforeEach
Expand Down
Loading
Loading