Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions ci/it/configs/quesma-only-common-table.yml.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
frontendConnectors:
- name: elastic-ingest
type: elasticsearch-fe-ingest
config:
listenPort: 8080
- name: elastic-query
type: elasticsearch-fe-query
config:
listenPort: 8080
backendConnectors:
- name: e
type: elasticsearch
config:
url: "http://{{ .elasticsearch_host }}:{{ .elasticsearch_port }}"
user: elastic
password: quesmaquesma
- name: c
type: clickhouse-os
config:
url: clickhouse://{{ .clickhouse_host }}:{{ .clickhouse_port }}
logging:
path: "logs"
level: "info"
disableFileLogging: false
enableSQLTracing: true
processors:
- name: QP
type: quesma-v1-processor-query
config:
useCommonTable: true
indexes:
"*":
target:
- c
- name: IP
type: quesma-v1-processor-ingest
config:
useCommonTable: true
indexes:
"*":
target:
- c

pipelines:
- name: my-elasticsearch-proxy-read
frontendConnectors: [ elastic-query ]
processors: [ QP ]
backendConnectors: [ e, c ]
- name: my-elasticsearch-proxy-write
frontendConnectors: [ elastic-ingest ]
processors: [ IP ]
backendConnectors: [ e, c ]
5 changes: 5 additions & 0 deletions ci/it/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,8 @@ func TestSplitTimeRange(t *testing.T) {
testCase := testcases.NewSplitTimeRangeTestcase()
runIntegrationTest(t, testCase)
}

func TestOnlyCommonTable(t *testing.T) {
testCase := testcases.NewOnlyCommonTableTestcase()
runIntegrationTest(t, testCase)
}
60 changes: 60 additions & 0 deletions ci/it/testcases/test_only_common_table.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0

package testcases

import (
"context"
"fmt"
"github.com/stretchr/testify/assert"
"net/http"
"testing"
"time"
)

type OnlyCommonTableTestcase struct {
IntegrationTestcaseBase
}

func NewOnlyCommonTableTestcase() *OnlyCommonTableTestcase {
return &OnlyCommonTableTestcase{
IntegrationTestcaseBase: IntegrationTestcaseBase{
ConfigTemplate: "quesma-only-common-table.yml.template",
},
}
}

func (a *OnlyCommonTableTestcase) SetupContainers(ctx context.Context) error {
containers, err := setupAllContainersWithCh(ctx, a.ConfigTemplate)
a.Containers = containers
return err
}

func (a *OnlyCommonTableTestcase) RunTests(ctx context.Context, t *testing.T) error {

t.Run("test alter virtual table", func(t *testing.T) { a.testAlterVirtualTable(ctx, t) })
return nil
}

func (a *OnlyCommonTableTestcase) testAlterVirtualTable(ctx context.Context, t *testing.T) {

resp, body := a.RequestToQuesma(ctx, t, "POST", "/logs-6/_doc", []byte(`{"name": "Przemyslaw", "age": 31337}`))
fmt.Println(string(body))
assert.Equal(t, http.StatusOK, resp.StatusCode)

// wait for internal processing, especially for the periodic task that updates the schema
time.Sleep(60 * time.Second)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, this is very tricky to add. Slow IT are hard to debug and takes a lot to some engineering.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I know.

I will add an endpoint to reload tables.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed the test. Added endpoint. And fixed "eventually consistent" index that keeps a list of virtual tables.


resp, body = a.RequestToQuesma(ctx, t, "POST", "/logs-6/_doc", []byte(`{"name": "Przemyslaw", "age": 31337, "this-is-a-new-field": "new-field"}`))
fmt.Println(string(body))
assert.Equal(t, http.StatusOK, resp.StatusCode)

q := `{ "fields": [ "*" ]}`

_, bodyBytes := a.RequestToQuesma(ctx, t, "POST", "/logs-6/_field_caps", []byte(q))

fmt.Println(string(bodyBytes))

assert.Contains(t, string(bodyBytes), `"this-is-a-new-field"`)

}
6 changes: 5 additions & 1 deletion platform/clickhouse/table_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,11 @@ func (td *tableDiscovery) populateTableDefinitions(configuredTables map[string]d
VirtualTable: resTable.virtualTable,
ExistsOnAllNodes: resTable.existsOnAllNodes,
}
if containsAttributes(resTable.columnTypes) {

// We're adding default attributes to the virtual tables. We store virtual tables in the elastic as a list of essential column names.
// Quesma heavily relies on the attributes when it alters schema on ingest (see processor.go)
// If we don't add attributes to the virtual tables, virtual tables will be not altered on ingest.
if containsAttributes(resTable.columnTypes) || resTable.virtualTable {
table.Config.Attributes = []Attribute{NewDefaultStringAttribute()}
}

Expand Down
Loading