Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 981de29

Browse files
committed
Merging fixes
1 parent e280a88 commit 981de29

File tree

1 file changed

+23
-11
lines changed

1 file changed

+23
-11
lines changed

platform/optimize/split_time_range_ext.go

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -247,8 +247,7 @@ func (s splitTimeRangeExt) Transform(plan *model.ExecutionPlan, properties map[s
247247
nextQueryId++
248248
}
249249
}
250-
_ = newQueries
251-
//plan.Queries = append(plan.Queries, newQueries...)
250+
plan.Queries = append(plan.Queries, newQueries...)
252251
for i, subqueryPerQuery := range queriesSubqueriesMapping {
253252
querySQL := plan.Queries[i].SelectCommand
254253
logger.Info().Msgf("@@@@@@Original query: %s", querySQL.String())
@@ -264,19 +263,32 @@ func (s splitTimeRangeExt) Transform(plan *model.ExecutionPlan, properties map[s
264263
return false
265264
}
266265
plan.Merge = func(plan *model.ExecutionPlan, results [][]model.QueryResultRow) (*model.ExecutionPlan, [][]model.QueryResultRow) {
267-
if len(plan.Queries) > len(results) {
268-
var mergedResults [][]model.QueryResultRow
269-
mergedResults = make([][]model.QueryResultRow, 0)
270-
// merge the results of the siblings queries
271-
mergedResults = append(mergedResults, results[0])
272-
// remove siblings queries from the plan
273-
plan.Queries = plan.Queries[:len(plan.Queries)-1]
274-
return plan, mergedResults
266+
// That's the case when all siblings were executed and there are results for all of them
267+
if len(plan.Queries) == len(results) {
268+
for k, v := range plan.Siblings {
269+
for _, sibling := range v {
270+
// remove sibling query from the plan
271+
plan.Queries = append(plan.Queries[:sibling], plan.Queries[sibling+1:]...)
272+
// merge results of sibling query into the original query
273+
results[k] = append(results[k], results[sibling]...)
274+
// remove results of sibling query from the results
275+
results = append(results[:sibling], results[sibling+1:]...)
276+
}
277+
}
278+
plan.Siblings = make(map[int][]int)
279+
return plan, results
280+
}
281+
// That's the case when some sibling queries were interrupted and we don't have results for them
282+
for _, v := range plan.Siblings {
283+
for _, sibling := range v {
284+
// remove sibling query from the plan
285+
plan.Queries = append(plan.Queries[:sibling], plan.Queries[sibling+1:]...)
286+
}
275287
}
288+
276289
return plan, results
277290
}
278291
return plan, nil
279-
280292
}
281293

282294
func (s splitTimeRangeExt) Name() string {

0 commit comments

Comments
 (0)