### What happened?
We tested v1.1.1 of the tiered storage plugin with an S3 table bucket as the Iceberg object storage. We created an Iceberg topic, produced 1000 messages to it, and could view the table successfully in the query engine (Athena). We then consumed from it using the Kafka CLI consumer with a command like `kafka-console-consumer.sh --bootstrap-server <ip>:9092 --topic people --from-beginning`. The consumer just kept consuming indefinitely (well beyond 1000 messages). Oddly, if we produced just 50 messages to the topic, only 20 were consumed.
To verify this further, we ran the same test against the provided Iceberg demo here:
```
cd demo/iceberg
make plugin
docker compose -f docker-compose.yml up
clients/gradlew run -p clients
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic people --from-beginning
```

The number of records consumed is NOT 1000: it might be something like 850, or the consumer might just keep consuming indefinitely.
While trying to reproduce the issue, I sometimes get several runs in a row that consume roughly 850 records (give or take a few). Other times the consumer runs indefinitely (the final count then depends on when I manually stop it). It seems inconsistent whether it consumes too few or too many records.
This does not happen with a regular tiered storage topic (e.g. via this demo); consuming from that behaved as expected.
I had a brief look into the code and found some potentially problematic parts. One example is in `IcebergRemoteStorageManager.java`, specifically `fetchLogSegment(...)`: the filter `.filter(Expressions.greaterThanOrEqual("kafka.batch_byte_offset", remoteLogSegmentMetadata.startOffset()))` compares `kafka.batch_byte_offset` (a physical byte position within the segment file) against `remoteLogSegmentMetadata.startOffset()` (a logical record offset, not a physical byte position).
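To make the unit mismatch concrete, here is a minimal, self-contained sketch. The offset values and the `kafka.batch_base_offset` column name are assumptions made purely for illustration; only `kafka.batch_byte_offset` and `startOffset()` appear in the actual code:

```java
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;

public class OffsetFilterSketch {
    public static void main(String[] args) {
        // Logical Kafka offset at which the fetched segment starts (value invented).
        long startOffset = 950L;

        // The filter as written: "kafka.batch_byte_offset" is a byte position
        // inside the original segment file, so comparing it against a logical
        // record offset filters on mismatched units.
        Expression suspect =
                Expressions.greaterThanOrEqual("kafka.batch_byte_offset", startOffset);
        // E.g. a batch whose first record has logical offset 990 but which begins
        // at byte 120 of the file would be wrongly excluded, while any batch that
        // happens to start past byte 950 passes regardless of its logical offsets.

        // If the table also recorded each batch's logical base offset (column name
        // assumed here), the comparison would presumably need to be like-for-like:
        Expression likeForLike =
                Expressions.greaterThanOrEqual("kafka.batch_base_offset", startOffset);

        System.out.println(suspect);
        System.out.println(likeForLike);
    }
}
```

If this is indeed the issue, it would plausibly explain both symptoms: depending on where batches land inside the file, rows could be skipped (too few records) or re-served (too many).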
Any insights would be much appreciated, thanks!
### What did you expect to happen?
When consuming from the Iceberg topic with `--from-beginning`, we expect exactly the messages present in the topic to be consumed, matching what the query engine reports for the Iceberg table.
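A deterministic way to check the count (rather than eyeballing console output) could be a small Java consumer that reads from the beginning and stops once the topic goes idle. This is only a sketch; the topic name and bootstrap address match the demo, and the idle timeout is an arbitrary assumption chosen to leave room for slower remote fetches:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class CountRecords {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        // Fresh group id so auto.offset.reset=earliest behaves like --from-beginning.
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "count-records-" + System.nanoTime());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // Raw bytes are enough for counting; no schema registry deserialization needed.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

        long total = 0;
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("people"));
            int idlePolls = 0;
            // Stop after ~10s without new records, so slower tiered-storage
            // fetches still have time to arrive.
            while (idlePolls < 10) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofSeconds(1));
                if (records.isEmpty()) {
                    idlePolls++;
                } else {
                    idlePolls = 0;
                    total += records.count();
                }
            }
        }
        System.out.println("Consumed " + total + " records (expected 1000).");
    }
}
```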
### What else do we need to know?
From my initial testing, I was using:
- S3 Tables for storage
- Kafka 4.0.0
- Karapace Schema Registry 5.0.0
- v1.1.1 of the Aiven tiered storage plugin
- broker configs:

```
rsm.config.segment.format=iceberg
rsm.config.structure.provider.class=io.aiven.kafka.tieredstorage.iceberg.AvroSchemaRegistryStructureProvider
rsm.config.structure.provider.serde.schema.registry.url=http://127.0.0.1:8086
rsm.config.iceberg.namespace=default
rsm.config.iceberg.catalog.class=org.apache.iceberg.rest.RESTCatalog
rsm.config.iceberg.catalog.uri=https://s3tables.us-east-1.amazonaws.com/iceberg
rsm.config.iceberg.catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO
rsm.config.iceberg.catalog.warehouse=arn:aws:s3tables:....
rsm.config.iceberg.catalog.s3.endpoint=https://s3.us-east-1.amazonaws.com
rsm.config.iceberg.catalog.client.region=us-east-1
rsm.config.iceberg.catalog.rest.sigv4-enabled=true
rsm.config.iceberg.catalog.rest.signing-region=us-east-1
rsm.config.iceberg.catalog.rest.signing-name=s3tables
remote.fetch.max.wait.ms=5000
```