Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.kafka.common.requests.ApiError;
import org.apache.kafka.common.requests.DescribeProducersRequest;
import org.apache.kafka.common.requests.DescribeProducersResponse;
import org.apache.kafka.common.utils.CollectionUtils;
import org.apache.kafka.common.utils.LogContext;

import org.slf4j.Logger;
Expand Down Expand Up @@ -89,15 +88,17 @@ public DescribeProducersRequest.Builder buildBatchedRequest(
Set<TopicPartition> topicPartitions
) {
DescribeProducersRequestData request = new DescribeProducersRequestData();
DescribeProducersRequest.Builder builder = new DescribeProducersRequest.Builder(request);

CollectionUtils.groupPartitionsByTopic(
topicPartitions,
builder::addTopic,
(topicRequest, partitionId) -> topicRequest.partitionIndexes().add(partitionId)
);
for (TopicPartition tp : topicPartitions) {
DescribeProducersRequestData.TopicRequest topicRequest = request.topics().find(tp.topic());
if (topicRequest == null) {
topicRequest = new DescribeProducersRequestData.TopicRequest().setName(tp.topic());
request.topics().add(topicRequest);
}
topicRequest.partitionIndexes().add(tp.partition());
}

return builder;
return new DescribeProducersRequest.Builder(request);
}

private void handlePartitionError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ListOffsetsRequest;
import org.apache.kafka.common.requests.ListOffsetsResponse;
import org.apache.kafka.common.utils.CollectionUtils;
import org.apache.kafka.common.utils.LogContext;

