Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions ci/it/configs/quesma-only-common-table.yml.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
frontendConnectors:
- name: elastic-ingest
type: elasticsearch-fe-ingest
config:
listenPort: 8080
- name: elastic-query
type: elasticsearch-fe-query
config:
listenPort: 8080
backendConnectors:
- name: e
type: elasticsearch
config:
url: "http://{{ .elasticsearch_host }}:{{ .elasticsearch_port }}"
user: elastic
password: quesmaquesma
- name: c
type: clickhouse-os
config:
url: clickhouse://{{ .clickhouse_host }}:{{ .clickhouse_port }}
logging:
path: "logs"
level: "info"
disableFileLogging: false
enableSQLTracing: true
processors:
- name: QP
type: quesma-v1-processor-query
config:
useCommonTable: true
indexes:
"*":
target:
- c
- name: IP
type: quesma-v1-processor-ingest
config:
useCommonTable: true
indexes:
"*":
target:
- c

pipelines:
- name: my-elasticsearch-proxy-read
frontendConnectors: [ elastic-query ]
processors: [ QP ]
backendConnectors: [ e, c ]
- name: my-elasticsearch-proxy-write
frontendConnectors: [ elastic-ingest ]
processors: [ IP ]
backendConnectors: [ e, c ]
5 changes: 5 additions & 0 deletions ci/it/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,8 @@ func TestSplitTimeRange(t *testing.T) {
testCase := testcases.NewSplitTimeRangeTestcase()
runIntegrationTest(t, testCase)
}

func TestOnlyCommonTable(t *testing.T) {
testCase := testcases.NewOnlyCommonTableTestcase()
runIntegrationTest(t, testCase)
}
67 changes: 67 additions & 0 deletions ci/it/testcases/test_only_common_table.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0

package testcases

import (
"context"
"fmt"
"github.com/stretchr/testify/assert"
"net/http"
"testing"
)

type OnlyCommonTableTestcase struct {
IntegrationTestcaseBase
}

func NewOnlyCommonTableTestcase() *OnlyCommonTableTestcase {
return &OnlyCommonTableTestcase{
IntegrationTestcaseBase: IntegrationTestcaseBase{
ConfigTemplate: "quesma-only-common-table.yml.template",
},
}
}

func (a *OnlyCommonTableTestcase) SetupContainers(ctx context.Context) error {
containers, err := setupAllContainersWithCh(ctx, a.ConfigTemplate)
a.Containers = containers
return err
}

func (a *OnlyCommonTableTestcase) RunTests(ctx context.Context, t *testing.T) error {

t.Run("test alter virtual table", func(t *testing.T) { a.testAlterVirtualTable(ctx, t) })
return nil
}

