Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ jobs:
runs-on: ubuntu-latest
needs: [build-quesma-docker-image, check-comment]
if: ${{ always() && (github.event_name != 'issue_comment' || needs.check-comment.result == 'success') }}
timeout-minutes: 20
steps:
- uses: actions/checkout@v4
with:
Expand Down Expand Up @@ -105,7 +106,7 @@ jobs:
- name: Run integration tests
working-directory: ci/it
run: go test -v
run: go test -timeout 20m -v
- name: Send Slack notification on failure
if: ${{ failure() && github.event_name == 'push' && github.ref == 'refs/heads/main' }}
Expand Down
103 changes: 103 additions & 0 deletions ci/it/configs/quesma-common-table-and-regular-table.yml.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
flags:
defaultStringColumnType: keyword
ingestStatistics: false

frontendConnectors:
- name: elastic-ingest
type: elasticsearch-fe-ingest
config:
listenPort: 8080
disableAuth: true
- name: elastic-query
type: elasticsearch-fe-query
config:
listenPort: 8080

backendConnectors:
- name: my-minimal-elasticsearch
type: elasticsearch
config:
url: "http://{{ .elasticsearch_host }}:{{ .elasticsearch_port }}"
user: elastic
password: quesmaquesma
- name: my-clickhouse-data-source
type: clickhouse-os
config:
url: clickhouse://{{ .clickhouse_host }}:{{ .clickhouse_port }}

processors:
- name: my-query-processor
type: quesma-v1-processor-query
config:
indexes:
first:
schemaOverrides:
fields:
"message":
type: text
target:
- my-clickhouse-data-source
second:
schemaOverrides:
fields:
"message":
type: text
target:
- my-clickhouse-data-source
third:
schemaOverrides:
fields:
"message":
type: text
target:
- my-clickhouse-data-source
"*":
useCommonTable: true
schemaOverrides:
fields:
"message":
type: text
target:
- my-clickhouse-data-source
- name: my-ingest-processor
type: quesma-v1-processor-ingest
config:
indexes:
first:
schemaOverrides:
fields:
"message":
type: text
target:
- my-clickhouse-data-source
second:
schemaOverrides:
fields:
"message":
type: text
target:
- my-clickhouse-data-source
third:
schemaOverrides:
fields:
"message":
type: text
target:
- my-clickhouse-data-source
"*":
useCommonTable: true
schemaOverrides:
fields:
"message":
type: text
target:
- my-clickhouse-data-source
pipelines:
- name: my-pipeline-elasticsearch-query-clickhouse
frontendConnectors: [elastic-query]
processors: [my-query-processor]
backendConnectors: [my-minimal-elasticsearch, my-clickhouse-data-source]
- name: my-pipeline-elasticsearch-ingest-to-clickhouse
frontendConnectors: [elastic-ingest]
processors: [my-ingest-processor]
backendConnectors: [my-minimal-elasticsearch, my-clickhouse-data-source]
5 changes: 5 additions & 0 deletions ci/it/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,8 @@ func TestOnlyCommonTable(t *testing.T) {
testCase := testcases.NewOnlyCommonTableTestcase()
runIntegrationTest(t, testCase)
}

func TestCommonTableAndRegularTable(t *testing.T) {
testCase := testcases.NewCommonTableAndRegularTable()
runIntegrationTest(t, testCase)
}
167 changes: 167 additions & 0 deletions ci/it/testcases/test_common_table_and_regular_table.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0

package testcases

import (
"context"
"encoding/json"
"fmt"
"github.com/stretchr/testify/assert"
"sort"
"testing"
)

type CommonTableAndRegularTable struct {
IntegrationTestcaseBase
}

func NewCommonTableAndRegularTable() *CommonTableAndRegularTable {
return &CommonTableAndRegularTable{
IntegrationTestcaseBase: IntegrationTestcaseBase{
ConfigTemplate: "quesma-common-table-and-regular-table.yml.template",
},
}
}

func (a *CommonTableAndRegularTable) SetupContainers(ctx context.Context) error {
containers, err := setupAllContainersWithCh(ctx, a.ConfigTemplate)
a.Containers = containers
return err
}

func (a *CommonTableAndRegularTable) RunTests(ctx context.Context, t *testing.T) error {

t.Run("all mappings", func(t *testing.T) { a.mappingsAll(ctx, t) })

return nil
}

