Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit d061b72

Browse files
authored
Transparent proxy fixes (#826)
My trial run found several DX issues and addressed them - we inspect packages in transparent proxy and write weird errors when JSON or NJSON is here - disabled for now - we show ClickHouse DB error - don't do try on moc error - in fact remove `mock-for-transparent-proxy` as this is confusing - A/B logs are there even if there is no test - do detection on that Along the way added constant and removed some dead code.
1 parent 9ffb7cd commit d061b72

File tree

8 files changed

+47
-139
lines changed

8 files changed

+47
-139
lines changed

quesma/ab_testing/sender/coordinator.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"quesma/logger"
1010
"quesma/quesma/config"
1111
"quesma/quesma/recovery"
12+
"strings"
1213
"time"
1314
)
1415

@@ -26,11 +27,23 @@ func NewSenderCoordinator(cfg *config.QuesmaConfiguration) *SenderCoordinator {
2627

2728
ctx, cancel := context.WithCancel(context.Background())
2829

30+
var enabledForIndex []string
31+
for indexName, indexConfig := range cfg.IndexConfig {
32+
_, disabledAb := indexConfig.GetOptimizerConfiguration(config.ElasticABOptimizerName)
33+
if !disabledAb {
34+
enabledForIndex = append(enabledForIndex, indexName)
35+
}
36+
}
37+
38+
if len(enabledForIndex) > 0 {
39+
logger.Info().Msgf("A/B Testing is enabled for indexes: %s", strings.Join(enabledForIndex, ","))
40+
}
41+
2942
return &SenderCoordinator{
3043
sender: newSender(ctx),
3144
ctx: ctx,
3245
cancelFunc: cancel,
33-
enabled: true, // TODO this should be read from config
46+
enabled: len(enabledForIndex) > 0,
3447
// add quesma health monitor service here
3548
}
3649
}

quesma/clickhouse/connection.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ func initDBConnection(c *config.QuesmaConfiguration, tlsConfig *tls.Config) *sql
5252
}
5353

5454
func InitDBConnectionPool(c *config.QuesmaConfiguration) *sql.DB {
55+
if c.ClickHouse.Url == nil {
56+
return nil
57+
}
5558

5659
db := initDBConnection(c, &tls.Config{})
5760

quesma/connectors/connector.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,5 +64,13 @@ func registerConnectors(cfg *config.QuesmaConfiguration, chDb *sql.DB, phoneHome
6464
logger.Error().Msgf("Unknown connector type [%s]", conn.ConnectorType)
6565
}
6666
}
67+
68+
// Mock connector for transparent proxy, perhaps improve at some point
69+
if len(cfg.Connectors) == 0 && cfg.TransparentProxy {
70+
conns = append(conns, &ClickHouseOSConnector{
71+
Connector: clickhouse.NewEmptyLogManager(cfg, chDb, phoneHomeAgent, loader),
72+
})
73+
}
74+
6775
return conns
6876
}

