Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/QuesmaOrg/quesma/quesma-cli
go 1.24.0

require (
github.com/QuesmaOrg/quesma/platform v0.0.0-20250303135300-04f0a4897c30
github.com/QuesmaOrg/quesma/platform v0.0.0-20250519105918-0f6942f1a3dd
github.com/goccy/go-json v0.10.5
github.com/stretchr/testify v1.10.0
)
Expand Down
4 changes: 2 additions & 2 deletions cmd/v2_test_objects.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,11 +422,11 @@ func (p *QueryTransformationPipeline) ComposeResult(results [][]model.QueryResul
type QueryTransformer1 struct {
}

func (p *QueryTransformer1) Transform(queries []*model.Query) ([]*model.Query, error) {
func (p *QueryTransformer1) Transform(plan *model.ExecutionPlan) (*model.ExecutionPlan, error) {
logger.Debug().Msg("SimpleQueryTransformationPipeline: Transform")
// Do basic transformation

return queries, nil
return plan, nil
}

func NewQueryTransformer1() *QueryTransformer1 {
Expand Down
8 changes: 4 additions & 4 deletions platform/frontend_connectors/schema_transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -974,7 +974,7 @@ func (s *SchemaCheckPass) applyAliasColumns(indexSchema schema.Schema, query *mo
return query, nil
}

func (s *SchemaCheckPass) Transform(queries []*model.Query) ([]*model.Query, error) {
func (s *SchemaCheckPass) Transform(plan *model.ExecutionPlan) (*model.ExecutionPlan, error) {

transformationChain := []struct {
TransformationName string
Expand Down Expand Up @@ -1014,7 +1014,7 @@ func (s *SchemaCheckPass) Transform(queries []*model.Query) ([]*model.Query, err
{TransformationName: "BooleanLiteralTransformation", Transformation: s.applyBooleanLiteralLowering},
}

for k, query := range queries {
for k, query := range plan.Queries {
var err error

if !s.cfg.Logging.EnableSQLTracing {
Expand Down Expand Up @@ -1043,9 +1043,9 @@ func (s *SchemaCheckPass) Transform(queries []*model.Query) ([]*model.Query, err
}
}

queries[k] = query
plan.Queries[k] = query
}
return queries, nil
return plan, nil
}

func (s *SchemaCheckPass) applyMatchOperator(indexSchema schema.Schema, query *model.Query) (*model.Query, error) {
Expand Down
40 changes: 27 additions & 13 deletions platform/frontend_connectors/schema_transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,10 +499,11 @@ func Test_ipRangeTransform(t *testing.T) {
q.Schema = currentSchema
q.Indexes = []string{q.TableName}
}
plan := model.NewExecutionPlan(queries[k], nil)

resultQueries, err := transform.Transform(queries[k])
resultPlan, err := transform.Transform(plan)
assert.NoError(t, err)
assert.Equal(t, expectedQueries[k].SelectCommand.String(), resultQueries[0].SelectCommand.String())
assert.Equal(t, expectedQueries[k].SelectCommand.String(), resultPlan.Queries[0].SelectCommand.String())
})
}
}
Expand Down Expand Up @@ -739,17 +740,21 @@ func Test_arrayType(t *testing.T) {
t.Run(util.PrettyTestName(tt.name, i), func(t *testing.T) {
tt.query.Schema = indexSchema
tt.query.Indexes = []string{tt.query.TableName}
actual, err := transform.Transform([]*model.Query{tt.query})
plan := model.NewExecutionPlan(
[]*model.Query{tt.query},
nil,
)
actual, err := transform.Transform(plan)
assert.NoError(t, err)

if err != nil {
t.Fatal(err)
}

assert.True(t, len(actual) == 1, "len queries == 1")
assert.True(t, len(actual.Queries) == 1, "len queries == 1")

expectedJson := asString(tt.expected)
actualJson := asString(actual[0])
actualJson := asString(actual.Queries[0])

assert.Equal(t, expectedJson, actualJson)
})
Expand Down Expand Up @@ -1802,23 +1807,27 @@ func Test_mapKeys(t *testing.T) {
t.Run(util.PrettyTestName(tt.name, i), func(t *testing.T) {
tt.query.Schema = indexSchema
tt.query.Indexes = []string{tt.query.TableName}
var actual []*model.Query
var actual *model.ExecutionPlan
var err error
plan := model.NewExecutionPlan(
[]*model.Query{tt.query},
nil,
)
if indexConfig[tt.query.TableName].EnableFieldMapSyntax {
actual, err = transformPass.Transform([]*model.Query{tt.query})
actual, err = transformPass.Transform(plan)
} else {
actual, err = noTransformPass.Transform([]*model.Query{tt.query})
actual, err = noTransformPass.Transform(plan)
}
assert.NoError(t, err)

if err != nil {
t.Fatal(err)
}

assert.True(t, len(actual) == 1, "len queries == 1")
assert.True(t, len(actual.Queries) == 1, "len queries == 1")

expectedJson := asString(tt.expected)
actualJson := asString(actual[0])
actualJson := asString(actual.Queries[0])

assert.Equal(t, expectedJson, actualJson)
})
Expand Down Expand Up @@ -1895,17 +1904,22 @@ func Test_cluster(t *testing.T) {
t.Run(util.PrettyTestName(tt.name, i), func(t *testing.T) {
tt.query.Schema = indexSchema
tt.query.Indexes = []string{tt.query.TableName}
actual, err := transform.Transform([]*model.Query{tt.query})

plan := model.NewExecutionPlan(
[]*model.Query{tt.query},
nil,
)
actual, err := transform.Transform(plan)
assert.NoError(t, err)

if err != nil {
t.Fatal(err)
}

assert.True(t, len(actual) == 1, "len queries == 1")
assert.True(t, len(actual.Queries) == 1, "len queries == 1")

expectedJson := asString(tt.expected)
actualJson := asString(actual[0])
actualJson := asString(actual.Queries[0])

assert.Equal(t, expectedJson, actualJson)
})
Expand Down
91 changes: 63 additions & 28 deletions platform/frontend_connectors/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ type AsyncQuery struct {

func (q *QueryRunner) transformQueries(plan *model.ExecutionPlan) error {
var err error
plan.Queries, err = q.transformationPipeline.Transform(plan.Queries)
_, err = q.transformationPipeline.Transform(plan)
if err != nil {
return fmt.Errorf("error transforming queries: %v", err)
}
Expand Down Expand Up @@ -516,7 +516,13 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin
logger.ErrorWithCtxAndReason(ctx, "Quesma generated invalid SQL query").Msg(queriesBodyConcat)
goto logErrorAndReturn
}
if len(plan.Queries) > 0 {
for i, query := range plan.Queries {
logger.InfoWithCtx(ctx).Msgf("Input SQL query %d: %s", i, query.SelectCommand.String())
}
}
err = q.transformQueries(plan)

if err != nil {
goto logErrorAndReturn
}
Expand Down Expand Up @@ -669,70 +675,97 @@ func (q *QueryRunner) isInternalKibanaQuery(query *model.Query) bool {
return false
}

type QueryJob func(ctx context.Context) ([]model.QueryResultRow, clickhouse.PerformanceResult, error)
type QueryJob func(ctx context.Context) (*model.ExecutionPlan, []model.QueryResultRow, clickhouse.PerformanceResult, error)

func (q *QueryRunner) runQueryJobsSequence(ctx context.Context, jobs []QueryJob) ([][]model.QueryResultRow, []clickhouse.PerformanceResult, error) {
var results = make([][]model.QueryResultRow, 0)
var performance = make([]clickhouse.PerformanceResult, 0)
for _, job := range jobs {
rows, perf, err := job(ctx)
for n, job := range jobs {
plan, rows, perf, err := job(ctx)
performance = append(performance, perf)
if err != nil {
return nil, performance, err
}

results = append(results, rows)
if plan.Interrupt(n, rows) {
break
}
}
return results, performance, nil
}

func (q *QueryRunner) runQueryJobsParallel(ctx context.Context, jobs []QueryJob) ([][]model.QueryResultRow, []clickhouse.PerformanceResult, error) {

var results = make([][]model.QueryResultRow, len(jobs))
var performances = make([]clickhouse.PerformanceResult, len(jobs))
func (q *QueryRunner) runQueryJobsParallel(ctx context.Context, jobs []QueryJob) (
[][]model.QueryResultRow,
[]clickhouse.PerformanceResult,
error,
) {
var (
results = make([][]model.QueryResultRow, len(jobs))
performances = make([]clickhouse.PerformanceResult, len(jobs))
)
type result struct {
plan *model.ExecutionPlan
rows []model.QueryResultRow
perf clickhouse.PerformanceResult
err error
jobId int
}

// this is our context to control the execution of the jobs

// cancellation is done by the parent context
// or by the first goroutine that returns an error
// cancelable context to stop jobs early
ctx, cancel := context.WithCancel(ctx)
// clean up on return
defer cancel()

collector := make(chan result, len(jobs))

// spawn worker goroutines
for n, job := range jobs {
// produce
go func(ctx context.Context, jobId int, j QueryJob) {
defer recovery.LogAndHandlePanic(ctx, func(err error) {
collector <- result{err: err, jobId: jobId}
})

start := time.Now()
rows, perf, err := j(ctx)
plan, rows, perf, err := j(ctx)
logger.DebugWithCtx(ctx).Msgf("parallel job %d finished in %v", jobId, time.Since(start))
collector <- result{rows: rows, perf: perf, err: err, jobId: jobId}
collector <- result{plan: plan, rows: rows, perf: perf, err: err, jobId: jobId}
}(ctx, n, job)
}

// consume
for range len(jobs) {
expected := len(jobs)
received := 0

for received < expected {
res := <-collector
received++

performances[res.jobId] = res.perf
if res.err == nil {
results[res.jobId] = res.rows
} else {

if res.err != nil {
logger.WarnWithCtx(ctx).Msgf("Job %d failed: %v", res.jobId, res.err)
cancel() // cancel remaining jobs

// Drain the rest to avoid goroutine leaks
for received < len(jobs) {
<-collector
received++
}
return nil, performances, res.err
}

results[res.jobId] = res.rows

if res.plan != nil && res.plan.Interrupt(res.jobId, res.rows) {
logger.InfoWithCtx(ctx).Msgf("Job %d triggered interrupt", res.jobId)
expected--
}
if expected == received {
cancel()
}
}

return results, performances, nil
}

func (q *QueryRunner) runQueryJobs(ctx context.Context, jobs []QueryJob) ([][]model.QueryResultRow, []clickhouse.PerformanceResult, error) {

numberOfJobs := len(jobs)
Expand Down Expand Up @@ -762,18 +795,18 @@ func (q *QueryRunner) runQueryJobs(ctx context.Context, jobs []QueryJob) ([][]mo

}

func (q *QueryRunner) makeJob(table *clickhouse.Table, query *model.Query) QueryJob {
return func(ctx context.Context) ([]model.QueryResultRow, clickhouse.PerformanceResult, error) {
func (q *QueryRunner) makeJob(plan *model.ExecutionPlan, table *clickhouse.Table, query *model.Query) QueryJob {
return func(ctx context.Context) (*model.ExecutionPlan, []model.QueryResultRow, clickhouse.PerformanceResult, error) {
var err error
rows, performance, err := q.logManager.ProcessQuery(ctx, table, query)

if err != nil {
logger.ErrorWithCtx(ctx).Msg(err.Error())
performance.Error = err
return nil, performance, err
return plan, nil, performance, err
}

return rows, performance, nil
return plan, rows, performance, nil
}
}

Expand All @@ -790,6 +823,7 @@ func (q *QueryRunner) searchWorkerCommon(
var jobs []QueryJob
var jobHitsPosition []int // it keeps the position of the hits array for each job

logger.InfoWithCtx(ctx).Msgf("search worker with query %d %v", len(queries), queries)
for i, query := range queries {
sql := query.SelectCommand.String()

Expand All @@ -809,7 +843,7 @@ func (q *QueryRunner) searchWorkerCommon(
continue
}

job := q.makeJob(table, query)
job := q.makeJob(plan, table, query)
jobs = append(jobs, job)
jobHitsPosition = append(jobHitsPosition, i)
}
Expand Down Expand Up @@ -914,14 +948,15 @@ func (q *QueryRunner) postProcessResults(plan *model.ExecutionPlan, results [][]

}

pipeline = append(pipeline, pipelineElement{"siblingsTransformer", &SiblingsTransformer{}})
var err error
for _, t := range pipeline {

// TODO we should check if the transformer is applicable here;
// for example, if the schema doesn't have array fields, we should skip the arrayResultTransformer.
// These transformers can be CPU- and memory-intensive.

results, err = t.transformer.Transform(results)
plan, results, err = t.transformer.Transform(plan, results)
if err != nil {
return nil, fmt.Errorf("results transformer %s has failed: %w", t.name, err)
}
Expand Down
12 changes: 5 additions & 7 deletions platform/frontend_connectors/search_ab_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,11 @@ func (q *QueryRunner) executeABTesting(ctx context.Context, plan *model.Executio

case *quesma_api.ConnectorDecisionElastic:
planExecutor = func(ctx context.Context) ([]byte, error) {
elasticPlan := &model.ExecutionPlan{
IndexPattern: plan.IndexPattern,
QueryRowsTransformers: []model.QueryRowsTransformer{},
Queries: []*model.Query{},
StartTime: plan.StartTime,
Name: config.ElasticsearchTarget,
}
elasticPlan := model.NewExecutionPlan(
[]*model.Query{}, []model.QueryRowsTransformer{})
elasticPlan.Name = config.ElasticsearchTarget
elasticPlan.IndexPattern = plan.IndexPattern
elasticPlan.StartTime = plan.StartTime
return q.executePlanElastic(ctx, elasticPlan, body, optAsync, optComparePlansCh, isMainPlan)
}

Expand Down
Loading
Loading