From 5e3960dc6f3d622893d46a46b4d01ad0a6db8117 Mon Sep 17 00:00:00 2001 From: nachogiljaldo Date: Wed, 11 Sep 2024 12:30:16 +0200 Subject: [PATCH] Do not sort before sending to the listener. --- reader.go | 3 --- reader_test.go | 9 ++++++++- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/reader.go b/reader.go index f9521461..958a2ed8 100644 --- a/reader.go +++ b/reader.go @@ -311,9 +311,6 @@ func (r *Reader) run(cg *ConsumerGroup) { for _, partition := range partitions { assignedPartitions = append(assignedPartitions, partition.ID) } - sort.Slice(assignedPartitions, func(i, j int) bool { - return assignedPartitions[i] < assignedPartitions[j] - }) assignments = append(assignments, GroupMemberTopic{ Topic: topic, Partitions: assignedPartitions, diff --git a/reader_test.go b/reader_test.go index 6e97026a..54b13fab 100644 --- a/reader_test.go +++ b/reader_test.go @@ -10,6 +10,7 @@ import ( "net" "os" "reflect" + "sort" "strconv" "sync" "testing" @@ -891,7 +892,7 @@ func TestReaderConsumerGroup(t *testing.T) { } } -func TestAssignmentListener(t *testing.T) { +func TestPartitionAssignmentListener(t *testing.T) { // It appears that some of the tests depend on all these tests being // run concurrently to pass... this is brittle and should be fixed // at some point. @@ -917,6 +918,12 @@ func TestAssignmentListener(t *testing.T) { AssignmentListener: func(partitions []GroupMemberTopic) { lock.Lock() defer lock.Unlock() + // we sort the received partitions for easier comparison + for _, partition := range partitions { + sort.Slice(partition.Partitions, func(i, j int) bool { + return partition.Partitions[i] < partition.Partitions[j] + }) + } assignments = append(assignments, partitions) }, })