Feat: S3-Sink upgrade - Upgrade S3 Sink to AWS 2.0 SDK #422


Draft: wants to merge 1 commit into base branch feat-S3-Sink-upgrade.
2 changes: 2 additions & 0 deletions .gitignore
@@ -19,3 +19,5 @@ gradle-wrapper.properties
# Build files
build/
/azure-sink-connector/.jqwik-database

**/out/
DataStorageUnit.java (new file)
@@ -0,0 +1,154 @@
package io.aiven.kafka.connect.common.config;

import java.util.Arrays;
import java.util.stream.Collectors;

/**
 * Definitions and conversions between IEC 80000-13:2025 units.
 *
 * @see <a href='https://en.wikipedia.org/wiki/Binary_prefix'>Binary Prefixes</a>
 * @see <a href='https://iec.ch/prefixes-binary-multiples'>Prefixes for binary multiples</a>
 */
public enum DataStorageUnit {
    BYTES("B") {
        public long toBytes(long d) {
            return d;
        }

        public long toKibibytes(long d) {
            return (d / 1024L);
        }

        public long toMebibytes(long d) {
            return (d / (1024L * 1024));
        }

        public long toGibibytes(long d) {
            return (d / (1024L * 1024 * 1024));
        }

        public long convert(long source, DataStorageUnit sourceUnit) {
            return sourceUnit.toBytes(source);
        }
    },
    KIBIBYTES("KiB") {
        public long toBytes(long d) {
            return x(d, 1024L, (MAX / 1024L));
        }

        public long toKibibytes(long d) {
            return d;
        }

        public long toMebibytes(long d) {
            return (d / 1024L);
        }

        public long toGibibytes(long d) {
            return (d / (1024L * 1024));
        }

        public long convert(long source, DataStorageUnit sourceUnit) {
            return sourceUnit.toKibibytes(source);
        }
    },
    MEBIBYTES("MiB") {
        public long toBytes(long d) {
            return x(d, (1024L * 1024), MAX / (1024L * 1024));
        }

        public long toKibibytes(long d) {
            return x(d, 1024L, (MAX / 1024L));
        }

        public long toMebibytes(long d) {
            return d;
        }

        public long toGibibytes(long d) {
            return (d / 1024L);
        }

        public long convert(long source, DataStorageUnit sourceUnit) {
            return sourceUnit.toMebibytes(source);
        }
    },
    GIBIBYTES("GiB") {
        public long toBytes(long d) {
            return x(d, (1024L * 1024 * 1024), MAX / (1024L * 1024 * 1024));
        }

        public long toKibibytes(long d) {
            return x(d, (1024L * 1024), MAX / (1024L * 1024));
        }

        public long toMebibytes(long d) {
            return x(d, 1024L, (MAX / 1024L));
        }

        public long toGibibytes(long d) {
            return d;
        }

        public long convert(long source, DataStorageUnit sourceUnit) {
            return sourceUnit.toGibibytes(source);
        }
    };

    static final long MAX = Long.MAX_VALUE;

    /**
     * Scale d by m, checking for overflow. This has a short name to make the code above more readable.
     */
    static long x(long d, long m, long over) {
        assert (over > 0) && (over < (MAX - 1L)) && (over == (MAX / m));

        if (d > over) {
            return Long.MAX_VALUE;
        }
        return Math.multiplyExact(d, m);
    }

    /**
     * @param symbol the unit symbol
     * @return the memory unit corresponding to the given symbol
     */
    public static DataStorageUnit fromSymbol(String symbol) {
        for (DataStorageUnit value : values()) {
            if (value.symbol.equalsIgnoreCase(symbol)) {
                return value;
            }
        }
        throw new IllegalArgumentException(String.format("Unsupported data storage unit: %s. Supported units are: %s",
                symbol, Arrays.stream(values())
                        .map(u -> u.symbol)
                        .collect(Collectors.joining(", "))));
    }

    /**
     * The unit symbol
     */
    private final String symbol;

    DataStorageUnit(String symbol) {
        this.symbol = symbol;
    }

    public long toBytes(long d) {
        throw new AbstractMethodError();
    }

    public long toKibibytes(long d) {
        throw new AbstractMethodError();
    }

    public long toMebibytes(long d) {
        throw new AbstractMethodError();
    }

    public long toGibibytes(long d) {
        throw new AbstractMethodError();
    }

    public long convert(long source, DataStorageUnit sourceUnit) {
        throw new AbstractMethodError();
    }
}
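
For orientation, a minimal usage sketch of the enum above (illustrative only, not part of this diff; the commented values follow from the conversion and saturation logic shown):

import io.aiven.kafka.connect.common.config.DataStorageUnit;

public class DataStorageUnitDemo {
    public static void main(String[] args) {
        long bytes = DataStorageUnit.MEBIBYTES.toBytes(5);        // 5 MiB -> 5_242_880 bytes
        long mib = DataStorageUnit.MEBIBYTES.convert(2048, DataStorageUnit.KIBIBYTES); // 2048 KiB -> 2 MiB
        DataStorageUnit unit = DataStorageUnit.fromSymbol("kib"); // case-insensitive -> KIBIBYTES
        long clamped = DataStorageUnit.GIBIBYTES.toBytes(Long.MAX_VALUE); // saturates to Long.MAX_VALUE
        System.out.println(bytes + ", " + mib + ", " + unit + ", " + clamped);
    }
}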
FileNameFragment.java
@@ -38,7 +38,7 @@
public final class FileNameFragment extends ConfigFragment {

    // package private so that testing can access.
-   static final String GROUP_FILE = "File";
+   public static final String GROUP_FILE = "File";
    static final String FILE_COMPRESSION_TYPE_CONFIG = "file.compression.type";
    static final String FILE_MAX_RECORDS = "file.max.records";
    static final String FILE_NAME_TIMESTAMP_TIMEZONE = "file.name.timestamp.timezone";
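
Since GROUP_FILE is now public, other configuration classes can place their options in the same "File" group of a ConfigDef. A hedged sketch of what that enables; the option key, documentation, and ordering below are hypothetical, not from this PR:

import org.apache.kafka.common.config.ConfigDef;

import io.aiven.kafka.connect.common.config.FileNameFragment;

public class GroupFileExample {
    static ConfigDef withFileGroupOption() {
        // Only GROUP_FILE comes from the PR; the rest is illustrative.
        return new ConfigDef().define("file.example.option", ConfigDef.Type.STRING, null,
                ConfigDef.Importance.LOW, "Illustrative option placed in the shared File group.",
                FileNameFragment.GROUP_FILE, 1, ConfigDef.Width.NONE, "file.example.option");
    }
}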
SinkCommonConfig.java
@@ -39,13 +39,19 @@ public class SinkCommonConfig extends CommonConfig {
     * OutputFormatFragment to handle Output format base configuration queries.
     */
    protected final OutputFormatFragment outputFormatFragment;
    /**
     * CompressionFragment to handle compression options.
     */
    protected final CompressionFragment compressionFragment;

    @SuppressFBWarnings("CT_CONSTRUCTOR_THROW")
    public SinkCommonConfig(ConfigDef definition, Map<?, ?> originals) { // NOPMD
        super(definition, originals);
        // Construct FileNameFragment
        fileNameFragment = new FileNameFragment(this);
        outputFormatFragment = new OutputFormatFragment(this);
        compressionFragment = new CompressionFragment(this);
        // TODO: calls getOutputFields, can be overridden in subclasses.
        validate(); // NOPMD ConstructorCallsOverridableMethod
    }
@@ -55,6 +61,12 @@ private void validate() {
        fileNameFragment.validate();
    }

    /**
     * @deprecated use {@link OutputFormatFragment#update(ConfigDef, OutputFieldType)}
     * @param configDef the configuration to update
     * @param defaultFieldType the default field type
     */
    @Deprecated
    protected static void addOutputFieldsFormatConfigGroup(final ConfigDef configDef,
            final OutputFieldType defaultFieldType) {
        OutputFormatFragment.update(configDef, defaultFieldType);
@@ -70,7 +82,7 @@ protected static void addCompressionTypeConfig(final ConfigDef configDef,
    }

    public CompressionType getCompressionType() {
-       return new CompressionFragment(this).getCompressionType();
+       return compressionFragment.getCompressionType();
    }

    public Boolean envelopeEnabled() {
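
Two things are worth noting in this hunk: getCompressionType() now reuses the cached compressionFragment instead of allocating a new CompressionFragment on every call, and addOutputFieldsFormatConfigGroup is deprecated in favour of OutputFormatFragment.update. A hypothetical subclass using the replacement; the class name and the OutputFieldType.VALUE constant are assumptions, not from this PR:

import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;

import io.aiven.kafka.connect.common.config.OutputFieldType;
import io.aiven.kafka.connect.common.config.OutputFormatFragment;
import io.aiven.kafka.connect.common.config.SinkCommonConfig;

// Hypothetical subclass, illustrative only.
public class ExampleSinkConfig extends SinkCommonConfig {
    public ExampleSinkConfig(final Map<?, ?> originals) {
        super(configDef(), originals);
    }

    private static ConfigDef configDef() {
        final ConfigDef definition = new ConfigDef();
        // Preferred over the deprecated addOutputFieldsFormatConfigGroup(...).
        OutputFormatFragment.update(definition, OutputFieldType.VALUE);
        return definition;
    }
}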
Size.java (new file)
@@ -0,0 +1,39 @@
package io.aiven.kafka.connect.common.utils;

/** Byte-size constants and conversions using 1024-based multiples. */
public final class Size {
    public static final long KB = 1024;
    public static final long MB = KB * 1024;
    public static final long GB = MB * 1024;
    public static final long TB = GB * 1024;

    private Size() {
        // utility class
    }

    public static long ofKB(final int kb) {
        return kb * KB;
    }

    public static long ofMB(final int mb) {
        return mb * MB;
    }

    public static long ofGB(final int gb) {
        return gb * GB;
    }

    public static long ofTB(final int tb) {
        return tb * TB;
    }

    public static long toKB(final long size) {
        return size / KB; // was "size * KB", which scaled up instead of converting bytes down to KB
    }

    public static int toMB(final long size) {
        return (int) (size / MB);
    }

    public static int toGB(final long size) {
        return (int) (size / GB);
    }

    public static int toTB(final long size) {
        return (int) (size / TB);
    }
}
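
Despite the SI-style names, these constants are 1024-based. A quick round-trip sketch (illustrative only; it assumes the corrected toKB above):

import io.aiven.kafka.connect.common.utils.Size;

public class SizeDemo {
    public static void main(String[] args) {
        long fiveGb = Size.ofGB(5);        // 5 * 1024^3 = 5_368_709_120 bytes
        int gb = Size.toGB(fiveGb);        // 5
        long kb = Size.toKB(Size.ofMB(1)); // 1_048_576 bytes -> 1024 KB
        System.out.println(fiveGb + ", " + gb + ", " + kb);
    }
}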
5 changes: 1 addition & 4 deletions s3-commons/build.gradle.kts
@@ -16,12 +16,9 @@

plugins { id("aiven-apache-kafka-connectors-all.java-conventions") }

-val amazonS3Version by extra("1.12.777")
-val amazonSTSVersion by extra("1.12.777")

dependencies {
-    implementation("com.amazonaws:aws-java-sdk-s3:$amazonS3Version")
-    implementation("com.amazonaws:aws-java-sdk-sts:$amazonSTSVersion")
    implementation(amazonawssdk.s3)
    implementation(amazonawssdk.authentication)
    implementation(amazonawssdk.sts)
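
The dependency block drops the pinned AWS SDK v1 coordinates in favour of accessors from a Gradle version catalog (the amazonawssdk.s3, amazonawssdk.authentication, and amazonawssdk.sts aliases). The catalog file that defines those aliases, and the SDK v2 version it pins, are not shown in this diff.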
BucketNameValidator.java (new file)
@@ -0,0 +1,129 @@
package io.aiven.kafka.connect.config.s3;

/*
 * This class is based on code from com.amazonaws.services.s3.internal.BucketNameValidator,
 * with modifications for use with the AWS V2 client.
 */
/*
 * Copyright 2010-2025 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *     http://aws.amazon.com/apache2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

import java.util.Optional;
import java.util.regex.Pattern;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;

/**
 * Utilities for working with Amazon S3 bucket names, such as validating them
 * and checking whether they are compatible with DNS addressing.
 */
public class BucketNameValidator implements ConfigDef.Validator {
    private static final int MIN_BUCKET_NAME_LENGTH = 3;
    private static final int MAX_BUCKET_NAME_LENGTH = 63;

    private static final Pattern IP_ADDRESS_PATTERN = Pattern.compile("(\\d+\\.){3}\\d+");

    @Override
    public void ensureValid(final String name, final Object value) {
        if (value != null) {
            isValidV2BucketName((String) value).ifPresent(msg -> {
                throw new ConfigException("Illegal bucket name: " + msg);
            });
        }
    }

    /**
     * Validates the given input against the S3 bucket naming rules.
     *
     * @param bucketName the name of the bucket
     * @return an error message describing the first violation found, or empty if the name is valid
     */
    public Optional<String> isValidV2BucketName(final String bucketName) {
        if (bucketName == null) {
            return Optional.of("Bucket name cannot be null");
        }

        if (bucketName.length() < MIN_BUCKET_NAME_LENGTH || bucketName.length() > MAX_BUCKET_NAME_LENGTH) {
            return Optional.of("Bucket name should be between " + MIN_BUCKET_NAME_LENGTH + " and "
                    + MAX_BUCKET_NAME_LENGTH + " characters long");
        }

        if (IP_ADDRESS_PATTERN.matcher(bucketName).matches()) {
            return Optional.of("Bucket name must not be formatted as an IP address");
        }

        char previous = '\0';

        for (int i = 0; i < bucketName.length(); ++i) {
            final char next = bucketName.charAt(i);

            if (next >= 'A' && next <= 'Z') {
                return Optional.of("Bucket name should not contain uppercase characters");
            }

            if (next == ' ' || next == '\t' || next == '\r' || next == '\n') {
                return Optional.of("Bucket name should not contain white space");
            }

            if (next == '.') {
                if (previous == '\0') {
                    return Optional.of("Bucket name should not begin with a period");
                }
                if (previous == '.') {
                    return Optional.of("Bucket name should not contain two adjacent periods");
                }
                if (previous == '-') {
                    return Optional.of("Bucket name should not contain dashes next to periods");
                }
            } else if (next == '-') {
                if (previous == '.') {
                    return Optional.of("Bucket name should not contain dashes next to periods");
                }
                if (previous == '\0') {
                    return Optional.of("Bucket name should not begin with a '-'");
                }
            } else if ((next < '0') || (next > '9' && next < 'a') || (next > 'z')) {
                return Optional.of("Bucket name should not contain '" + next + "'");
            }

            previous = next;
        }

        if (previous == '.' || previous == '-') {
            return Optional.of("Bucket name should not end with '-' or '.'");
        }

        return Optional.empty();
    }
}
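
A sketch of how the validator plugs into a Kafka ConfigDef; the config key below is hypothetical, while the define(...) overload and NO_DEFAULT_VALUE are standard Kafka API:

import org.apache.kafka.common.config.ConfigDef;

import io.aiven.kafka.connect.config.s3.BucketNameValidator;

public class BucketValidatorWiringExample {
    static ConfigDef configDef() {
        return new ConfigDef().define(
                "aws.s3.bucket.name",          // hypothetical key, not from this PR
                ConfigDef.Type.STRING,
                ConfigDef.NO_DEFAULT_VALUE,
                new BucketNameValidator(),     // ensureValid() throws ConfigException on illegal names
                ConfigDef.Importance.HIGH,
                "The S3 bucket to write to; must satisfy S3 bucket naming rules.");
    }
}

Because ensureValid throws ConfigException, a bad bucket name fails fast at configuration validation rather than at the first write.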