Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 9e47844

Browse files
nablaonetrzysiek
andauthored
Fix "Field statistics" panel (#1076)
This PR fixes the following cases: 1. simplify "exists" query parser 2. handle non-existing fields 3. handle falling back to the `attributes_values` column if available 2,3 may introduce a new kind of SQL error. Some functions do not handle `NULL` well. There are skipped tests. @trzysiek please take a look --------- Co-authored-by: Krzysztof Kiewicz <[email protected]>
1 parent 4351eef commit 9e47844

File tree

12 files changed

+379
-129
lines changed

12 files changed

+379
-129
lines changed

quesma/clickhouse/table_discovery.go

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,25 @@ type TableDiscoveryTableProviderAdapter struct {
8080
}
8181

8282
func (t TableDiscoveryTableProviderAdapter) TableDefinitions() map[string]schema.Table {
83+
84+
// here we filter out our internal columns
85+
86+
internalColumn := make(map[string]bool)
87+
internalColumn[AttributesValuesColumn] = true
88+
internalColumn[AttributesMetadataColumn] = true
89+
internalColumn[DeprecatedAttributesKeyColumn] = true
90+
internalColumn[DeprecatedAttributesValueColumn] = true
91+
internalColumn[DeprecatedAttributesValueType] = true
92+
8393
tableMap := t.TableDiscovery.TableDefinitions()
8494
tables := make(map[string]schema.Table)
8595
tableMap.Range(func(tableName string, value *Table) bool {
8696
table := schema.Table{Columns: make(map[string]schema.Column)}
8797
for _, column := range value.Cols {
98+
if internalColumn[column.Name] {
99+
continue
100+
}
101+
88102
table.Columns[column.Name] = schema.Column{
89103
Name: column.Name,
90104
Type: column.Type.String(),
@@ -294,13 +308,6 @@ func (td *tableDiscovery) autoConfigureTables(tables map[string]map[string]colum
294308

295309
func (td *tableDiscovery) populateTableDefinitions(configuredTables map[string]discoveredTable, databaseName string, cfg *config.QuesmaConfiguration) {
296310

297-
internalColumn := make(map[string]bool)
298-
internalColumn[AttributesValuesColumn] = true
299-
internalColumn[AttributesMetadataColumn] = true
300-
internalColumn[DeprecatedAttributesKeyColumn] = true
301-
internalColumn[DeprecatedAttributesValueColumn] = true
302-
internalColumn[DeprecatedAttributesValueType] = true
303-
304311
tableMap := NewTableMap()
305312
for tableName, resTable := range configuredTables {
306313
var columnsMap = make(map[string]*Column)
@@ -312,16 +319,16 @@ func (td *tableDiscovery) populateTableDefinitions(configuredTables map[string]d
312319
continue
313320
}
314321
}
315-
if !internalColumn[col] {
316-
column := resolveColumn(col, columnMeta.colType)
317-
if column != nil {
318-
column.Comment = columnMeta.comment
319-
columnsMap[col] = column
320-
} else {
321-
logger.Warn().Msgf("column '%s.%s' type: '%s' not resolved. table will be skipped", tableName, col, columnMeta.colType)
322-
partiallyResolved = true
323-
}
322+
323+
column := resolveColumn(col, columnMeta.colType)
324+
if column != nil {
325+
column.Comment = columnMeta.comment
326+
columnsMap[col] = column
327+
} else {
328+
logger.Warn().Msgf("column '%s.%s' type: '%s' not resolved. table will be skipped", tableName, col, columnMeta.colType)
329+
partiallyResolved = true
324330
}
331+
325332
}
326333

327334
var timestampFieldName *string

quesma/queryparser/query_parser.go

Lines changed: 2 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ import (
1616
"quesma/model/typical_queries"
1717
"quesma/queryparser/lucene"
1818
"quesma/quesma/types"
19-
"quesma/schema"
2019
"quesma/util"
2120
"strconv"
2221
"strings"
@@ -894,38 +893,10 @@ func (cw *ClickhouseQueryTranslator) parseExists(queryMap QueryMap) model.Simple
894893
logger.WarnWithCtx(cw.Ctx).Msgf("invalid exists type: %T, value: %v", v, v)
895894
return model.NewSimpleQuery(nil, false)
896895
}
897-
fieldName = cw.ResolveField(cw.Ctx, fieldName)
898-
fieldNameQuoted := strconv.Quote(fieldName)
899-
900-
switch cw.Table.GetFieldInfo(cw.Ctx, cw.ResolveField(cw.Ctx, fieldName)) {
901-
case clickhouse.ExistsAndIsBaseType:
902-
sql = model.NewInfixExpr(model.NewColumnRef(fieldName), "IS", model.NewLiteral("NOT NULL"))
903-
case clickhouse.ExistsAndIsArray:
904-
sql = model.NewInfixExpr(model.NewNestedProperty(
905-
model.NewColumnRef(fieldName),
906-
model.NewLiteral("size0"),
907-
), "=", model.NewLiteral("0"))
908-
case clickhouse.NotExists:
909-
// TODO this is a workaround for the case when the field is a point
910-
schemaInstance := cw.Schema
911-
if value, ok := schemaInstance.ResolveFieldByInternalName(fieldName); ok && value.Type.Equal(schema.QuesmaTypePoint) {
912-
return model.NewSimpleQuery(sql, true)
913-
}
914896

915-
attrs := cw.Table.GetAttributesList()
916-
stmts := make([]model.Expr, len(attrs))
917-
for i, a := range attrs {
918-
hasFunc := model.NewFunction("has", []model.Expr{model.NewColumnRef(a.KeysArrayName), model.NewColumnRef(fieldName)}...)
919-
arrayAccess := model.NewArrayAccess(model.NewColumnRef(a.ValuesArrayName), model.NewFunction("indexOf", []model.Expr{model.NewColumnRef(a.KeysArrayName), model.NewLiteral(fieldNameQuoted)}...))
920-
isNotNull := model.NewInfixExpr(arrayAccess, "IS", model.NewLiteral("NOT NULL"))
921-
compoundStatementNoFieldName := model.NewInfixExpr(hasFunc, "AND", isNotNull)
922-
stmts[i] = compoundStatementNoFieldName
923-
}
924-
sql = model.Or(stmts)
925-
default:
926-
logger.WarnWithCtx(cw.Ctx).Msgf("invalid field type: %T for exists: %s", cw.Table.GetFieldInfo(cw.Ctx, cw.ResolveField(cw.Ctx, fieldName)), fieldName)
927-
}
897+
sql = model.NewInfixExpr(model.NewColumnRef(fieldName), "IS", model.NewLiteral("NOT NULL"))
928898
}
899+
929900
return model.NewSimpleQuery(sql, true)
930901
}
931902

quesma/queryparser/query_parser_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import (
2929
func TestQueryParserStringAttrConfig(t *testing.T) {
3030
tableName := "logs-generic-default"
3131
table, err := clickhouse.NewTable(`CREATE TABLE `+tableName+`
32-
( "message" String, "@timestamp" DateTime64(3, 'UTC') )
32+
( "message" String, "@timestamp" DateTime64(3, 'UTC'), "attributes_values" Map(String,String))
3333
ENGINE = Memory`,
3434
clickhouse.NewNoTimestampOnlyStringAttrCHConfig(),
3535
)
@@ -48,6 +48,7 @@ func TestQueryParserStringAttrConfig(t *testing.T) {
4848
Fields: map[schema.FieldName]schema.Field{
4949
"host.name": {PropertyName: "host.name", InternalPropertyName: "host.name", Type: schema.QuesmaTypeObject},
5050
"type": {PropertyName: "type", InternalPropertyName: "type", Type: schema.QuesmaTypeText},
51+
"task.enabled": {PropertyName: "task.enabled", InternalPropertyName: "task_enabled", Type: schema.QuesmaTypeBoolean},
5152
"name": {PropertyName: "name", InternalPropertyName: "name", Type: schema.QuesmaTypeText},
5253
"content": {PropertyName: "content", InternalPropertyName: "content", Type: schema.QuesmaTypeText},
5354
"message": {PropertyName: "message", InternalPropertyName: "message", Type: schema.QuesmaTypeText},
@@ -156,7 +157,7 @@ func TestQueryParserNoFullTextFields(t *testing.T) {
156157
func TestQueryParserNoAttrsConfig(t *testing.T) {
157158
tableName := "logs-generic-default"
158159
table, err := clickhouse.NewTable(`CREATE TABLE `+tableName+`
159-
( "message" String, "@timestamp" DateTime64(3, 'UTC') )
160+
( "message" String, "@timestamp" DateTime64(3, 'UTC'), "attributes_values" Map(String,String)))
160161
ENGINE = Memory`,
161162
clickhouse.NewChTableConfigNoAttrs(),
162163
)

quesma/quesma/dual_write_proxy.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ func (q *dualWriteHttpProxy) Stop(ctx context.Context) {
8484
}
8585

8686
func newDualWriteProxy(schemaLoader clickhouse.TableDiscovery, logManager *clickhouse.LogManager, indexManager elasticsearch.IndexManagement, registry schema.Registry, config *config.QuesmaConfiguration, quesmaManagementConsole *ui.QuesmaManagementConsole, agent telemetry.PhoneHomeAgent, processor *ingest.IngestProcessor, resolver table_resolver.TableResolver, abResultsRepository ab_testing.Sender) *dualWriteHttpProxy {
87-
queryRunner := NewQueryRunner(logManager, config, indexManager, quesmaManagementConsole, registry, abResultsRepository, resolver)
87+
queryRunner := NewQueryRunner(logManager, config, indexManager, quesmaManagementConsole, registry, abResultsRepository, resolver, schemaLoader)
8888
// not sure how we should configure our query translator ???
8989
// is this a config option??
9090

quesma/quesma/dual_write_proxy_v2.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ func (q *dualWriteHttpProxyV2) Stop(ctx context.Context) {
7373
}
7474

7575
func newDualWriteProxyV2(schemaLoader clickhouse.TableDiscovery, logManager *clickhouse.LogManager, indexManager elasticsearch.IndexManagement, registry schema.Registry, config *config.QuesmaConfiguration, quesmaManagementConsole *ui.QuesmaManagementConsole, agent telemetry.PhoneHomeAgent, ingestProcessor *ingest.IngestProcessor, resolver table_resolver.TableResolver, abResultsRepository ab_testing.Sender) *dualWriteHttpProxyV2 {
76-
queryProcessor := NewQueryRunner(logManager, config, indexManager, quesmaManagementConsole, registry, abResultsRepository, resolver)
76+
queryProcessor := NewQueryRunner(logManager, config, indexManager, quesmaManagementConsole, registry, abResultsRepository, resolver, schemaLoader)
77+
7778
// not sure how we should configure our query translator ???
7879
// is this a config option??
7980

quesma/quesma/schema_transformer.go

Lines changed: 87 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ package quesma
44

55
import (
66
"fmt"
7+
"quesma/clickhouse"
78
"quesma/common_table"
89
"quesma/logger"
910
"quesma/model"
@@ -15,7 +16,8 @@ import (
1516
)
1617

1718
type SchemaCheckPass struct {
18-
cfg *config.QuesmaConfiguration
19+
cfg *config.QuesmaConfiguration
20+
tableDiscovery clickhouse.TableDiscovery
1921
}
2022

2123
func (s *SchemaCheckPass) applyBooleanLiteralLowering(index schema.Schema, query *model.Query) (*model.Query, error) {
@@ -466,7 +468,7 @@ func (s *SchemaCheckPass) applyWildcardExpansion(indexSchema schema.Schema, quer
466468
for _, col := range indexSchema.Fields {
467469
// Take only fields that are ingested
468470
if col.Origin == schema.FieldSourceIngest {
469-
cols = append(cols, col.InternalPropertyName.AsString())
471+
cols = append(cols, col.PropertyName.AsString())
470472
}
471473
}
472474

@@ -630,29 +632,96 @@ func (s *SchemaCheckPass) applyTimestampField(indexSchema schema.Schema, query *
630632
}
631633

632634
func (s *SchemaCheckPass) applyFieldEncoding(indexSchema schema.Schema, query *model.Query) (*model.Query, error) {
635+
table, ok := s.tableDiscovery.TableDefinitions().Load(query.TableName)
636+
if !ok {
637+
return nil, fmt.Errorf("table %s not found", query.TableName)
638+
}
639+
_, hasAttributesValuesColumn := table.Cols[clickhouse.AttributesValuesColumn]
633640

634641
visitor := model.NewBaseVisitor()
635642

636-
var err error
637-
638643
visitor.OverrideVisitColumnRef = func(b *model.BaseExprVisitor, e model.ColumnRef) interface{} {
639644

645+
// we don't want to resolve our well know technical fields
646+
if e.ColumnName == model.FullTextFieldNamePlaceHolder || e.ColumnName == common_table.IndexNameColumn {
647+
return e
648+
}
649+
650+
// This is workaround.
651+
// Our query parse resolves columns sometimes. Here we detect it and skip the resolution.
652+
if _, ok := indexSchema.ResolveFieldByInternalName(e.ColumnName); ok {
653+
logger.Warn().Msgf("Got field already resolved %s", e.ColumnName)
654+
return e
655+
}
656+
640657
if resolvedField, ok := indexSchema.ResolveField(e.ColumnName); ok {
641658
return model.NewColumnRef(resolvedField.InternalPropertyName.AsString())
642659
} else {
643-
return e
660+
if hasAttributesValuesColumn {
661+
return model.NewArrayAccess(model.NewColumnRef(clickhouse.AttributesValuesColumn), model.NewLiteral(fmt.Sprintf("'%s'", e.ColumnName)))
662+
} else {
663+
return model.NewLiteral("NULL")
664+
}
644665
}
645666
}
646667

647-
expr := query.SelectCommand.Accept(visitor)
668+
visitor.OverrideVisitSelectCommand = func(v *model.BaseExprVisitor, query model.SelectCommand) interface{} {
669+
var columns, groupBy []model.Expr
670+
var orderBy []model.OrderByExpr
671+
from := query.FromClause
672+
where := query.WhereClause
648673

649-
if err != nil {
650-
return nil, err
674+
for _, expr := range query.Columns {
675+
var alias string
676+
if e, ok := expr.(model.ColumnRef); ok {
677+
alias = e.ColumnName
678+
}
679+
680+
col := expr.Accept(v).(model.Expr)
681+
682+
if e, ok := col.(model.ArrayAccess); ok && alias != "" {
683+
col = model.NewAliasedExpr(e, alias)
684+
} else if e, ok := col.(model.LiteralExpr); ok && alias != "" && e.Value == "NULL" {
685+
col = model.NewAliasedExpr(e, alias)
686+
}
687+
688+
columns = append(columns, col)
689+
}
690+
for _, expr := range query.GroupBy {
691+
groupBy = append(groupBy, expr.Accept(v).(model.Expr))
692+
}
693+
for _, expr := range query.OrderBy {
694+
orderBy = append(orderBy, expr.Accept(v).(model.OrderByExpr))
695+
}
696+
if query.FromClause != nil {
697+
from = query.FromClause.Accept(v).(model.Expr)
698+
}
699+
if query.WhereClause != nil {
700+
where = query.WhereClause.Accept(v).(model.Expr)
701+
}
702+
703+
var namedCTEs []*model.CTE
704+
if query.NamedCTEs != nil {
705+
for _, cte := range query.NamedCTEs {
706+
namedCTEs = append(namedCTEs, cte.Accept(v).(*model.CTE))
707+
}
708+
}
709+
710+
var limitBy []model.Expr
711+
if query.LimitBy != nil {
712+
for _, expr := range query.LimitBy {
713+
limitBy = append(limitBy, expr.Accept(v).(model.Expr))
714+
}
715+
}
716+
return model.NewSelectCommand(columns, groupBy, orderBy, from, where, limitBy, query.Limit, query.SampleLimit, query.IsDistinct, namedCTEs)
651717
}
652718

719+
expr := query.SelectCommand.Accept(visitor)
720+
653721
if _, ok := expr.(*model.SelectCommand); ok {
654722
query.SelectCommand = *expr.(*model.SelectCommand)
655723
}
724+
656725
return query, nil
657726
}
658727

@@ -726,7 +795,7 @@ func (s *SchemaCheckPass) convertQueryDateTimeFunctionToClickhouse(indexSchema s
726795

727796
func (s *SchemaCheckPass) checkAggOverUnsupportedType(indexSchema schema.Schema, query *model.Query) (*model.Query, error) {
728797

729-
aggFunctionPrefixes := []string{"sum", "avg"}
798+
aggFunctionPrefixes := []string{"sum", "avg", "quantiles"}
730799

731800
dbTypePrefixes := []string{"DateTime", "String", "LowCardinality(String)"}
732801

@@ -751,6 +820,15 @@ func (s *SchemaCheckPass) checkAggOverUnsupportedType(indexSchema schema.Schema,
751820
}
752821
}
753822
}
823+
// attributes values are always string,
824+
if access, ok := e.Args[0].(model.ArrayAccess); ok {
825+
if access.ColumnRef.ColumnName == clickhouse.AttributesValuesColumn {
826+
logger.Warn().Msgf("Unsupported case. Aggregation '%s' over attribute named: '%s'", e.Name, access.Index)
827+
args := b.VisitChildren(e.Args)
828+
args[0] = model.NewLiteral("NULL")
829+
return model.NewFunction(e.Name, args...)
830+
}
831+
}
754832
}
755833
}
756834
}

quesma/quesma/schema_transformer_test.go

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,15 @@ func Test_ipRangeTransform(t *testing.T) {
6060
IndexConfig: indexConfig,
6161
}
6262

63-
tableDiscovery :=
63+
tableMap := clickhouse.NewTableMap()
64+
65+
tableDiscovery := clickhouse.NewEmptyTableDiscovery()
66+
tableDiscovery.TableMap = tableMap
67+
for indexName := range indexConfig {
68+
tableMap.Store(indexName, clickhouse.NewEmptyTable(indexName))
69+
}
70+
71+
tableProvider :=
6472
fixedTableProvider{tables: map[string]schema.Table{
6573
"kibana_sample_data_flights": {Columns: map[string]schema.Column{
6674
"destlocation": {Name: "destlocation", Type: "geo_point"},
@@ -77,8 +85,8 @@ func Test_ipRangeTransform(t *testing.T) {
7785
{
7886
TableName: "kibana_sample_data_logs_nested", FieldName: "nested.clientip"}: "nested_clientip",
7987
}
80-
s := schema.NewSchemaRegistry(tableDiscovery, &cfg, clickhouse.SchemaTypeAdapter{})
81-
transform := &SchemaCheckPass{cfg: &cfg}
88+
s := schema.NewSchemaRegistry(tableProvider, &cfg, clickhouse.SchemaTypeAdapter{})
89+
transform := &SchemaCheckPass{cfg: &cfg, tableDiscovery: tableDiscovery}
8290
s.UpdateFieldEncodings(fieldEncodings)
8391

8492
selectColumns := []model.Expr{model.NewColumnRef("message")}
@@ -425,7 +433,15 @@ func Test_arrayType(t *testing.T) {
425433
Fields: fields,
426434
}
427435

428-
transform := &SchemaCheckPass{cfg: &config.QuesmaConfiguration{IndexConfig: indexConfig}}
436+
tableMap := clickhouse.NewTableMap()
437+
438+
tableDiscovery := clickhouse.NewEmptyTableDiscovery()
439+
tableDiscovery.TableMap = tableMap
440+
for indexName := range indexConfig {
441+
tableMap.Store(indexName, clickhouse.NewEmptyTable(indexName))
442+
}
443+
444+
transform := &SchemaCheckPass{cfg: &config.QuesmaConfiguration{IndexConfig: indexConfig}, tableDiscovery: tableDiscovery}
429445

430446
tests := []struct {
431447
name string
@@ -459,7 +475,7 @@ func Test_arrayType(t *testing.T) {
459475
FromClause: model.NewTableRef("kibana_sample_data_ecommerce"),
460476
Columns: []model.Expr{
461477
model.NewColumnRef("order_date"),
462-
model.NewFunction("sumOrNull", model.NewColumnRef("products_quantity")),
478+
model.NewFunction("sumOrNull", model.NewColumnRef("products.quantity")),
463479
},
464480
GroupBy: []model.Expr{model.NewColumnRef("order_date")},
465481
},
@@ -492,7 +508,7 @@ func Test_arrayType(t *testing.T) {
492508
model.NewCountFunc(),
493509
},
494510
WhereClause: model.NewInfixExpr(
495-
model.NewColumnRef("products_name"),
511+
model.NewColumnRef("products.name"),
496512
"ILIKE",
497513
model.NewLiteral("%foo%"),
498514
),
@@ -530,7 +546,7 @@ func Test_arrayType(t *testing.T) {
530546
model.NewCountFunc(),
531547
},
532548
WhereClause: model.NewInfixExpr(
533-
model.NewColumnRef("products_sku"),
549+
model.NewColumnRef("products.sku"),
534550
"=",
535551
model.NewLiteral("'XYZ'"),
536552
),

0 commit comments

Comments
 (0)