Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 752cecc

Browse files
authored
Lack of schema refresh after new field in common table (#1461)
Quesma relies on `attributes` columns when it calculates the schema change on ingest. Table discovery didn't add these fields to virtual tables. Therefore, the schema of the index stored in the `common table` becomes immutable within 60 seconds.
1 parent 2506adb commit 752cecc

File tree

7 files changed

+173
-1
lines changed

7 files changed

+173
-1
lines changed
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
frontendConnectors:
2+
- name: elastic-ingest
3+
type: elasticsearch-fe-ingest
4+
config:
5+
listenPort: 8080
6+
- name: elastic-query
7+
type: elasticsearch-fe-query
8+
config:
9+
listenPort: 8080
10+
backendConnectors:
11+
- name: e
12+
type: elasticsearch
13+
config:
14+
url: "http://{{ .elasticsearch_host }}:{{ .elasticsearch_port }}"
15+
user: elastic
16+
password: quesmaquesma
17+
- name: c
18+
type: clickhouse-os
19+
config:
20+
url: clickhouse://{{ .clickhouse_host }}:{{ .clickhouse_port }}
21+
logging:
22+
path: "logs"
23+
level: "info"
24+
disableFileLogging: false
25+
enableSQLTracing: true
26+
processors:
27+
- name: QP
28+
type: quesma-v1-processor-query
29+
config:
30+
useCommonTable: true
31+
indexes:
32+
"*":
33+
target:
34+
- c
35+
- name: IP
36+
type: quesma-v1-processor-ingest
37+
config:
38+
useCommonTable: true
39+
indexes:
40+
"*":
41+
target:
42+
- c
43+
44+
pipelines:
45+
- name: my-elasticsearch-proxy-read
46+
frontendConnectors: [ elastic-query ]
47+
processors: [ QP ]
48+
backendConnectors: [ e, c ]
49+
- name: my-elasticsearch-proxy-write
50+
frontendConnectors: [ elastic-ingest ]
51+
processors: [ IP ]
52+
backendConnectors: [ e, c ]

ci/it/integration_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,3 +86,8 @@ func TestSplitTimeRange(t *testing.T) {
8686
testCase := testcases.NewSplitTimeRangeTestcase()
8787
runIntegrationTest(t, testCase)
8888
}
89+
90+
func TestOnlyCommonTable(t *testing.T) {
91+
testCase := testcases.NewOnlyCommonTableTestcase()
92+
runIntegrationTest(t, testCase)
93+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
// Copyright Quesma, licensed under the Elastic License 2.0.
2+
// SPDX-License-Identifier: Elastic-2.0
3+
4+
package testcases
5+
6+
import (
7+
"context"
8+
"fmt"
9+
"github.com/stretchr/testify/assert"
10+
"net/http"
11+
"testing"
12+
)
13+
14+
type OnlyCommonTableTestcase struct {
15+
IntegrationTestcaseBase
16+
}
17+
18+
func NewOnlyCommonTableTestcase() *OnlyCommonTableTestcase {
19+
return &OnlyCommonTableTestcase{
20+
IntegrationTestcaseBase: IntegrationTestcaseBase{
21+
ConfigTemplate: "quesma-only-common-table.yml.template",
22+
},
23+
}
24+
}
25+
26+
func (a *OnlyCommonTableTestcase) SetupContainers(ctx context.Context) error {
27+
containers, err := setupAllContainersWithCh(ctx, a.ConfigTemplate)
28+
a.Containers = containers
29+
return err
30+
}
31+
32+
func (a *OnlyCommonTableTestcase) RunTests(ctx context.Context, t *testing.T) error {
33+
34+
t.Run("test alter virtual table", func(t *testing.T) { a.testAlterVirtualTable(ctx, t) })
35+
return nil
36+
}
37+
38+
func (a *OnlyCommonTableTestcase) testAlterVirtualTable(ctx context.Context, t *testing.T) {
39+
40+
reloadTables := func() {
41+
resp, body := a.RequestToQuesma(ctx, t, "POST", "/_quesma/reload-tables", nil)
42+
fmt.Println(string(body))
43+
assert.Equal(t, http.StatusOK, resp.StatusCode)
44+
}
45+
46+
resp, body := a.RequestToQuesma(ctx, t, "POST", "/logs-6/_doc", []byte(`{"name": "Przemyslaw", "age": 31337}`))
47+
fmt.Println(string(body))
48+
assert.Equal(t, http.StatusOK, resp.StatusCode)
49+
50+
reloadTables()
51+
52+
resp, body = a.RequestToQuesma(ctx, t, "POST", "/logs-6/_doc", []byte(`{"name": "Przemyslaw", "age": 31337, "this-is-a-new-field": "new-field"}`))
53+
fmt.Println(string(body))
54+
assert.Equal(t, http.StatusOK, resp.StatusCode)
55+
56+
fieldCapsQuery := `{"fields": [ "*" ]}`
57+
58+
_, bodyBytes := a.RequestToQuesma(ctx, t, "POST", "/logs-6/_field_caps", []byte(fieldCapsQuery))
59+
assert.Contains(t, string(bodyBytes), `"this-is-a-new-field"`)
60+
61+
reloadTables()
62+
63+
_, bodyBytes = a.RequestToQuesma(ctx, t, "POST", "/logs-6/_field_caps", []byte(fieldCapsQuery))
64+
fmt.Println(string(bodyBytes))
65+
assert.Contains(t, string(bodyBytes), `"this-is-a-new-field"`)
66+
67+
}

platform/clickhouse/table_discovery.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,11 @@ func (td *tableDiscovery) populateTableDefinitions(configuredTables map[string]d
427427
VirtualTable: resTable.virtualTable,
428428
ExistsOnAllNodes: resTable.existsOnAllNodes,
429429
}
430-
if containsAttributes(resTable.columnTypes) {
430+
431+
// We're adding default attributes to the virtual tables. We store virtual tables in the elastic as a list of essential column names.
432+
// Quesma heavily relies on the attributes when it alters schema on ingest (see processor.go)
433+
// If we don't add attributes to the virtual tables, virtual tables will be not altered on ingest.
434+
if containsAttributes(resTable.columnTypes) || resTable.virtualTable {
431435
table.Config.Attributes = []Attribute{NewDefaultStringAttribute()}
432436
}
433437

platform/frontend_connectors/router_v2.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -310,5 +310,16 @@ func ConfigureSearchRouterV2(cfg *config.QuesmaConfiguration, dependencies quesm
310310
return &quesma_api.Result{Body: string(body), StatusCode: http.StatusOK, GenericResult: body}, nil
311311
})
312312

313+
router.Register(routes.QuesmaReloadTablsPath, method("POST"), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) {
314+
315+
lm.ReloadTables()
316+
317+
return &quesma_api.Result{
318+
Body: "Table reloaded successfully",
319+
StatusCode: http.StatusOK,
320+
GenericResult: []byte("Table reloaded successfully"),
321+
}, nil
322+
})
323+
313324
return router
314325
}

