LogAndContinueProcessingExceptionHandler:
This handler logs the processing exception and then signals the processing pipeline to continue processing more records.
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 1e04c1fd418c0..402f6f37d2c9b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -574,7 +574,8 @@ public class StreamsConfig extends AbstractConfig {
public static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG = "errors.dead.letter.queue.topic.name";
private static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_DOC = "If not null, the default exception handler will build and send a Dead Letter Queue record to the topic with the provided name if an error occurs.\n" +
- "If a custom deserialization/production or processing exception handler is set, this parameter is ignored for this handler.";
+ "If a custom deserialization/production or processing exception handler is set, this parameter is ignored for this handler.\n" +
+ "Note: This configuration applies only to regular stream processing tasks. It does not apply to global state store updates (global threads).";
/** {@code log.summary.interval.ms} */
public static final String LOG_SUMMARY_INTERVAL_MS_CONFIG = "log.summary.interval.ms";
@@ -652,7 +653,9 @@ public class StreamsConfig extends AbstractConfig {
@SuppressWarnings("WeakerAccess")
public static final String PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG = "processing.exception.handler";
@Deprecated
- public static final String PROCESSING_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the org.apache.kafka.streams.errors.ProcessingExceptionHandler interface.";
+ public static final String PROCESSING_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the org.apache.kafka.streams.errors.ProcessingExceptionHandler interface. " +
+ "Note: This handler applies only to regular stream processing tasks. It does not apply to global state store updates (global threads). " +
+ "Exceptions occurring in global threads will bubble up to the configured uncaught exception handler.";
/** {@code processing.guarantee} */
@SuppressWarnings("WeakerAccess")
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index bbf82ff9033ba..f121c1626eabd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -208,6 +208,15 @@ public void process(final Record record) {
// while Java distinguishes checked vs unchecked exceptions, other languages
// like Scala or Kotlin do not, and thus we need to catch `Exception`
// (instead of `RuntimeException`) to work well with those languages
+
+ // If the processing exception handler is not set (e.g., for global threads),
+ // rethrow the exception to let it bubble up to the uncaught exception handler.
+ // The processing exception handler is only set for regular stream tasks, not for
+ // global state update tasks which use a different error handling mechanism.
+ if (processingExceptionHandler == null) {
+ throw processingException;
+ }
+
final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
null, // only required to pass for DeserializationExceptionHandler
internalProcessorContext.recordContext().topic(),
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 565d6dcf8f9e9..21e670ef2cdaa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -133,6 +133,24 @@ public void shouldNotThrowFailedProcessingExceptionWhenProcessingExceptionHandle
assertDoesNotThrow(() -> node.process(new Record<>(KEY, VALUE, TIMESTAMP)));
}
+ @Test
+ public void shouldRethrowExceptionWhenProcessingExceptionHandlerIsNull() {
+ // This simulates the global thread case where no ProcessingExceptionHandler is set
+ final ProcessorNode