Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit fefacc5

Browse files
authored
Ingest processor v2 - cleanup passing objects (#1084)
1 parent 3918405 commit fefacc5

File tree

4 files changed

+53
-70
lines changed

4 files changed

+53
-70
lines changed

quesma/backend_connectors/elasticsearch_backend_connector.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,10 @@ func NewElasticsearchBackendConnector(cfg config.ElasticsearchConfiguration) *El
4141
return conn
4242
}
4343

44+
func (e *ElasticsearchBackendConnector) GetConfig() config.ElasticsearchConfiguration {
45+
return e.config
46+
}
47+
4448
func (e *ElasticsearchBackendConnector) RequestWithHeaders(ctx context.Context, method, endpoint string, body []byte, headers http.Header) (*http.Response, error) {
4549
return e.doRequest(ctx, method, endpoint, body, headers)
4650
}

quesma/ingest/processor2.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"fmt"
99
"github.com/ClickHouse/clickhouse-go/v2"
1010
"github.com/goccy/go-json"
11+
"net/http"
12+
"quesma/backend_connectors"
1113
chLib "quesma/clickhouse"
1214
"quesma/comment_metadata"
1315
"quesma/common_table"
@@ -39,6 +41,7 @@ type (
3941
ctx context.Context
4042
cancel context.CancelFunc
4143
chDb quesma_api.BackendConnector
44+
es backend_connectors.ElasticsearchBackendConnector
4245
tableDiscovery chLib.TableDiscovery
4346
cfg *config.QuesmaConfiguration
4447
phoneHomeAgent telemetry.PhoneHomeAgent
@@ -104,6 +107,14 @@ func (ip *IngestProcessor2) Close() {
104107
// return count, nil
105108
//}
106109

110+
func (ip *IngestProcessor2) SendToElasticsearch(req *http.Request) *http.Response {
111+
return ip.es.Send(req)
112+
}
113+
114+
func (ip *IngestProcessor2) RequestToElasticsearch(ctx context.Context, method, endpoint string, body []byte, headers http.Header) (*http.Response, error) {
115+
return ip.es.RequestWithHeaders(ctx, method, endpoint, body, headers)
116+
}
117+
107118
func (ip *IngestProcessor2) createTableObjectAndAttributes(ctx context.Context, query string, config *chLib.ChTableConfig, name string, tableDefinitionChangeOnly bool) (string, error) {
108119
table, err := chLib.NewTable(query, config)
109120
if err != nil {
@@ -656,9 +667,9 @@ func (ip *IngestProcessor2) Ping() error {
656667
return ip.chDb.Open()
657668
}
658669

659-
func NewIngestProcessor2(cfg *config.QuesmaConfiguration, chDb quesma_api.BackendConnector, phoneHomeAgent telemetry.PhoneHomeAgent, loader chLib.TableDiscovery, schemaRegistry schema.Registry, virtualTableStorage persistence.JSONDatabase, tableResolver table_resolver.TableResolver) *IngestProcessor2 {
670+
func NewIngestProcessor2(cfg *config.QuesmaConfiguration, chDb quesma_api.BackendConnector, phoneHomeAgent telemetry.PhoneHomeAgent, loader chLib.TableDiscovery, schemaRegistry schema.Registry, virtualTableStorage persistence.JSONDatabase, tableResolver table_resolver.TableResolver, esBackendConn backend_connectors.ElasticsearchBackendConnector) *IngestProcessor2 {
660671
ctx, cancel := context.WithCancel(context.Background())
661-
return &IngestProcessor2{ctx: ctx, cancel: cancel, chDb: chDb, tableDiscovery: loader, cfg: cfg, phoneHomeAgent: phoneHomeAgent, schemaRegistry: schemaRegistry, virtualTableStorage: virtualTableStorage, tableResolver: tableResolver}
672+
return &IngestProcessor2{ctx: ctx, cancel: cancel, chDb: chDb, tableDiscovery: loader, cfg: cfg, phoneHomeAgent: phoneHomeAgent, schemaRegistry: schemaRegistry, virtualTableStorage: virtualTableStorage, tableResolver: tableResolver, es: esBackendConn}
662673
}
663674

664675
// validateIngest validates the document against the table schema

quesma/processors/es_to_ch_ingest/elasticsearch_to_clickhouse_ingest_processor.go

Lines changed: 19 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import (
1010
"github.com/rs/zerolog/log"
1111
"io"
1212
"net/http"
13-
"net/url"
1413
"quesma/backend_connectors"
1514
"quesma/clickhouse"
1615
"quesma/common_table"
@@ -47,9 +46,18 @@ func NewElasticsearchToClickHouseIngestProcessor(conf config.QuesmaProcessorConf
4746
func (p *ElasticsearchToClickHouseIngestProcessor) Init() error {
4847
chBackendConnector := p.GetBackendConnector(quesma_api.ClickHouseSQLBackend)
4948
if chBackendConnector == nil {
50-
return fmt.Errorf("ClickHouse backend connector not found")
49+
return fmt.Errorf("backend connector for ClickHouse not found")
50+
}
51+
esBackendConnector := p.GetBackendConnector(quesma_api.ElasticsearchBackend)
52+
if esBackendConnector == nil {
53+
return fmt.Errorf("backend connector for Elasticsearch not found")
54+
}
55+
esBackendConnectorCasted, ok := esBackendConnector.(*backend_connectors.ElasticsearchBackendConnector) // OKAY JUST FOR NOW
56+
if !ok {
57+
return fmt.Errorf("failed to cast Elasticsearch backend connector")
5158
}
52-
p.legacyIngestProcessor = p.prepareTemporaryIngestProcessor(chBackendConnector)
59+
60+
p.legacyIngestProcessor = p.prepareTemporaryIngestProcessor(chBackendConnector, *esBackendConnectorCasted)
5361

5462
return nil
5563
}
@@ -60,50 +68,31 @@ func (p *ElasticsearchToClickHouseIngestProcessor) GetId() string {
6068

6169
// prepareTemporaryIngestProcessor creates a temporary ingest processor which is a new version of the ingest processor,
6270
// which uses `quesma_api.BackendConnector` instead of `*sql.DB` for the database connection.
63-
func (p *ElasticsearchToClickHouseIngestProcessor) prepareTemporaryIngestProcessor(connector quesma_api.BackendConnector) *ingest.IngestProcessor2 {
64-
u, _ := url.Parse("http://localhost:9200")
71+
func (p *ElasticsearchToClickHouseIngestProcessor) prepareTemporaryIngestProcessor(chBackendConn quesma_api.BackendConnector, esBackendConn backend_connectors.ElasticsearchBackendConnector) *ingest.IngestProcessor2 {
6572

66-
elasticsearchConfig := config.ElasticsearchConfiguration{
67-
Url: (*config.Url)(u),
68-
}
6973
oldQuesmaConfig := &config.QuesmaConfiguration{
7074
IndexConfig: p.config.IndexConfig,
7175
}
7276

73-
virtualTableStorage := persistence.NewElasticJSONDatabase(elasticsearchConfig, common_table.VirtualTableElasticIndexName)
74-
tableDisco := clickhouse.NewTableDiscovery2(oldQuesmaConfig, connector, virtualTableStorage)
77+
virtualTableStorage := persistence.NewElasticJSONDatabase(esBackendConn.GetConfig(), common_table.VirtualTableElasticIndexName)
78+
tableDisco := clickhouse.NewTableDiscovery2(oldQuesmaConfig, chBackendConn, virtualTableStorage)
7579
schemaRegistry := schema.NewSchemaRegistry(clickhouse.TableDiscoveryTableProviderAdapter{TableDiscovery: tableDisco}, oldQuesmaConfig, clickhouse.SchemaTypeAdapter{})
7680

7781
v2TableResolver := NewNextGenTableResolver()
7882

79-
ip := ingest.NewIngestProcessor2(oldQuesmaConfig, connector, nil, tableDisco, schemaRegistry, virtualTableStorage, v2TableResolver)
83+
ip := ingest.NewIngestProcessor2(oldQuesmaConfig, chBackendConn, nil, tableDisco, schemaRegistry, virtualTableStorage, v2TableResolver, esBackendConn)
84+
8085
ip.Start()
8186
return ip
8287
}
8388

8489
func (p *ElasticsearchToClickHouseIngestProcessor) Handle(metadata map[string]interface{}, message ...any) (map[string]interface{}, any, error) {
8590
var data []byte
86-
var chBackend, esBackend quesma_api.BackendConnector
8791
indexNameFromIncomingReq := metadata[IngestTargetKey].(string)
8892
if indexNameFromIncomingReq == "" {
8993
panic("NO INDEX NAME?!?!?")
9094
}
9195

92-
if chBackend = p.GetBackendConnector(quesma_api.ClickHouseSQLBackend); chBackend == nil {
93-
fmt.Println("Backend connector not found")
94-
return metadata, data, nil
95-
}
96-
97-
esBackend = p.GetBackendConnector(quesma_api.ElasticsearchBackend)
98-
if esBackend == nil {
99-
fmt.Println("Backend connector not found")
100-
return metadata, data, nil
101-
}
102-
es, ok := esBackend.(*backend_connectors.ElasticsearchBackendConnector) // OKAY JUST FOR NOW
103-
if !ok {
104-
panic(" !!! ")
105-
}
106-
10796
for _, m := range message {
10897
messageAsHttpReq, err := quesma_api.CheckedCast[*http.Request](m)
10998
if err != nil {
@@ -112,7 +101,7 @@ func (p *ElasticsearchToClickHouseIngestProcessor) Handle(metadata map[string]in
112101

113102
if _, present := p.config.IndexConfig[indexNameFromIncomingReq]; !present && metadata[IngestAction] == DocIndexAction {
114103
// route to Elasticsearch, `bulk` request might be sent to ClickHouse depending on the request payload
115-
resp := es.Send(messageAsHttpReq)
104+
resp := p.legacyIngestProcessor.SendToElasticsearch(messageAsHttpReq)
116105
respBody, err := ReadResponseBody(resp)
117106
if err != nil {
118107
println(err)
@@ -131,7 +120,7 @@ func (p *ElasticsearchToClickHouseIngestProcessor) Handle(metadata map[string]in
131120
if err != nil {
132121
println(err)
133122
}
134-
result, err := handleDocIndex(payloadJson, indexNameFromIncomingReq, p.legacyIngestProcessor, p.config)
123+
result, err := p.handleDocIndex(payloadJson, indexNameFromIncomingReq)
135124
if err != nil {
136125
println(err)
137126
}
@@ -143,7 +132,7 @@ func (p *ElasticsearchToClickHouseIngestProcessor) Handle(metadata map[string]in
143132
if err != nil {
144133
println(err)
145134
}
146-
results, err := handleBulkIndex(payloadNDJson, indexNameFromIncomingReq, p.legacyIngestProcessor, es, p.config)
135+
results, err := p.handleBulkIndex(payloadNDJson, indexNameFromIncomingReq)
147136
if err != nil {
148137
println(err)
149138
}

quesma/processors/es_to_ch_ingest/handlers.go

Lines changed: 17 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@ import (
99
"fmt"
1010
"io"
1111
"net/http"
12-
"quesma/backend_connectors"
13-
"quesma/ingest"
1412
"quesma/logger"
1513
"quesma/queryparser"
1614
"quesma/quesma/config"
@@ -20,77 +18,57 @@ import (
2018
)
2119

2220
// handleDocIndex assembles the payload into bulk format to reuse the existing bulk ingest logic
23-
func handleDocIndex(payload types.JSON, targetTableName string, temporaryIngestProcessor *ingest.IngestProcessor2, indexConfig config.QuesmaProcessorConfig) (bulkmodel.BulkItem, error) {
21+
func (p *ElasticsearchToClickHouseIngestProcessor) handleDocIndex(payload types.JSON, targetTableName string) (bulkmodel.BulkItem, error) {
2422
newPayload := []types.JSON{
2523
map[string]interface{}{"index": map[string]interface{}{"_index": targetTableName}},
2624
payload,
2725
}
2826

29-
if results, err := Write(context.Background(), &targetTableName, newPayload, temporaryIngestProcessor, nil, indexConfig); err != nil {
27+
if results, err := p.Write(context.Background(), &targetTableName, newPayload); err != nil {
3028
return bulkmodel.BulkItem{}, err
3129
} else {
3230
return results[0], nil
3331
}
3432
}
3533

36-
func handleBulkIndex(payload types.NDJSON, targetTableName string, temporaryIngestProcessor *ingest.IngestProcessor2, es *backend_connectors.ElasticsearchBackendConnector, cfg config.QuesmaProcessorConfig) ([]bulkmodel.BulkItem, error) {
37-
results, err := Write(context.Background(), &targetTableName, payload, temporaryIngestProcessor, es, cfg)
34+
func (p *ElasticsearchToClickHouseIngestProcessor) handleBulkIndex(payload types.NDJSON, targetTableName string) ([]bulkmodel.BulkItem, error) {
35+
results, err := p.Write(context.Background(), &targetTableName, payload)
3836
if err != nil {
3937
fmt.Printf("failed writing: %v", err)
4038
return []bulkmodel.BulkItem{}, err
4139
}
4240
return results, nil
4341
}
4442

45-
func Write(ctx context.Context, defaultIndex *string, bulk types.NDJSON, ip *ingest.IngestProcessor2, es *backend_connectors.ElasticsearchBackendConnector, conf config.QuesmaProcessorConfig) (results []bulkmodel.BulkItem, err error) {
43+
func (p *ElasticsearchToClickHouseIngestProcessor) Write(ctx context.Context, defaultIndex *string, bulk types.NDJSON) (results []bulkmodel.BulkItem, err error) {
4644
defer recovery.LogPanic()
4745

4846
bulkSize := len(bulk) / 2 // we divided payload by 2 so that we don't take into account the `action_and_meta_data` line, ref: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
4947

5048
// The returned results should be in the same order as the input request, however splitting the bulk might change the order.
5149
// Therefore, each BulkRequestEntry has a corresponding pointer to the result entry, allowing us to freely split and reshuffle the bulk.
52-
results, clickhouseDocumentsToInsert, elasticRequestBody, elasticBulkEntries, err := splitBulk(ctx, defaultIndex, bulk, bulkSize, conf)
50+
results, clickhouseDocumentsToInsert, elasticRequestBody, elasticBulkEntries, err := splitBulk(ctx, defaultIndex, bulk, bulkSize, p.config)
5351
if err != nil {
5452
return []bulkmodel.BulkItem{}, err
5553
}
5654

57-
// we fail if there are some documents to insert into Clickhouse but ingest processor is not available
58-
//if len(clickhouseDocumentsToInsert) > 0 && ip == nil {
59-
//
60-
// indexes := make(map[string]struct{})
61-
// for index := range clickhouseDocumentsToInsert {
62-
// indexes[index] = struct{}{}
63-
// }
64-
//
65-
// indexesAsList := make([]string, 0, len(indexes))
66-
// for index := range indexes {
67-
// indexesAsList = append(indexesAsList, index)
68-
// }
69-
// sort.Strings(indexesAsList)
70-
//
71-
// return []BulkItem{}, end_user_errors.ErrNoIngest.New(fmt.Errorf("ingest processor is not available, but documents are targeted to Clickhouse indexes: %s", strings.Join(indexesAsList, ",")))
72-
//}
73-
74-
// No place for that here
75-
err = sendToElastic(elasticRequestBody, elasticBulkEntries, es)
55+
err = p.sendToElastic(elasticRequestBody, elasticBulkEntries)
7656
if err != nil {
7757
return []bulkmodel.BulkItem{}, err
7858
}
7959

80-
//if ip != nil {
81-
fmt.Printf("woudl send to clickhouse: [%v]", clickhouseDocumentsToInsert)
82-
sendToClickhouse(ctx, clickhouseDocumentsToInsert, ip)
83-
//}
60+
fmt.Printf("would send to clickhouse: [%v]\n", clickhouseDocumentsToInsert)
61+
p.sendToClickhouse(ctx, clickhouseDocumentsToInsert)
8462

8563
return results, nil
8664
}
8765

88-
func sendToElastic(elasticRequestBody []byte, elasticBulkEntries []BulkRequestEntry, es *backend_connectors.ElasticsearchBackendConnector) error {
66+
func (p *ElasticsearchToClickHouseIngestProcessor) sendToElastic(elasticRequestBody []byte, elasticBulkEntries []BulkRequestEntry) error {
8967
if len(elasticRequestBody) == 0 {
9068
return nil
9169
}
9270

93-
response, err := es.RequestWithHeaders(context.Background(), "POST", "/_bulk", elasticRequestBody, http.Header{"Content-Type": {"application/x-ndjson"}})
71+
response, err := p.legacyIngestProcessor.RequestToElasticsearch(context.Background(), "POST", "/_bulk", elasticRequestBody, http.Header{"Content-Type": {"application/x-ndjson"}})
9472
if err != nil {
9573
return err
9674
}
@@ -118,7 +96,7 @@ func sendToElastic(elasticRequestBody []byte, elasticBulkEntries []BulkRequestEn
11896
return nil
11997
}
12098

121-
func sendToClickhouse(ctx context.Context, clickhouseDocumentsToInsert map[string][]BulkRequestEntry, ip *ingest.IngestProcessor2) {
99+
func (p *ElasticsearchToClickHouseIngestProcessor) sendToClickhouse(ctx context.Context, clickhouseDocumentsToInsert map[string][]BulkRequestEntry) {
122100
for indexName, documents := range clickhouseDocumentsToInsert {
123101
//phoneHomeAgent.IngestCounters().Add(indexName, int64(len(documents)))
124102

@@ -127,16 +105,17 @@ func sendToClickhouse(ctx context.Context, clickhouseDocumentsToInsert map[strin
127105
//}
128106
// if the index is mapped to a specific database table in the configuration, use that table
129107
// TODO: Index name override ignored for now
130-
//if len(cfg.IndexConfig[indexName].Override) > 0 {
131-
// indexName = cfg.IndexConfig[indexName].Override
132-
//}
108+
109+
if len(p.config.IndexConfig[indexName].Override) > 0 {
110+
indexName = p.config.IndexConfig[indexName].Override
111+
}
133112

134113
inserts := make([]types.JSON, len(documents))
135114
for i, document := range documents {
136115
inserts[i] = document.document
137116
}
138117

139-
err := ip.Ingest(ctx, indexName, inserts)
118+
err := p.legacyIngestProcessor.Ingest(ctx, indexName, inserts)
140119

141120
for _, document := range documents {
142121
bulkSingleResponse := bulkmodel.BulkSingleResponse{

0 commit comments

Comments
 (0)