Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions platform/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,13 @@ type (
// TODO make sure it's unique in schema (there's no other 'timestamp' field)
// I (Krzysiek) can write it quickly, but don't want to spend time on it right now.
TimestampDefaultsNow bool
ClusterName string // Name of the cluster if created with `CREATE TABLE ... ON CLUSTER ClusterName`
Engine string // "Log", "MergeTree", etc.
OrderBy string // "" if none
PartitionBy string // "" if none
PrimaryKey string // "" if none
Settings string // "" if none
Ttl string // of type Interval, e.g. 3 MONTH, 1 YEAR
ClusterName string // Name of the cluster if created with `CREATE TABLE ... ON CLUSTER ClusterName`
Engine string // "Log", "MergeTree", etc.
OrderBy string // "" if none
PartitionStrategy config.PartitionStrategy // PartitionStrategy to be applied to tables created by Quesma
PrimaryKey string // "" if none
Settings string // "" if none
Ttl string // of type Interval, e.g. 3 MONTH, 1 YEAR
// look https://clickhouse.com/docs/en/sql-reference/data-types/special-data-types/interval
// "" if none
// TODO make sure it's unique in schema (there's no other 'others' field)
Expand Down Expand Up @@ -360,7 +360,6 @@ func NewDefaultCHConfig() *ChTableConfig {
TimestampDefaultsNow: true,
Engine: "MergeTree",
OrderBy: "(" + `"@timestamp"` + ")",
PartitionBy: "",
PrimaryKey: "",
Ttl: "",
Attributes: []Attribute{
Expand All @@ -381,7 +380,6 @@ func NewNoTimestampOnlyStringAttrCHConfig() *ChTableConfig {
TimestampDefaultsNow: false,
Engine: "MergeTree",
OrderBy: "(" + `"@timestamp"` + ")",
PartitionBy: "",
PrimaryKey: "",
Ttl: "",
Attributes: []Attribute{
Expand Down
4 changes: 0 additions & 4 deletions platform/clickhouse/clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,6 @@ func TestJsonFlatteningToStringAttr(t *testing.T) {
TimestampDefaultsNow: true,
Engine: "MergeTree",
OrderBy: "(timestamp)",
PartitionBy: "",
PrimaryKey: "",
Ttl: "",
Attributes: []Attribute{
Expand Down Expand Up @@ -438,7 +437,6 @@ func TestJsonConvertingBoolToStringAttr(t *testing.T) {
TimestampDefaultsNow: true,
Engine: "MergeTree",
OrderBy: "(timestamp)",
PartitionBy: "",
PrimaryKey: "",
Ttl: "",
Attributes: []Attribute{
Expand Down Expand Up @@ -519,7 +517,6 @@ func TestCreateTableString_1(t *testing.T) {
TimestampDefaultsNow: true,
Engine: "MergeTree",
OrderBy: "(@timestamp)",
PartitionBy: "",
PrimaryKey: "",
Ttl: "",
Attributes: []Attribute{
Expand Down Expand Up @@ -606,7 +603,6 @@ func TestCreateTableString_NewDateTypes(t *testing.T) {
TimestampDefaultsNow: true,
Engine: "MergeTree",
OrderBy: "(@timestamp)",
PartitionBy: "",
PrimaryKey: "",
Ttl: "",
Attributes: []Attribute{
Expand Down
24 changes: 22 additions & 2 deletions platform/clickhouse/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@ package clickhouse

import (
"fmt"
"github.com/QuesmaOrg/quesma/platform/config"
"github.com/QuesmaOrg/quesma/platform/logger"
"github.com/QuesmaOrg/quesma/platform/util"
"math"
"reflect"
"strconv"
"strings"
"time"
)
Expand Down Expand Up @@ -342,8 +344,8 @@ func (config *ChTableConfig) CreateTablePostFieldsString() string {
if config.OrderBy != "" {
s += "ORDER BY " + config.OrderBy + "\n"
}
if config.PartitionBy != "" {
s += "PARTITION BY " + config.PartitionBy + "\n"
if partitioningFunc := getPartitioningFunc(config.PartitionStrategy); config.PartitionStrategy != "" && partitioningFunc != "" {
s += "PARTITION BY " + partitioningFunc + "(" + strconv.Quote(timestampFieldName) + ")" + "\n"
}
if config.PrimaryKey != "" {
s += "PRIMARY KEY " + config.PrimaryKey + "\n"
Expand All @@ -358,6 +360,24 @@ func (config *ChTableConfig) CreateTablePostFieldsString() string {
return s
}

// getPartitioningFunc maps a partitioning strategy to the name of the
// ClickHouse function that is applied to the timestamp column in the
// PARTITION BY clause of a CREATE TABLE statement.
//
// It returns "" for config.None (no partitioning) and also for a strategy it
// does not recognize; the unrecognized case is additionally logged as an error.
func getPartitioningFunc(strategy config.PartitionStrategy) string {
	var fn string
	switch strategy {
	case config.None:
		// No partitioning requested: fn stays empty and nothing is logged.
	case config.Hourly:
		fn = "toStartOfHour"
	case config.Daily:
		fn = "toYYYYMMDD"
	case config.Monthly:
		fn = "toYYYYMM"
	case config.Yearly:
		// NOTE(review): confirm the target ClickHouse version accepts toYYYY;
		// the function reference lists toYear/toYYYYMM/toYYYYMMDD.
		fn = "toYYYY"
	default:
		logger.Error().Msgf("Unknown partitioning strategy '%v'", strategy)
	}
	return fn
}

func NewDefaultStringAttribute() Attribute {
return Attribute{
KeysArrayName: DeprecatedAttributesKeyColumn,
Expand Down
17 changes: 9 additions & 8 deletions platform/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,15 @@ type QuesmaConfiguration struct {
DisableAuth bool
AutodiscoveryEnabled bool

EnableIngest bool // this is computed from the configuration 2.0
CreateCommonTable bool
ClusterName string // When creating tables Quesma will append `ON CLUSTER ClusterName` clause
UseCommonTableForWildcard bool //the meaning of this is to use a common table for wildcard (default) indexes
DefaultIngestTarget []string
DefaultQueryTarget []string
DefaultIngestOptimizers map[string]OptimizerConfiguration
DefaultQueryOptimizers map[string]OptimizerConfiguration
DefaultPartitioningStrategy PartitionStrategy // applied from the "*" index configuration
EnableIngest bool // this is computed from the configuration 2.0
CreateCommonTable bool
ClusterName string // When creating tables Quesma will append `ON CLUSTER ClusterName` clause
UseCommonTableForWildcard bool //the meaning of this is to use a common table for wildcard (default) indexes
DefaultIngestTarget []string
DefaultQueryTarget []string
DefaultIngestOptimizers map[string]OptimizerConfiguration
DefaultQueryOptimizers map[string]OptimizerConfiguration
}

func (c *QuesmaConfiguration) AliasFields(indexName string) map[string]string {
Expand Down
14 changes: 14 additions & 0 deletions platform/config/config_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,19 @@ func (c *QuesmaNewConfiguration) validatePipelines() error {
if queryIndexConf.UseCommonTable != ingestIndexConf.UseCommonTable {
return fmt.Errorf("ingest and query processors must have the same configuration of 'useCommonTable' for index '%s' due to current limitations", indexName)
}
if queryIndexConf.PartitioningStrategy != ingestIndexConf.PartitioningStrategy {
return fmt.Errorf("ingest and query processors must have the same configuration of 'partitioningStrategy' for index '%s' due to current limitations", indexName)
}
if ingestIndexConf.PartitioningStrategy != "" && ingestIndexConf.UseCommonTable {
return fmt.Errorf("partitioning strategy cannot be set for index '%s' - common table partitioning is NOT supported", indexName)
}
if queryIndexConf.PartitioningStrategy != "" && queryIndexConf.UseCommonTable {
return fmt.Errorf("partitioning strategy cannot be set for index '%s' - common table partitioning is NOT supported", indexName)
}
allowedPartitioningStrategies := []PartitionStrategy{None, Hourly, Daily, Monthly, Yearly}
if !slices.Contains(allowedPartitioningStrategies, queryIndexConf.PartitioningStrategy) {
return fmt.Errorf("partitioning strategy '%s' is not allowed for index '%s', only %v are supported", queryIndexConf.PartitioningStrategy, indexName, allowedPartitioningStrategies)
}
if queryIndexConf.SchemaOverrides == nil || ingestIndexConf.SchemaOverrides == nil {
if queryIndexConf.SchemaOverrides != ingestIndexConf.SchemaOverrides {
return fmt.Errorf("ingest and query processors must have the same configuration of 'schemaOverrides' for index '%s' due to current limitations", indexName)
Expand Down Expand Up @@ -842,6 +855,7 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration {
}
if defaultQueryConfig, ok := queryProcessor.Config.IndexConfig[DefaultWildcardIndexName]; ok {
conf.DefaultQueryOptimizers = defaultQueryConfig.Optimizers
conf.DefaultPartitioningStrategy = queryProcessor.Config.IndexConfig[DefaultWildcardIndexName].PartitioningStrategy
} else {
conf.DefaultQueryOptimizers = nil
}
Expand Down
19 changes: 19 additions & 0 deletions platform/config/config_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,3 +339,22 @@ func TestIngestOptimizers(t *testing.T) {
_, ok = legacyConf.DefaultIngestOptimizers["query_only"]
assert.False(t, ok)
}

func TestPartitionBy(t *testing.T) {
os.Setenv(configFileLocationEnvVar, "./test_configs/partition_by.yaml")
cfg := LoadV2Config()
if err := cfg.Validate(); err != nil {
t.Fatalf("error validating config: %v", err)
}
legacyConf := cfg.TranslateToLegacyConfig()

assert.Equal(t, 2, len(legacyConf.IndexConfig))

ecommerce := legacyConf.IndexConfig["kibana_sample_data_ecommerce"]
assert.Equal(t, Daily, ecommerce.PartitioningStrategy)

flights := legacyConf.IndexConfig["kibana_sample_data_flights"]
assert.Equal(t, None, flights.PartitioningStrategy)

assert.Equal(t, Hourly, legacyConf.DefaultPartitioningStrategy)
}
15 changes: 14 additions & 1 deletion platform/config/index_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,27 @@ const (
ClickhouseTarget = "clickhouse"
)

// PartitionStrategy is the user-facing name of a partitioning scheme for
// ClickHouse tables created by Quesma. Its string value is what appears under
// the `partitioningStrategy` key of an index configuration.
type PartitionStrategy string

// Recognized PartitionStrategy values.
const (
	None    PartitionStrategy = ""        // zero value: no partitioning strategy configured
	Hourly  PartitionStrategy = "hourly"  // configured as `partitioningStrategy: hourly`
	Daily   PartitionStrategy = "daily"   // configured as `partitioningStrategy: daily`
	Monthly PartitionStrategy = "monthly" // configured as `partitioningStrategy: monthly`
	Yearly  PartitionStrategy = "yearly"  // configured as `partitioningStrategy: yearly`
)

type IndexConfiguration struct {
SchemaOverrides *SchemaConfiguration `koanf:"schemaOverrides"`
Optimizers map[string]OptimizerConfiguration `koanf:"optimizers"`
Override string `koanf:"tableName"` // use method TableName()
UseCommonTable bool `koanf:"useCommonTable"`
Target any `koanf:"target"`

EnableFieldMapSyntax bool `koanf:"enableFieldMapSyntax"`
// PartitioningStrategy adds PARTITION BY clause to the table creation query
PartitioningStrategy PartitionStrategy `koanf:"partitioningStrategy"` // Experimental feature
EnableFieldMapSyntax bool `koanf:"enableFieldMapSyntax"` // Experimental feature

// Computed based on the overall configuration
QueryTarget []string
Expand Down
67 changes: 67 additions & 0 deletions platform/config/test_configs/partition_by.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# TEST CONFIGURATION
licenseKey: "cdd749a3-e777-11ee-bcf8-0242ac150004"
frontendConnectors:
- name: elastic-ingest
type: elasticsearch-fe-ingest
config:
listenPort: 8080
- name: elastic-query
type: elasticsearch-fe-query
config:
listenPort: 8080
backendConnectors:
- name: my-minimal-elasticsearch
type: elasticsearch
config:
url: "http://localhost:9200"
- name: my-clickhouse-data-source
type: clickhouse-os
config:
url: "clickhouse://localhost:9000"
ingestStatistics: true
internalTelemetryUrl: "https://api.quesma.com/phone-home"
logging:
remoteUrl: "https://api.quesma.com/phone-home"
path: "logs"
level: "info"
# Processor definitions exercised by TestPartitionBy: the query and ingest
# processors declare the same partitioningStrategy for every index, because
# config validation rejects a mismatch between the two.
processors:
- name: my-query-processor
type: quesma-v1-processor-query
config:
indexes:
# No partitioningStrategy set for this index.
example-index:
target:
- my-clickhouse-data-source
kibana_sample_data_ecommerce:
target:
- my-clickhouse-data-source
partitioningStrategy: daily
# Wildcard entry: the test expects its strategy as DefaultPartitioningStrategy.
"*":
target:
- my-minimal-elasticsearch
partitioningStrategy: hourly
- name: my-ingest-processor
type: quesma-v1-processor-ingest
config:
indexes:
# Mirrors the query processor's index settings above.
example-index:
target:
- my-clickhouse-data-source
kibana_sample_data_ecommerce:
target:
- my-clickhouse-data-source
partitioningStrategy: daily
"*":
target:
- my-minimal-elasticsearch
partitioningStrategy: hourly
pipelines:
- name: my-pipeline-elasticsearch-query-clickhouse
frontendConnectors: [ elastic-query ]
processors: [ my-query-processor ]
backendConnectors: [ my-minimal-elasticsearch, my-clickhouse-data-source ]
- name: my-pipeline-elasticsearch-ingest-to-clickhouse
frontendConnectors: [ elastic-ingest ]
processors: [ my-ingest-processor ]
backendConnectors: [ my-minimal-elasticsearch, my-clickhouse-data-source ]

2 changes: 0 additions & 2 deletions platform/ingest/alter_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ func TestAlterTable(t *testing.T) {
TimestampDefaultsNow: true,
Engine: "MergeTree",
OrderBy: "(timestamp)",
PartitionBy: "",
PrimaryKey: "",
Ttl: "",
Attributes: []clickhouse.Attribute{
Expand Down Expand Up @@ -77,7 +76,6 @@ func TestAlterTableHeuristic(t *testing.T) {
TimestampDefaultsNow: true,
Engine: "MergeTree",
OrderBy: "(timestamp)",
PartitionBy: "",
PrimaryKey: "",
Ttl: "",
Attributes: []clickhouse.Attribute{
Expand Down
7 changes: 5 additions & 2 deletions platform/ingest/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -642,6 +642,11 @@ func (ip *IngestProcessor) processInsertQuery(ctx context.Context,
var createTableCmd string
if table == nil {
tableConfig = NewOnlySchemaFieldsCHConfig(ip.cfg.ClusterName)
if indexConfig, ok := ip.cfg.IndexConfig[tableName]; ok {
tableConfig.PartitionStrategy = indexConfig.PartitioningStrategy
} else if strategy := ip.cfg.DefaultPartitioningStrategy; strategy != "" {
tableConfig.PartitionStrategy = strategy
}
columnsFromJson := JsonToColumns(transformedJsons[0], tableConfig)

fieldOrigins := make(map[schema.FieldName]schema.FieldSource)
Expand Down Expand Up @@ -1011,7 +1016,6 @@ func NewOnlySchemaFieldsCHConfig(clusterName string) *chLib.ChTableConfig {
TimestampDefaultsNow: true,
Engine: "MergeTree",
OrderBy: "(" + `"@timestamp"` + ")",
PartitionBy: "",
ClusterName: clusterName,
PrimaryKey: "",
Ttl: "",
Expand All @@ -1028,7 +1032,6 @@ func NewDefaultCHConfig() *chLib.ChTableConfig {
TimestampDefaultsNow: true,
Engine: "MergeTree",
OrderBy: "(" + `"@timestamp"` + ")",
PartitionBy: "",
PrimaryKey: "",
Ttl: "",
Attributes: []chLib.Attribute{
Expand Down
6 changes: 0 additions & 6 deletions platform/ingest/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ var hasOthersConfig = &clickhouse.ChTableConfig{
TimestampDefaultsNow: false,
Engine: "MergeTree",
OrderBy: "(timestamp)",
PartitionBy: "",
PrimaryKey: "",
Ttl: "",
Attributes: []clickhouse.Attribute{},
Expand Down Expand Up @@ -132,7 +131,6 @@ func TestAddTimestamp(t *testing.T) {
TimestampDefaultsNow: true,
Engine: "MergeTree",
OrderBy: "(@timestamp)",
PartitionBy: "",
PrimaryKey: "",
Ttl: "",
Attributes: []clickhouse.Attribute{},
Expand Down Expand Up @@ -541,7 +539,6 @@ func TestJsonFlatteningToStringAttr(t *testing.T) {
TimestampDefaultsNow: true,
Engine: "MergeTree",
OrderBy: "(@timestamp)",
PartitionBy: "",
PrimaryKey: "",
Ttl: "",
Attributes: []clickhouse.Attribute{
Expand Down Expand Up @@ -576,7 +573,6 @@ func TestJsonConvertingBoolToStringAttr(t *testing.T) {
TimestampDefaultsNow: true,
Engine: "MergeTree",
OrderBy: "(@timestamp)",
PartitionBy: "",
PrimaryKey: "",
Ttl: "",
Attributes: []clickhouse.Attribute{
Expand Down Expand Up @@ -657,7 +653,6 @@ func TestCreateTableString_1(t *testing.T) {
TimestampDefaultsNow: true,
Engine: "MergeTree",
OrderBy: "(@timestamp)",
PartitionBy: "",
PrimaryKey: "",
Ttl: "",
Attributes: []clickhouse.Attribute{
Expand Down Expand Up @@ -742,7 +737,6 @@ func TestCreateTableString_NewDateTypes(t *testing.T) {
TimestampDefaultsNow: true,
Engine: "MergeTree",
OrderBy: "(@timestamp)",
PartitionBy: "",
PrimaryKey: "",
Ttl: "",
Attributes: []clickhouse.Attribute{
Expand Down
Loading