diff --git a/ci/it/configs/quesma-only-common-table.yml.template b/ci/it/configs/quesma-only-common-table.yml.template new file mode 100644 index 000000000..e639980bd --- /dev/null +++ b/ci/it/configs/quesma-only-common-table.yml.template @@ -0,0 +1,52 @@ +frontendConnectors: + - name: elastic-ingest + type: elasticsearch-fe-ingest + config: + listenPort: 8080 + - name: elastic-query + type: elasticsearch-fe-query + config: + listenPort: 8080 +backendConnectors: + - name: e + type: elasticsearch + config: + url: "http://{{ .elasticsearch_host }}:{{ .elasticsearch_port }}" + user: elastic + password: quesmaquesma + - name: c + type: clickhouse-os + config: + url: clickhouse://{{ .clickhouse_host }}:{{ .clickhouse_port }} +logging: + path: "logs" + level: "info" + disableFileLogging: false + enableSQLTracing: true +processors: + - name: QP + type: quesma-v1-processor-query + config: + useCommonTable: true + indexes: + "*": + target: + - c + - name: IP + type: quesma-v1-processor-ingest + config: + useCommonTable: true + indexes: + "*": + target: + - c + +pipelines: + - name: my-elasticsearch-proxy-read + frontendConnectors: [ elastic-query ] + processors: [ QP ] + backendConnectors: [ e, c ] + - name: my-elasticsearch-proxy-write + frontendConnectors: [ elastic-ingest ] + processors: [ IP ] + backendConnectors: [ e, c ] \ No newline at end of file diff --git a/ci/it/integration_test.go b/ci/it/integration_test.go index 7794470ef..2625abb46 100644 --- a/ci/it/integration_test.go +++ b/ci/it/integration_test.go @@ -81,3 +81,8 @@ func TestSplitTimeRange(t *testing.T) { testCase := testcases.NewSplitTimeRangeTestcase() runIntegrationTest(t, testCase) } + +func TestOnlyCommonTable(t *testing.T) { + testCase := testcases.NewOnlyCommonTableTestcase() + runIntegrationTest(t, testCase) +} diff --git a/ci/it/testcases/test_only_common_table.go b/ci/it/testcases/test_only_common_table.go new file mode 100644 index 000000000..74d9ca036 --- /dev/null +++ b/ci/it/testcases/test_only_common_table.go @@ -0,0 +1,67 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 + +package testcases + +import ( + "context" + "fmt" + "github.com/stretchr/testify/assert" + "net/http" + "testing" +) + +type OnlyCommonTableTestcase struct { + IntegrationTestcaseBase +} + +func NewOnlyCommonTableTestcase() *OnlyCommonTableTestcase { + return &OnlyCommonTableTestcase{ + IntegrationTestcaseBase: IntegrationTestcaseBase{ + ConfigTemplate: "quesma-only-common-table.yml.template", + }, + } +} + +func (a *OnlyCommonTableTestcase) SetupContainers(ctx context.Context) error { + containers, err := setupAllContainersWithCh(ctx, a.ConfigTemplate) + a.Containers = containers + return err +} + +func (a *OnlyCommonTableTestcase) RunTests(ctx context.Context, t *testing.T) error { + + t.Run("test alter virtual table", func(t *testing.T) { a.testAlterVirtualTable(ctx, t) }) + return nil +} + +func (a *OnlyCommonTableTestcase) testAlterVirtualTable(ctx context.Context, t *testing.T) { + + reloadTables := func() { + resp, body := a.RequestToQuesma(ctx, t, "POST", "/_quesma/reload-tables", nil) + fmt.Println(string(body)) + assert.Equal(t, http.StatusOK, resp.StatusCode) + } + + resp, body := a.RequestToQuesma(ctx, t, "POST", "/logs-6/_doc", []byte(`{"name": "Przemyslaw", "age": 31337}`)) + fmt.Println(string(body)) + assert.Equal(t, http.StatusOK, resp.StatusCode) + + reloadTables() + + resp, body = a.RequestToQuesma(ctx, t, "POST", "/logs-6/_doc", []byte(`{"name": "Przemyslaw", "age": 31337, "this-is-a-new-field": "new-field"}`)) + fmt.Println(string(body)) + assert.Equal(t, http.StatusOK, resp.StatusCode) + + fieldCapsQuery := `{"fields": [ "*" ]}` + + _, bodyBytes := a.RequestToQuesma(ctx, t, "POST", "/logs-6/_field_caps", []byte(fieldCapsQuery)) + assert.Contains(t, string(bodyBytes), `"this-is-a-new-field"`) + + reloadTables() + + _, bodyBytes = a.RequestToQuesma(ctx, t, "POST", "/logs-6/_field_caps", []byte(fieldCapsQuery)) + fmt.Println(string(bodyBytes)) + assert.Contains(t, string(bodyBytes), `"this-is-a-new-field"`) + +} diff --git a/platform/clickhouse/table_discovery.go b/platform/clickhouse/table_discovery.go index 20164e9b2..4ce327c59 100644 --- a/platform/clickhouse/table_discovery.go +++ b/platform/clickhouse/table_discovery.go @@ -427,7 +427,11 @@ func (td *tableDiscovery) populateTableDefinitions(configuredTables map[string]d VirtualTable: resTable.virtualTable, ExistsOnAllNodes: resTable.existsOnAllNodes, } - if containsAttributes(resTable.columnTypes) { + + // We're adding default attributes to the virtual tables. We store virtual tables in the elastic as a list of essential column names. + // Quesma heavily relies on the attributes when it alters schema on ingest (see processor.go) + // If we don't add attributes to the virtual tables, virtual tables will be not altered on ingest. + if containsAttributes(resTable.columnTypes) || resTable.virtualTable { table.Config.Attributes = []Attribute{NewDefaultStringAttribute()} } diff --git a/platform/frontend_connectors/router_v2.go b/platform/frontend_connectors/router_v2.go index 203ae09d7..0bf64c7c3 100644 --- a/platform/frontend_connectors/router_v2.go +++ b/platform/frontend_connectors/router_v2.go @@ -310,5 +310,16 @@ func ConfigureSearchRouterV2(cfg *config.QuesmaConfiguration, dependencies quesm return &quesma_api.Result{Body: string(body), StatusCode: http.StatusOK, GenericResult: body}, nil }) + router.Register(routes.QuesmaReloadTablsPath, method("POST"), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) { + + lm.ReloadTables() + + return &quesma_api.Result{ + Body: "Table reloaded successfully", + StatusCode: http.StatusOK, + GenericResult: []byte("Table reloaded successfully"), + }, nil + }) + return router } diff --git a/platform/persistence/elastic.go b/platform/persistence/elastic.go index c422a9b27..414b446bf 100644 --- a/platform/persistence/elastic.go +++ b/platform/persistence/elastic.go @@ -7,6 +7,7 @@ import ( "fmt" "github.com/QuesmaOrg/quesma/platform/config" "github.com/QuesmaOrg/quesma/platform/elasticsearch" + "github.com/QuesmaOrg/quesma/platform/logger" "github.com/QuesmaOrg/quesma/platform/types" "github.com/goccy/go-json" "io" @@ -34,6 +35,29 @@ func NewElasticJSONDatabase(cfg config.ElasticsearchConfiguration, indexName str } } +func (p *ElasticJSONDatabase) refresh() error { + + elasticsearchURL := fmt.Sprintf("%s/_refresh", p.indexName) + + resp, err := p.httpClient.Request(context.Background(), "POST", elasticsearchURL, nil) + if err != nil { + return err + } + resp.Body.Close() + switch resp.StatusCode { + case http.StatusOK, http.StatusAccepted: + return nil + default: + + body, err := io.ReadAll(resp.Body) + if err != nil { + return err + } + logger.Error().Msgf("Failed to flush elastic index: %s", string(body)) + return fmt.Errorf("failed to flush elastic: %v", resp.Status) + } +} + func (p *ElasticJSONDatabase) Put(key string, data string) error { elasticsearchURL := fmt.Sprintf("%s/_update/%s", p.indexName, key) @@ -57,6 +81,14 @@ func (p *ElasticJSONDatabase) Put(key string, data string) error { switch resp.StatusCode { case http.StatusCreated, http.StatusOK: + + // We need to flush the index to make sure the data is available for search. + err = p.refresh() + if err != nil { + log.Printf("Failed to flush elastic: %v", err) + return err + } + return nil default: respBody, err := io.ReadAll(resp.Body) diff --git a/platform/v2/core/routes/paths.go b/platform/v2/core/routes/paths.go index e17cc1088..f7921b121 100644 --- a/platform/v2/core/routes/paths.go +++ b/platform/v2/core/routes/paths.go @@ -36,6 +36,7 @@ const ( // Quesma internal paths QuesmaTableResolverPath = "/:index/_quesma_table_resolver" + QuesmaReloadTablsPath = "/_quesma/reload-tables" ) var notQueryPaths = []string{