Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 90400ea

Browse files
Table resolver - apply decision (#870)
This is the second part of the table resolver introduction. In this PR: 1. Decisions made by the resolver are applied. 2. Some rules were simplified. 3. Tests were added. 4. Added endpoint `/index:/_quesma_table_resolver` to get a decision (it will be used in e2e tests) ``` curl 'http://localhost:8080/logs*/_quesma_table_resolver' ``` Limitations: - Table name overrides are handled as before. - Rules may be incorrect. Tests are passing, but this is a complex rewrite. --------- Co-authored-by: Piotr Grabowski <[email protected]>
1 parent 7e78e1b commit 90400ea

31 files changed

+1036
-593
lines changed

http_requests/disabled.http

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
POST localhost:8080/logs_disabled/_doc
2+
Content-Type: application/json
3+
4+
{
5+
"message": "Hello World!"
6+
}

http_requests/table_resolver.http

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
GET http://localhost:8080/logs*/_quesma_table_resolver

quesma/elasticsearch/index_management.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"quesma/logger"
88
"quesma/quesma/config"
99
"quesma/quesma/recovery"
10+
"quesma/util"
1011
"strings"
1112
"sync/atomic"
1213
"time"
@@ -115,3 +116,40 @@ func (im *indexManagement) Start() {
115116
func (im *indexManagement) Stop() {
116117
im.cancel()
117118
}
119+
120+
func NewFixedIndexManagement(indexes ...string) IndexManagement {
121+
return stubIndexManagement{indexes: indexes}
122+
}
123+
124+
type stubIndexManagement struct {
125+
indexes []string
126+
}
127+
128+
func (s stubIndexManagement) Start() {}
129+
func (s stubIndexManagement) Stop() {}
130+
func (s stubIndexManagement) ReloadIndices() {}
131+
func (s stubIndexManagement) GetSources() Sources {
132+
var dataStreams = []DataStream{}
133+
for _, index := range s.indexes {
134+
dataStreams = append(dataStreams, DataStream{Name: index})
135+
}
136+
return Sources{DataStreams: dataStreams}
137+
}
138+
139+
func (s stubIndexManagement) GetSourceNames() map[string]bool {
140+
var result = make(map[string]bool)
141+
for _, index := range s.indexes {
142+
result[index] = true
143+
}
144+
return result
145+
}
146+
147+
func (s stubIndexManagement) GetSourceNamesMatching(indexPattern string) map[string]bool {
148+
var result = make(map[string]bool)
149+
for _, index := range s.indexes {
150+
if util.IndexPatternMatches(indexPattern, index) {
151+
result[index] = true
152+
}
153+
}
154+
return result
155+
}

quesma/end_user_errors/end_user.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ var ErrSearchCondition = errorType(2001, "Not supported search condition.")
9595
var ErrNoSuchTable = errorType(2002, "Missing table.")
9696
var ErrNoSuchSchema = errorType(2003, "Missing schema.")
9797
var ErrNoIngest = errorType(2004, "Ingest is not enabled.")
98+
var ErrNoConnector = errorType(2005, "No connector found.")
9899

99100
var ErrDatabaseTableNotFound = errorType(3001, "Table not found in database.")
100101
var ErrDatabaseFieldNotFound = errorType(3002, "Field not found in database.")

quesma/ingest/common_table_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,18 @@ func TestIngestToCommonTable(t *testing.T) {
191191

192192
resolver := table_resolver.NewEmptyTableResolver()
193193

194+
decision := &table_resolver.Decision{
195+
UseConnectors: []table_resolver.ConnectorDecision{
196+
&table_resolver.ConnectorDecisionClickhouse{
197+
ClickhouseTableName: common_table.TableName,
198+
ClickhouseTables: []string{indexName},
199+
IsCommonTable: true,
200+
},
201+
},
202+
}
203+
204+
resolver.Decisions[indexName] = decision
205+
194206
ingest := newIngestProcessorWithEmptyTableMap(tables, quesmaConfig)
195207
ingest.chDb = db
196208
ingest.virtualTableStorage = virtualTableStorage

quesma/ingest/ingest_validator_test.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,16 @@ func TestIngestValidation(t *testing.T) {
170170
ip := newIngestProcessorEmpty()
171171
ip.chDb = db
172172
ip.tableDiscovery = clickhouse.NewTableDiscoveryWith(&config.QuesmaConfiguration{}, nil, *tableMap)
173-
ip.tableResolver = table_resolver.NewEmptyTableResolver()
173+
174+
resolver := table_resolver.NewEmptyTableResolver()
175+
decision := &table_resolver.Decision{
176+
UseConnectors: []table_resolver.ConnectorDecision{&table_resolver.ConnectorDecisionClickhouse{
177+
ClickhouseTableName: "test_table",
178+
}}}
179+
resolver.Decisions["test_table"] = decision
180+
181+
ip.tableResolver = resolver
182+
174183
defer db.Close()
175184

176185
mock.ExpectExec(EscapeBrackets(expectedInsertJsons[i])).WithoutArgs().WillReturnResult(sqlmock.NewResult(0, 0))

quesma/ingest/insert_test.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,12 @@ func TestProcessInsertQuery(t *testing.T) {
242242
db, mock := util.InitSqlMockWithPrettyPrint(t, true)
243243
ip.ip.chDb = db
244244
resolver := table_resolver.NewEmptyTableResolver()
245+
decision := &table_resolver.Decision{
246+
UseConnectors: []table_resolver.ConnectorDecision{&table_resolver.ConnectorDecisionClickhouse{
247+
ClickhouseTableName: "test_table",
248+
}}}
249+
resolver.Decisions["test_table"] = decision
250+
245251
ip.ip.tableResolver = resolver
246252
defer db.Close()
247253

@@ -420,15 +426,21 @@ func TestCreateTableIfSomeFieldsExistsInSchemaAlready(t *testing.T) {
420426
}
421427
schemaRegistry.Tables[schema.TableName(indexName)] = indexSchema
422428

423-
indexRegistry := table_resolver.NewEmptyTableResolver()
429+
resolver := table_resolver.NewEmptyTableResolver()
430+
decision := &table_resolver.Decision{
431+
UseConnectors: []table_resolver.ConnectorDecision{&table_resolver.ConnectorDecisionClickhouse{
432+
ClickhouseTableName: "test_index",
433+
}}}
434+
resolver.Decisions["test_index"] = decision
435+
424436
schemaRegistry.FieldEncodings = make(map[schema.FieldEncodingKey]schema.EncodedFieldName)
425437
schemaRegistry.FieldEncodings[schema.FieldEncodingKey{TableName: indexName, FieldName: "schema_field"}] = "schema_field"
426438

427439
ingest := newIngestProcessorWithEmptyTableMap(tables, quesmaConfig)
428440
ingest.chDb = db
429441
ingest.virtualTableStorage = virtualTableStorage
430442
ingest.schemaRegistry = schemaRegistry
431-
ingest.tableResolver = indexRegistry
443+
ingest.tableResolver = resolver
432444

433445
ctx := context.Background()
434446
formatter := clickhouse.DefaultColumnNameFormatter()

quesma/ingest/processor.go

Lines changed: 45 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -706,38 +706,60 @@ func (lm *IngestProcessor) ProcessInsertQuery(ctx context.Context, tableName str
706706
tableFormatter TableColumNameFormatter) error {
707707

708708
decision := lm.tableResolver.Resolve(table_resolver.IngestPipeline, tableName)
709-
table_resolver.TODO("processInsertQuery", decision)
710709

711-
indexConf, ok := lm.cfg.IndexConfig[tableName]
712-
if ok && indexConf.UseCommonTable {
710+
if decision.Err != nil {
711+
return decision.Err
712+
}
713713

714-
// we have clone the data, because we want to process it twice
715-
var clonedJsonData []types.JSON
716-
for _, jsonValue := range jsonData {
717-
clonedJsonData = append(clonedJsonData, jsonValue.Clone())
718-
}
714+
if decision.IsEmpty { // TODO
715+
return fmt.Errorf("table %s not found", tableName)
716+
}
719717

720-
err := lm.processInsertQueryInternal(ctx, tableName, clonedJsonData, transformer, tableFormatter, true)
721-
if err != nil {
722-
// we ignore an error here, because we want to process the data and don't lose it
723-
logger.ErrorWithCtx(ctx).Msgf("error processing insert query - virtual table schema update: %v", err)
724-
}
718+
if decision.IsClosed { // TODO
719+
return fmt.Errorf("table %s is closed", tableName)
720+
}
725721

726-
pipeline := jsonprocessor.IngestTransformerPipeline{}
727-
pipeline = append(pipeline, &common_table.IngestAddIndexNameTransformer{IndexName: tableName})
728-
pipeline = append(pipeline, transformer)
729-
tableName = common_table.TableName
722+
for _, connectorDecision := range decision.UseConnectors {
730723

731-
err = lm.processInsertQueryInternal(ctx, common_table.TableName, jsonData, pipeline, tableFormatter, false)
732-
if err != nil {
733-
return fmt.Errorf("error processing insert query to a common table: %w", err)
724+
var clickhouseDecision *table_resolver.ConnectorDecisionClickhouse
725+
726+
var ok bool
727+
if clickhouseDecision, ok = connectorDecision.(*table_resolver.ConnectorDecisionClickhouse); !ok {
728+
continue
734729
}
735730

736-
return nil
737-
}
731+
if clickhouseDecision.IsCommonTable {
732+
733+
// we have clone the data, because we want to process it twice
734+
var clonedJsonData []types.JSON
735+
for _, jsonValue := range jsonData {
736+
clonedJsonData = append(clonedJsonData, jsonValue.Clone())
737+
}
738738

739-
return lm.processInsertQueryInternal(ctx, tableName, jsonData, transformer, tableFormatter, false)
739+
err := lm.processInsertQueryInternal(ctx, tableName, clonedJsonData, transformer, tableFormatter, true)
740+
if err != nil {
741+
// we ignore an error here, because we want to process the data and don't lose it
742+
logger.ErrorWithCtx(ctx).Msgf("error processing insert query - virtual table schema update: %v", err)
743+
}
740744

745+
pipeline := jsonprocessor.IngestTransformerPipeline{}
746+
pipeline = append(pipeline, &common_table.IngestAddIndexNameTransformer{IndexName: tableName})
747+
pipeline = append(pipeline, transformer)
748+
749+
err = lm.processInsertQueryInternal(ctx, common_table.TableName, jsonData, pipeline, tableFormatter, false)
750+
if err != nil {
751+
return fmt.Errorf("error processing insert query to a common table: %w", err)
752+
}
753+
754+
} else {
755+
err := lm.processInsertQueryInternal(ctx, clickhouseDecision.ClickhouseTableName, jsonData, transformer, tableFormatter, false)
756+
if err != nil {
757+
return fmt.Errorf("error processing insert query: %w", err)
758+
}
759+
}
760+
761+
}
762+
return nil
741763
}
742764

743765
func (ip *IngestProcessor) processInsertQueryInternal(ctx context.Context, tableName string,

quesma/main.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -90,13 +90,13 @@ func main() {
9090
tableDisco := clickhouse.NewTableDiscovery(&cfg, connectionPool, virtualTableStorage)
9191
schemaRegistry := schema.NewSchemaRegistry(clickhouse.TableDiscoveryTableProviderAdapter{TableDiscovery: tableDisco}, &cfg, clickhouse.SchemaTypeAdapter{})
9292

93+
im := elasticsearch.NewIndexManagement(cfg.Elasticsearch.Url.String())
94+
9395
connManager := connectors.NewConnectorManager(&cfg, connectionPool, phoneHomeAgent, tableDisco)
9496
lm := connManager.GetConnector()
9597

96-
elasticIndexResolver := elasticsearch.NewIndexResolver(cfg.Elasticsearch.Url.String())
97-
9898
// TODO index configuration for ingest and query is the same for now
99-
tableResolver := table_resolver.NewTableResolver(cfg, tableDisco, elasticIndexResolver)
99+
tableResolver := table_resolver.NewTableResolver(cfg, tableDisco, im)
100100
tableResolver.Start()
101101

102102
var ingestProcessor *ingest.IngestProcessor
@@ -112,8 +112,6 @@ func main() {
112112
logger.Info().Msg("Ingest processor is disabled.")
113113
}
114114

115-
im := elasticsearch.NewIndexManagement(cfg.Elasticsearch.Url.String())
116-
117115
logger.Info().Msgf("loaded config: %s", cfg.String())
118116

119117
quesmaManagementConsole := ui.NewQuesmaManagementConsole(&cfg, lm, im, qmcLogChannel, phoneHomeAgent, schemaRegistry, tableResolver) //FIXME no ingest processor here just for now

quesma/quesma/functionality/bulk/bulk.go

Lines changed: 44 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -122,9 +122,6 @@ func splitBulk(ctx context.Context, defaultIndex *string, bulk types.NDJSON, bul
122122
index := op.GetIndex()
123123
operation := op.GetOperation()
124124

125-
decision := tableResolver.Resolve(table_resolver.IngestPipeline, index)
126-
table_resolver.TODO("splitBulk", decision)
127-
128125
entryWithResponse := BulkRequestEntry{
129126
operation: operation,
130127
index: index,
@@ -142,36 +139,13 @@ func splitBulk(ctx context.Context, defaultIndex *string, bulk types.NDJSON, bul
142139
}
143140
}
144141

145-
indexConfig, found := cfg.IndexConfig[index]
146-
if !found || indexConfig.IsElasticIngestEnabled() {
147-
// Bulk entry for Elastic - forward the request as-is
148-
opBytes, err := rawOp.Bytes()
149-
if err != nil {
150-
return err
151-
}
152-
elasticRequestBody = append(elasticRequestBody, opBytes...)
153-
elasticRequestBody = append(elasticRequestBody, '\n')
154-
155-
documentBytes, err := document.Bytes()
156-
if err != nil {
157-
return err
158-
}
159-
elasticRequestBody = append(elasticRequestBody, documentBytes...)
160-
elasticRequestBody = append(elasticRequestBody, '\n')
142+
decision := tableResolver.Resolve(table_resolver.IngestPipeline, index)
161143

162-
elasticBulkEntries = append(elasticBulkEntries, entryWithResponse)
144+
if decision.Err != nil {
145+
return decision.Err
163146
}
164-
if found && indexConfig.IsClickhouseIngestEnabled() {
165-
// Bulk entry for Clickhouse
166-
if operation != "create" && operation != "index" {
167-
// Elastic also fails the entire bulk in such case
168-
logger.ErrorWithCtxAndReason(ctx, "unsupported bulk operation type").Msgf("unsupported bulk operation type: %s", operation)
169-
return fmt.Errorf("unsupported bulk operation type: %s. Operation: %v, Document: %v", operation, rawOp, document)
170-
}
171147

172-
clickhouseDocumentsToInsert[index] = append(clickhouseDocumentsToInsert[index], entryWithResponse)
173-
}
174-
if indexConfig.IsIngestDisabled() {
148+
if decision.IsClosed || len(decision.UseConnectors) == 0 {
175149
bulkSingleResponse := BulkSingleResponse{
176150
Shards: BulkShardsResponse{
177151
Failed: 1,
@@ -202,6 +176,46 @@ func splitBulk(ctx context.Context, defaultIndex *string, bulk types.NDJSON, bul
202176
return fmt.Errorf("unsupported bulk operation type: %s. Document: %v", operation, document)
203177
}
204178
}
179+
180+
for _, connector := range decision.UseConnectors {
181+
182+
switch connector.(type) {
183+
184+
case *table_resolver.ConnectorDecisionElastic:
185+
// Bulk entry for Elastic - forward the request as-is
186+
opBytes, err := rawOp.Bytes()
187+
if err != nil {
188+
return err
189+
}
190+
elasticRequestBody = append(elasticRequestBody, opBytes...)
191+
elasticRequestBody = append(elasticRequestBody, '\n')
192+
193+
documentBytes, err := document.Bytes()
194+
if err != nil {
195+
return err
196+
}
197+
elasticRequestBody = append(elasticRequestBody, documentBytes...)
198+
elasticRequestBody = append(elasticRequestBody, '\n')
199+
200+
elasticBulkEntries = append(elasticBulkEntries, entryWithResponse)
201+
202+
case *table_resolver.ConnectorDecisionClickhouse:
203+
204+
// Bulk entry for Clickhouse
205+
if operation != "create" && operation != "index" {
206+
// Elastic also fails the entire bulk in such case
207+
logger.ErrorWithCtxAndReason(ctx, "unsupported bulk operation type").Msgf("unsupported bulk operation type: %s", operation)
208+
return fmt.Errorf("unsupported bulk operation type: %s. Operation: %v, Document: %v", operation, rawOp, document)
209+
}
210+
211+
clickhouseDocumentsToInsert[index] = append(clickhouseDocumentsToInsert[index], entryWithResponse)
212+
213+
default:
214+
return fmt.Errorf("unsupported connector type: %T", connector)
215+
}
216+
217+
}
218+
205219
return nil
206220
})
207221

0 commit comments

Comments
 (0)