diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java index d42f2de916714..550651383fec2 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java @@ -47,6 +47,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; import java.util.Collection; import java.util.Collections; import java.util.Comparator; @@ -306,6 +308,7 @@ public void buildAndOptimizeTopology() { } public void buildAndOptimizeTopology(final Properties props) { + /* Split multi-topic sources into single-topic sources first; the duplicate-source merge runs right after. */ flattenSourceNodesAndRearrange(); mergeDuplicateSourceNodes(); optimizeTopology(props); enableVersionedSemantics(); @@ -376,6 +379,53 @@ private void rewriteRepartitionNodes() { } } + /** Replaces every StreamSourceNode child of root whose topicNames() is present and holds more than one topic with one source per topic. Each replacement is created via stream(Collections.singleton(topic), consumedInternal), receives the build priority of the node it replaces (when it is a StreamSourceNode) and is given every child of that node; the replaced node is then removed from root and its children are cleared. Returns early when no such source exists. */ private void flattenSourceNodesAndRearrange() { + final List> multiTopicStreamSources = new ArrayList<>(); + + for (final GraphNode graphNode : root.children()) { + if (graphNode instanceof StreamSourceNode) { + final StreamSourceNode source = (StreamSourceNode) graphNode; + if (source.topicNames().isPresent() && + source.topicNames().get().size() > 1) { + multiTopicStreamSources.add(source); + } + } + } + + if (multiTopicStreamSources.isEmpty()) { + return; + } + + for (final StreamSourceNode previousSourceNode : multiTopicStreamSources) { + final Collection topics = previousSourceNode.topicNames().get(); + + // Multiple single-topic sources derived from the same multi-topic source + // are assigned the same 'buildPriority' to ensure consistent relative ordering + // with respect to their downstream nodes (e.g., join/window optimizations). 
+ final int buildPriority = previousSourceNode.buildPriority(); + final Collection children = previousSourceNode.children(); + final ConsumedInternal consumedInternal = previousSourceNode.consumedInternal(); + + for (final String topic : topics) { + /* NOTE(review): the same consumedInternal is reused for every topic; if it carries an explicit processor name, confirm that stream() still produces distinct node names. */ final KStreamImpl newKStream = (KStreamImpl) stream( + Collections.singleton(topic), + consumedInternal + ); + final GraphNode newSourceNode = newKStream.graphNode; + if (newSourceNode instanceof StreamSourceNode) { + final StreamSourceNode newSource = (StreamSourceNode) newSourceNode; + newSource.setBuildPriority(buildPriority); + } + + for (final GraphNode child : children) { + newSourceNode.addChild(child); + } + } + + /* The per-topic sources now hold the children, so detach the original multi-topic node. */ root.removeChild(previousSourceNode); + previousSourceNode.clearChildren(); + } + } private void mergeDuplicateSourceNodes() { final Map> topicsToSourceNodes = new HashMap<>(); @@ -407,19 +457,10 @@ private void mergeDuplicateSourceNodes() { } else { final StreamSourceNode mainSourceNode = topicsToSourceNodes.get( topic); - // TODO we only merge source nodes if the subscribed topic(s) are an exact match, so it's still not - // possible to subscribe to topicA in one KStream and topicA + topicB in another. 
We could achieve - // this by splitting these source nodes into one topic per node and routing to the subscribed children - if (!mainSourceNode.topicNames() - .equals(currentSourceNode.topicNames())) { - LOG.error( - "Topic {} was found in subscription for non-equal source nodes {} and {}", - topic, mainSourceNode, currentSourceNode); - throw new TopologyException( - "Two source nodes are subscribed to overlapping but not equal input topics"); + /* Merge only when both nodes name exactly the same topics; in any other case this branch leaves currentSourceNode attached to root and neither logs nor throws. NOTE(review): confirm that an unequal overlap is still reported somewhere downstream. */ if (mainSourceNode.topicNames().equals(currentSourceNode.topicNames())) { + mainSourceNode.merge(currentSourceNode); + root.removeChild(graphNode); } - mainSourceNode.merge(currentSourceNode); - root.removeChild(graphNode); } } } diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java index b839df53167e2..6ba8e0ce88f7e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.streams; import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; @@ -45,6 +46,7 @@ import org.apache.kafka.streams.kstream.TableJoined; import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.kstream.internals.ConsumedInternal; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.api.Processor; import org.apache.kafka.streams.processor.api.ProcessorContext; @@ -89,6 +91,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; import java.util.Properties; import java.util.Random; import java.util.Set; @@ -108,6 
+111,7 @@ import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -2306,9 +2310,27 @@ public void shouldAllowReadingFromSameCollectionOfTopics() { } @Test - public void shouldNotAllowReadingFromOverlappingAndUnequalCollectionOfTopics() { + public void shouldAllowReadingFromOverlappingAndUnequalCollectionOfTopics() { builder.stream(Collections.singletonList("topic")); builder.stream(asList("topic", "anotherTopic")); + /* One topic is subscribed both on its own and inside a two-topic list; build() is expected to succeed. */ assertDoesNotThrow(() -> builder.build()); + } + + @Test + public void shouldAllowReadingFromOverlappingAndUnequalCollectionOfTopicsMoreComplexCase() { + builder.stream(Collections.singletonList("topic")); + builder.stream(asList("topic", "anotherTopic")); + builder.stream(Collections.singletonList("hello-topic")); + builder.stream(asList("hello-topic", "merge-topic")); + assertDoesNotThrow(() -> builder.build()); + } + + @Test + public void shouldAllowReadingFromOverlappingAndUnequalCollectionOfTopicsMoreComplexCaseButDifferentConsumedWith() { + builder.stream(Collections.singletonList("topic")); + builder.stream(asList("topic", "anotherTopic")); + builder.stream(Collections.singletonList("hello-topic")); + /* Only this source passes Consumed.with(AutoOffsetReset.EARLIEST); the test expects build to throw TopologyException. */ builder.stream(asList("hello-topic", "merge-topic"), Consumed.with(AutoOffsetReset.EARLIEST)); assertThrows(TopologyException.class, builder::build); } @@ -2881,6 +2903,265 @@ void shouldThrowWhenCoGroupWithoutRepartitionNameAndMaterializedName() { assertTrue(e.getMessage().contains("Following repartition topic(s) has not been named: COGROUPKSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition")); } + @Test + public void shouldAllowDifferentConsumedInstanceBetweenSourceNodesWithSubset() { + // Given + 
builder.stream(Collections.singleton("input1"), Consumed.with(Serdes.String(), Serdes.String())); + /* Same topic as above, but this source is declared with byte-array serdes instead of string serdes. */ builder.stream(Collections.singleton("input1"), Consumed.with(Serdes.ByteArray(), Serdes.ByteArray())); + + // When + Then + assertDoesNotThrow(() -> builder.build()); + } + + @Test + public void shouldAllowDifferentConsumedInstanceBetweenSourceNodesWithSubsetInMoreComplexCase() { + // Given + final ConsumedInternal consumed = new ConsumedInternal<>(Consumed.with(null, null)); + final KStream stream1 = builder.stream(Collections.singleton("input1"), consumed); + final KStream stream2 = builder.stream(Collections.singleton("input2"), consumed); + stream1.merge(stream2); + builder.stream(Set.of("input1", "input3", "input4"), + Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()) + ); + + // When + Then + assertDoesNotThrow(() -> builder.build()); + } + + @Test + public void shouldFlattenNodes() { + // GIVEN + final StreamsBuilder builder = new StreamsBuilder(); + final ConsumedInternal consumed = new ConsumedInternal<>(Consumed.with(null, null)); + /* One five-topic source; the expected description below lists five single-topic sub-topologies. NOTE(review): the numbering asserted below appears to follow the iteration order of Set.of, which the JDK documents as unspecified - confirm this assertion is stable across JVM runs. */ builder.stream(Set.of("t1", "t2", "t3", "t4", "t5"), consumed); + + // WHEN + final Topology topology = builder.build(); + + // THEN + final String expectedDescribe = + "Topologies:\n" + + " Sub-topology: 0\n" + + " Source: KSTREAM-SOURCE-0000000001 (topics: [t4])\n" + + " --> none\n\n" + + " Sub-topology: 1\n" + + " Source: KSTREAM-SOURCE-0000000005 (topics: [t3])\n" + + " --> none\n\n" + + " Sub-topology: 2\n" + + " Source: KSTREAM-SOURCE-0000000004 (topics: [t2])\n" + + " --> none\n\n" + + " Sub-topology: 3\n" + + " Source: KSTREAM-SOURCE-0000000003 (topics: [t1])\n" + + " --> none\n\n" + + " Sub-topology: 4\n" + + " Source: KSTREAM-SOURCE-0000000002 (topics: [t5])\n" + + " --> none\n\n"; + assertEquals(expectedDescribe, topology.describe().toString()); + } + + @Test + public void complexTopologyFlattenTest() { + final ConsumedInternal consumed = new ConsumedInternal<>(Consumed.with(null, null)); + 
props.put(StreamsConfig.TOPOLOGY_OPTIMIZATION_CONFIG, StreamsConfig.OPTIMIZE); /* NOTE(review): props is populated here, yet every build() call visible in this test takes no arguments - confirm whether build(props) was intended. */ + + // For SubTopology 0 + final KStream stream1 = builder.stream(Set.of("t1", "t2", "t3"), consumed); + final KStream stream2 = builder.stream(Set.of("t1", "t4", "t5"), consumed); + final KStream mergeInSubTopology1 = stream1.merge(stream2); + + // In here, only SubTopology 0 exists. + final Topology topology1 = builder.build(); + final String sourceIdShouldBeLaterInDescription1 = + "Topologies:\n" + + " Sub-topology: 0\n" + + " Source: KSTREAM-SOURCE-0000000003 (topics: [t1])\n" + + " --> KSTREAM-MERGE-0000000002\n" + + " Source: KSTREAM-SOURCE-0000000004 (topics: [t2])\n" + + " --> KSTREAM-MERGE-0000000002\n" + + " Source: KSTREAM-SOURCE-0000000005 (topics: [t3])\n" + + " --> KSTREAM-MERGE-0000000002\n" + + " Source: KSTREAM-SOURCE-0000000006 (topics: [t4])\n" + + " --> KSTREAM-MERGE-0000000002\n" + + " Source: KSTREAM-SOURCE-0000000007 (topics: [t5])\n" + + " --> KSTREAM-MERGE-0000000002\n" + + " Processor: KSTREAM-MERGE-0000000002 (stores: [])\n" + + " --> none\n" + + " <-- KSTREAM-SOURCE-0000000003, KSTREAM-SOURCE-0000000004, KSTREAM-SOURCE-0000000005, KSTREAM-SOURCE-0000000006, KSTREAM-SOURCE-0000000007\n\n"; + assertEquals(sourceIdShouldBeLaterInDescription1, topology1.describe().toString()); + + // Add SubTopology 1 + final KStream stream3 = builder.stream(Set.of("a1", "a2", "a3"), consumed); + final KStream stream4 = builder.stream(Set.of("a4", "a5", "a6"), consumed); + final KStream lastNodeInSubTop1 = stream3.merge(stream4); + + // In here, SubTopology 0 and SubTopology 1 exist. 
+ final Topology topology2 = builder.build(); + final String descriptionForSubTopology2 = + " Sub-topology: 1\n" + + " Source: KSTREAM-SOURCE-0000000012 (topics: [a1])\n" + + " --> KSTREAM-MERGE-0000000011\n" + + " Source: KSTREAM-SOURCE-0000000013 (topics: [a2])\n" + + " --> KSTREAM-MERGE-0000000011\n" + + " Source: KSTREAM-SOURCE-0000000014 (topics: [a3])\n" + + " --> KSTREAM-MERGE-0000000011\n" + + " Source: KSTREAM-SOURCE-0000000015 (topics: [a4])\n" + + " --> KSTREAM-MERGE-0000000011\n" + + " Source: KSTREAM-SOURCE-0000000016 (topics: [a5])\n" + + " --> KSTREAM-MERGE-0000000011\n" + + " Source: KSTREAM-SOURCE-0000000017 (topics: [a6])\n" + + " --> KSTREAM-MERGE-0000000011\n" + + " Processor: KSTREAM-MERGE-0000000011 (stores: [])\n" + + " --> none\n" + + " <-- KSTREAM-SOURCE-0000000012, KSTREAM-SOURCE-0000000013, KSTREAM-SOURCE-0000000014, KSTREAM-SOURCE-0000000015, " + + "KSTREAM-SOURCE-0000000016, KSTREAM-SOURCE-0000000017\n\n"; + + /* The second build is expected to keep the first description unchanged and append sub-topology 1. */ final String expectDescription1 = sourceIdShouldBeLaterInDescription1 + descriptionForSubTopology2; + assertEquals(expectDescription1, topology2.describe().toString()); + + // Add new StreamSource to SubTopology 0. + final KStream stream5 = builder.stream(Set.of("t1", "t7", "t8"), consumed); + final KStream lastNodeInSubTop0 = mergeInSubTopology1.merge(stream5); + + // In here, SubTopology 0 is updated. 
+ final Topology topology3 = builder.build(); + final String descriptionForSubTopology0AfterNewStreamAdded = + "Topologies:\n" + + " Sub-topology: 0\n" + + " Source: KSTREAM-SOURCE-0000000003 (topics: [t1])\n" + + " --> KSTREAM-MERGE-0000000002, KSTREAM-MERGE-0000000019\n" + + " Source: KSTREAM-SOURCE-0000000004 (topics: [t2])\n" + + " --> KSTREAM-MERGE-0000000002\n" + + " Source: KSTREAM-SOURCE-0000000005 (topics: [t3])\n" + + " --> KSTREAM-MERGE-0000000002\n" + + " Source: KSTREAM-SOURCE-0000000006 (topics: [t4])\n" + + " --> KSTREAM-MERGE-0000000002\n" + + " Source: KSTREAM-SOURCE-0000000007 (topics: [t5])\n" + + " --> KSTREAM-MERGE-0000000002\n" + + " Processor: KSTREAM-MERGE-0000000002 (stores: [])\n" + + " --> KSTREAM-MERGE-0000000019\n" + + " <-- KSTREAM-SOURCE-0000000003, KSTREAM-SOURCE-0000000004, KSTREAM-SOURCE-0000000005, KSTREAM-SOURCE-0000000006, KSTREAM-SOURCE-0000000007\n" + + " Source: KSTREAM-SOURCE-0000000020 (topics: [t7])\n" + + " --> KSTREAM-MERGE-0000000019\n" + + " Source: KSTREAM-SOURCE-0000000021 (topics: [t8])\n" + + " --> KSTREAM-MERGE-0000000019\n" + + " Processor: KSTREAM-MERGE-0000000019 (stores: [])\n" + + " --> none\n" + + " <-- KSTREAM-SOURCE-0000000003, KSTREAM-MERGE-0000000002, KSTREAM-SOURCE-0000000020, KSTREAM-SOURCE-0000000021\n\n"; + + /* In the expected text above, t1 keeps its existing source node and additionally feeds the new merge; only t7 and t8 appear as new sources. */ final String sourceIdShouldBeLaterInDescription3 = + descriptionForSubTopology0AfterNewStreamAdded + descriptionForSubTopology2; + assertEquals(sourceIdShouldBeLaterInDescription3, topology3.describe().toString()); + + // Merge SubTopology 0 and SubTopology 1 + lastNodeInSubTop0.merge(lastNodeInSubTop1); + final Topology lastTopology = builder.build(); + + // Only SubTopology 0 exists. 
+ final String expectedFinalTopologyDescription = + "Topologies:\n" + + " Sub-topology: 0\n" + + " Source: KSTREAM-SOURCE-0000000003 (topics: [t1])\n" + + " --> KSTREAM-MERGE-0000000002, KSTREAM-MERGE-0000000019\n" + + " Source: KSTREAM-SOURCE-0000000004 (topics: [t2])\n" + + " --> KSTREAM-MERGE-0000000002\n" + + " Source: KSTREAM-SOURCE-0000000005 (topics: [t3])\n" + + " --> KSTREAM-MERGE-0000000002\n" + + " Source: KSTREAM-SOURCE-0000000006 (topics: [t4])\n" + + " --> KSTREAM-MERGE-0000000002\n" + + " Source: KSTREAM-SOURCE-0000000007 (topics: [t5])\n" + + " --> KSTREAM-MERGE-0000000002\n" + + " Processor: KSTREAM-MERGE-0000000002 (stores: [])\n" + + " --> KSTREAM-MERGE-0000000019\n" + + " <-- KSTREAM-SOURCE-0000000003, KSTREAM-SOURCE-0000000004, KSTREAM-SOURCE-0000000005, KSTREAM-SOURCE-0000000006, KSTREAM-SOURCE-0000000007\n" + + " Source: KSTREAM-SOURCE-0000000012 (topics: [a1])\n" + + " --> KSTREAM-MERGE-0000000011\n" + + " Source: KSTREAM-SOURCE-0000000013 (topics: [a2])\n" + + " --> KSTREAM-MERGE-0000000011\n" + + " Source: KSTREAM-SOURCE-0000000014 (topics: [a3])\n" + + " --> KSTREAM-MERGE-0000000011\n" + + " Source: KSTREAM-SOURCE-0000000015 (topics: [a4])\n" + + " --> KSTREAM-MERGE-0000000011\n" + + " Source: KSTREAM-SOURCE-0000000016 (topics: [a5])\n" + + " --> KSTREAM-MERGE-0000000011\n" + + " Source: KSTREAM-SOURCE-0000000017 (topics: [a6])\n" + + " --> KSTREAM-MERGE-0000000011\n" + + " Source: KSTREAM-SOURCE-0000000020 (topics: [t7])\n" + + " --> KSTREAM-MERGE-0000000019\n" + + " Source: KSTREAM-SOURCE-0000000021 (topics: [t8])\n" + + " --> KSTREAM-MERGE-0000000019\n" + + " Processor: KSTREAM-MERGE-0000000011 (stores: [])\n" + + " --> KSTREAM-MERGE-0000000023\n" + + " <-- KSTREAM-SOURCE-0000000012, KSTREAM-SOURCE-0000000013, KSTREAM-SOURCE-0000000014, KSTREAM-SOURCE-0000000015, KSTREAM-SOURCE-0000000016, KSTREAM-SOURCE-0000000017\n" + + " Processor: KSTREAM-MERGE-0000000019 (stores: [])\n" + + " --> KSTREAM-MERGE-0000000023\n" + + " <-- 
KSTREAM-SOURCE-0000000003, KSTREAM-MERGE-0000000002, KSTREAM-SOURCE-0000000020, KSTREAM-SOURCE-0000000021\n" + + " Processor: KSTREAM-MERGE-0000000023 (stores: [])\n" + + " --> none\n" + + " <-- KSTREAM-MERGE-0000000019, KSTREAM-MERGE-0000000011\n\n"; + assertEquals(expectedFinalTopologyDescription, lastTopology.describe().toString()); + } + + @Test + public void shouldForbiddenWhenTopicAlreadySubscribedInOtherNodes() { + // GIVEN + final StreamsBuilder builder = new StreamsBuilder(); + final ConsumedInternal consumed = new ConsumedInternal<>(Consumed.with(null, null)); + + // It can be merged. + builder.stream(Set.of("t1"), consumed); + builder.stream(Set.of("t1", "t2", "t3", "t4", "t5"), consumed); + + // Because of this pattern, it should throw errors. + builder.stream(Pattern.compile("t.*"), consumed); + builder.globalTable("t1", consumed); + + // WHEN + Then + assertThrows(TopologyException.class, builder::build); + } + + @Test + public void shouldReadFromBothTopicsWhenOverlappingSources() { + // GIVEN + final StreamsBuilder builder = new StreamsBuilder(); + final Serde keySered = Serdes.String(); + final Serde valueSered = Serdes.String(); + final Consumed consumed = Consumed.with(Serdes.String(), Serdes.String()); + + final KStream s1 = builder.stream("t1", consumed); + final KStream s2 = builder.stream(Arrays.asList("t1", "t2"), consumed); + + s1.merge(s2).to("out"); + final Topology topology = builder.build(); + + try (TopologyTestDriver driver = new TopologyTestDriver(topology, props)) { + final TestInputTopic t1 = driver.createInputTopic( + "t1", + keySered.serializer(), + valueSered.serializer() + ); + final TestInputTopic t2 = driver.createInputTopic( + "t2", + keySered.serializer(), + valueSered.serializer() + ); + final TestOutputTopic out = driver.createOutputTopic( + "out", + keySered.deserializer(), + valueSered.deserializer() + ); + + // WHEN + t1.pipeInput("k1", "v1"); + t2.pipeInput("k2", "v2"); + + // THEN + assertEquals(new KeyValue<>("k1", 
"v1"), out.readKeyValue()); + assertEquals(new KeyValue<>("k2", "v2"), out.readKeyValue()); + /* Exactly two records are expected: a third read must fail, so the t1 record is asserted to arrive once even though both merged streams subscribe to t1. */ assertThrows(NoSuchElementException.class, out::readKeyValue); + } + } + + private StreamsBuilder buildCoGroupTopology(final Grouped grouped, final Materialized> materialized) {