Skip to content

Commit ce68d9b

Browse files
committed
test(share-group): add IT verifying at-least-once semantics after Indexer JVM crash
1 parent 872d6f3 commit ce68d9b

1 file changed

Lines changed: 272 additions & 0 deletions

File tree

Lines changed: 272 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,272 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.druid.testing.embedded.indexing;
21+
22+
import com.fasterxml.jackson.databind.ObjectMapper;
23+
import org.apache.druid.data.input.impl.CsvInputFormat;
24+
import org.apache.druid.data.input.impl.DimensionsSpec;
25+
import org.apache.druid.data.input.impl.TimestampSpec;
26+
import org.apache.druid.indexer.granularity.UniformGranularitySpec;
27+
import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
28+
import org.apache.druid.indexing.kafka.KafkaIndexTaskTuningConfig;
29+
import org.apache.druid.indexing.kafka.ShareGroupIndexTask;
30+
import org.apache.druid.indexing.kafka.ShareGroupIndexTaskIOConfig;
31+
import org.apache.druid.jackson.DefaultObjectMapper;
32+
import org.apache.druid.java.util.common.granularity.Granularities;
33+
import org.apache.druid.query.DruidMetrics;
34+
import org.apache.druid.segment.indexing.DataSchema;
35+
import org.apache.druid.testing.embedded.EmbeddedBroker;
36+
import org.apache.druid.testing.embedded.EmbeddedCoordinator;
37+
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
38+
import org.apache.druid.testing.embedded.EmbeddedHistorical;
39+
import org.apache.druid.testing.embedded.EmbeddedIndexer;
40+
import org.apache.druid.testing.embedded.EmbeddedOverlord;
41+
import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
42+
import org.junit.jupiter.api.Assertions;
43+
import org.junit.jupiter.api.Test;
44+
45+
import java.nio.charset.StandardCharsets;
46+
import java.util.ArrayList;
47+
import java.util.List;
48+
import java.util.Map;
49+
50+
/**
51+
* Verifies at-least-once semantics: uncommitted records are redelivered after Indexer crash.
52+
*/
53+
public class ShareGroupIndexerCrashIT extends EmbeddedClusterTestBase
54+
{
55+
private static final long SHARE_CONSUMER_READY_DELAY_MS = 3_000L;
56+
private static final String COL_TIMESTAMP = "__time";
57+
private static final String COL_ITEM = "item";
58+
private static final String COL_VALUE = "value";
59+
private static final String GROUP_ID = "crash-it-group";
60+
61+
private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
62+
private final EmbeddedOverlord overlord = new EmbeddedOverlord();
63+
private final EmbeddedIndexer indexer = new EmbeddedIndexer();
64+
private final EmbeddedHistorical historical = new EmbeddedHistorical();
65+
private final EmbeddedBroker broker = new EmbeddedBroker();
66+
67+
private ShareGroupKafkaResource kafkaServer;
68+
private final ObjectMapper mapper = new DefaultObjectMapper();
69+
70+
@Override
71+
public EmbeddedDruidCluster createCluster()
72+
{
73+
kafkaServer = new ShareGroupKafkaResource();
74+
final EmbeddedDruidCluster cluster = EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper();
75+
indexer.addProperty("druid.segment.handoff.pollDuration", "PT0.1s");
76+
cluster.addExtension(KafkaIndexTaskModule.class)
77+
.addResource(kafkaServer)
78+
.useLatchableEmitter()
79+
.useDefaultTimeoutForLatchableEmitter(60)
80+
.addCommonProperty("druid.monitoring.emissionPeriod", "PT0.1s")
81+
.addServer(coordinator)
82+
.addServer(overlord)
83+
.addServer(indexer)
84+
.addServer(historical)
85+
.addServer(broker);
86+
return cluster;
87+
}
88+
89+
@Test
90+
public void test_indexerCrash_midIngestion_noDataLossAfterRecovery() throws Exception
91+
{
92+
final String topic = dataSource + "_crash_topic";
93+
kafkaServer.createTopicWithPartitions(topic, 1);
94+
kafkaServer.setShareGroupAutoOffsetReset(GROUP_ID, "earliest");
95+
96+
// Produce all records before starting task so Indexer is polling them when it crashes.
97+
final int batchA = 20;
98+
kafkaServer.publishRecordsToTopic(topic, csvRecords(batchA, 0, "2025-09-01"));
99+
100+
// Task 1: starts ingesting but Indexer crashes before it completes.
101+
final String taskId1 = submitTask(topic);
102+
Thread.sleep(SHARE_CONSUMER_READY_DELAY_MS);
103+
104+
// Wait for at least some rows to be processed (but not all published to segments).
105+
indexer.latchableEmitter().waitForEventAggregate(
106+
event -> event.hasMetricName("ingest/events/processed")
107+
.hasDimension(DruidMetrics.DATASOURCE, dataSource),
108+
agg -> agg.hasSumAtLeast(1)
109+
);
110+
111+
// Simulate JVM crash: stop the indexer abruptly (no graceful shutdown).
112+
indexer.stop();
113+
114+
// Restart the indexer.
115+
indexer.start();
116+
117+
// Task 2: same group-id. Broker redelivers any unacknowledged records.
118+
final String taskId2 = submitTask(topic);
119+
Thread.sleep(SHARE_CONSUMER_READY_DELAY_MS);
120+
121+
// Produce a second batch to ensure Task 2 is actively consuming.
122+
final int batchB = 10;
123+
kafkaServer.publishRecordsToTopic(topic, csvRecords(batchB, batchA, "2025-09-02"));
124+
125+
// Wait until all batchA + batchB records are processed (at-least-once: batchA may arrive twice).
126+
indexer.latchableEmitter().waitForEventAggregate(
127+
event -> event.hasMetricName("ingest/events/processed")
128+
.hasDimension(DruidMetrics.DATASOURCE, dataSource),
129+
agg -> agg.hasSumAtLeast(batchA + batchB)
130+
);
131+
132+
cluster.callApi().onLeaderOverlord(o -> o.cancelTask(taskId2));
133+
cluster.callApi().waitForTaskToFinish(taskId2, overlord.latchableEmitter());
134+
135+
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
136+
137+
// At-least-once guarantee: row count >= batchA + batchB (no loss); may be slightly higher due to redelivery.
138+
final long rowCount = Long.parseLong(cluster.runSql("SELECT COUNT(*) FROM %s", dataSource));
139+
Assertions.assertTrue(
140+
rowCount >= batchA + batchB,
141+
"Expected at least [" + (batchA + batchB) + "] rows but got [" + rowCount + "]"
142+
);
143+
}
144+
145+
@Test
146+
public void test_indexerCrash_beforeAnyAck_allRecordsRedelivered() throws Exception
147+
{
148+
final String topic = dataSource + "_crash_no_ack_topic";
149+
kafkaServer.createTopicWithPartitions(topic, 1);
150+
kafkaServer.setShareGroupAutoOffsetReset(GROUP_ID + "-noack", "earliest");
151+
152+
final int numRecords = 10;
153+
kafkaServer.publishRecordsToTopic(topic, csvRecords(numRecords, 0, "2025-10-01"));
154+
155+
// Task 1: starts but is killed before its first publish (no ack committed).
156+
submitTask(topic, GROUP_ID + "-noack");
157+
Thread.sleep(SHARE_CONSUMER_READY_DELAY_MS);
158+
159+
indexer.stop();
160+
indexer.start();
161+
162+
// Task 2 (same group): broker must redeliver all numRecords because none were acked.
163+
final String taskId2 = submitTask(topic, GROUP_ID + "-noack");
164+
Thread.sleep(SHARE_CONSUMER_READY_DELAY_MS);
165+
166+
indexer.latchableEmitter().waitForEventAggregate(
167+
event -> event.hasMetricName("ingest/events/processed")
168+
.hasDimension(DruidMetrics.DATASOURCE, dataSource),
169+
agg -> agg.hasSumAtLeast(numRecords)
170+
);
171+
172+
cluster.callApi().onLeaderOverlord(o -> o.cancelTask(taskId2));
173+
cluster.callApi().waitForTaskToFinish(taskId2, overlord.latchableEmitter());
174+
175+
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
176+
177+
final long rowCount = Long.parseLong(cluster.runSql("SELECT COUNT(*) FROM %s", dataSource));
178+
Assertions.assertTrue(
179+
rowCount >= numRecords,
180+
"Expected at least [" + numRecords + "] rows after crash recovery but got [" + rowCount + "]"
181+
);
182+
}
183+
184+
private String submitTask(String topic)
185+
{
186+
return submitTask(topic, GROUP_ID);
187+
}
188+
189+
private String submitTask(String topic, String groupId)
190+
{
191+
final Map<String, Object> consumerProps = kafkaServer.consumerProperties();
192+
final ShareGroupIndexTaskIOConfig ioConfig = new ShareGroupIndexTaskIOConfig(
193+
topic,
194+
groupId,
195+
consumerProps,
196+
new CsvInputFormat(
197+
List.of(COL_TIMESTAMP, COL_ITEM, COL_VALUE),
198+
null,
199+
null,
200+
false,
201+
0,
202+
false
203+
),
204+
null
205+
);
206+
final DataSchema dataSchema = DataSchema.builder()
207+
.withDataSource(dataSource)
208+
.withTimestamp(new TimestampSpec(COL_TIMESTAMP, "auto", null))
209+
.withDimensions(
210+
DimensionsSpec.builder()
211+
.setDimensions(
212+
DimensionsSpec.getDefaultSchemas(
213+
List.of(COL_ITEM, COL_VALUE)
214+
)
215+
)
216+
.build()
217+
)
218+
.withGranularity(
219+
new UniformGranularitySpec(
220+
Granularities.DAY,
221+
Granularities.NONE,
222+
null
223+
)
224+
)
225+
.build();
226+
final ShareGroupIndexTask task = new ShareGroupIndexTask(
227+
null,
228+
null,
229+
dataSchema,
230+
new KafkaIndexTaskTuningConfig(
231+
null,
232+
null,
233+
null,
234+
null,
235+
null,
236+
null,
237+
null,
238+
null,
239+
null,
240+
null,
241+
null,
242+
null,
243+
null,
244+
null,
245+
null,
246+
null,
247+
null,
248+
null,
249+
null,
250+
null,
251+
null,
252+
null
253+
),
254+
ioConfig,
255+
null,
256+
mapper
257+
);
258+
cluster.callApi().submitTask(task);
259+
return task.getId();
260+
}
261+
262+
private List<byte[]> csvRecords(int count, int startIndex, String dateStr)
263+
{
264+
final List<byte[]> records = new ArrayList<>();
265+
for (int i = 0; i < count; i++) {
266+
final String csv = dateStr + "T00:" + String.format("%02d", (startIndex + i) % 60) + ":00Z"
267+
+ ",item" + (startIndex + i) + "," + (startIndex + i);
268+
records.add(csv.getBytes(StandardCharsets.UTF_8));
269+
}
270+
return records;
271+
}
272+
}

0 commit comments

Comments
 (0)