Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 68 additions & 7 deletions docs/streams/developer-guide/running-app.html
Original file line number Diff line number Diff line change
Expand Up @@ -134,17 +134,78 @@ <h3><a class="toc-backref" href="#id8">Determining how many application instance
<li>Data should be equally distributed across topic partitions. For example, if two topic partitions each have 1 million messages, this is better than a single partition with 2 million messages and none in the other.</li>
<li>Processing workload should be equally distributed across topic partitions. For example, if the time to process messages varies widely, then it is better to spread the processing-intensive messages across partitions rather than storing these messages within the same partition.</li>
</ul>
</div>
</div>
</div>
</div>
<h3>Available Listeners and Callbacks</h3>

<p>
Kafka Streams provides several listener and callback APIs that allow applications
to react to lifecycle events, handle errors, and observe state changes.
</p>

</div>
</div>
<div class="pagination">
<ul>
<li>
<b>UncaughtExceptionHandler</b><br/>
Copy link
Member

@mjsax mjsax Sep 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
<b>UncaughtExceptionHandler</b><br/>
<b>StreamsUncaughtExceptionHandler:</b><br/>

Register with <code>streams.setUncaughtExceptionHandler(...)</code> to handle
uncaught exceptions in stream threads. The handler decides whether to replace
the thread, shut down the client, or shut down the entire application by returning
a <code>StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse</code>.
</li>

<li>
<b>State Listener</b><br/>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
<b>State Listener</b><br/>
<b>State Listener:</b><br/>

Register with <code>streams.setStateListener(...)</code> to receive callbacks
when the KafkaStreams instance changes state
(e.g., <code>CREATED</code>, <code>RUNNING</code>, <code>REBALANCING</code>, <code>ERROR</code>).
</li>

<li>
<b>Global State Restore Listener</b><br/>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
<b>Global State Restore Listener</b><br/>
<b>Global State Restore Listener:</b><br/>

Register with <code>streams.setGlobalStateRestoreListener(...)</code> to observe
the progress of state store restoration when a task recovers from a changelog.
The <code>StateRestoreListener</code> interface provides methods:
<code>onRestoreStart</code>, <code>onBatchRestored</code>, and <code>onRestoreEnd</code>.
</li>

<li>
<b>ProductionExceptionHandler</b> and <b>DeserializationExceptionHandler</b><br/>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
<b>ProductionExceptionHandler</b> and <b>DeserializationExceptionHandler</b><br/>
<b>ProductionExceptionHandler</b> and <b>DeserializationExceptionHandler</b>:<br/>

Configure via <code>StreamsConfig</code> to control how the application responds
to production or deserialization errors at runtime. You can plug in custom
implementations to log, continue, or fail the application.
</li>

<li>
<b>ConsumerRebalanceListener</b><br/>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one is not supported by Kafka Streams, but only by the KafkaConsumer, as the text actually says. Don't think it belongs into the KS part of the docs.

When constructing a <code>KafkaConsumer</code> manually or in advanced usage,
a custom <code>ConsumerRebalanceListener</code> can be supplied to observe
partition assignment and revocation events.
</li>
</ul>

<p>Example usage of exception and state restore listeners:</p>

<pre>
<code class="language-java">
// Handle uncaught exceptions
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// Handle uncaught exceptions
// Handle uncaught exceptions

Copy link
Member

@mjsax mjsax Sep 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This applies to all lines of the code example. To avoid weird formatting, we need to remove all these trailing spaces, otherwise they are rendered in the docs, and instead of this:

Example usage of exception and state restore listeners:

// Handle uncaught exceptions
...

we would get

Example usage of exception and state restore listeners:

                    // Handle uncaught exceptions
                    ...

What is rather weird.

streams.setUncaughtExceptionHandler((thread, e) -> {
log.error("Stream thread {} failed due to {}", thread, e.getMessage(), e);
return StreamThreadExceptionResponse.SHUTDOWN_APPLICATION;
});

// Observe state restoration
streams.setGlobalStateRestoreListener(new StateRestoreListener() {
public void onRestoreStart(...) { ... }
public void onBatchRestored(...) { ... }
public void onRestoreEnd(...) { ... }
});
</code>
</pre>

</div>
</div>
<div class="pagination">
<a href="/{{version}}/documentation/streams/developer-guide/memory-mgmt" class="pagination__btn pagination__btn__prev">Previous</a>
<a href="/{{version}}/documentation/streams/developer-guide/manage-topics" class="pagination__btn pagination__btn__next">Next</a>
</div>
</div>
</script>

<!--#include virtual="../../../includes/_header.htm" -->
Expand Down
Loading