Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.
103 changes: 103 additions & 0 deletions ci/it/configs/quesma-common-table-and-regular-table.yml.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
flags:
defaultStringColumnType: keyword
ingestStatistics: false

frontendConnectors:
- name: elastic-ingest
type: elasticsearch-fe-ingest
config:
listenPort: 8080
disableAuth: true
- name: elastic-query
type: elasticsearch-fe-query
config:
listenPort: 8080

backendConnectors:
- name: my-minimal-elasticsearch
type: elasticsearch
config:
url: "http://{{ .elasticsearch_host }}:{{ .elasticsearch_port }}"
user: elastic
password: quesmaquesma
- name: my-clickhouse-data-source
type: clickhouse-os
config:
url: clickhouse://{{ .clickhouse_host }}:{{ .clickhouse_port }}

processors:
- name: my-query-processor
type: quesma-v1-processor-query
config:
indexes:
first:
schemaOverrides:
fields:
"message":
type: text
target:
- my-clickhouse-data-source
second:
schemaOverrides:
fields:
"message":
type: text
target:
- my-clickhouse-data-source
third:
schemaOverrides:
fields:
"message":
type: text
target:
- my-clickhouse-data-source
"*":
useCommonTable: true
schemaOverrides:
fields:
"message":
type: text
target:
- my-clickhouse-data-source
- name: my-ingest-processor
type: quesma-v1-processor-ingest
config:
indexes:
first:
schemaOverrides:
fields:
"message":
type: text
target:
- my-clickhouse-data-source
second:
schemaOverrides:
fields:
"message":
type: text
target:
- my-clickhouse-data-source
third:
schemaOverrides:
fields:
"message":
type: text
target:
- my-clickhouse-data-source
"*":
useCommonTable: true
schemaOverrides:
fields:
"message":
type: text
target:
- my-clickhouse-data-source
pipelines:
- name: my-pipeline-elasticsearch-query-clickhouse
frontendConnectors: [elastic-query]
processors: [my-query-processor]
backendConnectors: [my-minimal-elasticsearch, my-clickhouse-data-source]
- name: my-pipeline-elasticsearch-ingest-to-clickhouse
frontendConnectors: [elastic-ingest]
processors: [my-ingest-processor]
backendConnectors: [my-minimal-elasticsearch, my-clickhouse-data-source]
5 changes: 5 additions & 0 deletions ci/it/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,3 +91,8 @@ func TestOnlyCommonTable(t *testing.T) {
testCase := testcases.NewOnlyCommonTableTestcase()
runIntegrationTest(t, testCase)
}

func TestCommonTableAndRegularTable(t *testing.T) {
testCase := testcases.NewCommonTableAndRegularTable()
runIntegrationTest(t, testCase)
}
167 changes: 167 additions & 0 deletions ci/it/testcases/test_common_table_and_regular_table.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0

package testcases

import (
"context"
"encoding/json"
"fmt"
"github.com/stretchr/testify/assert"
"sort"
"testing"
)

type CommonTableAndRegularTable struct {
IntegrationTestcaseBase
}

func NewCommonTableAndRegularTable() *CommonTableAndRegularTable {
return &CommonTableAndRegularTable{
IntegrationTestcaseBase: IntegrationTestcaseBase{
ConfigTemplate: "quesma-common-table-and-regular-table.yml.template",
},
}
}

func (a *CommonTableAndRegularTable) SetupContainers(ctx context.Context) error {
containers, err := setupAllContainersWithCh(ctx, a.ConfigTemplate)
a.Containers = containers
return err
}

func (a *CommonTableAndRegularTable) RunTests(ctx context.Context, t *testing.T) error {

t.Run("all mappings", func(t *testing.T) { a.mappingsAll(ctx, t) })

return nil
}

