Open
Description
Feature Request / Improvement
I am trying to instantiate the Iceberg Kafka Connect, but the Kafka Cluster has specific ACLs that authorize the users with transactional ids following naming conventions with a specific prefix. I tried to set the property "iceberg.coordinator.transactional.suffix", but did not work properly.
Error Log
[2025-02-19 16:16:55,690] INFO [iceberg-sink-connector|task-1] [Producer clientId=c9e05765-a588-49fe-a894-e377f3319282, transactionalId=committer-txn-be446910-f4bb-4b08-a0e7-3985d8da1dc4-1] Transiting to fatal error state due to org.apache.kafka.common.errors.TransactionalIdAuthorizationException: Transactional Id authorization failed. (org.apache.kafka.clients.producer.internals.TransactionManager:455)
Analysis
Looking at the code in Channel.java the transactional ID is currently constructed by appending the transactional.suffix to the name.
Channel(
String name,
String consumerGroupId,
IcebergSinkConfig config,
KafkaClientFactory clientFactory,
SinkTaskContext context) {
this.controlTopic = config.controlTopic();
this.connectGroupId = config.connectGroupId();
this.context = context;
String transactionalId = name + config.transactionalSuffix();
this.producer = clientFactory.createProducer(transactionalId);
this.consumer = clientFactory.createConsumer(consumerGroupId);
this.admin = clientFactory.createAdmin();
this.producerId = UUID.randomUUID().toString();
}
The constructor of Channel class is invoked by its subclasses CommitterImpl.java and Coordinator.java.
Proposed Changes
- Modify the Channel constructor to include the transactionalId as an explicit parameter:
Channel(
String name,
String consumerGroupId,
String transactionalId,
IcebergSinkConfig config,
KafkaClientFactory clientFactory,
SinkTaskContext context) {
this.controlTopic = config.controlTopic();
this.connectGroupId = config.connectGroupId();
this.context = context;
this.producer = clientFactory.createProducer(transactionalId);
this.consumer = clientFactory.createConsumer(consumerGroupId);
this.admin = clientFactory.createAdmin();
this.producerId = UUID.randomUUID().toString();
}
- Introduce new configuration properties in IcebergSinkConfig.java:
iceberg.coordinator.transactionalId
iceberg.committer.transactionalId
- Update CommitterImpl.java to use the new configuration parameter in the superclass constructor:
CommitterImpl(
SinkTaskContext context,
IcebergSinkConfig config,
KafkaClientFactory clientFactory,
CoordinatorThreadFactory coordinatorThreadFactory) {
// pass transient consumer group ID to which we never commit offsets
super(
"committer",
IcebergSinkConfig.DEFAULT_CONTROL_GROUP_PREFIX + UUID.randomUUID(),
config.committerTransactionalId(),
config,
clientFactory);
- Update Coordinator.java to use the new configuration parameter in the superclass constructor:
public Coordinator(
Catalog catalog,
IcebergSinkConfig config,
Collection<MemberDescription> members,
KafkaClientFactory clientFactory) {
// pass consumer group ID to which we commit low watermark offsets
super("coordinator",
config.controlGroupId() + "-coord",
config.coordinatorTransactionalId(),
config,
clientFactory);
Query engine
Kafka Connect
Willingness to contribute
- I can contribute this improvement/feature independently
- I would be willing to contribute this improvement/feature with guidance from the Iceberg community
- I cannot contribute this improvement/feature at this time