Skip to content

Commit 1ee80b8

Browse files
committed
NIFI-15226 Review suggestions
1 parent fc94db2 commit 1ee80b8

File tree

4 files changed

+35
-25
lines changed

4 files changed

+35
-25
lines changed

nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/kafka/processors/ConsumeKafka.java

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -73,13 +73,13 @@
7373
import java.util.ArrayList;
7474
import java.util.Collection;
7575
import java.util.Collections;
76-
import java.util.HashMap;
7776
import java.util.Iterator;
7877
import java.util.List;
7978
import java.util.Map;
8079
import java.util.Optional;
8180
import java.util.Queue;
8281
import java.util.Set;
82+
import java.util.concurrent.ConcurrentHashMap;
8383
import java.util.concurrent.LinkedBlockingQueue;
8484
import java.util.concurrent.TimeUnit;
8585
import java.util.concurrent.atomic.AtomicInteger;
@@ -343,7 +343,7 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
343343
private final AtomicInteger activeConsumerCount = new AtomicInteger();
344344

345345
private final Queue<PollingContext> availablePartitionedPollingContexts = new LinkedBlockingQueue<>();
346-
private final Map<KafkaConsumerService, PollingContext> consumerServiceToPartitionedPollingContext = new HashMap<>();
346+
private final Map<KafkaConsumerService, PollingContext> consumerServiceToPartitionedPollingContext = new ConcurrentHashMap<>();
347347

348348
@Override
349349
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -445,11 +445,6 @@ public void onScheduled(final ProcessContext context) {
445445
if (partitionCount != numAssignedPartitions) {
446446
context.yield();
447447

448-
KafkaConsumerService service;
449-
while ((service = consumerServices.poll()) != null) {
450-
close(service, "Not all partitions are assigned");
451-
}
452-
453448
throw new ProcessException("Illegal Partition Assignment: There are "
454449
+ numAssignedPartitions + " partitions statically assigned using the " + PARTITIONS_PROPERTY_PREFIX + ".* property names,"
455450
+ " but the Kafka topic(s) have " + partitionCount + " partitions");
@@ -463,9 +458,9 @@ public void onScheduled(final ProcessContext context) {
463458
}
464459

465460
for (int partition : assignedPartitions) {
466-
PollingContext partitionedPollingContext = createPollingContext(context, partition);
467-
468-
availablePartitionedPollingContexts.add(partitionedPollingContext);
461+
final PollingContext partitionedPollingContext = createPollingContext(context, partition);
462+
final KafkaConsumerService partitionedConsumerService = obtainPartitionedConsumerService(partitionedPollingContext);
463+
consumerServices.add(partitionedConsumerService);
469464
}
470465
}
471466
}
@@ -658,9 +653,7 @@ private KafkaConsumerService getConsumerService(final ProcessContext context) {
658653

659654
getLogger().info("No Partitioned Kafka Consumer Service available; creating a new one.");
660655

661-
final KafkaConsumerService partitionedConsumerService = connectionService.getConsumerService(partitionedPollingContext);
662-
663-
consumerServiceToPartitionedPollingContext.put(partitionedConsumerService, partitionedPollingContext);
656+
final KafkaConsumerService partitionedConsumerService = obtainPartitionedConsumerService(partitionedPollingContext);
664657

665658
return partitionedConsumerService;
666659
} else {
@@ -701,6 +694,14 @@ private int getPartitionCount(final KafkaConnectionService connectionService) {
701694
return partitionsEachTopic;
702695
}
703696

697+
private KafkaConsumerService obtainPartitionedConsumerService(PollingContext partitionedPollingContext) {
698+
final KafkaConsumerService partitionedConsumerService = connectionService.getConsumerService(partitionedPollingContext);
699+
700+
consumerServiceToPartitionedPollingContext.put(partitionedConsumerService, partitionedPollingContext);
701+
702+
return partitionedConsumerService;
703+
}
704+
704705
private int getMaxConsumerCount() {
705706
return maxConsumerCount;
706707
}

nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/Kafka3ConnectionService.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -240,14 +240,6 @@ public KafkaConsumerService getConsumerService(final PollingContext pollingConte
240240
final ByteArrayDeserializer deserializer = new ByteArrayDeserializer();
241241
final Consumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties, deserializer, deserializer);
242242

243-
Integer partition = pollingContext.getPartition();
244-
if (partition != null) {
245-
List<TopicPartition> topicPartitions = pollingContext.getTopics().stream()
246-
.map(topic -> new TopicPartition(topic, partition))
247-
.collect(Collectors.toList());
248-
consumer.assign(topicPartitions);
249-
}
250-
251243
return new Kafka3ConsumerService(getLogger(), consumer, subscription);
252244
}
253245