func (a *CommonTableAndRegularTable) mappingsAll(ctx context.Context, t *testing.T) {

fetchStarFieldCaps := func() map[string]any {

resp, body := a.RequestToQuesma(ctx, t, "GET", "/*/_field_caps", nil)
if resp.StatusCode != 200 {
t.Fatalf("Failed to fetch mappings: %s", body)
}

var fieldCaps map[string]any
if err := json.Unmarshal(body, &fieldCaps); err != nil {
t.Fatalf("Failed to unmarshal mappings: %v", err)
}

return fieldCaps
}

fetchFieldCaps := func(index string) {

resp, body := a.RequestToQuesma(ctx, t, "GET", fmt.Sprintf("/%s/_field_caps", index), nil)
if resp.StatusCode != 200 {
t.Fatalf("Failed to fetch mappings: %s", body)
}

var fieldCaps map[string]any
if err := json.Unmarshal(body, &fieldCaps); err != nil {
t.Fatalf("Failed to unmarshal mappings: %v", err)
}
if len(fieldCaps) == 0 {
t.Fatalf("Expected field caps for index %s, got empty response", index)
}
}

checkFieldCaps := func(fieldCaps map[string]any, expectedIndexes []string) {

indicesAny, ok := fieldCaps["indices"].([]any)
if !ok {
t.Fatalf("Expected 'indices' to be a slice of strings, got: %T", fieldCaps["indices"])
}

indices := make([]string, len(indicesAny))
for i, index := range indicesAny {
indexStr, ok := index.(string)
if !ok {
t.Fatalf("Expected index to be a string, got: %T", index)
}
indices[i] = indexStr
}

assert.Equal(t, len(expectedIndexes), len(indices))

sort.Strings(indices)
sort.Strings(expectedIndexes)

for i, index := range expectedIndexes {
assert.Equal(t, index, indices[i], fmt.Sprintf("Index %s should exist in field caps", index))
}

}

fetchStarMapping := func() map[string]any {

resp, body := a.RequestToQuesma(ctx, t, "GET", "/*/_mapping", nil)
if resp.StatusCode != 200 {
t.Fatalf("Failed to fetch mappings: %s", body)
}

var mappings map[string]any
if err := json.Unmarshal(body, &mappings); err != nil {
t.Fatalf("Failed to unmarshal mappings: %v", err)
}
return mappings
}

fetchMappings := func(index string) {
resp, body := a.RequestToQuesma(ctx, t, "GET", fmt.Sprintf("/%s/_mapping", index), nil)
if resp.StatusCode != 200 {
t.Fatalf("Failed to fetch mappings: %s", body)
}

var mappings map[string]any
if err := json.Unmarshal(body, &mappings); err != nil {
t.Fatalf("Failed to unmarshal mappings: %v", err)
}
if len(mappings) == 0 {
t.Fatalf("Expected mappings for index %s, got empty response", index)
}
}

checkMappings := func(mappings map[string]any, expectedIndexes []string) {

assert.Equal(t, len(expectedIndexes), len(mappings))

for _, index := range expectedIndexes {
_, exists := mappings[index]
assert.True(t, exists, fmt.Sprintf("Index %s should exist in mappings", index))
}
}

expectedIndexes := []string{"first", "second", "third"} // explicitly defined indexes in the config

mappings := fetchStarMapping()
checkMappings(mappings, expectedIndexes)

fieldCaps := fetchStarFieldCaps()
checkFieldCaps(fieldCaps, expectedIndexes)

for _, index := range expectedIndexes {
fetchFieldCaps(index)
fetchMappings(index)
}

// add a new index (common table)

a.RequestToQuesma(ctx, t, "POST", "/go_to_common_table/_doc", []byte(`{"name": "Przemyslaw", "age": 31337}`))

expectedIndexes = append(expectedIndexes, "go_to_common_table")
mappings = fetchStarMapping()
checkMappings(mappings, expectedIndexes)

fieldCaps = fetchStarFieldCaps()
checkFieldCaps(fieldCaps, expectedIndexes)

for _, index := range expectedIndexes {
fetchFieldCaps(index)
fetchMappings(index)
}
}
6 changes: 6 additions & 0 deletions http_requests/_mapping.http
Original file line number Diff line number Diff line change
@@ -1 +1,7 @@
GET http://localhost:8080/kibana_sample_data_ecommerce/_mapping

