From 2844fa7d6c3143aa7a969326cc8425477554d44c Mon Sep 17 00:00:00 2001 From: Krzysztof Kiewicz Date: Fri, 11 Jul 2025 16:22:26 +0200 Subject: [PATCH 1/4] works --- platform/database_common/log_manager.go | 29 ++++++++++---- .../database_common/quesma_communicator.go | 6 ++- platform/database_common/util.go | 11 ----- platform/frontend_connectors/count_test.go | 40 ++++++++++++++++--- platform/frontend_connectors/search_util.go | 14 ++++--- 5 files changed, 68 insertions(+), 32 deletions(-) diff --git a/platform/database_common/log_manager.go b/platform/database_common/log_manager.go index 3d1c545f6..76595392c 100644 --- a/platform/database_common/log_manager.go +++ b/platform/database_common/log_manager.go @@ -6,6 +6,7 @@ package database_common import ( "context" "fmt" + "github.com/QuesmaOrg/quesma/platform/common_table" "github.com/QuesmaOrg/quesma/platform/config" "github.com/QuesmaOrg/quesma/platform/end_user_errors" "github.com/QuesmaOrg/quesma/platform/logger" @@ -199,13 +200,25 @@ func (lm *LogManager) ResolveIndexPattern(ctx context.Context, schema schema.Reg return util.Distinct(results), nil } +// buildCountQueryString builds query string which will be sent to DB +// Takes care whether `table` is "normal" or virtual +func (lm *LogManager) buildCountQueryString(table *Table) string { + if table.VirtualTable { + // CAUTION: Using only table.Name (and discarding table.DatabaseName) on purpose + // Can be changed if needed, but that'd complicate the usual case + return fmt.Sprintf("SELECT count(*) FROM %s WHERE %s='%s'", common_table.TableName, common_table.IndexNameColumn, table.Name) + } else { + return fmt.Sprintf("SELECT count(*) FROM %s", table.FullTableName()) + } +} + func (lm *LogManager) CountMultiple(ctx context.Context, tables ...*Table) (count int64, err error) { if len(tables) == 0 { return } var subCountStatements []string for _, t := range tables { - subCountStatements = append(subCountStatements, fmt.Sprintf("(SELECT count(*) FROM %s)", t.FullTableName())) + subCountStatements = append(subCountStatements, fmt.Sprintf("(%s)", lm.buildCountQueryString(t))) } err = lm.chDb.QueryRow(ctx, fmt.Sprintf("SELECT sum(*) as count FROM (%s)", strings.Join(subCountStatements, " UNION ALL "))).Scan(&count) if err != nil { @@ -216,7 +229,9 @@ func (lm *LogManager) CountMultiple(ctx context.Context, tables ...*Table) (coun func (lm *LogManager) Count(ctx context.Context, table *Table) (int64, error) { var count int64 - err := lm.chDb.QueryRow(ctx, fmt.Sprintf("SELECT count(*) FROM %s", table.FullTableName())).Scan(&count) + fmt.Printf("HEHE ExecuteCount\ntable: %+v\nquery: %s", table, lm.buildCountQueryString(table)) + + err := lm.chDb.QueryRow(ctx, lm.buildCountQueryString(table)).Scan(&count) if err != nil { return 0, fmt.Errorf("clickhouse: query row failed: %v", err) } @@ -315,7 +330,7 @@ func (lm *LogManager) GetTableDefinitions() (TableMap, error) { return *lm.tableDiscovery.TableDefinitions(), nil } -// Returns if schema wasn't created (so it needs to be, and will be in a moment) +// AddTableIfDoesntExist returns if schema wasn't created (so it needs to be, and will be in a moment) func (lm *LogManager) AddTableIfDoesntExist(table *Table) bool { t := lm.FindTable(table.Name) if t == nil { @@ -331,6 +346,10 @@ func (lm *LogManager) Ping() error { return lm.chDb.Ping() } +func (lm *LogManager) IsInTransparentProxyMode() bool { + return lm.cfg.TransparentProxy +} + func NewEmptyLogManager(cfg *config.QuesmaConfiguration, chDb quesma_api.BackendConnector, phoneHomeAgent diag.PhoneHomeClient, loader TableDiscovery) *LogManager { ctx, cancel := context.WithCancel(context.Background()) return &LogManager{ctx: ctx, cancel: cancel, chDb: chDb, tableDiscovery: loader, cfg: cfg, phoneHomeAgent: phoneHomeAgent} @@ -423,7 +442,3 @@ func NewChTableConfigTimestampStringAttr() *ChTableConfig { func (c *ChTableConfig) GetAttributes() []Attribute { return c.Attributes } - -func (l *LogManager) IsInTransparentProxyMode() bool { - return l.cfg.TransparentProxy -} diff --git a/platform/database_common/quesma_communicator.go b/platform/database_common/quesma_communicator.go index 696aa7771..ec3bdf9c1 100644 --- a/platform/database_common/quesma_communicator.go +++ b/platform/database_common/quesma_communicator.go @@ -118,7 +118,7 @@ func (lm *LogManager) explainQuery(ctx context.Context, query string, elapsed ti defer rows.Close() if rows.Next() { - err := rows.Scan(&explain) + err = rows.Scan(&explain) if err != nil { logger.ErrorWithCtx(ctx).Msgf("failed to scan slow query explain: %v", err) return "" @@ -151,7 +151,9 @@ func getQueryId(ctx context.Context) string { return fmt.Sprintf("%s-%d", prefix, queryCounter.Add(1)) } -func executeQuery(ctx context.Context, lm *LogManager, query *model.Query, fields []string, rowToScan []interface{}) (res []model.QueryResultRow, performanceResult PerformanceResult, err error) { +func executeQuery(ctx context.Context, lm *LogManager, query *model.Query, fields []string, rowToScan []interface{}) ( + res []model.QueryResultRow, performanceResult PerformanceResult, err error) { + span := lm.phoneHomeAgent.ClickHouseQueryDuration().Begin() queryAsString := query.SelectCommand.String() diff --git a/platform/database_common/util.go b/platform/database_common/util.go index 68e19aebf..6ac78b144 100644 --- a/platform/database_common/util.go +++ b/platform/database_common/util.go @@ -3,11 +3,8 @@ package database_common import ( - "bytes" - "fmt" "github.com/QuesmaOrg/quesma/platform/logger" "github.com/QuesmaOrg/quesma/platform/model" - "github.com/goccy/go-json" "strings" "time" ) @@ -104,14 +101,6 @@ func parseTypeFromShowColumns(typ, name string) (Type, string) { return parseTypeRec(typ, name) } -func PrettyJson(jsonStr string) string { - var prettyJSON bytes.Buffer - if err := json.Indent(&prettyJSON, []byte(jsonStr), "", " "); err != nil { - return fmt.Sprintf("PrettyJson err: %v\n", err) - } - return prettyJSON.String() -} - // TimestampGroupBy returns string to be used in the select part of Clickhouse query, when grouping by timestamp interval. // e.g. // - timestampGroupBy("@timestamp", DateTime64, 30 seconds) --> toInt64(toUnixTimestamp64Milli(`@timestamp`)/30000) diff --git a/platform/frontend_connectors/count_test.go b/platform/frontend_connectors/count_test.go index cecde9bf1..605f25ef2 100644 --- a/platform/frontend_connectors/count_test.go +++ b/platform/frontend_connectors/count_test.go @@ -3,9 +3,11 @@ package frontend_connectors import ( + "fmt" "github.com/DATA-DOG/go-sqlmock" "github.com/QuesmaOrg/quesma/platform/ab_testing" "github.com/QuesmaOrg/quesma/platform/backend_connectors" + "github.com/QuesmaOrg/quesma/platform/common_table" "github.com/QuesmaOrg/quesma/platform/database_common" "github.com/QuesmaOrg/quesma/platform/logger" "github.com/QuesmaOrg/quesma/platform/schema" @@ -20,10 +22,12 @@ import ( func TestCountEndpoint(t *testing.T) { staticRegistry := &schema.StaticRegistry{ Tables: map[schema.IndexName]schema.Schema{ - "no_db_name": {Fields: map[schema.FieldName]schema.Field{}}, - "with_db_name": {Fields: map[schema.FieldName]schema.Field{}, DatabaseName: "db_name"}, - "common_prefix_1": {Fields: map[schema.FieldName]schema.Field{}, DatabaseName: "db_name"}, - "common_prefix_2": {Fields: map[schema.FieldName]schema.Field{}}, + "no_db_name": {Fields: map[schema.FieldName]schema.Field{}}, + "with_db_name": {Fields: map[schema.FieldName]schema.Field{}, DatabaseName: "db_name"}, + "common_prefix_1": {Fields: map[schema.FieldName]schema.Field{}, DatabaseName: "db_name"}, + "common_prefix_2": {Fields: map[schema.FieldName]schema.Field{}}, + "in_common_table_1": {Fields: map[schema.FieldName]schema.Field{}}, + "in_common_table_2": {Fields: map[schema.FieldName]schema.Field{}}, }, } @@ -40,6 +44,14 @@ func TestCountEndpoint(t *testing.T) { tables.Store("common_prefix_2", &database_common.Table{ Name: "common_prefix_2", Config: database_common.NewChTableConfigTimestampStringAttr(), Cols: map[string]*database_common.Column{}, }) + tables.Store("in_common_table_1", &database_common.Table{ + Name: "in_common_table_1", Config: database_common.NewChTableConfigTimestampStringAttr(), Cols: map[string]*database_common.Column{}, + VirtualTable: true, + }) + tables.Store("in_common_table_2", &database_common.Table{ + Name: "in_common_table_2", Config: database_common.NewChTableConfigTimestampStringAttr(), Cols: map[string]*database_common.Column{}, DatabaseName: "db_name", + VirtualTable: true, + }) conn, mock := util.InitSqlMockWithPrettySqlAndPrint(t, false) defer conn.Close() @@ -63,8 +75,24 @@ func TestCountEndpoint(t *testing.T) { }{ {"no_db_name", `SELECT count(*) FROM "no_db_name"`}, {"with_db_name", `SELECT count(*) FROM "db_name"."with_db_name"`}, - {"common_prefix*", `SELECT sum(*) as count FROM ((SELECT count(*) FROM "db_name"."common_prefix_1") UNION ALL (SELECT count(*) FROM "common_prefix_2"))`}, - {"common_prefix_1,common_prefix_2", `SELECT sum(*) as count FROM ((SELECT count(*) FROM "db_name"."common_prefix_1") UNION ALL (SELECT count(*) FROM "common_prefix_2"))`}, + {"common_prefix*", + `SELECT sum(*) as count FROM ((SELECT count(*) FROM "db_name"."common_prefix_1") UNION ALL (SELECT count(*) FROM "common_prefix_2"))`}, + {"common_prefix_1,common_prefix_2", + `SELECT sum(*) as count FROM ((SELECT count(*) FROM "db_name"."common_prefix_1") UNION ALL (SELECT count(*) FROM "common_prefix_2"))`}, + {"in_common_table_1", + fmt.Sprintf(`SELECT count(*) FROM %s WHERE %s='in_common_table_1'`, common_table.TableName, common_table.IndexNameColumn)}, + {"in_common_table_2", + fmt.Sprintf(`SELECT count(*) FROM %s WHERE %s='in_common_table_2'`, common_table.TableName, common_table.IndexNameColumn)}, + {"in_common_table*", + fmt.Sprintf(`SELECT sum(*) as count +FROM (( + SELECT count(*) + FROM %s + WHERE %s='in_common_table_1') UNION ALL ( + SELECT count(*) + FROM %s + WHERE %s='in_common_table_2')) +`, common_table.TableName, common_table.IndexNameColumn, common_table.TableName, common_table.IndexNameColumn)}, } for _, tc := range testcases { diff --git a/platform/frontend_connectors/search_util.go b/platform/frontend_connectors/search_util.go index ba4dd8a5b..cc6beea07 100644 --- a/platform/frontend_connectors/search_util.go +++ b/platform/frontend_connectors/search_util.go @@ -15,7 +15,8 @@ import ( quesma_api "github.com/QuesmaOrg/quesma/platform/v2/core" ) -func (q *QueryRunner) clickhouseConnectorFromDecision(ctx context.Context, decision *quesma_api.Decision) (clickhouseConnector *quesma_api.ConnectorDecisionClickhouse, err error) { +func (q *QueryRunner) clickhouseConnectorFromDecision(ctx context.Context, decision *quesma_api.Decision) ( + clickhouseConnector *quesma_api.ConnectorDecisionClickhouse, err error) { for _, connector := range decision.UseConnectors { switch c := connector.(type) { @@ -41,7 +42,8 @@ func (q *QueryRunner) clickhouseConnectorFromDecision(ctx context.Context, decis return clickhouseConnector, nil } -func (q *QueryRunner) checkDecision(ctx context.Context, decision *quesma_api.Decision, optAsync *AsyncQuery) (respIfWeEndSearch []byte, err error, weEndSearch bool) { +func (q *QueryRunner) checkDecision(ctx context.Context, decision *quesma_api.Decision, optAsync *AsyncQuery) ( + respIfWeEndSearch []byte, err error, weEndSearch bool) { if decision.Err != nil { if optAsync != nil { @@ -74,8 +76,8 @@ func (q *QueryRunner) checkDecision(ctx context.Context, decision *quesma_api.De return []byte{}, nil, false } -func (q *QueryRunner) resolveIndexes(ctx context.Context, clickhouseConnector *quesma_api.ConnectorDecisionClickhouse, - tables database_common.TableMap, optAsync *AsyncQuery) (resolvedIndexes []string, currentSchema schema.Schema, table *database_common.Table, respWhenError []byte, err error) { +func (q *QueryRunner) resolveIndexes(ctx context.Context, clickhouseConnector *quesma_api.ConnectorDecisionClickhouse, tables database_common.TableMap, + optAsync *AsyncQuery) (resolvedIndexes []string, currentSchema schema.Schema, table *database_common.Table, respWhenError []byte, err error) { if clickhouseConnector.IsCommonTable { return q.resolveIndexesCommonTable(ctx, clickhouseConnector, tables, optAsync) @@ -134,8 +136,8 @@ func (q *QueryRunner) resolveIndexesNonCommonTable(ctx context.Context, clickhou return } -func (q *QueryRunner) resolveIndexesCommonTable(ctx context.Context, clickhouseConnector *quesma_api.ConnectorDecisionClickhouse, - tables database_common.TableMap, optAsync *AsyncQuery) (resolvedIndexes []string, currentSchema schema.Schema, table *database_common.Table, respWhenError []byte, err error) { +func (q *QueryRunner) resolveIndexesCommonTable(ctx context.Context, clickhouseConnector *quesma_api.ConnectorDecisionClickhouse, tables database_common.TableMap, + optAsync *AsyncQuery) (resolvedIndexes []string, currentSchema schema.Schema, table *database_common.Table, respWhenError []byte, err error) { // here we filter out indexes that are not stored in the common table var virtualOnlyTables []string From 3716d36270d6e5bbd84ae552c5450d209293e283 Mon Sep 17 00:00:00 2001 From: Krzysztof Kiewicz Date: Fri, 11 Jul 2025 16:39:56 +0200 Subject: [PATCH 2/4] better name I think --- platform/database_common/log_manager.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/platform/database_common/log_manager.go b/platform/database_common/log_manager.go index 76595392c..389f52178 100644 --- a/platform/database_common/log_manager.go +++ b/platform/database_common/log_manager.go @@ -200,9 +200,9 @@ func (lm *LogManager) ResolveIndexPattern(ctx context.Context, schema schema.Reg return util.Distinct(results), nil } -// buildCountQueryString builds query string which will be sent to DB +// buildStringOfCountQuery builds query string which will be sent to DB // Takes care whether `table` is "normal" or virtual -func (lm *LogManager) buildCountQueryString(table *Table) string { +func (lm *LogManager) buildStringOfCountQuery(table *Table) string { if table.VirtualTable { // CAUTION: Using only table.Name (and discarding table.DatabaseName) on purpose // Can be changed if needed, but that'd complicate the usual case @@ -218,7 +218,7 @@ func (lm *LogManager) CountMultiple(ctx context.Context, tables ...*Table) (coun } var subCountStatements []string for _, t := range tables { - subCountStatements = append(subCountStatements, fmt.Sprintf("(%s)", lm.buildCountQueryString(t))) + subCountStatements = append(subCountStatements, fmt.Sprintf("(%s)", lm.buildStringOfCountQuery(t))) } err = lm.chDb.QueryRow(ctx, fmt.Sprintf("SELECT sum(*) as count FROM (%s)", strings.Join(subCountStatements, " UNION ALL "))).Scan(&count) if err != nil { @@ -229,9 +229,9 @@ func (lm *LogManager) CountMultiple(ctx context.Context, tables ...*Table) (coun func (lm *LogManager) Count(ctx context.Context, table *Table) (int64, error) { var count int64 - fmt.Printf("HEHE ExecuteCount\ntable: %+v\nquery: %s", table, lm.buildCountQueryString(table)) + fmt.Printf("HEHE ExecuteCount\ntable: %+v\nquery: %s", table, lm.buildStringOfCountQuery(table)) - err := lm.chDb.QueryRow(ctx, lm.buildCountQueryString(table)).Scan(&count) + err := lm.chDb.QueryRow(ctx, lm.buildStringOfCountQuery(table)).Scan(&count) if err != nil { return 0, fmt.Errorf("clickhouse: query row failed: %v", err) } From 62d87599901df655fdb96b54f8a97533744622a4 Mon Sep 17 00:00:00 2001 From: Krzysztof Kiewicz Date: Sat, 12 Jul 2025 10:59:26 +0200 Subject: [PATCH 3/4] remove debug --- platform/database_common/log_manager.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/platform/database_common/log_manager.go b/platform/database_common/log_manager.go index 389f52178..a736b7098 100644 --- a/platform/database_common/log_manager.go +++ b/platform/database_common/log_manager.go @@ -229,8 +229,6 @@ func (lm *LogManager) CountMultiple(ctx context.Context, tables ...*Table) (coun func (lm *LogManager) Count(ctx context.Context, table *Table) (int64, error) { var count int64 - fmt.Printf("HEHE ExecuteCount\ntable: %+v\nquery: %s", table, lm.buildStringOfCountQuery(table)) - err := lm.chDb.QueryRow(ctx, lm.buildStringOfCountQuery(table)).Scan(&count) if err != nil { return 0, fmt.Errorf("clickhouse: query row failed: %v", err) From bc3572aa60a17e16853017256299784c2cb10487 Mon Sep 17 00:00:00 2001 From: Krzysztof Kiewicz Date: Sat, 12 Jul 2025 11:20:13 +0200 Subject: [PATCH 4/4] better comment --- platform/database_common/log_manager.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/platform/database_common/log_manager.go b/platform/database_common/log_manager.go index a736b7098..570d48e45 100644 --- a/platform/database_common/log_manager.go +++ b/platform/database_common/log_manager.go @@ -200,8 +200,9 @@ func (lm *LogManager) ResolveIndexPattern(ctx context.Context, schema schema.Reg return util.Distinct(results), nil } -// buildStringOfCountQuery builds query string which will be sent to DB -// Takes care whether `table` is "normal" or virtual +// buildStringOfCountQuery builds string of count query which will be sent to DB (for 1 table). +// If we query multiple tables at once, it needs to be called multiple times. +// Takes care of whether `table` is "normal" or virtual (common_table) func (lm *LogManager) buildStringOfCountQuery(table *Table) string { if table.VirtualTable { // CAUTION: Using only table.Name (and discarding table.DatabaseName) on purpose