Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions ci/it/configs/quesma-index-name-rewrite.yml.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
frontendConnectors:
- name: elastic-ingest
type: elasticsearch-fe-ingest
config:
listenPort: 8080
- name: elastic-query
type: elasticsearch-fe-query
config:
listenPort: 8080
backendConnectors:
- name: e
type: elasticsearch
config:
url: "http://{{ .elasticsearch_host }}:{{ .elasticsearch_port }}"
user: elastic
password: quesmaquesma
- name: c
type: clickhouse-os
config:
url: clickhouse://{{ .clickhouse_host }}:{{ .clickhouse_port }}
logging:
path: "logs"
level: "info"
disableFileLogging: false
enableSQLTracing: true
processors:
- name: QP
type: quesma-v1-processor-query
config:
useCommonTable: true
indexes:

"*":
target:
- c
- name: IP
type: quesma-v1-processor-ingest
config:
indexNameRewriteRules:
0:
from: (.*?)(-\d{4}\.\d{2}\.\d{2})$
to: "$1"
1:
from: (.*?)(-\d{4}\.\d{2})$
to: "$1"
3:
from: (.*?)(.\d{4}-\d{2})$
to: "$1"
4:
from: (.*?)(.\d{4}-\d{2}\-\d{2})$
to: "$1"
useCommonTable: true
indexes:
"*":
target:
- c

pipelines:
- name: my-elasticsearch-proxy-read
frontendConnectors: [ elastic-query ]
processors: [ QP ]
backendConnectors: [ e, c ]
- name: my-elasticsearch-proxy-write
frontendConnectors: [ elastic-ingest ]
processors: [ IP ]
backendConnectors: [ e, c ]
5 changes: 5 additions & 0 deletions ci/it/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,8 @@ func TestTableOverrideTestcase(t *testing.T) {
testCase := testcases.NewOverrideTestcase()
runIntegrationTest(t, testCase)
}

func TestIndexNameRewrite(t *testing.T) {
testCase := testcases.NewIndexNameRewriteTestcase()
runIntegrationTest(t, testCase)
}
115 changes: 115 additions & 0 deletions ci/it/testcases/test_index_name_rewrite.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0

// This file contains integration tests for different ingest functionalities.
// This is a good place to add regression tests for ingest bugs.

package testcases

import (
"context"
"encoding/json"
"fmt"
"github.com/stretchr/testify/assert"
"net/http"
"testing"
)

type IndexNameRewriteTestcase struct {
IntegrationTestcaseBase
}

func NewIndexNameRewriteTestcase() *IndexNameRewriteTestcase {
return &IndexNameRewriteTestcase{
IntegrationTestcaseBase: IntegrationTestcaseBase{
ConfigTemplate: "quesma-index-name-rewrite.yml.template",
},
}
}

func (a *IndexNameRewriteTestcase) SetupContainers(ctx context.Context) error {
containers, err := setupAllContainersWithCh(ctx, a.ConfigTemplate)
a.Containers = containers
return err
}

func (a *IndexNameRewriteTestcase) RunTests(ctx context.Context, t *testing.T) error {
t.Run("test basic request", func(t *testing.T) { a.testBasicRequest(ctx, t) })

return nil
}