####
GET localhost:8080/*/_field_caps

###
GET localhost:8080/*/_mapping
26 changes: 20 additions & 6 deletions platform/frontend_connectors/es_responses.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,20 +183,34 @@ func putIndexResult(index string) (*quesma_api.Result, error) {
return &quesma_api.Result{StatusCode: http.StatusOK, Body: string(serialized), GenericResult: serialized}, nil
}

func getIndexMappingResult(index string, mappings map[string]any) (*quesma_api.Result, error) {
result := map[string]any{
index: map[string]any{
"mappings": mappings,
},
func getIndexMappingResults(mappings map[string]map[string]any) (*quesma_api.Result, error) {

result := make(map[string]any)

for index, mapping := range mappings {
result[index] = map[string]any{
"mappings": mapping,
}
}

serialized, err := json.Marshal(result)
if err != nil {
return nil, err
}

return &quesma_api.Result{StatusCode: http.StatusOK, Body: string(serialized), GenericResult: serialized}, nil
}

func getIndexMappingResult(index string, mappings map[string]any) (*quesma_api.Result, error) {

// single index mapping result

allMappings := make(map[string]map[string]any)

allMappings[index] = mappings

return getIndexMappingResults(allMappings)
}

func getIndexResult(index string, mappings map[string]any) (*quesma_api.Result, error) {
// For now return the same as getIndexMappingResult,
// but "GET /:index" can also contain "settings" and "aliases" (in the future)
Expand Down
43 changes: 36 additions & 7 deletions platform/frontend_connectors/route_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"fmt"
"github.com/QuesmaOrg/quesma/platform/backend_connectors"
"github.com/QuesmaOrg/quesma/platform/clickhouse"
"github.com/QuesmaOrg/quesma/platform/common_table"
"github.com/QuesmaOrg/quesma/platform/config"
"github.com/QuesmaOrg/quesma/platform/elasticsearch"
quesma_errors "github.com/QuesmaOrg/quesma/platform/errors"
Expand Down Expand Up @@ -177,16 +178,44 @@ func HandleIndexRefresh() (*quesma_api.Result, error) {
return ElasticsearchInsertResult(`{"_shards":{"total":1,"successful":1,"failed":0}}`, http.StatusOK), nil
}

func HandleGetIndexMapping(sr schema.Registry, index string) (*quesma_api.Result, error) {
foundSchema, found := sr.FindSchema(schema.IndexName(index))
if !found {
return &quesma_api.Result{StatusCode: http.StatusNotFound, GenericResult: make([]byte, 0)}, nil
func HandleGetIndexMapping(ctx context.Context, sr schema.Registry, lm clickhouse.LogManagerIFace, index string) (*quesma_api.Result, error) {

indexes, err := lm.ResolveIndexPattern(ctx, sr, index)
if err != nil {
return nil, err
}

hierarchicalSchema := schema.SchemaToHierarchicalSchema(&foundSchema)
mappings := elasticsearch.GenerateMappings(hierarchicalSchema)
allMappings := make(map[string]map[string]any)

var filteredIndexes []string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same as below, it would be better to filter out common_table in ResolveIndexPattern

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

for _, idx := range indexes {

if idx == common_table.TableName {
// Skip the common table, it is not a real index
continue
}
filteredIndexes = append(filteredIndexes, idx)
}
indexes = filteredIndexes

for _, resolvedIndex := range indexes {

foundSchema, found := sr.FindSchema(schema.IndexName(resolvedIndex))
if !found {
continue
}

hierarchicalSchema := schema.SchemaToHierarchicalSchema(&foundSchema)
mappings := elasticsearch.GenerateMappings(hierarchicalSchema)

allMappings[resolvedIndex] = mappings
}

if len(allMappings) == 0 {
return &quesma_api.Result{StatusCode: http.StatusNotFound, GenericResult: make([]byte, 0)}, nil
}

return getIndexMappingResult(index, mappings)
return getIndexMappingResults(allMappings)
}

func HandlePitStore(indexPattern string) (*quesma_api.Result, error) {
Expand Down
Loading
Loading