Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 22 additions & 7 deletions platform/database_common/log_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package database_common
import (
"context"
"fmt"
"github.com/QuesmaOrg/quesma/platform/common_table"
"github.com/QuesmaOrg/quesma/platform/config"
"github.com/QuesmaOrg/quesma/platform/end_user_errors"
"github.com/QuesmaOrg/quesma/platform/logger"
Expand Down Expand Up @@ -199,13 +200,25 @@ func (lm *LogManager) ResolveIndexPattern(ctx context.Context, schema schema.Reg
return util.Distinct(results), nil
}

// buildStringOfCountQuery builds query string which will be sent to DB
// Takes care whether `table` is "normal" or virtual
func (lm *LogManager) buildStringOfCountQuery(table *Table) string {
if table.VirtualTable {
// CAUTION: Using only table.Name (and discarding table.DatabaseName) on purpose
// Can be changed if needed, but that'd complicate the usual case
return fmt.Sprintf("SELECT count(*) FROM %s WHERE %s='%s'", common_table.TableName, common_table.IndexNameColumn, table.Name)
} else {
return fmt.Sprintf("SELECT count(*) FROM %s", table.FullTableName())
}
}

func (lm *LogManager) CountMultiple(ctx context.Context, tables ...*Table) (count int64, err error) {
if len(tables) == 0 {
return
}
var subCountStatements []string
for _, t := range tables {
subCountStatements = append(subCountStatements, fmt.Sprintf("(SELECT count(*) FROM %s)", t.FullTableName()))
subCountStatements = append(subCountStatements, fmt.Sprintf("(%s)", lm.buildStringOfCountQuery(t)))
}
err = lm.chDb.QueryRow(ctx, fmt.Sprintf("SELECT sum(*) as count FROM (%s)", strings.Join(subCountStatements, " UNION ALL "))).Scan(&count)
if err != nil {
Expand All @@ -216,7 +229,9 @@ func (lm *LogManager) CountMultiple(ctx context.Context, tables ...*Table) (coun

func (lm *LogManager) Count(ctx context.Context, table *Table) (int64, error) {
var count int64
err := lm.chDb.QueryRow(ctx, fmt.Sprintf("SELECT count(*) FROM %s", table.FullTableName())).Scan(&count)
fmt.Printf("HEHE ExecuteCount\ntable: %+v\nquery: %s", table, lm.buildStringOfCountQuery(table))

err := lm.chDb.QueryRow(ctx, lm.buildStringOfCountQuery(table)).Scan(&count)
if err != nil {
return 0, fmt.Errorf("clickhouse: query row failed: %v", err)
}
Expand Down Expand Up @@ -315,7 +330,7 @@ func (lm *LogManager) GetTableDefinitions() (TableMap, error) {
return *lm.tableDiscovery.TableDefinitions(), nil
}

// Returns if schema wasn't created (so it needs to be, and will be in a moment)
// AddTableIfDoesntExist returns if schema wasn't created (so it needs to be, and will be in a moment)
func (lm *LogManager) AddTableIfDoesntExist(table *Table) bool {
t := lm.FindTable(table.Name)
if t == nil {
Expand All @@ -331,6 +346,10 @@ func (lm *LogManager) Ping() error {
return lm.chDb.Ping()
}

func (lm *LogManager) IsInTransparentProxyMode() bool {
return lm.cfg.TransparentProxy
}

Comment on lines +348 to +351
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

moved up so all methods are grouped together

func NewEmptyLogManager(cfg *config.QuesmaConfiguration, chDb quesma_api.BackendConnector, phoneHomeAgent diag.PhoneHomeClient, loader TableDiscovery) *LogManager {
ctx, cancel := context.WithCancel(context.Background())
return &LogManager{ctx: ctx, cancel: cancel, chDb: chDb, tableDiscovery: loader, cfg: cfg, phoneHomeAgent: phoneHomeAgent}
Expand Down Expand Up @@ -423,7 +442,3 @@ func NewChTableConfigTimestampStringAttr() *ChTableConfig {
func (c *ChTableConfig) GetAttributes() []Attribute {
return c.Attributes
}

func (l *LogManager) IsInTransparentProxyMode() bool {
return l.cfg.TransparentProxy
}
6 changes: 4 additions & 2 deletions platform/database_common/quesma_communicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ func (lm *LogManager) explainQuery(ctx context.Context, query string, elapsed ti

defer rows.Close()
if rows.Next() {
err := rows.Scan(&explain)
err = rows.Scan(&explain)
if err != nil {
logger.ErrorWithCtx(ctx).Msgf("failed to scan slow query explain: %v", err)
return ""
Expand Down Expand Up @@ -151,7 +151,9 @@ func getQueryId(ctx context.Context) string {
return fmt.Sprintf("%s-%d", prefix, queryCounter.Add(1))
}

func executeQuery(ctx context.Context, lm *LogManager, query *model.Query, fields []string, rowToScan []interface{}) (res []model.QueryResultRow, performanceResult PerformanceResult, err error) {
func executeQuery(ctx context.Context, lm *LogManager, query *model.Query, fields []string, rowToScan []interface{}) (
res []model.QueryResultRow, performanceResult PerformanceResult, err error) {

Comment on lines -154 to +156
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

broke down this and a few other lines to make them fit in 1 line on laptop

span := lm.phoneHomeAgent.ClickHouseQueryDuration().Begin()

queryAsString := query.SelectCommand.String()
Expand Down
11 changes: 0 additions & 11 deletions platform/database_common/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,8 @@
package database_common

import (
"bytes"
"fmt"
"github.com/QuesmaOrg/quesma/platform/logger"
"github.com/QuesmaOrg/quesma/platform/model"
"github.com/goccy/go-json"
"strings"
"time"
)
Expand Down Expand Up @@ -104,14 +101,6 @@ func parseTypeFromShowColumns(typ, name string) (Type, string) {
return parseTypeRec(typ, name)
}

func PrettyJson(jsonStr string) string {
var prettyJSON bytes.Buffer
if err := json.Indent(&prettyJSON, []byte(jsonStr), "", " "); err != nil {
return fmt.Sprintf("PrettyJson err: %v\n", err)
}
return prettyJSON.String()
}

Comment on lines -107 to -114
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unused, we have better one already

// TimestampGroupBy returns string to be used in the select part of Clickhouse query, when grouping by timestamp interval.
// e.g.
// - timestampGroupBy("@timestamp", DateTime64, 30 seconds) --> toInt64(toUnixTimestamp64Milli(`@timestamp`)/30000)
Expand Down
40 changes: 34 additions & 6 deletions platform/frontend_connectors/count_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@
package frontend_connectors

import (
"fmt"
"github.com/DATA-DOG/go-sqlmock"
"github.com/QuesmaOrg/quesma/platform/ab_testing"
"github.com/QuesmaOrg/quesma/platform/backend_connectors"
"github.com/QuesmaOrg/quesma/platform/common_table"
"github.com/QuesmaOrg/quesma/platform/database_common"
"github.com/QuesmaOrg/quesma/platform/logger"
"github.com/QuesmaOrg/quesma/platform/schema"
Expand All @@ -20,10 +22,12 @@ import (
func TestCountEndpoint(t *testing.T) {
staticRegistry := &schema.StaticRegistry{
Tables: map[schema.IndexName]schema.Schema{
"no_db_name": {Fields: map[schema.FieldName]schema.Field{}},
"with_db_name": {Fields: map[schema.FieldName]schema.Field{}, DatabaseName: "db_name"},
"common_prefix_1": {Fields: map[schema.FieldName]schema.Field{}, DatabaseName: "db_name"},
"common_prefix_2": {Fields: map[schema.FieldName]schema.Field{}},
"no_db_name": {Fields: map[schema.FieldName]schema.Field{}},
"with_db_name": {Fields: map[schema.FieldName]schema.Field{}, DatabaseName: "db_name"},
"common_prefix_1": {Fields: map[schema.FieldName]schema.Field{}, DatabaseName: "db_name"},
"common_prefix_2": {Fields: map[schema.FieldName]schema.Field{}},
"in_common_table_1": {Fields: map[schema.FieldName]schema.Field{}},
"in_common_table_2": {Fields: map[schema.FieldName]schema.Field{}},
},
}

Expand All @@ -40,6 +44,14 @@ func TestCountEndpoint(t *testing.T) {
tables.Store("common_prefix_2", &database_common.Table{
Name: "common_prefix_2", Config: database_common.NewChTableConfigTimestampStringAttr(), Cols: map[string]*database_common.Column{},
})
tables.Store("in_common_table_1", &database_common.Table{
Name: "in_common_table_1", Config: database_common.NewChTableConfigTimestampStringAttr(), Cols: map[string]*database_common.Column{},
VirtualTable: true,
})
tables.Store("in_common_table_2", &database_common.Table{
Name: "in_common_table_2", Config: database_common.NewChTableConfigTimestampStringAttr(), Cols: map[string]*database_common.Column{}, DatabaseName: "db_name",
VirtualTable: true,
})

conn, mock := util.InitSqlMockWithPrettySqlAndPrint(t, false)
defer conn.Close()
Expand All @@ -63,8 +75,24 @@ func TestCountEndpoint(t *testing.T) {
}{
{"no_db_name", `SELECT count(*) FROM "no_db_name"`},
{"with_db_name", `SELECT count(*) FROM "db_name"."with_db_name"`},
{"common_prefix*", `SELECT sum(*) as count FROM ((SELECT count(*) FROM "db_name"."common_prefix_1") UNION ALL (SELECT count(*) FROM "common_prefix_2"))`},
{"common_prefix_1,common_prefix_2", `SELECT sum(*) as count FROM ((SELECT count(*) FROM "db_name"."common_prefix_1") UNION ALL (SELECT count(*) FROM "common_prefix_2"))`},
{"common_prefix*",
`SELECT sum(*) as count FROM ((SELECT count(*) FROM "db_name"."common_prefix_1") UNION ALL (SELECT count(*) FROM "common_prefix_2"))`},
{"common_prefix_1,common_prefix_2",
`SELECT sum(*) as count FROM ((SELECT count(*) FROM "db_name"."common_prefix_1") UNION ALL (SELECT count(*) FROM "common_prefix_2"))`},
{"in_common_table_1",
fmt.Sprintf(`SELECT count(*) FROM %s WHERE %s='in_common_table_1'`, common_table.TableName, common_table.IndexNameColumn)},
{"in_common_table_2",
fmt.Sprintf(`SELECT count(*) FROM %s WHERE %s='in_common_table_2'`, common_table.TableName, common_table.IndexNameColumn)},
{"in_common_table*",
fmt.Sprintf(`SELECT sum(*) as count
FROM ((
SELECT count(*)
FROM %s
WHERE %s='in_common_table_1') UNION ALL (
SELECT count(*)
FROM %s
WHERE %s='in_common_table_2'))
`, common_table.TableName, common_table.IndexNameColumn, common_table.TableName, common_table.IndexNameColumn)},
}

for _, tc := range testcases {
Expand Down
14 changes: 8 additions & 6 deletions platform/frontend_connectors/search_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@ import (
quesma_api "github.com/QuesmaOrg/quesma/platform/v2/core"
)

func (q *QueryRunner) clickhouseConnectorFromDecision(ctx context.Context, decision *quesma_api.Decision) (clickhouseConnector *quesma_api.ConnectorDecisionClickhouse, err error) {
func (q *QueryRunner) clickhouseConnectorFromDecision(ctx context.Context, decision *quesma_api.Decision) (
clickhouseConnector *quesma_api.ConnectorDecisionClickhouse, err error) {

for _, connector := range decision.UseConnectors {
switch c := connector.(type) {
Expand All @@ -41,7 +42,8 @@ func (q *QueryRunner) clickhouseConnectorFromDecision(ctx context.Context, decis
return clickhouseConnector, nil
}

func (q *QueryRunner) checkDecision(ctx context.Context, decision *quesma_api.Decision, optAsync *AsyncQuery) (respIfWeEndSearch []byte, err error, weEndSearch bool) {
func (q *QueryRunner) checkDecision(ctx context.Context, decision *quesma_api.Decision, optAsync *AsyncQuery) (
respIfWeEndSearch []byte, err error, weEndSearch bool) {

if decision.Err != nil {
if optAsync != nil {
Expand Down Expand Up @@ -74,8 +76,8 @@ func (q *QueryRunner) checkDecision(ctx context.Context, decision *quesma_api.De
return []byte{}, nil, false
}

func (q *QueryRunner) resolveIndexes(ctx context.Context, clickhouseConnector *quesma_api.ConnectorDecisionClickhouse,
tables database_common.TableMap, optAsync *AsyncQuery) (resolvedIndexes []string, currentSchema schema.Schema, table *database_common.Table, respWhenError []byte, err error) {
func (q *QueryRunner) resolveIndexes(ctx context.Context, clickhouseConnector *quesma_api.ConnectorDecisionClickhouse, tables database_common.TableMap,
optAsync *AsyncQuery) (resolvedIndexes []string, currentSchema schema.Schema, table *database_common.Table, respWhenError []byte, err error) {

if clickhouseConnector.IsCommonTable {
return q.resolveIndexesCommonTable(ctx, clickhouseConnector, tables, optAsync)
Expand Down Expand Up @@ -134,8 +136,8 @@ func (q *QueryRunner) resolveIndexesNonCommonTable(ctx context.Context, clickhou
return
}

func (q *QueryRunner) resolveIndexesCommonTable(ctx context.Context, clickhouseConnector *quesma_api.ConnectorDecisionClickhouse,
tables database_common.TableMap, optAsync *AsyncQuery) (resolvedIndexes []string, currentSchema schema.Schema, table *database_common.Table, respWhenError []byte, err error) {
func (q *QueryRunner) resolveIndexesCommonTable(ctx context.Context, clickhouseConnector *quesma_api.ConnectorDecisionClickhouse, tables database_common.TableMap,
optAsync *AsyncQuery) (resolvedIndexes []string, currentSchema schema.Schema, table *database_common.Table, respWhenError []byte, err error) {

// here we filter out indexes that are not stored in the common table
var virtualOnlyTables []string
Expand Down
Loading