Skip to content

Commit cd7016e

Browse files
committed
test(share-group): add IT verifying broker auto-rebalances new partitions to share consumers
1 parent ce68d9b commit cd7016e

1 file changed

Lines changed: 259 additions & 0 deletions

File tree

Lines changed: 259 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,259 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.testing.embedded.indexing;
21+
22+
import com.fasterxml.jackson.databind.ObjectMapper;
23+
import org.apache.druid.data.input.impl.CsvInputFormat;
24+
import org.apache.druid.data.input.impl.DimensionsSpec;
25+
import org.apache.druid.data.input.impl.TimestampSpec;
26+
import org.apache.druid.indexer.granularity.UniformGranularitySpec;
27+
import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
28+
import org.apache.druid.indexing.kafka.KafkaIndexTaskTuningConfig;
29+
import org.apache.druid.indexing.kafka.ShareGroupIndexTask;
30+
import org.apache.druid.indexing.kafka.ShareGroupIndexTaskIOConfig;
31+
import org.apache.druid.jackson.DefaultObjectMapper;
32+
import org.apache.druid.java.util.common.granularity.Granularities;
33+
import org.apache.druid.query.DruidMetrics;
34+
import org.apache.druid.segment.indexing.DataSchema;
35+
import org.apache.druid.testing.embedded.EmbeddedBroker;
36+
import org.apache.druid.testing.embedded.EmbeddedCoordinator;
37+
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
38+
import org.apache.druid.testing.embedded.EmbeddedHistorical;
39+
import org.apache.druid.testing.embedded.EmbeddedIndexer;
40+
import org.apache.druid.testing.embedded.EmbeddedOverlord;
41+
import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
42+
import org.junit.jupiter.api.Assertions;
43+
import org.junit.jupiter.api.Test;
44+
45+
import java.nio.charset.StandardCharsets;
46+
import java.util.ArrayList;
47+
import java.util.List;
48+
import java.util.Map;
49+
50+
/**
51+
* Verifies that the Kafka broker automatically rebalances new partitions to share-group consumers
52+
* without any Druid-side intervention (no supervisor, no task restart).
53+
*/
54+
public class ShareGroupPartitionRebalancingIT extends EmbeddedClusterTestBase
55+
{
56+
private static final long SHARE_CONSUMER_READY_DELAY_MS = 3_000L;
57+
private static final long PARTITION_REBALANCE_DELAY_MS = 2_000L;
58+
private static final String COL_TIMESTAMP = "__time";
59+
private static final String COL_ITEM = "item";
60+
private static final String COL_VALUE = "value";
61+
62+
private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
63+
private final EmbeddedOverlord overlord = new EmbeddedOverlord();
64+
private final EmbeddedIndexer indexer = new EmbeddedIndexer();
65+
private final EmbeddedHistorical historical = new EmbeddedHistorical();
66+
private final EmbeddedBroker broker = new EmbeddedBroker();
67+
68+
private ShareGroupKafkaResource kafkaServer;
69+
private final ObjectMapper mapper = new DefaultObjectMapper();
70+
71+
@Override
72+
public EmbeddedDruidCluster createCluster()
73+
{
74+
kafkaServer = new ShareGroupKafkaResource();
75+
final EmbeddedDruidCluster cluster = EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper();
76+
indexer.addProperty("druid.segment.handoff.pollDuration", "PT0.1s");
77+
cluster.addExtension(KafkaIndexTaskModule.class)
78+
.addResource(kafkaServer)
79+
.useLatchableEmitter()
80+
.useDefaultTimeoutForLatchableEmitter(60)
81+
.addCommonProperty("druid.monitoring.emissionPeriod", "PT0.1s")
82+
.addServer(coordinator)
83+
.addServer(overlord)
84+
.addServer(indexer)
85+
.addServer(historical)
86+
.addServer(broker);
87+
return cluster;
88+
}
89+
90+
@Test
91+
public void test_singleTask_partitionsIncrease_newPartitionsAutomaticallyConsumed() throws Exception
92+
{
93+
final String topic = dataSource + "_rebalance_single_topic";
94+
final String groupId = "rebalance-single-group";
95+
kafkaServer.createTopicWithPartitions(topic, 2);
96+
kafkaServer.setShareGroupAutoOffsetReset(groupId, "earliest");
97+
98+
final String taskId = submitTask(topic, groupId);
99+
Thread.sleep(SHARE_CONSUMER_READY_DELAY_MS);
100+
101+
final int batchA = 10;
102+
kafkaServer.publishRecordsToTopic(topic, csvRecords(batchA, 0, "2025-11-01"));
103+
104+
waitForRowsProcessed(batchA);
105+
106+
// Increase from 2 to 4 partitions; broker rebalances automatically.
107+
kafkaServer.increasePartitionsInTopic(topic, 4);
108+
Thread.sleep(PARTITION_REBALANCE_DELAY_MS);
109+
110+
final int batchB = 10;
111+
kafkaServer.publishRecordsToTopic(topic, csvRecords(batchB, batchA, "2025-11-02"));
112+
113+
waitForRowsProcessed(batchA + batchB);
114+
115+
cluster.callApi().onLeaderOverlord(o -> o.cancelTask(taskId));
116+
cluster.callApi().waitForTaskToFinish(taskId, overlord.latchableEmitter());
117+
118+
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
119+
120+
final long rowCount = Long.parseLong(cluster.runSql("SELECT COUNT(*) FROM %s", dataSource));
121+
Assertions.assertTrue(
122+
rowCount >= batchA + batchB,
123+
"Expected at least [" + (batchA + batchB) + "] rows but got [" + rowCount + "]"
124+
);
125+
}
126+
127+
@Test
128+
public void test_multiTask_partitionsIncrease_brokerDistributesNewPartitions() throws Exception
129+
{
130+
final String topic = dataSource + "_rebalance_multi_topic";
131+
final String groupId = "rebalance-multi-group";
132+
kafkaServer.createTopicWithPartitions(topic, 2);
133+
kafkaServer.setShareGroupAutoOffsetReset(groupId, "earliest");
134+
135+
final String taskId1 = submitTask(topic, groupId);
136+
final String taskId2 = submitTask(topic, groupId);
137+
Thread.sleep(SHARE_CONSUMER_READY_DELAY_MS);
138+
139+
final int batchA = 10;
140+
kafkaServer.publishRecordsToTopic(topic, csvRecords(batchA, 0, "2025-12-01"));
141+
142+
waitForRowsProcessed(batchA);
143+
144+
// Increase from 2 to 4 partitions; broker distributes the 2 new partitions across both consumers.
145+
kafkaServer.increasePartitionsInTopic(topic, 4);
146+
Thread.sleep(PARTITION_REBALANCE_DELAY_MS);
147+
148+
final int batchB = 20;
149+
kafkaServer.publishRecordsToTopic(topic, csvRecords(batchB, batchA, "2025-12-02"));
150+
151+
waitForRowsProcessed(batchA + batchB);
152+
153+
cluster.callApi().onLeaderOverlord(o -> o.cancelTask(taskId1));
154+
cluster.callApi().onLeaderOverlord(o -> o.cancelTask(taskId2));
155+
cluster.callApi().waitForTaskToFinish(taskId1, overlord.latchableEmitter());
156+
cluster.callApi().waitForTaskToFinish(taskId2, overlord.latchableEmitter());
157+
158+
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
159+
160+
final long rowCount = Long.parseLong(cluster.runSql("SELECT COUNT(*) FROM %s", dataSource));
161+
Assertions.assertTrue(
162+
rowCount >= batchA + batchB,
163+
"Expected at least [" + (batchA + batchB) + "] rows but got [" + rowCount + "]"
164+
);
165+
}
166+
167+
private String submitTask(String topic, String groupId)
168+
{
169+
final Map<String, Object> consumerProps = kafkaServer.consumerProperties();
170+
final ShareGroupIndexTaskIOConfig ioConfig = new ShareGroupIndexTaskIOConfig(
171+
topic,
172+
groupId,
173+
consumerProps,
174+
new CsvInputFormat(
175+
List.of(COL_TIMESTAMP, COL_ITEM, COL_VALUE),
176+
null,
177+
null,
178+
false,
179+
0,
180+
false
181+
),
182+
null
183+
);
184+
final DataSchema dataSchema = DataSchema.builder()
185+
.withDataSource(dataSource)
186+
.withTimestamp(new TimestampSpec(COL_TIMESTAMP, "auto", null))
187+
.withDimensions(
188+
DimensionsSpec.builder()
189+
.setDimensions(
190+
DimensionsSpec.getDefaultSchemas(
191+
List.of(COL_ITEM, COL_VALUE)
192+
)
193+
)
194+
.build()
195+
)
196+
.withGranularity(
197+
new UniformGranularitySpec(
198+
Granularities.DAY,
199+
Granularities.NONE,
200+
null
201+
)
202+
)
203+
.build();
204+
final ShareGroupIndexTask task = new ShareGroupIndexTask(
205+
null,
206+
null,
207+
dataSchema,
208+
new KafkaIndexTaskTuningConfig(
209+
null,
210+
null,
211+
null,
212+
null,
213+
null,
214+
null,
215+
null,
216+
null,
217+
null,
218+
null,
219+
null,
220+
null,
221+
null,
222+
null,
223+
null,
224+
null,
225+
null,
226+
null,
227+
null,
228+
null,
229+
null,
230+
null
231+
),
232+
ioConfig,
233+
null,
234+
mapper
235+
);
236+
cluster.callApi().submitTask(task);
237+
return task.getId();
238+
}
239+
240+
private void waitForRowsProcessed(long expected)
241+
{
242+
indexer.latchableEmitter().waitForEventAggregate(
243+
event -> event.hasMetricName("ingest/events/processed")
244+
.hasDimension(DruidMetrics.DATASOURCE, dataSource),
245+
agg -> agg.hasSumAtLeast(expected)
246+
);
247+
}
248+
249+
private List<byte[]> csvRecords(int count, int startIndex, String dateStr)
250+
{
251+
final List<byte[]> records = new ArrayList<>();
252+
for (int i = 0; i < count; i++) {
253+
final String csv = dateStr + "T00:" + String.format("%02d", (startIndex + i) % 60) + ":00Z"
254+
+ ",item" + (startIndex + i) + "," + (startIndex + i);
255+
records.add(csv.getBytes(StandardCharsets.UTF_8));
256+
}
257+
return records;
258+
}
259+
}

0 commit comments

Comments
 (0)