func (a *IndexNameRewriteTestcase) testBasicRequest(ctx context.Context, t *testing.T) {

testCases := []struct {
TestCaseName string `json:"name"`
IndexName string `json:"index_name"`
ExpectedIndexName string `json:"expected_index_name"`
}{
{
TestCaseName: "1. plain index name",
IndexName: "foo",
ExpectedIndexName: "foo",
},
{
TestCaseName: "2. index name with date",
IndexName: "foo.2001-01-01",
ExpectedIndexName: "foo",
},
{
TestCaseName: "3. index name and month",
IndexName: "foo.2001-01",
ExpectedIndexName: "foo",
},
{
TestCaseName: "3. index name with date and dashes",
IndexName: "foo-2001.01",
ExpectedIndexName: "foo",
},
{
TestCaseName: "4. index name with date and dashes",
IndexName: "foo-2001.01.01",
ExpectedIndexName: "foo",
},
{
TestCaseName: "5. index name not matching",
IndexName: "foo-not-matching",
ExpectedIndexName: "foo-not-matching",
},
}

for n, d := range testCases {

data, err := json.Marshal(d)
if err != nil {
t.Fatalf("Failed to marshal test case %d: %s", n, err)
}

resp, bodyBytes := a.RequestToQuesma(ctx, t,
"POST", fmt.Sprintf("/%s/_doc", d.IndexName), data)

assert.Contains(t, string(bodyBytes), "created")
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source"))
assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product"))
}

rows, err := a.ExecuteClickHouseQuery(ctx, "select name, __quesma_index_name, expected_index_name from quesma_common_table where expected_index_name <> __quesma_index_name")

defer rows.Close()
if err != nil {
t.Fatalf("Failed to execute ClickHouse query: %s", err)
}

if rows.Next() {
var name *string
var expectedIndexName *string
var actualIndexName *string

if err := rows.Scan(&name, &actualIndexName, &expectedIndexName); err != nil {
t.Fatalf("Failed to scan row: %s", err)
}
t.Fatalf("Expected index name does not match actual index. Test case: %s, actual index name: %s, expected index name: %s", *name, *actualIndexName, *expectedIndexName)
}

}
5 changes: 2 additions & 3 deletions platform/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type QuesmaConfiguration struct {
DefaultIngestOptimizers map[string]OptimizerConfiguration
DefaultQueryOptimizers map[string]OptimizerConfiguration
MapFieldsDiscoveringEnabled bool
IndexNameRewriteRules []IndexNameRewriteRule // rules for rewriting index names, e.g. "index_name" -> "index_name_v2"
DefaultStringColumnType string
}

