diff --git a/ci/it/configs/quesma-index-name-rewrite.yml.template b/ci/it/configs/quesma-index-name-rewrite.yml.template new file mode 100644 index 000000000..aa8d4abac --- /dev/null +++ b/ci/it/configs/quesma-index-name-rewrite.yml.template @@ -0,0 +1,66 @@ +frontendConnectors: + - name: elastic-ingest + type: elasticsearch-fe-ingest + config: + listenPort: 8080 + - name: elastic-query + type: elasticsearch-fe-query + config: + listenPort: 8080 +backendConnectors: + - name: e + type: elasticsearch + config: + url: "http://{{ .elasticsearch_host }}:{{ .elasticsearch_port }}" + user: elastic + password: quesmaquesma + - name: c + type: clickhouse-os + config: + url: clickhouse://{{ .clickhouse_host }}:{{ .clickhouse_port }} +logging: + path: "logs" + level: "info" + disableFileLogging: false + enableSQLTracing: true +processors: + - name: QP + type: quesma-v1-processor-query + config: + useCommonTable: true + indexes: + + "*": + target: + - c + - name: IP + type: quesma-v1-processor-ingest + config: + indexNameRewriteRules: + 0: + from: (.*?)(-\d{4}\.\d{2}\.\d{2})$ + to: "$1" + 1: + from: (.*?)(-\d{4}\.\d{2})$ + to: "$1" + 3: + from: (.*?)(.\d{4}-\d{2})$ + to: "$1" + 4: + from: (.*?)(.\d{4}-\d{2}\-\d{2})$ + to: "$1" + useCommonTable: true + indexes: + "*": + target: + - c + +pipelines: + - name: my-elasticsearch-proxy-read + frontendConnectors: [ elastic-query ] + processors: [ QP ] + backendConnectors: [ e, c ] + - name: my-elasticsearch-proxy-write + frontendConnectors: [ elastic-ingest ] + processors: [ IP ] + backendConnectors: [ e, c ] \ No newline at end of file diff --git a/ci/it/integration_test.go b/ci/it/integration_test.go index d2beab3c3..d6ca87842 100644 --- a/ci/it/integration_test.go +++ b/ci/it/integration_test.go @@ -71,3 +71,8 @@ func TestTableOverrideTestcase(t *testing.T) { testCase := testcases.NewOverrideTestcase() runIntegrationTest(t, testCase) } + +func TestIndexNameRewrite(t *testing.T) { + testCase := testcases.NewIndexNameRewriteTestcase() + runIntegrationTest(t, testCase) +} diff --git a/ci/it/testcases/test_index_name_rewrite.go b/ci/it/testcases/test_index_name_rewrite.go new file mode 100644 index 000000000..2575394c8 --- /dev/null +++ b/ci/it/testcases/test_index_name_rewrite.go @@ -0,0 +1,115 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 + +// This file contains integration tests for different ingest functionalities. +// This is a good place to add regression tests for ingest bugs. + +package testcases + +import ( + "context" + "encoding/json" + "fmt" + "github.com/stretchr/testify/assert" + "net/http" + "testing" +) + +type IndexNameRewriteTestcase struct { + IntegrationTestcaseBase +} + +func NewIndexNameRewriteTestcase() *IndexNameRewriteTestcase { + return &IndexNameRewriteTestcase{ + IntegrationTestcaseBase: IntegrationTestcaseBase{ + ConfigTemplate: "quesma-index-name-rewrite.yml.template", + }, + } +} + +func (a *IndexNameRewriteTestcase) SetupContainers(ctx context.Context) error { + containers, err := setupAllContainersWithCh(ctx, a.ConfigTemplate) + a.Containers = containers + return err +} + +func (a *IndexNameRewriteTestcase) RunTests(ctx context.Context, t *testing.T) error { + t.Run("test basic request", func(t *testing.T) { a.testBasicRequest(ctx, t) }) + + return nil +} + +func (a *IndexNameRewriteTestcase) testBasicRequest(ctx context.Context, t *testing.T) { + + testCases := []struct { + TestCaseName string `json:"name"` + IndexName string `json:"index_name"` + ExpectedIndexName string `json:"expected_index_name"` + }{ + { + TestCaseName: "1. plain index name", + IndexName: "foo", + ExpectedIndexName: "foo", + }, + { + TestCaseName: "2. index name with date", + IndexName: "foo.2001-01-01", + ExpectedIndexName: "foo", + }, + { + TestCaseName: "3. index name and month", + IndexName: "foo.2001-01", + ExpectedIndexName: "foo", + }, + { + TestCaseName: "3. index name with date and dashes", + IndexName: "foo-2001.01", + ExpectedIndexName: "foo", + }, + { + TestCaseName: "4. index name with date and dashes", + IndexName: "foo-2001.01.01", + ExpectedIndexName: "foo", + }, + { + TestCaseName: "5. index name not matching", + IndexName: "foo-not-matching", + ExpectedIndexName: "foo-not-matching", + }, + } + + for n, d := range testCases { + + data, err := json.Marshal(d) + if err != nil { + t.Fatalf("Failed to marshal test case %d: %s", n, err) + } + + resp, bodyBytes := a.RequestToQuesma(ctx, t, + "POST", fmt.Sprintf("/%s/_doc", d.IndexName), data) + + assert.Contains(t, string(bodyBytes), "created") + assert.Equal(t, http.StatusOK, resp.StatusCode) + assert.Equal(t, "Clickhouse", resp.Header.Get("X-Quesma-Source")) + assert.Equal(t, "Elasticsearch", resp.Header.Get("X-Elastic-Product")) + } + + rows, err := a.ExecuteClickHouseQuery(ctx, "select name, __quesma_index_name, expected_index_name from quesma_common_table where expected_index_name <> __quesma_index_name") + + defer rows.Close() + if err != nil { + t.Fatalf("Failed to execute ClickHouse query: %s", err) + } + + if rows.Next() { + var name *string + var expectedIndexName *string + var actualIndexName *string + + if err := rows.Scan(&name, &actualIndexName, &expectedIndexName); err != nil { + t.Fatalf("Failed to scan row: %s", err) + } + t.Fatalf("Expected index name does not match actual index. Test case: %s, actual index name: %s, expected index name: %s", *name, *actualIndexName, *expectedIndexName) + } + +} diff --git a/platform/config/config.go b/platform/config/config.go index 53901c6c4..93db79062 100644 --- a/platform/config/config.go +++ b/platform/config/config.go @@ -57,6 +57,7 @@ type QuesmaConfiguration struct { DefaultIngestOptimizers map[string]OptimizerConfiguration DefaultQueryOptimizers map[string]OptimizerConfiguration MapFieldsDiscoveringEnabled bool + IndexNameRewriteRules []IndexNameRewriteRule // rules for rewriting index names, e.g. "index_name" -> "index_name_v2" DefaultStringColumnType string } @@ -270,8 +271,7 @@ Quesma Configuration: UseCommonTableForWildcard: %t, DefaultIngestTarget: %v, DefaultQueryTarget: %v, - MapFieldsDiscoveringEnabled: %t, - DefaultStringColumnType: %s + MapFieldsDiscoveringEnabled: %t `, c.TransparentProxy, elasticUrl, @@ -294,7 +294,6 @@ Quesma Configuration: c.DefaultIngestTarget, c.DefaultQueryTarget, c.MapFieldsDiscoveringEnabled, - c.DefaultStringColumnType, ) } diff --git a/platform/config/config_v2.go b/platform/config/config_v2.go index a18ec8434..1d6675673 100644 --- a/platform/config/config_v2.go +++ b/platform/config/config_v2.go @@ -12,6 +12,7 @@ import ( "github.com/rs/zerolog" "log" "reflect" + "regexp" "slices" "strings" ) @@ -133,8 +134,15 @@ type ( IndexConfig IndicesConfigs `koanf:"indexes"` // DefaultTargetConnectorType is used in V2 code only DefaultTargetConnectorType string //it is not serialized to maintain configuration BWC, so it's basically just populated from '*' config in `config_v2.go` + + IndexNameRewriteRules map[string]IndexNameRewriteRule `koanf:"indexNameRewriteRules"` } IndicesConfigs map[string]IndexConfiguration + + IndexNameRewriteRule struct { + From string `koanf:"from"` // pattern to match + To string `koanf:"to"` // replacement string + } ) func (p *QuesmaProcessorConfig) IsFieldMapSyntaxEnabled(indexName string) bool { @@ -422,6 +430,18 @@ func (c *QuesmaNewConfiguration) definedProcessorNames() []string { return names } +func (c *QuesmaNewConfiguration) validateRewriteRules(rules map[string]IndexNameRewriteRule) error { + + for name, rule := range rules { + _, err := regexp.Compile(rule.From) + if err != nil { + return fmt.Errorf("index name rewrite rule '%s' has an invalid 'from' regex: %w", name, err) + } + } + + return nil +} + func (c *QuesmaNewConfiguration) validateProcessor(p Processor) error { if len(p.Name) == 0 { return fmt.Errorf("processor must have a non-empty name") @@ -440,12 +460,23 @@ func (c *QuesmaNewConfiguration) validateProcessor(p Processor) error { return fmt.Errorf("configuration of index %s must have at most two targets (query processor)", indexName) } } + + if p.Config.IndexNameRewriteRules != nil || len(p.Config.IndexNameRewriteRules) > 0 { + return fmt.Errorf("index name rewrite rules are not supported in query processor configuration, use the ingest processor for this purpose") + } + } else { if _, ok := indexConfig.Target.([]interface{}); ok { if len(indexConfig.Target.([]interface{})) > 2 { return fmt.Errorf("configuration of index %s must have at most two targets (ingest processor)", indexName) } } + + err := c.validateRewriteRules(p.Config.IndexNameRewriteRules) + if err != nil { + return err + } + } targets, errTarget := c.getTargetsExtendedConfig(indexConfig.Target) if errTarget != nil { diff --git a/platform/config/config_v2_test.go b/platform/config/config_v2_test.go index bded66ac8..7423967cc 100644 --- a/platform/config/config_v2_test.go +++ b/platform/config/config_v2_test.go @@ -360,6 +360,27 @@ func TestPartitionBy(t *testing.T) { assert.Equal(t, Hourly, legacyConf.DefaultPartitioningStrategy) } +func TestIndexNameRewriteRules(t *testing.T) { + + os.Setenv(configFileLocationEnvVar, "./test_configs/index_name_rewrite_rules.yaml") + cfg := LoadV2Config() + if err := cfg.Validate(); err != nil { + t.Fatalf("error validating config: %v", err) + } + legacyConf := cfg.TranslateToLegacyConfig() + + assert.Equal(t, 4, len(legacyConf.IndexNameRewriteRules)) + + for _, rule := range legacyConf.IndexNameRewriteRules { + assert.Equal(t, "$1", rule.To) + } + + assert.Equal(t, "(.*?)(-\\d{4}\\.\\d{2}\\.\\d{2})$", legacyConf.IndexNameRewriteRules[0].From) + assert.Equal(t, "(.*?)(-\\d{4}\\.\\d{2})$", legacyConf.IndexNameRewriteRules[1].From) + assert.Equal(t, "(.*?)(.\\d{4}-\\d{2})$", legacyConf.IndexNameRewriteRules[2].From) + assert.Equal(t, "(.*?)(.\\d{4}-\\d{2}-\\d{2})$", legacyConf.IndexNameRewriteRules[3].From) // empty string means no rewrite rule +} + func TestStringColumnIsTextDefaultBehavior(t *testing.T) { os.Setenv(configFileLocationEnvVar, "./test_configs/partition_by.yaml") cfg := LoadV2Config() diff --git a/platform/config/config_v2_util.go b/platform/config/config_v2_util.go index 9fc903d0f..25192f4e9 100644 --- a/platform/config/config_v2_util.go +++ b/platform/config/config_v2_util.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/hashicorp/go-multierror" "slices" + "sort" ) func (c *QuesmaConfiguration) translateAndAddSinglePipeline(confNew *QuesmaNewConfiguration, errAcc error) { @@ -344,6 +345,27 @@ func (c *QuesmaConfiguration) translateAndAddDualPipeline(confNew *QuesmaNewConf c.DefaultIngestOptimizers = nil } + if ingestProcessor.Config.IndexNameRewriteRules != nil { + + if len(ingestProcessor.Config.IndexNameRewriteRules) > 0 { + + var names []string + for name := range ingestProcessor.Config.IndexNameRewriteRules { + names = append(names, name) + } + + sort.Strings(names) + + var orderedRules []IndexNameRewriteRule + for _, name := range names { + if rule, ok := ingestProcessor.Config.IndexNameRewriteRules[name]; ok { + orderedRules = append(orderedRules, rule) + } + } + c.IndexNameRewriteRules = orderedRules + } + } + // safe to call per validation earlier if targts, ok := ingestProcessor.Config.IndexConfig[DefaultWildcardIndexName].Target.([]interface{}); ok { conn := confNew.GetBackendConnectorByName(targts[0].(string)) diff --git a/platform/config/test_configs/index_name_rewrite_rules.yaml b/platform/config/test_configs/index_name_rewrite_rules.yaml new file mode 100644 index 000000000..a6d239420 --- /dev/null +++ b/platform/config/test_configs/index_name_rewrite_rules.yaml @@ -0,0 +1,93 @@ +installationId: #HYDROLIX_REQUIRES_THIS +frontendConnectors: + - name: elastic-ingest + type: elasticsearch-fe-ingest + config: + listenPort: 8080 + - name: elastic-query + type: elasticsearch-fe-query + config: + listenPort: 8080 +backendConnectors: + - name: E + type: elasticsearch + config: + url: "http://elasticsearch:9200" + user: elastic + password: quesmaquesma + - name: C + type: clickhouse-os + config: + url: "clickhouse://clickhouse:9000" +ingestStatistics: true +processors: + - name: QP + type: quesma-v1-processor-query + config: + indexes: + logs-1: + target: + - E + logs-2: + target: + - E + logs-3: + target: + - C + - E + logs-4: + target: + - C: + useCommonTable: true + logs-5: + target: + "*": + target: + - E + + - name: IP + type: quesma-v1-processor-ingest + config: + indexNameRewriteRules: + 0: + from: (.*?)(-\d{4}\.\d{2}\.\d{2})$ + to: "$1" + 1: + from: (.*?)(-\d{4}\.\d{2})$ + to: "$1" + 3: + from: (.*?)(.\d{4}-\d{2})$ + to: "$1" + 4: + from: (.*?)(.\d{4}-\d{2}-\d{2})$ + to: "$1" + indexes: + logs-1: + target: + - E + logs-2: + target: + - E + logs-3: + target: + - C + - E + logs-4: + target: + - C: + useCommonTable: true + "*": + target: + - E + logs-5: + target: + +pipelines: + - name: my-elasticsearch-proxy-read + frontendConnectors: [ elastic-query ] + processors: [ QP ] + backendConnectors: [ E, C ] + - name: my-elasticsearch-proxy-write + frontendConnectors: [ elastic-ingest ] + processors: [ IP ] + backendConnectors: [ E, C ] \ No newline at end of file diff --git a/platform/functionality/bulk/bulk.go b/platform/functionality/bulk/bulk.go index 97fbf0f83..27d65c129 100644 --- a/platform/functionality/bulk/bulk.go +++ b/platform/functionality/bulk/bulk.go @@ -75,7 +75,7 @@ func Write(ctx context.Context, defaultIndex *string, bulk types.NDJSON, ip *ing // The returned results should be in the same order as the input request, however splitting the bulk might change the order. // Therefore, each BulkRequestEntry has a corresponding pointer to the result entry, allowing us to freely split and reshuffle the bulk. - results, clickhouseBulkEntries, elasticRequestBody, elasticBulkEntries, err := SplitBulk(ctx, defaultIndex, bulk, maxBulkSize, tableResolver) + results, clickhouseBulkEntries, elasticRequestBody, elasticBulkEntries, err := SplitBulk(ctx, defaultIndex, bulk, maxBulkSize, tableResolver, ip.GetIndexNameRewriter()) if err != nil { return []BulkItem{}, err } @@ -118,7 +118,7 @@ func Write(ctx context.Context, defaultIndex *string, bulk types.NDJSON, ip *ing return nonEmptyResults, nil } -func SplitBulk(ctx context.Context, defaultIndex *string, bulk types.NDJSON, maxBulkSize int, tableResolver table_resolver.TableResolver) ([]BulkItem, map[string][]BulkRequestEntry, []byte, []BulkRequestEntry, error) { +func SplitBulk(ctx context.Context, defaultIndex *string, bulk types.NDJSON, maxBulkSize int, tableResolver table_resolver.TableResolver, rewriter ingest.IndexNameRewriter) ([]BulkItem, map[string][]BulkRequestEntry, []byte, []BulkRequestEntry, error) { results := make([]BulkItem, maxBulkSize) clickhouseBulkEntries := make(map[string][]BulkRequestEntry, maxBulkSize) @@ -129,6 +129,8 @@ func SplitBulk(ctx context.Context, defaultIndex *string, bulk types.NDJSON, max index := op.GetIndex() operation := op.GetOperation() + index = rewriter.RewriteIndex(index) + entryWithResponse := BulkRequestEntry{ operation: operation, index: index, diff --git a/platform/functionality/bulk/bulk_test.go b/platform/functionality/bulk/bulk_test.go index dd024d5fe..08de46bf0 100644 --- a/platform/functionality/bulk/bulk_test.go +++ b/platform/functionality/bulk/bulk_test.go @@ -5,6 +5,7 @@ package bulk import ( "context" "github.com/QuesmaOrg/quesma/platform/config" + "github.com/QuesmaOrg/quesma/platform/ingest" "github.com/QuesmaOrg/quesma/platform/table_resolver" "github.com/QuesmaOrg/quesma/platform/types" "github.com/QuesmaOrg/quesma/platform/util" @@ -140,7 +141,7 @@ func TestSplitBulkSampleData(t *testing.T) { maxBulkSize := len(bulk) // first returned value here is a result of side effects (writes to ClickHouse and Elasticsearch) so it is not tested here - _, clickhouseBulkEntries, elasticRequestBody, elasticBulkEntries, err := SplitBulk(ctx, &defaultIndex, bulk, maxBulkSize, testTableResolver) + _, clickhouseBulkEntries, elasticRequestBody, elasticBulkEntries, err := SplitBulk(ctx, &defaultIndex, bulk, maxBulkSize, testTableResolver, &ingest.NoOpIndexNameRewriter{}) assert.NoError(t, err) assert.Len(t, clickhouseBulkEntries["kibana_sample_data_ecommerce"], 5) @@ -160,7 +161,7 @@ func TestSplitBulkDelete(t *testing.T) { maxBulkSize := len(bulk) // first returned value here is a result of side effects (writes to ClickHouse and Elasticsearch) so it is not tested here - _, clickhouseBulkEntries, elasticRequestBody, elasticBulkEntries, err := SplitBulk(ctx, &defaultIndex, bulk, maxBulkSize, testTableResolver) + _, clickhouseBulkEntries, elasticRequestBody, elasticBulkEntries, err := SplitBulk(ctx, &defaultIndex, bulk, maxBulkSize, testTableResolver, &ingest.NoOpIndexNameRewriter{}) assert.NoError(t, err) assert.Len(t, clickhouseBulkEntries, 0) @@ -188,7 +189,7 @@ func TestSplitBulkUpdatePayload(t *testing.T) { maxBulkSize := len(bulk) // first returned value here is a result of side effects (writes to ClickHouse and Elasticsearch) so it is not tested here - _, clickhouseBulkEntries, elasticRequestBody, elasticBulkEntries, err := SplitBulk(ctx, &defaultIndex, bulk, maxBulkSize, testTableResolver) + _, clickhouseBulkEntries, elasticRequestBody, elasticBulkEntries, err := SplitBulk(ctx, &defaultIndex, bulk, maxBulkSize, testTableResolver, &ingest.NoOpIndexNameRewriter{}) assert.NoError(t, err) assert.Len(t, clickhouseBulkEntries, 0) @@ -216,7 +217,7 @@ func TestSplitBulkMixedPayload(t *testing.T) { } maxBulkSize := len(bulk) - results, clickhouseBulkEntries, elasticRequestBody, elasticBulkEntries, err := SplitBulk(ctx, &defaultIndex, bulk, maxBulkSize, testTableResolver) + results, clickhouseBulkEntries, elasticRequestBody, elasticBulkEntries, err := SplitBulk(ctx, &defaultIndex, bulk, maxBulkSize, testTableResolver, &ingest.NoOpIndexNameRewriter{}) assert.NoError(t, err) assert.Len(t, results, maxBulkSize) diff --git a/platform/ingest/index_name_rewriter.go b/platform/ingest/index_name_rewriter.go new file mode 100644 index 000000000..233bbd105 --- /dev/null +++ b/platform/ingest/index_name_rewriter.go @@ -0,0 +1,74 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 +package ingest + +import ( + "fmt" + "github.com/QuesmaOrg/quesma/platform/config" + "github.com/QuesmaOrg/quesma/platform/logger" + "regexp" +) + +type IndexNameRewriter interface { + RewriteIndex(indexName string) string +} + +type NoOpIndexNameRewriter struct { +} + +func (n *NoOpIndexNameRewriter) RewriteIndex(indexName string) string { + // no-op rewriter, returns the index name as is + return indexName +} + +type rewriteRule struct { + Pattern *regexp.Regexp + Replacement string +} + +func (r *rewriteRule) String() string { + return fmt.Sprintf("RewriteRule `%s` -> `%s`", r.Pattern.String(), r.Replacement) +} + +type indexNameRegexpRewriter struct { + rules []rewriteRule +} + +func NewIndexNameRewriter(cfg *config.QuesmaConfiguration) IndexNameRewriter { + + if len(cfg.IndexNameRewriteRules) == 0 { + logger.Info().Msgf("No index name rewrite rules configured, using no-op rewriter") + // if no rewrite rules are configured, return a no-op rewriter + return &NoOpIndexNameRewriter{} + } + + var rules []rewriteRule + + for _, rule := range cfg.IndexNameRewriteRules { + if rule.From == "" || rule.To == "" { + continue // skip invalid rules + } + pattern, err := regexp.Compile(rule.From) + if err != nil { + logger.Error().Msgf("Unable to compile regexp for index name rewrite: %s", rule.From) + continue // skip invalid regex patterns + } + r := rewriteRule{ + Pattern: pattern, + Replacement: rule.To, + } + rules = append(rules, r) + logger.Info().Msgf("Added index name rewrite rule: %s", r.String()) + } + + return &indexNameRegexpRewriter{rules: rules} +} + +func (i *indexNameRegexpRewriter) RewriteIndex(indexName string) string { + + rewritten := indexName + for _, rule := range i.rules { + rewritten = rule.Pattern.ReplaceAllString(rewritten, rule.Replacement) + } + return rewritten +} diff --git a/platform/ingest/processor.go b/platform/ingest/processor.go index b5631e62c..b43b3e1bd 100644 --- a/platform/ingest/processor.go +++ b/platform/ingest/processor.go @@ -69,6 +69,8 @@ type ( virtualTableStorage persistence.JSONDatabase tableResolver table_resolver.TableResolver + indexNameRewriter IndexNameRewriter + errorLogCounter atomic.Int64 } TableMap = util.SyncMap[string, *chLib.Table] @@ -1062,9 +1064,14 @@ func (ip *IngestProcessor) Ping() error { return ip.chDb.Ping() } +func (ip *IngestProcessor) GetIndexNameRewriter() IndexNameRewriter { + return ip.indexNameRewriter +} + func NewIngestProcessor(cfg *config.QuesmaConfiguration, chDb quesma_api.BackendConnector, phoneHomeClient diag.PhoneHomeClient, loader chLib.TableDiscovery, schemaRegistry schema.Registry, virtualTableStorage persistence.JSONDatabase, tableResolver table_resolver.TableResolver) *IngestProcessor { ctx, cancel := context.WithCancel(context.Background()) - return &IngestProcessor{ctx: ctx, cancel: cancel, chDb: chDb, tableDiscovery: loader, cfg: cfg, phoneHomeClient: phoneHomeClient, schemaRegistry: schemaRegistry, virtualTableStorage: virtualTableStorage, tableResolver: tableResolver} + indexRewriter := NewIndexNameRewriter(cfg) + return &IngestProcessor{ctx: ctx, cancel: cancel, chDb: chDb, tableDiscovery: loader, cfg: cfg, phoneHomeClient: phoneHomeClient, schemaRegistry: schemaRegistry, virtualTableStorage: virtualTableStorage, tableResolver: tableResolver, indexNameRewriter: indexRewriter} } func NewOnlySchemaFieldsCHConfig(clusterName string) *chLib.ChTableConfig {