diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index 209dc0c5a..a7e03234c 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -52,6 +52,7 @@ jobs: runs-on: ubuntu-latest needs: [build-quesma-docker-image, check-comment] if: ${{ always() && (github.event_name != 'issue_comment' || needs.check-comment.result == 'success') }} + timeout-minutes: 20 steps: - uses: actions/checkout@v4 with: @@ -105,7 +106,7 @@ jobs: - name: Run integration tests working-directory: ci/it - run: go test -v + run: go test -timeout 20m -v - name: Send Slack notification on failure if: ${{ failure() && github.event_name == 'push' && github.ref == 'refs/heads/main' }} diff --git a/ci/it/configs/quesma-common-table-and-regular-table.yml.template b/ci/it/configs/quesma-common-table-and-regular-table.yml.template new file mode 100644 index 000000000..219210f3f --- /dev/null +++ b/ci/it/configs/quesma-common-table-and-regular-table.yml.template @@ -0,0 +1,103 @@ +flags: + defaultStringColumnType: keyword +ingestStatistics: false + +frontendConnectors: + - name: elastic-ingest + type: elasticsearch-fe-ingest + config: + listenPort: 8080 + disableAuth: true + - name: elastic-query + type: elasticsearch-fe-query + config: + listenPort: 8080 + +backendConnectors: + - name: my-minimal-elasticsearch + type: elasticsearch + config: + url: "http://{{ .elasticsearch_host }}:{{ .elasticsearch_port }}" + user: elastic + password: quesmaquesma + - name: my-clickhouse-data-source + type: clickhouse-os + config: + url: clickhouse://{{ .clickhouse_host }}:{{ .clickhouse_port }} + +processors: + - name: my-query-processor + type: quesma-v1-processor-query + config: + indexes: + first: + schemaOverrides: + fields: + "message": + type: text + target: + - my-clickhouse-data-source + second: + schemaOverrides: + fields: + "message": + type: text + target: + - my-clickhouse-data-source + third: + schemaOverrides: + fields: + "message": + type: text + target: + - my-clickhouse-data-source + "*": + useCommonTable: true + schemaOverrides: + fields: + "message": + type: text + target: + - my-clickhouse-data-source + - name: my-ingest-processor + type: quesma-v1-processor-ingest + config: + indexes: + first: + schemaOverrides: + fields: + "message": + type: text + target: + - my-clickhouse-data-source + second: + schemaOverrides: + fields: + "message": + type: text + target: + - my-clickhouse-data-source + third: + schemaOverrides: + fields: + "message": + type: text + target: + - my-clickhouse-data-source + "*": + useCommonTable: true + schemaOverrides: + fields: + "message": + type: text + target: + - my-clickhouse-data-source +pipelines: + - name: my-pipeline-elasticsearch-query-clickhouse + frontendConnectors: [elastic-query] + processors: [my-query-processor] + backendConnectors: [my-minimal-elasticsearch, my-clickhouse-data-source] + - name: my-pipeline-elasticsearch-ingest-to-clickhouse + frontendConnectors: [elastic-ingest] + processors: [my-ingest-processor] + backendConnectors: [my-minimal-elasticsearch, my-clickhouse-data-source] diff --git a/ci/it/integration_test.go b/ci/it/integration_test.go index 7600b6ad1..402a0a063 100644 --- a/ci/it/integration_test.go +++ b/ci/it/integration_test.go @@ -91,3 +91,8 @@ func TestOnlyCommonTable(t *testing.T) { testCase := testcases.NewOnlyCommonTableTestcase() runIntegrationTest(t, testCase) } + +func TestCommonTableAndRegularTable(t *testing.T) { + testCase := testcases.NewCommonTableAndRegularTable() + runIntegrationTest(t, testCase) +} diff --git a/ci/it/testcases/test_common_table_and_regular_table.go b/ci/it/testcases/test_common_table_and_regular_table.go new file mode 100644 index 000000000..5b4cea817 --- /dev/null +++ b/ci/it/testcases/test_common_table_and_regular_table.go @@ -0,0 +1,167 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 + +package testcases + +import ( + "context" + "encoding/json" + "fmt" + "github.com/stretchr/testify/assert" + "sort" + "testing" +) + +type CommonTableAndRegularTable struct { + IntegrationTestcaseBase +} + +func NewCommonTableAndRegularTable() *CommonTableAndRegularTable { + return &CommonTableAndRegularTable{ + IntegrationTestcaseBase: IntegrationTestcaseBase{ + ConfigTemplate: "quesma-common-table-and-regular-table.yml.template", + }, + } +} + +func (a *CommonTableAndRegularTable) SetupContainers(ctx context.Context) error { + containers, err := setupAllContainersWithCh(ctx, a.ConfigTemplate) + a.Containers = containers + return err +} + +func (a *CommonTableAndRegularTable) RunTests(ctx context.Context, t *testing.T) error { + + t.Run("all mappings", func(t *testing.T) { a.mappingsAll(ctx, t) }) + + return nil +} + +func (a *CommonTableAndRegularTable) mappingsAll(ctx context.Context, t *testing.T) { + + fetchStarFieldCaps := func() map[string]any { + + resp, body := a.RequestToQuesma(ctx, t, "GET", "/*/_field_caps", nil) + if resp.StatusCode != 200 { + t.Fatalf("Failed to fetch mappings: %s", body) + } + + var fieldCaps map[string]any + if err := json.Unmarshal(body, &fieldCaps); err != nil { + t.Fatalf("Failed to unmarshal mappings: %v", err) + } + + return fieldCaps + } + + fetchFieldCaps := func(index string) { + + resp, body := a.RequestToQuesma(ctx, t, "GET", fmt.Sprintf("/%s/_field_caps", index), nil) + if resp.StatusCode != 200 { + t.Fatalf("Failed to fetch mappings: %s", body) + } + + var fieldCaps map[string]any + if err := json.Unmarshal(body, &fieldCaps); err != nil { + t.Fatalf("Failed to unmarshal mappings: %v", err) + } + if len(fieldCaps) == 0 { + t.Fatalf("Expected field caps for index %s, got empty response", index) + } + } + + checkFieldCaps := func(fieldCaps map[string]any, expectedIndexes []string) { + + indicesAny, ok := fieldCaps["indices"].([]any) + if !ok { + t.Fatalf("Expected 'indices' to be a slice of strings, got: %T", fieldCaps["indices"]) + } + + indices := make([]string, len(indicesAny)) + for i, index := range indicesAny { + indexStr, ok := index.(string) + if !ok { + t.Fatalf("Expected index to be a string, got: %T", index) + } + indices[i] = indexStr + } + + assert.Equal(t, len(expectedIndexes), len(indices)) + + sort.Strings(indices) + sort.Strings(expectedIndexes) + + for i, index := range expectedIndexes { + assert.Equal(t, index, indices[i], fmt.Sprintf("Index %s should exist in field caps", index)) + } + + } + + fetchStarMapping := func() map[string]any { + + resp, body := a.RequestToQuesma(ctx, t, "GET", "/*/_mapping", nil) + if resp.StatusCode != 200 { + t.Fatalf("Failed to fetch mappings: %s", body) + } + + var mappings map[string]any + if err := json.Unmarshal(body, &mappings); err != nil { + t.Fatalf("Failed to unmarshal mappings: %v", err) + } + return mappings + } + + fetchMappings := func(index string) { + resp, body := a.RequestToQuesma(ctx, t, "GET", fmt.Sprintf("/%s/_mapping", index), nil) + if resp.StatusCode != 200 { + t.Fatalf("Failed to fetch mappings: %s", body) + } + + var mappings map[string]any + if err := json.Unmarshal(body, &mappings); err != nil { + t.Fatalf("Failed to unmarshal mappings: %v", err) + } + if len(mappings) == 0 { + t.Fatalf("Expected mappings for index %s, got empty response", index) + } + } + + checkMappings := func(mappings map[string]any, expectedIndexes []string) { + + assert.Equal(t, len(expectedIndexes), len(mappings)) + + for _, index := range expectedIndexes { + _, exists := mappings[index] + assert.True(t, exists, fmt.Sprintf("Index %s should exist in mappings", index)) + } + } + + expectedIndexes := []string{"first", "second", "third"} // explicitly defined indexes in the config + + mappings := fetchStarMapping() + checkMappings(mappings, expectedIndexes) + + fieldCaps := fetchStarFieldCaps() + checkFieldCaps(fieldCaps, expectedIndexes) + + for _, index := range expectedIndexes { + fetchFieldCaps(index) + fetchMappings(index) + } + + // add a new index (common table) + + a.RequestToQuesma(ctx, t, "POST", "/go_to_common_table/_doc", []byte(`{"name": "Przemyslaw", "age": 31337}`)) + + expectedIndexes = append(expectedIndexes, "go_to_common_table") + mappings = fetchStarMapping() + checkMappings(mappings, expectedIndexes) + + fieldCaps = fetchStarFieldCaps() + checkFieldCaps(fieldCaps, expectedIndexes) + + for _, index := range expectedIndexes { + fetchFieldCaps(index) + fetchMappings(index) + } +} diff --git a/http_requests/_mapping.http b/http_requests/_mapping.http index b5b6a1ad1..c1d50b268 100644 --- a/http_requests/_mapping.http +++ b/http_requests/_mapping.http @@ -1 +1,7 @@ GET http://localhost:8080/kibana_sample_data_ecommerce/_mapping + +#### +GET localhost:8080/*/_field_caps + +### +GET localhost:8080/*/_mapping \ No newline at end of file diff --git a/platform/common_table/const.go b/platform/common_table/const.go index 60fe19e16..4d65051f1 100644 --- a/platform/common_table/const.go +++ b/platform/common_table/const.go @@ -65,3 +65,15 @@ type VirtualTable struct { StoredAt string `json:"stored_at"` Columns []VirtualTableColumn `json:"columns"` } + +// FilterCommonTableIndex filters out the common table index from the provided list of indexes +func FilterCommonTableIndex(indexes []string) []string { + // filter out common table index + var filtered []string + for _, index := range indexes { + if index != TableName { + filtered = append(filtered, index) + } + } + return filtered +} diff --git a/platform/frontend_connectors/es_responses.go b/platform/frontend_connectors/es_responses.go index 9e9d30d1a..75d2a5716 100644 --- a/platform/frontend_connectors/es_responses.go +++ b/platform/frontend_connectors/es_responses.go @@ -183,20 +183,34 @@ func putIndexResult(index string) (*quesma_api.Result, error) { return &quesma_api.Result{StatusCode: http.StatusOK, Body: string(serialized), GenericResult: serialized}, nil } -func getIndexMappingResult(index string, mappings map[string]any) (*quesma_api.Result, error) { - result := map[string]any{ - index: map[string]any{ - "mappings": mappings, - }, +func getIndexMappingResults(mappings map[string]map[string]any) (*quesma_api.Result, error) { + + result := make(map[string]any) + + for index, mapping := range mappings { + result[index] = map[string]any{ + "mappings": mapping, + } } + serialized, err := json.Marshal(result) if err != nil { return nil, err } - return &quesma_api.Result{StatusCode: http.StatusOK, Body: string(serialized), GenericResult: serialized}, nil } +func getIndexMappingResult(index string, mappings map[string]any) (*quesma_api.Result, error) { + + // single index mapping result + + allMappings := make(map[string]map[string]any) + + allMappings[index] = mappings + + return getIndexMappingResults(allMappings) +} + func getIndexResult(index string, mappings map[string]any) (*quesma_api.Result, error) { // For now return the same as getIndexMappingResult, // but "GET /:index" can also contain "settings" and "aliases" (in the future) diff --git a/platform/frontend_connectors/route_handlers.go b/platform/frontend_connectors/route_handlers.go index a364580b0..2288f1bf9 100644 --- a/platform/frontend_connectors/route_handlers.go +++ b/platform/frontend_connectors/route_handlers.go @@ -9,6 +9,7 @@ import ( "fmt" "github.com/QuesmaOrg/quesma/platform/backend_connectors" "github.com/QuesmaOrg/quesma/platform/clickhouse" + "github.com/QuesmaOrg/quesma/platform/common_table" "github.com/QuesmaOrg/quesma/platform/config" "github.com/QuesmaOrg/quesma/platform/elasticsearch" quesma_errors "github.com/QuesmaOrg/quesma/platform/errors" @@ -177,16 +178,35 @@ func HandleIndexRefresh() (*quesma_api.Result, error) { return ElasticsearchInsertResult(`{"_shards":{"total":1,"successful":1,"failed":0}}`, http.StatusOK), nil } -func HandleGetIndexMapping(sr schema.Registry, index string) (*quesma_api.Result, error) { - foundSchema, found := sr.FindSchema(schema.IndexName(index)) - if !found { - return &quesma_api.Result{StatusCode: http.StatusNotFound, GenericResult: make([]byte, 0)}, nil +func HandleGetIndexMapping(ctx context.Context, sr schema.Registry, lm clickhouse.LogManagerIFace, index string) (*quesma_api.Result, error) { + + indexes, err := lm.ResolveIndexPattern(ctx, sr, index) + if err != nil { + return nil, err } - hierarchicalSchema := schema.SchemaToHierarchicalSchema(&foundSchema) - mappings := elasticsearch.GenerateMappings(hierarchicalSchema) + allMappings := make(map[string]map[string]any) + + indexes = common_table.FilterCommonTableIndex(indexes) + + for _, resolvedIndex := range indexes { + + foundSchema, found := sr.FindSchema(schema.IndexName(resolvedIndex)) + if !found { + continue + } + + hierarchicalSchema := schema.SchemaToHierarchicalSchema(&foundSchema) + mappings := elasticsearch.GenerateMappings(hierarchicalSchema) + + allMappings[resolvedIndex] = mappings + } + + if len(allMappings) == 0 { + return &quesma_api.Result{StatusCode: http.StatusNotFound, GenericResult: make([]byte, 0)}, nil + } - return getIndexMappingResult(index, mappings) + return getIndexMappingResults(allMappings) } func HandlePitStore(indexPattern string) (*quesma_api.Result, error) { diff --git a/platform/frontend_connectors/router_v2.go b/platform/frontend_connectors/router_v2.go index 0bf64c7c3..b14f22c1e 100644 --- a/platform/frontend_connectors/router_v2.go +++ b/platform/frontend_connectors/router_v2.go @@ -207,11 +207,11 @@ func ConfigureSearchRouterV2(cfg *config.QuesmaConfiguration, dependencies quesm return HandleMultiSearch(ctx, req, "", queryRunner) }) - router.Register(routes.IndexMappingPath, and(method("GET", "PUT"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) { + router.Register(routes.IndexMappingPath, and(method("GET", "PUT"), matchAgainstTableResolver(tableResolver, quesma_api.MetaPipeline)), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) { index := req.Params["index"] switch req.Method { case "GET": - return HandleGetIndexMapping(sr, index) + return HandleGetIndexMapping(ctx, sr, lm, index) case "PUT": if body, err := types.ExpectJSON(req.ParsedBody); err != nil { return nil, err @@ -236,7 +236,7 @@ func ConfigureSearchRouterV2(cfg *config.QuesmaConfiguration, dependencies quesm return nil, errors.New("unsupported method") }) - router.Register(routes.FieldCapsPath, and(method("GET", "POST"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) { + router.Register(routes.FieldCapsPath, and(method("GET", "POST"), matchAgainstTableResolver(tableResolver, quesma_api.MetaPipeline)), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) { return HandleFieldCaps(ctx, req.Params["index"], req.QueryParams.Get("allow_no_indices") == "true", req.QueryParams.Get("ignore_unavailable") == "true", diff --git a/platform/functionality/field_capabilities/field_caps.go b/platform/functionality/field_capabilities/field_caps.go index f89915fc6..4c879fb10 100644 --- a/platform/functionality/field_capabilities/field_caps.go +++ b/platform/functionality/field_capabilities/field_caps.go @@ -6,6 +6,7 @@ import ( "context" "fmt" "github.com/QuesmaOrg/quesma/platform/clickhouse" + "github.com/QuesmaOrg/quesma/platform/common_table" "github.com/QuesmaOrg/quesma/platform/config" "github.com/QuesmaOrg/quesma/platform/elasticsearch" "github.com/QuesmaOrg/quesma/platform/elasticsearch/elasticsearch_field_types" @@ -47,6 +48,8 @@ func handleFieldCapsIndex(cfg map[string]config.IndexConfiguration, schemaRegist schemas := schemaRegistry.AllSchemas() + indexes = common_table.FilterCommonTableIndex(indexes) + for _, resolvedIndex := range indexes { if len(resolvedIndex) == 0 { continue diff --git a/platform/processors/es_to_ch_query/elasticsearch_to_clickhouse_query_processor.go b/platform/processors/es_to_ch_query/elasticsearch_to_clickhouse_query_processor.go index 2c21dd524..c206e5a56 100644 --- a/platform/processors/es_to_ch_query/elasticsearch_to_clickhouse_query_processor.go +++ b/platform/processors/es_to_ch_query/elasticsearch_to_clickhouse_query_processor.go @@ -124,7 +124,7 @@ func (p *ElasticsearchToClickHouseQueryProcessor) Handle(metadata map[string]int if indexNotInConfig { return p.routeToElasticsearch(metadata, req) } - res, err := frontend_connectors.HandleGetIndexMapping(p.queryRunner.GetSchemaRegistry(), indexPattern) + res, err := frontend_connectors.HandleGetIndexMapping(ctx, p.queryRunner.GetSchemaRegistry(), p.queryRunner.GetLogManager(), indexPattern) if err != nil { return metadata, nil, err } diff --git a/platform/table_resolver/rules.go b/platform/table_resolver/rules.go index 91d319a03..9d1c7f29b 100644 --- a/platform/table_resolver/rules.go +++ b/platform/table_resolver/rules.go @@ -122,7 +122,8 @@ func makeDefaultWildcard(quesmaConf config.QuesmaConfiguration, pipeline string) switch pipeline { case quesma_api.IngestPipeline: targets = quesmaConf.DefaultIngestTarget - case quesma_api.QueryPipeline: + + case quesma_api.QueryPipeline, quesma_api.MetaPipeline: targets = quesmaConf.DefaultQueryTarget default: return &quesma_api.Decision{ @@ -215,7 +216,7 @@ func (r *tableRegistryImpl) singleIndex(indexConfig map[string]config.IndexConfi &quesma_api.ConnectorDecisionElastic{}}, } - case quesma_api.QueryPipeline: + case quesma_api.QueryPipeline, quesma_api.MetaPipeline: if targets[0] == config.ClickhouseTarget && targets[1] == config.ElasticsearchTarget { @@ -319,7 +320,15 @@ func (r *tableRegistryImpl) makeCommonTableResolver(cfg map[string]config.IndexC } } -func mergeUseConnectors(lhs []quesma_api.ConnectorDecision, rhs []quesma_api.ConnectorDecision, rhsIndexName string) ([]quesma_api.ConnectorDecision, *quesma_api.Decision) { +type basicDecisionMerger struct { + checkIfMatchingDifferentTables bool +} + +func (b *basicDecisionMerger) name() string { + return "basicDecisionMerger" +} + +func (b *basicDecisionMerger) mergeUseConnectors(lhs []quesma_api.ConnectorDecision, rhs []quesma_api.ConnectorDecision, rhsIndexName string) ([]quesma_api.ConnectorDecision, *quesma_api.Decision) { for _, connDecisionRhs := range rhs { foundMatching := false for _, connDecisionLhs := range lhs { @@ -330,29 +339,33 @@ func mergeUseConnectors(lhs []quesma_api.ConnectorDecision, rhs []quesma_api.Con } if rhsClickhouse, ok := connDecisionRhs.(*quesma_api.ConnectorDecisionClickhouse); ok { if lhsClickhouse, ok := connDecisionLhs.(*quesma_api.ConnectorDecisionClickhouse); ok { - if lhsClickhouse.ClickhouseTableName != rhsClickhouse.ClickhouseTableName { - return nil, &quesma_api.Decision{ - Reason: "Incompatible decisions for two indexes - they use a different ClickHouse table", - Err: fmt.Errorf("incompatible decisions for two indexes (different ClickHouse table) - %s and %s", connDecisionRhs, connDecisionLhs), - } - } - if lhsClickhouse.IsCommonTable { - if !rhsClickhouse.IsCommonTable { + + if b.checkIfMatchingDifferentTables { + if lhsClickhouse.ClickhouseTableName != rhsClickhouse.ClickhouseTableName { return nil, &quesma_api.Decision{ - Reason: "Incompatible decisions for two indexes - one uses the common table, the other does not", - Err: fmt.Errorf("incompatible decisions for two indexes (common table usage) - %s and %s", connDecisionRhs, connDecisionLhs), + Reason: "Incompatible decisions for two indexes - they use a different ClickHouse table", + Err: fmt.Errorf("incompatible decisions for two indexes (different ClickHouse table) - %s and %s", connDecisionRhs, connDecisionLhs), } } - lhsClickhouse.ClickhouseIndexes = append(lhsClickhouse.ClickhouseIndexes, rhsClickhouse.ClickhouseIndexes...) - lhsClickhouse.ClickhouseIndexes = util.Distinct(lhsClickhouse.ClickhouseIndexes) - } else { - if !reflect.DeepEqual(lhsClickhouse, rhsClickhouse) { - return nil, &quesma_api.Decision{ - Reason: "Incompatible decisions for two indexes - they use ClickHouse tables differently", - Err: fmt.Errorf("incompatible decisions for two indexes (different usage of ClickHouse) - %s and %s", connDecisionRhs, connDecisionLhs), + if lhsClickhouse.IsCommonTable { + if !rhsClickhouse.IsCommonTable { + return nil, &quesma_api.Decision{ + Reason: "Incompatible decisions for two indexes - one uses the common table, the other does not", + Err: fmt.Errorf("incompatible decisions for two indexes (common table usage) - %s and %s", connDecisionRhs, connDecisionLhs), + } + } + lhsClickhouse.ClickhouseIndexes = append(lhsClickhouse.ClickhouseIndexes, rhsClickhouse.ClickhouseIndexes...) + lhsClickhouse.ClickhouseIndexes = util.Distinct(lhsClickhouse.ClickhouseIndexes) + } else { + if !reflect.DeepEqual(lhsClickhouse, rhsClickhouse) { + return nil, &quesma_api.Decision{ + Reason: "Incompatible decisions for two indexes - they use ClickHouse tables differently", + Err: fmt.Errorf("incompatible decisions for two indexes (different usage of ClickHouse) - %s and %s", connDecisionRhs, connDecisionLhs), + } } } } + foundMatching = true } } @@ -368,7 +381,7 @@ func mergeUseConnectors(lhs []quesma_api.ConnectorDecision, rhs []quesma_api.Con return lhs, nil } -func basicDecisionMerger(decisions []*quesma_api.Decision) *quesma_api.Decision { +func (b *basicDecisionMerger) merge(decisions []*quesma_api.Decision) *quesma_api.Decision { if len(decisions) == 0 { return &quesma_api.Decision{ IsEmpty: true, @@ -435,7 +448,7 @@ func basicDecisionMerger(decisions []*quesma_api.Decision) *quesma_api.Decision } } - newUseConnectors, mergeDecision := mergeUseConnectors(useConnectors, decision.UseConnectors, decision.IndexPattern) + newUseConnectors, mergeDecision := b.mergeUseConnectors(useConnectors, decision.UseConnectors, decision.IndexPattern) if mergeDecision != nil { return mergeDecision } diff --git a/platform/table_resolver/table_resolver.go b/platform/table_resolver/table_resolver.go index c771fa966..616227e7e 100644 --- a/platform/table_resolver/table_resolver.go +++ b/platform/table_resolver/table_resolver.go @@ -6,6 +6,7 @@ import ( "context" "fmt" "github.com/QuesmaOrg/quesma/platform/clickhouse" + "github.com/QuesmaOrg/quesma/platform/common_table" "github.com/QuesmaOrg/quesma/platform/config" "github.com/QuesmaOrg/quesma/platform/elasticsearch" "github.com/QuesmaOrg/quesma/platform/logger" @@ -40,9 +41,9 @@ type basicResolver struct { resolver func(part string) *quesma_api.Decision } -type decisionMerger struct { - name string - merger func(decisions []*quesma_api.Decision) *quesma_api.Decision +type decisionMerger interface { + name() string + merge(decisions []*quesma_api.Decision) *quesma_api.Decision } // Compound resolver works in the following way: @@ -75,7 +76,7 @@ func (ir *compoundResolver) resolve(indexName string) *quesma_api.Decision { } } - return ir.decisionMerger.merger(decisions) + return ir.decisionMerger.merge(decisions) } // HACK: we should have separate config for each pipeline @@ -154,6 +155,9 @@ func (r *tableRegistryImpl) updateIndexes() { clickhouseIndexes := make(map[string]table) tableMap.Range(func(name string, tableDef *clickhouse.Table) bool { + if name == common_table.TableName { + return true + } clickhouseIndexes[name] = table{ name: name, isVirtual: tableDef.VirtualTable, @@ -314,10 +318,7 @@ func NewTableResolver(quesmaConf config.QuesmaConfiguration, discovery clickhous {"defaultWildcard", makeDefaultWildcard(quesmaConf, quesma_api.IngestPipeline)}, }, - decisionMerger: decisionMerger{ - name: "basicDecisionMerger", - merger: basicDecisionMerger, - }, + decisionMerger: &basicDecisionMerger{checkIfMatchingDifferentTables: true}, }, recentDecisions: make(map[string]*quesma_api.Decision), } @@ -343,15 +344,38 @@ func NewTableResolver(quesmaConf config.QuesmaConfiguration, discovery clickhous // default action {"defaultWildcard", makeDefaultWildcard(quesmaConf, quesma_api.QueryPipeline)}, }, - decisionMerger: decisionMerger{ - name: "basicDecisionMerger", - merger: basicDecisionMerger, - }, + decisionMerger: &basicDecisionMerger{checkIfMatchingDifferentTables: true}, }, recentDecisions: make(map[string]*quesma_api.Decision), } res.pipelineResolvers[quesma_api.QueryPipeline] = queryResolver + + metaResolver := &pipelineResolver{ + pipelineName: quesma_api.MetaPipeline, + + resolver: &compoundResolver{ + patternSplitter: patternSplitter{ + name: "wildcardPatternSplitter", + resolver: res.wildcardPatternSplitter, + }, + decisionLadder: []basicResolver{ + // checking if we can handle the parsedPattern + {"kibanaInternal", resolveInternalElasticName}, + {"disabled", makeIsDisabledInConfig(indexConf, quesma_api.QueryPipeline)}, + + {"singleIndex", res.singleIndex(indexConf, quesma_api.QueryPipeline)}, + {"commonTable", res.makeCommonTableResolver(indexConf, quesma_api.QueryPipeline)}, + + // default action + {"defaultWildcard", makeDefaultWildcard(quesmaConf, quesma_api.QueryPipeline)}, + }, + decisionMerger: &basicDecisionMerger{checkIfMatchingDifferentTables: false}, + }, + recentDecisions: make(map[string]*quesma_api.Decision), + } + + res.pipelineResolvers[quesma_api.MetaPipeline] = metaResolver // update the state ASAP res.updateState() return res diff --git a/platform/v2/core/decision.go b/platform/v2/core/decision.go index c3b191d7d..7b15174d0 100644 --- a/platform/v2/core/decision.go +++ b/platform/v2/core/decision.go @@ -108,4 +108,5 @@ type PatternDecisions struct { const ( QueryPipeline = "Query" IngestPipeline = "Ingest" + MetaPipeline = "Meta" )