@@ -9,20 +9,27 @@ namespace Streaming
99{
1010namespace
1111{
12+ enum class JoinChangelogMode
13+ {
14+ None,
15+ Try,
16+ Force,
17+ };
18+
1219DataStreamSemanticEx calculateDataStreamSemanticForJoin (
1320 DataStreamSemanticEx left_input_data_stream_semantic,
1421 DataStreamSemanticEx right_input_data_stream_semantic,
1522 std::pair<JoinKind, JoinStrictness> kind_and_strictness,
1623 SelectQueryInfo & query_info,
17- bool allow_emit_changelog_join_result )
24+ JoinChangelogMode join_changelog_mode )
1825{
1926 assert (left_input_data_stream_semantic.streaming );
2027
21- // / Speical handling
28+ // / Special handling
2229 // / 1) for <stream> join <table>, the right inputs don't support changelog semantic
2330 if (!right_input_data_stream_semantic.streaming )
2431 {
25- // / Left stream semantic (Keep the original semantic)
32+ // / Left stream semantic
2633 query_info.left_input_tracking_changes = isChangelogDataStream (left_input_data_stream_semantic);
2734
2835 // / Right stream semantic (Append or VersionedKV or ChangelogKV)
@@ -36,14 +43,16 @@ DataStreamSemanticEx calculateDataStreamSemanticForJoin(
3643 throw Exception (ErrorCodes::NOT_IMPLEMENTED , " The filled join data doesn't support changelog processing" );
3744 }
3845
39- if (query_info.force_emit_changelog && !isChangelogDataStream (left_input_data_stream_semantic))
46+ // / Try/Force emitting changelog
47+ if (join_changelog_mode != JoinChangelogMode::None && !isChangelogDataStream (left_input_data_stream_semantic))
4048 {
4149 if (canTrackChangesFromInput (left_input_data_stream_semantic))
4250 {
4351 query_info.left_input_tracking_changes = true ;
4452 return DataStreamSemantic::Changelog;
4553 }
46- else
54+
55+ if (join_changelog_mode == JoinChangelogMode::Force)
4756 throw Exception (ErrorCodes::NOT_IMPLEMENTED , " Not implemented for emit changelog from append stream join table result" );
4857 }
4958
@@ -72,7 +81,20 @@ DataStreamSemanticEx calculateDataStreamSemanticForJoin(
7281 kind_and_strictness.first ,
7382 kind_and_strictness.second );
7483
75- return DataStreamSemantic::Append;
84+ // / Try/Force emitting changelog
85+ if (join_changelog_mode != JoinChangelogMode::None && !isChangelogDataStream (left_input_data_stream_semantic))
86+ {
87+ if (canTrackChangesFromInput (left_input_data_stream_semantic))
88+ {
89+ query_info.left_input_tracking_changes = true ;
90+ return DataStreamSemantic::Changelog;
91+ }
92+
93+ if (join_changelog_mode == JoinChangelogMode::Force)
94+ throw Exception (ErrorCodes::NOT_IMPLEMENTED , " Not implemented for emit changelog from asof/any/cross join results" );
95+ }
96+
97+ return left_input_data_stream_semantic;
7698 }
7799
78100 // / Left stream semantic
@@ -99,19 +121,21 @@ DataStreamSemanticEx calculateDataStreamSemanticForJoin(
99121 right_input_data_stream_semantic = DataStreamSemantic::Append;
100122 }
101123
102- if (isJoinResultChangelog (left_input_data_stream_semantic, right_input_data_stream_semantic))
124+ bool join_result_is_changelog = isJoinResultChangelog (left_input_data_stream_semantic, right_input_data_stream_semantic);
125+
126+ // / Try/Force emitting changelog
127+ if (join_changelog_mode != JoinChangelogMode::None)
103128 {
104- if (allow_emit_changelog_join_result )
129+ if (join_result_is_changelog )
105130 return DataStreamSemantic::Changelog;
106- else
107- // / NOTE: If the current layer doesn't need joined result emit changelog,
108- // / we shall emit append-only stream with mutable semantic(MutableStream).
109- // / In this way, the outer layer can still track changes if needed.
110- // / For example: select count() from (select * from kv join kv2 on kv.key = kv2.key)
111- return DataStreamSemanticEx{StorageSemantic::NativeKV};
131+
132+ if (join_changelog_mode == JoinChangelogMode::Force)
133+ throw Exception (ErrorCodes::NOT_IMPLEMENTED , " Not implemented for emit changelog from non-changelog join results" );
112134 }
113- else
114- return DataStreamSemantic::Append;
135+
136+ // / When not emitting changelog, preserve NativeKV semantic so the outer layer can still track changes if needed.
137+ // / e.g. select count() from (select * from kv join kv2 on kv.key = kv2.key)
138+ return join_result_is_changelog ? DataStreamSemanticEx{StorageSemantic::NativeKV} : DataStreamSemantic::Append;
115139}
116140}
117141
@@ -146,7 +170,12 @@ DataStreamSemanticPair calculateDataStreamSemantic(
146170 DataStreamSemanticPair semantic_pair;
147171 // / By default, the joined result stream semantic always is append-only unless the current layer has aggregates
148172 // / or the query forces to emit changelog
149- bool allow_emit_changelog_join_result = current_select_has_aggregates || query_info.force_emit_changelog ;
173+ JoinChangelogMode join_changelog_mode = JoinChangelogMode::None;
174+ if (current_select_has_aggregates)
175+ join_changelog_mode = JoinChangelogMode::Try;
176+ else if (query_info.force_emit_changelog )
177+ join_changelog_mode = JoinChangelogMode::Force;
178+
150179 // / First, look at what the current layer does
151180
152181 // / When the current layer has join or aggregates, we calculate the output data semantic locally and its inputs data stream semantic.
@@ -161,7 +190,7 @@ DataStreamSemanticPair calculateDataStreamSemantic(
161190 *right_input_data_stream_semantic,
162191 *kind_and_strictness,
163192 query_info,
164- allow_emit_changelog_join_result );
193+ join_changelog_mode );
165194 }
166195 else
167196 {
@@ -189,10 +218,7 @@ DataStreamSemanticPair calculateDataStreamSemantic(
189218 *right_input_data_stream_semantic,
190219 *kind_and_strictness,
191220 query_info,
192- allow_emit_changelog_join_result);
193-
194- if (query_info.force_emit_changelog && !Streaming::isChangelogDataStream (semantic_pair.effective_input_data_stream_semantic ))
195- throw Exception (ErrorCodes::NOT_IMPLEMENTED , " Not implemented for emit changelog from non-changelog join results" );
221+ join_changelog_mode);
196222
197223 semantic_pair.output_data_stream_semantic = semantic_pair.effective_input_data_stream_semantic ;
198224 }
0 commit comments