Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 0e2d4b8

Browse files
authored
V2 - Utility dependencies clean up, part 1 (#1107)
This is the first PR to clean utility dependencies. It contains: 1. Split `PhoneHomeAgent` interface into a client and statistic provider 2. Extract `DebugInfoCollector` interface. It is passed instead of the whole console management instance. More work is required in that area. The management console is quite bulky. 3. Add a `Diagnostic` interface that will provide components with telemetry/debug/... functionality. We should discuss component granularity here. 4. Introduce a dependency injection framework. Building Quesma infrastructure (pipelines, processors, etc) is a two-phase process: 1. initialize components and connect them 2. inject utilities @pdelewski please review this PR
1 parent cea1f7a commit 0e2d4b8

39 files changed

+658
-394
lines changed

quesma/clickhouse/clickhouse.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ import (
1212
"quesma/quesma/config"
1313
"quesma/quesma/recovery"
1414
"quesma/schema"
15-
"quesma/telemetry"
1615
"quesma/util"
16+
"quesma_v2/core/diag"
1717
"slices"
1818
"strings"
1919
"sync/atomic"
@@ -32,7 +32,7 @@ type (
3232
chDb *sql.DB
3333
tableDiscovery TableDiscovery
3434
cfg *config.QuesmaConfiguration
35-
phoneHomeAgent telemetry.PhoneHomeAgent
35+
phoneHomeAgent diag.PhoneHomeClient
3636
}
3737
TableMap = util.SyncMap[string, *Table]
3838
SchemaMap = map[string]interface{} // TODO remove
@@ -317,7 +317,7 @@ func (lm *LogManager) Ping() error {
317317
return lm.chDb.Ping()
318318
}
319319

320-
func NewEmptyLogManager(cfg *config.QuesmaConfiguration, chDb *sql.DB, phoneHomeAgent telemetry.PhoneHomeAgent, loader TableDiscovery) *LogManager {
320+
func NewEmptyLogManager(cfg *config.QuesmaConfiguration, chDb *sql.DB, phoneHomeAgent diag.PhoneHomeClient, loader TableDiscovery) *LogManager {
321321
ctx, cancel := context.WithCancel(context.Background())
322322
return &LogManager{ctx: ctx, cancel: cancel, chDb: chDb, tableDiscovery: loader, cfg: cfg, phoneHomeAgent: phoneHomeAgent}
323323
}
@@ -326,22 +326,22 @@ func NewLogManager(tables *TableMap, cfg *config.QuesmaConfiguration) *LogManage
326326
var tableDefinitions = atomic.Pointer[TableMap]{}
327327
tableDefinitions.Store(tables)
328328
return &LogManager{chDb: nil, tableDiscovery: NewTableDiscoveryWith(cfg, nil, *tables),
329-
cfg: cfg, phoneHomeAgent: telemetry.NewPhoneHomeEmptyAgent(),
329+
cfg: cfg, phoneHomeAgent: diag.NewPhoneHomeEmptyAgent(),
330330
}
331331
}
332332

333333
// right now only for tests purposes
334334
func NewLogManagerWithConnection(db *sql.DB, tables *TableMap) *LogManager {
335335
return &LogManager{chDb: db, tableDiscovery: NewTableDiscoveryWith(&config.QuesmaConfiguration{}, db, *tables),
336-
phoneHomeAgent: telemetry.NewPhoneHomeEmptyAgent()}
336+
phoneHomeAgent: diag.NewPhoneHomeEmptyAgent()}
337337
}
338338

339339
func NewLogManagerEmpty() *LogManager {
340340
var tableDefinitions = atomic.Pointer[TableMap]{}
341341
tableDefinitions.Store(NewTableMap())
342342
cfg := &config.QuesmaConfiguration{}
343343
return &LogManager{tableDiscovery: NewTableDiscovery(cfg, nil, persistence.NewStaticJSONDatabase()), cfg: cfg,
344-
phoneHomeAgent: telemetry.NewPhoneHomeEmptyAgent()}
344+
phoneHomeAgent: diag.NewPhoneHomeEmptyAgent()}
345345
}
346346

