Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case of static partition mapping, the consumers should be created eagerly for each partition in availablePartitionedPollingContexts (similar to recreateAssignedConsumers() in the old 2.6 implementation).

Without that, a consumer is created for the first partition, and additional consumers are created for the other partitions only when parallel task execution triggers it. This is because a consumer is always available in the pool until multiple threads request consumers in parallel (which is not guaranteed to happen, e.g. Concurrent Tasks = 1).

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.nifi.kafka.processors.consumer;

import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.logging.ComponentLog;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Helpers for reading and validating an explicit (static) assignment of Kafka partitions to hosts.
 *
 * <p>An assignment is declared through properties whose names start with
 * {@link #PARTITION_PROPERTY_NAME_PREFIX}; the remainder of the property name is treated as a host
 * identifier and the property value is a comma-separated list of non-negative partition numbers.
 * Blank list elements are ignored, so an empty value yields an empty assignment for that host.</p>
 */
public class ConsumerPartitionsUtil {
    public static final String PARTITION_PROPERTY_NAME_PREFIX = "partitions.";

    /**
     * Resolves the partitions that are explicitly assigned to the local host.
     *
     * @param properties all property names and values; only those starting with
     *                   {@link #PARTITION_PROPERTY_NAME_PREFIX} are considered
     * @param logger     logger used to report the discovered mapping
     * @return the partitions assigned to this host (possibly empty), or {@code null} when no
     *         explicit assignment has been declared at all
     * @throws UnknownHostException     if the local host cannot be resolved
     * @throws IllegalArgumentException if a partition value is malformed, or if assignments exist
     *                                  but none of them matches this host
     */
    public static int[] getPartitionsForHost(final Map<String, String> properties, final ComponentLog logger) throws UnknownHostException {
        final Map<String, String> hostnameToPartitionString = mapHostnamesToPartitionStrings(properties);
        final Map<String, int[]> partitionsByHost = mapPartitionValueToIntArrays(hostnameToPartitionString);

        if (partitionsByHost.isEmpty()) {
            // Explicit partitioning is not enabled.
            logger.debug("No explicit Consumer Partitions have been declared.");
            return null;
        }

        logger.info("Found the following mapping of hosts to partitions: {}", hostnameToPartitionString);

        // Determine the partitions based on hostname/IP.
        final int[] partitionsForThisHost = getPartitionsForThisHost(partitionsByHost);
        if (partitionsForThisHost == null) {
            throw new IllegalArgumentException("Could not find a partition mapping for host " + InetAddress.getLocalHost().getCanonicalHostName());
        }

        return partitionsForThisHost;
    }

    /**
     * Parses every host's partition string into an {@code int[]}.
     *
     * @param partitionValues host identifier to comma-separated partition list
     * @return host identifier to parsed partitions
     * @throws IllegalArgumentException if any value contains a non-integer or negative element
     */
    private static Map<String, int[]> mapPartitionValueToIntArrays(final Map<String, String> partitionValues) {
        final Map<String, int[]> partitionsByHost = new HashMap<>();
        for (final Map.Entry<String, String> entry : partitionValues.entrySet()) {
            final String host = entry.getKey();
            partitionsByHost.put(host, parsePartitions(host, entry.getValue()));
        }

        return partitionsByHost;
    }

    /**
     * Looks up the local host's assignment, trying the canonical host name first, then the plain
     * host name, and finally the textual IP address.
     *
     * @param partitionsByHost host identifier to parsed partitions
     * @return the matching assignment, or {@code null} if no key matches this host
     * @throws UnknownHostException if the local host cannot be resolved
     */
    private static int[] getPartitionsForThisHost(final Map<String, int[]> partitionsByHost) throws UnknownHostException {
        final InetAddress localhost = InetAddress.getLocalHost();

        int[] partitionsForThisHost = partitionsByHost.get(localhost.getCanonicalHostName());
        if (partitionsForThisHost != null) {
            return partitionsForThisHost;
        }

        partitionsForThisHost = partitionsByHost.get(localhost.getHostName());
        if (partitionsForThisHost != null) {
            return partitionsForThisHost;
        }

        return partitionsByHost.get(localhost.getHostAddress());
    }

    /**
     * Extracts the partition-assignment properties and strips the common prefix from their names.
     * A property whose name is exactly the prefix (no host part) is ignored.
     *
     * @param properties all property names and values
     * @return host identifier to raw (unparsed) partition list
     */
    private static Map<String, String> mapHostnamesToPartitionStrings(final Map<String, String> properties) {
        final Map<String, String> hostnameToPartitionString = new HashMap<>();
        for (final Map.Entry<String, String> entry : properties.entrySet()) {
            final String propertyName = entry.getKey();
            if (!propertyName.startsWith(PARTITION_PROPERTY_NAME_PREFIX)) {
                continue;
            }

            if (propertyName.length() <= PARTITION_PROPERTY_NAME_PREFIX.length()) {
                continue;
            }

            final String propertyNameAfterPrefix = propertyName.substring(PARTITION_PROPERTY_NAME_PREFIX.length());
            hostnameToPartitionString.put(propertyNameAfterPrefix, entry.getValue());
        }

        return hostnameToPartitionString;
    }

    /**
     * Parses a comma-separated list of partition numbers. Blank elements are skipped and
     * surrounding whitespace is ignored.
     *
     * @param hostname      host identifier, used only in error messages
     * @param propertyValue the comma-separated partition list
     * @return the parsed partitions in declaration order
     * @throws IllegalArgumentException if an element is not an integer or is negative
     */
    private static int[] parsePartitions(final String hostname, final String propertyValue) {
        final String[] splits = propertyValue.split(",");
        final List<Integer> partitionList = new ArrayList<>();
        for (final String split : splits) {
            if (split.isBlank()) {
                continue;
            }

            final int partition;
            try {
                partition = Integer.parseInt(split.trim());
            } catch (final NumberFormatException nfe) {
                // Keep the original exception as the cause so the parse failure is not lost.
                throw new IllegalArgumentException("Found invalid value for the partitions for hostname " + hostname + ": " + split + " is not an integer", nfe);
            }

            if (partition < 0) {
                throw new IllegalArgumentException("Found invalid value for the partitions for hostname " + hostname + ": " + split + " is negative");
            }

            partitionList.add(partition);
        }

        // Map out List<Integer> to int[]
        return partitionList.stream().mapToInt(Integer::intValue).toArray();
    }

    /**
     * Validates the explicit partition assignment, if any.
     *
     * <p>The result is invalid when a partition value is malformed, when the claimed partitions do
     * not form a gap-free range starting at 0, when a partition is claimed more than once, when the
     * local host name cannot be determined, or when no assignment matches the local host. When no
     * assignment properties are present the result is valid.</p>
     *
     * @param properties all property names and values
     * @return the validation outcome; malformed values are reported as an invalid result rather
     *         than being thrown
     */
    public static ValidationResult validateConsumePartitions(final Map<String, String> properties) {
        final Map<String, String> hostnameToPartitionMapping = mapHostnamesToPartitionStrings(properties);
        if (hostnameToPartitionMapping.isEmpty()) {
            // Partitions are not being explicitly assigned.
            return new ValidationResult.Builder().valid(true).build();
        }

        // Parse once up front. A malformed value is a configuration error and must surface as an
        // invalid result instead of an exception escaping from validation.
        final Map<String, int[]> partitionsByHost;
        try {
            partitionsByHost = mapPartitionValueToIntArrays(hostnameToPartitionMapping);
        } catch (final IllegalArgumentException e) {
            return new ValidationResult.Builder()
                .subject("Partitions")
                .valid(false)
                .explanation(e.getMessage())
                .build();
        }

        final Set<Integer> partitionsClaimed = new HashSet<>();
        final Set<Integer> duplicatePartitions = new HashSet<>();
        for (final int[] partitions : partitionsByHost.values()) {
            for (final int partition : partitions) {
                final boolean added = partitionsClaimed.add(partition);
                if (!added) {
                    duplicatePartitions.add(partition);
                }
            }
        }

        // Checking only indices below the number of distinct claimed partitions is sufficient to
        // detect a gap: n distinct non-negative values are gap-free exactly when they are 0..n-1.
        final List<Integer> partitionsMissing = new ArrayList<>();
        for (int i = 0; i < partitionsClaimed.size(); i++) {
            if (!partitionsClaimed.contains(i)) {
                partitionsMissing.add(i);
            }
        }

        if (!partitionsMissing.isEmpty()) {
            return new ValidationResult.Builder()
                .subject("Partitions")
                .input(partitionsClaimed.toString())
                .valid(false)
                .explanation("The following partitions were not mapped to any node: " + partitionsMissing.toString())
                .build();
        }

        if (!duplicatePartitions.isEmpty()) {
            return new ValidationResult.Builder()
                .subject("Partitions")
                .input(partitionsClaimed.toString())
                .valid(false)
                .explanation("The following partitions were mapped to multiple nodes: " + duplicatePartitions.toString())
                .build();
        }

        final int[] partitionsForThisHost;
        try {
            partitionsForThisHost = getPartitionsForThisHost(partitionsByHost);
        } catch (UnknownHostException e) {
            return new ValidationResult.Builder()
                .valid(false)
                .subject("Partition Assignment")
                .explanation("Unable to determine hostname of localhost")
                .build();
        }

        if (partitionsForThisHost == null) {
            return new ValidationResult.Builder()
                .subject("Partition Assignment")
                .valid(false)
                .explanation("No assignment was given for this host")
                .build();
        }

        return new ValidationResult.Builder().valid(true).build();
    }

    /**
     * Tells whether any partition-assignment property is present.
     *
     * @param properties all property names and values
     * @return {@code true} if at least one property carries a host-specific partition assignment
     */
    public static boolean isPartitionAssignmentExplicit(final Map<String, String> properties) {
        final Map<String, String> hostnameToPartitionMapping = mapHostnamesToPartitionStrings(properties);
        return !hostnameToPartitionMapping.isEmpty();
    }

    /**
     * Counts the partitions assigned across all hosts (not just the local one).
     *
     * @param properties all property names and values
     * @return the sum of the lengths of every host's partition list
     * @throws IllegalArgumentException if a partition value is malformed
     */
    public static int getPartitionAssignmentCount(final Map<String, String> properties) {
        final Map<String, String> hostnameToPartitionMapping = mapHostnamesToPartitionStrings(properties);
        final Map<String, int[]> partitions = mapPartitionValueToIntArrays(hostnameToPartitionMapping);

        int count = 0;
        for (final int[] partitionArray : partitions.values()) {
            count += partitionArray.length;
        }

        return count;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public PollingSummary getPollingSummary(final PollingContext pollingContext) {
pollingContext.getAutoOffsetReset(), offsets);
} else {
pollingSummary = new PollingSummary(pollingContext.getGroupId(), pollingContext.getTopics(),
pollingContext.getAutoOffsetReset(), offsets);
pollingContext.getAutoOffsetReset(), pollingContext.getPartition(), offsets);
}
return pollingSummary;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,3 +175,45 @@ Here is an example of FlowFile content that is emitted by JsonRecordSetWriter wh
}
]
```

### Consumer Partition Assignment

By default, this processor will subscribe to one or more Kafka topics in such a way that the topic partitions to consume from are randomly
assigned to the nodes in the NiFi cluster. Consider a scenario where a single Kafka topic has 8 partitions and the consuming
NiFi cluster has 3 nodes. In this scenario, Node 1 may be assigned partitions 0, 1, and 2. Node 2 may be assigned partitions 3, 4, and 5.
Node 3 will then be assigned partitions 6 and 7.

In this scenario, if Node 3 somehow fails or stops pulling data from Kafka, partitions 6 and 7 may then be reassigned to the other two nodes.
For most use cases, this is desirable. It provides fault tolerance and allows the remaining nodes to pick up the slack. However, there are cases
where this is undesirable.

One such case is when using NiFi to consume Change Data Capture (CDC) data from Kafka. Consider again the above scenario. Consider that Node 3
has pulled 1,000 messages from Kafka but has not yet delivered them to their final destination. NiFi is then stopped and restarted, and that takes
15 minutes to complete. In the meantime, Partitions 6 and 7 have been reassigned to the other nodes. Those nodes then proceeded to pull data from
Kafka and deliver it to the desired destination. After 15 minutes, Node 3 rejoins the cluster and then continues to deliver its 1,000 messages that
it has already pulled from Kafka to the destination system. Now, those records have been delivered out of order.

The solution for this, then, is to assign partitions statically instead of dynamically. In this way, we can assign Partitions 6 and 7 to Node 3 specifically.
Then, if Node 3 is restarted, the other nodes will not pull data from Partitions 6 and 7. The data will remain queued in Kafka until Node 3 is restarted. By
using this approach, we can ensure that the data that already was pulled can be processed (assuming First In First Out Prioritizers are used) before newer messages
are handled.

In order to provide a static mapping of node to Kafka partition(s), one or more user-defined properties must be added using the naming scheme
<code>partitions.&lt;hostname&gt;</code> with the value being a comma-separated list of Kafka partitions to use. For example,
<code>partitions.nifi-01=0, 3, 6, 9</code>, <code>partitions.nifi-02=1, 4, 7, 10</code>, and <code>partitions.nifi-03=2, 5, 8, 11</code>.
The hostname that is used can be the fully qualified hostname, the "simple" hostname, or the IP address. There must be an entry for each node in
the cluster, or the Processor will become invalid. If it is desirable for a node to not have any partitions assigned to it, a Property may be
added for the hostname with an empty string as the value.

NiFi cannot readily validate that all Partitions have been assigned before the Processor is scheduled to run. However, it can validate that no
partitions have been skipped. As such, if partitions 0, 1, and 3 are assigned but not partition 2, the Processor will not be valid. However,
if partitions 0, 1, and 2 are assigned, the Processor will become valid, even if there are 4 partitions on the Topic. In that case, when the Processor is
started, it will immediately start to fail, logging errors, and avoid pulling any data until the Processor is updated to account
for all partitions. Once running, if the number of partitions is changed, the Processor will continue to run but not pull data from the newly
added partitions. Once stopped and restarted, it will begin to log errors until all partitions have been assigned. Additionally, if partitions that are assigned
do not exist (e.g., partitions 0, 1, 2, 3, 4, 5, 6, and 7 are assigned, but the Topic has only 4 partitions), then the Processor will begin
to log errors on startup and will not pull data.

In order to use a static mapping of Kafka partitions, the "Topic Name Format" must be set to "names" rather than "pattern". Additionally, all
Topics that are to be consumed must have the same number of partitions. If multiple Topics are to be consumed and have a different number of
partitions, multiple Processors must be used so that each Processor consumes only from Topics with the same number of partitions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import java.util.Collections;
import java.util.List;
import java.util.Map;

import static org.apache.nifi.kafka.processors.ConsumeKafka.CONNECTION_SERVICE;
import static org.apache.nifi.kafka.processors.ConsumeKafka.GROUP_ID;
Expand Down Expand Up @@ -88,7 +89,7 @@ public void testProperties() throws InitializationException {
public void testVerifySuccessful() throws InitializationException {
final PartitionState firstPartitionState = new PartitionState(TEST_TOPIC_NAME, FIRST_PARTITION);
final List<PartitionState> partitionStates = Collections.singletonList(firstPartitionState);
when(kafkaConsumerService.getPartitionStates()).thenReturn(partitionStates);
when(kafkaConsumerService.getPartitionStatesByTopic()).thenReturn(Map.of(TEST_TOPIC_NAME, partitionStates));
setConnectionService();
when(kafkaConnectionService.getConsumerService(any())).thenReturn(kafkaConsumerService);

Expand All @@ -105,7 +106,7 @@ public void testVerifySuccessful() throws InitializationException {

@Test
public void testVerifyFailed() throws InitializationException {
when(kafkaConsumerService.getPartitionStates()).thenThrow(new IllegalStateException());
when(kafkaConsumerService.getPartitionStatesByTopic()).thenThrow(new IllegalStateException());
when(kafkaConnectionService.getConsumerService(any())).thenReturn(kafkaConsumerService);
setConnectionService();

Expand Down
Loading
Loading