Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit afb5cac

Browse files
committed
Adding siblings queries
1 parent e2c547f commit afb5cac

File tree

7 files changed

+82
-35
lines changed

7 files changed

+82
-35
lines changed

cmd/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ module github.com/QuesmaOrg/quesma/quesma-cli
33
go 1.24.0
44

55
require (
6-
github.com/QuesmaOrg/quesma/platform v0.0.0-20250303135300-04f0a4897c30
6+
github.com/QuesmaOrg/quesma/platform v0.0.0-20250519105918-0f6942f1a3dd
77
github.com/goccy/go-json v0.10.5
88
github.com/stretchr/testify v1.10.0
99
)

platform/frontend_connectors/search.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -694,15 +694,15 @@ type QueryJob func(ctx context.Context) (*model.ExecutionPlan, []model.QueryResu
694694
func (q *QueryRunner) runQueryJobsSequence(ctx context.Context, jobs []QueryJob) ([][]model.QueryResultRow, []clickhouse.PerformanceResult, error) {
695695
var results = make([][]model.QueryResultRow, 0)
696696
var performance = make([]clickhouse.PerformanceResult, 0)
697-
for _, job := range jobs {
697+
for n, job := range jobs {
698698
plan, rows, perf, err := job(ctx)
699699
performance = append(performance, perf)
700700
if err != nil {
701701
return nil, performance, err
702702
}
703703

704704
results = append(results, rows)
705-
if plan.Interrupt(rows) {
705+
if plan.Interrupt(n, rows) {
706706
break
707707
}
708708
}
@@ -753,7 +753,7 @@ func (q *QueryRunner) runQueryJobsParallel(ctx context.Context, jobs []QueryJob)
753753
return nil, performances, res.err
754754
}
755755
logger.InfoWithCtx(ctx).Msg("Collected result")
756-
if res.plan != nil && res.plan.Interrupt(res.rows) {
756+
if res.plan != nil && res.plan.Interrupt(res.jobId, res.rows) {
757757
logger.InfoWithCtx(ctx).Msg("Interrupting")
758758
break
759759
}

platform/go.mod

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ require (
2222
github.com/huandu/go-clone v1.7.2
2323
github.com/ip2location/ip2location-go/v9 v9.7.1
2424
github.com/jackc/pgx/v5 v5.7.2
25-
github.com/jinzhu/copier v0.4.0
2625
github.com/k0kubun/pp v3.0.1+incompatible
2726
github.com/knadh/koanf/parsers/json v0.1.0
2827
github.com/knadh/koanf/parsers/yaml v0.1.0

platform/go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,6 @@ github.com/jackc/pgx/v5 v5.7.2 h1:mLoDLV6sonKlvjIEsV56SkWNCnuNv531l94GaIzO+XI=
8484
github.com/jackc/pgx/v5 v5.7.2/go.mod h1:ncY89UGWxg82EykZUwSpUKEfccBGGYq1xjrOpsbsfGQ=
8585
github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo=
8686
github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4=
87-
github.com/jinzhu/copier v0.4.0 h1:w3ciUoD19shMCRargcpm0cm91ytaBhDvuRpz1ODO/U8=
88-
github.com/jinzhu/copier v0.4.0/go.mod h1:DfbEm0FYsaqBcKcFuvmOZb218JkPGtvSHsKg8S8hyyg=
8987
github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88 h1:uC1QfSlInpQF+M0ao65imhwqKnz3Q2z/d8PWZRMQvDM=
9088
github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88/go.mod h1:3w7q1U84EfirKl04SVQ/s7nPm1ZPhiXd34z40TNz36k=
9189
github.com/k0kubun/pp v3.0.1+incompatible h1:3tqvf7QgUnZ5tXO6pNAZlrvHgl6DvifjDrd9g2S9Z40=

platform/model/query.go

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,8 +72,7 @@ type (
7272
// WARNING: it's probably not passed everywhere where it's needed, just in one place.
7373
// But it works for the test + our dashboards, so let's fix it later if necessary.
7474
// NoMetadataField (nil) is a valid option and means no meta field in the response.
75-
Metadata JsonMap
76-
ChildQueries *[]Query
75+
Metadata JsonMap
7776
}
7877
QueryType interface {
7978
// TranslateSqlResponseToJson
@@ -115,10 +114,11 @@ type ExecutionPlan struct {
115114
// Interrupt function to stop the execution of the plan
116115
// if for some reason we need to stop the execution
117116
// e.g., there is some condition like enough results
118-
Interrupt func(rows []QueryResultRow) bool
117+
Interrupt func(queryId int, rows []QueryResultRow) bool
119118

120119
ShouldBeMerged bool // if true, we should merge the results of this plan with the main plan
121120
Merge func(plan *ExecutionPlan, results [][]QueryResultRow) (*ExecutionPlan, [][]QueryResultRow)
121+
Siblings map[int][]int // map of query id to list of sibling query ids
122122
}
123123

124124
func NewQueryExecutionHints() *QueryOptimizeHints {
@@ -177,3 +177,46 @@ type HitsCountInfo struct {
177177
func NewEmptyHitsCountInfo() HitsCountInfo {
178178
return HitsCountInfo{Type: Normal}
179179
}
180+
181+
func (q *Query) Clone() *Query {
182+
// Create a new Query object
183+
clone := &Query{
184+
SelectCommand: q.SelectCommand, // Assuming SelectCommand has its own copy logic if needed
185+
OptimizeHints: nil,
186+
TransformationHistory: q.TransformationHistory, // Assuming TransformationHistory is immutable or shallow copy is sufficient
187+
Type: q.Type,
188+
TableName: q.TableName,
189+
Indexes: append([]string{}, q.Indexes...), // Deep copy of slice
190+
Schema: q.Schema, // Assuming schema.Schema is immutable or shallow copy is sufficient
191+
Highlighter: q.Highlighter, // Assuming Highlighter is immutable or shallow copy is sufficient
192+
SearchAfter: q.SearchAfter, // Assuming `any` is immutable or shallow copy is sufficient
193+
RuntimeMappings: make(map[string]RuntimeMapping),
194+
Metadata: nil,
195+
}
196+
197+
// Deep copy OptimizeHints if it exists
198+
if q.OptimizeHints != nil {
199+
clone.OptimizeHints = &QueryOptimizeHints{
200+
ClickhouseQuerySettings: make(map[string]any),
201+
OptimizationsPerformed: append([]string{}, q.OptimizeHints.OptimizationsPerformed...),
202+
}
203+
for k, v := range q.OptimizeHints.ClickhouseQuerySettings {
204+
clone.OptimizeHints.ClickhouseQuerySettings[k] = v
205+
}
206+
}
207+
208+
// Deep copy RuntimeMappings
209+
for k, v := range q.RuntimeMappings {
210+
clone.RuntimeMappings[k] = v
211+
}
212+
213+
// Deep copy Metadata if it exists
214+
if q.Metadata != nil {
215+
clone.Metadata = make(JsonMap)
216+
for k, v := range q.Metadata {
217+
clone.Metadata[k] = v
218+
}
219+
}
220+
221+
return clone
222+
}

platform/optimize/split_time_range_ext.go

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@ package optimize
55
import (
66
"github.com/QuesmaOrg/quesma/platform/logger"
77
"github.com/QuesmaOrg/quesma/platform/model"
8-
"github.com/jinzhu/copier"
9-
"log"
108
"sort"
119
"strconv"
1210
"strings"
@@ -227,46 +225,54 @@ func (s splitTimeRangeExt) transformQuery(query *model.Query, properties map[str
227225

228226
func (s splitTimeRangeExt) Transform(plan *model.ExecutionPlan, properties map[string]string) (*model.ExecutionPlan, error) {
229227

230-
var newQueries []*model.Query
231-
232-
for _, query := range plan.Queries {
228+
queriesSubqueriesMapping := make(map[int][]*model.Query)
229+
for i, query := range plan.Queries {
233230
subqueries, err := s.transformQuery(query, properties)
234231
if err != nil {
235232
return nil, err
236233
}
237-
newQueries = append(newQueries, subqueries...)
234+
queriesSubqueriesMapping[i] = subqueries
238235
}
239236

240-
if len(newQueries) > 0 {
241-
plan.Queries[0].SelectCommand = newQueries[0].SelectCommand
237+
var newQueries []*model.Query
242238

243-
for i := 1; i < len(newQueries); i++ {
244-
var queryCopy model.Query
245-
err := copier.Copy(&queryCopy, &plan.Queries[0])
246-
if err != nil {
247-
log.Println("copier.Copy failed:", err)
248-
}
249-
plan.Queries = append(plan.Queries, &queryCopy)
250-
plan.Queries[i].SelectCommand = newQueries[i].SelectCommand
239+
for i := range plan.Queries {
240+
subqueries := queriesSubqueriesMapping[i]
241+
plan.Queries[i].SelectCommand = subqueries[0].SelectCommand
242+
for j := 1; j < len(subqueries); j++ {
243+
newQuery := plan.Queries[0].Clone()
244+
newQuery.SelectCommand = subqueries[j].SelectCommand
245+
newQueries = append(newQueries, newQuery)
246+
plan.Siblings[i] = append(plan.Siblings[i], j)
251247
}
252-
plan.ShouldBeMerged = true
253248
}
254-
for _, subquery := range newQueries {
255-
sql := subquery.SelectCommand.String()
256-
logger.Info().Msgf("@@@@@@Transformed query: %s", sql)
249+
250+
for i, subqueryPerQuery := range queriesSubqueriesMapping {
251+
querySQL := plan.Queries[i].SelectCommand
252+
logger.Info().Msgf("@@@@@@Original query: %s", querySQL.String())
253+
for j := 0; j < len(subqueryPerQuery); j++ {
254+
logger.Info().Msgf("@@@@@@Transformed query: %s", subqueryPerQuery[j].SelectCommand.String())
255+
}
257256
}
258257

259-
plan.Interrupt = func(rows []model.QueryResultRow) bool {
260-
return len(rows) >= 500
258+
plan.Interrupt = func(queryId int, rows []model.QueryResultRow) bool {
259+
if _, ok := plan.Siblings[queryId]; ok {
260+
return len(rows) >= 500
261+
}
262+
return false
261263
}
262264
plan.Merge = func(plan *model.ExecutionPlan, results [][]model.QueryResultRow) (*model.ExecutionPlan, [][]model.QueryResultRow) {
263265
var mergedResults [][]model.QueryResultRow
264266
mergedResults = make([][]model.QueryResultRow, 0)
265-
mergedResults = append(mergedResults, results[0])
267+
266268
if len(plan.Queries) > len(mergedResults) {
269+
// merge the results of the siblings queries
270+
mergedResults = append(mergedResults, results[0])
271+
// remove siblings queries from the plan
267272
plan.Queries = plan.Queries[:len(plan.Queries)-1]
273+
return plan, mergedResults
268274
}
269-
return plan, mergedResults
275+
return plan, results
270276
}
271277
return plan, nil
272278

platform/parsers/elastic_query_dsl/query_parser.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@ func (cw *ClickhouseQueryTranslator) ParseQuery(body types.JSON) (*model.Executi
101101
plan := &model.ExecutionPlan{
102102
Queries: queries,
103103
QueryRowsTransformers: queryResultTransformers,
104-
Interrupt: func(rows []model.QueryResultRow) bool {
104+
Siblings: make(map[int][]int),
105+
Interrupt: func(queryId int, rows []model.QueryResultRow) bool {
105106
return false
106107
},
107108
}

0 commit comments

Comments
 (0)