diff --git a/fluss-common/src/main/java/org/apache/fluss/utils/ExceptionUtils.java b/fluss-common/src/main/java/org/apache/fluss/utils/ExceptionUtils.java index 86be42f725..6e0aac3475 100644 --- a/fluss-common/src/main/java/org/apache/fluss/utils/ExceptionUtils.java +++ b/fluss-common/src/main/java/org/apache/fluss/utils/ExceptionUtils.java @@ -32,8 +32,12 @@ import java.io.PrintWriter; import java.io.StringWriter; import java.lang.reflect.Field; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.HashSet; import java.util.Locale; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutionException; import java.util.function.Function; @@ -365,10 +369,67 @@ public static T firstOrSuppressed(T newException, @Nullabl if (previous == null || previous == newException) { return newException; - } else { - previous.addSuppressed(newException); + } + + // If the exceptions already reference each other through the suppression or cause chains, + // return the previous exception to avoid introducing cycles. + if (existsInExceptionChain(newException, previous) + || existsInExceptionChain(previous, newException)) { return previous; } + + previous.addSuppressed(newException); + return previous; + } + + /** + * Checks whether the given {@code targetException} exists anywhere within the exception chain + * of {@code exceptionChain}. This includes both the cause chain and all suppressed exceptions. + * A visited set is used to avoid cycles and redundant traversal. + * + * @param targetException The throwable exception to search for. + * @param exceptionChain The previous throwable exception chain to search in. + * @return {@code true}, if the exception is found within the exception chain (suppressed or + * cause), {@code false} otherwise. + */ + private static boolean existsInExceptionChain( + Throwable targetException, Throwable exceptionChain) { + if (targetException == null || exceptionChain == null) { + return false; + } + if (targetException == exceptionChain) { + return true; + } + + // Apply cycle prevention through a graph-like traversal of existing + // suppressed or cause chain exceptions + Set visitedExceptions = new HashSet<>(); + Deque exceptionStack = new ArrayDeque<>(); + exceptionStack.push(exceptionChain); + + while (!exceptionStack.isEmpty()) { + Throwable currentException = exceptionStack.pop(); + if (!visitedExceptions.add(currentException)) { + continue; + } + + if (currentException == targetException) { + return true; + } + + // Traverse suppression chain + for (Throwable suppressed : currentException.getSuppressed()) { + exceptionStack.push(suppressed); + } + + // Traverse cause-chain + Throwable cause = currentException.getCause(); + if (cause != null) { + exceptionStack.push(cause); + } + } + + return false; } /** diff --git a/fluss-common/src/test/java/org/apache/fluss/utils/ExceptionUtilsTest.java b/fluss-common/src/test/java/org/apache/fluss/utils/ExceptionUtilsTest.java index 04b4d951fc..29447ac5b5 100644 --- a/fluss-common/src/test/java/org/apache/fluss/utils/ExceptionUtilsTest.java +++ b/fluss-common/src/test/java/org/apache/fluss/utils/ExceptionUtilsTest.java @@ -108,7 +108,7 @@ public void testFindThrowableByType() { @Test void testFirstOrSuppressed() { - // tet first exception + // test first exception Exception exception = new Exception("exception"); assertThat(ExceptionUtils.firstOrSuppressed(exception, null)).isEqualTo(exception); @@ -121,6 +121,110 @@ void testFirstOrSuppressed() { assertThat(suppressedException.getSuppressed()).isEqualTo(new Throwable[] {newException}); } + @Test + void testFirstOrSuppressedCyclePrevention() { + // create two test exceptions (assuming thrown during shutdown, etc.) + Exception exceptionA = new Exception("Exception A"); + Exception exceptionB = new Exception("Exception B"); + + // associate the suppressions (creating a suppression chain) + ExceptionUtils.firstOrSuppressed(exceptionB, exceptionA); + assertThat(exceptionA.getSuppressed()).contains(exceptionB); + + // attempt to create a suppression cycle (A -> B; B -> A) + Exception result = ExceptionUtils.firstOrSuppressed(exceptionA, exceptionB); + assertThat(result).isEqualTo(exceptionB); + + // verify the exception cycle was prevented (no bidirectional reference) + assertThat(exceptionA.getSuppressed()).contains(exceptionB); + assertThat(exceptionB.getSuppressed()).doesNotContain(exceptionA); + assertThat(exceptionB.getSuppressed()).isEmpty(); + + // verify that processing suppressed exceptions no longer causes StackOverflowError + Throwable thrown = null; + try { + recursivelyProcessSuppressedExceptions(exceptionA); + } catch (Throwable t) { + thrown = t; + } + assertThat(thrown).isNull(); + } + + @Test + void testFirstOrSuppressedCyclePreventionThroughCauseChain() { + // create two test exceptions linked through cause chain + Exception exceptionA = new Exception("Exception A"); + Exception exceptionB = new Exception("Exception B"); + + // associate through cause chain (A has B as cause) + exceptionA.initCause(exceptionB); + assertThat(exceptionA.getCause()).isEqualTo(exceptionB); + + // attempt to create a cycle by trying to suppress A into B + // Since B is already in A's cause chain, this should be prevented + Exception result = ExceptionUtils.firstOrSuppressed(exceptionA, exceptionB); + assertThat(result).isEqualTo(exceptionB); + + // verify the exception cycle was prevented (B should not have A suppressed) + assertThat(exceptionB.getSuppressed()).doesNotContain(exceptionA); + assertThat(exceptionA.getCause()).isEqualTo(exceptionB); + + // verify that processing the cause chain no longer causes StackOverflowError + Throwable thrown = null; + try { + recursivelyProcessCauseChain(exceptionA); + } catch (Throwable t) { + thrown = t; + } + assertThat(thrown).isNull(); + } + + @Test + void testFirstOrSuppressedCyclePreventionThroughBidirectionalCauseChain() { + // create two test exceptions + Exception exceptionA = new Exception("Exception A"); + Exception exceptionB = new Exception("Exception B"); + + // create a bidirectional cause chain scenario + // A has B as cause, then attempt to suppress A into B + exceptionA.initCause(exceptionB); + assertThat(exceptionA.getCause()).isEqualTo(exceptionB); + + // attempt to create a cycle (B -> A via cause, then A -> B via suppression) + Exception result = ExceptionUtils.firstOrSuppressed(exceptionA, exceptionB); + assertThat(result).isEqualTo(exceptionB); + + // verify the cycle was prevented + assertThat(exceptionA.getCause()).isEqualTo(exceptionB); + assertThat(exceptionB.getSuppressed()).doesNotContain(exceptionA); + + // Now test the reverse: B has A as cause, then attempt to suppress B into A + Exception exceptionC = new Exception("Exception C"); + Exception exceptionD = new Exception("Exception D"); + exceptionD.initCause(exceptionC); + + // attempt to create a cycle (C -> D via cause, then D -> C via suppression) + Exception result2 = ExceptionUtils.firstOrSuppressed(exceptionD, exceptionC); + assertThat(result2).isEqualTo(exceptionC); + + // verify the cycle was prevented + assertThat(exceptionD.getCause()).isEqualTo(exceptionC); + assertThat(exceptionC.getSuppressed()).doesNotContain(exceptionD); + } + + private void recursivelyProcessSuppressedExceptions(Throwable throwable) { + for (Throwable suppressed : throwable.getSuppressed()) { + recursivelyProcessSuppressedExceptions(suppressed); + } + } + + private void recursivelyProcessCauseChain(Throwable throwable) { + Throwable cause = throwable.getCause(); + if (cause != null) { + recursivelyProcessCauseChain(cause); + } + } + @Test public void testExceptionStripping() { final Exception expectedException = new Exception("test exception");