Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion docs/streams/developer-guide/config-streams.html
Original file line number Diff line number Diff line change
Expand Up @@ -1036,7 +1036,10 @@ <h4><a class="toc-backref" href="#id30">probing.rebalance.interval.ms</a><a clas
<div><p>The processing exception handler allows you to manage exceptions triggered during the processing of a record. The implemented exception
handler needs to return a <code>FAIL</code> or <code>CONTINUE</code> depending on the record and the exception thrown. Returning
<code>FAIL</code> will signal that Streams should shut down and <code>CONTINUE</code> will signal that Streams should ignore the issue
and continue processing. The following library built-in exception handlers are available:</p>
and continue processing.</p>
<p><strong>Note:</strong> This handler applies only to regular stream processing tasks. It does not apply to global state store updates
(global threads). Exceptions occurring in global threads will bubble up to the configured uncaught exception handler.</p>
<p>The following library built-in exception handlers are available:</p>
<ul class="simple">
<li><a class="reference external" href="/{{version}}/javadoc/org/apache/kafka/streams/errors/LogAndContinueProcessingExceptionHandler.html">LogAndContinueProcessingExceptionHandler</a>:
This handler logs the processing exception and then signals the processing pipeline to continue processing more records.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,8 @@ public class StreamsConfig extends AbstractConfig {
public static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_CONFIG = "errors.dead.letter.queue.topic.name";

private static final String ERRORS_DEAD_LETTER_QUEUE_TOPIC_NAME_DOC = "If not null, the default exception handler will build and send a Dead Letter Queue record to the topic with the provided name if an error occurs.\n" +
"If a custom deserialization/production or processing exception handler is set, this parameter is ignored for this handler.";
"If a custom deserialization/production or processing exception handler is set, this parameter is ignored for this handler.\n" +
"Note: This configuration applies only to regular stream processing tasks. It does not apply to global state store updates (global threads).";

/** {@code log.summary.interval.ms} */
public static final String LOG_SUMMARY_INTERVAL_MS_CONFIG = "log.summary.interval.ms";
Expand Down Expand Up @@ -652,7 +653,9 @@ public class StreamsConfig extends AbstractConfig {
@SuppressWarnings("WeakerAccess")
public static final String PROCESSING_EXCEPTION_HANDLER_CLASS_CONFIG = "processing.exception.handler";
@Deprecated
public static final String PROCESSING_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.ProcessingExceptionHandler</code> interface.";
public static final String PROCESSING_EXCEPTION_HANDLER_CLASS_DOC = "Exception handling class that implements the <code>org.apache.kafka.streams.errors.ProcessingExceptionHandler</code> interface. " +
"Note: This handler applies only to regular stream processing tasks. It does not apply to global state store updates (global threads). " +
"Exceptions occurring in global threads will bubble up to the configured uncaught exception handler.";

/** {@code processing.guarantee} */
@SuppressWarnings("WeakerAccess")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,15 @@ public void process(final Record<KIn, VIn> record) {
// while Java distinguishes checked vs unchecked exceptions, other languages
// like Scala or Kotlin do not, and thus we need to catch `Exception`
// (instead of `RuntimeException`) to work well with those languages

// If the processing exception handler is not set (e.g., for global threads),
// rethrow the exception to let it bubble up to the uncaught exception handler.
// The processing exception handler is only set for regular stream tasks, not for
// global state update tasks which use a different error handling mechanism.
if (processingExceptionHandler == null) {
throw processingException;
}

final ErrorHandlerContext errorHandlerContext = new DefaultErrorHandlerContext(
null, // only required to pass for DeserializationExceptionHandler
internalProcessorContext.recordContext().topic(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,24 @@ public void shouldNotThrowFailedProcessingExceptionWhenProcessingExceptionHandle
assertDoesNotThrow(() -> node.process(new Record<>(KEY, VALUE, TIMESTAMP)));
}

@Test
public void shouldRethrowExceptionWhenProcessingExceptionHandlerIsNull() {
// This simulates the global thread case where no ProcessingExceptionHandler is set
final ProcessorNode<Object, Object, Object, Object> node =
new ProcessorNode<>(NAME, new IgnoredInternalExceptionsProcessor(), Collections.emptySet());

final InternalProcessorContext<Object, Object> internalProcessorContext = mockInternalProcessorContext();
// Initialize without a ProcessingExceptionHandler (simulates global thread initialization)
node.init(internalProcessorContext);

// The exception should be rethrown since there's no handler to process it
final RuntimeException exception = assertThrows(RuntimeException.class,
() -> node.process(new Record<>(KEY, VALUE, TIMESTAMP)));

assertEquals("Processing exception should be caught and handled by the processing exception handler.",
exception.getMessage());
}

@ParameterizedTest
@CsvSource({
"FailedProcessingException,java.lang.RuntimeException,Fail processing",
Expand Down