import org.slf4j.Logger;
Expand Down Expand Up @@ -80,17 +79,15 @@ public AdminApiLookupStrategy<TopicPartition> lookupStrategy() {

@Override
ListOffsetsRequest.Builder buildBatchedRequest(int brokerId, Set<TopicPartition> keys) {
Map<String, ListOffsetsTopic> topicsByName = CollectionUtils.groupPartitionsByTopic(
keys,
topicName -> new ListOffsetsTopic().setName(topicName),
(listOffsetsTopic, partitionId) -> {
TopicPartition topicPartition = new TopicPartition(listOffsetsTopic.name(), partitionId);
long offsetTimestamp = offsetTimestampsByPartition.get(topicPartition);
listOffsetsTopic.partitions().add(
new ListOffsetsPartition()
.setPartitionIndex(partitionId)
.setTimestamp(offsetTimestamp));
});
Map<String, ListOffsetsTopic> topicsByName = new HashMap<>();
for (TopicPartition topicPartition : keys) {
ListOffsetsTopic topic = topicsByName.computeIfAbsent(
topicPartition.topic(), t -> new ListOffsetsTopic().setName(t));
long offsetTimestamp = offsetTimestampsByPartition.get(topicPartition);
topic.partitions().add(new ListOffsetsPartition()
.setPartitionIndex(topicPartition.partition())
.setTimestamp(offsetTimestamp));
}
boolean supportsMaxTimestamp = keys
.stream()
.anyMatch(key -> offsetTimestampsByPartition.get(key) == ListOffsetsRequest.MAX_TIMESTAMP);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,16 @@
import org.apache.kafka.common.protocol.types.Schema;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.protocol.types.Type;
import org.apache.kafka.common.utils.CollectionUtils;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

/**
* <p>The sticky assignor serves two purposes. First, it guarantees an assignment that is as balanced as possible, meaning either:
Expand Down Expand Up @@ -230,7 +231,8 @@ protected MemberData memberData(Subscription subscription) {
static ByteBuffer serializeTopicPartitionAssignment(MemberData memberData) {
Struct struct = new Struct(STICKY_ASSIGNOR_USER_DATA_V1);
List<Struct> topicAssignments = new ArrayList<>();
for (Map.Entry<String, List<Integer>> topicEntry : CollectionUtils.groupPartitionsByTopic(memberData.partitions).entrySet()) {
Map<String, List<Integer>> partitionsByTopic = groupPartitionsByTopic(memberData.partitions);
for (Map.Entry<String, List<Integer>> topicEntry : partitionsByTopic.entrySet()) {
Struct topicAssignment = new Struct(TOPIC_ASSIGNMENT);
topicAssignment.set(TOPIC_KEY_NAME, topicEntry.getKey());
topicAssignment.set(PARTITIONS_KEY_NAME, topicEntry.getValue().toArray());
Expand Down Expand Up @@ -272,4 +274,9 @@ private static MemberData deserializeTopicPartitionAssignment(ByteBuffer buffer)
Optional<Integer> generation = struct.hasField(GENERATION_KEY_NAME) ? Optional.of(struct.getInt(GENERATION_KEY_NAME)) : Optional.empty();
return new MemberData(partitions, generation);
}

private static Map<String, List<Integer>> groupPartitionsByTopic(Collection<TopicPartition> partitions) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you inline it?

return partitions.stream()
.collect(Collectors.groupingBy(TopicPartition::topic, Collectors.mapping(TopicPartition::partition, Collectors.toList())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,10 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

import javax.security.auth.callback.Callback;

import static org.apache.kafka.common.utils.CollectionUtils.subtractMap;

/**
* A {@code Callback} for use by the {@code SaslServer} implementation when it
* needs to validate the SASL extensions for the OAUTHBEARER mechanism
Expand Down Expand Up @@ -90,6 +89,12 @@ public Map<String, String> ignoredExtensions() {
return Collections.unmodifiableMap(subtractMap(subtractMap(inputExtensions.map(), invalidExtensions), validatedExtensions));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original logic creates intermediate maps repeatedly. Could you use a for-loop to streamline it?

public Map<String, String> ignoredExtensions() {
    Map<String, String> ignored = new HashMap<>();
    for (Map.Entry<String, String> entry : inputExtensions.map().entrySet()) {
        String key = entry.getKey();
        if (!invalidExtensions.containsKey(key) && !validatedExtensions.containsKey(key)) {
            ignored.put(key, entry.getValue());
        }
    }
    return Collections.unmodifiableMap(ignored);
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought subtractMap was more functional-programming style and more declarative, and since this path is unlikely to be hot, the repeated memory allocation seemed acceptable. That said, your approach is fine with me — I'll change it.

}

/**
 * Returns a new map containing the entries of {@code minuend} whose keys do
 * not appear in {@code subtrahend}. Neither input map is modified.
 *
 * @param minuend    the map whose entries are candidates for the result
 * @param subtrahend the map whose keys are excluded from the result
 * @return a new mutable map holding the surviving entries
 */
private static Map<String, String> subtractMap(Map<String, String> minuend, Map<String, String> subtrahend) {
    Map<String, String> difference = new HashMap<>();
    for (Map.Entry<String, String> entry : minuend.entrySet()) {
        // keep the entry only when the subtrahend does not shadow its key
        if (!subtrahend.containsKey(entry.getKey())) {
            difference.put(entry.getKey(), entry.getValue());
        }
    }
    return difference;
}

/**
* Validates a specific extension in the original {@code inputExtensions} map
* @param extensionName - the name of the extension which was validated
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
"fields": [
{ "name": "Topics", "type": "[]TopicRequest", "versions": "0+",
"about": "The topics to list producers for.", "fields": [
{ "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName",
{ "name": "Name", "type": "string", "versions": "0+", "mapKey": true, "entityType": "topicName",
"about": "The topic name." },
{ "name": "PartitionIndexes", "type": "[]int32", "versions": "0+",
"about": "The indexes of the partitions to list producers for." }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.DescribeProducersRequest;
import org.apache.kafka.common.requests.DescribeProducersResponse;
import org.apache.kafka.common.utils.CollectionUtils;
import org.apache.kafka.common.utils.LogContext;

import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -120,7 +119,7 @@ public void testBuildRequest() {
int brokerId = 3;
DescribeProducersRequest.Builder request = handler.buildBatchedRequest(brokerId, topicPartitions);

List<DescribeProducersRequestData.TopicRequest> topics = request.data.topics();
DescribeProducersRequestData.TopicRequestCollection topics = request.data.topics();

assertEquals(Set.of("foo", "bar"), topics.stream()
.map(DescribeProducersRequestData.TopicRequest::name)
Expand Down Expand Up @@ -308,7 +307,7 @@ private DescribeProducersResponse describeProducersResponse(
) {
DescribeProducersResponseData response = new DescribeProducersResponseData();
Map<String, Map<Integer, PartitionResponse>> partitionResponsesByTopic =
CollectionUtils.groupPartitionDataByTopic(partitionResponses);
groupPartitionDataByTopic(partitionResponses);

for (Map.Entry<String, Map<Integer, PartitionResponse>> topicEntry : partitionResponsesByTopic.entrySet()) {
String topic = topicEntry.getKey();
Expand All @@ -327,4 +326,12 @@ private DescribeProducersResponse describeProducersResponse(
return new DescribeProducersResponse(response);
}

private static Map<String, Map<Integer, PartitionResponse>> groupPartitionDataByTopic(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't actually use such a complex structure in tests. Maybe we could use describeProducersResponse(TopicPartition partition, PartitionResponse partitionResponse) instead.

Map<TopicPartition, PartitionResponse> partitionResponses
) {
return partitionResponses.entrySet().stream()
.collect(Collectors.groupingBy(
e -> e.getKey().topic(),
Collectors.toMap(e -> e.getKey().partition(), Map.Entry::getValue)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.protocol.types.Struct;
import org.apache.kafka.common.test.api.Flaky;
import org.apache.kafka.common.utils.CollectionUtils;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
Expand Down Expand Up @@ -368,7 +367,11 @@ public void testAssignorWithOldVersionSubscriptions() {
private Subscription buildSubscriptionWithOldSchema(List<String> topics, List<TopicPartition> partitions, int consumerIndex) {
Struct struct = new Struct(StickyAssignor.STICKY_ASSIGNOR_USER_DATA_V0);
List<Struct> topicAssignments = new ArrayList<>();
for (Map.Entry<String, List<Integer>> topicEntry : CollectionUtils.groupPartitionsByTopic(partitions).entrySet()) {
Map<String, List<Integer>> partitionsByTopic = new HashMap<>();
for (TopicPartition tp : partitions) {
partitionsByTopic.computeIfAbsent(tp.topic(), t -> new ArrayList<>()).add(tp.partition());
}
for (Map.Entry<String, List<Integer>> topicEntry : partitionsByTopic.entrySet()) {
Struct topicAssignment = new Struct(StickyAssignor.TOPIC_ASSIGNMENT);
topicAssignment.set(StickyAssignor.TOPIC_KEY_NAME, topicEntry.getKey());
topicAssignment.set(StickyAssignor.PARTITIONS_KEY_NAME, topicEntry.getValue().toArray());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.test.api.Flaky;
import org.apache.kafka.common.utils.CollectionUtils;
import org.apache.kafka.common.utils.Utils;

import org.junit.jupiter.api.BeforeEach;
Expand All @@ -36,6 +35,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -1428,6 +1428,11 @@ private String pad(int num, int digits) {
return "0".repeat(Math.max(0, digits - iDigits)) + num;
}

/**
 * Groups the given topic-partitions by topic name, mapping each topic to the
 * list of its partition indexes in the encounter order of {@code partitions}.
 *
 * @param partitions the topic-partitions to group
 * @return a mutable map from topic name to its partition indexes
 */
private static Map<String, List<Integer>> groupPartitionsByTopic(Collection<TopicPartition> partitions) {
    Map<String, List<Integer>> partitionsByTopic = new HashMap<>();
    for (TopicPartition partition : partitions) {
        partitionsByTopic
            .computeIfAbsent(partition.topic(), topic -> new ArrayList<>())
            .add(partition.partition());
    }
    return partitionsByTopic;
}

protected static List<String> topics(String... topics) {
return Arrays.asList(topics);
}
Expand Down Expand Up @@ -1522,8 +1527,8 @@ protected void verifyValidityAndBalance(Map<String, Subscription> subscriptions,
if (Math.abs(len - otherLen) <= 1)
continue;

Map<String, List<Integer>> map = CollectionUtils.groupPartitionsByTopic(partitions);
Map<String, List<Integer>> otherMap = CollectionUtils.groupPartitionsByTopic(otherPartitions);
Map<String, List<Integer>> map = groupPartitionsByTopic(partitions);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could streamline the test, for example:

Set<String> otherTopics = otherPartitions.stream()
    .map(TopicPartition::topic)
    .collect(Collectors.toSet());

for (TopicPartition tp : partitions) {
    assertFalse(otherTopics.contains(tp.topic()), 
        "Error: Some partitions can be moved...");
}

Map<String, List<Integer>> otherMap = groupPartitionsByTopic(otherPartitions);

int moreLoaded = len > otherLen ? i : j;
int lessLoaded = len > otherLen ? j : i;
Expand Down
Loading
Loading