Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 87c168d

Browse files
authored
A/B testing - use the ingest processor instead of calling loopback (#911)
Let's introduce `intest.Ingester` interface. We can use our ingest processor internally now.
1 parent 30fb255 commit 87c168d

File tree

12 files changed

+74
-36
lines changed

12 files changed

+74
-36
lines changed

quesma/ab_testing/collector/collector.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"context"
77
"quesma/ab_testing"
88
"quesma/buildinfo"
9+
"quesma/ingest"
910
"quesma/logger"
1011
"quesma/quesma/recovery"
1112
"time"
@@ -65,15 +66,12 @@ func (r *InMemoryCollector) String() string {
6566
return "InMemoryCollector(sends data to Quesma)"
6667
}
6768

68-
func NewCollector(ctx context.Context, healthQueue chan<- ab_testing.HealthMessage) *InMemoryCollector {
69+
func NewCollector(ctx context.Context, ingester ingest.Ingester, healthQueue chan<- ab_testing.HealthMessage) *InMemoryCollector {
6970

7071
ctx, cancel := context.WithCancel(ctx)
7172

7273
// TODO read config here
7374

74-
// avoid unused struct error
75-
var _ = &mismatchedOnlyFilter{}
76-
7775
return &InMemoryCollector{
7876
receiveQueue: make(chan ab_testing.Result, 1000),
7977
ctx: ctx,
@@ -85,9 +83,13 @@ func NewCollector(ctx context.Context, healthQueue chan<- ab_testing.HealthMessa
8583
&diffTransformer{},
8684
//&ppPrintFanout{},
8785
//&mismatchedOnlyFilter{},
88-
&elasticSearchFanout{
89-
url: "http://localhost:8080",
90-
indexName: "ab_testing_logs",
86+
//&elasticSearchFanout{
87+
// url: "http://localhost:8080",
88+
// indexName: "ab_testing_logs",
89+
//},
90+
&internalIngestFanout{
91+
indexName: "ab_testing_logs",
92+
ingestProcessor: ingester,
9193
},
9294
},
9395
healthQueue: healthQueue,

quesma/ab_testing/collector/fanout.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,13 @@ package collector
44

55
import (
66
"bytes"
7+
"context"
78
"encoding/json"
89
"fmt"
910
"net/http"
11+
"quesma/ingest"
1012
"quesma/logger"
13+
"quesma/quesma/types"
1114
)
1215

1316
type elasticSearchFanout struct {
@@ -57,3 +60,35 @@ func (t *elasticSearchFanout) process(in EnrichedResults) (out EnrichedResults,
5760
// Elasticsearch logic here
5861
return in, false, nil
5962
}
63+
64+
type internalIngestFanout struct {
65+
indexName string
66+
ingestProcessor ingest.Ingester
67+
}
68+
69+
func (t *internalIngestFanout) name() string {
70+
return "internalIngestFanout"
71+
}
72+
73+
func (t *internalIngestFanout) process(in EnrichedResults) (out EnrichedResults, drop bool, err error) {
74+
75+
asBytes, err := json.Marshal(in)
76+
if err != nil {
77+
logger.Error().Msgf("failed to marshal A/B results line: %v", err)
78+
return in, false, err
79+
}
80+
81+
asJson, err := types.ParseJSON(string(asBytes))
82+
83+
if err != nil {
84+
logger.Error().Msgf("failed to parse A/B results line: %v", err)
85+
return
86+
}
87+
88+
err = t.ingestProcessor.Ingest(context.Background(), t.indexName, []types.JSON{asJson})
89+
90+
return in, false, err
91+
}
92+
93+
var _ = &internalIngestFanout{}
94+
var _ = &elasticSearchFanout{}

quesma/ab_testing/collector/filters.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,3 +37,6 @@ func (t *mismatchedOnlyFilter) process(in EnrichedResults) (out EnrichedResults,
3737

3838
return in, false, nil
3939
}
40+
41+
// avoid unused struct error
42+
var _ = &mismatchedOnlyFilter{}

quesma/ab_testing/sender/coordinator.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"context"
77
"quesma/ab_testing"
88
"quesma/ab_testing/collector"
9+
"quesma/ingest"
910
"quesma/logger"
1011
"quesma/quesma/config"
1112
"quesma/quesma/recovery"
@@ -20,10 +21,12 @@ type SenderCoordinator struct {
2021

2122
sender *sender // sender managed by this coordinator
2223

24+
ingester ingest.Ingester
25+
2326
enabled bool
2427
}
2528

26-
func NewSenderCoordinator(cfg *config.QuesmaConfiguration) *SenderCoordinator {
29+
func NewSenderCoordinator(cfg *config.QuesmaConfiguration, ingester ingest.Ingester) *SenderCoordinator {
2730

2831
ctx, cancel := context.WithCancel(context.Background())
2932

@@ -44,6 +47,7 @@ func NewSenderCoordinator(cfg *config.QuesmaConfiguration) *SenderCoordinator {
4447
ctx: ctx,
4548
cancelFunc: cancel,
4649
enabled: len(enabledForIndex) > 0,
50+
ingester: ingester,
4751
// add quesma health monitor service here
4852
}
4953
}
@@ -57,7 +61,7 @@ func (c *SenderCoordinator) GetSender() ab_testing.Sender {
5761
}
5862

5963
func (c *SenderCoordinator) newInMemoryProcessor(healthQueue chan<- ab_testing.HealthMessage) *collector.InMemoryCollector {
60-
repo := collector.NewCollector(c.ctx, healthQueue)
64+
repo := collector.NewCollector(c.ctx, c.ingester, healthQueue)
6165
repo.Start()
6266
return repo
6367
}

quesma/clickhouse/util.go

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -12,25 +12,6 @@ import (
1212
"time"
1313
)
1414

15-
type TableColumNameFormatter interface {
16-
Format(namespace, columnName string) string
17-
}
18-
19-
type columNameFormatter struct {
20-
separator string
21-
}
22-
23-
func (t *columNameFormatter) Format(namespace, columnName string) string {
24-
if namespace == "" {
25-
return columnName
26-
}
27-
return fmt.Sprintf("%s%s%s", namespace, t.separator, columnName)
28-
}
29-
30-
func DefaultColumnNameFormatter() TableColumNameFormatter {
31-
return &columNameFormatter{separator: "_"}
32-
}
33-
3415
// Code doesn't need to be pretty, 99.9% it's just for our purposes
3516
// Parses type from SHOW COLUMNS FROM "table"
3617
func parseTypeFromShowColumns(typ, name string) (Type, string) {

quesma/ingest/common_table_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ func TestIngestToCommonTable(t *testing.T) {
231231
}
232232

233233
ctx := context.Background()
234-
formatter := clickhouse.DefaultColumnNameFormatter()
234+
formatter := DefaultColumnNameFormatter()
235235

236236
transformer := jsonprocessor.IngestTransformerFor(indexName, quesmaConfig)
237237

quesma/ingest/insert_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -443,7 +443,7 @@ func TestCreateTableIfSomeFieldsExistsInSchemaAlready(t *testing.T) {
443443
ingest.tableResolver = resolver
444444

445445
ctx := context.Background()
446-
formatter := clickhouse.DefaultColumnNameFormatter()
446+
formatter := DefaultColumnNameFormatter()
447447

448448
transformer := jsonprocessor.IngestTransformerFor(indexName, quesmaConfig)
449449

quesma/ingest/processor.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ type (
5151
IngestFieldStatistics map[IngestFieldBucketKey]int64
5252
)
5353

54+
type Ingester interface {
55+
Ingest(ctx context.Context, tableName string, jsonData []types.JSON) error
56+
}
57+
5458
type (
5559
IngestProcessor struct {
5660
ctx context.Context
@@ -701,6 +705,13 @@ func (ip *IngestProcessor) processInsertQuery(ctx context.Context,
701705
return generateSqlStatements(createTableCmd, alterCmd, insert), nil
702706
}
703707

708+
func (lm *IngestProcessor) Ingest(ctx context.Context, tableName string, jsonData []types.JSON) error {
709+
710+
nameFormatter := DefaultColumnNameFormatter()
711+
transformer := jsonprocessor.IngestTransformerFor(tableName, lm.cfg)
712+
return lm.ProcessInsertQuery(ctx, tableName, jsonData, transformer, nameFormatter)
713+
}
714+
704715
func (lm *IngestProcessor) ProcessInsertQuery(ctx context.Context, tableName string,
705716
jsonData []types.JSON, transformer jsonprocessor.IngestTransformer,
706717
tableFormatter TableColumNameFormatter) error {

quesma/ingest/processor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,7 @@ func TestAddTimestamp(t *testing.T) {
139139
CastUnsupportedAttrValueTypesToString: false,
140140
PreferCastingToOthers: false,
141141
}
142-
nameFormatter := clickhouse.DefaultColumnNameFormatter()
142+
nameFormatter := DefaultColumnNameFormatter()
143143
ip := newIngestProcessorEmpty()
144144
ip.schemaRegistry = &schema.StaticRegistry{}
145145
jsonData := types.MustJSON(`{"host.name":"hermes","message":"User password reset requested","service.name":"queue","severity":"info","source":"azure"}`)

quesma/ingest/util.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ func (t *columNameFormatter) Format(namespace, columnName string) string {
2525
return fmt.Sprintf("%s%s%s", namespace, t.separator, columnName)
2626
}
2727

28+
func DefaultColumnNameFormatter() TableColumNameFormatter {
29+
return &columNameFormatter{separator: "_"}
30+
}
31+
2832
// Code doesn't need to be pretty, 99.9% it's just for our purposes
2933
// Parses type from SHOW COLUMNS FROM "table"
3034
func parseTypeFromShowColumns(typ, name string) (clickhouse.Type, string) {

0 commit comments

Comments
 (0)