Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions ci/it/configs/quesma-default-schema-override.yml.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Quesma configuration template used by the default-schema-override integration test.
# The "*" (default) index entry of both processors overrides the type of the
# `message` field to `text`.
# Two frontend connectors (ingest and query), both configured with listenPort 8080.
frontendConnectors:
- name: elastic-ingest
type: elasticsearch-fe-ingest
config:
listenPort: 8080
- name: elastic-query
type: elasticsearch-fe-query
config:
listenPort: 8080
# Backends: `e` is Elasticsearch, `c` is ClickHouse; hosts and ports are template placeholders.
backendConnectors:
- name: e
type: elasticsearch
config:
url: "http://{{ .elasticsearch_host }}:{{ .elasticsearch_port }}"
user: elastic
password: quesmaquesma
- name: c
type: clickhouse-os
config:
url: clickhouse://{{ .clickhouse_host }}:{{ .clickhouse_port }}
logging:
path: "logs"
level: "info"
disableFileLogging: false
enableSQLTracing: true
processors:
# Query processor: the default index targets backend `c` and overrides `message` to text.
- name: QP
type: quesma-v1-processor-query
config:
useCommonTable: true
indexes:
"*":
schemaOverrides:
fields:
"message":
type: text
useCommonTable: true
target:
- c
# Ingest processor: same default-index override as the query processor, plus index name rewrite rules.
- name: IP
type: quesma-v1-processor-ingest
config:
# Each rule replaces the index name with its first capture group ("$1"),
# i.e. the part before the date-like suffix matched by the second group.
# NOTE(review): rule keys are 0, 1, 3, 4 - there is no rule 2; confirm this gap is intentional.
indexNameRewriteRules:
0:
from: (.*?)(-\d{4}\.\d{2}\.\d{2})$
to: "$1"
1:
from: (.*?)(-\d{4}\.\d{2})$
to: "$1"
# NOTE(review): the `.` before \d{4} in rules 3 and 4 is unescaped, so it matches any
# character, not only a literal dot - confirm whether `\.` was intended.
3:
from: (.*?)(.\d{4}-\d{2})$
to: "$1"
4:
from: (.*?)(.\d{4}-\d{2}\-\d{2})$
to: "$1"
useCommonTable: true
indexes:
"*":
useCommonTable: true
schemaOverrides:
fields:
"message":
type: text
target:
- c

# The read pipeline wires the query frontend to QP; the write pipeline wires the ingest frontend to IP.
pipelines:
- name: my-elasticsearch-proxy-read
frontendConnectors: [ elastic-query ]
processors: [ QP ]
backendConnectors: [ e, c ]
- name: my-elasticsearch-proxy-write
frontendConnectors: [ elastic-ingest ]
processors: [ IP ]
backendConnectors: [ e, c ]
5 changes: 5 additions & 0 deletions ci/it/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,3 +76,8 @@ func TestIndexNameRewrite(t *testing.T) {
testCase := testcases.NewIndexNameRewriteTestcase()
runIntegrationTest(t, testCase)
}

func TestDefaultSchemaOverride(t *testing.T) {
testCase := testcases.NewDefaultSchemaOverrideTestcase()
runIntegrationTest(t, testCase)
}
161 changes: 161 additions & 0 deletions ci/it/testcases/test_default_schema_override.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0

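// This file contains the integration test for schema overrides configured on the default ("*") index.
// It ingests documents into several indexes and checks that the overridden `message` field is reported as text and is full-text searchable.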

package testcases

import (
"context"
"encoding/json"
"fmt"
"github.com/stretchr/testify/assert"
"net/http"
"testing"
)

// DefaultSchemaOverrideTestcase is the integration testcase returned by
// NewDefaultSchemaOverrideTestcase. It embeds IntegrationTestcaseBase, whose
// ConfigTemplate and Containers fields are used by the methods of this type.
type DefaultSchemaOverrideTestcase struct {
IntegrationTestcaseBase
}

// NewDefaultSchemaOverrideTestcase returns a testcase whose ConfigTemplate is
// set to the quesma-default-schema-override.yml.template configuration.
func NewDefaultSchemaOverrideTestcase() *DefaultSchemaOverrideTestcase {
	tc := new(DefaultSchemaOverrideTestcase)
	// ConfigTemplate is promoted from the embedded IntegrationTestcaseBase.
	tc.ConfigTemplate = "quesma-default-schema-override.yml.template"
	return tc
}

// SetupContainers starts the containers for this testcase via
// setupAllContainersWithCh, using the testcase's config template. The returned
// containers are stored on the testcase even when an error is returned.
func (a *DefaultSchemaOverrideTestcase) SetupContainers(ctx context.Context) error {
	var err error
	a.Containers, err = setupAllContainersWithCh(ctx, a.ConfigTemplate)
	return err
}

// RunTests runs the subtests of this testcase. Failures are reported through
// t; the returned error is always nil.
func (a *DefaultSchemaOverrideTestcase) RunTests(ctx context.Context, t *testing.T) error {
	basicRequest := func(t *testing.T) { a.testBasicRequest(ctx, t) }
	t.Run("test basic request", basicRequest)
	return nil
}

// testBasicRequest exercises the `message` field end to end in two phases.
//
// Phase 1 (ingest): every test case is marshalled to JSON and posted as a
// document to /<IndexName>/_doc. The whole test case is the document, so the
// document carries `message` alongside the bookkeeping fields.
//
// Phase 2 (query): for every test case, a subtest checks that _field_caps on
// QueryIndex reports `message` as text, and that a match query for Pattern on
// `message` returns exactly TotalResults hits.
//
// All documents are ingested before any query runs, so the expected totals
// depend on the complete list of test cases, not only on the case being
// checked. Several cases ingest into a date-suffixed index and query the bare
// name; presumably this relies on the index name rewrite rules of the config
// template - verify against the template when changing index names.
func (a *DefaultSchemaOverrideTestcase) testBasicRequest(ctx context.Context, t *testing.T) {
	// Minimal projection of a search response: only hits.total is inspected.
	type searchTotal struct {
		Value    int    `json:"value"`
		Relation string `json:"relation"`
	}
	type searchHits struct {
		Total searchTotal `json:"total"`
	}
	type searchResponse struct {
		Hits searchHits `json:"hits"`
	}

	// Body of the _field_caps request: ask for the capabilities of all fields.
	const fieldCapsQuery = `{
"fields": [
"*"
]
}`

	testCases := []struct {
		TestCaseName string `json:"name"`

		// Index the document is ingested into.
		IndexName string `json:"index_name"`

		// Value of the document's `message` field.
		Message string `json:"message"`

		// Query phase: index (or comma-separated indexes) to search, the term
		// to match on `message`, and the expected hits.total.value.
		QueryIndex   string `json:"query_index"`
		Pattern      string `json:"pattern"`
		TotalResults int    `json:"total_results"`
	}{
		{
			TestCaseName: "1. plain index name",
			IndexName:    "foo",
			QueryIndex:   "foo",
			Message:      "This is first",
			Pattern:      "first",
			TotalResults: 1,
		},
		{
			TestCaseName: "2. plain index name with date",
			IndexName:    "foo.2023-10-01",
			QueryIndex:   "foo",
			Message:      "This is second",
			Pattern:      "second",
			TotalResults: 1,
		},
		{
			TestCaseName: "3. plain index name with date not matching ",
			IndexName:    "foo.2023-10-01",
			QueryIndex:   "foo",
			Message:      "This is third",
			Pattern:      "notmatching",
			TotalResults: 0,
		},
		{
			TestCaseName: "4. another index name with date",
			IndexName:    "anotherindex.2023-10-01",
			QueryIndex:   "anotherindex",
			Message:      "This is third",
			Pattern:      "third",
			TotalResults: 1,
		},
		{
			TestCaseName: "5. query all",
			IndexName:    "foo.2023-01",
			QueryIndex:   "foo,anotherindex",
			Message:      "This is fifth",
			Pattern:      "This",
			TotalResults: 5,
		},
	}

	// Phase 1: ingest one document per test case.
	for n, d := range testCases {
		data, err := json.Marshal(d)
		if err != nil {
			t.Fatalf("Failed to marshal test case %d: %s", n, err)
		}

		resp, bodyBytes := a.RequestToQuesma(ctx, t,
			"POST", fmt.Sprintf("/%s/_doc", d.IndexName), data)

		assert.Contains(t, string(bodyBytes), "created")
		assert.Equal(t, http.StatusOK, resp.StatusCode)
		assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source"))
		assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product"))
	}

	// Phase 2: query once everything has been ingested.
	for _, d := range testCases {
		t.Run(d.TestCaseName, func(t *testing.T) {
			t.Logf("Testing: %s %s", d.QueryIndex, d.TestCaseName)

			// The field capabilities must report `message` as text.
			_, bodyBytes := a.RequestToQuesma(ctx, t, "POST", fmt.Sprintf("/%s/_field_caps", d.QueryIndex), []byte(fieldCapsQuery))
			assert.Contains(t, string(bodyBytes), `"message":{"text"`)

			// Full-text search on `message`.
			fullTextQuery := fmt.Sprintf(`{"query": {"match": {"message": "%s"}}}`, d.Pattern)
			_, bodyBytes = a.RequestToQuesma(ctx, t, "POST", fmt.Sprintf("/%s/_search", d.QueryIndex), []byte(fullTextQuery))
			t.Log(string(bodyBytes))

			var esResponse searchResponse
			if err := json.Unmarshal(bodyBytes, &esResponse); err != nil {
				t.Fatalf("Failed to unmarshal response body: %s", err)
			}

			if got := esResponse.Hits.Total.Value; got != d.TotalResults {
				// Report the numeric hit count; formatting the whole Total
				// struct with %d would garble the message.
				t.Fatalf("Expected %d results, got %d for test case %s", d.TotalResults, got, d.TestCaseName)
			}
		})
	}
}
19 changes: 18 additions & 1 deletion ci/it/testcases/test_reading_clickhouse_tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,29 @@ func (a *ReadingClickHouseTablesIntegrationTestcase) testWildcardGoesToElastic(c
// testIngestIsDisabled sends ingest requests for "test_table" and
// "extra_index", first through /<index>/_doc and then through /_bulk. For each
// request it asserts that the response body contains "index_closed_exception",
// that the status is 200, and that the X-Quesma-Source and X-Elastic-Product
// headers are "Clickhouse" and "Elasticsearch".
func (a *ReadingClickHouseTablesIntegrationTestcase) testIngestIsDisabled(ctx context.Context, t *testing.T) {
// There is no ingest pipeline, so Quesma should reject all ingest requests
for _, tt := range []string{"test_table", "extra_index"} {
// NOTE(review): the next two t.Run openings look like the old and the new side
// of a diff hunk shown together; presumably only the tt+"_doc" variant belongs
// in the final file - confirm against the repository.
t.Run(tt, func(t *testing.T) {
t.Run(tt+"_doc", func(t *testing.T) {
resp, bodyBytes := a.RequestToQuesma(ctx, t, "POST", fmt.Sprintf("/%s/_doc", tt), []byte(`{"name": "Piotr", "age": 11111}`))
assert.Contains(t, string(bodyBytes), "index_closed_exception")
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source"))
assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product"))
})
}

// Same expectations for the bulk endpoint: a one-item bulk request whose action
// line names the index.
for _, tt := range []string{"test_table", "extra_index"} {
t.Run(tt+"_bulk", func(t *testing.T) {

bulkPayload := []byte(fmt.Sprintf(`
{ "index": { "_index": "%s", "_id": "1" } }
{ "name": "Alice", "age": 30 }
`, tt))

resp, bodyBytes := a.RequestToQuesma(ctx, t, "POST", "/_bulk", bulkPayload)
assert.Contains(t, string(bodyBytes), "index_closed_exception")
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source"))
assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product"))
})
}

}
2 changes: 2 additions & 0 deletions platform/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ type QuesmaConfiguration struct {
MapFieldsDiscoveringEnabled bool
IndexNameRewriteRules []IndexNameRewriteRule // rules for rewriting index names, e.g. "index_name" -> "index_name_v2"
DefaultStringColumnType string

DefaultSchemaOverrides *SchemaConfiguration
}

func NewQuesmaConfigurationIndexConfigOnly(indexConfig map[string]IndexConfiguration) QuesmaConfiguration {
Expand Down
8 changes: 6 additions & 2 deletions platform/config/config_v2_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,11 @@ func (c *QuesmaConfiguration) translateAndAddSinglePipeline(confNew *QuesmaNewCo
c.CreateCommonTable = true
c.UseCommonTableForWildcard = true
}

if defaultConfig.SchemaOverrides != nil {
errAcc = multierror.Append(errAcc, fmt.Errorf("schema overrides of default index ('%s') are not currently supported (only supported in configuration of a specific index)", DefaultWildcardIndexName))
c.DefaultSchemaOverrides = defaultConfig.SchemaOverrides
}

if len(defaultConfig.QueryTarget) > 1 {
errAcc = multierror.Append(errAcc, fmt.Errorf("the target configuration of default index ('%s') of query processor is not currently supported", DefaultWildcardIndexName))
}
Expand Down Expand Up @@ -192,9 +194,11 @@ func (c *QuesmaConfiguration) translateAndAddDualPipeline(confNew *QuesmaNewConf
c.UseCommonTableForWildcard = queryProcessor.Config.UseCommonTable
}
}

if defaultConfig.SchemaOverrides != nil {
errAcc = multierror.Append(errAcc, fmt.Errorf("schema overrides of default index ('%s') are not currently supported (only supported in configuration of a specific index)", DefaultWildcardIndexName))
c.DefaultSchemaOverrides = defaultConfig.SchemaOverrides
}

if defaultConfig.UseCommonTable {
// We set both flags to true here
// as creating common table depends on the first one
Expand Down
12 changes: 10 additions & 2 deletions platform/functionality/bulk/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,13 @@ func Write(ctx context.Context, defaultIndex *string, bulk types.NDJSON, ip *ing

// The returned results should be in the same order as the input request, however splitting the bulk might change the order.
// Therefore, each BulkRequestEntry has a corresponding pointer to the result entry, allowing us to freely split and reshuffle the bulk.
results, clickhouseBulkEntries, elasticRequestBody, elasticBulkEntries, err := SplitBulk(ctx, defaultIndex, bulk, maxBulkSize, tableResolver, ip.GetIndexNameRewriter())

var indexNameRewriter ingest.IndexNameRewriter
if ip != nil {
indexNameRewriter = ip.GetIndexNameRewriter()
}

results, clickhouseBulkEntries, elasticRequestBody, elasticBulkEntries, err := SplitBulk(ctx, defaultIndex, bulk, maxBulkSize, tableResolver, indexNameRewriter)
if err != nil {
return []BulkItem{}, err
}
Expand Down Expand Up @@ -129,7 +135,9 @@ func SplitBulk(ctx context.Context, defaultIndex *string, bulk types.NDJSON, max
index := op.GetIndex()
operation := op.GetOperation()

index = rewriter.RewriteIndex(index)
if rewriter != nil {
index = rewriter.RewriteIndex(index)
}

entryWithResponse := BulkRequestEntry{
operation: operation,
Expand Down
15 changes: 11 additions & 4 deletions platform/schema/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type (

// index configuration overrides always take precedence
indexConfiguration *map[string]config.IndexConfiguration
defaultSchemaOverrides *config.SchemaConfiguration
dataSourceTableProvider TableProvider
dataSourceTypeAdapter typeAdapter
dynamicConfiguration map[string]Table
Expand Down Expand Up @@ -149,6 +150,11 @@ func (s *schemaRegistry) loadSchemas() (map[IndexName]Schema, error) {
if s.dataSourceTableProvider.AutodiscoveryEnabled() {
for tableName, table := range definitions {
fields := make(map[FieldName]Field)

if s.defaultSchemaOverrides != nil {
s.populateSchemaFromStaticConfiguration(s.defaultSchemaOverrides, fields)
}

internalToPublicFieldsEncodings := s.getInternalToPublicFieldEncodings(tableName)
existsInDataSource := s.populateSchemaFromTableDefinition(definitions, tableName, fields, internalToPublicFieldsEncodings)
schemas[IndexName(tableName)] = NewSchema(fields, existsInDataSource, table.DatabaseName)
Expand All @@ -159,7 +165,7 @@ func (s *schemaRegistry) loadSchemas() (map[IndexName]Schema, error) {
fields := make(map[FieldName]Field)
aliases := make(map[FieldName]FieldName)
s.populateSchemaFromDynamicConfiguration(indexName, fields)
s.populateSchemaFromStaticConfiguration(indexConfiguration, fields)
s.populateSchemaFromStaticConfiguration(indexConfiguration.SchemaOverrides, fields)
internalToPublicFieldsEncodings := s.getInternalToPublicFieldEncodings(indexName)
tableName := indexConfiguration.TableName(indexName)
existsInDataSource := s.populateSchemaFromTableDefinition(definitions, tableName, fields, internalToPublicFieldsEncodings)
Expand Down Expand Up @@ -256,6 +262,7 @@ func (s *schemaRegistry) GetFieldEncodings() map[FieldEncodingKey]EncodedFieldNa
func NewSchemaRegistry(tableProvider TableProvider, configuration *config.QuesmaConfiguration, dataSourceTypeAdapter typeAdapter) Registry {
res := &schemaRegistry{
indexConfiguration: &configuration.IndexConfig,
defaultSchemaOverrides: configuration.DefaultSchemaOverrides,
dataSourceTableProvider: tableProvider,
dataSourceTypeAdapter: dataSourceTypeAdapter,
dynamicConfiguration: make(map[string]Table),
Expand All @@ -266,11 +273,11 @@ func NewSchemaRegistry(tableProvider TableProvider, configuration *config.Quesma
return res
}

func (s *schemaRegistry) populateSchemaFromStaticConfiguration(indexConfiguration config.IndexConfiguration, fields map[FieldName]Field) {
if indexConfiguration.SchemaOverrides == nil {
func (s *schemaRegistry) populateSchemaFromStaticConfiguration(schemaOverrides *config.SchemaConfiguration, fields map[FieldName]Field) {
if schemaOverrides == nil {
return
}
for fieldName, field := range indexConfiguration.SchemaOverrides.Fields {
for fieldName, field := range schemaOverrides.Fields {
if field.Type.AsString() == config.TypeAlias || field.Ignored {
continue
}
Expand Down
Loading