Skip to content

Commit be58522

Browse files
authored
fix: Improved error handling in SeekableStreamIndexTaskRunner. (#19218)
The main improvement is that "persist" is moved out of a finally block, and now only happens on the normal path. This has two benefits. First, there is no point in persisting on the error path, and the in-memory index might be in a bad state anyway at that point. Second, moving the persist call out of "finally" fixes an issue where an exception thrown from "persist" would cause an exception thrown from "add" to be lost. This can come up in production when the in-memory index grows too large, causing the main code to throw an OutOfMemoryError, and then something goes wrong with the persist too. In this situation the original OutOfMemoryError would not have been logged. A secondary improvement is that we now catch Throwable rather than Exception, both when triggering cleanup and when handling errors that occur during cleanup. This ensures we don't miss cleanup tasks when an Error is thrown by the main code, and that we don't lose the original exception if an Error is thrown by the cleanup code.
1 parent f5ec8be commit be58522

2 files changed

Lines changed: 15 additions & 27 deletions

File tree

extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1752,8 +1752,8 @@ public void testMultipleParseExceptionsFailure() throws Exception
17521752

17531753
final SegmentGenerationMetrics observedSegmentGenerationMetrics = task.getRunner().getSegmentGenerationMetrics();
17541754
Assert.assertTrue(observedSegmentGenerationMetrics.isProcessingDone());
1755-
Assert.assertEquals(3, observedSegmentGenerationMetrics.rowOutput());
1756-
Assert.assertEquals(1, observedSegmentGenerationMetrics.numPersists());
1755+
Assert.assertEquals(0, observedSegmentGenerationMetrics.rowOutput());
1756+
Assert.assertEquals(0, observedSegmentGenerationMetrics.numPersists());
17571757
Assert.assertEquals(0, observedSegmentGenerationMetrics.handOffCount());
17581758
}
17591759

indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java

Lines changed: 13 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -445,7 +445,7 @@ private TaskStatus runInternal(TaskToolbox toolbox) throws Exception
445445
)
446446
);
447447

448-
Throwable caughtExceptionOuter = null;
448+
Throwable caughtException = null;
449449

450450
//milliseconds waited for created segments to be handed off
451451
long handoffWaitMs = 0L;
@@ -607,8 +607,6 @@ public void run()
607607
// Could eventually support leader/follower mode (for keeping replicas more in sync)
608608
boolean stillReading = !assignment.isEmpty();
609609
status = Status.READING;
610-
Throwable caughtExceptionInner = null;
611-
612610
try {
613611
while (stillReading) {
614612
if (possiblyPause()) {
@@ -809,9 +807,8 @@ public void onFailure(Throwable t)
809807
}
810808
}
811809
}
812-
catch (Exception e) {
810+
catch (Throwable e) {
813811
// (1) catch all exceptions while reading from kafka
814-
caughtExceptionInner = e;
815812
if (Throwables.getRootCause(e) instanceof InterruptedException) {
816813
// Suppress InterruptedException stack trace to avoid flooding the logs
817814
log.error("Encounted InterrupedException in run() before persisting");
@@ -821,20 +818,11 @@ public void onFailure(Throwable t)
821818
throw e;
822819
}
823820
finally {
824-
try {
825-
// To handle cases where tasks stop reading due to stop request or exceptions
826-
segmentGenerationMetrics.markProcessingDone();
827-
driver.persist(committerSupplier.get()); // persist pending data
828-
}
829-
catch (Exception e) {
830-
if (caughtExceptionInner != null) {
831-
caughtExceptionInner.addSuppressed(e);
832-
} else {
833-
throw e;
834-
}
835-
}
821+
segmentGenerationMetrics.markProcessingDone();
836822
}
837823

824+
driver.persist(committerSupplier.get()); // persist pending data
825+
838826
synchronized (statusLock) {
839827
if (stopRequested.get() && !publishOnStop.get()) {
840828
throw new InterruptedException("Stopping without publishing");
@@ -861,7 +849,7 @@ public void onFailure(Throwable t)
861849
// Committer is built.)
862850
sequenceMetadata.updateAssignments(currOffsets, this::isMoreToReadAfterReadingRecord);
863851
publishingSequences.add(sequenceMetadata.getSequenceName());
864-
// persist already done in finally, so directly add to publishQueue
852+
// persist already done above, so directly add to publishQueue
865853
publishAndRegisterHandoff(sequenceMetadata);
866854
}
867855
}
@@ -913,15 +901,15 @@ public void onFailure(Throwable t)
913901
catch (InterruptedException | RejectedExecutionException e) {
914902
// (2) catch InterruptedException and RejectedExecutionException thrown for the whole ingestion steps including
915903
// the final publishing.
916-
caughtExceptionOuter = e;
904+
caughtException = e;
917905
try {
918906
Futures.allAsList(publishWaitList).cancel(true);
919907
Futures.allAsList(handOffWaitList).cancel(true);
920908
if (appenderator != null) {
921909
appenderator.closeNow();
922910
}
923911
}
924-
catch (Exception e2) {
912+
catch (Throwable e2) {
925913
e.addSuppressed(e2);
926914
}
927915

@@ -937,17 +925,17 @@ public void onFailure(Throwable t)
937925
throw e;
938926
}
939927
}
940-
catch (Exception e) {
928+
catch (Throwable e) {
941929
// (3) catch all other exceptions thrown for the whole ingestion steps including the final publishing.
942-
caughtExceptionOuter = e;
930+
caughtException = e;
943931
try {
944932
Futures.allAsList(publishWaitList).cancel(true);
945933
Futures.allAsList(handOffWaitList).cancel(true);
946934
if (appenderator != null) {
947935
appenderator.closeNow();
948936
}
949937
}
950-
catch (Exception e2) {
938+
catch (Throwable e2) {
951939
e.addSuppressed(e2);
952940
}
953941
throw e;
@@ -966,8 +954,8 @@ public void onFailure(Throwable t)
966954
rejectionPeriodUpdaterExec.shutdown();
967955
}
968956
catch (Throwable e) {
969-
if (caughtExceptionOuter != null) {
970-
caughtExceptionOuter.addSuppressed(e);
957+
if (caughtException != null) {
958+
caughtException.addSuppressed(e);
971959
} else {
972960
throw e;
973961
}

0 commit comments

Comments
 (0)