Skip to content

Conversation

@tpalfy
Copy link
Contributor

@tpalfy tpalfy commented Nov 17, 2025

Summary

NIFI-15226

Tracking

Please complete the following tracking steps prior to pull request creation.

Issue Tracking

Pull Request Tracking

  • Pull Request title starts with Apache NiFi Jira issue number, such as NIFI-00000
  • Pull Request commit message starts with Apache NiFi Jira issue number, as such NIFI-00000

Pull Request Formatting

  • Pull Request based on current revision of the main branch
  • Pull Request refers to a feature branch with one commit containing changes

Verification

Please indicate the verification steps performed prior to pull request creation.

Build

  • Build completed using ./mvnw clean install -P contrib-check
    • JDK 21
    • JDK 25

Licensing

  • New dependencies are compatible with the Apache License 2.0 according to the License Policy
  • New dependencies are documented in applicable LICENSE and NOTICE files

Documentation

  • Documentation formatting appears as expected in rendered files

</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-nar-utils</artifactId>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This dependency appears to be unused, can it be removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

org.apache.nifi.mock.MockComponentLogger is referenced in TestConsumerPartitionsUtil

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for clarifying. In that case, this dependency should be removed and MockComponentLogger should be replaced with MockComponentLog from nifi-mock to avoid referencing framework modules in extension modules.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I preserved this class from the older change. But actually we don't need either. We can just mock the logger with Mockito.

Copy link
Contributor

@exceptionfactory exceptionfactory left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The initial build fails on the following tests:

Error:    ConsumeKafkaTest.testVerifyFailed:116 » NullPointer Cannot invoke "org.apache.nifi.kafka.service.api.consumer.PollingContext.getTopics()" because "this.pollingContext" is null
Error:    ConsumeKafkaTest.testVerifySuccessful:99 » NullPointer Cannot invoke "org.apache.nifi.kafka.service.api.consumer.PollingContext.getTopics()" because "this.pollingContext" is null

Comment on lines 608 to 612
consumerServiceToPartitionedPollingContext.clear();

final boolean isExplicitPartitionMapping = ConsumerPartitionsUtil.isPartitionAssignmentExplicit(context.getAllProperties());

if (isExplicitPartitionMapping) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The assignment of the partitions must happen exactly once after the processor is started.
This code is called from onTrigger() which can cause concurrency issues when Concurrent Tasks > 1:

  1. Multiple threads are trying to initialize the consumers in parallel
  2. If there are configured more threads than partitions, then a thread can reinitalize the assignment if there is no available consumer in the pool

Ideally, the assigment should happen in onScheduled(). Isn't it possible?
Also, consumerServiceToPartitionedPollingContext should be cleared in onStopped().

return connectionService.getConsumerService(pollingContext);
}

public int getPartitionCount(final KafkaConnectionService connectionService) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor:

Suggested change
public int getPartitionCount(final KafkaConnectionService connectionService) {
private int getPartitionCount(final KafkaConnectionService connectionService) {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case of static partition mapping, the consumers should be created eagerly for each partition in availablePartitionedPollingContexts (similar to recreateAssignedConsumers() in the old 2.6 implementation).

Without that, a consumer is created for the first partition, and additional consumers are created for the other partitions only when parallel task execution triggers it. This is because a consumer is always available in the pool until multiple threads request consumers in parallel (which is not guaranteed to happen, e.g. Concurrent Tasks = 1).

private final AtomicInteger activeConsumerCount = new AtomicInteger();

private final Queue<PollingContext> availablePartitionedPollingContexts = new LinkedBlockingQueue<>();
private final Map<KafkaConsumerService, PollingContext> consumerServiceToPartitionedPollingContext = new HashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The map can be accessed by parallel threads so it should be ConcurrentHashMap or similar.

Comment on lines 448 to 451
KafkaConsumerService service;
while ((service = consumerServices.poll()) != null) {
close(service, "Not all partitions are assigned");
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it can never happen because no consumers have been created so far.

Comment on lines 243 to 250
Integer partition = pollingContext.getPartition();
if (partition != null) {
List<TopicPartition> topicPartitions = pollingContext.getTopics().stream()
.map(topic -> new TopicPartition(topic, partition))
.collect(Collectors.toList());
consumer.assign(topicPartitions);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest moving it to Kafka3ConsumerService's constructor. That way, the assign/subscribe logic would be more straightforward (a single if-else instead of the separate if statements at different points in the code).
The partition id can be passed in the Subscription parameter.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants