Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@
<artifactId>nifi-mock-record-utils</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-framework-nar-utils</artifactId>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This dependency appears to be unused, can it be removed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

org.apache.nifi.mock.MockComponentLogger is referenced in TestConsumerPartitionsUtil

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for clarifying. In that case, this dependency should be removed and MockComponentLogger should be replaced with MockComponentLog from nifi-mock to avoid referencing framework modules in extension modules.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I preserved this class from the older change. But actually we don't need either. We can just mock the logger with Mockito.

<version>2.7.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-services</artifactId>
Expand Down
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case of static partition mapping, the consumers should be created eagerly for each partition in availablePartitionedPollingContexts (similar to recreateAssignedConsumers() in the old 2.6 implementation).

Without that, a consumer is created for the first partition, and additional consumers are created for the other partitions only when parallel task execution triggers it. This is because a consumer is always available in the pool until multiple threads request consumers in parallel (which is not guaranteed to happen, e.g. Concurrent Tasks = 1).

Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.kafka.processors.common.KafkaUtils;
import org.apache.nifi.kafka.processors.consumer.ConsumerPartitionsUtil;
import org.apache.nifi.kafka.processors.consumer.OffsetTracker;
import org.apache.nifi.kafka.processors.consumer.ProcessingStrategy;
import org.apache.nifi.kafka.processors.consumer.bundle.ByteRecordBundler;
Expand Down Expand Up @@ -63,14 +66,18 @@
import org.apache.nifi.util.StringUtils;

import java.io.IOException;
import java.net.UnknownHostException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -334,6 +341,8 @@ public class ConsumeKafka extends AbstractProcessor implements VerifiableProcess
private final Queue<KafkaConsumerService> consumerServices = new LinkedBlockingQueue<>();
private final AtomicInteger activeConsumerCount = new AtomicInteger();

private final Map<KafkaConsumerService, PollingContext> consumerServiceToPartitionedPollingContext = new HashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The map can be accessed by parallel threads so it should be ConcurrentHashMap or similar.


@Override
public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTY_DESCRIPTORS;
Expand All @@ -351,6 +360,51 @@ public void onPropertyModified(final PropertyDescriptor descriptor, final String
}
}

private static final String PARTITIONS_PROPERTY_PREFIX = "partitions";

/**
 * Supports dynamic properties used for static partition assignment, in the form
 * {@code partitions.host_name=partition_1[,partition_2...]}.
 *
 * @param propertyDescriptorName the name of the dynamic property entered by the user
 * @return a descriptor whose validator accepts only names starting with {@code partitions}
 */
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
    return new PropertyDescriptor.Builder()
            .description("Specifies from which partitions to consume on each host in the cluster in the form of 'partitions.host_name=partition_1[,partition_2...]'.")
            .name(propertyDescriptorName)
            .addValidator((subject, input, context) -> {
                final ValidationResult.Builder builder = new ValidationResult.Builder()
                        .subject(subject)
                        .input(input);

                if (subject.startsWith(PARTITIONS_PROPERTY_PREFIX)) {
                    builder.valid(true);
                } else {
                    // Previously a non-matching name produced a builder-default invalid result
                    // with no explanation; make the failure explicit and self-describing.
                    builder.valid(false)
                            .explanation("Dynamic property names must start with '" + PARTITIONS_PROPERTY_PREFIX + ".'");
                }

                return builder.build();
            })
            .dynamic(true)
            .expressionLanguageSupported(ExpressionLanguageScope.ENVIRONMENT)
            .build();
}

@Override
protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
    final List<ValidationResult> results = new ArrayList<>();

    // Always include the outcome of the partition-assignment property check,
    // whether or not explicit assignment is configured.
    results.add(ConsumerPartitionsUtil.validateConsumePartitions(validationContext.getAllProperties()));

    // Explicit partition assignment only makes sense for explicitly named topics:
    // a topic pattern cannot be statically mapped to partition numbers.
    final boolean explicitPartitionMapping = ConsumerPartitionsUtil.isPartitionAssignmentExplicit(validationContext.getAllProperties());
    if (explicitPartitionMapping && TOPIC_PATTERN.getValue().equals(validationContext.getProperty(TOPIC_FORMAT).getValue())) {
        results.add(new ValidationResult.Builder()
                .subject(TOPIC_FORMAT.getDisplayName())
                .input(TOPIC_PATTERN.getDisplayName())
                .valid(false)
                .explanation("It is not valid to explicitly assign topic partitions and also using a Topic Pattern. "
                        + "Topic Partitions may be assigned only if explicitly specifying topic names also.")
                .build());
    }

    return results;
}

@OnScheduled
public void onScheduled(final ProcessContext context) {
pollingContext = createPollingContext(context);
Expand Down Expand Up @@ -395,6 +449,9 @@ public void onStopped() {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
final KafkaConsumerService consumerService = getConsumerService(context);
final PollingContext pollingContext = Optional.ofNullable(consumerServiceToPartitionedPollingContext.get(consumerService))
.orElse(this.pollingContext);

if (consumerService == null) {
getLogger().debug("No Kafka Consumer Service available; will yield and return immediately");
context.yield();
Expand Down Expand Up @@ -511,25 +568,31 @@ private void close(final KafkaConsumerService consumerService, final String reas
public List<ConfigVerificationResult> verify(final ProcessContext context, final ComponentLog verificationLogger, final Map<String, String> attributes) {
final List<ConfigVerificationResult> verificationResults = new ArrayList<>();

final KafkaConnectionService connectionService = context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class);
final PollingContext pollingContext = createPollingContext(context);
final KafkaConsumerService consumerService = connectionService.getConsumerService(pollingContext);

final ConfigVerificationResult.Builder verificationPartitions = new ConfigVerificationResult.Builder()
.verificationStepName("Verify Topic Partitions");

try {
final List<PartitionState> partitionStates = consumerService.getPartitionStates();
verificationPartitions
.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
.explanation(String.format("Partitions [%d] found for Topics %s", partitionStates.size(), pollingContext.getTopics()));
} catch (final Exception e) {
getLogger().error("Topics {} Partition verification failed", pollingContext.getTopics(), e);
verificationPartitions
.outcome(ConfigVerificationResult.Outcome.FAILED)
.explanation(String.format("Topics %s Partition access failed: %s", pollingContext.getTopics(), e));
final Collection<String> topics = pollingContext.getTopics();

if (!topics.isEmpty()) {
final KafkaConnectionService connectionService = context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class);
final PollingContext pollingContext = createPollingContext(context);
try (final KafkaConsumerService consumerService = connectionService.getConsumerService(pollingContext)) {
final ConfigVerificationResult.Builder verificationPartitions = new ConfigVerificationResult.Builder()
.verificationStepName("Verify Topic Partitions");

try {
final List<PartitionState> partitionStates = consumerService.getPartitionStatesByTopic().values().stream().findFirst().orElse(Collections.emptyList());
verificationPartitions
.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL)
.explanation(String.format("Partitions [%d] found for Topics %s", partitionStates.size(), pollingContext.getTopics()));
} catch (final Exception e) {
getLogger().error("Topics {} Partition verification failed", pollingContext.getTopics(), e);
verificationPartitions
.outcome(ConfigVerificationResult.Outcome.FAILED)
.explanation(String.format("Topics %s Partition access failed: %s", pollingContext.getTopics(), e));
}
verificationResults.add(verificationPartitions.build());
} catch (IOException e) {
getLogger().warn("Couldn't close KafkaConsumerService after verification.", e);
}
}
verificationResults.add(verificationPartitions.build());

return verificationResults;
}
Expand All @@ -540,6 +603,47 @@ private KafkaConsumerService getConsumerService(final ProcessContext context) {
return consumerService;
}

final KafkaConnectionService connectionService = context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class);

consumerServiceToPartitionedPollingContext.clear();

final boolean isExplicitPartitionMapping = ConsumerPartitionsUtil.isPartitionAssignmentExplicit(context.getAllProperties());

if (isExplicitPartitionMapping) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The assignment of the partitions must happen exactly once after the processor is started.
This code is called from onTrigger() which can cause concurrency issues when Concurrent Tasks > 1:

  1. Multiple threads are trying to initialize the consumers in parallel
  2. If more threads are configured than there are partitions, a thread can reinitialize the assignment if there is no available consumer in the pool

Ideally, the assigment should happen in onScheduled(). Isn't it possible?
Also, consumerServiceToPartitionedPollingContext should be cleared in onStopped().

final int[] assignedPartitions;
try {
assignedPartitions = ConsumerPartitionsUtil.getPartitionsForHost(context.getAllProperties(), getLogger());
} catch (final UnknownHostException uhe) {
throw new ProcessException("Could not determine localhost's hostname", uhe);
}

for (int partition : assignedPartitions) {
PollingContext partitionedPollingContext = createPollingContext(context, partition);

KafkaConsumerService partitionedConsumerService = connectionService.getConsumerService(partitionedPollingContext);
consumerServices.offer(partitionedConsumerService);
consumerServiceToPartitionedPollingContext.put(partitionedConsumerService, partitionedPollingContext);
}

final int numAssignedPartitions = ConsumerPartitionsUtil.getPartitionAssignmentCount(context.getAllProperties());
final int partitionCount = getPartitionCount(connectionService);

if (partitionCount != numAssignedPartitions) {
context.yield();

KafkaConsumerService service;
while ((service = consumerServices.poll()) != null) {
close(service, "Not all partitions are assigned");
}

throw new ProcessException("Illegal Partition Assignment: There are "
+ numAssignedPartitions + " partitions statically assigned using the " + PARTITIONS_PROPERTY_PREFIX + ".* property names,"
+ " but the Kafka topic(s) have " + partitionCount + " partitions");
}

return consumerServices.poll();
}

final int activeCount = activeConsumerCount.incrementAndGet();
if (activeCount > getMaxConsumerCount()) {
getLogger().trace("No Kafka Consumer Service available; have already reached max count of {} so will not create a new one", getMaxConsumerCount());
Expand All @@ -548,10 +652,34 @@ private KafkaConsumerService getConsumerService(final ProcessContext context) {
}

getLogger().info("No Kafka Consumer Service available; creating a new one. Active count: {}", activeCount);
final KafkaConnectionService connectionService = context.getProperty(CONNECTION_SERVICE).asControllerService(KafkaConnectionService.class);
return connectionService.getConsumerService(pollingContext);
}

public int getPartitionCount(final KafkaConnectionService connectionService) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor:

Suggested change
public int getPartitionCount(final KafkaConnectionService connectionService) {
private int getPartitionCount(final KafkaConnectionService connectionService) {

Collection<String> topics = this.pollingContext.getTopics();

if (topics.isEmpty()) {
return -1;
}

int partitionsEachTopic = 0;
try (KafkaConsumerService kafkaConsumerService = connectionService.getConsumerService(this.pollingContext)) {
Map<String, List<PartitionState>> topicToPartitionStates = kafkaConsumerService.getPartitionStatesByTopic();
for (List<PartitionState> partitionStatesForTopic : topicToPartitionStates.values()) {
final int partitionsThisTopic = partitionStatesForTopic.size();
if (partitionsEachTopic != 0 && partitionsThisTopic != partitionsEachTopic) {
throw new IllegalStateException("The specific topic names do not have the same number of partitions");
}

partitionsEachTopic = partitionsThisTopic;
}
} catch (IOException e) {
getLogger().warn("Couldn't close KafkaConsumerService after partition assignment check.", e);
}

return partitionsEachTopic;
}

private int getMaxConsumerCount() {
return maxConsumerCount;
}
Expand Down Expand Up @@ -617,6 +745,10 @@ private void processInputFlowFile(final ProcessSession session, final OffsetTrac
}

/**
 * Creates a {@link PollingContext} with no static partition assignment by delegating
 * to the two-argument overload with a {@code null} partition.
 */
private PollingContext createPollingContext(final ProcessContext context) {
return createPollingContext(context, null);
}

private PollingContext createPollingContext(final ProcessContext context, Integer partition) {
final String groupId = context.getProperty(GROUP_ID).getValue();
final String offsetReset = context.getProperty(AUTO_OFFSET_RESET).getValue();
final AutoOffsetReset autoOffsetReset = AutoOffsetReset.valueOf(offsetReset.toUpperCase());
Expand All @@ -629,7 +761,7 @@ private PollingContext createPollingContext(final ProcessContext context) {
pollingContext = new PollingContext(groupId, topicPattern, autoOffsetReset);
} else if (topicFormat.equals(TOPIC_NAME.getValue())) {
final Collection<String> topicList = KafkaUtils.toTopicList(topics);
pollingContext = new PollingContext(groupId, topicList, autoOffsetReset);
pollingContext = new PollingContext(groupId, topicList, autoOffsetReset, partition);
} else {
throw new ProcessException(String.format("Topic Format [%s] not supported", topicFormat));
}
Expand Down
Loading
Loading