Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
Expand Down Expand Up @@ -306,6 +308,7 @@ public void buildAndOptimizeTopology() {
}

public void buildAndOptimizeTopology(final Properties props) {
flattenSourceNodesAndRearrange();
mergeDuplicateSourceNodes();
optimizeTopology(props);
enableVersionedSemantics();
Expand Down Expand Up @@ -376,6 +379,53 @@ private void rewriteRepartitionNodes() {
}
}

private void flattenSourceNodesAndRearrange() {
final List<StreamSourceNode<?, ?>> multiTopicStreamSources = new ArrayList<>();

for (final GraphNode graphNode : root.children()) {
if (graphNode instanceof StreamSourceNode) {
final StreamSourceNode<?, ?> source = (StreamSourceNode<?, ?>) graphNode;
if (source.topicNames().isPresent() &&
source.topicNames().get().size() > 1) {
multiTopicStreamSources.add(source);
}
}
}

if (multiTopicStreamSources.isEmpty()) {
return;
}

for (final StreamSourceNode<?, ?> previousSourceNode : multiTopicStreamSources) {
final Collection<String> topics = previousSourceNode.topicNames().get();

// Multiple single-topic sources derived from the same multi-topic source
// are assigned the same 'buildPriority' to ensure consistent relative ordering
// with respect to their downstream nodes (e.g., join/window optimizations).
final int buildPriority = previousSourceNode.buildPriority();
final Collection<GraphNode> children = previousSourceNode.children();
final ConsumedInternal<?, ?> consumedInternal = previousSourceNode.consumedInternal();

for (final String topic : topics) {
final KStreamImpl<?, ?> newKStream = (KStreamImpl<?, ?>) stream(
Collections.singleton(topic),
consumedInternal
);
final GraphNode newSourceNode = newKStream.graphNode;
if (newSourceNode instanceof StreamSourceNode) {
final StreamSourceNode<?, ?> newSource = (StreamSourceNode<?, ?>) newSourceNode;
newSource.setBuildPriority(buildPriority);
}

for (final GraphNode child : children) {
newSourceNode.addChild(child);
}
}

root.removeChild(previousSourceNode);
previousSourceNode.clearChildren();
}
}

private void mergeDuplicateSourceNodes() {
final Map<String, StreamSourceNode<?, ?>> topicsToSourceNodes = new HashMap<>();
Expand Down Expand Up @@ -407,19 +457,10 @@ private void mergeDuplicateSourceNodes() {
} else {
final StreamSourceNode<?, ?> mainSourceNode = topicsToSourceNodes.get(
topic);
// TODO we only merge source nodes if the subscribed topic(s) are an exact match, so it's still not
// possible to subscribe to topicA in one KStream and topicA + topicB in another. We could achieve
// this by splitting these source nodes into one topic per node and routing to the subscribed children
if (!mainSourceNode.topicNames()
.equals(currentSourceNode.topicNames())) {
LOG.error(
"Topic {} was found in subscription for non-equal source nodes {} and {}",
topic, mainSourceNode, currentSourceNode);
throw new TopologyException(
"Two source nodes are subscribed to overlapping but not equal input topics");
if (mainSourceNode.topicNames().equals(currentSourceNode.topicNames())) {
mainSourceNode.merge(currentSourceNode);
root.removeChild(graphNode);
}
mainSourceNode.merge(currentSourceNode);
root.removeChild(graphNode);
}
}
}
Expand Down
Loading