Skip to content

Conversation

@chickenchickenlove
Copy link
Contributor

Background

This PR addresses the limitation where StreamsBuilder throws a TopologyException when multiple source nodes subscribe to overlapping but not identical sets of topics.

For example, the following code previously failed:

builder.stream("topicA");
builder.stream(Arrays.asList("topicA", "topicB"));

Error: Two source nodes are subscribed to overlapping but not equal input topics

Changes

To support this, I have implemented a mechanism to flatten multi-topic source nodes into individual single-topic source nodes before the topology optimization phase.

  • InternalStreamsBuilder#flattenSourceNodesAndRearrange(...)
    • Iterates through the root children to find StreamSourceNodes subscribed to multiple topics.
    • Splits these nodes into multiple single-topic source nodes.
    • Preserves the original buildPriority and child nodes (downstream processors) for each split node to ensure consistent topology construction.
  • InternalStreamsBuilder#mergeDuplicateSourceNodes(...)
    • Removed the check that threw TopologyException for overlapping but unequal topics.
    • Since nodes are now flattened to single topics first, the existing merging logic simply coalesces nodes subscribed to the exact same topic (e.g., the "topicA" node from the first stream and the "topicA" node derived from the second stream).

Note

I found the bug or non-determistic behavior.
More details are available in here (https://issues.apache.org/jira/browse/KAFKA-19923)

That issue has existed independently of this PR and has been present for quite some time.
In addition, once the topology actually starts running, it results in a ClassCastException, which immediately terminates the Kafka Streams application. Because of this fail-fast behavior, the bug is unlikely to affect any real-world, correctly configured Kafka Streams deployments.

While this PR does relax some of the previous constraints, I believe it remains highly unlikely for users to subscribe to the same topic with different ConsumedInternal configurations. Therefore, I do not expect this change to introduce any practical risk.

That said, it may still be helpful to document that all source nodes reading from the same topic should use semantically identical ConsumedInternal configurations.
Here, “identical” means that they should not differ in their TimestampExtractor or their key and value Serdes.

I hope this clarification is helpful.

Result

@github-actions github-actions bot added triage PRs from the community streams labels Nov 25, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

streams triage PRs from the community

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant