Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/experimental/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func main() {

virtualTableStorage := persistence.NewElasticJSONDatabase(cfg.Elasticsearch, common_table.VirtualTableElasticIndexName)
tableDisco := clickhouse.NewTableDiscovery(&cfg, connectionPool, virtualTableStorage)
schemaRegistry := schema.NewSchemaRegistry(clickhouse.TableDiscoveryTableProviderAdapter{TableDiscovery: tableDisco}, &cfg, clickhouse.SchemaTypeAdapter{})
schemaRegistry := schema.NewSchemaRegistry(clickhouse.TableDiscoveryTableProviderAdapter{TableDiscovery: tableDisco}, &cfg, clickhouse.NewSchemaTypeAdapter(cfg.DefaultStringColumnType))
schemaRegistry.Start()

im := elasticsearch.NewIndexManagement(cfg.Elasticsearch)
Expand Down
2 changes: 1 addition & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func main() {

virtualTableStorage := persistence.NewElasticJSONDatabase(cfg.Elasticsearch, common_table.VirtualTableElasticIndexName)
tableDisco := clickhouse.NewTableDiscovery(&cfg, connectionPool, virtualTableStorage)
schemaRegistry := schema.NewSchemaRegistry(clickhouse.TableDiscoveryTableProviderAdapter{TableDiscovery: tableDisco}, &cfg, clickhouse.SchemaTypeAdapter{})
schemaRegistry := schema.NewSchemaRegistry(clickhouse.TableDiscoveryTableProviderAdapter{TableDiscovery: tableDisco}, &cfg, clickhouse.NewSchemaTypeAdapter(cfg.DefaultStringColumnType))
schemaRegistry.Start()

im := elasticsearch.NewIndexManagement(cfg.Elasticsearch)
Expand Down
22 changes: 21 additions & 1 deletion platform/clickhouse/type_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,20 @@
package clickhouse

import (
"github.com/QuesmaOrg/quesma/platform/logger"
"github.com/QuesmaOrg/quesma/platform/schema"
"strings"
)

// SchemaTypeAdapter converts column type names (for example "String") into
// schema.QuesmaType values through its Convert method.
type SchemaTypeAdapter struct {
// defaultStringColumnType selects what a plain "String" column converts to:
// "text" (or an empty value) yields QuesmaTypeText and "keyword" yields
// QuesmaTypeKeyword; any other value is logged as an error and not converted.
defaultStringColumnType string
}

// NewSchemaTypeAdapter returns a SchemaTypeAdapter that stores defaultType as
// the type to use for plain "String" columns.
func NewSchemaTypeAdapter(defaultType string) SchemaTypeAdapter {
	var adapter SchemaTypeAdapter
	adapter.defaultStringColumnType = defaultType
	return adapter
}

func (c SchemaTypeAdapter) Convert(s string) (schema.QuesmaType, bool) {
Expand All @@ -23,7 +32,18 @@ func (c SchemaTypeAdapter) Convert(s string) (schema.QuesmaType, bool) {

switch s {
case "String":
return schema.QuesmaTypeText, true
switch c.defaultStringColumnType {

// An empty value is accepted for testing purposes; in production it should always be set.
case "", "text":
return schema.QuesmaTypeText, true
case "keyword":
return schema.QuesmaTypeKeyword, true
default:
logger.Error().Msgf("Unknown field type %s", c.defaultStringColumnType)
return schema.QuesmaTypeUnknown, false
}

case "LowCardinality(String)", "UUID", "FixedString":
return schema.QuesmaTypeKeyword, true
case "Int", "Int8", "Int16", "Int32", "Int64":
Expand Down
5 changes: 4 additions & 1 deletion platform/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ type QuesmaConfiguration struct {
DefaultIngestOptimizers map[string]OptimizerConfiguration
DefaultQueryOptimizers map[string]OptimizerConfiguration
MapFieldsDiscoveringEnabled bool
DefaultStringColumnType string
}

func NewQuesmaConfigurationIndexConfigOnly(indexConfig map[string]IndexConfiguration) QuesmaConfiguration {
Expand Down Expand Up @@ -269,7 +270,8 @@ Quesma Configuration:
UseCommonTableForWildcard: %t,
DefaultIngestTarget: %v,
DefaultQueryTarget: %v,
MapFieldsDiscoveringEnabled: %t
MapFieldsDiscoveringEnabled: %t,
DefaultStringColumnType: %s
`,
c.TransparentProxy,
elasticUrl,
Expand All @@ -292,6 +294,7 @@ Quesma Configuration:
c.DefaultIngestTarget,
c.DefaultQueryTarget,
c.MapFieldsDiscoveringEnabled,
c.DefaultStringColumnType,
)
}

Expand Down
22 changes: 22 additions & 0 deletions platform/config/config_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,13 @@ type QuesmaNewConfiguration struct {
Pipelines []Pipeline `koanf:"pipelines"`
DisableTelemetry bool `koanf:"disableTelemetry"`
MapFieldsDiscoveringEnabled bool `koanf:"mapFieldsDiscoveringEnabled"`
DefaultStringToKeywordType bool `koanf:"defaultStringToKeywordType"`
QuesmaFlags QuesmaFlags `koanf:"flags"`
}

// QuesmaFlags holds all the configuration flags that affect global Quesma behavior.
type QuesmaFlags struct {
// DefaultStringColumnType is loaded from the "defaultStringColumnType" key.
// It is a pointer so that an unset flag (nil) can be told apart from an
// explicitly configured value.
DefaultStringColumnType *string `koanf:"defaultStringColumnType"`
}

type LoggingConfiguration struct {
Expand Down Expand Up @@ -592,6 +599,21 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration {

conf.MapFieldsDiscoveringEnabled = c.MapFieldsDiscoveringEnabled

conf.DefaultStringColumnType = "text" // default value, can be overridden by the flag
if c.QuesmaFlags.DefaultStringColumnType != nil {

switch *c.QuesmaFlags.DefaultStringColumnType {
case "keyword":
conf.DefaultStringColumnType = "keyword"
case "text":
conf.DefaultStringColumnType = "text"
default:

errAcc = multierror.Append(errAcc, fmt.Errorf("defaultStringColumnType must be either 'keyword' or 'text', got '%s'", *c.QuesmaFlags.DefaultStringColumnType))

}
}

conf.AutodiscoveryEnabled = false
conf.Connectors = make(map[string]RelationalDbConfiguration)
relDBConn, connType, relationalDBErr := c.getRelationalDBConf()
Expand Down
24 changes: 24 additions & 0 deletions platform/config/config_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,3 +359,27 @@ func TestPartitionBy(t *testing.T) {

assert.Equal(t, Hourly, legacyConf.DefaultPartitioningStrategy)
}

func TestStringColumnIsTextDefaultBehavior(t *testing.T) {
os.Setenv(configFileLocationEnvVar, "./test_configs/partition_by.yaml")
cfg := LoadV2Config()
if err := cfg.Validate(); err != nil {
t.Fatalf("error validating config: %v", err)
}
legacyConf := cfg.TranslateToLegacyConfig()

assert.Equal(t, "text", legacyConf.DefaultStringColumnType)

}

func TestStringColumnIsKeyword(t *testing.T) {
os.Setenv(configFileLocationEnvVar, "./test_configs/string_column_is_keyword_field.yaml")
cfg := LoadV2Config()
if err := cfg.Validate(); err != nil {
t.Fatalf("error validating config: %v", err)
}
legacyConf := cfg.TranslateToLegacyConfig()

assert.Equal(t, "keyword", legacyConf.DefaultStringColumnType)

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
# TEST CONFIGURATION
# Sets flags.defaultStringColumnType to "keyword"; the rest is a query and an
# ingest pipeline wired to one Elasticsearch and one ClickHouse backend.
licenseKey: "cdd749a3-e777-11ee-bcf8-0242ac150004"


# Global flags. defaultStringColumnType accepts "keyword" or "text";
# when the flag is omitted the translated config defaults to "text".
flags:
defaultStringColumnType: keyword

frontendConnectors:
- name: elastic-ingest
type: elasticsearch-fe-ingest
config:
listenPort: 8080
- name: elastic-query
type: elasticsearch-fe-query
config:
listenPort: 8080
backendConnectors:
- name: my-minimal-elasticsearch
type: elasticsearch
config:
url: "http://localhost:9200"
- name: my-clickhouse-data-source
type: clickhouse-os
config:
url: "clickhouse://localhost:9000"
ingestStatistics: true
internalTelemetryUrl: "https://api.quesma.com/phone-home"
logging:
remoteUrl: "https://api.quesma.com/phone-home"
path: "logs"
level: "info"
processors:
# Query side: the two named indexes target ClickHouse, the "*" entry targets Elasticsearch.
- name: my-query-processor
type: quesma-v1-processor-query
config:
indexes:
example-index:
target:
- my-clickhouse-data-source
kibana_sample_data_ecommerce:
target:
- my-clickhouse-data-source
partitioningStrategy: daily
"*":
target:
- my-minimal-elasticsearch
partitioningStrategy: hourly
# Ingest side: same index-to-target mapping as the query processor.
- name: my-ingest-processor
type: quesma-v1-processor-ingest
config:
indexes:
example-index:
target:
- my-clickhouse-data-source
kibana_sample_data_ecommerce:
target:
- my-clickhouse-data-source
partitioningStrategy: daily
"*":
target:
- my-minimal-elasticsearch
partitioningStrategy: hourly
pipelines:
- name: my-pipeline-elasticsearch-query-clickhouse
frontendConnectors: [ elastic-query ]
processors: [ my-query-processor ]
backendConnectors: [ my-minimal-elasticsearch, my-clickhouse-data-source ]
- name: my-pipeline-elasticsearch-ingest-to-clickhouse
frontendConnectors: [ elastic-ingest ]
processors: [ my-ingest-processor ]
backendConnectors: [ my-minimal-elasticsearch, my-clickhouse-data-source ]

Loading