func (a *OnlyCommonTableTestcase) testAlterVirtualTable(ctx context.Context, t *testing.T) {

reloadTables := func() {
resp, body := a.RequestToQuesma(ctx, t, "POST", "/_quesma/reload-tables", nil)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should put that method under our admin port, not on ElasticSearch frontend.

As we are rushing with release fine to do in next PR.

fmt.Println(string(body))
assert.Equal(t, http.StatusOK, resp.StatusCode)
}

resp, body := a.RequestToQuesma(ctx, t, "POST", "/logs-6/_doc", []byte(`{"name": "Przemyslaw", "age": 31337}`))
fmt.Println(string(body))
assert.Equal(t, http.StatusOK, resp.StatusCode)

reloadTables()

resp, body = a.RequestToQuesma(ctx, t, "POST", "/logs-6/_doc", []byte(`{"name": "Przemyslaw", "age": 31337, "this-is-a-new-field": "new-field"}`))
fmt.Println(string(body))
assert.Equal(t, http.StatusOK, resp.StatusCode)

fieldCapsQuery := `{"fields": [ "*" ]}`

_, bodyBytes := a.RequestToQuesma(ctx, t, "POST", "/logs-6/_field_caps", []byte(fieldCapsQuery))
assert.Contains(t, string(bodyBytes), `"this-is-a-new-field"`)

reloadTables()

_, bodyBytes = a.RequestToQuesma(ctx, t, "POST", "/logs-6/_field_caps", []byte(fieldCapsQuery))
fmt.Println(string(bodyBytes))
assert.Contains(t, string(bodyBytes), `"this-is-a-new-field"`)

}
6 changes: 5 additions & 1 deletion platform/clickhouse/table_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,11 @@ func (td *tableDiscovery) populateTableDefinitions(configuredTables map[string]d
VirtualTable: resTable.virtualTable,
ExistsOnAllNodes: resTable.existsOnAllNodes,
}
if containsAttributes(resTable.columnTypes) {

// We're adding default attributes to the virtual tables. We store virtual tables in the elastic as a list of essential column names.
// Quesma heavily relies on the attributes when it alters schema on ingest (see processor.go)
// If we don't add attributes to the virtual tables, virtual tables will be not altered on ingest.
if containsAttributes(resTable.columnTypes) || resTable.virtualTable {
table.Config.Attributes = []Attribute{NewDefaultStringAttribute()}
}

Expand Down
11 changes: 11 additions & 0 deletions platform/frontend_connectors/router_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,5 +310,16 @@ func ConfigureSearchRouterV2(cfg *config.QuesmaConfiguration, dependencies quesm
return &quesma_api.Result{Body: string(body), StatusCode: http.StatusOK, GenericResult: body}, nil
})

router.Register(routes.QuesmaReloadTablsPath, method("POST"), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above, please move this to admin interface.


lm.ReloadTables()

return &quesma_api.Result{
Body: "Table reloaded successfully",
StatusCode: http.StatusOK,
GenericResult: []byte("Table reloaded successfully"),
}, nil
})

return router
}
32 changes: 32 additions & 0 deletions platform/persistence/elastic.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"
"github.com/QuesmaOrg/quesma/platform/config"
"github.com/QuesmaOrg/quesma/platform/elasticsearch"
"github.com/QuesmaOrg/quesma/platform/logger"
"github.com/QuesmaOrg/quesma/platform/types"
"github.com/goccy/go-json"
"io"
Expand Down Expand Up @@ -34,6 +35,29 @@ func NewElasticJSONDatabase(cfg config.ElasticsearchConfiguration, indexName str
}
}

func (p *ElasticJSONDatabase) refresh() error {

elasticsearchURL := fmt.Sprintf("%s/_refresh", p.indexName)

resp, err := p.httpClient.Request(context.Background(), "POST", elasticsearchURL, nil)
if err != nil {
return err
}
resp.Body.Close()
switch resp.StatusCode {
case http.StatusOK, http.StatusAccepted:
return nil
default:

body, err := io.ReadAll(resp.Body)
if err != nil {
return err
}
logger.Error().Msgf("Failed to flush elastic index: %s", string(body))
return fmt.Errorf("failed to flush elastic: %v", resp.Status)
}
}

func (p *ElasticJSONDatabase) Put(key string, data string) error {

elasticsearchURL := fmt.Sprintf("%s/_update/%s", p.indexName, key)
Expand All @@ -57,6 +81,14 @@ func (p *ElasticJSONDatabase) Put(key string, data string) error {

switch resp.StatusCode {
case http.StatusCreated, http.StatusOK:

// We need to flush the index to make sure the data is available for search.
err = p.refresh()
if err != nil {
log.Printf("Failed to flush elastic: %v", err)
return err
}

return nil
default:
respBody, err := io.ReadAll(resp.Body)
Expand Down
1 change: 1 addition & 0 deletions platform/v2/core/routes/paths.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ const (
// Quesma internal paths

QuesmaTableResolverPath = "/:index/_quesma_table_resolver"
QuesmaReloadTablsPath = "/_quesma/reload-tables"
)

var notQueryPaths = []string{
Expand Down
Loading