func (a *CommonTableAndRegularTable) mappingsAll(ctx context.Context, t *testing.T) {

fetchStarFieldCaps := func() map[string]any {

resp, body := a.RequestToQuesma(ctx, t, "GET", "/*/_field_caps", nil)
if resp.StatusCode != 200 {
t.Fatalf("Failed to fetch mappings: %s", body)
}

var fieldCaps map[string]any
if err := json.Unmarshal(body, &fieldCaps); err != nil {
t.Fatalf("Failed to unmarshal mappings: %v", err)
}

return fieldCaps
}

fetchFieldCaps := func(index string) {

resp, body := a.RequestToQuesma(ctx, t, "GET", fmt.Sprintf("/%s/_field_caps", index), nil)
if resp.StatusCode != 200 {
t.Fatalf("Failed to fetch mappings: %s", body)
}

var fieldCaps map[string]any
if err := json.Unmarshal(body, &fieldCaps); err != nil {
t.Fatalf("Failed to unmarshal mappings: %v", err)
}
if len(fieldCaps) == 0 {
t.Fatalf("Expected field caps for index %s, got empty response", index)
}
}

checkFieldCaps := func(fieldCaps map[string]any, expectedIndexes []string) {

indicesAny, ok := fieldCaps["indices"].([]any)
if !ok {
t.Fatalf("Expected 'indices' to be a slice of strings, got: %T", fieldCaps["indices"])
}

indices := make([]string, len(indicesAny))
for i, index := range indicesAny {
indexStr, ok := index.(string)
if !ok {
t.Fatalf("Expected index to be a string, got: %T", index)
}
indices[i] = indexStr
}

assert.Equal(t, len(expectedIndexes), len(indices))

sort.Strings(indices)
sort.Strings(expectedIndexes)

for i, index := range expectedIndexes {
assert.Equal(t, index, indices[i], fmt.Sprintf("Index %s should exist in field caps", index))
}

}

fetchStarMapping := func() map[string]any {

resp, body := a.RequestToQuesma(ctx, t, "GET", "/*/_mapping", nil)
if resp.StatusCode != 200 {
t.Fatalf("Failed to fetch mappings: %s", body)
}

var mappings map[string]any
if err := json.Unmarshal(body, &mappings); err != nil {
t.Fatalf("Failed to unmarshal mappings: %v", err)
}
return mappings
}

fetchMappings := func(index string) {
resp, body := a.RequestToQuesma(ctx, t, "GET", fmt.Sprintf("/%s/_mapping", index), nil)
if resp.StatusCode != 200 {
t.Fatalf("Failed to fetch mappings: %s", body)
}

var mappings map[string]any
if err := json.Unmarshal(body, &mappings); err != nil {
t.Fatalf("Failed to unmarshal mappings: %v", err)
}
if len(mappings) == 0 {
t.Fatalf("Expected mappings for index %s, got empty response", index)
}
}

checkMappings := func(mappings map[string]any, expectedIndexes []string) {

assert.Equal(t, len(expectedIndexes), len(mappings))

for _, index := range expectedIndexes {
_, exists := mappings[index]
assert.True(t, exists, fmt.Sprintf("Index %s should exist in mappings", index))
}
}

expectedIndexes := []string{"first", "second", "third"} // explicitly defined indexes in the config

mappings := fetchStarMapping()
checkMappings(mappings, expectedIndexes)

fieldCaps := fetchStarFieldCaps()
checkFieldCaps(fieldCaps, expectedIndexes)

for _, index := range expectedIndexes {
fetchFieldCaps(index)
fetchMappings(index)
}

// add a new index (common table)

a.RequestToQuesma(ctx, t, "POST", "/go_to_common_table/_doc", []byte(`{"name": "Przemyslaw", "age": 31337}`))

expectedIndexes = append(expectedIndexes, "go_to_common_table")
mappings = fetchStarMapping()
checkMappings(mappings, expectedIndexes)

fieldCaps = fetchStarFieldCaps()
checkFieldCaps(fieldCaps, expectedIndexes)

for _, index := range expectedIndexes {
fetchFieldCaps(index)
fetchMappings(index)
}
}
6 changes: 6 additions & 0 deletions http_requests/_mapping.http
Original file line number Diff line number Diff line change
@@ -1 +1,7 @@
GET http://localhost:8080/kibana_sample_data_ecommerce/_mapping

####
GET localhost:8080/*/_field_caps

###
GET localhost:8080/*/_mapping
12 changes: 12 additions & 0 deletions platform/common_table/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,3 +65,15 @@ type VirtualTable struct {
StoredAt string `json:"stored_at"`
Columns []VirtualTableColumn `json:"columns"`
}

// FilterCommonTableIndex filters out the common table index from the provided list of indexes
func FilterCommonTableIndex(indexes []string) []string {
// filter out common table index
var filtered []string
for _, index := range indexes {
if index != TableName {
filtered = append(filtered, index)
}
}
return filtered
}
26 changes: 20 additions & 6 deletions platform/frontend_connectors/es_responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,20 +183,34 @@ func putIndexResult(index string) (*quesma_api.Result, error) {
return &quesma_api.Result{StatusCode: http.StatusOK, Body: string(serialized), GenericResult: serialized}, nil
}

func getIndexMappingResult(index string, mappings map[string]any) (*quesma_api.Result, error) {
result := map[string]any{
index: map[string]any{
"mappings": mappings,
},
func getIndexMappingResults(mappings map[string]map[string]any) (*quesma_api.Result, error) {

result := make(map[string]any)

for index, mapping := range mappings {
result[index] = map[string]any{
"mappings": mapping,
}
}

serialized, err := json.Marshal(result)
if err != nil {
return nil, err
}

return &quesma_api.Result{StatusCode: http.StatusOK, Body: string(serialized), GenericResult: serialized}, nil
}

func getIndexMappingResult(index string, mappings map[string]any) (*quesma_api.Result, error) {

// single index mapping result

allMappings := make(map[string]map[string]any)

allMappings[index] = mappings

return getIndexMappingResults(allMappings)
}

func getIndexResult(index string, mappings map[string]any) (*quesma_api.Result, error) {
// For now return the same as getIndexMappingResult,
// but "GET /:index" can also contain "settings" and "aliases" (in the future)
Expand Down
Loading
Loading