Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 96e92be

Browse files
authored
Add support for common table // kick out TableResolver (#1089)
In this next-gen processor, we don't need an instance of the table resolver. Each decision can be derived straight from the processor configuration.
1 parent 1d82268 commit 96e92be

File tree

4 files changed

+42
-85
lines changed

4 files changed

+42
-85
lines changed

quesma/ingest/processor2.go

Lines changed: 31 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"quesma/quesma/types"
2424
"quesma/schema"
2525
"quesma/stats"
26-
"quesma/table_resolver"
2726
"quesma/telemetry"
2827
"quesma/util"
2928
quesma_api "quesma_v2/core"
@@ -50,7 +49,6 @@ type (
5049
ingestFieldStatistics IngestFieldStatistics
5150
ingestFieldStatisticsLock sync.Mutex
5251
virtualTableStorage persistence.JSONDatabase
53-
tableResolver table_resolver.TableResolver
5452
}
5553
)
5654

@@ -442,63 +440,46 @@ func (lm *IngestProcessor2) Ingest(ctx context.Context, tableName string, jsonDa
442440
return lm.ProcessInsertQuery(ctx, tableName, jsonData, transformer, nameFormatter)
443441
}
444442

443+
// useCommonTable reports whether the index named tableName is configured to use
// the common table. It looks tableName up in lm.cfg.IndexConfig by exact key and
// returns that entry's UseCommonTable flag; it returns false when there is no
// entry for tableName.
func (lm *IngestProcessor2) useCommonTable(tableName string) bool {
444+
if tableConfig, ok := lm.cfg.IndexConfig[tableName]; ok {
445+
return tableConfig.UseCommonTable
446+
}
447+
return false
448+
}
449+
445450
func (lm *IngestProcessor2) ProcessInsertQuery(ctx context.Context, tableName string,
446451
jsonData []types.JSON, transformer jsonprocessor.IngestTransformer,
447452
tableFormatter TableColumNameFormatter) error {
448453

449-
decision := lm.tableResolver.Resolve(quesma_api.IngestPipeline, tableName)
450-
451-
if decision.Err != nil {
452-
return decision.Err
453-
}
454-
455-
if decision.IsEmpty { // TODO
456-
return fmt.Errorf("table %s not found", tableName)
457-
}
458-
459-
if decision.IsClosed { // TODO
460-
return fmt.Errorf("table %s is closed", tableName)
461-
}
462-
463-
for _, connectorDecision := range decision.UseConnectors {
464-
465-
var clickhouseDecision *quesma_api.ConnectorDecisionClickhouse
466-
467-
var ok bool
468-
if clickhouseDecision, ok = connectorDecision.(*quesma_api.ConnectorDecisionClickhouse); !ok {
469-
continue
454+
if lm.useCommonTable(tableName) {
455+
// we have to clone the data, because we want to process it twice
456+
var clonedJsonData []types.JSON
457+
for _, jsonValue := range jsonData {
458+
clonedJsonData = append(clonedJsonData, jsonValue.Clone())
470459
}
471460

472-
if clickhouseDecision.IsCommonTable { // TODO: TABLE_RESOLVER DECIDES WHETHER WE'RE DEALING WITH COMMON TABLE
473-
// we have clone the data, because we want to process it twice
474-
var clonedJsonData []types.JSON
475-
for _, jsonValue := range jsonData {
476-
clonedJsonData = append(clonedJsonData, jsonValue.Clone())
477-
}
478-
479-
err := lm.processInsertQueryInternal(ctx, tableName, clonedJsonData, transformer, tableFormatter, true)
480-
if err != nil {
481-
// we ignore an error here, because we want to process the data and don't lose it
482-
logger.ErrorWithCtx(ctx).Msgf("error processing insert query - virtual table schema update: %v", err)
483-
}
484-
485-
pipeline := jsonprocessor.IngestTransformerPipeline{}
486-
pipeline = append(pipeline, &common_table.IngestAddIndexNameTransformer{IndexName: tableName})
487-
pipeline = append(pipeline, transformer)
461+
err := lm.processInsertQueryInternal(ctx, tableName, clonedJsonData, transformer, tableFormatter, true)
462+
if err != nil {
463+
// we ignore the error here, because we want to process the data and not lose it
464+
logger.ErrorWithCtx(ctx).Msgf("error processing insert query - virtual table schema update: %v", err)
465+
}
488466

489-
err = lm.processInsertQueryInternal(ctx, common_table.TableName, jsonData, pipeline, tableFormatter, false)
490-
if err != nil {
491-
return fmt.Errorf("error processing insert query to a common table: %w", err)
492-
}
467+
pipeline := jsonprocessor.IngestTransformerPipeline{}
468+
pipeline = append(pipeline, &common_table.IngestAddIndexNameTransformer{IndexName: tableName})
469+
pipeline = append(pipeline, transformer)
493470

494-
} else {
495-
err := lm.processInsertQueryInternal(ctx, clickhouseDecision.ClickhouseTableName, jsonData, transformer, tableFormatter, false)
496-
if err != nil {
497-
return fmt.Errorf("error processing insert query: %w", err)
498-
}
471+
err = lm.processInsertQueryInternal(ctx, common_table.TableName, jsonData, pipeline, tableFormatter, false)
472+
if err != nil {
473+
return fmt.Errorf("error processing insert query to a common table: %w", err)
499474
}
500475

476+
} else {
477+
err := lm.processInsertQueryInternal(ctx, tableName, jsonData, transformer, tableFormatter, false)
478+
if err != nil {
479+
return fmt.Errorf("error processing insert query: %w", err)
480+
}
501481
}
482+
502483
return nil
503484
}
504485

@@ -667,9 +648,9 @@ func (ip *IngestProcessor2) Ping() error {
667648
return ip.chDb.Open()
668649
}
669650

670-
func NewIngestProcessor2(cfg *config.QuesmaConfiguration, chDb quesma_api.BackendConnector, phoneHomeAgent telemetry.PhoneHomeAgent, loader chLib.TableDiscovery, schemaRegistry schema.Registry, virtualTableStorage persistence.JSONDatabase, tableResolver table_resolver.TableResolver, esBackendConn backend_connectors.ElasticsearchBackendConnector) *IngestProcessor2 {
651+
func NewIngestProcessor2(cfg *config.QuesmaConfiguration, chDb quesma_api.BackendConnector, phoneHomeAgent telemetry.PhoneHomeAgent, loader chLib.TableDiscovery, schemaRegistry schema.Registry, virtualTableStorage persistence.JSONDatabase, esBackendConn backend_connectors.ElasticsearchBackendConnector) *IngestProcessor2 {
671652
ctx, cancel := context.WithCancel(context.Background())
672-
return &IngestProcessor2{ctx: ctx, cancel: cancel, chDb: chDb, tableDiscovery: loader, cfg: cfg, phoneHomeAgent: phoneHomeAgent, schemaRegistry: schemaRegistry, virtualTableStorage: virtualTableStorage, tableResolver: tableResolver, es: esBackendConn}
653+
return &IngestProcessor2{ctx: ctx, cancel: cancel, chDb: chDb, tableDiscovery: loader, cfg: cfg, phoneHomeAgent: phoneHomeAgent, schemaRegistry: schemaRegistry, virtualTableStorage: virtualTableStorage, es: esBackendConn}
673654
}
674655

675656
// validateIngest validates the document against the table schema

quesma/main.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,14 @@ func buildIngestOnlyQuesma() quesma_api.QuesmaBuilder {
6666
"test_index_2": {
6767
Name: "test_index_2",
6868
},
69+
"tab1": {
70+
Name: "tab1",
71+
UseCommonTable: true,
72+
},
73+
"tab2": {
74+
Name: "tab2",
75+
UseCommonTable: true,
76+
},
6977
"*": {
7078
IngestTarget: []string{config.ElasticsearchTarget},
7179
},

quesma/processors/es_to_ch_ingest/elasticsearch_to_clickhouse_ingest_processor.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,7 @@ func (p *ElasticsearchToClickHouseIngestProcessor) prepareTemporaryIngestProcess
7878
tableDisco := clickhouse.NewTableDiscovery2(oldQuesmaConfig, chBackendConn, virtualTableStorage)
7979
schemaRegistry := schema.NewSchemaRegistry(clickhouse.TableDiscoveryTableProviderAdapter{TableDiscovery: tableDisco}, oldQuesmaConfig, clickhouse.SchemaTypeAdapter{})
8080

81-
v2TableResolver := NewNextGenTableResolver()
82-
83-
ip := ingest.NewIngestProcessor2(oldQuesmaConfig, chBackendConn, nil, tableDisco, schemaRegistry, virtualTableStorage, v2TableResolver, esBackendConn)
81+
ip := ingest.NewIngestProcessor2(oldQuesmaConfig, chBackendConn, nil, tableDisco, schemaRegistry, virtualTableStorage, esBackendConn)
8482

8583
ip.Start()
8684
return ip
@@ -103,7 +101,8 @@ func (p *ElasticsearchToClickHouseIngestProcessor) Handle(metadata map[string]in
103101
messageAsHttpReq := mCasted.OriginalRequest
104102

105103
if _, present := p.config.IndexConfig[indexNameFromIncomingReq]; !present && metadata[IngestAction] == DocIndexAction {
106-
// route to Elasticsearch, `bulk` request might be sent to ClickHouse depending on the request payload
104+
// `_doc` at this point can go directly to Elasticsearch,
105+
// a `_bulk` request might still be sent to ClickHouse, as the request payload may contain documents targeting ClickHouse tables
107106
resp := p.legacyIngestProcessor.SendToElasticsearch(messageAsHttpReq)
108107
respBody, err := ReadResponseBody(resp)
109108
if err != nil {

quesma/processors/es_to_ch_ingest/table_resolver_nextgen.go

Lines changed: 0 additions & 31 deletions
This file was deleted.

0 commit comments

Comments
 (0)