Skip to content
This repository was archived by the owner on Nov 7, 2025. It is now read-only.

Commit b319526

Browse files
mieciuavelanarius
andauthored
Adding support for ON CLUSTER clause in DDL statements (#1267)
This PR adds support for `ON CLUSTER` clause within DDL statements. If user want to synchronize DDL across nodes, it is required to place `clusterName` in the ingest processor configuration like this: ```yaml - name: my-ingest-processor type: quesma-v1-processor-ingest config: clusterName: "quesma_cluster" indexes: example: target: [ my-clickhouse-data-source ] ``` Having that, Quesma is going to append `ON CLUSTER "quesma_cluster"` to all `CREATE TABLE` and `ALTER TABLE` statements. ### Some useful bits for testing **Step 0.** Make sure you configured `clusterName` for table named `try_it`. Test table creation: ``` curl -i -X POST "http://localhost:8080/_bulk" -H "Content-Type: application/ndjson" -d ' { "index": { "_index": "try_it", "_id": "1" } } { "name": "Alice", "age": 30, "city": "New York" } { "index": { "_index": "try_it", "_id": "2" } } { "name": "Bob", "age": 25, "city": "San Francisco" } { "index": { "_index": "try_it", "_id": "3" } } { "name": "Charlie", "age": 35, "city": "Los Angeles" } ' ``` Test `ALTER TABLE`: ``` curl -i -X POST "http://localhost:8080/_bulk" -H "Content-Type: application/ndjson" -d ' { "index": { "_index": "try_it", "_id": "6" } } { "name": "Alice", "age": 30, "city": "New York", "genre": "jazz" } { "index": { "_index": "try_it", "_id": "8" } } { "name": "Bob", "age": 25, "city": "San Francisco", "genre": "hiphop" } ' ``` Also the same steps (create + alter) should be run for common table scenario. --------- Co-authored-by: Piotr Grabowski <[email protected]>
1 parent e97bc77 commit b319526

File tree

13 files changed

+143
-15
lines changed

13 files changed

+143
-15
lines changed

docs/public/docs/config-primer.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ The supported configuration options for backend connectors (under `config`):
106106
* `user` - username for authentication
107107
* `password` - password for authentication
108108
* `database` - name of the database to connect to. It is optional for ClickHouse, but strictly required for Hydrolix, where it is also referred as "project".
109+
* `clusterName` - name of the ClickHouse cluster (optional). This will be used in the `ON CLUSTER clusterName` clause when Quesma creates tables. Setting this option ensures that the created tables are present on all nodes of the distributed cluster.
109110
* `adminUrl` - URL for administrative operations to render a handy link in Quesma management UI (optional)
110111
* `disableTLS` - when set to true, disables TLS for the connection (optional)
111112

docs/public/docs/limitations.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ Quesma supports only ElasticSearch/Kibana 8.0 or above.
2424

2525
### ClickHouse limitations
2626
* When using a cluster deployment of ClickHouse, the tables automatically created by Quesma (during [Ingest](/ingest.md)) will use the `MergeTree` engine. If you wish to use the `ReplicatedMergeTree` engine instead, you will have to create the tables manually with `ReplicatedMergeTree` engine before ingesting data to Quesma.
27+
* Remember to configure `clusterName` in backend connector configuration to make sure that the tables are created on all nodes of the cluster.
2728
* *Note: On ClickHouse Cloud, the tables automatically created by Quesma will use the `ReplicatedMergeTree` engine (ClickHouse Cloud default engine).*
2829

2930
## Functional limitations

quesma/clickhouse/clickhouse.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ type (
5252
// TODO make sure it's unique in schema (there's no other 'timestamp' field)
5353
// I (Krzysiek) can write it quickly, but don't want to waste time for it right now.
5454
TimestampDefaultsNow bool
55+
ClusterName string // Name of the cluster if created with `CREATE TABLE ... ON CLUSTER ClusterName`
5556
Engine string // "Log", "MergeTree", etc.
5657
OrderBy string // "" if none
5758
PartitionBy string // "" if none
@@ -379,6 +380,7 @@ func NewDefaultCHConfig() *ChTableConfig {
379380
}
380381
}
381382

383+
// NewNoTimestampOnlyStringAttrCHConfig is used only in tests
382384
func NewNoTimestampOnlyStringAttrCHConfig() *ChTableConfig {
383385
return &ChTableConfig{
384386
HasTimestamp: false,
@@ -396,6 +398,7 @@ func NewNoTimestampOnlyStringAttrCHConfig() *ChTableConfig {
396398
}
397399
}
398400

401+
// NewChTableConfigNoAttrs is used only in tests
399402
func NewChTableConfigNoAttrs() *ChTableConfig {
400403
return &ChTableConfig{
401404
HasTimestamp: false,
@@ -408,6 +411,7 @@ func NewChTableConfigNoAttrs() *ChTableConfig {
408411
}
409412
}
410413

414+
// NewChTableConfigTimestampStringAttr is used only in tests
411415
func NewChTableConfigTimestampStringAttr() *ChTableConfig {
412416
return &ChTableConfig{
413417
HasTimestamp: true,

quesma/clickhouse/parserCreateTable.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -389,11 +389,22 @@ func ParseCreateTable(q string) (*Table, int) {
389389
// parse [ON CLUSTER cluster_name]
390390
i3 := parseExact(q, i2, "ON CLUSTER ")
391391
if i3 != -1 {
392+
i3 = omitWhitespace(q, i3)
393+
i4, _ := parseMaybeAndForget(q, i3, `"`) // cluster name can be quoted, but doesn't have to
394+
if i4 != -1 {
395+
i3 = i4
396+
}
392397
i4, ident := parseIdent(q, i3)
393398
if i4 == -1 {
394399
return &t, i3
395400
}
396-
t.Cluster = ident
401+
t.ClusterName = ident
402+
if i4 != -1 {
403+
i4, _ = parseMaybeAndForget(q, i4, `"`)
404+
if i4 == -1 {
405+
return &t, i3
406+
}
407+
}
397408
i2 = i4
398409
}
399410

quesma/clickhouse/schema.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,7 @@ func NewTable(createTableQuery string, config *ChTableConfig) (*Table, error) {
314314
}
315315
}
316316

317+
// NewEmptyTable is used only in tests
317318
func NewEmptyTable(tableName string) *Table {
318319
return &Table{Name: tableName, Config: NewChTableConfigNoAttrs()}
319320
}

quesma/clickhouse/table.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ import (
1616
type Table struct {
1717
Name string
1818
DatabaseName string `default:""`
19-
Cluster string `default:""`
19+
ClusterName string `default:""`
2020
Cols map[string]*Column
2121
Config *ChTableConfig
2222
Created bool // do we need to create it during first insert
@@ -60,7 +60,11 @@ func (t *Table) createTableOurFieldsString() []string {
6060
}
6161

6262
func (t *Table) CreateTableString() string {
63-
s := "CREATE TABLE IF NOT EXISTS " + t.FullTableName() + " (\n"
63+
var onClusterClause string
64+
if t.ClusterName != "" {
65+
onClusterClause = " ON CLUSTER " + strconv.Quote(t.ClusterName)
66+
}
67+
s := "CREATE TABLE IF NOT EXISTS " + t.FullTableName() + onClusterClause + " (\n"
6468
rows := make([]string, 0)
6569
for _, col := range t.Cols {
6670
rows = append(rows, col.createTableString(1))

quesma/clickhouse/table_discovery.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -398,6 +398,7 @@ func (td *tableDiscovery) populateTableDefinitions(configuredTables map[string]d
398398
Name: tableName,
399399
Comment: resTable.comment,
400400
DatabaseName: databaseName,
401+
ClusterName: cfg.ClusterName, // FIXME: is this really necessary? The cluster name is only used when creating table, but this is an already created table - so is this information not needed?
401402
Cols: columnsMap,
402403
Config: &ChTableConfig{
403404
Attributes: []Attribute{},

quesma/common_table/const.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,17 @@ package common_table
44

55
import (
66
"context"
7+
"fmt"
78
"github.com/QuesmaOrg/quesma/quesma/logger"
89
quesma_api "github.com/QuesmaOrg/quesma/quesma/v2/core"
10+
"strconv"
911
)
1012

1113
const TableName = "quesma_common_table"
1214
const IndexNameColumn = "__quesma_index_name"
1315

1416
const singleTableDDL = `
15-
CREATE TABLE IF NOT EXISTS "quesma_common_table"
17+
CREATE TABLE IF NOT EXISTS "quesma_common_table" %s
1618
(
1719
"attributes_values" Map(String, String),
1820
"attributes_metadata" Map(String, String),
@@ -26,10 +28,19 @@ CREATE TABLE IF NOT EXISTS "quesma_common_table"
2628
2729
`
2830

29-
func EnsureCommonTableExists(db quesma_api.BackendConnector) {
31+
func commonTableDDL(clusterName string) string {
32+
var maybeOnClusterClause string
33+
if clusterName != "" {
34+
maybeOnClusterClause = "ON CLUSTER " + strconv.Quote(clusterName)
35+
}
36+
return fmt.Sprintf(singleTableDDL, maybeOnClusterClause)
37+
38+
}
39+
40+
func EnsureCommonTableExists(db quesma_api.BackendConnector, clusterName string) {
3041

3142
logger.Info().Msgf("Ensuring common table '%v' exists", TableName)
32-
err := db.Exec(context.Background(), singleTableDDL)
43+
err := db.Exec(context.Background(), commonTableDDL(clusterName))
3344
if err != nil {
3445
// TODO check if we've got RO access to the database
3546
logger.Warn().Msgf("Failed to create common table '%v': %v", TableName, err)

quesma/ingest/parserCreateTable_test.go

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func TestParseSignozSchema_2(t *testing.T) {
7777
assert.Contains(t, table.Cols, fieldName)
7878
}
7979
assert.Equal(t, "db", table.DatabaseName)
80-
assert.Equal(t, "cluster", table.Cluster)
80+
assert.Equal(t, "cluster", table.ClusterName)
8181
}
8282

8383
func TestParseQuotedTablename(t *testing.T) {
@@ -119,6 +119,79 @@ func TestParseNonLetterNames(t *testing.T) {
119119
}
120120
}
121121

122+
func TestParseCreateSampleDataEcommerce(t *testing.T) {
123+
q := `CREATE TABLE IF NOT EXISTS "kibana_sample_data_ecommerce" ON CLUSTER "quesma_cluster"
124+
(
125+
"@timestamp" DateTime64(3) DEFAULT now64(),
126+
"attributes_values" Map(String,String),
127+
"attributes_metadata" Map(String,String),
128+
129+
130+
"taxful_total_price" Nullable(Float64) COMMENT 'quesmaMetadataV1:fieldName=taxful_total_price',
131+
"sku" Array(String) COMMENT 'quesmaMetadataV1:fieldName=sku',
132+
"taxless_total_price" Nullable(Float64) COMMENT 'quesmaMetadataV1:fieldName=taxless_total_price',
133+
"total_unique_products" Nullable(Int64) COMMENT 'quesmaMetadataV1:fieldName=total_unique_products',
134+
"geoip_region_name" Nullable(String) COMMENT 'quesmaMetadataV1:fieldName=geoip.region_name',
135+
"category" Array(String) COMMENT 'quesmaMetadataV1:fieldName=category',
136+
"products_created_on" Array(DateTime64) COMMENT 'quesmaMetadataV1:fieldName=products.created_on',
137+
"products_taxful_price" Array(Float64) COMMENT 'quesmaMetadataV1:fieldName=products.taxful_price',
138+
"user" Nullable(String) COMMENT 'quesmaMetadataV1:fieldName=user',
139+
"currency" Nullable(String) COMMENT 'quesmaMetadataV1:fieldName=currency',
140+
"day_of_week" Nullable(String) COMMENT 'quesmaMetadataV1:fieldName=day_of_week',
141+
"geoip_location_lat" Nullable(String) COMMENT 'quesmaMetadataV1:fieldName=geoip.location.lat',
142+
"geoip_city_name" Nullable(String) COMMENT 'quesmaMetadataV1:fieldName=geoip.city_name',
143+
"products__id" Array(String) COMMENT 'quesmaMetadataV1:fieldName=products._id',
144+
"products_discount_amount" Array(Int64) COMMENT 'quesmaMetadataV1:fieldName=products.discount_amount',
145+
"customer_last_name" Nullable(String) COMMENT 'quesmaMetadataV1:fieldName=customer_last_name',
146+
"total_quantity" Nullable(Int64) COMMENT 'quesmaMetadataV1:fieldName=total_quantity',
147+
"geoip_continent_name" Nullable(String) COMMENT 'quesmaMetadataV1:fieldName=geoip.continent_name',
148+
"products_price" Array(Float64) COMMENT 'quesmaMetadataV1:fieldName=products.price',
149+
"products_sku" Array(String) COMMENT 'quesmaMetadataV1:fieldName=products.sku',
150+
"products_discount_percentage" Array(Int64) COMMENT 'quesmaMetadataV1:fieldName=products.discount_percentage',
151+
"type" Nullable(String) COMMENT 'quesmaMetadataV1:fieldName=type',
152+
"customer_full_name" Nullable(String) COMMENT 'quesmaMetadataV1:fieldName=customer_full_name',
153+
"products_min_price" Array(Float64) COMMENT 'quesmaMetadataV1:fieldName=products.min_price',
154+
"products_unit_discount_amount" Array(Int64) COMMENT 'quesmaMetadataV1:fieldName=products.unit_discount_amount',
155+
"products_manufacturer" Array(String) COMMENT 'quesmaMetadataV1:fieldName=products.manufacturer',
156+
"products_base_unit_price" Array(Float64) COMMENT 'quesmaMetadataV1:fieldName=products.base_unit_price',
157+
"day_of_week_i" Nullable(Int64) COMMENT 'quesmaMetadataV1:fieldName=day_of_week_i',
158+
"manufacturer" Array(String) COMMENT 'quesmaMetadataV1:fieldName=manufacturer',
159+
"customer_first_name" Nullable(String) COMMENT 'quesmaMetadataV1:fieldName=customer_first_name',
160+
"products_product_id" Array(Int64) COMMENT 'quesmaMetadataV1:fieldName=products.product_id',
161+
"customer_gender" Nullable(String) COMMENT 'quesmaMetadataV1:fieldName=customer_gender',
162+
"email" Nullable(String) COMMENT 'quesmaMetadataV1:fieldName=email',
163+
"order_id" Nullable(String) COMMENT 'quesmaMetadataV1:fieldName=order_id',
164+
"customer_phone" Nullable(String) COMMENT 'quesmaMetadataV1:fieldName=customer_phone',
165+
"order_date" Nullable(DateTime64) COMMENT 'quesmaMetadataV1:fieldName=order_date',
166+
"geoip_location_lon" Nullable(String) COMMENT 'quesmaMetadataV1:fieldName=geoip.location.lon',
167+
"products_base_price" Array(Float64) COMMENT 'quesmaMetadataV1:fieldName=products.base_price',
168+
"products_category" Array(String) COMMENT 'quesmaMetadataV1:fieldName=products.category',
169+
"products_tax_amount" Array(Int64) COMMENT 'quesmaMetadataV1:fieldName=products.tax_amount',
170+
"products_product_name" Array(String) COMMENT 'quesmaMetadataV1:fieldName=products.product_name',
171+
"customer_id" Nullable(String) COMMENT 'quesmaMetadataV1:fieldName=customer_id',
172+
"event_dataset" Nullable(String) COMMENT 'quesmaMetadataV1:fieldName=event.dataset',
173+
"geoip_country_iso_code" Nullable(String) COMMENT 'quesmaMetadataV1:fieldName=geoip.country_iso_code',
174+
"products_taxless_price" Array(Float64) COMMENT 'quesmaMetadataV1:fieldName=products.taxless_price',
175+
"products_quantity" Array(Int64) COMMENT 'quesmaMetadataV1:fieldName=products.quantity',
176+
"customer_birth_date" Nullable(DateTime64) COMMENT 'quesmaMetadataV1:fieldName=customer_birth_date',
177+
178+
)
179+
ENGINE = MergeTree
180+
ORDER BY ("@timestamp")
181+
182+
COMMENT 'created by Quesma'`
183+
184+
fieldNames := []string{"@timestamp", "attributes_values", "attributes_metadata", "taxful_total_price", "sku", "taxless_total_price", "total_unique_products", "geoip_region_name", "category", "products_created_on", "products_taxful_price", "user", "currency", "day_of_week", "geoip_location_lat", "geoip_city_name", "products__id", "products_discount_amount", "customer_last_name", "total_quantity", "geoip_continent_name", "products_price", "products_sku", "products_discount_percentage", "type", "customer_full_name", "products_min_price", "products_unit_discount_amount", "products_manufacturer", "products_base_unit_price", "day_of_week_i", "manufacturer", "customer_first_name", "products_product_id", "customer_gender", "email", "order_id", "customer_phone", "order_date", "geoip_location_lon", "products_base_price", "products_category", "products_tax_amount", "products_product_name", "customer_id", "event_dataset", "geoip_country_iso_code", "products_taxless_price", "products_quantity", "customer_birth_date"}
185+
table, err := clickhouse.NewTable(q, nil)
186+
assert.NoError(t, err)
187+
assert.Equal(t, len(fieldNames), len(table.Cols))
188+
for _, fieldName := range fieldNames {
189+
assert.Contains(t, table.Cols, fieldName)
190+
}
191+
assert.Equal(t, "kibana_sample_data_ecommerce", table.Name)
192+
assert.Equal(t, "quesma_cluster", table.ClusterName)
193+
}
194+
122195
func TestParseLongNestedSchema(t *testing.T) {
123196
q := `CREATE TABLE IF NOT EXISTS "/_monitoring/bulk?system_id=kibana&system_api_version=7&interval=10000ms_2"
124197
(

quesma/ingest/processor.go

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/goccy/go-json"
2727
"slices"
2828
"sort"
29+
"strconv"
2930
"strings"
3031
"sync"
3132
"sync/atomic"
@@ -224,14 +225,18 @@ func Indexes(m SchemaMap) string {
224225
}
225226

226227
func createTableQuery(name string, columns string, config *chLib.ChTableConfig) string {
227-
createTableCmd := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS "%s"
228+
var onClusterClause string
229+
if config.ClusterName != "" {
230+
onClusterClause = "ON CLUSTER " + strconv.Quote(config.ClusterName) + " "
231+
}
232+
createTableCmd := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS "%s" %s
228233
(
229234
230235
%s
231236
)
232237
%s
233238
COMMENT 'created by Quesma'`,
234-
name, columns,
239+
name, onClusterClause, columns,
235240
config.CreateTablePostFieldsString())
236241
return createTableCmd
237242
}
@@ -337,11 +342,16 @@ func (ip *IngestProcessor) generateNewColumns(
337342
metadata.Values[comment_metadata.ElasticFieldName] = propertyName
338343
comment := metadata.Marshall()
339344

340-
alterTable := fmt.Sprintf("ALTER TABLE \"%s\" ADD COLUMN IF NOT EXISTS \"%s\" %s", table.Name, attrKeys[i], columnType)
345+
var maybeOnClusterClause string
346+
if table.ClusterName != "" {
347+
maybeOnClusterClause = " ON CLUSTER " + strconv.Quote(table.ClusterName)
348+
}
349+
350+
alterTable := fmt.Sprintf("ALTER TABLE \"%s\"%s ADD COLUMN IF NOT EXISTS \"%s\" %s", table.Name, maybeOnClusterClause, attrKeys[i], columnType)
341351
newColumns[attrKeys[i]] = &chLib.Column{Name: attrKeys[i], Type: chLib.NewBaseType(attrTypes[i]), Modifiers: modifiers, Comment: comment}
342352
alterCmd = append(alterCmd, alterTable)
343353

344-
alterColumn := fmt.Sprintf("ALTER TABLE \"%s\" COMMENT COLUMN \"%s\" '%s'", table.Name, attrKeys[i], comment)
354+
alterColumn := fmt.Sprintf("ALTER TABLE \"%s\"%s COMMENT COLUMN \"%s\" '%s'", table.Name, maybeOnClusterClause, attrKeys[i], comment)
345355
alterCmd = append(alterCmd, alterColumn)
346356

347357
deleteIndexes = append(deleteIndexes, i)
@@ -632,7 +642,7 @@ func (ip *IngestProcessor) processInsertQuery(ctx context.Context,
632642
var tableConfig *chLib.ChTableConfig
633643
var createTableCmd string
634644
if table == nil {
635-
tableConfig = NewOnlySchemaFieldsCHConfig()
645+
tableConfig = NewOnlySchemaFieldsCHConfig(ip.cfg.ClusterName)
636646
columnsFromJson := JsonToColumns(transformedJsons[0], tableConfig)
637647

638648
fieldOrigins := make(map[schema.FieldName]schema.FieldSource)
@@ -996,13 +1006,14 @@ func NewIngestProcessor(cfg *config.QuesmaConfiguration, chDb quesma_api.Backend
9961006
return &IngestProcessor{ctx: ctx, cancel: cancel, chDb: chDb, tableDiscovery: loader, cfg: cfg, phoneHomeClient: phoneHomeClient, schemaRegistry: schemaRegistry, virtualTableStorage: virtualTableStorage, tableResolver: tableResolver}
9971007
}
9981008

999-
func NewOnlySchemaFieldsCHConfig() *chLib.ChTableConfig {
1009+
func NewOnlySchemaFieldsCHConfig(clusterName string) *chLib.ChTableConfig {
10001010
return &chLib.ChTableConfig{
10011011
HasTimestamp: true,
10021012
TimestampDefaultsNow: true,
10031013
Engine: "MergeTree",
10041014
OrderBy: "(" + `"@timestamp"` + ")",
10051015
PartitionBy: "",
1016+
ClusterName: clusterName,
10061017
PrimaryKey: "",
10071018
Ttl: "",
10081019
Attributes: []chLib.Attribute{chLib.NewDefaultStringAttribute()},
@@ -1011,6 +1022,7 @@ func NewOnlySchemaFieldsCHConfig() *chLib.ChTableConfig {
10111022
}
10121023
}
10131024

1025+
// NewDefaultCHConfig is used only in tests
10141026
func NewDefaultCHConfig() *chLib.ChTableConfig {
10151027
return &chLib.ChTableConfig{
10161028
HasTimestamp: true,
@@ -1031,6 +1043,7 @@ func NewDefaultCHConfig() *chLib.ChTableConfig {
10311043
}
10321044
}
10331045

1046+
// NewChTableConfigNoAttrs is used only in tests
10341047
func NewChTableConfigNoAttrs() *chLib.ChTableConfig {
10351048
return &chLib.ChTableConfig{
10361049
HasTimestamp: false,
@@ -1043,6 +1056,7 @@ func NewChTableConfigNoAttrs() *chLib.ChTableConfig {
10431056
}
10441057
}
10451058

1059+
// NewChTableConfigFourAttrs is used only in tests
10461060
func NewChTableConfigFourAttrs() *chLib.ChTableConfig {
10471061
return &chLib.ChTableConfig{
10481062
HasTimestamp: false,

0 commit comments

Comments
 (0)