quesma/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ func main() {
131131

132132
func constructQuesma(cfg *config.QuesmaConfiguration, sl clickhouse.TableDiscovery, lm *clickhouse.LogManager, ip *ingest.IngestProcessor, im elasticsearch.IndexManagement, schemaRegistry schema.Registry, phoneHomeAgent telemetry.PhoneHomeAgent, quesmaManagementConsole *ui.QuesmaManagementConsole, logChan <-chan logger.LogWithLevel, abResultsrepository ab_testing.Sender) *quesma.Quesma {
133133
if cfg.TransparentProxy {
134-
return quesma.NewQuesmaTcpProxy(phoneHomeAgent, cfg, quesmaManagementConsole, logChan, true)
134+
return quesma.NewQuesmaTcpProxy(phoneHomeAgent, cfg, quesmaManagementConsole, logChan, false)
135135
} else {
136136
return quesma.NewHttpProxy(phoneHomeAgent, lm, ip, sl, im, schemaRegistry, cfg, quesmaManagementConsole, logChan, abResultsrepository)
137137
}

quesma/quesma/config/config.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,9 +150,11 @@ func (c *QuesmaConfiguration) Validate() error {
150150
}
151151
connectorCount := len(c.Connectors)
152152
if connectorCount != 1 {
153-
result = multierror.Append(result, fmt.Errorf("%d connectors configured - at this moment Quesma requires **exactly** one connector specified", connectorCount))
153+
if !(connectorCount == 0 && c.TransparentProxy) { // no connectors for transparent proxy is fine
154+
result = multierror.Append(result, fmt.Errorf("%d connectors configured - at this moment Quesma requires **exactly** one connector specified", connectorCount))
155+
}
154156
}
155-
if c.ClickHouse.Url == nil && c.Hydrolix.Url == nil {
157+
if c.ClickHouse.Url == nil && c.Hydrolix.Url == nil && !c.TransparentProxy {
156158
result = multierror.Append(result, fmt.Errorf("clickHouse or hydrolix URL is required"))
157159
}
158160
if c.ClickHouse.IsNonEmpty() && c.Hydrolix.IsNonEmpty() {

quesma/quesma/config/config_v2.go

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ const (
2525
ClickHouseOSBackendConnectorName = "clickhouse-os"
2626
ClickHouseBackendConnectorName = "clickhouse"
2727
HydrolixBackendConnectorName = "hydrolix"
28+
29+
ElasticABOptimizerName = "elastic_ab_testing"
2830
)
2931

3032
type ProcessorType string
@@ -522,7 +524,7 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration {
522524
if len(indexConfig.QueryTarget) == 2 {
523525
// Turn on A/B testing
524526
processedConfig.Optimizers = make(map[string]OptimizerConfiguration)
525-
processedConfig.Optimizers["elastic_ab_testing"] = OptimizerConfiguration{
527+
processedConfig.Optimizers[ElasticABOptimizerName] = OptimizerConfiguration{
526528
Disabled: false,
527529
Properties: map[string]string{},
528530
}
@@ -584,7 +586,7 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration {
584586
if len(indexConfig.QueryTarget) == 2 {
585587
// Turn on A/B testing
586588
processedConfig.Optimizers = make(map[string]OptimizerConfiguration)
587-
processedConfig.Optimizers["elastic_ab_testing"] = OptimizerConfiguration{
589+
processedConfig.Optimizers[ElasticABOptimizerName] = OptimizerConfiguration{
588590
Disabled: false,
589591
Properties: map[string]string{},
590592
}
@@ -630,25 +632,18 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration {
630632

631633
END:
632634

633-
if relationalDBErr != nil && !conf.TransparentProxy {
634-
errAcc = multierror.Append(errAcc, relationalDBErr)
635-
} else if relationalDBErr != nil && conf.TransparentProxy {
636-
relDBConn := RelationalDbConfiguration{
637-
ConnectorType: ClickHouseOSBackendConnectorName,
638-
Url: &Url{
639-
Host: "localhost",
640-
},
641-
}
642-
conf.Connectors["mock-for-transparent-proxy"] = relDBConn
643-
conf.ClickHouse = relDBConn
644-
} else {
645-
relDBConn.ConnectorType = connType
646-
if connType == HydrolixBackendConnectorName {
647-
conf.Connectors["injected-hydrolix-connector"] = *relDBConn
648-
conf.Hydrolix = *relDBConn
635+
if !conf.TransparentProxy {
636+
if relationalDBErr != nil {
637+
errAcc = multierror.Append(errAcc, relationalDBErr)
649638
} else {
650-
conf.Connectors["injected-clickhouse-connector"] = *relDBConn
651-
conf.ClickHouse = *relDBConn
639+
relDBConn.ConnectorType = connType
640+
if connType == HydrolixBackendConnectorName {
641+
conf.Connectors["injected-hydrolix-connector"] = *relDBConn
642+
conf.Hydrolix = *relDBConn
643+
} else {
644+
conf.Connectors["injected-clickhouse-connector"] = *relDBConn
645+
conf.ClickHouse = *relDBConn
646+
}
652647
}
653648
}
654649

quesma/quesma/search.go

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -875,17 +875,3 @@ func pushSecondaryInfo(qmc *ui.QuesmaManagementConsole, Id, AsyncId, Path string
875875
QueryTranslatedResults: QueryTranslatedResults,
876876
SecondaryTook: time.Since(startTime)})
877877
}
878-
879-
func pushAlternativeInfo(qmc *ui.QuesmaManagementConsole, Id, AsyncId, OpaqueId, Path string, IncomingQueryBody []byte, QueryBodyTranslated []types.TranslatedSQLQuery, QueryTranslatedResults []byte, startTime time.Time) {
880-
qmc.PushSecondaryInfo(&ui.QueryDebugSecondarySource{
881-
Id: Id,
882-
AsyncId: AsyncId,
883-
OpaqueId: OpaqueId,
884-
Path: Path,
885-
IncomingQueryBody: IncomingQueryBody,
886-
QueryBodyTranslated: QueryBodyTranslated,
887-
QueryTranslatedResults: QueryTranslatedResults,
888-
SecondaryTook: time.Since(startTime),
889-
IsAlternativePlan: true})
890-
891-
}

quesma/quesma/search_alternative.go

Lines changed: 2 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,14 @@ package quesma
55
import (
66
"bytes"
77
"context"
8-
"encoding/json"
98
"fmt"
109
"io"
1110
"net/http"
1211
"quesma/ab_testing"
1312
"quesma/clickhouse"
1413
"quesma/logger"
1514
"quesma/model"
16-
"quesma/queryparser"
15+
"quesma/quesma/config"
1716
"quesma/quesma/recovery"
1817
"quesma/quesma/types"
1918
"quesma/tracing"
@@ -124,13 +123,8 @@ func (q *QueryRunner) maybeCreateAlternativeExecutionPlan(ctx context.Context, i
124123

125124
resolvedTableName := indexes[0]
126125

127-
props, disabled := q.cfg.IndexConfig[resolvedTableName].GetOptimizerConfiguration(queryparser.PancakeOptimizerName)
128-
if !disabled && props["mode"] == "alternative" {
129-
return q.maybeCreatePancakeExecutionPlan(ctx, resolvedTableName, plan, queryTranslator, body, table, isAsync)
130-
}
131-
132126
// TODO is should be enabled in a different way. it's not an optimizer
133-
cfg, disabled := q.cfg.IndexConfig[resolvedTableName].GetOptimizerConfiguration("elastic_ab_testing")
127+
cfg, disabled := q.cfg.IndexConfig[resolvedTableName].GetOptimizerConfiguration(config.ElasticABOptimizerName)
134128
if !disabled {
135129
return q.askElasticAsAnAlternative(ctx, resolvedTableName, plan, queryTranslator, body, table, isAsync, cfg)
136130
}
@@ -220,96 +214,3 @@ func (q *QueryRunner) askElasticAsAnAlternative(ctx context.Context, resolvedTab
220214
return responseBody, nil
221215
}
222216
}
223-
224-
func (q *QueryRunner) maybeCreatePancakeExecutionPlan(ctx context.Context, resolvedTableName string, plan *model.ExecutionPlan, queryTranslator IQueryTranslator, body types.JSON, table *clickhouse.Table, isAsync bool) (*model.ExecutionPlan, executionPlanExecutor) {
225-
226-
hasAggQuery := false
227-
queriesWithoutAggr := make([]*model.Query, 0)
228-
for _, query := range plan.Queries {
229-
switch query.Type.AggregationType() {
230-
case model.MetricsAggregation, model.BucketAggregation, model.PipelineMetricsAggregation, model.PipelineBucketAggregation:
231-
hasAggQuery = true
232-
default:
233-
queriesWithoutAggr = append(queriesWithoutAggr, query)
234-
}
235-
}
236-
237-
if !hasAggQuery {
238-
return nil, nil
239-
}
240-
241-
if chQueryTranslator, ok := queryTranslator.(*queryparser.ClickhouseQueryTranslator); ok {
242-
243-
// TODO FIXME check if the original plan has count query
244-
addCount := false
245-
246-
if pancakeQueries, err := chQueryTranslator.PancakeParseAggregationJson(body, addCount); err == nil {
247-
logger.InfoWithCtx(ctx).Msgf("Running alternative pancake queries")
248-
queries := append(queriesWithoutAggr, pancakeQueries...)
249-
alternativePlan := &model.ExecutionPlan{
250-
IndexPattern: plan.IndexPattern,
251-
QueryRowsTransformers: make([]model.QueryRowsTransformer, len(queries)),
252-
Queries: queries,
253-
StartTime: plan.StartTime,
254-
Name: "pancake",
255-
}
256-
257-
return alternativePlan, func(ctx context.Context) ([]byte, error) {
258-
259-
return q.executeAlternativePlan(ctx, plan, queryTranslator, table, body, false)
260-
}
261-
262-
} else {
263-
// TODO: change to info
264-
logger.ErrorWithCtx(ctx).Msgf("Error parsing pancake queries: %v", err)
265-
}
266-
} else {
267-
logger.ErrorWithCtx(ctx).Msgf("Alternative plan is not supported for non-clickhouse query translators")
268-
}
269-
return nil, nil
270-
}
271-
272-
func (q *QueryRunner) executeAlternativePlan(ctx context.Context, plan *model.ExecutionPlan, queryTranslator IQueryTranslator, table *clickhouse.Table, body types.JSON, isAsync bool) (responseBody []byte, err error) {
273-
274-
doneCh := make(chan AsyncSearchWithError, 1)
275-
276-
err = q.transformQueries(ctx, plan)
277-
if err != nil {
278-
return responseBody, err
279-
}
280-
281-
if resp, err := q.checkProperties(ctx, plan, table, queryTranslator); err != nil {
282-
return resp, err
283-
}
284-
285-
q.runExecutePlanAsync(ctx, plan, queryTranslator, table, doneCh, nil)
286-
287-
response := <-doneCh
288-
289-
if response.err == nil {
290-
if isAsync {
291-
asyncResponse := queryparser.SearchToAsyncSearchResponse(response.response, "__quesma_alternative_plan", false, 200)
292-
responseBody, err = asyncResponse.Marshal()
293-
if err != nil {
294-
return nil, err
295-
}
296-
} else {
297-
responseBody, err = response.response.Marshal()
298-
if err != nil {
299-
return nil, err
300-
}
301-
}
302-
} else {
303-
// TODO better error handling
304-
m := make(map[string]interface{})
305-
m["error"] = fmt.Sprintf("%v", response.err.Error())
306-
responseBody, _ = json.MarshalIndent(m, "", " ")
307-
}
308-
309-
bodyAsBytes, _ := body.Bytes()
310-
contextValues := tracing.ExtractValues(ctx)
311-
pushAlternativeInfo(q.quesmaManagementConsole, contextValues.RequestId, "", contextValues.OpaqueId, contextValues.RequestPath, bodyAsBytes, response.translatedQueryBody, responseBody, plan.StartTime)
312-
313-
return responseBody, response.err
314-
315-
}

0 commit comments

Comments
 (0)