Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit e2c547f

Browse files
committed
Some intermediate changes
1 parent f013dd3 commit e2c547f

File tree

6 files changed

+53
-3
lines changed

6 files changed

+53
-3
lines changed

platform/frontend_connectors/search.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,9 @@ func (q *QueryRunner) runExecutePlanAsync(ctx context.Context, plan *model.Execu
395395
doneCh <- asyncSearchWithError{translatedQueryBody: translatedQueryBody, err: err}
396396
}
397397

398+
if plan.Merge != nil && plan.ShouldBeMerged {
399+
plan, results = plan.Merge(plan, results)
400+
}
398401
searchResponse := queryTranslator.MakeSearchResponse(plan.Queries, results)
399402

400403
doneCh <- asyncSearchWithError{response: searchResponse, translatedQueryBody: translatedQueryBody, err: err}
@@ -516,10 +519,15 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin
516519
logger.ErrorWithCtxAndReason(ctx, "Quesma generated invalid SQL query").Msg(queriesBodyConcat)
517520
goto logErrorAndReturn
518521
}
522+
if len(plan.Queries) > 0 {
523+
for i, query := range plan.Queries {
524+
logger.InfoWithCtx(ctx).Msgf("Input SQL query %d: %s", i, query.SelectCommand.String())
525+
}
526+
}
519527
err = q.transformQueries(plan)
520528

521529
// TODO only for debug purposes, remove it
522-
if len(plan.Queries) > 1 {
530+
if len(plan.Queries) > 0 {
523531
logger.InfoWithCtx(ctx).Msgf("Parsed queries: %d", len(plan.Queries))
524532
bytes, _ := body.Bytes()
525533
logger.InfoWithCtx(ctx).Msgf("Body: %s", string(bytes))

platform/go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ require (
2222
github.com/huandu/go-clone v1.7.2
2323
github.com/ip2location/ip2location-go/v9 v9.7.1
2424
github.com/jackc/pgx/v5 v5.7.2
25+
github.com/jinzhu/copier v0.4.0
2526
github.com/k0kubun/pp v3.0.1+incompatible
2627
github.com/knadh/koanf/parsers/json v0.1.0
2728
github.com/knadh/koanf/parsers/yaml v0.1.0

platform/go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,8 @@ github.com/jackc/pgx/v5 v5.7.2 h1:mLoDLV6sonKlvjIEsV56SkWNCnuNv531l94GaIzO+XI=
8484
github.com/jackc/pgx/v5 v5.7.2/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ=
8585
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
8686
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
87+
github.com/jinzhu/copier v0.4.0 h1:w3ciUoD19shMCRargcpm0cm91ytaBhDvuRpz1ODO/U8=
88+
github.com/jinzhu/copier v0.4.0/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg=
8789
github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88 h1:uC1QfSlInpQF+M0ao65imhwqKnz3Q2z/d8PWZRMQvDM=
8890
github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88/go.mod h1:3w7q1U84EfirKl04SVQ/s7nPm1ZPhiXd34z40TNz36k=
8991
github.com/k0kubun/pp v3.0.1+incompatible h1:3tqvf7QgUnZ5tXO6pNAZlrvHgl6DvifjDrd9g2S9Z40=

platform/model/query.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,8 @@ type (
7272
// WARNING: it's probably not passed everywhere where it's needed, just in one place.
7373
// But it works for the test + our dashboards, so let's fix it later if necessary.
7474
// NoMetadataField (nil) is a valid option and means no meta field in the response.
75-
Metadata JsonMap
75+
Metadata JsonMap
76+
ChildQueries *[]Query
7677
}
7778
QueryType interface {
7879
// TranslateSqlResponseToJson
@@ -115,6 +116,9 @@ type ExecutionPlan struct {
115116
// if for some reason we need to stop the execution
116117
// e.g., there is some condition like enough results
117118
Interrupt func(rows []QueryResultRow) bool
119+
120+
ShouldBeMerged bool // if true, we should merge the results of this plan with the main plan
121+
Merge func(plan *ExecutionPlan, results [][]QueryResultRow) (*ExecutionPlan, [][]QueryResultRow)
118122
}
119123

120124
func NewQueryExecutionHints() *QueryOptimizeHints {

platform/optimize/split_time_range_ext.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ package optimize
55
import (
66
"github.com/QuesmaOrg/quesma/platform/logger"
77
"github.com/QuesmaOrg/quesma/platform/model"
8+
"github.com/jinzhu/copier"
9+
"log"
810
"sort"
911
"strconv"
1012
"strings"
@@ -237,6 +239,17 @@ func (s splitTimeRangeExt) Transform(plan *model.ExecutionPlan, properties map[s
237239

238240
if len(newQueries) > 0 {
239241
plan.Queries[0].SelectCommand = newQueries[0].SelectCommand
242+
243+
for i := 1; i < len(newQueries); i++ {
244+
var queryCopy model.Query
245+
err := copier.Copy(&queryCopy, &plan.Queries[0])
246+
if err != nil {
247+
log.Println("copier.Copy failed:", err)
248+
}
249+
plan.Queries = append(plan.Queries, &queryCopy)
250+
plan.Queries[i].SelectCommand = newQueries[i].SelectCommand
251+
}
252+
plan.ShouldBeMerged = true
240253
}
241254
for _, subquery := range newQueries {
242255
sql := subquery.SelectCommand.String()
@@ -246,7 +259,15 @@ func (s splitTimeRangeExt) Transform(plan *model.ExecutionPlan, properties map[s
246259
plan.Interrupt = func(rows []model.QueryResultRow) bool {
247260
return len(rows) >= 500
248261
}
249-
262+
plan.Merge = func(plan *model.ExecutionPlan, results [][]model.QueryResultRow) (*model.ExecutionPlan, [][]model.QueryResultRow) {
263+
var mergedResults [][]model.QueryResultRow
264+
mergedResults = make([][]model.QueryResultRow, 0)
265+
mergedResults = append(mergedResults, results[0])
266+
if len(plan.Queries) > len(mergedResults) {
267+
plan.Queries = plan.Queries[:len(plan.Queries)-1]
268+
}
269+
return plan, mergedResults
270+
}
250271
return plan, nil
251272

252273
}

platform/parsers/elastic_query_dsl/query_translator.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,6 +229,20 @@ func (cw *ClickhouseQueryTranslator) makeTotalCount(queries []*model.Query, resu
229229
func (cw *ClickhouseQueryTranslator) MakeSearchResponse(queries []*model.Query, ResultSets [][]model.QueryResultRow) *model.SearchResp {
230230
var hits *model.SearchHits
231231
var total *model.Total
232+
233+
if len(queries) != len(ResultSets) {
234+
logger.ErrorWithCtx(cw.Ctx).Msgf("queries and resultsets have different length: %d != %d", len(queries), len(ResultSets))
235+
return &model.SearchResp{
236+
Aggregations: nil,
237+
Timeout: false,
238+
Shards: model.ResponseShards{
239+
Total: 1,
240+
Successful: 1,
241+
Failed: 0,
242+
},
243+
}
244+
}
245+
232246
queries, ResultSets, total = cw.makeTotalCount(queries, ResultSets) // get hits and remove it from queries
233247
queries, ResultSets, hits = cw.makeHits(queries, ResultSets) // get hits and remove it from queries
234248

0 commit comments

Comments
 (0)