Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit c2db79a

Browse files
authored
Fix Kibana's surrounding documents view (related to search_after query) (#1432)
Fixes "surrounding documents" view. It's imperfect (since based on the "timestamp being id" and lack of PK being a unique constraint in the ClickHouse/Hydrolix realm) , yet works 😉 It actually took some non-trivial hacks to land for this to even be feasible, I tried to comment any non-obvious action taken. Most notable: * Append document id (`_id`) as a drop-in replacement for Lucene's `_doc` field * Align all parsing logic accordingly * In fact ignore that tiebreaker later on since it's not available and would need recomputing again on transformation (good thing in that even in Elasticsearch that's non deterministic (ref: [docs](https://www.elastic.co/docs/reference/elasticsearch/rest-apis/paginate-search-results)) * Fix the timestamp casting issues which caused errors for non-millisecond timestamp fields ### Screenshots showing that functionality <img width="300" alt="image" src="https://github.com/user-attachments/assets/90c632d3-8365-456c-a9de-8f3c76893a79" /> <img width="300" alt="image" src="https://github.com/user-attachments/assets/d4cb8633-f736-4ff3-9346-424c9cfeb2c6" />
1 parent 0f6942f commit c2db79a

File tree

8 files changed

+71
-30
lines changed

8 files changed

+71
-30
lines changed

platform/frontend_connectors/schema_search_after_transformer.go

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/QuesmaOrg/quesma/platform/model"
99
"github.com/QuesmaOrg/quesma/platform/schema"
1010
"github.com/QuesmaOrg/quesma/platform/util"
11+
"slices"
1112
)
1213

1314
type (
@@ -104,12 +105,21 @@ func (s searchAfterStrategyBasicAndFast) validateAndParse(query *model.Query, in
104105
if !ok {
105106
return nil, fmt.Errorf("search_after must be an array, got: %v", query.SearchAfter)
106107
}
107-
if len(asArray) != len(query.SelectCommand.OrderBy) {
108+
// Kibana uses search_after API for pagination in the following way:
109+
// "sort": [
110+
// { "@timestamp": { "order": "asc" } }, // this is the main sorting criteria -> timestamp field from the Data View setting
111+
// { "_doc": { "order": "asc" } } // this is the tiebreaker field, referencing the Lucene document ID
112+
//
113+
// Quesma ignores the _doc field during query parsing, therefore there's no related `query.SelectCommand.OrderBy` clause.
114+
// So the condition below is to handle this Kibana-specific case, in which we ignore the _doc and just sort by the first field only.
115+
// In any other case, we expect the search_after to have the same number of elements as the order by fields.
116+
if len(asArray) == 2 && slices.Contains(query.SearchAfterFieldNames, "_doc") {
117+
asArray = asArray[:len(asArray)-1]
118+
} else if len(asArray) != len(query.SelectCommand.OrderBy) {
108119
return nil, fmt.Errorf("len(search_after) != len(sortFields), search_after: %v, sortFields: %v", asArray, query.SelectCommand.OrderBy)
109120
}
110121

111-
sortFieldsNr := len(asArray)
112-
searchAfterParsed = make([]model.Expr, sortFieldsNr)
122+
searchAfterParsed = make([]model.Expr, len(asArray))
113123
for i, searchAfterValue := range asArray {
114124
column, ok := query.SelectCommand.OrderBy[i].Expr.(model.ColumnRef)
115125
if !ok {
@@ -124,8 +134,16 @@ func (s searchAfterStrategyBasicAndFast) validateAndParse(query *model.Query, in
124134
if field.Type.Name == "date" || field.Type.Name == "timestamp" {
125135
if number, isNumber := util.ExtractNumeric64Maybe(searchAfterValue); isNumber {
126136
if number >= 0 && util.IsFloat64AnInt64(number) {
127-
// this param will always be timestamp in milliseconds, as we create it like this while rendering hits
128-
searchAfterParsed[i] = model.NewFunction("fromUnixTimestamp64Milli", model.NewLiteral(int64(number)))
137+
milliTimestamp := model.NewFunction("fromUnixTimestamp64Milli", model.NewLiteral(int64(number)))
138+
if field.InternalPropertyType == "DateTime" {
139+
// Here we have to make sure that the statement landing in the where clause will match the field type
140+
searchAfterParsed[i] = model.NewFunction("toDateTime", milliTimestamp)
141+
} else {
142+
// Default case of (field.InternalPropertyType == "DateTime64")
143+
// which most often is DateTime64(3) allowing millisecond precision
144+
searchAfterParsed[i] = milliTimestamp
145+
}
146+
129147
} else {
130148
return nil, fmt.Errorf("for basicAndFast strategy, search_after must be a unix timestamp in milliseconds")
131149
}

platform/frontend_connectors/schema_transformer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1031,7 +1031,7 @@ func (s *SchemaCheckPass) Transform(queries []*model.Query) ([]*model.Query, err
10311031

10321032
query, err = transformation.Transformation(query.Schema, query)
10331033
if err != nil {
1034-
return nil, err
1034+
return nil, fmt.Errorf("error applying transformation %s: %w", transformation.TransformationName, err)
10351035
}
10361036

10371037
if s.cfg.Logging.EnableSQLTracing {

platform/frontend_connectors/search_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1003,12 +1003,14 @@ func TestSearchAfterParameter_sortByJustTimestamp(t *testing.T) {
10031003
expectedSQL string
10041004
resultRowsFromDB [][]any
10051005
basicAndFastSortFieldPerHit []int64
1006+
expectedSortValuesCount int
10061007
}{
10071008
{
10081009
request: `{"size": 3, "track_total_hits": false, "sort": [{"@timestamp": {"order": "desc"}}]}`,
10091010
expectedSQL: `SELECT "@timestamp", "message" FROM __quesma_table_name ORDER BY "@timestamp" DESC LIMIT 3`,
10101011
resultRowsFromDB: [][]any{{someTime, "m1"}, {someTime, "m2"}, {someTime, "m3"}},
10111012
basicAndFastSortFieldPerHit: []int64{someTime.UnixMilli(), someTime.UnixMilli(), someTime.UnixMilli()},
1013+
expectedSortValuesCount: 1,
10121014
},
10131015
{
10141016
request: `
@@ -1024,18 +1026,21 @@ func TestSearchAfterParameter_sortByJustTimestamp(t *testing.T) {
10241026
expectedSQL: `SELECT "@timestamp", "message" FROM __quesma_table_name WHERE fromUnixTimestamp64Milli(1706551896491)>"@timestamp" ORDER BY "@timestamp" DESC LIMIT 3`,
10251027
resultRowsFromDB: [][]any{{sub(1), "m8"}, {sub(2), "m9"}, {sub(3), "m10"}},
10261028
basicAndFastSortFieldPerHit: []int64{sub(1).UnixMilli(), sub(2).UnixMilli(), sub(3).UnixMilli()},
1029+
expectedSortValuesCount: 2,
10271030
},
10281031
{
10291032
request: `{"search_after": [1706551896488], "size": 3, "track_total_hits": false, "sort": [{"@timestamp": {"order": "desc"}}]}`,
10301033
expectedSQL: `SELECT "@timestamp", "message" FROM __quesma_table_name WHERE fromUnixTimestamp64Milli(1706551896488)>"@timestamp" ORDER BY "@timestamp" DESC LIMIT 3`,
10311034
resultRowsFromDB: [][]any{{sub(4), "m11"}, {sub(5), "m12"}, {sub(6), "m13"}},
10321035
basicAndFastSortFieldPerHit: []int64{sub(4).UnixMilli(), sub(5).UnixMilli(), sub(6).UnixMilli()},
1036+
expectedSortValuesCount: 1,
10331037
},
10341038
{
10351039
request: `{"search_after": [1706551896485], "size": 3, "track_total_hits": false, "sort": [{"@timestamp": {"order": "desc"}}]}`,
10361040
expectedSQL: `SELECT "@timestamp", "message" FROM __quesma_table_name WHERE fromUnixTimestamp64Milli(1706551896485)>"@timestamp" ORDER BY "@timestamp" DESC LIMIT 3`,
10371041
resultRowsFromDB: [][]any{{sub(7), "m14"}, {sub(8), "m15"}, {sub(9), "m16"}},
10381042
basicAndFastSortFieldPerHit: []int64{sub(7).UnixMilli(), sub(8).UnixMilli(), sub(9).UnixMilli()},
1043+
expectedSortValuesCount: 1,
10391044
},
10401045
}
10411046

@@ -1078,7 +1083,7 @@ func TestSearchAfterParameter_sortByJustTimestamp(t *testing.T) {
10781083
assert.Len(t, hits, len(iteration.resultRowsFromDB))
10791084
for i, hit := range hits {
10801085
sortField := hit.(model.JsonMap)["sort"].([]any)
1081-
assert.Len(t, sortField, 1)
1086+
assert.Len(t, sortField, iteration.expectedSortValuesCount)
10821087
assert.Equal(t, float64(iteration.basicAndFastSortFieldPerHit[i]), sortField[0].(float64))
10831088
}
10841089
}

platform/model/query.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,9 @@ type (
6363
// this is schema for current query, this schema should be used in pipeline processing
6464
Schema schema.Schema
6565

66-
Highlighter Highlighter
67-
SearchAfter any // Value of query's "search_after" param. Used for pagination of hits. SearchAfterEmpty means no pagination
66+
Highlighter Highlighter
67+
SearchAfter any // Value of query's "search_after" param. Used for pagination of hits. SearchAfterEmpty means no pagination
68+
SearchAfterFieldNames []string // Names of fields used in search_after. These can be different from the order by fields,
6869

6970
RuntimeMappings map[string]RuntimeMapping
7071

platform/model/simple_query.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,12 @@
33
package model
44

55
type SimpleQuery struct {
6-
WhereClause Expr
7-
OrderBy []OrderByExpr
8-
CanParse bool
6+
WhereClause Expr
7+
OrderBy []OrderByExpr
8+
SortFieldNames []string // SortFieldNames is used to preserve fields listed in the `sort` part of the query.
9+
// This can be different from the OrderBy clause, as it may contain Elasticsearch-internal fields like `_doc`.
10+
// In that case, it is not reflected in the OrderBy clause, but is still used for assembling the response.
11+
CanParse bool
912
// NeedCountWithLimit > 0 means we need count(*) LIMIT NeedCountWithLimit
1013
// NeedCountWithLimit 0 (WeNeedUnlimitedCount) means we need count(*) (unlimited)
1114
// NeedCountWithLimit -1 (WeDontNeedCount) means we don't need a count(*) query

platform/model/typical_queries/hits.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ func (query Hits) TranslateSqlResponseToJson(rows []model.QueryResultRow) model.
9494
for _, fieldName := range query.sortFieldNames {
9595
if val, ok := hit.Fields[fieldName]; ok {
9696
hit.Sort = append(hit.Sort, elasticsearch.FormatSortValue(val[0]))
97+
} else if fieldName == "_doc" { // Kibana adds _doc as a tiebreaker field for sorting
98+
hit.Sort = append(hit.Sort, hit.ID)
9799
} else {
98100
logger.WarnWithCtx(query.ctx).Msgf("field %s not found in fields", fieldName)
99101
}

platform/parsers/elastic_query_dsl/query_parser.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -119,10 +119,10 @@ func (cw *ClickhouseQueryTranslator) buildListQueryIfNeeded(
119119
}
120120
if fullQuery != nil {
121121
highlighter.SetTokensToHighlight(fullQuery.SelectCommand)
122-
// TODO: pass right arguments
123-
queryType := typical_queries.NewHits(cw.Ctx, cw.Table, &highlighter, fullQuery.SelectCommand.OrderByFieldNames(), true, false, false, cw.Indexes)
122+
queryType := typical_queries.NewHits(cw.Ctx, cw.Table, &highlighter, simpleQuery.SortFieldNames, true, false, false, cw.Indexes)
124123
fullQuery.Type = &queryType
125124
fullQuery.Highlighter = highlighter
125+
fullQuery.SearchAfterFieldNames = simpleQuery.SortFieldNames
126126
}
127127

128128
return fullQuery
@@ -155,7 +155,7 @@ func (cw *ClickhouseQueryTranslator) parseQueryInternal(body types.JSON) (*model
155155
}
156156

157157
if sortPart, ok := queryAsMap["sort"]; ok {
158-
parsedQuery.OrderBy = cw.parseSortFields(sortPart)
158+
parsedQuery.OrderBy, parsedQuery.SortFieldNames = cw.parseSortFields(sortPart)
159159
}
160160
size := cw.parseSize(queryAsMap, defaultQueryResultSize)
161161

@@ -1080,8 +1080,9 @@ func (cw *ClickhouseQueryTranslator) extractInterval(queryMap QueryMap) (interva
10801080

10811081
// parseSortFields parses sort fields from the query
10821082
// We're skipping ELK internal fields, like "_doc", "_id", etc. (we only accept field starting with "_" if it exists in our table)
1083-
func (cw *ClickhouseQueryTranslator) parseSortFields(sortMaps any) (sortColumns []model.OrderByExpr) {
1083+
func (cw *ClickhouseQueryTranslator) parseSortFields(sortMaps any) (sortColumns []model.OrderByExpr, sortFieldNames []string) {
10841084
sortColumns = make([]model.OrderByExpr, 0)
1085+
sortFieldNames = make([]string, 0)
10851086
switch sortMaps := sortMaps.(type) {
10861087
case []any:
10871088
for _, sortMapAsAny := range sortMaps {
@@ -1093,6 +1094,7 @@ func (cw *ClickhouseQueryTranslator) parseSortFields(sortMaps any) (sortColumns
10931094

10941095
// sortMap has only 1 key, so we can just iterate over it
10951096
for k, v := range sortMap {
1097+
sortFieldNames = append(sortFieldNames, k)
10961098
// TODO replace cw.Table.GetFieldInfo with schema.Field[]
10971099
if strings.HasPrefix(k, "_") && cw.Table.GetFieldInfo(cw.Ctx, ResolveField(cw.Ctx, k, cw.Schema)) == clickhouse.NotExists {
10981100
// we're skipping ELK internal fields, like "_doc", "_id", etc.
@@ -1125,9 +1127,10 @@ func (cw *ClickhouseQueryTranslator) parseSortFields(sortMaps any) (sortColumns
11251127
}
11261128
}
11271129
}
1128-
return sortColumns
1130+
return sortColumns, sortFieldNames
11291131
case map[string]interface{}:
11301132
for fieldName, fieldValue := range sortMaps {
1133+
sortFieldNames = append(sortFieldNames, fieldName)
11311134
if strings.HasPrefix(fieldName, "_") && cw.Table.GetFieldInfo(cw.Ctx, ResolveField(cw.Ctx, fieldName, cw.Schema)) == clickhouse.NotExists {
11321135
// TODO Elastic internal fields will need to be supported in the future
11331136
continue
@@ -1141,10 +1144,11 @@ func (cw *ClickhouseQueryTranslator) parseSortFields(sortMaps any) (sortColumns
11411144
}
11421145
}
11431146

1144-
return sortColumns
1147+
return sortColumns, sortFieldNames
11451148

11461149
case map[string]string:
11471150
for fieldName, fieldValue := range sortMaps {
1151+
sortFieldNames = append(sortFieldNames, fieldName)
11481152
if strings.HasPrefix(fieldName, "_") && cw.Table.GetFieldInfo(cw.Ctx, ResolveField(cw.Ctx, fieldName, cw.Schema)) == clickhouse.NotExists {
11491153
// TODO Elastic internal fields will need to be supported in the future
11501154
continue
@@ -1156,10 +1160,10 @@ func (cw *ClickhouseQueryTranslator) parseSortFields(sortMaps any) (sortColumns
11561160
}
11571161
}
11581162

1159-
return sortColumns
1163+
return sortColumns, sortFieldNames
11601164
default:
11611165
logger.ErrorWithCtx(cw.Ctx).Msgf("unexpected type of sortMaps: %T, value: %v", sortMaps, sortMaps)
1162-
return []model.OrderByExpr{}
1166+
return []model.OrderByExpr{}, sortFieldNames
11631167
}
11641168
}
11651169

platform/parsers/elastic_query_dsl/query_parser_test.go

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -215,9 +215,10 @@ func TestQueryParserNoAttrsConfig(t *testing.T) {
215215

216216
func Test_parseSortFields(t *testing.T) {
217217
tests := []struct {
218-
name string
219-
sortMap any
220-
sortColumns []model.OrderByExpr
218+
name string
219+
sortMap any
220+
sortColumns []model.OrderByExpr
221+
sortFieldNames []string
221222
}{
222223
{
223224
name: "compound",
@@ -234,34 +235,39 @@ func Test_parseSortFields(t *testing.T) {
234235
model.NewSortColumn("no_order_field", model.AscOrder),
235236
model.NewSortColumn("_table_field_with_underscore", model.AscOrder),
236237
},
238+
sortFieldNames: []string{"@timestamp", "service.name", "no_order_field", "_table_field_with_underscore", "_doc"},
237239
},
238240
{
239-
name: "empty",
240-
sortMap: []any{},
241-
sortColumns: []model.OrderByExpr{},
241+
name: "empty",
242+
sortMap: []any{},
243+
sortColumns: []model.OrderByExpr{},
244+
sortFieldNames: []string{},
242245
},
243246
{
244247
name: "map[string]string",
245248
sortMap: map[string]string{
246249
"timestamp": "desc",
247250
"_doc": "desc",
248251
},
249-
sortColumns: []model.OrderByExpr{model.NewSortColumn("timestamp", model.DescOrder)},
252+
sortColumns: []model.OrderByExpr{model.NewSortColumn("timestamp", model.DescOrder)},
253+
sortFieldNames: []string{"timestamp", "_doc"},
250254
},
251255
{
252256
name: "map[string]interface{}",
253257
sortMap: map[string]interface{}{
254258
"timestamp": "desc",
255259
"_doc": "desc",
256260
},
257-
sortColumns: []model.OrderByExpr{model.NewSortColumn("timestamp", model.DescOrder)},
261+
sortColumns: []model.OrderByExpr{model.NewSortColumn("timestamp", model.DescOrder)},
262+
sortFieldNames: []string{"timestamp", "_doc"},
258263
}, {
259264
name: "[]map[string]string",
260265
sortMap: []any{
261266
QueryMap{"@timestamp": "asc"},
262267
QueryMap{"_doc": "asc"},
263268
},
264-
sortColumns: []model.OrderByExpr{model.NewSortColumn("@timestamp", model.AscOrder)},
269+
sortColumns: []model.OrderByExpr{model.NewSortColumn("@timestamp", model.AscOrder)},
270+
sortFieldNames: []string{"@timestamp", "_doc"},
265271
},
266272
}
267273
table, _ := clickhouse.NewTable(`CREATE TABLE `+tableName+`
@@ -272,7 +278,9 @@ func Test_parseSortFields(t *testing.T) {
272278
cw := ClickhouseQueryTranslator{Table: table, Ctx: context.Background()}
273279
for i, tt := range tests {
274280
t.Run(util.PrettyTestName(tt.name, i), func(t *testing.T) {
275-
assert.Equal(t, tt.sortColumns, cw.parseSortFields(tt.sortMap))
281+
orderBy, sortFieldNames := cw.parseSortFields(tt.sortMap)
282+
assert.Equal(t, tt.sortColumns, orderBy)
283+
assert.ElementsMatch(t, tt.sortFieldNames, sortFieldNames)
276284
})
277285
}
278286
}

0 commit comments

Comments
 (0)