Conversation

@tanbt (Collaborator) commented on Dec 18, 2025

The S3 source connector may fail to source compressed (gzip) Avro files from S3 to Kafka.
This change decompresses the input stream before processing it as an Avro data stream.

[2025-12-18 10:43:18,773] ERROR [s3_source|task-0] Error trying to open inputStream: Not an Avro data file. (io.aiven.kafka.connect.common.source.input.AvroTransformer:157)
org.apache.avro.InvalidAvroMagicException: Not an Avro data file.
	at org.apache.avro.file.DataFileStream.initialize(DataFileStream.java:111)
	at org.apache.avro.file.DataFileStream.<init>(DataFileStream.java:90)
	at io.aiven.kafka.connect.common.source.input.AvroTransformer$1.inputOpened(AvroTransformer.java:59)
	at io.aiven.kafka.connect.common.source.input.Transformer$StreamSpliterator.tryAdvance(Transformer.java:155)
	at java.base/java.util.stream.StreamSpliterators$WrappingSpliterator.lambda$initPartialTraversalState$0(StreamSpliterators.java:297)
	at java.base/java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.fillBuffer(StreamSpliterators.java:206)
	at java.base/java.util.stream.StreamSpliterators$AbstractWrappingSpliterator.doAdvance(StreamSpliterators.java:161)
	at java.base/java.util.stream.StreamSpliterators$WrappingSpliterator.tryAdvance(StreamSpliterators.java:303)
	at java.base/java.util.Spliterators$1Adapter.hasNext(Spliterators.java:669)
	at io.aiven.kafka.connect.common.source.AbstractSourceRecordIterator.hasNext(AbstractSourceRecordIterator.java:200)
	at io.aiven.kafka.connect.s3.source.S3SourceTask$1.hasNext(S3SourceTask.java:83)
	at org.apache.commons.collections4.iterators.FilterIterator.setNextObject(FilterIterator.java:174)
	at org.apache.commons.collections4.iterators.FilterIterator.hasNext(FilterIterator.java:86)
	at io.aiven.kafka.connect.common.source.AbstractSourceTask.tryAdd(AbstractSourceTask.java:187)
	at io.aiven.kafka.connect.common.source.AbstractSourceTask$2.run(AbstractSourceTask.java:131)
	at java.base/java.lang.Thread.run(Thread.java:1474)

With this change applied, I could no longer reproduce the issue, and the data arrives in the destination Kafka cluster.
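For illustration, a minimal sketch of the idea (the helper class and method names are hypothetical, not the actual patch): wrap the raw S3 object stream in a GZIPInputStream before handing it to Avro's DataFileStream, so the reader sees the Avro container magic bytes instead of the gzip header.

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.zip.GZIPInputStream;

    import org.apache.avro.file.DataFileStream;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;

    final class GzipAvroOpen { // hypothetical helper, for illustration only
        static DataFileStream<GenericRecord> open(final InputStream raw) throws IOException {
            // Decompress first; DataFileStream expects plain Avro container bytes
            // and otherwise fails with InvalidAvroMagicException, as in the log above.
            final InputStream decompressed = new GZIPInputStream(raw);
            return new DataFileStream<>(decompressed, new GenericDatumReader<>());
        }
    }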

@tanbt requested a review from a team as a code owner on December 18, 2025 15:26
@tanbt marked this pull request as draft on December 18, 2025 15:30
@tanbt force-pushed the tanbt/decomporess-before-validate-avro-content branch from 4c800f2 to 1e3080e on December 19, 2025 07:43
@tanbt marked this pull request as ready for review on December 19, 2025 08:55
@tanbt force-pushed the tanbt/decomporess-before-validate-avro-content branch from 1e3080e to 5eaf3b1 on December 19, 2025 09:07
  protected void inputOpened(final InputStream input) throws IOException {
-     dataFileStream = new DataFileStream<>(input, datumReader);
+     InputStream decompressedInput = input;
+     if (!input.markSupported()) {
Contributor:
Are mark/reset generally not supported by the input stream, or what are the conditions that could lead to this?

Collaborator (Author):

I see; since BufferedInputStream always supports mark/reset, there's no need to check markSupported.
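For reference, a tiny standalone demo of that guarantee (illustrative, not part of the patch):

    import java.io.BufferedInputStream;
    import java.io.ByteArrayInputStream;
    import java.io.InputStream;

    final class MarkSupportDemo { // illustrative only
        public static void main(final String[] args) {
            final InputStream raw = new ByteArrayInputStream(new byte[] {0x1f, (byte) 0x8b});
            // BufferedInputStream.markSupported() always returns true,
            // regardless of whether the wrapped stream supports mark/reset.
            System.out.println(new BufferedInputStream(raw).markSupported()); // prints: true
        }
    }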

decompressedInput.reset();

// GZIP magic bytes: 0x1f 0x8b
if (bytesRead == 2 && magic[0] == (byte) 0x1f && magic[1] == (byte) 0x8b) {
Contributor:

The connector is supposed to support other file compression types, e.g. snappy and zstd. I wonder if we should handle all possible magic bytes here, or whether conditionally instantiating the right type of input stream could be done in a much simpler way, i.e. based on the file.compression.type config. WDYT?

Collaborator (Author):

Using file.compression.type is more explicit; I think this is a better approach than detecting the magic bytes of all compression types.
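A hedged sketch of what config-driven selection could look like; the enum and method are illustrative stand-ins, not the connector's actual API (snappy/zstd would follow the same pattern with their codec libraries):

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.zip.GZIPInputStream;

    enum CompressionType { NONE, GZIP } // stand-in for the file.compression.type values

    final class CompressionAwareOpen { // illustrative helper
        static InputStream maybeDecompress(final InputStream input, final CompressionType type) throws IOException {
            switch (type) {
                case GZIP:
                    // Explicitly configured, so no magic-byte sniffing is needed.
                    return new GZIPInputStream(input);
                case NONE:
                default:
                    return input;
            }
        }
    }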

decompressedInput.mark(2);
byte[] magic = new byte[2];
int bytesRead = decompressedInput.read(magic);
decompressedInput.reset();
Contributor:

The variable name decompressedInput sounds wrong at this point: it might be an input that is not compressed at all, or one that will only later be identified as compressed. Since the purpose of this variable is to instantiate a BufferedInputStream that supports mark/reset, why not just call it bufferedInput?

Collaborator (Author):

Good point! Renamed now.
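Putting the pieces of this thread together, the opened-input logic might read roughly like this after the rename (a sketch assuming the dataFileStream and datumReader fields from the original snippet, plus java.util.zip.GZIPInputStream and org.apache.avro.file.DataFileStream imports; not the exact committed code):

    protected void inputOpened(final InputStream input) throws IOException {
        // BufferedInputStream guarantees mark/reset support.
        final BufferedInputStream bufferedInput = new BufferedInputStream(input);
        bufferedInput.mark(2);
        final byte[] magic = new byte[2];
        // readNBytes blocks until 2 bytes are read or EOF, unlike read(byte[]).
        final int bytesRead = bufferedInput.readNBytes(magic, 0, 2);
        bufferedInput.reset(); // rewind so the Avro reader sees the stream from the start

        // GZIP magic bytes: 0x1f 0x8b
        final boolean gzipped = bytesRead == 2 && magic[0] == (byte) 0x1f && magic[1] == (byte) 0x8b;
        dataFileStream = new DataFileStream<>(gzipped ? new GZIPInputStream(bufferedInput) : bufferedInput, datumReader);
    }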

@jclarysse (Contributor) left a comment:

Thanks @tanbt for your contribution!
We'd probably need to refine a few things; please see my comments.

@tanbt force-pushed the tanbt/decomporess-before-validate-avro-content branch from 88ee241 to 85060b0 on December 22, 2025 09:09