diff --git a/platform/clickhouse/clickhouse.go b/platform/clickhouse/clickhouse.go index e4ba59a4f..0aab2d212 100644 --- a/platform/clickhouse/clickhouse.go +++ b/platform/clickhouse/clickhouse.go @@ -52,13 +52,13 @@ type ( // TODO make sure it's unique in schema (there's no other 'timestamp' field) // I (Krzysiek) can write it quickly, but don't want to waste time for it right now. TimestampDefaultsNow bool - ClusterName string // Name of the cluster if created with `CREATE TABLE ... ON CLUSTER ClusterName` - Engine string // "Log", "MergeTree", etc. - OrderBy string // "" if none - PartitionBy string // "" if none - PrimaryKey string // "" if none - Settings string // "" if none - Ttl string // of type Interval, e.g. 3 MONTH, 1 YEAR + ClusterName string // Name of the cluster if created with `CREATE TABLE ... ON CLUSTER ClusterName` + Engine string // "Log", "MergeTree", etc. + OrderBy string // "" if none + PartitionStrategy config.PartitionStrategy // PartitionStrategy to be applied to tables created by Quesma + PrimaryKey string // "" if none + Settings string // "" if none + Ttl string // of type Interval, e.g. 3 MONTH, 1 YEAR // look https://clickhouse.com/docs/en/sql-reference/data-types/special-data-types/interval // "" if none // TODO make sure it's unique in schema (there's no other 'others' field) @@ -360,7 +360,6 @@ func NewDefaultCHConfig() *ChTableConfig { TimestampDefaultsNow: true, Engine: "MergeTree", OrderBy: "(" + `"@timestamp"` + ")", - PartitionBy: "", PrimaryKey: "", Ttl: "", Attributes: []Attribute{ @@ -381,7 +380,6 @@ func NewNoTimestampOnlyStringAttrCHConfig() *ChTableConfig { TimestampDefaultsNow: false, Engine: "MergeTree", OrderBy: "(" + `"@timestamp"` + ")", - PartitionBy: "", PrimaryKey: "", Ttl: "", Attributes: []Attribute{ diff --git a/platform/clickhouse/clickhouse_test.go b/platform/clickhouse/clickhouse_test.go index c343547ff..807ab6f34 100644 --- a/platform/clickhouse/clickhouse_test.go +++ b/platform/clickhouse/clickhouse_test.go @@ -403,7 +403,6 @@ func TestJsonFlatteningToStringAttr(t *testing.T) { TimestampDefaultsNow: true, Engine: "MergeTree", OrderBy: "(timestamp)", - PartitionBy: "", PrimaryKey: "", Ttl: "", Attributes: []Attribute{ @@ -438,7 +437,6 @@ func TestJsonConvertingBoolToStringAttr(t *testing.T) { TimestampDefaultsNow: true, Engine: "MergeTree", OrderBy: "(timestamp)", - PartitionBy: "", PrimaryKey: "", Ttl: "", Attributes: []Attribute{ @@ -519,7 +517,6 @@ func TestCreateTableString_1(t *testing.T) { TimestampDefaultsNow: true, Engine: "MergeTree", OrderBy: "(@timestamp)", - PartitionBy: "", PrimaryKey: "", Ttl: "", Attributes: []Attribute{ @@ -606,7 +603,6 @@ func TestCreateTableString_NewDateTypes(t *testing.T) { TimestampDefaultsNow: true, Engine: "MergeTree", OrderBy: "(@timestamp)", - PartitionBy: "", PrimaryKey: "", Ttl: "", Attributes: []Attribute{ diff --git a/platform/clickhouse/schema.go b/platform/clickhouse/schema.go index 079c56492..6c34f0629 100644 --- a/platform/clickhouse/schema.go +++ b/platform/clickhouse/schema.go @@ -4,10 +4,12 @@ package clickhouse import ( "fmt" + "github.com/QuesmaOrg/quesma/platform/config" "github.com/QuesmaOrg/quesma/platform/logger" "github.com/QuesmaOrg/quesma/platform/util" "math" "reflect" + "strconv" "strings" "time" ) @@ -342,8 +344,8 @@ func (config *ChTableConfig) CreateTablePostFieldsString() string { if config.OrderBy != "" { s += "ORDER BY " + config.OrderBy + "\n" } - if config.PartitionBy != "" { - s += "PARTITION BY " + config.PartitionBy + "\n" + if partitioningFunc := getPartitioningFunc(config.PartitionStrategy); config.PartitionStrategy != "" && partitioningFunc != "" { + s += "PARTITION BY " + partitioningFunc + "(" + strconv.Quote(timestampFieldName) + ")" + "\n" } if config.PrimaryKey != "" { s += "PRIMARY KEY " + config.PrimaryKey + "\n" @@ -358,6 +360,24 @@ func (config *ChTableConfig) CreateTablePostFieldsString() string { return s } +func getPartitioningFunc(strategy config.PartitionStrategy) string { + switch strategy { + case config.Hourly: + return "toStartOfHour" + case config.Daily: + return "toYYYYMMDD" + case config.Monthly: + return "toYYYYMM" + case config.Yearly: + return "toYYYY" + case config.None: + return "" + default: + logger.Error().Msgf("Unknown partitioning strategy '%v'", strategy) + return "" + } +} + func NewDefaultStringAttribute() Attribute { return Attribute{ KeysArrayName: DeprecatedAttributesKeyColumn, diff --git a/platform/config/config.go b/platform/config/config.go index bf633492d..1a2cbb01d 100644 --- a/platform/config/config.go +++ b/platform/config/config.go @@ -47,14 +47,15 @@ type QuesmaConfiguration struct { DisableAuth bool AutodiscoveryEnabled bool - EnableIngest bool // this is computed from the configuration 2.0 - CreateCommonTable bool - ClusterName string // When creating tables Quesma will append `ON CLUSTER ClusterName` clause - UseCommonTableForWildcard bool //the meaning of this is to use a common table for wildcard (default) indexes - DefaultIngestTarget []string - DefaultQueryTarget []string - DefaultIngestOptimizers map[string]OptimizerConfiguration - DefaultQueryOptimizers map[string]OptimizerConfiguration + DefaultPartitioningStrategy PartitionStrategy // applied from the "*" index configuration + EnableIngest bool // this is computed from the configuration 2.0 + CreateCommonTable bool + ClusterName string // When creating tables Quesma will append `ON CLUSTER ClusterName` clause + UseCommonTableForWildcard bool //the meaning of this is to use a common table for wildcard (default) indexes + DefaultIngestTarget []string + DefaultQueryTarget []string + DefaultIngestOptimizers map[string]OptimizerConfiguration + DefaultQueryOptimizers map[string]OptimizerConfiguration } func (c *QuesmaConfiguration) AliasFields(indexName string) map[string]string { diff --git a/platform/config/config_v2.go b/platform/config/config_v2.go index 1bd450630..0847882a9 100644 --- a/platform/config/config_v2.go +++ b/platform/config/config_v2.go @@ -341,6 +341,19 @@ func (c *QuesmaNewConfiguration) validatePipelines() error { if queryIndexConf.UseCommonTable != ingestIndexConf.UseCommonTable { return fmt.Errorf("ingest and query processors must have the same configuration of 'useCommonTable' for index '%s' due to current limitations", indexName) } + if queryIndexConf.PartitioningStrategy != ingestIndexConf.PartitioningStrategy { + return fmt.Errorf("ingest and query processors must have the same configuration of 'partitioningStrategy' for index '%s' due to current limitations", indexName) + } + if ingestIndexConf.PartitioningStrategy != "" && ingestIndexConf.UseCommonTable { + return fmt.Errorf("partitioning strategy cannot be set for index '%s' - common table partitioning is NOT supported", indexName) + } + if queryIndexConf.PartitioningStrategy != "" && queryIndexConf.UseCommonTable { + return fmt.Errorf("partitioning strategy cannot be set for index '%s' - common table partitioning is NOT supported", indexName) + } + allowedPartitioningStrategies := []PartitionStrategy{None, Hourly, Daily, Monthly, Yearly} + if !slices.Contains(allowedPartitioningStrategies, queryIndexConf.PartitioningStrategy) { + return fmt.Errorf("partitioning strategy '%s' is not allowed for index '%s', only %v are supported", queryIndexConf.PartitioningStrategy, indexName, allowedPartitioningStrategies) + } if queryIndexConf.SchemaOverrides == nil || ingestIndexConf.SchemaOverrides == nil { if queryIndexConf.SchemaOverrides != ingestIndexConf.SchemaOverrides { return fmt.Errorf("ingest and query processors must have the same configuration of 'schemaOverrides' for index '%s' due to current limitations", indexName) @@ -842,6 +855,7 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration { } if defaultQueryConfig, ok := queryProcessor.Config.IndexConfig[DefaultWildcardIndexName]; ok { conf.DefaultQueryOptimizers = defaultQueryConfig.Optimizers + conf.DefaultPartitioningStrategy = queryProcessor.Config.IndexConfig[DefaultWildcardIndexName].PartitioningStrategy } else { conf.DefaultQueryOptimizers = nil } diff --git a/platform/config/config_v2_test.go b/platform/config/config_v2_test.go index ddaf3fced..69b3a4135 100644 --- a/platform/config/config_v2_test.go +++ b/platform/config/config_v2_test.go @@ -339,3 +339,22 @@ func TestIngestOptimizers(t *testing.T) { _, ok = legacyConf.DefaultIngestOptimizers["query_only"] assert.False(t, ok) } + +func TestPartitionBy(t *testing.T) { + os.Setenv(configFileLocationEnvVar, "./test_configs/partition_by.yaml") + cfg := LoadV2Config() + if err := cfg.Validate(); err != nil { + t.Fatalf("error validating config: %v", err) + } + legacyConf := cfg.TranslateToLegacyConfig() + + assert.Equal(t, 2, len(legacyConf.IndexConfig)) + + ecommerce := legacyConf.IndexConfig["kibana_sample_data_ecommerce"] + assert.Equal(t, Daily, ecommerce.PartitioningStrategy) + + flights := legacyConf.IndexConfig["kibana_sample_data_flights"] + assert.Equal(t, None, flights.PartitioningStrategy) + + assert.Equal(t, Hourly, legacyConf.DefaultPartitioningStrategy) +} diff --git a/platform/config/index_config.go b/platform/config/index_config.go index 7181a168b..e1fa186b3 100644 --- a/platform/config/index_config.go +++ b/platform/config/index_config.go @@ -13,6 +13,17 @@ const ( ClickhouseTarget = "clickhouse" ) +// PartitionStrategy represents a configurable partitioning strategy for ClickHouse tables created by Quesma +type PartitionStrategy string + +const ( + Hourly PartitionStrategy = "hourly" + Daily PartitionStrategy = "daily" + Monthly PartitionStrategy = "monthly" + Yearly PartitionStrategy = "yearly" + None PartitionStrategy = "" +) + type IndexConfiguration struct { SchemaOverrides *SchemaConfiguration `koanf:"schemaOverrides"` Optimizers map[string]OptimizerConfiguration `koanf:"optimizers"` @@ -20,7 +31,9 @@ type IndexConfiguration struct { UseCommonTable bool `koanf:"useCommonTable"` Target any `koanf:"target"` - EnableFieldMapSyntax bool `koanf:"enableFieldMapSyntax"` + // PartitioningStrategy adds PARTITION BY clause to the table creation query + PartitioningStrategy PartitionStrategy `koanf:"partitioningStrategy"` // Experimental feature + EnableFieldMapSyntax bool `koanf:"enableFieldMapSyntax"` // Experimental feature // Computed based on the overall configuration QueryTarget []string diff --git a/platform/config/test_configs/partition_by.yaml b/platform/config/test_configs/partition_by.yaml new file mode 100644 index 000000000..8ce2c23aa --- /dev/null +++ b/platform/config/test_configs/partition_by.yaml @@ -0,0 +1,67 @@ +# TEST CONFIGURATION +licenseKey: "cdd749a3-e777-11ee-bcf8-0242ac150004" +frontendConnectors: + - name: elastic-ingest + type: elasticsearch-fe-ingest + config: + listenPort: 8080 + - name: elastic-query + type: elasticsearch-fe-query + config: + listenPort: 8080 +backendConnectors: + - name: my-minimal-elasticsearch + type: elasticsearch + config: + url: "http://localhost:9200" + - name: my-clickhouse-data-source + type: clickhouse-os + config: + url: "clickhouse://localhost:9000" +ingestStatistics: true +internalTelemetryUrl: "https://api.quesma.com/phone-home" +logging: + remoteUrl: "https://api.quesma.com/phone-home" + path: "logs" + level: "info" +processors: + - name: my-query-processor + type: quesma-v1-processor-query + config: + indexes: + example-index: + target: + - my-clickhouse-data-source + kibana_sample_data_ecommerce: + target: + - my-clickhouse-data-source + partitioningStrategy: daily + "*": + target: + - my-minimal-elasticsearch + partitioningStrategy: hourly + - name: my-ingest-processor + type: quesma-v1-processor-ingest + config: + indexes: + example-index: + target: + - my-clickhouse-data-source + kibana_sample_data_ecommerce: + target: + - my-clickhouse-data-source + partitioningStrategy: daily + "*": + target: + - my-minimal-elasticsearch + partitioningStrategy: hourly +pipelines: + - name: my-pipeline-elasticsearch-query-clickhouse + frontendConnectors: [ elastic-query ] + processors: [ my-query-processor ] + backendConnectors: [ my-minimal-elasticsearch, my-clickhouse-data-source ] + - name: my-pipeline-elasticsearch-ingest-to-clickhouse + frontendConnectors: [ elastic-ingest ] + processors: [ my-ingest-processor ] + backendConnectors: [ my-minimal-elasticsearch, my-clickhouse-data-source ] + diff --git a/platform/ingest/alter_table_test.go b/platform/ingest/alter_table_test.go index 16dd2ed3f..4299f3302 100644 --- a/platform/ingest/alter_table_test.go +++ b/platform/ingest/alter_table_test.go @@ -19,7 +19,6 @@ func TestAlterTable(t *testing.T) { TimestampDefaultsNow: true, Engine: "MergeTree", OrderBy: "(timestamp)", - PartitionBy: "", PrimaryKey: "", Ttl: "", Attributes: []clickhouse.Attribute{ @@ -77,7 +76,6 @@ func TestAlterTableHeuristic(t *testing.T) { TimestampDefaultsNow: true, Engine: "MergeTree", OrderBy: "(timestamp)", - PartitionBy: "", PrimaryKey: "", Ttl: "", Attributes: []clickhouse.Attribute{ diff --git a/platform/ingest/processor.go b/platform/ingest/processor.go index 8fcf6364f..c97c3a055 100644 --- a/platform/ingest/processor.go +++ b/platform/ingest/processor.go @@ -642,6 +642,11 @@ func (ip *IngestProcessor) processInsertQuery(ctx context.Context, var createTableCmd string if table == nil { tableConfig = NewOnlySchemaFieldsCHConfig(ip.cfg.ClusterName) + if indexConfig, ok := ip.cfg.IndexConfig[tableName]; ok { + tableConfig.PartitionStrategy = indexConfig.PartitioningStrategy + } else if strategy := ip.cfg.DefaultPartitioningStrategy; strategy != "" { + tableConfig.PartitionStrategy = strategy + } columnsFromJson := JsonToColumns(transformedJsons[0], tableConfig) fieldOrigins := make(map[schema.FieldName]schema.FieldSource) @@ -1011,7 +1016,6 @@ func NewOnlySchemaFieldsCHConfig(clusterName string) *chLib.ChTableConfig { TimestampDefaultsNow: true, Engine: "MergeTree", OrderBy: "(" + `"@timestamp"` + ")", - PartitionBy: "", ClusterName: clusterName, PrimaryKey: "", Ttl: "", @@ -1028,7 +1032,6 @@ func NewDefaultCHConfig() *chLib.ChTableConfig { TimestampDefaultsNow: true, Engine: "MergeTree", OrderBy: "(" + `"@timestamp"` + ")", - PartitionBy: "", PrimaryKey: "", Ttl: "", Attributes: []chLib.Attribute{ diff --git a/platform/ingest/processor_test.go b/platform/ingest/processor_test.go index b63bec754..9c78a4aa2 100644 --- a/platform/ingest/processor_test.go +++ b/platform/ingest/processor_test.go @@ -41,7 +41,6 @@ var hasOthersConfig = &clickhouse.ChTableConfig{ TimestampDefaultsNow: false, Engine: "MergeTree", OrderBy: "(timestamp)", - PartitionBy: "", PrimaryKey: "", Ttl: "", Attributes: []clickhouse.Attribute{}, @@ -132,7 +131,6 @@ func TestAddTimestamp(t *testing.T) { TimestampDefaultsNow: true, Engine: "MergeTree", OrderBy: "(@timestamp)", - PartitionBy: "", PrimaryKey: "", Ttl: "", Attributes: []clickhouse.Attribute{}, @@ -541,7 +539,6 @@ func TestJsonFlatteningToStringAttr(t *testing.T) { TimestampDefaultsNow: true, Engine: "MergeTree", OrderBy: "(@timestamp)", - PartitionBy: "", PrimaryKey: "", Ttl: "", Attributes: []clickhouse.Attribute{ @@ -576,7 +573,6 @@ func TestJsonConvertingBoolToStringAttr(t *testing.T) { TimestampDefaultsNow: true, Engine: "MergeTree", OrderBy: "(@timestamp)", - PartitionBy: "", PrimaryKey: "", Ttl: "", Attributes: []clickhouse.Attribute{ @@ -657,7 +653,6 @@ func TestCreateTableString_1(t *testing.T) { TimestampDefaultsNow: true, Engine: "MergeTree", OrderBy: "(@timestamp)", - PartitionBy: "", PrimaryKey: "", Ttl: "", Attributes: []clickhouse.Attribute{ @@ -742,7 +737,6 @@ func TestCreateTableString_NewDateTypes(t *testing.T) { TimestampDefaultsNow: true, Engine: "MergeTree", OrderBy: "(@timestamp)", - PartitionBy: "", PrimaryKey: "", Ttl: "", Attributes: []clickhouse.Attribute{