Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/experimental/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func main() {

virtualTableStorage := persistence.NewElasticJSONDatabase(cfg.Elasticsearch, common_table.VirtualTableElasticIndexName)
tableDisco := clickhouse.NewTableDiscovery(&cfg, connectionPool, virtualTableStorage)
schemaRegistry := schema.NewSchemaRegistry(clickhouse.TableDiscoveryTableProviderAdapter{TableDiscovery: tableDisco}, &cfg, clickhouse.SchemaTypeAdapter{})
schemaRegistry := schema.NewSchemaRegistry(clickhouse.TableDiscoveryTableProviderAdapter{TableDiscovery: tableDisco}, &cfg, clickhouse.NewSchemaTypeAdapter(cfg.StringColumnIsKeywordField))
schemaRegistry.Start()

im := elasticsearch.NewIndexManagement(cfg.Elasticsearch)
Expand Down
2 changes: 1 addition & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func main() {

virtualTableStorage := persistence.NewElasticJSONDatabase(cfg.Elasticsearch, common_table.VirtualTableElasticIndexName)
tableDisco := clickhouse.NewTableDiscovery(&cfg, connectionPool, virtualTableStorage)
schemaRegistry := schema.NewSchemaRegistry(clickhouse.TableDiscoveryTableProviderAdapter{TableDiscovery: tableDisco}, &cfg, clickhouse.SchemaTypeAdapter{})
schemaRegistry := schema.NewSchemaRegistry(clickhouse.TableDiscoveryTableProviderAdapter{TableDiscovery: tableDisco}, &cfg, clickhouse.NewSchemaTypeAdapter(cfg.StringColumnIsKeywordField))
schemaRegistry.Start()

im := elasticsearch.NewIndexManagement(cfg.Elasticsearch)
Expand Down
15 changes: 14 additions & 1 deletion platform/clickhouse/type_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,14 @@ import (
)

type SchemaTypeAdapter struct {
stringColumnIsKeywordField bool
}

func NewSchemaTypeAdapter(stringColumnIsKeywordField bool) SchemaTypeAdapter {

return SchemaTypeAdapter{
stringColumnIsKeywordField: stringColumnIsKeywordField,
}
}

func (c SchemaTypeAdapter) Convert(s string) (schema.QuesmaType, bool) {
Expand All @@ -23,7 +31,12 @@ func (c SchemaTypeAdapter) Convert(s string) (schema.QuesmaType, bool) {

switch s {
case "String":
return schema.QuesmaTypeText, true
if c.stringColumnIsKeywordField {
return schema.QuesmaTypeKeyword, true
} else {
return schema.QuesmaTypeText, true
}

case "LowCardinality(String)", "UUID", "FixedString":
return schema.QuesmaTypeKeyword, true
case "Int", "Int8", "Int16", "Int32", "Int64":
Expand Down
5 changes: 4 additions & 1 deletion platform/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type QuesmaConfiguration struct {
DefaultIngestOptimizers map[string]OptimizerConfiguration
DefaultQueryOptimizers map[string]OptimizerConfiguration
MapFieldsDiscoveringEnabled bool
StringColumnIsKeywordField bool
}

func NewQuesmaConfigurationIndexConfigOnly(indexConfig map[string]IndexConfiguration) QuesmaConfiguration {
Expand Down Expand Up @@ -269,7 +270,8 @@ Quesma Configuration:
UseCommonTableForWildcard: %t,
DefaultIngestTarget: %v,
DefaultQueryTarget: %v,
MapFieldsDiscoveringEnabled: %t
MapFieldsDiscoveringEnabled: %t,
StringColumnIsKeywordField: %t
`,
c.TransparentProxy,
elasticUrl,
Expand All @@ -292,6 +294,7 @@ Quesma Configuration:
c.DefaultIngestTarget,
c.DefaultQueryTarget,
c.MapFieldsDiscoveringEnabled,
c.StringColumnIsKeywordField,
)
}

Expand Down
2 changes: 2 additions & 0 deletions platform/config/config_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type QuesmaNewConfiguration struct {
Pipelines []Pipeline `koanf:"pipelines"`
DisableTelemetry bool `koanf:"disableTelemetry"`
MapFieldsDiscoveringEnabled bool `koanf:"mapFieldsDiscoveringEnabled"`
StringColumnIsKeywordField bool `koanf:"stringColumnIsKeywordField"`
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do not like this name.

For example better suggestion:
defaultStringToKeywordType

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed

}

type LoggingConfiguration struct {
Expand Down Expand Up @@ -591,6 +592,7 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration {
conf.LicenseKey = c.LicenseKey

conf.MapFieldsDiscoveringEnabled = c.MapFieldsDiscoveringEnabled
conf.StringColumnIsKeywordField = c.StringColumnIsKeywordField

conf.AutodiscoveryEnabled = false
conf.Connectors = make(map[string]RelationalDbConfiguration)
Expand Down
24 changes: 24 additions & 0 deletions platform/config/config_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,3 +359,27 @@ func TestPartitionBy(t *testing.T) {

assert.Equal(t, Hourly, legacyConf.DefaultPartitioningStrategy)
}

func TestStringColumnIsTextDefaultBehavior(t *testing.T) {
os.Setenv(configFileLocationEnvVar, "./test_configs/partition_by.yaml")
cfg := LoadV2Config()
if err := cfg.Validate(); err != nil {
t.Fatalf("error validating config: %v", err)
}
legacyConf := cfg.TranslateToLegacyConfig()

assert.Equal(t, false, legacyConf.StringColumnIsKeywordField)

}

func TestStringColumnIsKeyword(t *testing.T) {
os.Setenv(configFileLocationEnvVar, "./test_configs/string_column_is_keyword_field.yaml")
cfg := LoadV2Config()
if err := cfg.Validate(); err != nil {
t.Fatalf("error validating config: %v", err)
}
legacyConf := cfg.TranslateToLegacyConfig()

assert.Equal(t, true, legacyConf.StringColumnIsKeywordField)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# TEST CONFIGURATION
licenseKey: "cdd749a3-e777-11ee-bcf8-0242ac150004"
stringColumnIsKeywordField: true
frontendConnectors:
- name: elastic-ingest
type: elasticsearch-fe-ingest
config:
listenPort: 8080
- name: elastic-query
type: elasticsearch-fe-query
config:
listenPort: 8080
backendConnectors:
- name: my-minimal-elasticsearch
type: elasticsearch
config:
url: "http://localhost:9200"
- name: my-clickhouse-data-source
type: clickhouse-os
config:
url: "clickhouse://localhost:9000"
ingestStatistics: true
internalTelemetryUrl: "https://api.quesma.com/phone-home"
logging:
remoteUrl: "https://api.quesma.com/phone-home"
path: "logs"
level: "info"
processors:
- name: my-query-processor
type: quesma-v1-processor-query
config:
indexes:
example-index:
target:
- my-clickhouse-data-source
kibana_sample_data_ecommerce:
target:
- my-clickhouse-data-source
partitioningStrategy: daily
"*":
target:
- my-minimal-elasticsearch
partitioningStrategy: hourly
- name: my-ingest-processor
type: quesma-v1-processor-ingest
config:
indexes:
example-index:
target:
- my-clickhouse-data-source
kibana_sample_data_ecommerce:
target:
- my-clickhouse-data-source
partitioningStrategy: daily
"*":
target:
- my-minimal-elasticsearch
partitioningStrategy: hourly
pipelines:
- name: my-pipeline-elasticsearch-query-clickhouse
frontendConnectors: [ elastic-query ]
processors: [ my-query-processor ]
backendConnectors: [ my-minimal-elasticsearch, my-clickhouse-data-source ]
- name: my-pipeline-elasticsearch-ingest-to-clickhouse
frontendConnectors: [ elastic-ingest ]
processors: [ my-ingest-processor ]
backendConnectors: [ my-minimal-elasticsearch, my-clickhouse-data-source ]

Loading