Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit af689fa

Browse files
committed
WIP
1 parent 57e07b7 commit af689fa

File tree

6 files changed

+138
-3
lines changed

6 files changed

+138
-3
lines changed

platform/config/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ type QuesmaConfiguration struct {
5757
DefaultIngestOptimizers map[string]OptimizerConfiguration
5858
DefaultQueryOptimizers map[string]OptimizerConfiguration
5959
MapFieldsDiscoveringEnabled bool
60+
61+
IndexNameRewriteRules []IndexNameRewriteRule // rules for rewriting index names, e.g. "index_name" -> "index_name_v2"
6062
}
6163

6264
func NewQuesmaConfigurationIndexConfigOnly(indexConfig map[string]IndexConfiguration) QuesmaConfiguration {

platform/config/config_v2.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/rs/zerolog"
1313
"log"
1414
"reflect"
15+
"regexp"
1516
"slices"
1617
"strings"
1718
)
@@ -126,8 +127,15 @@ type (
126127
IndexConfig IndicesConfigs `koanf:"indexes"`
127128
// DefaultTargetConnectorType is used in V2 code only
128129
DefaultTargetConnectorType string //it is not serialized to maintain configuration BWC, so it's basically just populated from '*' config in `config_v2.go`
130+
131+
IndexNameRewriteRules map[string]IndexNameRewriteRule `koanf:"indexNameRewriteRules"`
129132
}
130133
IndicesConfigs map[string]IndexConfiguration
134+
135+
IndexNameRewriteRule struct {
136+
From string `koanf:"from"` // pattern to match
137+
To string `koanf:"to"` // replacement string
138+
}
131139
)
132140