@@ -258,7 +250,7 @@ private Subscription createSubscription(final PollingContext pollingContext) {
258250

259251
return topicPatternFound
260252
.map(pattern -> new Subscription(groupId, pattern, autoOffsetReset))
261-
.orElseGet(() -> new Subscription(groupId, pollingContext.getTopics(), autoOffsetReset));
253+
.orElseGet(() -> new Subscription(groupId, pollingContext.getPartition(), pollingContext.getTopics(), autoOffsetReset));
262254
}
263255

264256
@Override

nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/consumer/Kafka3ConsumerService.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ public Kafka3ConsumerService(final ComponentLog componentLog, final Consumer<byt
6262
this.consumer = consumer;
6363
this.subscription = subscription;
6464

65-
if (consumer.assignment().isEmpty()) {
65+
final Integer partition = subscription.getPartition();
66+
if (partition == null) {
6667
final Optional<Pattern> topicPatternFound = subscription.getTopicPattern();
6768
if (topicPatternFound.isPresent()) {
6869
final Pattern topicPattern = topicPatternFound.get();
@@ -71,6 +72,11 @@ public Kafka3ConsumerService(final ComponentLog componentLog, final Consumer<byt
7172
final Collection<String> topics = subscription.getTopics();
7273
consumer.subscribe(topics, this);
7374
}
75+
} else {
76+
List<TopicPartition> topicPartitions = subscription.getTopics().stream()
77+
.map(topic -> new TopicPartition(topic, partition))
78+
.collect(Collectors.toList());
79+
consumer.assign(topicPartitions);
7480
}
7581
}
7682

nifi-extension-bundles/nifi-kafka-bundle/nifi-kafka-service-shared/src/main/java/org/apache/nifi/kafka/service/consumer/Subscription.java

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,19 +31,22 @@ public class Subscription {
3131

3232
private final String groupId;
3333
private final Collection<String> topics;
34+
private final Integer partition;
3435
private final Pattern topicPattern;
3536
private final AutoOffsetReset autoOffsetReset;
3637

37-
public Subscription(final String groupId, final Collection<String> topics, final AutoOffsetReset autoOffsetReset) {
38+
public Subscription(final String groupId, final Integer partition, final Collection<String> topics, final AutoOffsetReset autoOffsetReset) {
3839
this.groupId = Objects.requireNonNull(groupId, "Group ID required");
3940
this.topics = Collections.unmodifiableCollection(Objects.requireNonNull(topics, "Topics required"));
41+
this.partition = partition;
4042
this.topicPattern = null;
4143
this.autoOffsetReset = Objects.requireNonNull(autoOffsetReset, "Auto Offset Reset required");
4244
}
4345

4446
public Subscription(final String groupId, final Pattern topicPattern, final AutoOffsetReset autoOffsetReset) {
4547
this.groupId = Objects.requireNonNull(groupId, "Group ID required");
4648
this.topics = Collections.emptyList();
49+
this.partition = null;
4750
this.topicPattern = Objects.requireNonNull(topicPattern, "Topic Pattern required");
4851
this.autoOffsetReset = Objects.requireNonNull(autoOffsetReset, "Auto Offset Reset required");
4952
}
@@ -56,6 +59,10 @@ public Collection<String> getTopics() {
5659
return topics;
5760
}
5861

62+
public Integer getPartition() {
63+
return partition;
64+
}
65+
5966
public Optional<Pattern> getTopicPattern() {
6067
return Optional.ofNullable(topicPattern);
6168
}
@@ -94,7 +101,11 @@ public String toString() {
94101
}
95102

96103
private boolean isTopicSubscriptionMatched(final Subscription subscription) {
97-
if (topics.size() == subscription.topics.size() && topics.containsAll(subscription.topics)) {
104+
if (
105+
topics.size() == subscription.topics.size()
106+
&& topics.containsAll(subscription.topics)
107+
&& Objects.equals(this.partition, subscription.partition)
108+
) {
98109
final String regexLeft = (topicPattern == null ? null : topicPattern.pattern());
99110
final String regexRight = (subscription.topicPattern == null ? null : subscription.topicPattern.pattern());
100111
return Objects.equals(regexLeft, regexRight);

0 commit comments

Comments
 (0)