diff --git a/cmd/experimental/main.go b/cmd/experimental/main.go index 3f433d498..5a3dc33bd 100644 --- a/cmd/experimental/main.go +++ b/cmd/experimental/main.go @@ -165,7 +165,7 @@ func main() { virtualTableStorage := persistence.NewElasticJSONDatabase(cfg.Elasticsearch, common_table.VirtualTableElasticIndexName) tableDisco := clickhouse.NewTableDiscovery(&cfg, connectionPool, virtualTableStorage) - schemaRegistry := schema.NewSchemaRegistry(clickhouse.TableDiscoveryTableProviderAdapter{TableDiscovery: tableDisco}, &cfg, clickhouse.SchemaTypeAdapter{}) + schemaRegistry := schema.NewSchemaRegistry(clickhouse.TableDiscoveryTableProviderAdapter{TableDiscovery: tableDisco}, &cfg, clickhouse.NewSchemaTypeAdapter(cfg.DefaultStringColumnType)) schemaRegistry.Start() im := elasticsearch.NewIndexManagement(cfg.Elasticsearch) diff --git a/cmd/main.go b/cmd/main.go index bc8531259..f891609a5 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -88,7 +88,7 @@ func main() { virtualTableStorage := persistence.NewElasticJSONDatabase(cfg.Elasticsearch, common_table.VirtualTableElasticIndexName) tableDisco := clickhouse.NewTableDiscovery(&cfg, connectionPool, virtualTableStorage) - schemaRegistry := schema.NewSchemaRegistry(clickhouse.TableDiscoveryTableProviderAdapter{TableDiscovery: tableDisco}, &cfg, clickhouse.SchemaTypeAdapter{}) + schemaRegistry := schema.NewSchemaRegistry(clickhouse.TableDiscoveryTableProviderAdapter{TableDiscovery: tableDisco}, &cfg, clickhouse.NewSchemaTypeAdapter(cfg.DefaultStringColumnType)) schemaRegistry.Start() im := elasticsearch.NewIndexManagement(cfg.Elasticsearch) diff --git a/platform/clickhouse/type_adapter.go b/platform/clickhouse/type_adapter.go index 7d4171269..74d3aba9f 100644 --- a/platform/clickhouse/type_adapter.go +++ b/platform/clickhouse/type_adapter.go @@ -3,11 +3,20 @@ package clickhouse import ( + "github.com/QuesmaOrg/quesma/platform/logger" "github.com/QuesmaOrg/quesma/platform/schema" "strings" ) type SchemaTypeAdapter struct { + defaultStringColumnType string +} + +func NewSchemaTypeAdapter(defaultType string) SchemaTypeAdapter { + + return SchemaTypeAdapter{ + defaultStringColumnType: defaultType, + } } func (c SchemaTypeAdapter) Convert(s string) (schema.QuesmaType, bool) { @@ -23,7 +32,18 @@ func (c SchemaTypeAdapter) Convert(s string) (schema.QuesmaType, bool) { switch s { case "String": - return schema.QuesmaTypeText, true + switch c.defaultStringColumnType { + + // empty if for testing purposes, in production it should always be set + case "", "text": + return schema.QuesmaTypeText, true + case "keyword": + return schema.QuesmaTypeKeyword, true + default: + logger.Error().Msgf("Unknown field type %s", c.defaultStringColumnType) + return schema.QuesmaTypeUnknown, false + } + case "LowCardinality(String)", "UUID", "FixedString": return schema.QuesmaTypeKeyword, true case "Int", "Int8", "Int16", "Int32", "Int64": diff --git a/platform/config/config.go b/platform/config/config.go index d583417d4..53901c6c4 100644 --- a/platform/config/config.go +++ b/platform/config/config.go @@ -57,6 +57,7 @@ type QuesmaConfiguration struct { DefaultIngestOptimizers map[string]OptimizerConfiguration DefaultQueryOptimizers map[string]OptimizerConfiguration MapFieldsDiscoveringEnabled bool + DefaultStringColumnType string } func NewQuesmaConfigurationIndexConfigOnly(indexConfig map[string]IndexConfiguration) QuesmaConfiguration { @@ -269,7 +270,8 @@ Quesma Configuration: UseCommonTableForWildcard: %t, DefaultIngestTarget: %v, DefaultQueryTarget: %v, - MapFieldsDiscoveringEnabled: %t + MapFieldsDiscoveringEnabled: %t, + DefaultStringColumnType: %s `, c.TransparentProxy, elasticUrl, @@ -292,6 +294,7 @@ Quesma Configuration: c.DefaultIngestTarget, c.DefaultQueryTarget, c.MapFieldsDiscoveringEnabled, + c.DefaultStringColumnType, ) } diff --git a/platform/config/config_v2.go b/platform/config/config_v2.go index 2b6840c98..a18ec8434 100644 --- a/platform/config/config_v2.go +++ b/platform/config/config_v2.go @@ -49,6 +49,13 @@ type QuesmaNewConfiguration struct { Pipelines []Pipeline `koanf:"pipelines"` DisableTelemetry bool `koanf:"disableTelemetry"` MapFieldsDiscoveringEnabled bool `koanf:"mapFieldsDiscoveringEnabled"` + DefaultStringToKeywordType bool `koanf:"defaultStringToKeywordType"` + QuesmaFlags QuesmaFlags `koanf:"flags"` +} + +// It holds all the configuration flags that affect global Quesma behavior. +type QuesmaFlags struct { + DefaultStringColumnType *string `koanf:"defaultStringColumnType"` } type LoggingConfiguration struct { @@ -592,6 +599,21 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration { conf.MapFieldsDiscoveringEnabled = c.MapFieldsDiscoveringEnabled + conf.DefaultStringColumnType = "text" // default value, can be overridden by the flag + if c.QuesmaFlags.DefaultStringColumnType != nil { + + switch *c.QuesmaFlags.DefaultStringColumnType { + case "keyword": + conf.DefaultStringColumnType = "keyword" + case "text": + conf.DefaultStringColumnType = "text" + default: + + errAcc = multierror.Append(errAcc, fmt.Errorf("defaultStringColumnType must be either 'keyword' or 'text', got '%s'", *c.QuesmaFlags.DefaultStringColumnType)) + + } + } + conf.AutodiscoveryEnabled = false conf.Connectors = make(map[string]RelationalDbConfiguration) relDBConn, connType, relationalDBErr := c.getRelationalDBConf() diff --git a/platform/config/config_v2_test.go b/platform/config/config_v2_test.go index eb19182f9..bded66ac8 100644 --- a/platform/config/config_v2_test.go +++ b/platform/config/config_v2_test.go @@ -359,3 +359,27 @@ func TestPartitionBy(t *testing.T) { assert.Equal(t, Hourly, legacyConf.DefaultPartitioningStrategy) } + +func TestStringColumnIsTextDefaultBehavior(t *testing.T) { + os.Setenv(configFileLocationEnvVar, "./test_configs/partition_by.yaml") + cfg := LoadV2Config() + if err := cfg.Validate(); err != nil { + t.Fatalf("error validating config: %v", err) + } + legacyConf := cfg.TranslateToLegacyConfig() + + assert.Equal(t, "text", legacyConf.DefaultStringColumnType) + +} + +func TestStringColumnIsKeyword(t *testing.T) { + os.Setenv(configFileLocationEnvVar, "./test_configs/string_column_is_keyword_field.yaml") + cfg := LoadV2Config() + if err := cfg.Validate(); err != nil { + t.Fatalf("error validating config: %v", err) + } + legacyConf := cfg.TranslateToLegacyConfig() + + assert.Equal(t, "keyword", legacyConf.DefaultStringColumnType) + +} diff --git a/platform/config/test_configs/string_column_is_keyword_field.yaml b/platform/config/test_configs/string_column_is_keyword_field.yaml new file mode 100644 index 000000000..cad81691b --- /dev/null +++ b/platform/config/test_configs/string_column_is_keyword_field.yaml @@ -0,0 +1,72 @@ +# TEST CONFIGURATION +licenseKey: "cdd749a3-e777-11ee-bcf8-0242ac150004" + + +flags: + defaultStringColumnType: keyword + +frontendConnectors: + - name: elastic-ingest + type: elasticsearch-fe-ingest + config: + listenPort: 8080 + - name: elastic-query + type: elasticsearch-fe-query + config: + listenPort: 8080 +backendConnectors: + - name: my-minimal-elasticsearch + type: elasticsearch + config: + url: "http://localhost:9200" + - name: my-clickhouse-data-source + type: clickhouse-os + config: + url: "clickhouse://localhost:9000" +ingestStatistics: true +internalTelemetryUrl: "https://api.quesma.com/phone-home" +logging: + remoteUrl: "https://api.quesma.com/phone-home" + path: "logs" + level: "info" +processors: + - name: my-query-processor + type: quesma-v1-processor-query + config: + indexes: + example-index: + target: + - my-clickhouse-data-source + kibana_sample_data_ecommerce: + target: + - my-clickhouse-data-source + partitioningStrategy: daily + "*": + target: + - my-minimal-elasticsearch + partitioningStrategy: hourly + - name: my-ingest-processor + type: quesma-v1-processor-ingest + config: + indexes: + example-index: + target: + - my-clickhouse-data-source + kibana_sample_data_ecommerce: + target: + - my-clickhouse-data-source + partitioningStrategy: daily + "*": + target: + - my-minimal-elasticsearch + partitioningStrategy: hourly +pipelines: + - name: my-pipeline-elasticsearch-query-clickhouse + frontendConnectors: [ elastic-query ] + processors: [ my-query-processor ] + backendConnectors: [ my-minimal-elasticsearch, my-clickhouse-data-source ] + - name: my-pipeline-elasticsearch-ingest-to-clickhouse + frontendConnectors: [ elastic-ingest ] + processors: [ my-ingest-processor ] + backendConnectors: [ my-minimal-elasticsearch, my-clickhouse-data-source ] +