Skip to content

Commit 2807675

Browse files
committed
fix(share-group): move subscribe/startJob inside try so runLoop finally always guards appenderator
1 parent cd7016e commit 2807675

1 file changed

Lines changed: 2 additions & 3 deletions

File tree

extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/ShareGroupIndexTaskRunner.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -275,9 +275,6 @@ TaskStatus runLoop(
275275
TaskToolbox toolbox
276276
) throws Exception
277277
{
278-
recordSupplier.subscribe(Collections.singleton(ioConfig.getTopic()));
279-
driver.startJob(segmentId -> true);
280-
281278
// Share groups manage delivery state on the broker; the Committer is a
282279
// no-op placeholder required by the appenderator driver contract.
283280
final Supplier<Committer> committerSupplier = () -> new Committer()
@@ -302,6 +299,8 @@ public void run()
302299
boolean appenderatorClosedNormally = false;
303300

304301
try {
302+
recordSupplier.subscribe(Collections.singleton(ioConfig.getTopic()));
303+
driver.startJob(segmentId -> true);
305304
while (!task.isStopRequested()) {
306305

307306
final List<OrderedPartitionableRecord<KafkaTopicPartition, Long, KafkaRecordEntity>> records;

0 commit comments

Comments
 (0)