Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 2844fa7

Browse files
committed
works
1 parent b97b980 commit 2844fa7

File tree

5 files changed

+68
-32
lines changed

5 files changed

+68
-32
lines changed

platform/database_common/log_manager.go

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package database_common
66
import (
77
"context"
88
"fmt"
9+
"github.com/QuesmaOrg/quesma/platform/common_table"
910
"github.com/QuesmaOrg/quesma/platform/config"
1011
"github.com/QuesmaOrg/quesma/platform/end_user_errors"
1112
"github.com/QuesmaOrg/quesma/platform/logger"
@@ -199,13 +200,25 @@ func (lm *LogManager) ResolveIndexPattern(ctx context.Context, schema schema.Reg
199200
return util.Distinct(results), nil
200201
}
201202

203+
// buildCountQueryString builds query string which will be sent to DB
204+
// Takes care whether `table` is "normal" or virtual
205+
func (lm *LogManager) buildCountQueryString(table *Table) string {
206+
if table.VirtualTable {
207+
// CAUTION: Using only table.Name (and discarding table.DatabaseName) on purpose
208+
// Can be changed if needed, but that'd complicate the usual case
209+
return fmt.Sprintf("SELECT count(*) FROM %s WHERE %s='%s'", common_table.TableName, common_table.IndexNameColumn, table.Name)
210+
} else {
211+
return fmt.Sprintf("SELECT count(*) FROM %s", table.FullTableName())
212+
}
213+
}
214+
202215
func (lm *LogManager) CountMultiple(ctx context.Context, tables ...*Table) (count int64, err error) {
203216
if len(tables) == 0 {
204217
return
205218
}
206219
var subCountStatements []string
207220
for _, t := range tables {
208-
subCountStatements = append(subCountStatements, fmt.Sprintf("(SELECT count(*) FROM %s)", t.FullTableName()))
221+
subCountStatements = append(subCountStatements, fmt.Sprintf("(%s)", lm.buildCountQueryString(t)))
209222
}
210223
err = lm.chDb.QueryRow(ctx, fmt.Sprintf("SELECT sum(*) as count FROM (%s)", strings.Join(subCountStatements, " UNION ALL "))).Scan(&count)
211224
if err != nil {
@@ -216,7 +229,9 @@ func (lm *LogManager) CountMultiple(ctx context.Context, tables ...*Table) (coun
216229

217230
func (lm *LogManager) Count(ctx context.Context, table *Table) (int64, error) {
218231
var count int64
219-
err := lm.chDb.QueryRow(ctx, fmt.Sprintf("SELECT count(*) FROM %s", table.FullTableName())).Scan(&count)
232+
fmt.Printf("HEHE ExecuteCount\ntable: %+v\nquery: %s", table, lm.buildCountQueryString(table))
233+
234+
err := lm.chDb.QueryRow(ctx, lm.buildCountQueryString(table)).Scan(&count)
220235
if err != nil {
221236
return 0, fmt.Errorf("clickhouse: query row failed: %v", err)
222237
}
@@ -315,7 +330,7 @@ func (lm *LogManager) GetTableDefinitions() (TableMap, error) {
315330
return *lm.tableDiscovery.TableDefinitions(), nil
316331
}
317332

318-
// Returns if schema wasn't created (so it needs to be, and will be in a moment)
333+
// AddTableIfDoesntExist returns if schema wasn't created (so it needs to be, and will be in a moment)
319334
func (lm *LogManager) AddTableIfDoesntExist(table *Table) bool {
320335
t := lm.FindTable(table.Name)
321336
if t == nil {
@@ -331,6 +346,10 @@ func (lm *LogManager) Ping() error {
331346
return lm.chDb.Ping()
332347
}
333348

349+
func (lm *LogManager) IsInTransparentProxyMode() bool {
350+
return lm.cfg.TransparentProxy
351+
}
352+
334353
func NewEmptyLogManager(cfg *config.QuesmaConfiguration, chDb quesma_api.BackendConnector, phoneHomeAgent diag.PhoneHomeClient, loader TableDiscovery) *LogManager {
335354
ctx, cancel := context.WithCancel(context.Background())
336355
return &LogManager{ctx: ctx, cancel: cancel, chDb: chDb, tableDiscovery: loader, cfg: cfg, phoneHomeAgent: phoneHomeAgent}
@@ -423,7 +442,3 @@ func NewChTableConfigTimestampStringAttr() *ChTableConfig {
423442
func (c *ChTableConfig) GetAttributes() []Attribute {
424443
return c.Attributes
425444
}
426-
427-
func (l *LogManager) IsInTransparentProxyMode() bool {
428-
return l.cfg.TransparentProxy
429-
}

platform/database_common/quesma_communicator.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ func (lm *LogManager) explainQuery(ctx context.Context, query string, elapsed ti
118118

119119
defer rows.Close()
120120
if rows.Next() {
121-
err := rows.Scan(&explain)
121+
err = rows.Scan(&explain)
122122
if err != nil {
123123
logger.ErrorWithCtx(ctx).Msgf("failed to scan slow query explain: %v", err)
124124
return ""
@@ -151,7 +151,9 @@ func getQueryId(ctx context.Context) string {
151151
return fmt.Sprintf("%s-%d", prefix, queryCounter.Add(1))
152152
}
153153

154-
func executeQuery(ctx context.Context, lm *LogManager, query *model.Query, fields []string, rowToScan []interface{}) (res []model.QueryResultRow, performanceResult PerformanceResult, err error) {
154+
func executeQuery(ctx context.Context, lm *LogManager, query *model.Query, fields []string, rowToScan []interface{}) (
155+
res []model.QueryResultRow, performanceResult PerformanceResult, err error) {
156+
155157
span := lm.phoneHomeAgent.ClickHouseQueryDuration().Begin()
156158

157159
queryAsString := query.SelectCommand.String()

platform/database_common/util.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,8 @@
33
package database_common
44

55
import (
6-
"bytes"
7-
"fmt"
86
"github.com/QuesmaOrg/quesma/platform/logger"
97
"github.com/QuesmaOrg/quesma/platform/model"
10-
"github.com/goccy/go-json"
118
"strings"
129
"time"
1310
)
@@ -104,14 +101,6 @@ func parseTypeFromShowColumns(typ, name string) (Type, string) {
104101
return parseTypeRec(typ, name)
105102
}
106103

107-
func PrettyJson(jsonStr string) string {
108-
var prettyJSON bytes.Buffer
109-
if err := json.Indent(&prettyJSON, []byte(jsonStr), "", " "); err != nil {
110-
return fmt.Sprintf("PrettyJson err: %v\n", err)
111-
}
112-
return prettyJSON.String()
113-
}
114-
115104
// TimestampGroupBy returns string to be used in the select part of Clickhouse query, when grouping by timestamp interval.
116105
// e.g.
117106
// - timestampGroupBy("@timestamp", DateTime64, 30 seconds) --> toInt64(toUnixTimestamp64Milli(`@timestamp`)/30000)

platform/frontend_connectors/count_test.go

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@
33
package frontend_connectors
44

55
import (
6+
"fmt"
67
"github.com/DATA-DOG/go-sqlmock"
78
"github.com/QuesmaOrg/quesma/platform/ab_testing"
89
"github.com/QuesmaOrg/quesma/platform/backend_connectors"
10+
"github.com/QuesmaOrg/quesma/platform/common_table"
911
"github.com/QuesmaOrg/quesma/platform/database_common"
1012
"github.com/QuesmaOrg/quesma/platform/logger"
1113
"github.com/QuesmaOrg/quesma/platform/schema"
@@ -20,10 +22,12 @@ import (
2022
func TestCountEndpoint(t *testing.T) {
2123
staticRegistry := &schema.StaticRegistry{
2224
Tables: map[schema.IndexName]schema.Schema{
23-
"no_db_name": {Fields: map[schema.FieldName]schema.Field{}},
24-
"with_db_name": {Fields: map[schema.FieldName]schema.Field{}, DatabaseName: "db_name"},
25-
"common_prefix_1": {Fields: map[schema.FieldName]schema.Field{}, DatabaseName: "db_name"},
26-
"common_prefix_2": {Fields: map[schema.FieldName]schema.Field{}},
25+
"no_db_name": {Fields: map[schema.FieldName]schema.Field{}},
26+
"with_db_name": {Fields: map[schema.FieldName]schema.Field{}, DatabaseName: "db_name"},
27+
"common_prefix_1": {Fields: map[schema.FieldName]schema.Field{}, DatabaseName: "db_name"},
28+
"common_prefix_2": {Fields: map[schema.FieldName]schema.Field{}},
29+
"in_common_table_1": {Fields: map[schema.FieldName]schema.Field{}},
30+
"in_common_table_2": {Fields: map[schema.FieldName]schema.Field{}},
2731
},
2832
}
2933

@@ -40,6 +44,14 @@ func TestCountEndpoint(t *testing.T) {
4044
tables.Store("common_prefix_2", &database_common.Table{
4145
Name: "common_prefix_2", Config: database_common.NewChTableConfigTimestampStringAttr(), Cols: map[string]*database_common.Column{},
4246
})
47+
tables.Store("in_common_table_1", &database_common.Table{
48+
Name: "in_common_table_1", Config: database_common.NewChTableConfigTimestampStringAttr(), Cols: map[string]*database_common.Column{},
49+
VirtualTable: true,
50+
})
51+
tables.Store("in_common_table_2", &database_common.Table{
52+
Name: "in_common_table_2", Config: database_common.NewChTableConfigTimestampStringAttr(), Cols: map[string]*database_common.Column{}, DatabaseName: "db_name",
53+
VirtualTable: true,
54+
})
4355

4456
conn, mock := util.InitSqlMockWithPrettySqlAndPrint(t, false)
4557
defer conn.Close()
@@ -63,8 +75,24 @@ func TestCountEndpoint(t *testing.T) {
6375
}{
6476
{"no_db_name", `SELECT count(*) FROM "no_db_name"`},
6577
{"with_db_name", `SELECT count(*) FROM "db_name"."with_db_name"`},
66-
{"common_prefix*", `SELECT sum(*) as count FROM ((SELECT count(*) FROM "db_name"."common_prefix_1") UNION ALL (SELECT count(*) FROM "common_prefix_2"))`},
67-
{"common_prefix_1,common_prefix_2", `SELECT sum(*) as count FROM ((SELECT count(*) FROM "db_name"."common_prefix_1") UNION ALL (SELECT count(*) FROM "common_prefix_2"))`},
78+
{"common_prefix*",
79+
`SELECT sum(*) as count FROM ((SELECT count(*) FROM "db_name"."common_prefix_1") UNION ALL (SELECT count(*) FROM "common_prefix_2"))`},
80+
{"common_prefix_1,common_prefix_2",
81+
`SELECT sum(*) as count FROM ((SELECT count(*) FROM "db_name"."common_prefix_1") UNION ALL (SELECT count(*) FROM "common_prefix_2"))`},
82+
{"in_common_table_1",
83+
fmt.Sprintf(`SELECT count(*) FROM %s WHERE %s='in_common_table_1'`, common_table.TableName, common_table.IndexNameColumn)},
84+
{"in_common_table_2",
85+
fmt.Sprintf(`SELECT count(*) FROM %s WHERE %s='in_common_table_2'`, common_table.TableName, common_table.IndexNameColumn)},
86+
{"in_common_table*",
87+
fmt.Sprintf(`SELECT sum(*) as count
88+
FROM ((
89+
SELECT count(*)
90+
FROM %s
91+
WHERE %s='in_common_table_1') UNION ALL (
92+
SELECT count(*)
93+
FROM %s
94+
WHERE %s='in_common_table_2'))
95+
`, common_table.TableName, common_table.IndexNameColumn, common_table.TableName, common_table.IndexNameColumn)},
6896
}
6997

7098
for _, tc := range testcases {

platform/frontend_connectors/search_util.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ import (
1515
quesma_api "github.com/QuesmaOrg/quesma/platform/v2/core"
1616
)
1717

18-
func (q *QueryRunner) clickhouseConnectorFromDecision(ctx context.Context, decision *quesma_api.Decision) (clickhouseConnector *quesma_api.ConnectorDecisionClickhouse, err error) {
18+
func (q *QueryRunner) clickhouseConnectorFromDecision(ctx context.Context, decision *quesma_api.Decision) (
19+
clickhouseConnector *quesma_api.ConnectorDecisionClickhouse, err error) {
1920

2021
for _, connector := range decision.UseConnectors {
2122
switch c := connector.(type) {
@@ -41,7 +42,8 @@ func (q *QueryRunner) clickhouseConnectorFromDecision(ctx context.Context, decis
4142
return clickhouseConnector, nil
4243
}
4344

44-
func (q *QueryRunner) checkDecision(ctx context.Context, decision *quesma_api.Decision, optAsync *AsyncQuery) (respIfWeEndSearch []byte, err error, weEndSearch bool) {
45+
func (q *QueryRunner) checkDecision(ctx context.Context, decision *quesma_api.Decision, optAsync *AsyncQuery) (
46+
respIfWeEndSearch []byte, err error, weEndSearch bool) {
4547

4648
if decision.Err != nil {
4749
if optAsync != nil {
@@ -74,8 +76,8 @@ func (q *QueryRunner) checkDecision(ctx context.Context, decision *quesma_api.De
7476
return []byte{}, nil, false
7577
}
7678

77-
func (q *QueryRunner) resolveIndexes(ctx context.Context, clickhouseConnector *quesma_api.ConnectorDecisionClickhouse,
78-
tables database_common.TableMap, optAsync *AsyncQuery) (resolvedIndexes []string, currentSchema schema.Schema, table *database_common.Table, respWhenError []byte, err error) {
79+
func (q *QueryRunner) resolveIndexes(ctx context.Context, clickhouseConnector *quesma_api.ConnectorDecisionClickhouse, tables database_common.TableMap,
80+
optAsync *AsyncQuery) (resolvedIndexes []string, currentSchema schema.Schema, table *database_common.Table, respWhenError []byte, err error) {
7981

8082
if clickhouseConnector.IsCommonTable {
8183
return q.resolveIndexesCommonTable(ctx, clickhouseConnector, tables, optAsync)
@@ -134,8 +136,8 @@ func (q *QueryRunner) resolveIndexesNonCommonTable(ctx context.Context, clickhou
134136
return
135137
}
136138

137-
func (q *QueryRunner) resolveIndexesCommonTable(ctx context.Context, clickhouseConnector *quesma_api.ConnectorDecisionClickhouse,
138-
tables database_common.TableMap, optAsync *AsyncQuery) (resolvedIndexes []string, currentSchema schema.Schema, table *database_common.Table, respWhenError []byte, err error) {
139+
func (q *QueryRunner) resolveIndexesCommonTable(ctx context.Context, clickhouseConnector *quesma_api.ConnectorDecisionClickhouse, tables database_common.TableMap,
140+
optAsync *AsyncQuery) (resolvedIndexes []string, currentSchema schema.Schema, table *database_common.Table, respWhenError []byte, err error) {
139141

140142
// here we filter out indexes that are not stored in the common table
141143
var virtualOnlyTables []string

0 commit comments

Comments
 (0)