Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit 0b1a835

Browse files
authored
Adding experimental (and limited) support for table partitioning (#1347)
We're adding a `partitioningStrategy` option to the index configuration:

```yaml
processors:
  - name: my-query-processor
    type: quesma-v1-processor-query
    config:
      indexes:
        '*':
          partitioningStrategy: "daily"
          target: [ my-clickhouse-data-source ]
        try_it:
          partitioningStrategy: "monthly"
          target: [ my-clickhouse-data-source ]
```

Setting `partitioningStrategy` essentially adds a `PARTITION BY function(<timestamp field>)` clause to the `CREATE TABLE` statement executed by Quesma.

Key points:
1) `partitioningStrategy` is applied only on table creation handled by Quesma. **If the table already exists, Quesma won't modify it.**
2) `partitioningStrategy` can be set to `hourly`, `daily`, `monthly` or `yearly`.
3) **We're not adding common table partitioning support** - so if an index has `useCommonTable: true` specified together with any `partitioningStrategy`, Quesma is going to fail due to invalid configuration.
4) Leaving `partitioningStrategy` unspecified means the table is not partitioned at all (== ClickHouse default).

<!-- A note on testing your PR -->
<!-- Basic unit test run is executed against each commit in the PR. If you want to run a full integration test suite, you can trigger it by commenting with '/run-integration-tests' -->
1 parent 40897bb commit 0b1a835

File tree

11 files changed

+157
-34
lines changed

11 files changed

+157
-34
lines changed