platform/persistence/elastic.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"github.com/QuesmaOrg/quesma/platform/config"
99
"github.com/QuesmaOrg/quesma/platform/elasticsearch"
10+
"github.com/QuesmaOrg/quesma/platform/logger"
1011
"github.com/QuesmaOrg/quesma/platform/types"
1112
"github.com/goccy/go-json"
1213
"io"
@@ -34,6 +35,29 @@ func NewElasticJSONDatabase(cfg config.ElasticsearchConfiguration, indexName str
3435
}
3536
}
3637

38+
func (p *ElasticJSONDatabase) refresh() error {
39+
40+
elasticsearchURL := fmt.Sprintf("%s/_refresh", p.indexName)
41+
42+
resp, err := p.httpClient.Request(context.Background(), "POST", elasticsearchURL, nil)
43+
if err != nil {
44+
return err
45+
}
46+
resp.Body.Close()
47+
switch resp.StatusCode {
48+
case http.StatusOK, http.StatusAccepted:
49+
return nil
50+
default:
51+
52+
body, err := io.ReadAll(resp.Body)
53+
if err != nil {
54+
return err
55+
}
56+
logger.Error().Msgf("Failed to flush elastic index: %s", string(body))
57+
return fmt.Errorf("failed to flush elastic: %v", resp.Status)
58+
}
59+
}
60+
3761
func (p *ElasticJSONDatabase) Put(key string, data string) error {
3862

3963
elasticsearchURL := fmt.Sprintf("%s/_update/%s", p.indexName, key)
@@ -57,6 +81,14 @@ func (p *ElasticJSONDatabase) Put(key string, data string) error {
5781

5882
switch resp.StatusCode {
5983
case http.StatusCreated, http.StatusOK:
84+
85+
// We need to flush the index to make sure the data is available for search.
86+
err = p.refresh()
87+
if err != nil {
88+
log.Printf("Failed to flush elastic: %v", err)
89+
return err
90+
}
91+
6092
return nil
6193
default:
6294
respBody, err := io.ReadAll(resp.Body)

platform/v2/core/routes/paths.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ const (
3636
// Quesma internal paths
3737

3838
QuesmaTableResolverPath = "/:index/_quesma_table_resolver"
39+
QuesmaReloadTablsPath = "/_quesma/reload-tables"
3940
)
4041

4142
var notQueryPaths = []string{

0 commit comments

Comments
 (0)