-
Notifications
You must be signed in to change notification settings - Fork 14.9k
KAFKA-16262: Add IQv2 to Kafka Streams documentation #21367
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Changes from all commits
71473d1
aa79e13
8995ba0
a9c84a4
8470eec
6196acf
b865597
c921e8b
05425db
246047a
07c3398
33f788c
35a3b6b
b95c6a7
1f2961c
c267977
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -486,9 +486,106 @@ At this point the full state of the application is interactively queryable: | |
| * Collectively, this allows us to query the full state of the entire application. | ||
|
|
||
|
|
||
| ## Interactive Queries APIs | ||
|
|
||
| Kafka Streams currently provides two APIs for querying state: | ||
|
|
||
| - **Interactive Queries v2 (IQv2)** – the newer, query-based API | ||
| - **Interactive Queries v1 (IQv1)** – the original, store-access-based API | ||
|
|
||
| This documentation introduces **Interactive Queries v2** as the **new API**, while retaining IQv1 for backward compatibility. | ||
|
|
||
|
|
||
| **Interactive Queries v2 (IQv2)** introduces a **query-based API** for accessing Kafka Streams application state. | ||
| Instead of directly interacting with state store objects, applications **define structured queries** that are executed by Kafka Streams. | ||
|
|
||
| IQv2 improves API safety, extensibility, and error handling by: | ||
|
|
||
| - Decoupling query definition from store internals | ||
| - Returning structured query results instead of throwing exceptions | ||
| - Enabling clearer handling of partial failures in distributed environments | ||
|
|
||
| Queries are executed against local state stores on an application instance, with Kafka Streams managing: | ||
|
|
||
| - Query execution | ||
| - Validation | ||
| - Result and failure reporting | ||
|
|
||
| IQv2 is designed to evolve independently of specific state store implementations and serves as the **successor to the legacy Interactive Queries v1 API.** | ||
|
|
||
| ## How Interactive Queries v2 Works | ||
|
|
||
| Interactive Queries v2 works by allowing applications to **define explicit query objects** that describe what data to fetch from a state store. | ||
| These queries are submitted to Kafka Streams, which is responsible for **executing the query,** handling validation, and returning a **structured result.** | ||
|
|
||
| Instead of exposing state store internals, Kafka Streams processes the query and returns a **StateQueryResult**, which may contain either the requested data or detailed failure information. | ||
| This approach makes querying state safer, more extensible, and better suited for distributed environments. | ||
| To see an end-to-end application with interactive queries, review the demo applications. | ||
|
|
||
| ## Building and Executing a Query (IQv2) | ||
|
|
||
| In Interactive Queries v2, applications first **build a query object** that describes the data to retrieve from a specific state store (for example, a key lookup or range query). | ||
| This query is wrapped in a **StateQueryRequest**, which also specifies the target state store. | ||
|
|
||
| Once the request is created, it is executed using the **KafkaStreams#query()** method. | ||
| Kafka Streams validates the request, executes the query against the appropriate local state store, and returns a **StateQueryResult** containing either the query result or failure details. | ||
|
|
||
| This separation of **query construction** and **query execution** allows Kafka Streams to manage execution logic while keeping application code clean and extensible. | ||
|
|
||
| 1. **Build the Query** | ||
|
|
||
| ``` | ||
| // Build a query to fetch the value for a specific key | ||
| StateQueryRequest<KeyQuery<String, Long>> request = | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Did you run this test? It needs to be |
||
| StateQueryRequest.inStore("counts-store") | ||
| .withQuery(KeyQuery.withKey("user-1")); | ||
|
|
||
| Here: | ||
|
|
||
| - `"counts-store"` is the name of the state store. | ||
| - `"user-1"` is the key you want to query. | ||
| - `KeyQuery` defines what kind of query you are performing. | ||
|
|
||
| 2. **Execute the Query** | ||
| ``` | ||
| StateQueryResult<Long> result = kafkaStreams.query(request); | ||
| Kafka Streams executes the query against the local state store and returns a result object. | ||
|
|
||
| 3. **Read the Result** | ||
| ``` | ||
| if (result.hasFailures()) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| // Handle query failures | ||
| result.getFailures().forEach((host, failure) -> | ||
| System.out.println("Query failed on " + host + ": " + failure) | ||
| ); | ||
| } else { | ||
| // Get the successful query result | ||
| Long value = result.getResult(); | ||
| System.out.println("Query result: " + value); | ||
| } | ||
|
|
||
| ## Comparison : Interactive Queries (IQv1) and (IQv2) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: remove white-space before |
||
|
|
||
| | **Aspects** | **Interactive Queries v1 (Legacy API)** | **Interactive Queries v2 (New API)** | | ||
| |-------------|-----------------------------------------|--------------------------------------| | ||
| | API Style | Store-based API | Query-based API | | ||
| | How queries are defined | Direct access to state store objects | Explicit query objects (e.g., KeyQuery) | | ||
| | Interaction with state stores | Application interacts directly with store internals | Kafka Streams executes queries on behalf of the application | | ||
| | Result handling | Results returned directly or via iterators | Results wrapped in StateQueryResult | | ||
| | Error handling | Exception-based | Structured failures returned with results | | ||
| | Coupling to store implementation | Tightly coupled to store types | Decoupled from store internals | | ||
| | Extensibility | Harder to evolve without breaking changes | Designed to be extensible and future-proof | | ||
| | Partial failure visibility | Limited | Explicit visibility into per-host failures | | ||
| | Feature completeness | Feature complete and stable | Not yet feature complete | | ||
|
|
||
| ## Limitations of Interactive Queries v2 | ||
|
|
||
| Interactive Queries v2 is not yet feature complete. | ||
|
|
||
| **Some advanced query patterns available in the original Interactive Queries API may not yet be supported. | ||
| The API is expected to evolve, and user feedback will guide future improvements.** | ||
|
|
||
|
|
||
| * [Documentation](/documentation) | ||
| * [Kafka Streams](/documentation/streams) | ||
| * [Developer Guide](/documentation/streams/developer-guide/) | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we add
returns metadata (prtitions) beside query results?