Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 38b6652

Browse files
authored
Rewrite index names on ingest (#1448)
This PR adds options to rewrite index names on ingest. It's beneficial if we want all `daily` indices into single index. It is configured as a list of rules. These rules are applied in order (sorted names). - `from` contains a matching regexp - `to` part contains a match group name Rewriting is done at a very early stage of processing. Here is a example config: ``` - name: my-ingest-processor type: quesma-v1-processor-ingest config: indexNameRewriteRules: 0: from: (.*?)(-\d{4}\.\d{2}\.\d{2})$ to: "$1" 1: from: (.*?)(-\d{4}\.\d{2})$ to: "$1" 3: from: (.*?)(.\d{4}-\d{2})$ to: "$1" 4: from: (.*?)(.\d{4}-\d{2}-\d{2})$ to: "$1" ``` It will rewrite `logs-2023.01` into `logs`.
1 parent 2527ae6 commit 38b6652

File tree

12 files changed

+446
-10
lines changed

12 files changed

+446
-10
lines changed
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
frontendConnectors:
2+
- name: elastic-ingest
3+
type: elasticsearch-fe-ingest
4+
config:
5+
listenPort: 8080
6+
- name: elastic-query
7+
type: elasticsearch-fe-query
8+
config:
9+
listenPort: 8080
10+
backendConnectors:
11+
- name: e
12+
type: elasticsearch
13+
config:
14+
url: "http://{{ .elasticsearch_host }}:{{ .elasticsearch_port }}"
15+
user: elastic
16+
password: quesmaquesma
17+
- name: c
18+
type: clickhouse-os
19+
config:
20+
url: clickhouse://{{ .clickhouse_host }}:{{ .clickhouse_port }}
21+
logging:
22+
path: "logs"
23+
level: "info"
24+
disableFileLogging: false
25+
enableSQLTracing: true
26+
processors:
27+
- name: QP
28+
type: quesma-v1-processor-query
29+
config:
30+
useCommonTable: true
31+
indexes:
32+
33+
"*":
34+
target:
35+
- c
36+
- name: IP
37+
type: quesma-v1-processor-ingest
38+
config:
39+
indexNameRewriteRules:
40+
0:
41+
from: (.*?)(-\d{4}\.\d{2}\.\d{2})$
42+
to: "$1"
43+
1:
44+
from: (.*?)(-\d{4}\.\d{2})$
45+
to: "$1"
46+
3:
47+
from: (.*?)(.\d{4}-\d{2})$
48+
to: "$1"
49+
4:
50+
from: (.*?)(.\d{4}-\d{2}\-\d{2})$
51+
to: "$1"
52+
useCommonTable: true
53+
indexes:
54+
"*":
55+
target:
56+
- c
57+
58+
pipelines:
59+
- name: my-elasticsearch-proxy-read
60+
frontendConnectors: [ elastic-query ]
61+
processors: [ QP ]
62+
backendConnectors: [ e, c ]
63+
- name: my-elasticsearch-proxy-write
64+
frontendConnectors: [ elastic-ingest ]
65+
processors: [ IP ]
66+
backendConnectors: [ e, c ]

ci/it/integration_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,3 +71,8 @@ func TestTableOverrideTestcase(t *testing.T) {
7171
testCase := testcases.NewOverrideTestcase()
7272
runIntegrationTest(t, testCase)
7373
}
74+
75+
func TestIndexNameRewrite(t *testing.T) {
76+
testCase := testcases.NewIndexNameRewriteTestcase()
77+
runIntegrationTest(t, testCase)
78+
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
// Copyright Quesma, licensed under the Elastic License 2.0.
2+
// SPDX-License-Identifier: Elastic-2.0
3+
4+
// This file contains integration tests for different ingest functionalities.
5+
// This is a good place to add regression tests for ingest bugs.
6+
7+
package testcases
8+
9+
import (
10+
"context"
11+
"encoding/json"
12+
"fmt"
13+
"github.com/stretchr/testify/assert"
14+
"net/http"
15+
"testing"
16+
)
17+
18+
type IndexNameRewriteTestcase struct {
19+
IntegrationTestcaseBase
20+
}
21+
22+
func NewIndexNameRewriteTestcase() *IndexNameRewriteTestcase {
23+
return &IndexNameRewriteTestcase{
24+
IntegrationTestcaseBase: IntegrationTestcaseBase{
25+
ConfigTemplate: "quesma-index-name-rewrite.yml.template",
26+
},
27+
}
28+
}
29+
30+
func (a *IndexNameRewriteTestcase) SetupContainers(ctx context.Context) error {
31+
containers, err := setupAllContainersWithCh(ctx, a.ConfigTemplate)
32+
a.Containers = containers
33+
return err
34+
}
35+
36+
func (a *IndexNameRewriteTestcase) RunTests(ctx context.Context, t *testing.T) error {
37+
t.Run("test basic request", func(t *testing.T) { a.testBasicRequest(ctx, t) })
38+
39+
return nil
40+
}
41+
42+
func (a *IndexNameRewriteTestcase) testBasicRequest(ctx context.Context, t *testing.T) {
43+
44+
testCases := []struct {
45+
TestCaseName string `json:"name"`
46+
IndexName string `json:"index_name"`
47+
ExpectedIndexName string `json:"expected_index_name"`
48+
}{
49+
{
50+
TestCaseName: "1. plain index name",
51+
IndexName: "foo",
52+
ExpectedIndexName: "foo",
53+
},
54+
{
55+
TestCaseName: "2. index name with date",
56+
IndexName: "foo.2001-01-01",
57+
ExpectedIndexName: "foo",
58+
},
59+
{
60+
TestCaseName: "3. index name and month",
61+
IndexName: "foo.2001-01",
62+
ExpectedIndexName: "foo",
63+
},
64+
{
65+
TestCaseName: "3. index name with date and dashes",
66+
IndexName: "foo-2001.01",
67+
ExpectedIndexName: "foo",
68+
},
69+
{
70+
TestCaseName: "4. index name with date and dashes",
71+
IndexName: "foo-2001.01.01",
72+
ExpectedIndexName: "foo",
73+
},
74+
{
75+
TestCaseName: "5. index name not matching",
76+
IndexName: "foo-not-matching",
77+
ExpectedIndexName: "foo-not-matching",
78+
},
79+
}
80+
81+
for n, d := range testCases {
82+
83+
data, err := json.Marshal(d)
84+
if err != nil {
85+
t.Fatalf("Failed to marshal test case %d: %s", n, err)
86+
}
87+
88+
resp, bodyBytes := a.RequestToQuesma(ctx, t,
89+
"POST", fmt.Sprintf("/%s/_doc", d.IndexName), data)
90+
91+
assert.Contains(t, string(bodyBytes), "created")
92+
assert.Equal(t, http.StatusOK, resp.StatusCode)
93+
assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source"))
94+
assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product"))
95+
}
96+
97+
rows, err := a.ExecuteClickHouseQuery(ctx, "select name, __quesma_index_name, expected_index_name from quesma_common_table where expected_index_name <> __quesma_index_name")
98+
99+
defer rows.Close()
100+
if err != nil {
101+
t.Fatalf("Failed to execute ClickHouse query: %s", err)
102+
}
103+
104+
if rows.Next() {
105+
var name *string
106+
var expectedIndexName *string
107+
var actualIndexName *string
108+
109+
if err := rows.Scan(&name, &actualIndexName, &expectedIndexName); err != nil {
110+
t.Fatalf("Failed to scan row: %s", err)
111+
}
112+
t.Fatalf("Expected index name does not match actual index. Test case: %s, actual index name: %s, expected index name: %s", *name, *actualIndexName, *expectedIndexName)
113+
}
114+
115+
}

platform/config/config.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ type QuesmaConfiguration struct {
5757
DefaultIngestOptimizers map[string]OptimizerConfiguration
5858
DefaultQueryOptimizers map[string]OptimizerConfiguration
5959
MapFieldsDiscoveringEnabled bool
60+
IndexNameRewriteRules []IndexNameRewriteRule // rules for rewriting index names, e.g. "index_name" -> "index_name_v2"
6061
DefaultStringColumnType string
6162
}
6263

@@ -270,8 +271,7 @@ Quesma Configuration:
270271
UseCommonTableForWildcard: %t,
271272
DefaultIngestTarget: %v,
272273
DefaultQueryTarget: %v,
273-
MapFieldsDiscoveringEnabled: %t,
274-
DefaultStringColumnType: %s
274+
MapFieldsDiscoveringEnabled: %t
275275
`,
276276
c.TransparentProxy,
277277
elasticUrl,
@@ -294,7 +294,6 @@ Quesma Configuration:
294294
c.DefaultIngestTarget,
295295
c.DefaultQueryTarget,
296296
c.MapFieldsDiscoveringEnabled,
297-
c.DefaultStringColumnType,
298297
)
299298
}
300299

platform/config/config_v2.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/rs/zerolog"
1313
"log"
1414
"reflect"
15+
"regexp"
1516
"slices"
1617
"strings"
1718
)
@@ -133,8 +134,15 @@ type (
133134
IndexConfig IndicesConfigs `koanf:"indexes"`
134135
// DefaultTargetConnectorType is used in V2 code only
135136
DefaultTargetConnectorType string //it is not serialized to maintain configuration BWC, so it's basically just populated from '*' config in `config_v2.go`
137+
138+
IndexNameRewriteRules map[string]IndexNameRewriteRule `koanf:"indexNameRewriteRules"`
136139
}
137140
IndicesConfigs map[string]IndexConfiguration
141+
142+
IndexNameRewriteRule struct {
143+
From string `koanf:"from"` // pattern to match
144+
To string `koanf:"to"` // replacement string
145+
}
138146
)
139147

140148
func (p *QuesmaProcessorConfig) IsFieldMapSyntaxEnabled(indexName string) bool {
@@ -422,6 +430,18 @@ func (c *QuesmaNewConfiguration) definedProcessorNames() []string {
422430
return names
423431
}
424432

433+
func (c *QuesmaNewConfiguration) validateRewriteRules(rules map[string]IndexNameRewriteRule) error {
434+
435+
for name, rule := range rules {
436+
_, err := regexp.Compile(rule.From)
437+
if err != nil {
438+
return fmt.Errorf("index name rewrite rule '%s' has an invalid 'from' regex: %w", name, err)
439+
}
440+
}
441+
442+
return nil
443+
}
444+
425445
func (c *QuesmaNewConfiguration) validateProcessor(p Processor) error {
426446
if len(p.Name) == 0 {
427447
return fmt.Errorf("processor must have a non-empty name")
@@ -440,12 +460,23 @@ func (c *QuesmaNewConfiguration) validateProcessor(p Processor) error {
440460
return fmt.Errorf("configuration of index %s must have at most two targets (query processor)", indexName)
441461
}
442462
}
463+
464+
if p.Config.IndexNameRewriteRules != nil || len(p.Config.IndexNameRewriteRules) > 0 {
465+
return fmt.Errorf("index name rewrite rules are not supported in query processor configuration, use the ingest processor for this purpose")
466+
}
467+
443468
} else {
444469
if _, ok := indexConfig.Target.([]interface{}); ok {
445470
if len(indexConfig.Target.([]interface{})) > 2 {
446471
return fmt.Errorf("configuration of index %s must have at most two targets (ingest processor)", indexName)
447472
}
448473
}
474+
475+
err := c.validateRewriteRules(p.Config.IndexNameRewriteRules)
476+
if err != nil {
477+
return err
478+
}
479+
449480
}
450481
targets, errTarget := c.getTargetsExtendedConfig(indexConfig.Target)
451482
if errTarget != nil {

platform/config/config_v2_test.go

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -360,6 +360,27 @@ func TestPartitionBy(t *testing.T) {
360360
assert.Equal(t, Hourly, legacyConf.DefaultPartitioningStrategy)
361361
}
362362

363+
func TestIndexNameRewriteRules(t *testing.T) {
364+
365+
os.Setenv(configFileLocationEnvVar, "./test_configs/index_name_rewrite_rules.yaml")
366+
cfg := LoadV2Config()
367+
if err := cfg.Validate(); err != nil {
368+
t.Fatalf("error validating config: %v", err)
369+
}
370+
legacyConf := cfg.TranslateToLegacyConfig()
371+
372+
assert.Equal(t, 4, len(legacyConf.IndexNameRewriteRules))
373+
374+
for _, rule := range legacyConf.IndexNameRewriteRules {
375+
assert.Equal(t, "$1", rule.To)
376+
}
377+
378+
assert.Equal(t, "(.*?)(-\\d{4}\\.\\d{2}\\.\\d{2})$", legacyConf.IndexNameRewriteRules[0].From)
379+
assert.Equal(t, "(.*?)(-\\d{4}\\.\\d{2})$", legacyConf.IndexNameRewriteRules[1].From)
380+
assert.Equal(t, "(.*?)(.\\d{4}-\\d{2})$", legacyConf.IndexNameRewriteRules[2].From)
381+
assert.Equal(t, "(.*?)(.\\d{4}-\\d{2}-\\d{2})$", legacyConf.IndexNameRewriteRules[3].From) // empty string means no rewrite rule
382+
}
383+
363384
func TestStringColumnIsTextDefaultBehavior(t *testing.T) {
364385
os.Setenv(configFileLocationEnvVar, "./test_configs/partition_by.yaml")
365386
cfg := LoadV2Config()

platform/config/config_v2_util.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"fmt"
77
"github.com/hashicorp/go-multierror"
88
"slices"
9+
"sort"
910
)
1011

1112
func (c *QuesmaConfiguration) translateAndAddSinglePipeline(confNew *QuesmaNewConfiguration, errAcc error) {
@@ -344,6 +345,27 @@ func (c *QuesmaConfiguration) translateAndAddDualPipeline(confNew *QuesmaNewConf
344345
c.DefaultIngestOptimizers = nil
345346
}
346347

348+
if ingestProcessor.Config.IndexNameRewriteRules != nil {
349+
350+
if len(ingestProcessor.Config.IndexNameRewriteRules) > 0 {
351+
352+
var names []string
353+
for name := range ingestProcessor.Config.IndexNameRewriteRules {
354+
names = append(names, name)
355+
}
356+
357+
sort.Strings(names)
358+
359+
var orderedRules []IndexNameRewriteRule
360+
for _, name := range names {
361+
if rule, ok := ingestProcessor.Config.IndexNameRewriteRules[name]; ok {
362+
orderedRules = append(orderedRules, rule)
363+
}
364+
}
365+
c.IndexNameRewriteRules = orderedRules
366+
}
367+
}
368+
347369
// safe to call per validation earlier
348370
if targts, ok := ingestProcessor.Config.IndexConfig[DefaultWildcardIndexName].Target.([]interface{}); ok {
349371
conn := confNew.GetBackendConnectorByName(targts[0].(string))

0 commit comments

Comments
 (0)