Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 3e14761

Browse files
nablaonepdelewski
andauthored
Fix /*/_mapping and /*/_field_caps handling (#1475)
This PR fixes getting mappings and fields caps for all indices. Details: - Added * resolve in mapping endpoint, - Added 'Meta' pipeline in the table resolver. It doesn't check if we are trying to query multiple tables. It's not required while handling metadata (mappings, field caps) - Added e2e tests. --------- Co-authored-by: Przemek Delewski <[email protected]>
1 parent 26b8fa6 commit 3e14761

File tree

14 files changed

+421
-52
lines changed

14 files changed

+421
-52
lines changed

.github/workflows/integration-tests.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ jobs:
5252
runs-on: ubuntu-latest
5353
needs: [build-quesma-docker-image, check-comment]
5454
if: ${{ always() && (github.event_name != 'issue_comment' || needs.check-comment.result == 'success') }}
55+
timeout-minutes: 20
5556
steps:
5657
- uses: actions/checkout@v4
5758
with:
@@ -105,7 +106,7 @@ jobs:
105106
106107
- name: Run integration tests
107108
working-directory: ci/it
108-
run: go test -v
109+
run: go test -timeout 20m -v
109110
110111
- name: Send Slack notification on failure
111112
if: ${{ failure() && github.event_name == 'push' && github.ref == 'refs/heads/main' }}
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
flags:
2+
defaultStringColumnType: keyword
3+
ingestStatistics: false
4+
5+
frontendConnectors:
6+
- name: elastic-ingest
7+
type: elasticsearch-fe-ingest
8+
config:
9+
listenPort: 8080
10+
disableAuth: true
11+
- name: elastic-query
12+
type: elasticsearch-fe-query
13+
config:
14+
listenPort: 8080
15+
16+
backendConnectors:
17+
- name: my-minimal-elasticsearch
18+
type: elasticsearch
19+
config:
20+
url: "http://{{ .elasticsearch_host }}:{{ .elasticsearch_port }}"
21+
user: elastic
22+
password: quesmaquesma
23+
- name: my-clickhouse-data-source
24+
type: clickhouse-os
25+
config:
26+
url: clickhouse://{{ .clickhouse_host }}:{{ .clickhouse_port }}
27+
28+
processors:
29+
- name: my-query-processor
30+
type: quesma-v1-processor-query
31+
config:
32+
indexes:
33+
first:
34+
schemaOverrides:
35+
fields:
36+
"message":
37+
type: text
38+
target:
39+
- my-clickhouse-data-source
40+
second:
41+
schemaOverrides:
42+
fields:
43+
"message":
44+
type: text
45+
target:
46+
- my-clickhouse-data-source
47+
third:
48+
schemaOverrides:
49+
fields:
50+
"message":
51+
type: text
52+
target:
53+
- my-clickhouse-data-source
54+
"*":
55+
useCommonTable: true
56+
schemaOverrides:
57+
fields:
58+
"message":
59+
type: text
60+
target:
61+
- my-clickhouse-data-source
62+
- name: my-ingest-processor
63+
type: quesma-v1-processor-ingest
64+
config:
65+
indexes:
66+
first:
67+
schemaOverrides:
68+
fields:
69+
"message":
70+
type: text
71+
target:
72+
- my-clickhouse-data-source
73+
second:
74+
schemaOverrides:
75+
fields:
76+
"message":
77+
type: text
78+
target:
79+
- my-clickhouse-data-source
80+
third:
81+
schemaOverrides:
82+
fields:
83+
"message":
84+
type: text
85+
target:
86+
- my-clickhouse-data-source
87+
"*":
88+
useCommonTable: true
89+
schemaOverrides:
90+
fields:
91+
"message":
92+
type: text
93+
target:
94+
- my-clickhouse-data-source
95+
pipelines:
96+
- name: my-pipeline-elasticsearch-query-clickhouse
97+
frontendConnectors: [elastic-query]
98+
processors: [my-query-processor]
99+
backendConnectors: [my-minimal-elasticsearch, my-clickhouse-data-source]
100+
- name: my-pipeline-elasticsearch-ingest-to-clickhouse
101+
frontendConnectors: [elastic-ingest]
102+
processors: [my-ingest-processor]
103+
backendConnectors: [my-minimal-elasticsearch, my-clickhouse-data-source]

ci/it/integration_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,3 +91,8 @@ func TestOnlyCommonTable(t *testing.T) {
9191
testCase := testcases.NewOnlyCommonTableTestcase()
9292
runIntegrationTest(t, testCase)
9393
}
94+
95+
func TestCommonTableAndRegularTable(t *testing.T) {
96+
testCase := testcases.NewCommonTableAndRegularTable()
97+
runIntegrationTest(t, testCase)
98+
}
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
// Copyright Quesma, licensed under the Elastic License 2.0.
2+
// SPDX-License-Identifier: Elastic-2.0
3+
4+
package testcases
5+
6+
import (
7+
"context"
8+
"encoding/json"
9+
"fmt"
10+
"github.com/stretchr/testify/assert"
11+
"sort"
12+
"testing"
13+
)
14+
15+
type CommonTableAndRegularTable struct {
16+
IntegrationTestcaseBase
17+
}
18+
19+
func NewCommonTableAndRegularTable() *CommonTableAndRegularTable {
20+
return &CommonTableAndRegularTable{
21+
IntegrationTestcaseBase: IntegrationTestcaseBase{
22+
ConfigTemplate: "quesma-common-table-and-regular-table.yml.template",
23+
},
24+
}
25+
}
26+
27+
func (a *CommonTableAndRegularTable) SetupContainers(ctx context.Context) error {
28+
containers, err := setupAllContainersWithCh(ctx, a.ConfigTemplate)
29+
a.Containers = containers
30+
return err
31+
}
32+
33+
func (a *CommonTableAndRegularTable) RunTests(ctx context.Context, t *testing.T) error {
34+
35+
t.Run("all mappings", func(t *testing.T) { a.mappingsAll(ctx, t) })
36+
37+
return nil
38+
}
39+
40+
func (a *CommonTableAndRegularTable) mappingsAll(ctx context.Context, t *testing.T) {
41+
42+
fetchStarFieldCaps := func() map[string]any {
43+
44+
resp, body := a.RequestToQuesma(ctx, t, "GET", "/*/_field_caps", nil)
45+
if resp.StatusCode != 200 {
46+
t.Fatalf("Failed to fetch mappings: %s", body)
47+
}
48+
49+
var fieldCaps map[string]any
50+
if err := json.Unmarshal(body, &fieldCaps); err != nil {
51+
t.Fatalf("Failed to unmarshal mappings: %v", err)
52+
}
53+
54+
return fieldCaps
55+
}
56+
57+
fetchFieldCaps := func(index string) {
58+
59+
resp, body := a.RequestToQuesma(ctx, t, "GET", fmt.Sprintf("/%s/_field_caps", index), nil)
60+
if resp.StatusCode != 200 {
61+
t.Fatalf("Failed to fetch mappings: %s", body)
62+
}
63+
64+
var fieldCaps map[string]any
65+
if err := json.Unmarshal(body, &fieldCaps); err != nil {
66+
t.Fatalf("Failed to unmarshal mappings: %v", err)
67+
}
68+
if len(fieldCaps) == 0 {
69+
t.Fatalf("Expected field caps for index %s, got empty response", index)
70+
}
71+
}
72+
73+
checkFieldCaps := func(fieldCaps map[string]any, expectedIndexes []string) {
74+
75+
indicesAny, ok := fieldCaps["indices"].([]any)
76+
if !ok {
77+
t.Fatalf("Expected 'indices' to be a slice of strings, got: %T", fieldCaps["indices"])
78+
}
79+
80+
indices := make([]string, len(indicesAny))
81+
for i, index := range indicesAny {
82+
indexStr, ok := index.(string)
83+
if !ok {
84+
t.Fatalf("Expected index to be a string, got: %T", index)
85+
}
86+
indices[i] = indexStr
87+
}
88+
89+
assert.Equal(t, len(expectedIndexes), len(indices))
90+
91+
sort.Strings(indices)
92+
sort.Strings(expectedIndexes)
93+
94+
for i, index := range expectedIndexes {
95+
assert.Equal(t, index, indices[i], fmt.Sprintf("Index %s should exist in field caps", index))
96+
}
97+
98+
}
99+
100+
fetchStarMapping := func() map[string]any {
101+
102+
resp, body := a.RequestToQuesma(ctx, t, "GET", "/*/_mapping", nil)
103+
if resp.StatusCode != 200 {
104+
t.Fatalf("Failed to fetch mappings: %s", body)
105+
}
106+
107+
var mappings map[string]any
108+
if err := json.Unmarshal(body, &mappings); err != nil {
109+
t.Fatalf("Failed to unmarshal mappings: %v", err)
110+
}
111+
return mappings
112+
}
113+
114+
fetchMappings := func(index string) {
115+
resp, body := a.RequestToQuesma(ctx, t, "GET", fmt.Sprintf("/%s/_mapping", index), nil)
116+
if resp.StatusCode != 200 {
117+
t.Fatalf("Failed to fetch mappings: %s", body)
118+
}
119+
120+
var mappings map[string]any
121+
if err := json.Unmarshal(body, &mappings); err != nil {
122+
t.Fatalf("Failed to unmarshal mappings: %v", err)
123+
}
124+
if len(mappings) == 0 {
125+
t.Fatalf("Expected mappings for index %s, got empty response", index)
126+
}
127+
}
128+
129+
checkMappings := func(mappings map[string]any, expectedIndexes []string) {
130+
131+
assert.Equal(t, len(expectedIndexes), len(mappings))
132+
133+
for _, index := range expectedIndexes {
134+
_, exists := mappings[index]
135+
assert.True(t, exists, fmt.Sprintf("Index %s should exist in mappings", index))
136+
}
137+
}
138+
139+
expectedIndexes := []string{"first", "second", "third"} // explicitly defined indexes in the config
140+
141+
mappings := fetchStarMapping()
142+
checkMappings(mappings, expectedIndexes)
143+
144+
fieldCaps := fetchStarFieldCaps()
145+
checkFieldCaps(fieldCaps, expectedIndexes)
146+
147+
for _, index := range expectedIndexes {
148+
fetchFieldCaps(index)
149+
fetchMappings(index)
150+
}
151+
152+
// add a new index (common table)
153+
154+
a.RequestToQuesma(ctx, t, "POST", "/go_to_common_table/_doc", []byte(`{"name": "Przemyslaw", "age": 31337}`))
155+
156+
expectedIndexes = append(expectedIndexes, "go_to_common_table")
157+
mappings = fetchStarMapping()
158+
checkMappings(mappings, expectedIndexes)
159+
160+
fieldCaps = fetchStarFieldCaps()
161+
checkFieldCaps(fieldCaps, expectedIndexes)
162+
163+
for _, index := range expectedIndexes {
164+
fetchFieldCaps(index)
165+
fetchMappings(index)
166+
}
167+
}

http_requests/_mapping.http

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,7 @@
11
GET http://localhost:8080/kibana_sample_data_ecommerce/_mapping
2+
3+
####
4+
GET localhost:8080/*/_field_caps
5+
6+
###
7+
GET localhost:8080/*/_mapping

platform/common_table/const.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,3 +65,15 @@ type VirtualTable struct {
6565
StoredAt string `json:"stored_at"`
6666
Columns []VirtualTableColumn `json:"columns"`
6767
}
68+
69+
// FilterCommonTableIndex filters out the common table index from the provided list of indexes
70+
func FilterCommonTableIndex(indexes []string) []string {
71+
// filter out common table index
72+
var filtered []string
73+
for _, index := range indexes {
74+
if index != TableName {
75+
filtered = append(filtered, index)
76+
}
77+
}
78+
return filtered
79+
}

platform/frontend_connectors/es_responses.go

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -183,20 +183,34 @@ func putIndexResult(index string) (*quesma_api.Result, error) {
183183
return &quesma_api.Result{StatusCode: http.StatusOK, Body: string(serialized), GenericResult: serialized}, nil
184184
}
185185

186-
func getIndexMappingResult(index string, mappings map[string]any) (*quesma_api.Result, error) {
187-
result := map[string]any{
188-
index: map[string]any{
189-
"mappings": mappings,
190-
},
186+
func getIndexMappingResults(mappings map[string]map[string]any) (*quesma_api.Result, error) {
187+
188+
result := make(map[string]any)
189+
190+
for index, mapping := range mappings {
191+
result[index] = map[string]any{
192+
"mappings": mapping,
193+
}
191194
}
195+
192196
serialized, err := json.Marshal(result)
193197
if err != nil {
194198
return nil, err
195199
}
196-
197200
return &quesma_api.Result{StatusCode: http.StatusOK, Body: string(serialized), GenericResult: serialized}, nil
198201
}
199202

203+
func getIndexMappingResult(index string, mappings map[string]any) (*quesma_api.Result, error) {
204+
205+
// single index mapping result
206+
207+
allMappings := make(map[string]map[string]any)
208+
209+
allMappings[index] = mappings
210+
211+
return getIndexMappingResults(allMappings)
212+
}
213+
200214
func getIndexResult(index string, mappings map[string]any) (*quesma_api.Result, error) {
201215
// For now return the same as getIndexMappingResult,
202216
// but "GET /:index" can also contain "settings" and "aliases" (in the future)

0 commit comments

Comments
 (0)