Expand Down Expand Up @@ -270,8 +271,7 @@ Quesma Configuration:
UseCommonTableForWildcard: %t,
DefaultIngestTarget: %v,
DefaultQueryTarget: %v,
MapFieldsDiscoveringEnabled: %t,
DefaultStringColumnType: %s
MapFieldsDiscoveringEnabled: %t
`,
c.TransparentProxy,
elasticUrl,
Expand All @@ -294,7 +294,6 @@ Quesma Configuration:
c.DefaultIngestTarget,
c.DefaultQueryTarget,
c.MapFieldsDiscoveringEnabled,
c.DefaultStringColumnType,
)
}

Expand Down
31 changes: 31 additions & 0 deletions platform/config/config_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/rs/zerolog"
"log"
"reflect"
"regexp"
"slices"
"strings"
)
Expand Down Expand Up @@ -133,8 +134,15 @@ type (
IndexConfig IndicesConfigs `koanf:"indexes"`
// DefaultTargetConnectorType is used in V2 code only
DefaultTargetConnectorType string //it is not serialized to maintain configuration BWC, so it's basically just populated from '*' config in `config_v2.go`

IndexNameRewriteRules map[string]IndexNameRewriteRule `koanf:"indexNameRewriteRules"`
}
IndicesConfigs map[string]IndexConfiguration

IndexNameRewriteRule struct {
From string `koanf:"from"` // pattern to match
To string `koanf:"to"` // replacement string
}
)

func (p *QuesmaProcessorConfig) IsFieldMapSyntaxEnabled(indexName string) bool {
Expand Down Expand Up @@ -422,6 +430,18 @@ func (c *QuesmaNewConfiguration) definedProcessorNames() []string {
return names
}

func (c *QuesmaNewConfiguration) validateRewriteRules(rules map[string]IndexNameRewriteRule) error {

for name, rule := range rules {
_, err := regexp.Compile(rule.From)
if err != nil {
return fmt.Errorf("index name rewrite rule '%s' has an invalid 'from' regex: %w", name, err)
}
}

return nil
}

func (c *QuesmaNewConfiguration) validateProcessor(p Processor) error {
if len(p.Name) == 0 {
return fmt.Errorf("processor must have a non-empty name")
Expand All @@ -440,12 +460,23 @@ func (c *QuesmaNewConfiguration) validateProcessor(p Processor) error {
return fmt.Errorf("configuration of index %s must have at most two targets (query processor)", indexName)
}
}

if p.Config.IndexNameRewriteRules != nil || len(p.Config.IndexNameRewriteRules) > 0 {
return fmt.Errorf("index name rewrite rules are not supported in query processor configuration, use the ingest processor for this purpose")
}

} else {
if _, ok := indexConfig.Target.([]interface{}); ok {
if len(indexConfig.Target.([]interface{})) > 2 {
return fmt.Errorf("configuration of index %s must have at most two targets (ingest processor)", indexName)
}
}

err := c.validateRewriteRules(p.Config.IndexNameRewriteRules)
if err != nil {
return err
}

}
targets, errTarget := c.getTargetsExtendedConfig(indexConfig.Target)
if errTarget != nil {
Expand Down
21 changes: 21 additions & 0 deletions platform/config/config_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,27 @@ func TestPartitionBy(t *testing.T) {
assert.Equal(t, Hourly, legacyConf.DefaultPartitioningStrategy)
}

func TestIndexNameRewriteRules(t *testing.T) {

os.Setenv(configFileLocationEnvVar, "./test_configs/index_name_rewrite_rules.yaml")
cfg := LoadV2Config()
if err := cfg.Validate(); err != nil {
t.Fatalf("error validating config: %v", err)
}
legacyConf := cfg.TranslateToLegacyConfig()

assert.Equal(t, 4, len(legacyConf.IndexNameRewriteRules))

for _, rule := range legacyConf.IndexNameRewriteRules {
assert.Equal(t, "$1", rule.To)
}

assert.Equal(t, "(.*?)(-\\d{4}\\.\\d{2}\\.\\d{2})$", legacyConf.IndexNameRewriteRules[0].From)
assert.Equal(t, "(.*?)(-\\d{4}\\.\\d{2})$", legacyConf.IndexNameRewriteRules[1].From)
assert.Equal(t, "(.*?)(.\\d{4}-\\d{2})$", legacyConf.IndexNameRewriteRules[2].From)
assert.Equal(t, "(.*?)(.\\d{4}-\\d{2}-\\d{2})$", legacyConf.IndexNameRewriteRules[3].From) // empty string means no rewrite rule
}

func TestStringColumnIsTextDefaultBehavior(t *testing.T) {
os.Setenv(configFileLocationEnvVar, "./test_configs/partition_by.yaml")
cfg := LoadV2Config()
Expand Down
22 changes: 22 additions & 0 deletions platform/config/config_v2_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"github.com/hashicorp/go-multierror"
"slices"
"sort"
)

func (c *QuesmaConfiguration) translateAndAddSinglePipeline(confNew *QuesmaNewConfiguration, errAcc error) {
Expand Down Expand Up @@ -344,6 +345,27 @@ func (c *QuesmaConfiguration) translateAndAddDualPipeline(confNew *QuesmaNewConf
c.DefaultIngestOptimizers = nil
}

if ingestProcessor.Config.IndexNameRewriteRules != nil {

if len(ingestProcessor.Config.IndexNameRewriteRules) > 0 {

var names []string
for name := range ingestProcessor.Config.IndexNameRewriteRules {
names = append(names, name)
}

sort.Strings(names)

var orderedRules []IndexNameRewriteRule
for _, name := range names {
if rule, ok := ingestProcessor.Config.IndexNameRewriteRules[name]; ok {
orderedRules = append(orderedRules, rule)
}
}
c.IndexNameRewriteRules = orderedRules
}
}

// safe to call per validation earlier
if targts, ok := ingestProcessor.Config.IndexConfig[DefaultWildcardIndexName].Target.([]interface{}); ok {
conn := confNew.GetBackendConnectorByName(targts[0].(string))
Expand Down
Loading
Loading