133141
func (p *QuesmaProcessorConfig) IsFieldMapSyntaxEnabled(indexName string) bool {
@@ -415,6 +423,18 @@ func (c *QuesmaNewConfiguration) definedProcessorNames() []string {
415423
return names
416424
}
417425

426+
func (c *QuesmaNewConfiguration) validateRewriteRules(rules map[string]IndexNameRewriteRule) error {
427+
428+
for name, rule := range rules {
429+
_, err := regexp.Compile(rule.From)
430+
if err != nil {
431+
return fmt.Errorf("index name rewrite rule '%s' has an invalid 'from' regex: %w", name, err)
432+
}
433+
}
434+
435+
return nil
436+
}
437+
418438
func (c *QuesmaNewConfiguration) validateProcessor(p Processor) error {
419439
if len(p.Name) == 0 {
420440
return fmt.Errorf("processor must have a non-empty name")
@@ -433,12 +453,23 @@ func (c *QuesmaNewConfiguration) validateProcessor(p Processor) error {
433453
return fmt.Errorf("configuration of index %s must have at most two targets (query processor)", indexName)
434454
}
435455
}
456+
457+
// Index name rewrite rules are rejected for query processors. Only the
// length is tested: also comparing against nil would reject a processor
// whose rule map is present but empty, i.e. one that defines no rules.
if len(p.Config.IndexNameRewriteRules) > 0 {
	return fmt.Errorf("index name rewrite rules are not supported in query processor configuration, use the ingest processor for this purpose")
}
460+
436461
} else {
437462
if _, ok := indexConfig.Target.([]interface{}); ok {
438463
if len(indexConfig.Target.([]interface{})) > 2 {
439464
return fmt.Errorf("configuration of index %s must have at most two targets (ingest processor)", indexName)
440465
}
441466
}
467+
468+
err := c.validateRewriteRules(p.Config.IndexNameRewriteRules)
469+
if err != nil {
470+
return err
471+
}
472+
442473
}
443474
targets, errTarget := c.getTargetsExtendedConfig(indexConfig.Target)
444475
if errTarget != nil {

platform/config/config_v2_util.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,25 @@ func (c *QuesmaConfiguration) translateAndAddDualPipeline(confNew *QuesmaNewConf
344344
c.DefaultIngestOptimizers = nil
345345
}
346346

347+
if ingestProcessor.Config.IndexNameRewriteRules != nil {
348+
349+
if len(ingestProcessor.Config.IndexNameRewriteRules) > 0 {
350+
351+
var names []string
352+
for name := range ingestProcessor.Config.IndexNameRewriteRules {
353+
names = append(names, name)
354+
}
355+
356+
orderedRules := make([]IndexNameRewriteRule, len(names))
357+
for _, name := range names {
358+
if rule, ok := ingestProcessor.Config.IndexNameRewriteRules[name]; ok {
359+
orderedRules = append(orderedRules, rule)
360+
}
361+
}
362+
c.IndexNameRewriteRules = orderedRules
363+
}
364+
}
365+
347366
// safe to call per validation earlier
348367
if targts, ok := ingestProcessor.Config.IndexConfig[DefaultWildcardIndexName].Target.([]interface{}); ok {
349368
conn := confNew.GetBackendConnectorByName(targts[0].(string))

platform/functionality/bulk/bulk.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ func Write(ctx context.Context, defaultIndex *string, bulk types.NDJSON, ip *ing
7575

7676
// The returned results should be in the same order as the input request, however splitting the bulk might change the order.
7777
// Therefore, each BulkRequestEntry has a corresponding pointer to the result entry, allowing us to freely split and reshuffle the bulk.
78-
results, clickhouseBulkEntries, elasticRequestBody, elasticBulkEntries, err := SplitBulk(ctx, defaultIndex, bulk, maxBulkSize, tableResolver)
78+
results, clickhouseBulkEntries, elasticRequestBody, elasticBulkEntries, err := SplitBulk(ctx, defaultIndex, bulk, maxBulkSize, tableResolver, ip.GetIndexNameRewriter())
7979
if err != nil {
8080
return []BulkItem{}, err
8181
}
@@ -118,7 +118,7 @@ func Write(ctx context.Context, defaultIndex *string, bulk types.NDJSON, ip *ing
118118
return nonEmptyResults, nil
119119
}
120120

121-
func SplitBulk(ctx context.Context, defaultIndex *string, bulk types.NDJSON, maxBulkSize int, tableResolver table_resolver.TableResolver) ([]BulkItem, map[string][]BulkRequestEntry, []byte, []BulkRequestEntry, error) {
121+
func SplitBulk(ctx context.Context, defaultIndex *string, bulk types.NDJSON, maxBulkSize int, tableResolver table_resolver.TableResolver, rewriter ingest.IndexNameRewriter) ([]BulkItem, map[string][]BulkRequestEntry, []byte, []BulkRequestEntry, error) {
122122
results := make([]BulkItem, maxBulkSize)
123123

124124
clickhouseBulkEntries := make(map[string][]BulkRequestEntry, maxBulkSize)
@@ -129,6 +129,8 @@ func SplitBulk(ctx context.Context, defaultIndex *string, bulk types.NDJSON, max
129129
index := op.GetIndex()
130130
operation := op.GetOperation()
131131

132+
index = rewriter.RewriteIndex(index)
133+
132134
entryWithResponse := BulkRequestEntry{
133135
operation: operation,
134136
index: index,
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
// Copyright Quesma, licensed under the Elastic License 2.0.
2+
// SPDX-License-Identifier: Elastic-2.0
3+
package ingest
4+
5+
import (
6+
"fmt"
7+
"github.com/QuesmaOrg/quesma/platform/config"
8+
"github.com/QuesmaOrg/quesma/platform/logger"
9+
"regexp"
10+
)
11+
12+
// IndexNameRewriter maps an index name to the index name that should be
// used in its place.
type IndexNameRewriter interface {
	// RewriteIndex returns the rewritten form of indexName.
	RewriteIndex(indexName string) string
}
15+
16+
// nopIndexNameRewriter is the rewriter that leaves every index name untouched.
type nopIndexNameRewriter struct{}

// RewriteIndex hands indexName back unchanged.
func (*nopIndexNameRewriter) RewriteIndex(indexName string) string {
	return indexName
}
23+
24+
// rewriteRule is a compiled rewrite rule: a regular expression together with
// the replacement applied to whatever it matches.
type rewriteRule struct {
	Pattern     *regexp.Regexp
	Replacement string
}

// String renders the rule as "RewriteRule `<pattern>` -> `<replacement>`",
// where <pattern> is the source text of the regular expression.
func (r *rewriteRule) String() string {
	from, to := r.Pattern.String(), r.Replacement
	return fmt.Sprintf("RewriteRule `%s` -> `%s`", from, to)
}
32+
33+
type indexNameRegexpRewriter struct {
34+
rules []rewriteRule
35+
}
36+
37+
func NewIndexNameRewriter(cfg *config.QuesmaConfiguration) IndexNameRewriter {
38+
39+
if len(cfg.IndexNameRewriteRules) == 0 {
40+
logger.Info().Msgf("No index name rewrite rules configured, using no-op rewriter")
41+
// if no rewrite rules are configured, return a no-op rewriter
42+
return &nopIndexNameRewriter{}
43+
}
44+
45+
var rules []rewriteRule
46+
47+
for _, rule := range cfg.IndexNameRewriteRules {
48+
if rule.From == "" || rule.To == "" {
49+
continue // skip invalid rules
50+
}
51+
pattern, err := regexp.Compile(rule.From)
52+
if err != nil {
53+
logger.Error().Msgf("Unable to compile regexp for index name rewrite: %s", rule.From)
54+
continue // skip invalid regex patterns
55+
}
56+
r := rewriteRule{
57+
Pattern: pattern,
58+
Replacement: rule.To,
59+
}
60+
rules = append(rules, r)
61+
logger.Info().Msgf("Added index name rewrite rule: %s", r.String())
62+
}
63+
64+
return &indexNameRegexpRewriter{rules: rules}
65+
}
66+
67+
func (i *indexNameRegexpRewriter) RewriteIndex(indexName string) string {
68+
69+
rewritten := indexName
70+
for _, rule := range i.rules {
71+
rewritten = rule.Pattern.ReplaceAllString(rewritten, rule.Replacement)
72+
}
73+
return rewritten
74+
}

platform/ingest/processor.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,8 @@ type (
6969
virtualTableStorage persistence.JSONDatabase
7070
tableResolver table_resolver.TableResolver
7171

72+
indexNameRewriter IndexNameRewriter
73+
7274
errorLogCounter atomic.Int64
7375
}
7476
TableMap = util.SyncMap[string, *chLib.Table]
@@ -1062,9 +1064,14 @@ func (ip *IngestProcessor) Ping() error {
10621064
return ip.chDb.Ping()
10631065
}
10641066

1067+
func (ip *IngestProcessor) GetIndexNameRewriter() IndexNameRewriter {
1068+
return ip.indexNameRewriter
1069+
}
1070+
10651071
func NewIngestProcessor(cfg *config.QuesmaConfiguration, chDb quesma_api.BackendConnector, phoneHomeClient diag.PhoneHomeClient, loader chLib.TableDiscovery, schemaRegistry schema.Registry, virtualTableStorage persistence.JSONDatabase, tableResolver table_resolver.TableResolver) *IngestProcessor {
10661072
ctx, cancel := context.WithCancel(context.Background())
1067-
return &IngestProcessor{ctx: ctx, cancel: cancel, chDb: chDb, tableDiscovery: loader, cfg: cfg, phoneHomeClient: phoneHomeClient, schemaRegistry: schemaRegistry, virtualTableStorage: virtualTableStorage, tableResolver: tableResolver}
1073+
indexRewriter := NewIndexNameRewriter(cfg)
1074+
return &IngestProcessor{ctx: ctx, cancel: cancel, chDb: chDb, tableDiscovery: loader, cfg: cfg, phoneHomeClient: phoneHomeClient, schemaRegistry: schemaRegistry, virtualTableStorage: virtualTableStorage, tableResolver: tableResolver, indexNameRewriter: indexRewriter}
10681075
}
10691076

10701077
func NewOnlySchemaFieldsCHConfig(clusterName string) *chLib.ChTableConfig {

0 commit comments

Comments
 (0)