Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 93f6b7f

Browse files
authored
search_after param (used to paginate hits e.g. in Discover), 1/2 strategies (#1104)
#1099, PR 1/2.

After this PR, instead of discarding the `search_after` param, we simply add `tuple(sort_field_1, sort_field_2, ...) < (search_after_1, search_after_2, ...)` to the `WHERE` clause, so we'll (almost) properly return paginated hits (never wrong ones, but we can skip some valid ones). If `sort_field_i` is `asc`, we swap `sort_field_i` with `search_after_i` in the expression above; I think it gives exactly what we want.

Timestamp fields use the `fromUnixTimestamp64Milli` conversion - it'll work because the `search_after` param is what we returned in the previous request ourselves, for previous hits. And for dates/timestamps we always return `unix_timestamp in ms`.

This strategy is not fully correct, because we'll skip all hits with exactly the same sort values (`timestamps in ms`, usually) as the last hit of the previous page, but it's some progress anyway. Before this PR, queries with this param basically didn't work at all; now they have a small bug.

TODO (I'll finish in the next PR):
* Finish the Bulletproof strategy, which fixes the limitation above. Should be quick, as I already got a lot of tests here, and there's not really a lot of code in the logic itself.
* Fix the conversion function: we always use `fromUnixTimestamp64Milli`. It'll always work for Clickhouse, but for Hydrolix and a `DateTime` field (without `64`), it won't. It's a known issue, quick to fix. I'd do it in another PR, as before this one `search_after` didn't work anyway, so there's no regression here.

Screens of this working after all the commits: <img width="1439" alt="Screenshot 2024-12-27 at 00 58 42" src="https://github.com/user-attachments/assets/c597a4a3-5f80-41c2-85dc-b79516a73948" /> <img width="1429" alt="Screenshot 2024-12-27 at 00 58 57" src="https://github.com/user-attachments/assets/44fe9c0c-9ec2-4964-b136-91f934849d28" />
1 parent 89892c2 commit 93f6b7f

18 files changed

+976
-68
lines changed

quesma/eql/query_translator.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ func (cw *ClickhouseEQLQueryTranslator) ParseQuery(body types.JSON) (*model.Exec
8787

8888
if simpleQuery.CanParse {
8989

90-
query = query_util.BuildHitsQuery(cw.Ctx, cw.Table.Name, []string{"*"}, &simpleQuery, queryInfo.Size)
90+
query = query_util.BuildHitsQuery(cw.Ctx, cw.Table.Name, []string{"*"}, &simpleQuery, queryInfo.Size, queryInfo.SearchAfter)
9191
queryType := typical_queries.NewHits(cw.Ctx, cw.Table, &highlighter, query.SelectCommand.OrderByFieldNames(), true, false, false, []string{cw.Table.Name})
9292
query.Type = &queryType
9393
query.Highlighter = highlighter
@@ -105,7 +105,7 @@ func (cw *ClickhouseEQLQueryTranslator) parseQuery(queryAsMap types.JSON) (query
105105
// no highlighting here
106106
highlighter = queryparser.NewEmptyHighlighter()
107107

108-
searchQueryInfo.Typ = model.ListAllFields
108+
searchQueryInfo.Type = model.ListAllFields
109109

110110
var eqlQuery string
111111

quesma/model/expr_string_renderer.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -76,18 +76,18 @@ func (v *renderer) VisitLiteral(l LiteralExpr) interface{} {
7676
}
7777

7878
func (v *renderer) VisitTuple(t TupleExpr) interface{} {
79-
switch len(t.Exprs) {
79+
exprs := make([]string, 0, len(t.Exprs))
80+
for _, expr := range t.Exprs {
81+
exprs = append(exprs, expr.Accept(v).(string))
82+
}
83+
switch len(exprs) {
8084
case 0:
81-
logger.WarnWithThrottling("VisitTuple", "TupleExpr with no expressions")
82-
return "()"
85+
logger.WarnWithThrottling("visitTuple", "tupleExpr with no expressions") // hacky way to log this
86+
return "tuple()"
8387
case 1:
84-
return t.Exprs[0].Accept(v)
88+
return exprs[0]
8589
default:
86-
args := make([]string, len(t.Exprs))
87-
for i, arg := range t.Exprs {
88-
args[i] = arg.Accept(v).(string)
89-
}
90-
return fmt.Sprintf("tuple(%s)", strings.Join(args, ", ")) // can omit "tuple", but I think SQL's more readable with it
90+
return fmt.Sprintf("tuple(%s)", strings.Join(exprs, ", ")) // can omit "tuple", but I think SQL's more readable with it
9191
}
9292
}
9393

quesma/model/query.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ type (
6464
Schema schema.Schema
6565

6666
Highlighter Highlighter
67+
SearchAfter any // Value of query's "search_after" param. Used for pagination of hits. SearchAfterEmpty means no pagination
6768

6869
RuntimeMappings map[string]RuntimeMapping
6970

@@ -84,6 +85,8 @@ type (
8485
}
8586
)
8687

88+
var SearchAfterEmpty any = nil
89+
8790
// RuntimeMapping is a mapping of a field to a runtime expression
8891
type RuntimeMapping struct {
8992
Field string
@@ -155,12 +158,13 @@ func (queryType HitsInfo) String() string {
155158
}
156159

157160
type HitsCountInfo struct {
158-
Typ HitsInfo
161+
Type HitsInfo
159162
RequestedFields []string
160163
Size int // how many hits to return
161164
TrackTotalHits int // >= 0: we want this nr of total hits, TrackTotalHitsTrue: it was "true", TrackTotalHitsFalse: it was "false", in the request
165+
SearchAfter any // Value of query's "search_after" param. Used for pagination of hits. SearchAfterEmpty means no pagination
162166
}
163167

164168
func NewEmptyHitsCountInfo() HitsCountInfo {
165-
return HitsCountInfo{Typ: Normal}
169+
return HitsCountInfo{Type: Normal}
166170
}

quesma/queryparser/query_parser.go

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -109,12 +109,12 @@ func (cw *ClickhouseQueryTranslator) ParseQuery(body types.JSON) (*model.Executi
109109
func (cw *ClickhouseQueryTranslator) buildListQueryIfNeeded(
110110
simpleQuery *model.SimpleQuery, queryInfo model.HitsCountInfo, highlighter model.Highlighter) *model.Query {
111111
var fullQuery *model.Query
112-
switch queryInfo.Typ {
112+
switch queryInfo.Type {
113113
case model.ListByField:
114114
// queryInfo = (ListByField, fieldName, 0, LIMIT)
115-
fullQuery = cw.BuildNRowsQuery(queryInfo.RequestedFields, simpleQuery, queryInfo.Size)
115+
fullQuery = cw.BuildNRowsQuery(queryInfo.RequestedFields, simpleQuery, queryInfo)
116116
case model.ListAllFields:
117-
fullQuery = cw.BuildNRowsQuery([]string{"*"}, simpleQuery, queryInfo.Size)
117+
fullQuery = cw.BuildNRowsQuery([]string{"*"}, simpleQuery, queryInfo)
118118
default:
119119
}
120120
if fullQuery != nil {
@@ -179,6 +179,7 @@ func (cw *ClickhouseQueryTranslator) parseQueryInternal(body types.JSON) (*model
179179
queryInfo := cw.tryProcessSearchMetadata(queryAsMap)
180180
queryInfo.Size = size
181181
queryInfo.TrackTotalHits = trackTotalHits
182+
queryInfo.SearchAfter = queryAsMap["search_after"]
182183

183184
return &parsedQuery, queryInfo, highlighter, nil
184185
}
@@ -224,7 +225,7 @@ func (cw *ClickhouseQueryTranslator) ParseHighlighter(queryMap QueryMap) model.H
224225
func (cw *ClickhouseQueryTranslator) parseMetadata(queryMap QueryMap) QueryMap {
225226
queryMetadata := make(QueryMap, 5)
226227
for k, v := range queryMap {
227-
if k == "query" || k == "bool" || k == "query_string" || k == "index_filter" { // probably change that, made so tests work, but let's see after more real use cases {
228+
if k == "query" || k == "bool" || k == "query_string" || k == "index_filter" || k == "search_after" { // probably change that, made so tests work, but let's see after more real use cases {
228229
continue
229230
}
230231
queryMetadata[k] = v
@@ -976,7 +977,7 @@ func (cw *ClickhouseQueryTranslator) isItListRequest(queryMap QueryMap) (model.H
976977

977978
fields, ok := queryMap["fields"].([]any)
978979
if !ok {
979-
return model.HitsCountInfo{Typ: model.ListAllFields, RequestedFields: []string{"*"}, Size: size}, true
980+
return model.HitsCountInfo{Type: model.ListAllFields, RequestedFields: []string{"*"}, Size: size}, true
980981
}
981982
if len(fields) > 1 {
982983
fieldNames := make([]string, 0)
@@ -999,13 +1000,13 @@ func (cw *ClickhouseQueryTranslator) isItListRequest(queryMap QueryMap) (model.H
9991000
}
10001001
logger.Debug().Msgf("requested more than one field %s, falling back to '*'", fieldNames)
10011002
// so far everywhere I've seen, > 1 field ==> "*" is one of them
1002-
return model.HitsCountInfo{Typ: model.ListAllFields, RequestedFields: []string{"*"}, Size: size}, true
1003+
return model.HitsCountInfo{Type: model.ListAllFields, RequestedFields: []string{"*"}, Size: size}, true
10031004
} else if len(fields) == 0 {
10041005
// isCount, ok := queryMap["track_total_hits"].(bool)
10051006
// TODO make count separate!
10061007
/*
10071008
if ok && isCount {
1008-
return model.HitsCountInfo{Typ: model.CountAsync, RequestedFields: make([]string, 0), FieldName: "", I1: 0, I2: 0}, true
1009+
return model.HitsCountInfo{Type: model.CountAsync, RequestedFields: make([]string, 0), FieldName: "", I1: 0, I2: 0}, true
10091010
}
10101011
*/
10111012
return model.NewEmptyHitsCountInfo(), false
@@ -1031,9 +1032,9 @@ func (cw *ClickhouseQueryTranslator) isItListRequest(queryMap QueryMap) (model.H
10311032

10321033
resolvedField := ResolveField(cw.Ctx, fieldName, cw.Schema)
10331034
if resolvedField == "*" {
1034-
return model.HitsCountInfo{Typ: model.ListAllFields, RequestedFields: []string{"*"}, Size: size}, true
1035+
return model.HitsCountInfo{Type: model.ListAllFields, RequestedFields: []string{"*"}, Size: size}, true
10351036
}
1036-
return model.HitsCountInfo{Typ: model.ListByField, RequestedFields: []string{resolvedField}, Size: size}, true
1037+
return model.HitsCountInfo{Type: model.ListByField, RequestedFields: []string{resolvedField}, Size: size}, true
10371038
}
10381039
}
10391040

quesma/queryparser/query_translator.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,6 @@ func (cw *ClickhouseQueryTranslator) BuildCountQuery(whereClause model.Expr, sam
307307
}
308308
}
309309

310-
func (cw *ClickhouseQueryTranslator) BuildNRowsQuery(fieldNames []string, query *model.SimpleQuery, limit int) *model.Query {
311-
return query_util.BuildHitsQuery(cw.Ctx, model.SingleTableNamePlaceHolder, fieldNames, query, limit)
310+
func (cw *ClickhouseQueryTranslator) BuildNRowsQuery(fieldNames []string, query *model.SimpleQuery, info model.HitsCountInfo) *model.Query {
311+
return query_util.BuildHitsQuery(cw.Ctx, model.SingleTableNamePlaceHolder, fieldNames, query, info.Size, info.SearchAfter)
312312
}

quesma/queryparser/query_translator_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ func TestMakeResponseSearchQuery(t *testing.T) {
182182
t.Run(tt.queryType.String(), func(t *testing.T) {
183183
hitQuery := query_util.BuildHitsQuery(
184184
context.Background(), "test", []string{"*"},
185-
&model.SimpleQuery{}, model.WeNeedUnlimitedCount,
185+
&model.SimpleQuery{}, model.WeNeedUnlimitedCount, model.SearchAfterEmpty,
186186
)
187187
highlighter := NewEmptyHighlighter()
188188
queryType := typical_queries.NewHits(cw.Ctx, cw.Table, &highlighter, hitQuery.SelectCommand.OrderByFieldNames(), true, false, false, []string{cw.Table.Name})
@@ -275,7 +275,7 @@ func TestMakeResponseAsyncSearchQuery(t *testing.T) {
275275
{Cols: []model.QueryResultCol{model.NewQueryResultCol("message", "User updated")}},
276276
{Cols: []model.QueryResultCol{model.NewQueryResultCol("message", "User created")}},
277277
},
278-
query_util.BuildHitsQuery(context.Background(), "test", []string{"message"}, &model.SimpleQuery{}, model.WeNeedUnlimitedCount),
278+
query_util.BuildHitsQuery(context.Background(), "test", []string{"message"}, &model.SimpleQuery{}, model.WeNeedUnlimitedCount, model.SearchAfterEmpty),
279279
},
280280
{
281281
`
@@ -413,7 +413,7 @@ func TestMakeResponseAsyncSearchQuery(t *testing.T) {
413413
},
414414
},
415415
},
416-
query_util.BuildHitsQuery(context.Background(), "test", []string{"*"}, &model.SimpleQuery{}, model.WeNeedUnlimitedCount)},
416+
query_util.BuildHitsQuery(context.Background(), "test", []string{"*"}, &model.SimpleQuery{}, model.WeNeedUnlimitedCount, model.SearchAfterEmpty)},
417417
}
418418
for i, tt := range args {
419419
t.Run(strconv.Itoa(i), func(t *testing.T) {
@@ -440,8 +440,8 @@ func TestMakeResponseSearchQueryIsProperJson(t *testing.T) {
440440
cw := ClickhouseQueryTranslator{Table: clickhouse.NewEmptyTable("@"), Ctx: context.Background()}
441441
const limit = 1000
442442
queries := []*model.Query{
443-
cw.BuildNRowsQuery([]string{"*"}, &model.SimpleQuery{}, limit),
444-
cw.BuildNRowsQuery([]string{"@"}, &model.SimpleQuery{}, 0),
443+
cw.BuildNRowsQuery([]string{"*"}, &model.SimpleQuery{}, model.HitsCountInfo{Size: limit}),
444+
cw.BuildNRowsQuery([]string{"@"}, &model.SimpleQuery{}, model.HitsCountInfo{Size: 0}),
445445
}
446446
for _, query := range queries {
447447
resultRow := model.QueryResultRow{Cols: make([]model.QueryResultCol, 0)}

quesma/queryparser/query_util/query_util.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ func FilterAggregationQueries(queries []*model.Query) []*model.Query {
4242
}
4343
*/
4444

45-
func BuildHitsQuery(ctx context.Context, tableName string, fieldNames []string, query *model.SimpleQuery, limit int) *model.Query {
45+
func BuildHitsQuery(ctx context.Context, tableName string, fieldNames []string, query *model.SimpleQuery, limit int, searchAfter any) *model.Query {
4646
var columns []model.Expr
4747
for _, fieldName := range fieldNames {
4848
if fieldName == "*" {
@@ -55,6 +55,7 @@ func BuildHitsQuery(ctx context.Context, tableName string, fieldNames []string,
5555
return &model.Query{
5656
SelectCommand: *model.NewSelectCommand(columns, nil, query.OrderBy, model.NewTableRef(tableName),
5757
query.WhereClause, []model.Expr{}, applySizeLimit(ctx, limit), 0, false, []*model.CTE{}),
58+
SearchAfter: searchAfter,
5859
}
5960
}
6061

Lines changed: 173 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,173 @@
1+
// Copyright Quesma, licensed under the Elastic License 2.0.
2+
// SPDX-License-Identifier: Elastic-2.0
3+
package quesma
4+
5+
import (
6+
"fmt"
7+
"quesma/logger"
8+
"quesma/model"
9+
"quesma/schema"
10+
"quesma/util"
11+
)
12+
13+
type (
14+
searchAfterStrategy interface {
15+
// validateAndParse validates the 'searchAfter', which is what came from the request's search_after field.
16+
validateAndParse(query *model.Query, indexSchema schema.Schema) (searchAfterParameterParsed []model.Expr, err error)
17+
transform(query *model.Query, searchAfterParameterParsed []model.Expr) (*model.Query, error)
18+
}
19+
searchAfterStrategyType int
20+
)
21+
22+
func searchAfterStrategyFactory(strategy searchAfterStrategyType) searchAfterStrategy {
23+
switch strategy {
24+
case bulletproof:
25+
return searchAfterStrategyBulletproof{}
26+
case justDiscardTheParameter:
27+
return searchAfterStrategyJustDiscardTheParameter{}
28+
case basicAndFast:
29+
return searchAfterStrategyBasicAndFast{}
30+
default:
31+
logger.Error().Msgf("Unknown search_after strategy: %d. Using default (basicAndFast).", strategy)
32+
return searchAfterStrategyBasicAndFast{}
33+
}
34+
}
35+
36+
const (
37+
basicAndFast searchAfterStrategyType = iota // default until bulletproof is implemented
38+
bulletproof
39+
justDiscardTheParameter
40+
defaultSearchAfterStrategy = basicAndFast
41+
)
42+
43+
func (s searchAfterStrategyType) String() string {
44+
return []string{"BasicAndFast", "Bulletproof", "JustDiscardTheParameter"}[s]
45+
}
46+
47+
// ---------------------------------------------------------------------------------
48+
// | Bulletproof, but might be a bit slower for gigantic datasets |
49+
// ---------------------------------------------------------------------------------
50+
51+
type searchAfterStrategyBulletproof struct{} // TODO, don't look!
52+
53+
// validateAndParse validates the 'searchAfter', which is what came from the request's search_after field.
54+
func (s searchAfterStrategyBulletproof) validateAndParse(query *model.Query, indexSchema schema.Schema) (searchAfterParameterParsed []model.Expr, err error) {
55+
logger.Debug().Msgf("searchAfter: %v", query.SearchAfter)
56+
if query.SearchAfter == nil {
57+
return nil, nil
58+
}
59+
60+
asArray, ok := query.SearchAfter.([]any)
61+
if !ok {
62+
return nil, fmt.Errorf("search_after must be an array, got: %v", query.SearchAfter)
63+
}
64+
65+
if len(asArray) != len(query.SelectCommand.OrderBy) {
66+
return nil, fmt.Errorf("len(search_after) != len(sortFields), search_after: %v, sortFields: %v", asArray, query.SelectCommand.OrderBy)
67+
}
68+
69+
return nil, nil
70+
}
71+
72+
func (s searchAfterStrategyBulletproof) transform(query *model.Query, searchAfterParsed []model.Expr) (*model.Query, error) {
73+
return query, nil
74+
}
75+
76+
// -------------------------------------------------------------------------------------------------------------------------------
77+
// | JustDiscardTheParameter: probably only good for tests or when you don't need this functionality and want better performance |
78+
// -------------------------------------------------------------------------------------------------------------------------------
79+
80+
type searchAfterStrategyJustDiscardTheParameter struct{}
81+
82+
// validateAndParse validates the 'searchAfter', which is what came from the request's search_after field.
83+
func (s searchAfterStrategyJustDiscardTheParameter) validateAndParse(*model.Query, schema.Schema) (searchAfterParameterParsed []model.Expr, err error) {
84+
return nil, nil
85+
}
86+
87+
func (s searchAfterStrategyJustDiscardTheParameter) transform(query *model.Query, _ []model.Expr) (*model.Query, error) {
88+
return query, nil
89+
}
90+
91+
// ----------------------------------------------------------------------------------
92+
// | First, simple strategy: BasicAndFast (default until Bulletproof is implemented)|
93+
// ----------------------------------------------------------------------------------
94+
95+
type searchAfterStrategyBasicAndFast struct{}
96+
97+
// validateAndParse validates the 'searchAfter', which is what came from the request's search_after field.
98+
func (s searchAfterStrategyBasicAndFast) validateAndParse(query *model.Query, indexSchema schema.Schema) (searchAfterParsed []model.Expr, err error) {
99+
if query.SearchAfter == nil {
100+
return nil, nil
101+
}
102+
103+
asArray, ok := query.SearchAfter.([]any)
104+
if !ok {
105+
return nil, fmt.Errorf("search_after must be an array, got: %v", query.SearchAfter)
106+
}
107+
if len(asArray) != len(query.SelectCommand.OrderBy) {
108+
return nil, fmt.Errorf("len(search_after) != len(sortFields), search_after: %v, sortFields: %v", asArray, query.SelectCommand.OrderBy)
109+
}
110+
111+
sortFieldsNr := len(asArray)
112+
searchAfterParsed = make([]model.Expr, sortFieldsNr)
113+
for i, searchAfterValue := range asArray {
114+
column, ok := query.SelectCommand.OrderBy[i].Expr.(model.ColumnRef)
115+
if !ok {
116+
return nil, fmt.Errorf("for basicAndFast strategy, order by must be a column reference")
117+
}
118+
119+
field, resolved := indexSchema.ResolveField(column.ColumnName)
120+
if !resolved {
121+
return nil, fmt.Errorf("could not resolve field: %v", model.AsString(query.SelectCommand.OrderBy[i].Expr))
122+
}
123+
124+
if field.Type.Name == "date" || field.Type.Name == "timestamp" {
125+
if number, isNumber := util.ExtractNumeric64Maybe(searchAfterValue); isNumber {
126+
if number >= 0 && util.IsFloat64AnInt64(number) {
127+
// this param will always be timestamp in milliseconds, as we create it like this while rendering hits
128+
searchAfterParsed[i] = model.NewFunction("fromUnixTimestamp64Milli", model.NewLiteral(int64(number)))
129+
} else {
130+
return nil, fmt.Errorf("for basicAndFast strategy, search_after must be a unix timestamp in milliseconds")
131+
}
132+
} else {
133+
return nil, fmt.Errorf("for basicAndFast strategy, search_after must be a number")
134+
}
135+
} else {
136+
searchAfterParsed[i] = model.NewLiteral(util.SingleQuoteIfString(searchAfterValue))
137+
}
138+
}
139+
140+
return searchAfterParsed, nil
141+
}
142+
143+
func (s searchAfterStrategyBasicAndFast) transform(query *model.Query, searchAfterParsed []model.Expr) (*model.Query, error) {
144+
// If all order by's would be DESC, we would add to the where clause:
145+
// tuple(sortField1, sortField2, ...) > tuple(searchAfter1, searchAfter2, ...)
146+
// But because some fields might be ASC, we need to swap sortField_i with searchAfter_i
147+
sortFieldsNr := len(searchAfterParsed)
148+
lhs := model.NewTupleExpr(make([]model.Expr, sortFieldsNr)...)
149+
rhs := model.NewTupleExpr(make([]model.Expr, sortFieldsNr)...)
150+
for i, searchAfterValue := range searchAfterParsed {
151+
lhs.Exprs[i] = searchAfterValue
152+
rhs.Exprs[i] = query.SelectCommand.OrderBy[i].Expr
153+
if query.SelectCommand.OrderBy[i].Direction == model.AscOrder {
154+
lhs.Exprs[i], rhs.Exprs[i] = rhs.Exprs[i], lhs.Exprs[i] // swap
155+
}
156+
}
157+
158+
newWhereClause := model.NewInfixExpr(lhs, ">", rhs)
159+
query.SelectCommand.WhereClause = model.And([]model.Expr{query.SelectCommand.WhereClause, newWhereClause})
160+
return query, nil
161+
}
162+
163+
func (s *SchemaCheckPass) applySearchAfterParameter(indexSchema schema.Schema, query *model.Query) (*model.Query, error) {
164+
searchAfterParsed, err := s.searchAfterStrategy.validateAndParse(query, indexSchema)
165+
if err != nil {
166+
return nil, err
167+
}
168+
if searchAfterParsed == nil {
169+
return query, nil
170+
}
171+
172+
return s.searchAfterStrategy.transform(query, searchAfterParsed)
173+
}

0 commit comments

Comments
 (0)