3 changes: 2 additions & 1 deletion quesma/ab_testing/collector/collector.go
@@ -83,12 +83,13 @@ func NewCollector(ctx context.Context, ingester ingest.Ingester, healthQueue cha
&diffTransformer{},
//&ppPrintFanout{},
//&mismatchedOnlyFilter{},
&redactOkResults{},
//&elasticSearchFanout{
// url: "http://localhost:8080",
// indexName: "ab_testing_logs",
//},
&internalIngestFanout{
indexName: "ab_testing_logs",
indexName: ab_testing.ABTestingTableName,
ingestProcessor: ingester,
},
},
23 changes: 16 additions & 7 deletions quesma/ab_testing/collector/diff.go
@@ -66,22 +66,31 @@ func (t *diffTransformer) process(in EnrichedResults) (out EnrichedResults, drop
}

if len(mismatches) > 0 {
b, err := json.MarshalIndent(mismatches, "", " ")

if err != nil {
return in, false, fmt.Errorf("failed to marshal mismatches: %w", err)
}

in.Mismatch.Mismatches = string(b)
in.Mismatch.IsOK = false
in.Mismatch.Count = len(mismatches)
in.Mismatch.Message = mismatches.String()

topMismatchType, _ := t.mostCommonMismatchType(mismatches)
if topMismatchType != "" {
in.Mismatch.TopMismatchType = topMismatchType
}

// if there are too many mismatches, we only show the first 20
// this is to avoid overwhelming the user with too much information
const mismatchesSize = 20

if len(mismatches) > mismatchesSize {
mismatches = mismatches[:mismatchesSize]
}

b, err := json.MarshalIndent(mismatches, "", " ")

if err != nil {
return in, false, fmt.Errorf("failed to marshal mismatches: %w", err)
}
in.Mismatch.Mismatches = string(b)
in.Mismatch.Message = mismatches.String()

} else {
in.Mismatch.Mismatches = "[]"
in.Mismatch.IsOK = true
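A side note on the reordering above: the mismatch count and summary are now derived from the full list before it is truncated to 20 entries for storage. A minimal, standalone sketch of that ordering (the Mismatch type and summarize helper below are simplified illustrations, not the collector's actual API):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Simplified stand-in for a single diff finding; not the collector's actual type.
type Mismatch struct {
	Path string `json:"path"`
	Type string `json:"type"`
}

// summarize records the full count first, then truncates before marshaling,
// so the stored JSON stays bounded while the count reflects every mismatch.
func summarize(mismatches []Mismatch) (count int, payload string, err error) {
	count = len(mismatches) // computed before truncation

	const mismatchesSize = 20
	if len(mismatches) > mismatchesSize {
		mismatches = mismatches[:mismatchesSize] // keep only the first 20 for storage
	}

	b, err := json.MarshalIndent(mismatches, "", " ")
	if err != nil {
		return 0, "", fmt.Errorf("failed to marshal mismatches: %w", err)
	}
	return count, string(b), nil
}

func main() {
	ms := make([]Mismatch, 25)
	for i := range ms {
		ms[i] = Mismatch{Path: fmt.Sprintf("a.[%d]", i), Type: "invalid_number_value"}
	}
	count, payload, _ := summarize(ms)
	fmt.Println(count)            // 25: the real number of mismatches
	fmt.Println(len(payload) > 0) // the stored JSON contains only the first 20 entries
}
```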
24 changes: 24 additions & 0 deletions quesma/ab_testing/collector/filters.go
@@ -40,3 +40,27 @@ func (t *mismatchedOnlyFilter) process(in EnrichedResults) (out EnrichedResults,

// avoid unused struct error
var _ = &mismatchedOnlyFilter{}

type redactOkResults struct {
}

func (t *redactOkResults) name() string {
return "redactOkResults"
}

func (t *redactOkResults) process(in EnrichedResults) (out EnrichedResults, drop bool, err error) {

// we're not interested in the details of the request and responses if the mismatch is OK

redactMsg := "***REDACTED***"
if in.Mismatch.IsOK {
in.Request.Body = redactMsg
in.A.Body = redactMsg
in.B.Body = redactMsg
in.Mismatch.Message = "OK"
}

return in, false, nil
}

var _ = &redactOkResults{}
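For orientation, the new transformer follows the same name()/process() shape as the other pipeline stages registered in collector.go. A rough, self-contained sketch of how such a pipeline might drive it; the transformer interface, run helper, and trimmed-down EnrichedResults fields below are assumptions based only on what is visible in this diff:

```go
package main

import "fmt"

// Simplified stand-ins for the collector types visible in this diff.
type Result struct{ Body string }

type MismatchInfo struct {
	IsOK    bool
	Message string
}

type EnrichedResults struct {
	Request  Result
	A, B     Result
	Mismatch MismatchInfo
}

// transformer mirrors the name()/process() shape used by the collector's pipeline.
type transformer interface {
	name() string
	process(in EnrichedResults) (out EnrichedResults, drop bool, err error)
}

type redactOkResults struct{}

func (t *redactOkResults) name() string { return "redactOkResults" }

func (t *redactOkResults) process(in EnrichedResults) (EnrichedResults, bool, error) {
	redactMsg := "***REDACTED***"
	if in.Mismatch.IsOK {
		in.Request.Body = redactMsg
		in.A.Body = redactMsg
		in.B.Body = redactMsg
		in.Mismatch.Message = "OK"
	}
	return in, false, nil
}

// run applies each transformer in order, stopping early if one drops the record.
func run(pipeline []transformer, in EnrichedResults) (EnrichedResults, bool, error) {
	for _, t := range pipeline {
		var drop bool
		var err error
		in, drop, err = t.process(in)
		if err != nil {
			return in, false, fmt.Errorf("%s failed: %w", t.name(), err)
		}
		if drop {
			return in, true, nil
		}
	}
	return in, false, nil
}

func main() {
	in := EnrichedResults{
		Request:  Result{Body: `{"query": "..."}`},
		A:        Result{Body: `{"hits": 10}`},
		B:        Result{Body: `{"hits": 10}`},
		Mismatch: MismatchInfo{IsOK: true},
	}
	out, _, _ := run([]transformer{&redactOkResults{}}, in)
	fmt.Println(out.Request.Body, out.Mismatch.Message) // ***REDACTED*** OK
}
```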
2 changes: 2 additions & 0 deletions quesma/ab_testing/model.go
@@ -2,6 +2,8 @@
// SPDX-License-Identifier: Elastic-2.0
package ab_testing

const ABTestingTableName = "ab_testing_logs"

type Request struct {
Path string `json:"path"`
IndexName string `json:"index_name"`
2 changes: 1 addition & 1 deletion quesma/jsondiff/elastic_response_diff.go
@@ -6,7 +6,7 @@ import "fmt"

// NewElasticResponseJSONDiff creates a JSONDiff instance that is tailored to compare Elasticsearch response JSONs.
func NewElasticResponseJSONDiff() (*JSONDiff, error) {
d, err := NewJSONDiff("^id$", ".*Quesma_key_.*", "^took$")
d, err := NewJSONDiff("^id$", ".*Quesma_key_.*", "^took$", ".*__quesma_total_count", ".*\\._id", "^_shards.*", ".*\\._score", ".*\\._source", ".*\\.__quesma_originalKey")

if err != nil {
return nil, fmt.Errorf("could not create JSONDiff: %v", err)
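The added patterns extend the set of paths the differ ignores (Quesma-internal keys, per-hit _id, _score and _source, and shard stats). A standalone sketch of regex-based path ignoring, assuming the dotted path notation that appears in the tests below (for example a.[2] or a.d.b); isIgnored is an illustrative helper, not this package's API:

```go
package main

import (
	"fmt"
	"regexp"
)

// isIgnored reports whether a dotted JSON path matches any of the ignore patterns.
func isIgnored(path string, patterns []*regexp.Regexp) bool {
	for _, rx := range patterns {
		if rx.MatchString(path) {
			return true
		}
	}
	return false
}

func main() {
	patterns := []*regexp.Regexp{
		regexp.MustCompile(`^took$`),
		regexp.MustCompile(`.*\._score`),
		regexp.MustCompile(`^_shards.*`),
	}

	fmt.Println(isIgnored("took", patterns))                  // true: top-level "took" field
	fmt.Println(isIgnored("hits.hits.[0]._score", patterns))  // true: per-hit score differences are skipped
	fmt.Println(isIgnored("hits.total.value", patterns))      // false: still compared
}
```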
41 changes: 37 additions & 4 deletions quesma/jsondiff/jsondiff.go
@@ -25,6 +25,8 @@ func newType(code, message string) mismatchType {
var (
invalidType = newType("invalid_type", "Types are not equal")
invalidValue = newType("invalid_value", "Values are not equal")
invalidNumberValue = newType("invalid_number_value", "Numbers are not equal")
invalidDateValue = newType("invalid_date_value", "Dates are not equal")
invalidArrayLength = newType("invalid_array_length", "Array lengths are not equal")
invalidArrayLengthOffByOne = newType("invalid_array_length_off_by_one", "Array lengths are off by one.")
objectDifference = newType("object_difference", "Objects are different")
@@ -355,6 +357,8 @@ func (d *JSONDiff) asType(a any) string {
return fmt.Sprintf("%T", a)
}

var dateRx = regexp.MustCompile(`\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}`)

func (d *JSONDiff) compare(expected any, actual any) {

if d.isIgnoredPath() {
Expand Down Expand Up @@ -399,9 +403,9 @@ func (d *JSONDiff) compare(expected any, actual any) {
case float64:

// float operations are noisy, we need to compare them with desired precision

epsilon := 1e-9
relativeTolerance := 1e-9
// this is lousy, but it works for now
epsilon := 1e-3
relativeTolerance := 1e-3
aFloat := expected.(float64)
bFloat := actual.(float64)

@@ -411,8 +415,37 @@
relativeDiff := absDiff / math.Max(math.Abs(aFloat), math.Abs(bFloat))

if relativeDiff > relativeTolerance {
d.addMismatch(invalidValue, d.asValue(expected), d.asValue(actual))
d.addMismatch(invalidNumberValue, d.asValue(expected), d.asValue(actual))
}
}

default:
d.addMismatch(invalidType, d.asType(expected), d.asType(actual))
}

case string:

switch actualString := actual.(type) {
case string:

if dateRx.MatchString(aVal) && dateRx.MatchString(actualString) {

// TODO add better date comparison here
// parse both dates and compare them with desired precision

// Elasticsearch returns dates in formats such as
// "2024-10-24T00:00:00.000+02:00"
// "2024-10-24T00:00:00.000Z"

// Quesma returns
// 2024-10-23T22:00:00.000
compareOnly := "2000-01-"

if aVal[:len(compareOnly)] != actualString[:len(compareOnly)] {
d.addMismatch(invalidDateValue, d.asValue(expected), d.asValue(actual))
}

return
}

default:
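Two of the comparison rules introduced above, restated as standalone helpers. numbersClose and sameDatePrefix are illustrative only (not the package API); they mirror the relaxed 1e-3 float tolerance and the coarse year-and-month prefix check on date strings:

```go
package main

import (
	"fmt"
	"math"
)

// numbersClose treats two floats as equal if they differ by less than an absolute
// epsilon, or by less than a relative tolerance of the larger magnitude.
func numbersClose(a, b float64) bool {
	const epsilon = 1e-3
	const relativeTolerance = 1e-3

	absDiff := math.Abs(a - b)
	if absDiff < epsilon {
		return true
	}
	relativeDiff := absDiff / math.Max(math.Abs(a), math.Abs(b))
	return relativeDiff <= relativeTolerance
}

// sameDatePrefix compares only the "YYYY-MM-" prefix of two date strings,
// sidestepping day, time, and timezone formatting differences for now.
func sameDatePrefix(a, b string) bool {
	const compareOnly = len("2000-01-")
	if len(a) < compareOnly || len(b) < compareOnly {
		return false
	}
	return a[:compareOnly] == b[:compareOnly]
}

func main() {
	fmt.Println(numbersClose(12345.0, 12345.9)) // true: relative diff is about 7e-5
	fmt.Println(numbersClose(1.0, 1.2))         // false: relative diff is about 0.17

	fmt.Println(sameDatePrefix("2024-10-24T00:00:00.000+02:00", "2024-10-23T22:00:00.000")) // true
	fmt.Println(sameDatePrefix("2024-10-24T00:00:00.000Z", "2024-11-24T00:00:00.000Z"))     // false
}
```

Note that the prefix check deliberately ignores the day, time, and timezone, which lines up with the TODO above about adding a proper date parser later.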
12 changes: 6 additions & 6 deletions quesma/jsondiff/jsondiff_test.go
@@ -39,7 +39,7 @@ func TestJSONDiff(t *testing.T) {
name: "Test 2",
expected: `{"a": 1, "b": 2, "c": 3}`,
actual: `{"a": 1, "b": 3, "c": 3}`,
problems: []JSONMismatch{mismatch("b", invalidValue)},
problems: []JSONMismatch{mismatch("b", invalidNumberValue)},
},

{
Expand Down Expand Up @@ -67,7 +67,7 @@ func TestJSONDiff(t *testing.T) {
name: "array element difference",
expected: `{"a": [1, 2, 3], "b": 2, "c": 3}`,
actual: `{"a": [1, 2, 4], "b": 2, "c": 3}`,
problems: []JSONMismatch{mismatch("a.[2]", invalidValue)},
problems: []JSONMismatch{mismatch("a.[2]", invalidNumberValue)},
},

{
@@ -81,28 +81,28 @@ func TestJSONDiff(t *testing.T) {
name: "object difference",
expected: `{"a": {"b": 1}, "c": 3}`,
actual: `{"a": {"b": 2}, "c": 3}`,
problems: []JSONMismatch{mismatch("a.b", invalidValue)},
problems: []JSONMismatch{mismatch("a.b", invalidNumberValue)},
},

{
name: "deep path difference",
expected: `{"a": {"d": {"b": 1}}, "c": 3}`,
actual: `{"a": {"d": {"b": 2}}, "c": 3}`,
problems: []JSONMismatch{mismatch("a.d.b", invalidValue)},
problems: []JSONMismatch{mismatch("a.d.b", invalidNumberValue)},
},

{
name: "deep path difference",
expected: `{"a": {"d": {"b": 1}}, "c": 3, "_ignore": 1}`,
actual: `{"a": {"d": {"b": 2}}, "c": 3}`,
problems: []JSONMismatch{mismatch("a.d.b", invalidValue)},
problems: []JSONMismatch{mismatch("a.d.b", invalidNumberValue)},
},

{
name: "array sort difference ",
expected: `{"a": [1, 2, 3], "b": 2, "c": 3}`,
actual: `{"a": [1, 3, 2], "b": 2, "c": 3}`,
problems: []JSONMismatch{mismatch("a.[1]", invalidValue), mismatch("a.[2]", invalidValue)},
problems: []JSONMismatch{mismatch("a.[1]", invalidNumberValue), mismatch("a.[2]", invalidNumberValue)},
},

{
11 changes: 7 additions & 4 deletions quesma/quesma/config/config_v2.go
@@ -578,8 +578,9 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration {
}
}

if len(processedConfig.QueryTarget) == 2 && !(processedConfig.QueryTarget[0] == ClickhouseTarget && processedConfig.QueryTarget[1] == ElasticsearchTarget) {
errAcc = multierror.Append(errAcc, fmt.Errorf("index %s has invalid dual query target configuration - when you specify two targets, ClickHouse has to be the primary one and Elastic has to be the secondary one", indexName))
if len(processedConfig.QueryTarget) == 2 && !((processedConfig.QueryTarget[0] == ClickhouseTarget && processedConfig.QueryTarget[1] == ElasticsearchTarget) ||
(processedConfig.QueryTarget[0] == ElasticsearchTarget && processedConfig.QueryTarget[1] == ClickhouseTarget)) {
errAcc = multierror.Append(errAcc, fmt.Errorf("index %s has invalid dual query target configuration", indexName))
continue
}

@@ -676,10 +677,12 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration {
}
}

if len(processedConfig.QueryTarget) == 2 && !(processedConfig.QueryTarget[0] == ClickhouseTarget && processedConfig.QueryTarget[1] == ElasticsearchTarget) {
errAcc = multierror.Append(errAcc, fmt.Errorf("index %s has invalid dual query target configuration - when you specify two targets, ClickHouse has to be the primary one and Elastic has to be the secondary one", indexName))
if len(processedConfig.QueryTarget) == 2 && !((processedConfig.QueryTarget[0] == ClickhouseTarget && processedConfig.QueryTarget[1] == ElasticsearchTarget) ||
(processedConfig.QueryTarget[0] == ElasticsearchTarget && processedConfig.QueryTarget[1] == ClickhouseTarget)) {
errAcc = multierror.Append(errAcc, fmt.Errorf("index %s has invalid dual query target configuration", indexName))
continue
}

if len(processedConfig.QueryTarget) == 2 {
// Turn on A/B testing
processedConfig.Optimizers = make(map[string]OptimizerConfiguration)
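The dual query target validation now accepts the two connectors in either order. A small sketch of the same predicate pulled out into a helper; the function name and the string values of the target constants are illustrative assumptions:

```go
package main

import "fmt"

const (
	ClickhouseTarget    = "clickhouse"
	ElasticsearchTarget = "elasticsearch"
)

// validDualQueryTarget reports whether a two-element query target list pairs
// ClickHouse with Elasticsearch, in either order.
func validDualQueryTarget(targets []string) bool {
	if len(targets) != 2 {
		return false
	}
	return (targets[0] == ClickhouseTarget && targets[1] == ElasticsearchTarget) ||
		(targets[0] == ElasticsearchTarget && targets[1] == ClickhouseTarget)
}

func main() {
	fmt.Println(validDualQueryTarget([]string{"clickhouse", "elasticsearch"})) // true
	fmt.Println(validDualQueryTarget([]string{"elasticsearch", "clickhouse"})) // true (newly allowed)
	fmt.Println(validDualQueryTarget([]string{"clickhouse", "clickhouse"}))    // false
}
```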
33 changes: 12 additions & 21 deletions quesma/quesma/search.go
@@ -203,7 +203,7 @@ func (q *QueryRunner) runExecutePlanAsync(ctx context.Context, plan *model.Execu
}()
}

func (q *QueryRunner) executePlan(ctx context.Context, plan *model.ExecutionPlan, queryTranslator IQueryTranslator, table *clickhouse.Table, body types.JSON, optAsync *AsyncQuery, optComparePlansCh chan<- executionPlanResult) (responseBody []byte, err error) {
func (q *QueryRunner) executePlan(ctx context.Context, plan *model.ExecutionPlan, queryTranslator IQueryTranslator, table *clickhouse.Table, body types.JSON, optAsync *AsyncQuery, optComparePlansCh chan<- executionPlanResult, abTestingMainPlan bool) (responseBody []byte, err error) {
contextValues := tracing.ExtractValues(ctx)
id := contextValues.RequestId
path := contextValues.RequestPath
@@ -214,7 +214,7 @@ func (q *QueryRunner) executePlan(ctx context.Context, plan *model.ExecutionPlan
sendMainPlanResult := func(responseBody []byte, err error) {
if optComparePlansCh != nil {
optComparePlansCh <- executionPlanResult{
isMain: true,
isMain: abTestingMainPlan,
plan: plan,
err: err,
responseBody: responseBody,
@@ -300,31 +300,27 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin
return nil, end_user_errors.ErrSearchCondition.New(fmt.Errorf("no connectors to use"))
}

var clickhouseDecision *table_resolver.ConnectorDecisionClickhouse
var elasticDecision *table_resolver.ConnectorDecisionElastic
var clickhouseConnector *table_resolver.ConnectorDecisionClickhouse

for _, connector := range decision.UseConnectors {
switch c := connector.(type) {

case *table_resolver.ConnectorDecisionClickhouse:
clickhouseDecision = c
clickhouseConnector = c

case *table_resolver.ConnectorDecisionElastic:
elasticDecision = c
// NOP

default:
return nil, fmt.Errorf("unknown connector type: %T", c)
}
}

// it's impossible not to have a clickhouse decision at this point
if clickhouseDecision == nil {
if clickhouseConnector == nil {
return nil, fmt.Errorf("no clickhouse connector")
}

if elasticDecision != nil {
fmt.Println("elastic", elasticDecision)
}

var responseBody []byte

startTime := time.Now()
Expand All @@ -343,7 +339,7 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin

var table *clickhouse.Table // TODO we should use schema here only
var currentSchema schema.Schema
resolvedIndexes := clickhouseDecision.ClickhouseTables
resolvedIndexes := clickhouseConnector.ClickhouseTables

if len(resolvedIndexes) == 1 {
indexName := resolvedIndexes[0] // we got exactly one table here because of the check above
@@ -446,17 +442,12 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin
plan.StartTime = startTime
plan.Name = model.MainExecutionPlan

// Some flags may trigger alternative execution plans, this is primary for dev

alternativePlan, alternativePlanExecutor := q.maybeCreateAlternativeExecutionPlan(ctx, resolvedIndexes, plan, queryTranslator, body, table, optAsync != nil)

var optComparePlansCh chan<- executionPlanResult

if alternativePlan != nil {
optComparePlansCh = q.runAlternativePlanAndComparison(ctx, alternativePlan, alternativePlanExecutor, body)
if decision.EnableABTesting {
return q.executeABTesting(ctx, plan, queryTranslator, table, body, optAsync, decision, indexPattern)
}

return q.executePlan(ctx, plan, queryTranslator, table, body, optAsync, optComparePlansCh)
return q.executePlan(ctx, plan, queryTranslator, table, body, optAsync, nil, true)

}

func (q *QueryRunner) storeAsyncSearch(qmc *ui.QuesmaManagementConsole, id, asyncId string,
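Finally, the new abTestingMainPlan parameter ends up in the isMain field of the result pushed onto the comparison channel, so the same executePlan path can serve both sides of an A/B comparison. A rough sketch of that tagging pattern, with a simplified executionPlanResult that models only the fields visible in this diff:

```go
package main

import "fmt"

// Simplified stand-in for the struct executePlan fills in; only the fields
// visible in this diff are modeled, and names are illustrative.
type executionPlanResult struct {
	isMain       bool
	plan         string
	responseBody []byte
	err          error
}

// runPlan sketches how both sides of an A/B comparison report back through the
// same channel: isMain marks which result came from the main plan.
func runPlan(plan string, isMain bool, results chan<- executionPlanResult) {
	body := []byte(fmt.Sprintf(`{"plan": %q}`, plan))
	results <- executionPlanResult{isMain: isMain, plan: plan, responseBody: body}
}

func main() {
	results := make(chan executionPlanResult, 2)

	runPlan("clickhouse", true, results)     // main (A) plan
	runPlan("elasticsearch", false, results) // alternative (B) plan

	for i := 0; i < 2; i++ {
		r := <-results
		fmt.Printf("main=%v plan=%s body=%s err=%v\n", r.isMain, r.plan, r.responseBody, r.err)
	}
}
```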