Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 0326054

Browse files
authored
Default schema overrides (#1451)
This PR adds a method to apply a schema override to all indices. Here is a sample config: ``` "*": useCommonTable: true schemaOverrides: fields: "message": type: text target: - my-clickhouse-data-source ``` In this case, all indices will have a `message` field with type `text`. It means full-text search will be performed against that field.
1 parent e0bcceb commit 0326054

File tree

6 files changed

+332
-6
lines changed

6 files changed

+332
-6
lines changed
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
flags:
2+
defaultStringColumnType: keyword
3+
4+
5+
frontendConnectors:
6+
- name: elastic-ingest
7+
type: elasticsearch-fe-ingest
8+
config:
9+
listenPort: 8080
10+
- name: elastic-query
11+
type: elasticsearch-fe-query
12+
config:
13+
listenPort: 8080
14+
backendConnectors:
15+
- name: e
16+
type: elasticsearch
17+
config:
18+
url: "http://{{ .elasticsearch_host }}:{{ .elasticsearch_port }}"
19+
user: elastic
20+
password: quesmaquesma
21+
- name: c
22+
type: clickhouse-os
23+
config:
24+
url: clickhouse://{{ .clickhouse_host }}:{{ .clickhouse_port }}
25+
logging:
26+
path: "logs"
27+
level: "info"
28+
disableFileLogging: false
29+
enableSQLTracing: true
30+
processors:
31+
- name: QP
32+
type: quesma-v1-processor-query
33+
config:
34+
useCommonTable: true
35+
indexes:
36+
no-message-index:
37+
useCommonTable: true
38+
target:
39+
- c
40+
41+
"*":
42+
schemaOverrides:
43+
fields:
44+
"message":
45+
type: text
46+
"default_field_for_not_configured_index":
47+
type: keyword
48+
49+
useCommonTable: true
50+
target:
51+
- c
52+
- name: IP
53+
type: quesma-v1-processor-ingest
54+
config:
55+
indexNameRewriteRules:
56+
0:
57+
from: (.*?)(-\d{4}\.\d{2}\.\d{2})$
58+
to: "$1"
59+
1:
60+
from: (.*?)(-\d{4}\.\d{2})$
61+
to: "$1"
62+
3:
63+
from: (.*?)(.\d{4}-\d{2})$
64+
to: "$1"
65+
4:
66+
from: (.*?)(.\d{4}-\d{2}\-\d{2})$
67+
to: "$1"
68+
useCommonTable: true
69+
indexes:
70+
no-message-index:
71+
useCommonTable: true
72+
target:
73+
- c
74+
75+
"*":
76+
useCommonTable: true
77+
schemaOverrides:
78+
fields:
79+
"message":
80+
type: text
81+
"default_field_for_not_configured_index":
82+
type: keyword
83+
84+
target:
85+
- c
86+
87+
pipelines:
88+
- name: my-elasticsearch-proxy-read
89+
frontendConnectors: [ elastic-query ]
90+
processors: [ QP ]
91+
backendConnectors: [ e, c ]
92+
- name: my-elasticsearch-proxy-write
93+
frontendConnectors: [ elastic-ingest ]
94+
processors: [ IP ]
95+
backendConnectors: [ e, c ]

ci/it/integration_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,11 @@ func TestIndexNameRewrite(t *testing.T) {
7777
runIntegrationTest(t, testCase)
7878
}
7979

80+
func TestDefaultSchemaOverride(t *testing.T) {
81+
testCase := testcases.NewDefaultSchemaOverrideTestcase()
82+
runIntegrationTest(t, testCase)
83+
}
84+
8085
func TestSplitTimeRange(t *testing.T) {
8186
testCase := testcases.NewSplitTimeRangeTestcase()
8287
runIntegrationTest(t, testCase)
Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
// Copyright Quesma, licensed under the Elastic License 2.0.
2+
// SPDX-License-Identifier: Elastic-2.0
3+
4+
// This file contains integration tests for different ingest functionalities.
5+
// This is a good place to add regression tests for ingest bugs.
6+
7+
package testcases
8+
9+
import (
10+
"context"
11+
"encoding/json"
12+
"fmt"
13+
"github.com/stretchr/testify/assert"
14+
"net/http"
15+
"testing"
16+
)
17+
18+
type DefaultSchemaOverrideTestcase struct {
19+
IntegrationTestcaseBase
20+
}
21+
22+
func NewDefaultSchemaOverrideTestcase() *DefaultSchemaOverrideTestcase {
23+
return &DefaultSchemaOverrideTestcase{
24+
IntegrationTestcaseBase: IntegrationTestcaseBase{
25+
ConfigTemplate: "quesma-default-schema-override.yml.template",
26+
},
27+
}
28+
}
29+
30+
func (a *DefaultSchemaOverrideTestcase) SetupContainers(ctx context.Context) error {
31+
containers, err := setupAllContainersWithCh(ctx, a.ConfigTemplate)
32+
a.Containers = containers
33+
return err
34+
}
35+
36+
func (a *DefaultSchemaOverrideTestcase) RunTests(ctx context.Context, t *testing.T) error {
37+
t.Run("test basic request", func(t *testing.T) { a.testBasicRequest(ctx, t) })
38+
39+
return nil
40+
}
41+
42+
func (a *DefaultSchemaOverrideTestcase) testBasicRequest(ctx context.Context, t *testing.T) {
43+
44+
testCases := []struct {
45+
testCaseName string
46+
47+
// ingest
48+
indexName string
49+
50+
hasDefaultField bool
51+
52+
// value of the doc
53+
message string
54+
55+
// query
56+
queryIndex string
57+
pattern string
58+
totalResults int
59+
}{
60+
{
61+
testCaseName: "1. plain index name",
62+
indexName: "foo",
63+
hasDefaultField: true,
64+
queryIndex: "foo",
65+
message: "This is first",
66+
pattern: "first",
67+
totalResults: 1,
68+
},
69+
{
70+
testCaseName: "2. plain index name with date",
71+
indexName: "foo.2023-10-01",
72+
hasDefaultField: true,
73+
queryIndex: "foo",
74+
message: "This is second",
75+
pattern: "second",
76+
totalResults: 1,
77+
},
78+
{
79+
testCaseName: "3. plain index name with date not matching ",
80+
indexName: "foo.2023-10-01",
81+
hasDefaultField: true,
82+
queryIndex: "foo",
83+
message: "This is third",
84+
pattern: "notmatching",
85+
totalResults: 0,
86+
},
87+
{
88+
testCaseName: "4. another index name with date",
89+
indexName: "anotherindex.2023-10-01",
90+
hasDefaultField: true,
91+
queryIndex: "anotherindex",
92+
message: "This is third",
93+
pattern: "third",
94+
totalResults: 1,
95+
},
96+
{
97+
testCaseName: "5. query all",
98+
indexName: "foo.2023-01",
99+
hasDefaultField: true,
100+
queryIndex: "foo,anotherindex",
101+
message: "This is fifth",
102+
pattern: "This",
103+
totalResults: 5,
104+
},
105+
{
106+
testCaseName: "6. no message index",
107+
indexName: "no-message-index",
108+
hasDefaultField: false,
109+
queryIndex: "no-message-index",
110+
message: "",
111+
pattern: "",
112+
totalResults: 0,
113+
},
114+
}
115+
116+
type Doc struct {
117+
IndexName string `json:"index_name"`
118+
Message *string `json:"message,omitempty"`
119+
}
120+
121+
// ingest all test cases
122+
for n, d := range testCases {
123+
124+
var doc Doc
125+
126+
doc.IndexName = d.indexName
127+
if d.message != "" {
128+
doc.Message = &d.message
129+
} else {
130+
doc.Message = nil // explicitly set to nil if no message
131+
}
132+
133+
data, err := json.Marshal(doc)
134+
if err != nil {
135+
t.Fatalf("Failed to marshal test case %d: %s", n, err)
136+
}
137+
138+
resp, bodyBytes := a.RequestToQuesma(ctx, t,
139+
"POST", fmt.Sprintf("/%s/_doc", d.indexName), data)
140+
141+
assert.Contains(t, string(bodyBytes), "created")
142+
assert.Equal(t, http.StatusOK, resp.StatusCode)
143+
assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source"))
144+
assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product"))
145+
}
146+
147+
// query all test cases
148+
149+
for _, d := range testCases {
150+
t.Run(d.testCaseName, func(t *testing.T) {
151+
// check field caps
152+
fmt.Println("Testing: ", d.queryIndex, d.testCaseName)
153+
q := `{
154+
"fields": [
155+
"*"
156+
]
157+
}`
158+
159+
_, bodyBytes := a.RequestToQuesma(ctx, t, "POST", fmt.Sprintf("/%s/_field_caps", d.queryIndex), []byte(q))
160+
161+
defaulfFieldForNotConfiguredIndex := `default_field_for_not_configured_index`
162+
163+
if d.hasDefaultField {
164+
assert.Contains(t, string(bodyBytes), defaulfFieldForNotConfiguredIndex, "Index %s should have a default_field_for_not_configured_index field", d.queryIndex)
165+
} else {
166+
assert.NotContains(t, string(bodyBytes), defaulfFieldForNotConfiguredIndex, "Index %s should not have a default_field_for_not_configured_index field", d.queryIndex)
167+
}
168+
169+
if d.message == "" {
170+
assert.NotContains(t, string(bodyBytes), `"message":{"text"`, "Index %s should not have a message field", d.queryIndex)
171+
// don't check the rest of the fields if message is not present
172+
return
173+
} else {
174+
assert.Contains(t, string(bodyBytes), `"message":{"text"`, "Index %s should have a message field", d.queryIndex)
175+
}
176+
177+
// perform full-text search
178+
179+
fullTextQuery := fmt.Sprintf(`{"query": {"match": {"message": "%s"} }, "fields": ["message"], "_source": false }`, d.pattern)
180+
181+
_, bodyBytes = a.RequestToQuesma(ctx, t, "POST", fmt.Sprintf("/%s/_search", d.queryIndex), []byte(fullTextQuery))
182+
183+
fmt.Println(string(bodyBytes))
184+
185+
type Total struct {
186+
Value int `json:"value"`
187+
Relation string `json:"relation"`
188+
}
189+
190+
type Hits struct {
191+
Total Total `json:"total"`
192+
}
193+
194+
type ElasticsearchResponse struct {
195+
Hits Hits `json:"hits"`
196+
}
197+
198+
var esResponse ElasticsearchResponse
199+
if err := json.Unmarshal(bodyBytes, &esResponse); err != nil {
200+
t.Fatalf("Failed to unmarshal response body: %s", err)
201+
}
202+
203+
if esResponse.Hits.Total.Value != d.totalResults {
204+
t.Fatalf("Expected %d results, got %d for test case %s", d.totalResults, esResponse.Hits.Total, d.testCaseName)
205+
}
206+
})
207+
}
208+
209+
}

platform/config/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,8 @@ type QuesmaConfiguration struct {
5959
MapFieldsDiscoveringEnabled bool
6060
IndexNameRewriteRules []IndexNameRewriteRule // rules for rewriting index names, e.g. "index_name" -> "index_name_v2"
6161
DefaultStringColumnType string
62+
63+
DefaultSchemaOverrides *SchemaConfiguration
6264
}
6365

6466
func NewQuesmaConfigurationIndexConfigOnly(indexConfig map[string]IndexConfiguration) QuesmaConfiguration {

platform/config/config_v2_util.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,9 +55,11 @@ func (c *QuesmaConfiguration) translateAndAddSinglePipeline(confNew *QuesmaNewCo
5555
c.CreateCommonTable = true
5656
c.UseCommonTableForWildcard = true
5757
}
58+
5859
if defaultConfig.SchemaOverrides != nil {
59-
errAcc = multierror.Append(errAcc, fmt.Errorf("schema overrides of default index ('%s') are not currently supported (only supported in configuration of a specific index)", DefaultWildcardIndexName))
60+
c.DefaultSchemaOverrides = defaultConfig.SchemaOverrides
6061
}
62+
6163
if len(defaultConfig.QueryTarget) > 1 {
6264
errAcc = multierror.Append(errAcc, fmt.Errorf("the target configuration of default index ('%s') of query processor is not currently supported", DefaultWildcardIndexName))
6365
}
@@ -192,9 +194,11 @@ func (c *QuesmaConfiguration) translateAndAddDualPipeline(confNew *QuesmaNewConf
192194
c.UseCommonTableForWildcard = queryProcessor.Config.UseCommonTable
193195
}
194196
}
197+
195198
if defaultConfig.SchemaOverrides != nil {
196-
errAcc = multierror.Append(errAcc, fmt.Errorf("schema overrides of default index ('%s') are not currently supported (only supported in configuration of a specific index)", DefaultWildcardIndexName))
199+
c.DefaultSchemaOverrides = defaultConfig.SchemaOverrides
197200
}
201+
198202
if defaultConfig.UseCommonTable {
199203
// We set both flags to true here
200204
// as creating common table depends on the first one

0 commit comments

Comments
 (0)