platform/clickhouse/clickhouse.go

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,13 @@ type (
5252
// TODO make sure it's unique in schema (there's no other 'timestamp' field)
5353
// I (Krzysiek) can write it quickly, but don't want to waste time for it right now.
5454
TimestampDefaultsNow bool
55-
ClusterName string // Name of the cluster if created with `CREATE TABLE ... ON CLUSTER ClusterName`
56-
Engine string // "Log", "MergeTree", etc.
57-
OrderBy string // "" if none
58-
PartitionBy string // "" if none
59-
PrimaryKey string // "" if none
60-
Settings string // "" if none
61-
Ttl string // of type Interval, e.g. 3 MONTH, 1 YEAR
55+
ClusterName string // Name of the cluster if created with `CREATE TABLE ... ON CLUSTER ClusterName`
56+
Engine string // "Log", "MergeTree", etc.
57+
OrderBy string // "" if none
58+
PartitionStrategy config.PartitionStrategy // PartitionStrategy to be applied to tables created by Quesma
59+
PrimaryKey string // "" if none
60+
Settings string // "" if none
61+
Ttl string // of type Interval, e.g. 3 MONTH, 1 YEAR
6262
// look https://clickhouse.com/docs/en/sql-reference/data-types/special-data-types/interval
6363
// "" if none
6464
// TODO make sure it's unique in schema (there's no other 'others' field)
@@ -360,7 +360,6 @@ func NewDefaultCHConfig() *ChTableConfig {
360360
TimestampDefaultsNow: true,
361361
Engine: "MergeTree",
362362
OrderBy: "(" + `"@timestamp"` + ")",
363-
PartitionBy: "",
364363
PrimaryKey: "",
365364
Ttl: "",
366365
Attributes: []Attribute{
@@ -381,7 +380,6 @@ func NewNoTimestampOnlyStringAttrCHConfig() *ChTableConfig {
381380
TimestampDefaultsNow: false,
382381
Engine: "MergeTree",
383382
OrderBy: "(" + `"@timestamp"` + ")",
384-
PartitionBy: "",
385383
PrimaryKey: "",
386384
Ttl: "",
387385
Attributes: []Attribute{

platform/clickhouse/clickhouse_test.go

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -403,7 +403,6 @@ func TestJsonFlatteningToStringAttr(t *testing.T) {
403403
TimestampDefaultsNow: true,
404404
Engine: "MergeTree",
405405
OrderBy: "(timestamp)",
406-
PartitionBy: "",
407406
PrimaryKey: "",
408407
Ttl: "",
409408
Attributes: []Attribute{
@@ -438,7 +437,6 @@ func TestJsonConvertingBoolToStringAttr(t *testing.T) {
438437
TimestampDefaultsNow: true,
439438
Engine: "MergeTree",
440439
OrderBy: "(timestamp)",
441-
PartitionBy: "",
442440
PrimaryKey: "",
443441
Ttl: "",
444442
Attributes: []Attribute{
@@ -519,7 +517,6 @@ func TestCreateTableString_1(t *testing.T) {
519517
TimestampDefaultsNow: true,
520518
Engine: "MergeTree",
521519
OrderBy: "(@timestamp)",
522-
PartitionBy: "",
523520
PrimaryKey: "",
524521
Ttl: "",
525522
Attributes: []Attribute{
@@ -606,7 +603,6 @@ func TestCreateTableString_NewDateTypes(t *testing.T) {
606603
TimestampDefaultsNow: true,
607604
Engine: "MergeTree",
608605
OrderBy: "(@timestamp)",
609-
PartitionBy: "",
610606
PrimaryKey: "",
611607
Ttl: "",
612608
Attributes: []Attribute{

platform/clickhouse/schema.go

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@ package clickhouse
44

55
import (
66
"fmt"
7+
"github.com/QuesmaOrg/quesma/platform/config"
78
"github.com/QuesmaOrg/quesma/platform/logger"
89
"github.com/QuesmaOrg/quesma/platform/util"
910
"math"
1011
"reflect"
12+
"strconv"
1113
"strings"
1214
"time"
1315
)
@@ -342,8 +344,8 @@ func (config *ChTableConfig) CreateTablePostFieldsString() string {
342344
if config.OrderBy != "" {
343345
s += "ORDER BY " + config.OrderBy + "\n"
344346
}
345-
if config.PartitionBy != "" {
346-
s += "PARTITION BY " + config.PartitionBy + "\n"
347+
if partitioningFunc := getPartitioningFunc(config.PartitionStrategy); config.PartitionStrategy != "" && partitioningFunc != "" {
348+
s += "PARTITION BY " + partitioningFunc + "(" + strconv.Quote(timestampFieldName) + ")" + "\n"
347349
}
348350
if config.PrimaryKey != "" {
349351
s += "PRIMARY KEY " + config.PrimaryKey + "\n"
@@ -358,6 +360,24 @@ func (config *ChTableConfig) CreateTablePostFieldsString() string {
358360
return s
359361
}
360362

363+
func getPartitioningFunc(strategy config.PartitionStrategy) string {
364+
switch strategy {
365+
case config.Hourly:
366+
return "toStartOfHour"
367+
case config.Daily:
368+
return "toYYYYMMDD"
369+
case config.Monthly:
370+
return "toYYYYMM"
371+
case config.Yearly:
372+
return "toYYYY"
373+
case config.None:
374+
return ""
375+
default:
376+
logger.Error().Msgf("Unknown partitioning strategy '%v'", strategy)
377+
return ""
378+
}
379+
}
380+
361381
func NewDefaultStringAttribute() Attribute {
362382
return Attribute{
363383
KeysArrayName: DeprecatedAttributesKeyColumn,

platform/config/config.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,14 +47,15 @@ type QuesmaConfiguration struct {
4747
DisableAuth bool
4848
AutodiscoveryEnabled bool
4949

50-
EnableIngest bool // this is computed from the configuration 2.0
51-
CreateCommonTable bool
52-
ClusterName string // When creating tables Quesma will append `ON CLUSTER ClusterName` clause
53-
UseCommonTableForWildcard bool //the meaning of this is to use a common table for wildcard (default) indexes
54-
DefaultIngestTarget []string
55-
DefaultQueryTarget []string
56-
DefaultIngestOptimizers map[string]OptimizerConfiguration
57-
DefaultQueryOptimizers map[string]OptimizerConfiguration
50+
DefaultPartitioningStrategy PartitionStrategy // applied from the "*" index configuration
51+
EnableIngest bool // this is computed from the configuration 2.0
52+
CreateCommonTable bool
53+
ClusterName string // When creating tables Quesma will append `ON CLUSTER ClusterName` clause
54+
UseCommonTableForWildcard bool //the meaning of this is to use a common table for wildcard (default) indexes
55+
DefaultIngestTarget []string
56+
DefaultQueryTarget []string
57+
DefaultIngestOptimizers map[string]OptimizerConfiguration
58+
DefaultQueryOptimizers map[string]OptimizerConfiguration
5859
}
5960

6061
func (c *QuesmaConfiguration) AliasFields(indexName string) map[string]string {

platform/config/config_v2.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,19 @@ func (c *QuesmaNewConfiguration) validatePipelines() error {
341341
if queryIndexConf.UseCommonTable != ingestIndexConf.UseCommonTable {
342342
return fmt.Errorf("ingest and query processors must have the same configuration of 'useCommonTable' for index '%s' due to current limitations", indexName)
343343
}
344+
if queryIndexConf.PartitioningStrategy != ingestIndexConf.PartitioningStrategy {
345+
return fmt.Errorf("ingest and query processors must have the same configuration of 'partitioningStrategy' for index '%s' due to current limitations", indexName)
346+
}
347+
if ingestIndexConf.PartitioningStrategy != "" && ingestIndexConf.UseCommonTable {
348+
return fmt.Errorf("partitioning strategy cannot be set for index '%s' - common table partitioning is NOT supported", indexName)
349+
}
350+
if queryIndexConf.PartitioningStrategy != "" && queryIndexConf.UseCommonTable {
351+
return fmt.Errorf("partitioning strategy cannot be set for index '%s' - common table partitioning is NOT supported", indexName)
352+
}
353+
allowedPartitioningStrategies := []PartitionStrategy{None, Hourly, Daily, Monthly, Yearly}
354+
if !slices.Contains(allowedPartitioningStrategies, queryIndexConf.PartitioningStrategy) {
355+
return fmt.Errorf("partitioning strategy '%s' is not allowed for index '%s', only %v are supported", queryIndexConf.PartitioningStrategy, indexName, allowedPartitioningStrategies)
356+
}
344357
if queryIndexConf.SchemaOverrides == nil || ingestIndexConf.SchemaOverrides == nil {
345358
if queryIndexConf.SchemaOverrides != ingestIndexConf.SchemaOverrides {
346359
return fmt.Errorf("ingest and query processors must have the same configuration of 'schemaOverrides' for index '%s' due to current limitations", indexName)
@@ -842,6 +855,7 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration {
842855
}
843856
if defaultQueryConfig, ok := queryProcessor.Config.IndexConfig[DefaultWildcardIndexName]; ok {
844857
conf.DefaultQueryOptimizers = defaultQueryConfig.Optimizers
858+
conf.DefaultPartitioningStrategy = queryProcessor.Config.IndexConfig[DefaultWildcardIndexName].PartitioningStrategy
845859
} else {
846860
conf.DefaultQueryOptimizers = nil
847861
}

platform/config/config_v2_test.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -339,3 +339,22 @@ func TestIngestOptimizers(t *testing.T) {
339339
_, ok = legacyConf.DefaultIngestOptimizers["query_only"]
340340
assert.False(t, ok)
341341
}
342+
343+
func TestPartitionBy(t *testing.T) {
344+
os.Setenv(configFileLocationEnvVar, "./test_configs/partition_by.yaml")
345+
cfg := LoadV2Config()
346+
if err := cfg.Validate(); err != nil {
347+
t.Fatalf("error validating config: %v", err)
348+
}
349+
legacyConf := cfg.TranslateToLegacyConfig()
350+
351+
assert.Equal(t, 2, len(legacyConf.IndexConfig))
352+
353+
ecommerce := legacyConf.IndexConfig["kibana_sample_data_ecommerce"]
354+
assert.Equal(t, Daily, ecommerce.PartitioningStrategy)
355+
356+
flights := legacyConf.IndexConfig["kibana_sample_data_flights"]
357+
assert.Equal(t, None, flights.PartitioningStrategy)
358+
359+
assert.Equal(t, Hourly, legacyConf.DefaultPartitioningStrategy)
360+
}

platform/config/index_config.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,27 @@ const (
1313
ClickhouseTarget = "clickhouse"
1414
)
1515

16+
// PartitionStrategy names the partitioning scheme applied to ClickHouse tables
// that Quesma creates. Its values are the strings accepted by the
// `partitioningStrategy` index configuration key.
type PartitionStrategy string

// Supported partitioning strategies.
const (
	// None is the zero value: no partitioning strategy is configured.
	None PartitionStrategy = ""
	// Hourly selects hourly partitioning.
	Hourly PartitionStrategy = "hourly"
	// Daily selects daily partitioning.
	Daily PartitionStrategy = "daily"
	// Monthly selects monthly partitioning.
	Monthly PartitionStrategy = "monthly"
	// Yearly selects yearly partitioning.
	Yearly PartitionStrategy = "yearly"
)
26+
1627
type IndexConfiguration struct {
1728
SchemaOverrides *SchemaConfiguration `koanf:"schemaOverrides"`
1829
Optimizers map[string]OptimizerConfiguration `koanf:"optimizers"`
1930
Override string `koanf:"tableName"` // use method TableName()
2031
UseCommonTable bool `koanf:"useCommonTable"`
2132
Target any `koanf:"target"`
2233

23-
EnableFieldMapSyntax bool `koanf:"enableFieldMapSyntax"`
34+
// PartitioningStrategy adds PARTITION BY clause to the table creation query
35+
PartitioningStrategy PartitionStrategy `koanf:"partitioningStrategy"` // Experimental feature
36+
EnableFieldMapSyntax bool `koanf:"enableFieldMapSyntax"` // Experimental feature
2437

2538
// Computed based on the overall configuration
2639
QueryTarget []string
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
# TEST CONFIGURATION
2+
licenseKey: "cdd749a3-e777-11ee-bcf8-0242ac150004"
3+
frontendConnectors:
4+
- name: elastic-ingest
5+
type: elasticsearch-fe-ingest
6+
config:
7+
listenPort: 8080
8+
- name: elastic-query
9+
type: elasticsearch-fe-query
10+
config:
11+
listenPort: 8080
12+
backendConnectors:
13+
- name: my-minimal-elasticsearch
14+
type: elasticsearch
15+
config:
16+
url: "http://localhost:9200"
17+
- name: my-clickhouse-data-source
18+
type: clickhouse-os
19+
config:
20+
url: "clickhouse://localhost:9000"
21+
ingestStatistics: true
22+
internalTelemetryUrl: "https://api.quesma.com/phone-home"
23+
logging:
24+
remoteUrl: "https://api.quesma.com/phone-home"
25+
path: "logs"
26+
level: "info"
27+
processors:
28+
- name: my-query-processor
29+
type: quesma-v1-processor-query
30+
config:
31+
indexes:
32+
example-index:
33+
target:
34+
- my-clickhouse-data-source
35+
kibana_sample_data_ecommerce:
36+
target:
37+
- my-clickhouse-data-source
38+
partitioningStrategy: daily
39+
"*":
40+
target:
41+
- my-minimal-elasticsearch
42+
partitioningStrategy: hourly
43+
- name: my-ingest-processor
44+
type: quesma-v1-processor-ingest
45+
config:
46+
indexes:
47+
example-index:
48+
target:
49+
- my-clickhouse-data-source
50+
kibana_sample_data_ecommerce:
51+
target:
52+
- my-clickhouse-data-source
53+
partitioningStrategy: daily
54+
"*":
55+
target:
56+
- my-minimal-elasticsearch
57+
partitioningStrategy: hourly
58+
pipelines:
59+
- name: my-pipeline-elasticsearch-query-clickhouse
60+
frontendConnectors: [ elastic-query ]
61+
processors: [ my-query-processor ]
62+
backendConnectors: [ my-minimal-elasticsearch, my-clickhouse-data-source ]
63+
- name: my-pipeline-elasticsearch-ingest-to-clickhouse
64+
frontendConnectors: [ elastic-ingest ]
65+
processors: [ my-ingest-processor ]
66+
backendConnectors: [ my-minimal-elasticsearch, my-clickhouse-data-source ]
67+

platform/ingest/alter_table_test.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ func TestAlterTable(t *testing.T) {
1919
TimestampDefaultsNow: true,
2020
Engine: "MergeTree",
2121
OrderBy: "(timestamp)",
22-
PartitionBy: "",
2322
PrimaryKey: "",
2423
Ttl: "",
2524
Attributes: []clickhouse.Attribute{
@@ -77,7 +76,6 @@ func TestAlterTableHeuristic(t *testing.T) {
7776
TimestampDefaultsNow: true,
7877
Engine: "MergeTree",
7978
OrderBy: "(timestamp)",
80-
PartitionBy: "",
8179
PrimaryKey: "",
8280
Ttl: "",
8381
Attributes: []clickhouse.Attribute{

platform/ingest/processor.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -642,6 +642,11 @@ func (ip *IngestProcessor) processInsertQuery(ctx context.Context,
642642
var createTableCmd string
643643
if table == nil {
644644
tableConfig = NewOnlySchemaFieldsCHConfig(ip.cfg.ClusterName)
645+
if indexConfig, ok := ip.cfg.IndexConfig[tableName]; ok {
646+
tableConfig.PartitionStrategy = indexConfig.PartitioningStrategy
647+
} else if strategy := ip.cfg.DefaultPartitioningStrategy; strategy != "" {
648+
tableConfig.PartitionStrategy = strategy
649+
}
645650
columnsFromJson := JsonToColumns(transformedJsons[0], tableConfig)
646651

647652
fieldOrigins := make(map[schema.FieldName]schema.FieldSource)
@@ -1011,7 +1016,6 @@ func NewOnlySchemaFieldsCHConfig(clusterName string) *chLib.ChTableConfig {
10111016
TimestampDefaultsNow: true,
10121017
Engine: "MergeTree",
10131018
OrderBy: "(" + `"@timestamp"` + ")",
1014-
PartitionBy: "",
10151019
ClusterName: clusterName,
10161020
PrimaryKey: "",
10171021
Ttl: "",
@@ -1028,7 +1032,6 @@ func NewDefaultCHConfig() *chLib.ChTableConfig {
10281032
TimestampDefaultsNow: true,
10291033
Engine: "MergeTree",
10301034
OrderBy: "(" + `"@timestamp"` + ")",
1031-
PartitionBy: "",
10321035
PrimaryKey: "",
10331036
Ttl: "",
10341037
Attributes: []chLib.Attribute{

0 commit comments

Comments
 (0)