Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 6165e9d

Browse files
authored
Generalizing plan execution (#1429)
<!-- A note on testing your PR --> <!-- Basic unit test run is executed against each commit in the PR. If you want to run a full integration test suite, you can trigger it by commenting with '/run-integration-tests' or '/run-it' --> This PR extends the current Quesma workflow to allow interrupting query execution when the execution plan contains multiple queries and we’ve already retrieved enough results. The idea is to split the original query into n queries with different time ranges. If the first one (covering a 15-minute range) returns enough results, we will stop the execution. The problem is reproducible on `kibana_sample_data_flights` with the following query: ``` GET /kibana_sample_data_flights/_search { "_source": false, "fields": [ { "field": "*", "include_unmapped": "true" }, { "field": "@timestamp", "format": "strict_date_optional_time" }, { "field": "timestamp", "format": "strict_date_optional_time" } ], "highlight": { "fields": { "*": {} }, "fragment_size": 2147483647, "post_tags": [ "@/kibana-highlighted-field@" ], "pre_tags": [ "@kibana-highlighted-field@" ] }, "query": { "bool": { "filter": [ { "range": { "timestamp": { "format": "strict_date_optional_time", "gte": "2025-05-13T08:46:01.926Z", "lte": "2025-05-13T09:01:01.926Z" } } } ], "must": [], "must_not": [], "should": [] } }, "runtime_mappings": { "hour_of_day": { "script": { "source": "emit(doc['timestamp'].value.getHour());" }, "type": "long" } }, "script_fields": {}, "size": 500, "sort": [ { "timestamp": { "format": "strict_date_optional_time", "order": "desc", "unmapped_type": "boolean" } }, { "_doc": { "order": "desc", "unmapped_type": "boolean" } } ], "stored_fields": [ "*" ], "track_total_hits": false, "version": true } ```
1 parent c2db79a commit 6165e9d

20 files changed

+569
-118
lines changed

cmd/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module github.com/QuesmaOrg/quesma/quesma-cli
33
go 1.24.0
44

55
require (
6-
github.com/QuesmaOrg/quesma/platform v0.0.0-20250303135300-04f0a4897c30
6+
github.com/QuesmaOrg/quesma/platform v0.0.0-20250519105918-0f6942f1a3dd
77
github.com/goccy/go-json v0.10.5
88
github.com/stretchr/testify v1.10.0
99
)

cmd/v2_test_objects.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -422,11 +422,11 @@ func (p *QueryTransformationPipeline) ComposeResult(results [][]model.QueryResul
422422
type QueryTransformer1 struct {
423423
}
424424

425-
func (p *QueryTransformer1) Transform(queries []*model.Query) ([]*model.Query, error) {
425+
func (p *QueryTransformer1) Transform(plan *model.ExecutionPlan) (*model.ExecutionPlan, error) {
426426
logger.Debug().Msg("SimpleQueryTransformationPipeline: Transform")
427427
// Do basic transformation
428428

429-
return queries, nil
429+
return plan, nil
430430
}
431431

432432
func NewQueryTransformer1() *QueryTransformer1 {

platform/frontend_connectors/schema_transformer.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -974,7 +974,7 @@ func (s *SchemaCheckPass) applyAliasColumns(indexSchema schema.Schema, query *mo
974974
return query, nil
975975
}
976976

977-
func (s *SchemaCheckPass) Transform(queries []*model.Query) ([]*model.Query, error) {
977+
func (s *SchemaCheckPass) Transform(plan *model.ExecutionPlan) (*model.ExecutionPlan, error) {
978978

979979
transformationChain := []struct {
980980
TransformationName string
@@ -1014,7 +1014,7 @@ func (s *SchemaCheckPass) Transform(queries []*model.Query) ([]*model.Query, err
10141014
{TransformationName: "BooleanLiteralTransformation", Transformation: s.applyBooleanLiteralLowering},
10151015
}
10161016

1017-
for k, query := range queries {
1017+
for k, query := range plan.Queries {
10181018
var err error
10191019

10201020
if !s.cfg.Logging.EnableSQLTracing {
@@ -1043,9 +1043,9 @@ func (s *SchemaCheckPass) Transform(queries []*model.Query) ([]*model.Query, err
10431043
}
10441044
}
10451045

1046-
queries[k] = query
1046+
plan.Queries[k] = query
10471047
}
1048-
return queries, nil
1048+
return plan, nil
10491049
}
10501050

10511051
func (s *SchemaCheckPass) applyMatchOperator(indexSchema schema.Schema, query *model.Query) (*model.Query, error) {

platform/frontend_connectors/schema_transformer_test.go

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -499,10 +499,11 @@ func Test_ipRangeTransform(t *testing.T) {
499499
q.Schema = currentSchema
500500
q.Indexes = []string{q.TableName}
501501
}
502+
plan := model.NewExecutionPlan(queries[k], nil)
502503

503-
resultQueries, err := transform.Transform(queries[k])
504+
resultPlan, err := transform.Transform(plan)
504505
assert.NoError(t, err)
505-
assert.Equal(t, expectedQueries[k].SelectCommand.String(), resultQueries[0].SelectCommand.String())
506+
assert.Equal(t, expectedQueries[k].SelectCommand.String(), resultPlan.Queries[0].SelectCommand.String())
506507
})
507508
}
508509
}
@@ -739,17 +740,21 @@ func Test_arrayType(t *testing.T) {
739740
t.Run(util.PrettyTestName(tt.name, i), func(t *testing.T) {
740741
tt.query.Schema = indexSchema
741742
tt.query.Indexes = []string{tt.query.TableName}
742-
actual, err := transform.Transform([]*model.Query{tt.query})
743+
plan := model.NewExecutionPlan(
744+
[]*model.Query{tt.query},
745+
nil,
746+
)
747+
actual, err := transform.Transform(plan)
743748
assert.NoError(t, err)
744749

745750
if err != nil {
746751
t.Fatal(err)
747752
}
748753

749-
assert.True(t, len(actual) == 1, "len queries == 1")
754+
assert.True(t, len(actual.Queries) == 1, "len queries == 1")
750755

751756
expectedJson := asString(tt.expected)
752-
actualJson := asString(actual[0])
757+
actualJson := asString(actual.Queries[0])
753758

754759
assert.Equal(t, expectedJson, actualJson)
755760
})
@@ -1802,23 +1807,27 @@ func Test_mapKeys(t *testing.T) {
18021807
t.Run(util.PrettyTestName(tt.name, i), func(t *testing.T) {
18031808
tt.query.Schema = indexSchema
18041809
tt.query.Indexes = []string{tt.query.TableName}
1805-
var actual []*model.Query
1810+
var actual *model.ExecutionPlan
18061811
var err error
1812+
plan := model.NewExecutionPlan(
1813+
[]*model.Query{tt.query},
1814+
nil,
1815+
)
18071816
if indexConfig[tt.query.TableName].EnableFieldMapSyntax {
1808-
actual, err = transformPass.Transform([]*model.Query{tt.query})
1817+
actual, err = transformPass.Transform(plan)
18091818
} else {
1810-
actual, err = noTransformPass.Transform([]*model.Query{tt.query})
1819+
actual, err = noTransformPass.Transform(plan)
18111820
}
18121821
assert.NoError(t, err)
18131822

18141823
if err != nil {
18151824
t.Fatal(err)
18161825
}
18171826

1818-
assert.True(t, len(actual) == 1, "len queries == 1")
1827+
assert.True(t, len(actual.Queries) == 1, "len queries == 1")
18191828

18201829
expectedJson := asString(tt.expected)
1821-
actualJson := asString(actual[0])
1830+
actualJson := asString(actual.Queries[0])
18221831

18231832
assert.Equal(t, expectedJson, actualJson)
18241833
})
@@ -1895,17 +1904,22 @@ func Test_cluster(t *testing.T) {
18951904
t.Run(util.PrettyTestName(tt.name, i), func(t *testing.T) {
18961905
tt.query.Schema = indexSchema
18971906
tt.query.Indexes = []string{tt.query.TableName}
1898-
actual, err := transform.Transform([]*model.Query{tt.query})
1907+
1908+
plan := model.NewExecutionPlan(
1909+
[]*model.Query{tt.query},
1910+
nil,
1911+
)
1912+
actual, err := transform.Transform(plan)
18991913
assert.NoError(t, err)
19001914

19011915
if err != nil {
19021916
t.Fatal(err)
19031917
}
19041918

1905-
assert.True(t, len(actual) == 1, "len queries == 1")
1919+
assert.True(t, len(actual.Queries) == 1, "len queries == 1")
19061920

19071921
expectedJson := asString(tt.expected)
1908-
actualJson := asString(actual[0])
1922+
actualJson := asString(actual.Queries[0])
19091923

19101924
assert.Equal(t, expectedJson, actualJson)
19111925
})

platform/frontend_connectors/search.go

Lines changed: 63 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -364,7 +364,7 @@ type AsyncQuery struct {
364364

365365
func (q *QueryRunner) transformQueries(plan *model.ExecutionPlan) error {
366366
var err error
367-
plan.Queries, err = q.transformationPipeline.Transform(plan.Queries)
367+
_, err = q.transformationPipeline.Transform(plan)
368368
if err != nil {
369369
return fmt.Errorf("error transforming queries: %v", err)
370370
}
@@ -516,7 +516,13 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin
516516
logger.ErrorWithCtxAndReason(ctx, "Quesma generated invalid SQL query").Msg(queriesBodyConcat)
517517
goto logErrorAndReturn
518518
}
519+
if len(plan.Queries) > 0 {
520+
for i, query := range plan.Queries {
521+
logger.InfoWithCtx(ctx).Msgf("Input SQL query %d: %s", i, query.SelectCommand.String())
522+
}
523+
}
519524
err = q.transformQueries(plan)
525+
520526
if err != nil {
521527
goto logErrorAndReturn
522528
}
@@ -669,70 +675,97 @@ func (q *QueryRunner) isInternalKibanaQuery(query *model.Query) bool {
669675
return false
670676
}
671677

672-
type QueryJob func(ctx context.Context) ([]model.QueryResultRow, clickhouse.PerformanceResult, error)
678+
type QueryJob func(ctx context.Context) (*model.ExecutionPlan, []model.QueryResultRow, clickhouse.PerformanceResult, error)
673679

674680
func (q *QueryRunner) runQueryJobsSequence(ctx context.Context, jobs []QueryJob) ([][]model.QueryResultRow, []clickhouse.PerformanceResult, error) {
675681
var results = make([][]model.QueryResultRow, 0)
676682
var performance = make([]clickhouse.PerformanceResult, 0)
677-
for _, job := range jobs {
678-
rows, perf, err := job(ctx)
683+
for n, job := range jobs {
684+
plan, rows, perf, err := job(ctx)
679685
performance = append(performance, perf)
680686
if err != nil {
681687
return nil, performance, err
682688
}
683689

684690
results = append(results, rows)
691+
if plan.Interrupt(n, rows) {
692+
break
693+
}
685694
}
686695
return results, performance, nil
687696
}
688697

689-
func (q *QueryRunner) runQueryJobsParallel(ctx context.Context, jobs []QueryJob) ([][]model.QueryResultRow, []clickhouse.PerformanceResult, error) {
690-
691-
var results = make([][]model.QueryResultRow, len(jobs))
692-
var performances = make([]clickhouse.PerformanceResult, len(jobs))
698+
func (q *QueryRunner) runQueryJobsParallel(ctx context.Context, jobs []QueryJob) (
699+
[][]model.QueryResultRow,
700+
[]clickhouse.PerformanceResult,
701+
error,
702+
) {
703+
var (
704+
results = make([][]model.QueryResultRow, len(jobs))
705+
performances = make([]clickhouse.PerformanceResult, len(jobs))
706+
)
693707
type result struct {
708+
plan *model.ExecutionPlan
694709
rows []model.QueryResultRow
695710
perf clickhouse.PerformanceResult
696711
err error
697712
jobId int
698713
}
699714

700-
// this is our context to control the execution of the jobs
701-
702-
// cancellation is done by the parent context
703-
// or by the first goroutine that returns an error
715+
// cancelable context to stop jobs early
704716
ctx, cancel := context.WithCancel(ctx)
705-
// clean up on return
706717
defer cancel()
707718

708719
collector := make(chan result, len(jobs))
720+
721+
// spawn worker goroutines
709722
for n, job := range jobs {
710-
// produce
711723
go func(ctx context.Context, jobId int, j QueryJob) {
712724
defer recovery.LogAndHandlePanic(ctx, func(err error) {
713725
collector <- result{err: err, jobId: jobId}
714726
})
727+
715728
start := time.Now()
716-
rows, perf, err := j(ctx)
729+
plan, rows, perf, err := j(ctx)
717730
logger.DebugWithCtx(ctx).Msgf("parallel job %d finished in %v", jobId, time.Since(start))
718-
collector <- result{rows: rows, perf: perf, err: err, jobId: jobId}
731+
collector <- result{plan: plan, rows: rows, perf: perf, err: err, jobId: jobId}
719732
}(ctx, n, job)
720733
}
721734

722-
// consume
723-
for range len(jobs) {
735+
expected := len(jobs)
736+
received := 0
737+
738+
for received < expected {
724739
res := <-collector
740+
received++
741+
725742
performances[res.jobId] = res.perf
726-
if res.err == nil {
727-
results[res.jobId] = res.rows
728-
} else {
743+
744+
if res.err != nil {
745+
logger.WarnWithCtx(ctx).Msgf("Job %d failed: %v", res.jobId, res.err)
746+
cancel() // cancel remaining jobs
747+
748+
// Drain the rest to avoid goroutine leaks
749+
for received < len(jobs) {
750+
<-collector
751+
received++
752+
}
729753
return nil, performances, res.err
730754
}
755+
756+
results[res.jobId] = res.rows
757+
758+
if res.plan != nil && res.plan.Interrupt(res.jobId, res.rows) {
759+
logger.InfoWithCtx(ctx).Msgf("Job %d triggered interrupt", res.jobId)
760+
expected--
761+
}
762+
if expected == received {
763+
cancel()
764+
}
731765
}
732766

733767
return results, performances, nil
734768
}
735-
736769
func (q *QueryRunner) runQueryJobs(ctx context.Context, jobs []QueryJob) ([][]model.QueryResultRow, []clickhouse.PerformanceResult, error) {
737770

738771
numberOfJobs := len(jobs)
@@ -762,18 +795,18 @@ func (q *QueryRunner) runQueryJobs(ctx context.Context, jobs []QueryJob) ([][]mo
762795

763796
}
764797

765-
func (q *QueryRunner) makeJob(table *clickhouse.Table, query *model.Query) QueryJob {
766-
return func(ctx context.Context) ([]model.QueryResultRow, clickhouse.PerformanceResult, error) {
798+
func (q *QueryRunner) makeJob(plan *model.ExecutionPlan, table *clickhouse.Table, query *model.Query) QueryJob {
799+
return func(ctx context.Context) (*model.ExecutionPlan, []model.QueryResultRow, clickhouse.PerformanceResult, error) {
767800
var err error
768801
rows, performance, err := q.logManager.ProcessQuery(ctx, table, query)
769802

770803
if err != nil {
771804
logger.ErrorWithCtx(ctx).Msg(err.Error())
772805
performance.Error = err
773-
return nil, performance, err
806+
return plan, nil, performance, err
774807
}
775808

776-
return rows, performance, nil
809+
return plan, rows, performance, nil
777810
}
778811
}
779812

@@ -790,6 +823,7 @@ func (q *QueryRunner) searchWorkerCommon(
790823
var jobs []QueryJob
791824
var jobHitsPosition []int // it keeps the position of the hits array for each job
792825

826+
logger.InfoWithCtx(ctx).Msgf("search worker with query %d %v", len(queries), queries)
793827
for i, query := range queries {
794828
sql := query.SelectCommand.String()
795829

@@ -809,7 +843,7 @@ func (q *QueryRunner) searchWorkerCommon(
809843
continue
810844
}
811845

812-
job := q.makeJob(table, query)
846+
job := q.makeJob(plan, table, query)
813847
jobs = append(jobs, job)
814848
jobHitsPosition = append(jobHitsPosition, i)
815849
}
@@ -914,14 +948,15 @@ func (q *QueryRunner) postProcessResults(plan *model.ExecutionPlan, results [][]
914948

915949
}
916950

951+
pipeline = append(pipeline, pipelineElement{"siblingsTransformer", &SiblingsTransformer{}})
917952
var err error
918953
for _, t := range pipeline {
919954

920955
// TODO we should check if the transformer is applicable here
921956
// for example if the schema doesn't hava array fields, we should skip the arrayResultTransformer
922957
// these transformers can be cpu and mem consuming
923958

924-
results, err = t.transformer.Transform(results)
959+
plan, results, err = t.transformer.Transform(plan, results)
925960
if err != nil {
926961
return nil, fmt.Errorf("resuls transformer %s has failed: %w", t.name, err)
927962
}

platform/frontend_connectors/search_ab_testing.go

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -134,13 +134,11 @@ func (q *QueryRunner) executeABTesting(ctx context.Context, plan *model.Executio
134134

135135
case *quesma_api.ConnectorDecisionElastic:
136136
planExecutor = func(ctx context.Context) ([]byte, error) {
137-
elasticPlan := &model.ExecutionPlan{
138-
IndexPattern: plan.IndexPattern,
139-
QueryRowsTransformers: []model.QueryRowsTransformer{},
140-
Queries: []*model.Query{},
141-
StartTime: plan.StartTime,
142-
Name: config.ElasticsearchTarget,
143-
}
137+
elasticPlan := model.NewExecutionPlan(
138+
[]*model.Query{}, []model.QueryRowsTransformer{})
139+
elasticPlan.Name = config.ElasticsearchTarget
140+
elasticPlan.IndexPattern = plan.IndexPattern
141+
elasticPlan.StartTime = plan.StartTime
144142
return q.executePlanElastic(ctx, elasticPlan, body, optAsync, optComparePlansCh, isMainPlan)
145143
}
146144

0 commit comments

Comments (0)