347347
func NewDefaultCHConfig() *ChTableConfig {

quesma/frontend_connectors/router_v2.go

Lines changed: 27 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@ import (
2222
"quesma/quesma/types"
2323
"quesma/quesma/ui"
2424
"quesma/schema"
25-
"quesma/telemetry"
2625
"quesma/util"
2726
quesma_api "quesma_v2/core"
27+
"quesma_v2/core/diag"
2828
"quesma_v2/core/routes"
2929
"quesma_v2/core/tracing"
3030
"strings"
@@ -72,15 +72,20 @@ func responseFromQuesmaV2(ctx context.Context, unzipped []byte, w http.ResponseW
7272
}
7373

7474
type RouterV2 struct {
75-
Config *config.QuesmaConfiguration
76-
RequestPreprocessors quesma_api.ProcessorChain
77-
QuesmaManagementConsole *ui.QuesmaManagementConsole
78-
PhoneHomeAgent telemetry.PhoneHomeAgent
79-
HttpClient *http.Client
80-
FailedRequests atomic.Int64
75+
Config *config.QuesmaConfiguration
76+
RequestPreprocessors quesma_api.ProcessorChain
77+
78+
HttpClient *http.Client
79+
FailedRequests atomic.Int64
80+
81+
diagnostic diag.Diagnostic
8182
}
8283

83-
func NewRouterV2(config *config.QuesmaConfiguration, qmc *ui.QuesmaManagementConsole, agent telemetry.PhoneHomeAgent) *RouterV2 {
84+
func (r *RouterV2) InjectDiagnostic(s diag.Diagnostic) {
85+
r.diagnostic = s
86+
}
87+
88+
func NewRouterV2(config *config.QuesmaConfiguration) *RouterV2 {
8489
tr := &http.Transport{
8590
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
8691
}
@@ -91,11 +96,9 @@ func NewRouterV2(config *config.QuesmaConfiguration, qmc *ui.QuesmaManagementCon
9196
requestProcessors := quesma_api.ProcessorChain{}
9297
requestProcessors = append(requestProcessors, quesma_api.NewTraceIdPreprocessor())
9398
return &RouterV2{
94-
Config: config,
95-
RequestPreprocessors: requestProcessors,
96-
QuesmaManagementConsole: qmc,
97-
PhoneHomeAgent: agent,
98-
HttpClient: client,
99+
Config: config,
100+
RequestPreprocessors: requestProcessors,
101+
HttpClient: client,
99102
}
100103
}
101104

@@ -258,7 +261,7 @@ func (r *RouterV2) Reroute(ctx context.Context, w http.ResponseWriter, req *http
258261
}
259262
dispatcher := &quesma_api.Dispatcher{}
260263
if handlersPipe != nil {
261-
quesmaResponse, err := recordRequestToClickhouseV2(req.URL.Path, r.QuesmaManagementConsole, func() (*quesma_api.Result, error) {
264+
quesmaResponse, err := recordRequestToClickhouseV2(req.URL.Path, r.diagnostic.DebugInfoCollector(), func() (*quesma_api.Result, error) {
262265
var result *quesma_api.Result
263266
result, err = handlersPipe.Handler(ctx, quesmaRequest)
264267

@@ -338,22 +341,24 @@ func (r *RouterV2) sendHttpRequestToElastic(ctx context.Context, req *http.Reque
338341
}
339342

340343
go func() {
341-
elkResponseChan <- recordRequestToElasticV2(req.URL.Path, r.QuesmaManagementConsole, func() elasticResultV2 {
344+
elkResponseChan <- recordRequestToElasticV2(req.URL.Path, r.diagnostic.DebugInfoCollector(), func() elasticResultV2 {
342345

343346
isWrite := elasticsearch.IsWriteRequest(req)
344347

345-
var span telemetry.Span
348+
phoneHome := r.diagnostic.PhoneHomeAgent()
349+
350+
var span diag.Span
346351
if isManagement {
347352
if isWrite {
348-
span = r.PhoneHomeAgent.ElasticBypassedWriteRequestsDuration().Begin()
353+
span = phoneHome.ElasticBypassedWriteRequestsDuration().Begin()
349354
} else {
350-
span = r.PhoneHomeAgent.ElasticBypassedReadRequestsDuration().Begin()
355+
span = phoneHome.ElasticBypassedReadRequestsDuration().Begin()
351356
}
352357
} else {
353358
if isWrite {
354-
span = r.PhoneHomeAgent.ElasticWriteRequestsDuration().Begin()
359+
span = phoneHome.ElasticWriteRequestsDuration().Begin()
355360
} else {
356-
span = r.PhoneHomeAgent.ElasticReadRequestsDuration().Begin()
361+
span = phoneHome.ElasticReadRequestsDuration().Begin()
357362
}
358363
}
359364

@@ -373,7 +378,7 @@ func isIngestV2(path string) bool {
373378
return strings.HasSuffix(path, routes.BulkPath) // We may add more methods in future such as `_put` or `_create`
374379
}
375380

376-
func recordRequestToClickhouseV2(path string, qmc *ui.QuesmaManagementConsole, requestFunc func() (*quesma_api.Result, error)) (*quesma_api.Result, error) {
381+
func recordRequestToClickhouseV2(path string, qmc diag.DebugInfoCollector, requestFunc func() (*quesma_api.Result, error)) (*quesma_api.Result, error) {
377382
statName := ui.RequestStatisticKibana2Clickhouse
378383
if isIngestV2(path) {
379384
statName = ui.RequestStatisticIngest2Clickhouse
@@ -384,7 +389,7 @@ func recordRequestToClickhouseV2(path string, qmc *ui.QuesmaManagementConsole, r
384389
return response, err
385390
}
386391

387-
func recordRequestToElasticV2(path string, qmc *ui.QuesmaManagementConsole, requestFunc func() elasticResultV2) elasticResultV2 {
392+
func recordRequestToElasticV2(path string, qmc diag.DebugInfoCollector, requestFunc func() elasticResultV2) elasticResultV2 {
388393
statName := ui.RequestStatisticKibana2Elasticsearch
389394
if isIngestV2(path) {
390395
statName = ui.RequestStatisticIngest2Elasticsearch

quesma/ingest/processor.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"quesma/telemetry"
2626
"quesma/util"
2727
"quesma_v2/core"
28+
"quesma_v2/core/diag"
2829
"slices"
2930
"sort"
3031
"strings"
@@ -61,7 +62,7 @@ type (
6162
chDb *sql.DB
6263
tableDiscovery chLib.TableDiscovery
6364
cfg *config.QuesmaConfiguration
64-
phoneHomeAgent telemetry.PhoneHomeAgent
65+
phoneHomeAgent diag.PhoneHomeClient
6566
schemaRegistry schema.Registry
6667
ingestCounter int64
6768
ingestFieldStatistics IngestFieldStatistics

quesma/ingest/processor_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ import (
99
"quesma/quesma/config"
1010
"quesma/quesma/types"
1111
"quesma/schema"
12-
"quesma/telemetry"
1312
"quesma/util"
13+
"quesma_v2/core/diag"
1414
"strings"
1515
"sync/atomic"
1616
"testing"
@@ -22,7 +22,7 @@ func newIngestProcessorWithEmptyTableMap(tables *TableMap, cfg *config.QuesmaCon
2222
var tableDefinitions = atomic.Pointer[TableMap]{}
2323
tableDefinitions.Store(tables)
2424
return &IngestProcessor{chDb: nil, tableDiscovery: clickhouse.NewTableDiscoveryWith(cfg, nil, *tables),
25-
cfg: cfg, phoneHomeAgent: telemetry.NewPhoneHomeEmptyAgent(),
25+
cfg: cfg, phoneHomeAgent: diag.NewPhoneHomeEmptyAgent(),
2626
ingestFieldStatistics: make(IngestFieldStatistics),
2727
virtualTableStorage: persistence.NewStaticJSONDatabase(),
2828
}
@@ -33,7 +33,7 @@ func newIngestProcessorEmpty() *IngestProcessor {
3333
tableDefinitions.Store(NewTableMap())
3434
cfg := &config.QuesmaConfiguration{}
3535
return &IngestProcessor{tableDiscovery: clickhouse.NewTableDiscovery(cfg, nil, persistence.NewStaticJSONDatabase()), cfg: cfg,
36-
phoneHomeAgent: telemetry.NewPhoneHomeEmptyAgent(), ingestFieldStatistics: make(IngestFieldStatistics)}
36+
phoneHomeAgent: diag.NewPhoneHomeEmptyAgent(), ingestFieldStatistics: make(IngestFieldStatistics)}
3737
}
3838

3939
var hasOthersConfig = &clickhouse.ChTableConfig{

quesma/main.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ const EnableConcurrencyProfiling = false
5151
// buildIngestOnlyQuesma is for now a helper function to help establishing the way of v2 module api import
5252
func buildIngestOnlyQuesma() quesma_api.QuesmaBuilder {
5353
var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma()
54+
quesmaBuilder.SetDependencies(quesma_api.EmptyDependencies())
55+
5456
ingestFrontendConnector := frontend_connectors.NewElasticsearchIngestFrontendConnector(":8080")
5557

5658
var ingestPipeline quesma_api.PipelineBuilder = quesma_api.NewPipeline()
@@ -206,7 +208,7 @@ func main() {
206208

207209
func constructQuesma(cfg *config.QuesmaConfiguration, sl clickhouse.TableDiscovery, lm *clickhouse.LogManager, ip *ingest.IngestProcessor, im elasticsearch.IndexManagement, schemaRegistry schema.Registry, phoneHomeAgent telemetry.PhoneHomeAgent, quesmaManagementConsole *ui.QuesmaManagementConsole, logChan <-chan logger.LogWithLevel, abResultsrepository ab_testing.Sender, indexRegistry table_resolver.TableResolver) *quesma.Quesma {
208210
if cfg.TransparentProxy {
209-
return quesma.NewQuesmaTcpProxy(phoneHomeAgent, cfg, quesmaManagementConsole, logChan, false)
211+
return quesma.NewQuesmaTcpProxy(cfg, quesmaManagementConsole, logChan, false)
210212
} else {
211213
const quesma_v2 = false
212214
return quesma.NewHttpProxy(phoneHomeAgent, lm, ip, sl, im, schemaRegistry, cfg, quesmaManagementConsole, abResultsrepository, indexRegistry, quesma_v2)

quesma/main_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ func Test_backendConnectorValidation(t *testing.T) {
4141
var postgressPipeline quesma_api.PipelineBuilder = quesma_api.NewPipeline()
4242
postgressPipeline.AddProcessor(tcpProcessor)
4343
var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma()
44+
quesmaBuilder.SetDependencies(quesma_api.EmptyDependencies())
4445
const endpoint = "root:password@tcp(127.0.0.1:3306)/test"
4546
var mySqlBackendConnector quesma_api.BackendConnector = &backend_connectors.MySqlBackendConnector{
4647
Endpoint: endpoint,
@@ -62,6 +63,7 @@ func fallback(_ context.Context, _ *quesma_api.Request) (*quesma_api.Result, err
6263

6364
func ab_testing_scenario() quesma_api.QuesmaBuilder {
6465
var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma()
66+
quesmaBuilder.SetDependencies(quesma_api.EmptyDependencies())
6567

6668
ingestFrontendConnector := frontend_connectors.NewBasicHTTPFrontendConnector(":8888")
6769
ingestHTTPRouter := quesma_api.NewPathRouter()
@@ -105,6 +107,7 @@ func ab_testing_scenario() quesma_api.QuesmaBuilder {
105107

106108
func fallbackScenario() quesma_api.QuesmaBuilder {
107109
var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma()
110+
quesmaBuilder.SetDependencies(quesma_api.EmptyDependencies())
108111

109112
ingestFrontendConnector := frontend_connectors.NewBasicHTTPFrontendConnector(":8888")
110113
ingestHTTPRouter := quesma_api.NewPathRouter()

quesma/queryparser/query_parser_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@ import (
1212
"quesma/quesma/config"
1313
"quesma/quesma/types"
1414
"quesma/schema"
15-
"quesma/telemetry"
1615
"quesma/testdata"
1716
"quesma/util"
17+
"quesma_v2/core/diag"
1818
"strconv"
1919
"strings"
2020
"testing"
@@ -40,7 +40,7 @@ func TestQueryParserStringAttrConfig(t *testing.T) {
4040

4141
cfg.IndexConfig["logs-generic-default"] = config.IndexConfiguration{}
4242

43-
lm := clickhouse.NewEmptyLogManager(&cfg, nil, telemetry.NewPhoneHomeEmptyAgent(), clickhouse.NewTableDiscovery(&config.QuesmaConfiguration{}, nil, persistence.NewStaticJSONDatabase()))
43+
lm := clickhouse.NewEmptyLogManager(&cfg, nil, diag.NewPhoneHomeEmptyAgent(), clickhouse.NewTableDiscovery(&config.QuesmaConfiguration{}, nil, persistence.NewStaticJSONDatabase()))
4444
lm.AddTableIfDoesntExist(table)
4545
s := schema.StaticRegistry{
4646
Tables: map[schema.IndexName]schema.Schema{
@@ -99,7 +99,7 @@ func TestQueryParserNoFullTextFields(t *testing.T) {
9999
},
100100
Created: true,
101101
}
102-
lm := clickhouse.NewEmptyLogManager(&config.QuesmaConfiguration{}, nil, telemetry.NewPhoneHomeEmptyAgent(), clickhouse.NewTableDiscovery(&config.QuesmaConfiguration{}, nil, persistence.NewStaticJSONDatabase()))
102+
lm := clickhouse.NewEmptyLogManager(&config.QuesmaConfiguration{}, nil, diag.NewPhoneHomeEmptyAgent(), clickhouse.NewTableDiscovery(&config.QuesmaConfiguration{}, nil, persistence.NewStaticJSONDatabase()))
103103
lm.AddTableIfDoesntExist(&table)
104104
cfg := config.QuesmaConfiguration{IndexConfig: map[string]config.IndexConfiguration{}}
105105

quesma/quesma/dual_write_proxy.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"quesma/telemetry"
3131
"quesma/util"
3232
quesma_api "quesma_v2/core"
33+
"quesma_v2/core/diag"
3334
"quesma_v2/core/routes"
3435
tracing "quesma_v2/core/tracing"
3536
"strconv"
@@ -447,7 +448,7 @@ func (r *router) sendHttpRequestToElastic(ctx context.Context, req *http.Request
447448

448449
isWrite := elasticsearch.IsWriteRequest(req)
449450

450-
var span telemetry.Span
451+
var span diag.Span
451452
if isManagement {
452453
if isWrite {
453454
span = r.phoneHomeAgent.ElasticBypassedWriteRequestsDuration().Begin()

quesma/quesma/dual_write_proxy_v2.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,8 @@ import (
1515
"quesma/queryparser"
1616
"quesma/quesma/async_search_storage"
1717
"quesma/quesma/config"
18-
"quesma/quesma/ui"
1918
"quesma/schema"
2019
"quesma/table_resolver"
21-
"quesma/telemetry"
2220
"quesma/util"
2321
quesma_api "quesma_v2/core"
2422
"strconv"
@@ -70,8 +68,9 @@ func (q *dualWriteHttpProxyV2) Stop(ctx context.Context) {
7068
q.Close(ctx)
7169
}
7270

73-
func newDualWriteProxyV2(schemaLoader clickhouse.TableDiscovery, logManager *clickhouse.LogManager, indexManager elasticsearch.IndexManagement, registry schema.Registry, config *config.QuesmaConfiguration, quesmaManagementConsole *ui.QuesmaManagementConsole, agent telemetry.PhoneHomeAgent, ingestProcessor *ingest.IngestProcessor, resolver table_resolver.TableResolver, abResultsRepository ab_testing.Sender) *dualWriteHttpProxyV2 {
74-
queryProcessor := NewQueryRunner(logManager, config, indexManager, quesmaManagementConsole, registry, abResultsRepository, resolver, schemaLoader)
71+
func newDualWriteProxyV2(dependencies *quesma_api.Dependencies, schemaLoader clickhouse.TableDiscovery, logManager *clickhouse.LogManager, indexManager elasticsearch.IndexManagement, registry schema.Registry, config *config.QuesmaConfiguration, ingestProcessor *ingest.IngestProcessor, resolver table_resolver.TableResolver, abResultsRepository ab_testing.Sender) *dualWriteHttpProxyV2 {
72+
73+
queryProcessor := NewQueryRunner(logManager, config, indexManager, dependencies.Diagnostic.DebugInfoCollector(), registry, abResultsRepository, resolver, schemaLoader)
7574

7675
// not sure how we should configure our query translator ???
7776
// is this a config option??
@@ -81,22 +80,23 @@ func newDualWriteProxyV2(schemaLoader clickhouse.TableDiscovery, logManager *cli
8180
// tests should not be run with optimization enabled by default
8281
queryProcessor.EnableQueryOptimization(config)
8382

84-
routerInstance := frontend_connectors.NewRouterV2(config,
85-
quesmaManagementConsole, agent)
83+
routerInstance := frontend_connectors.NewRouterV2(config)
84+
85+
dependencies.Diagnostic.PhoneHomeAgent().FailedRequestsCollector(func() int64 {
8686

87-
agent.FailedRequestsCollector(func() int64 {
8887
return routerInstance.FailedRequests.Load()
8988
})
9089

91-
ingestRouter := ConfigureIngestRouterV2(config, ingestProcessor, agent, resolver)
92-
searchRouter := ConfigureSearchRouterV2(config, registry, logManager, quesmaManagementConsole, queryProcessor, resolver)
90+
ingestRouter := ConfigureIngestRouterV2(config, dependencies, ingestProcessor, resolver)
91+
searchRouter := ConfigureSearchRouterV2(config, dependencies, registry, logManager, queryProcessor, resolver)
9392

9493
elasticHttpIngestFrontendConnector := NewElasticHttpIngestFrontendConnector(":"+strconv.Itoa(int(config.PublicTcpPort)),
95-
routerInstance, logManager, registry, agent)
94+
95+
routerInstance, logManager, registry)
9696
elasticHttpIngestFrontendConnector.AddRouter(ingestRouter)
9797

9898
elasticHttpQueryFrontendConnector := NewElasticHttpQueryFrontendConnector(":"+strconv.Itoa(int(config.PublicTcpPort)),
99-
routerInstance, logManager, registry, agent)
99+
routerInstance, logManager, registry)
100100
elasticHttpQueryFrontendConnector.AddRouter(searchRouter)
101101

102102
quesmaBuilder := quesma_api.NewQuesma()
@@ -107,6 +107,7 @@ func newDualWriteProxyV2(schemaLoader clickhouse.TableDiscovery, logManager *cli
107107
queryPipeline.AddFrontendConnector(elasticHttpQueryFrontendConnector)
108108
quesmaBuilder.AddPipeline(ingestPipeline)
109109
quesmaBuilder.AddPipeline(queryPipeline)
110+
quesmaBuilder.SetDependencies(dependencies)
110111
_, err := quesmaBuilder.Build()
111112
if err != nil {
112113
logger.Fatal().Msgf("Error building Quesma: %v", err)

0 commit comments

Comments
 (0)