Skip to content

feat(streaming): clickhouse query optimization #2974

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 6 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .dagger/versions_pinned.go
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@ package main

const (
kafkaVersion = "3.6"
clickhouseVersion = "24.5.5.78"
clickhouseVersion = "24.10"
redisVersion = "7.0.12"
postgresVersion = "14.9"
svixVersion = "v1.44"
1 change: 1 addition & 0 deletions app/common/streaming.go
Original file line number Diff line number Diff line change
@@ -35,6 +35,7 @@ func NewStreamingConnector(
AsyncInsert: conf.AsyncInsert,
AsyncInsertWait: conf.AsyncInsertWait,
InsertQuerySettings: conf.InsertQuerySettings,
MeterQuerySettings: conf.MeterQuerySettings,
ProgressManager: progressmanager,
QueryCacheEnabled: conf.QueryCache.Enabled,
QueryCacheNamespaceTemplate: conf.QueryCache.NamespaceTemplate,
4 changes: 4 additions & 0 deletions app/config/aggregation.go
Original file line number Diff line number Diff line change
@@ -30,6 +30,10 @@ type AggregationConfiguration struct {
// or the `parallel_view_processing` setting to enable pushing to attached views concurrently.
InsertQuerySettings map[string]string

// MeterQuerySettings is the settings for the meter query
// For example, you can set the `enable_parallel_replicas` and `max_parallel_replicas` settings.
MeterQuerySettings map[string]string

// QueryCache is the cache configuration
QueryCache AggregationQueryCacheConfiguration
}
2 changes: 1 addition & 1 deletion deploy/charts/openmeter/templates/clickhouse.yaml
Original file line number Diff line number Diff line change
@@ -21,7 +21,7 @@ spec:
spec:
containers:
- name: clickhouse
image: clickhouse/clickhouse-server:23.3
image: clickhouse/clickhouse-server:24.10
volumeMounts:
- name: data-storage-vc-template
mountPath: /var/lib/clickhouse
2 changes: 1 addition & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -27,7 +27,7 @@ services:
retries: 30

clickhouse:
image: clickhouse/clickhouse-server:24.9-alpine
image: clickhouse/clickhouse-server:24.10-alpine
ports:
- "127.0.0.1:8123:8123"
- "127.0.0.1:9000:9000"
2 changes: 1 addition & 1 deletion examples/collectors/database/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -48,7 +48,7 @@ services:
clickhouse:
profiles:
- clickhouse
image: clickhouse/clickhouse-server:23.8.9.54-alpine
image: clickhouse/clickhouse-server:24.0-alpine
ports:
- 127.0.0.1:8123:8123
- 127.0.0.1:9000:9000
2 changes: 2 additions & 0 deletions openmeter/streaming/clickhouse/connector.go
Original file line number Diff line number Diff line change
@@ -36,6 +36,7 @@ type Config struct {
AsyncInsert bool
AsyncInsertWait bool
InsertQuerySettings map[string]string
MeterQuerySettings map[string]string
ProgressManager progressmanager.Service
SkipCreateTables bool
QueryCacheEnabled bool
@@ -205,6 +206,7 @@ func (c *Connector) QueryMeter(ctx context.Context, namespace string, meter mete
GroupBy: groupBy,
WindowSize: params.WindowSize,
WindowTimeZone: params.WindowTimeZone,
QuerySettings: c.config.MeterQuerySettings,
}

// Load cached rows if any
80 changes: 55 additions & 25 deletions openmeter/streaming/clickhouse/meter_query.go
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ import (
"fmt"
"math"
"sort"
"strings"
"time"

"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
@@ -26,6 +27,7 @@ type queryMeter struct {
GroupBy []string
WindowSize *meterpkg.WindowSize
WindowTimeZone *time.Location
QuerySettings map[string]string
}

// from returns the from time for the query.
@@ -143,7 +145,7 @@ func (d *queryMeter) toSQL() (string, []interface{}, error) {
// TODO: remove this when we don't round to the nearest minute anymore
// We round them to the nearest minute to ensure the result is the same as with
// streaming connector using materialized views with per minute windows
selectColumn := fmt.Sprintf("tumbleStart(min(%s), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(%s), toIntervalMinute(1)) AS windowend", timeColumn, timeColumn)
selectColumn := fmt.Sprintf("toStartOfMinute(min(%s)) AS windowstart, toStartOfMinute(max(%s)) + INTERVAL 1 MINUTE AS windowend", timeColumn, timeColumn)
selectColumns = append(selectColumns, selectColumn)
}

@@ -200,6 +202,9 @@ func (d *queryMeter) toSQL() (string, []interface{}, error) {
query := sqlbuilder.ClickHouse.NewSelectBuilder()
query.Select(selectColumns...)
query.From(tableName)

// Prewhere clauses

query.Where(query.Equal(getColumn("namespace"), d.Namespace))
query.Where(query.Equal(getColumn("type"), d.Meter.EventType))

@@ -211,15 +216,32 @@ func (d *queryMeter) toSQL() (string, []interface{}, error) {
query.Where(query.Or(slicesx.Map(d.Subject, mapFunc)...))
}

// Apply the time where clause
from := d.from()

if from != nil {
query.Where(query.GreaterEqualThan(timeColumn, from.Unix()))
}

if d.To != nil {
query.Where(query.LessThan(timeColumn, d.To.Unix()))
}

var sqlPreWhere string

if len(d.FilterGroupBy) > 0 {
// We sort the group by s to ensure the query is deterministic
groupByKeys := make([]string, 0, len(d.FilterGroupBy))
sqlPreWhere, _ = query.Build()
dataColumn := getColumn("data")

// We sort the group bys to ensure the query is deterministic
filterGroupByKeys := make([]string, 0, len(d.FilterGroupBy))
for k := range d.FilterGroupBy {
groupByKeys = append(groupByKeys, k)
filterGroupByKeys = append(filterGroupByKeys, k)
}
sort.Strings(groupByKeys)
sort.Strings(filterGroupByKeys)

for _, groupByKey := range groupByKeys {
// Where clauses
for _, groupByKey := range filterGroupByKeys {
if _, ok := d.Meter.GroupBy[groupByKey]; !ok {
return "", nil, fmt.Errorf("meter does not have group by: %s", groupByKey)
}
@@ -231,38 +253,46 @@ func (d *queryMeter) toSQL() (string, []interface{}, error) {
return "", nil, fmt.Errorf("empty filter for group by: %s", groupByKey)
}
mapFunc := func(value string) string {
column := fmt.Sprintf("JSON_VALUE(%s, '%s')", getColumn("data"), groupByJSONPath)

// Subject is a special case
if groupByKey == "subject" {
column = "subject"
}

return fmt.Sprintf("%s = '%s'", column, sqlbuilder.Escape((value)))
return fmt.Sprintf("JSON_VALUE(%s, '%s') = '%s'", dataColumn, groupByJSONPath, sqlbuilder.Escape((value)))
}

query.Where(query.Or(slicesx.Map(values, mapFunc)...))
}
}

// Apply the time where clause
from := d.from()

if from != nil {
query.Where(query.GreaterEqualThan(timeColumn, from.Unix()))
}

if d.To != nil {
query.Where(query.LessThan(timeColumn, d.To.Unix()))
}

// Group by
query.GroupBy(groupByColumns...)

if groupByWindowSize {
query.OrderBy("windowstart")
}

sql, args := query.Build()

// Only add prewhere if there are filters on JSON data
if sqlPreWhere != "" {
sqlParts := strings.Split(sql, sqlPreWhere)
sqlAfter := sqlParts[1]

if strings.HasPrefix(sqlAfter, " AND") {
sqlAfter = strings.Replace(sqlAfter, "AND", "WHERE", 1)
}

sqlPreWhere = strings.Replace(sqlPreWhere, "WHERE", "PREWHERE", 1)
sql = fmt.Sprintf("%s%s", sqlPreWhere, sqlAfter)
}

// Add settings
settings := []string{
"optimize_move_to_prewhere = 1",
"allow_reorder_prewhere_conditions = 1",
}
for key, value := range d.QuerySettings {
settings = append(settings, fmt.Sprintf("%s = %s", key, value))
}

sql = sql + fmt.Sprintf(" SETTINGS %s", strings.Join(settings, ", "))

return sql, args, nil
}

28 changes: 14 additions & 14 deletions openmeter/streaming/clickhouse/meter_query_test.go
Original file line number Diff line number Diff line change
@@ -43,7 +43,7 @@ func TestQueryMeter(t *testing.T) {
GroupBy: []string{"subject", "group1", "group2"},
WindowSize: &windowSize,
},
wantSQL: "SELECT tumbleStart(om_events.time, toIntervalHour(1), 'UTC') AS windowstart, tumbleEnd(om_events.time, toIntervalHour(1), 'UTC') AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value, om_events.subject, JSON_VALUE(om_events.data, '$.group1') as group1, JSON_VALUE(om_events.data, '$.group2') as group2 FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND (om_events.subject = ?) AND om_events.time >= ? AND om_events.time < ? GROUP BY windowstart, windowend, subject, group1, group2 ORDER BY windowstart",
wantSQL: "SELECT tumbleStart(om_events.time, toIntervalHour(1), 'UTC') AS windowstart, tumbleEnd(om_events.time, toIntervalHour(1), 'UTC') AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value, om_events.subject, JSON_VALUE(om_events.data, '$.group1') as group1, JSON_VALUE(om_events.data, '$.group2') as group2 FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND (om_events.subject = ?) AND om_events.time >= ? AND om_events.time < ? GROUP BY windowstart, windowend, subject, group1, group2 ORDER BY windowstart SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1",
wantArgs: []interface{}{"my_namespace", "event1", "subject1", from.Unix(), to.Unix()},
},
{ // Aggregate all available data
@@ -62,7 +62,7 @@ func TestQueryMeter(t *testing.T) {
},
},
},
wantSQL: "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ?",
wantSQL: "SELECT toStartOfMinute(min(om_events.time)) AS windowstart, toStartOfMinute(max(om_events.time)) + INTERVAL 1 MINUTE AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1",
wantArgs: []interface{}{"my_namespace", "event1"},
},
{ // Aggregate with count aggregation
@@ -80,7 +80,7 @@ func TestQueryMeter(t *testing.T) {
},
},
},
wantSQL: "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, toFloat64(count(*)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ?",
wantSQL: "SELECT toStartOfMinute(min(om_events.time)) AS windowstart, toStartOfMinute(max(om_events.time)) + INTERVAL 1 MINUTE AS windowend, toFloat64(count(*)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1",
wantArgs: []interface{}{"my_namespace", "event1"},
},
{ // Aggregate with LATEST aggregation
@@ -99,7 +99,7 @@ func TestQueryMeter(t *testing.T) {
},
},
},
wantSQL: "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, argMax(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null), om_events.time) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ?",
wantSQL: "SELECT toStartOfMinute(min(om_events.time)) AS windowstart, toStartOfMinute(max(om_events.time)) + INTERVAL 1 MINUTE AS windowend, argMax(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null), om_events.time) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1",
wantArgs: []interface{}{"my_namespace", "event1"},
},
{ // Aggregate data from start
@@ -119,7 +119,7 @@ func TestQueryMeter(t *testing.T) {
},
From: &from,
},
wantSQL: "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND om_events.time >= ?",
wantSQL: "SELECT toStartOfMinute(min(om_events.time)) AS windowstart, toStartOfMinute(max(om_events.time)) + INTERVAL 1 MINUTE AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND om_events.time >= ? SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1",
wantArgs: []interface{}{"my_namespace", "event1", from.Unix()},
},
{ // Aggregate data between period
@@ -140,7 +140,7 @@ func TestQueryMeter(t *testing.T) {
From: &from,
To: &to,
},
wantSQL: "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND om_events.time >= ? AND om_events.time < ?",
wantSQL: "SELECT toStartOfMinute(min(om_events.time)) AS windowstart, toStartOfMinute(max(om_events.time)) + INTERVAL 1 MINUTE AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND om_events.time >= ? AND om_events.time < ? SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1",
wantArgs: []interface{}{"my_namespace", "event1", from.Unix(), to.Unix()},
},
{ // Aggregate data between period, groupped by window size
@@ -162,7 +162,7 @@ func TestQueryMeter(t *testing.T) {
To: &to,
WindowSize: &windowSize,
},
wantSQL: "SELECT tumbleStart(om_events.time, toIntervalHour(1), 'UTC') AS windowstart, tumbleEnd(om_events.time, toIntervalHour(1), 'UTC') AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND om_events.time >= ? AND om_events.time < ? GROUP BY windowstart, windowend ORDER BY windowstart",
wantSQL: "SELECT tumbleStart(om_events.time, toIntervalHour(1), 'UTC') AS windowstart, tumbleEnd(om_events.time, toIntervalHour(1), 'UTC') AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND om_events.time >= ? AND om_events.time < ? GROUP BY windowstart, windowend ORDER BY windowstart SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1",
wantArgs: []interface{}{"my_namespace", "event1", from.Unix(), to.Unix()},
},
{ // Aggregate data between period in a different timezone, groupped by window size
@@ -185,7 +185,7 @@ func TestQueryMeter(t *testing.T) {
WindowSize: &windowSize,
WindowTimeZone: tz,
},
wantSQL: "SELECT tumbleStart(om_events.time, toIntervalHour(1), 'Asia/Shanghai') AS windowstart, tumbleEnd(om_events.time, toIntervalHour(1), 'Asia/Shanghai') AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND om_events.time >= ? AND om_events.time < ? GROUP BY windowstart, windowend ORDER BY windowstart",
wantSQL: "SELECT tumbleStart(om_events.time, toIntervalHour(1), 'Asia/Shanghai') AS windowstart, tumbleEnd(om_events.time, toIntervalHour(1), 'Asia/Shanghai') AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND om_events.time >= ? AND om_events.time < ? GROUP BY windowstart, windowend ORDER BY windowstart SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1",
wantArgs: []interface{}{"my_namespace", "event1", from.Unix(), to.Unix()},
},
{ // Aggregate data for a single subject
@@ -206,7 +206,7 @@ func TestQueryMeter(t *testing.T) {
Subject: []string{subject},
GroupBy: []string{"subject"},
},
wantSQL: "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value, om_events.subject FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND (om_events.subject = ?) GROUP BY subject",
wantSQL: "SELECT toStartOfMinute(min(om_events.time)) AS windowstart, toStartOfMinute(max(om_events.time)) + INTERVAL 1 MINUTE AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value, om_events.subject FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND (om_events.subject = ?) GROUP BY subject SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1",
wantArgs: []interface{}{"my_namespace", "event1", "subject1"},
},
{ // Aggregate data for a single subject and group by additional fields
@@ -227,7 +227,7 @@ func TestQueryMeter(t *testing.T) {
Subject: []string{subject},
GroupBy: []string{"subject", "group1", "group2"},
},
wantSQL: "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value, om_events.subject, JSON_VALUE(om_events.data, '$.group1') as group1, JSON_VALUE(om_events.data, '$.group2') as group2 FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND (om_events.subject = ?) GROUP BY subject, group1, group2",
wantSQL: "SELECT toStartOfMinute(min(om_events.time)) AS windowstart, toStartOfMinute(max(om_events.time)) + INTERVAL 1 MINUTE AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value, om_events.subject, JSON_VALUE(om_events.data, '$.group1') as group1, JSON_VALUE(om_events.data, '$.group2') as group2 FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND (om_events.subject = ?) GROUP BY subject, group1, group2 SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1",
wantArgs: []interface{}{"my_namespace", "event1", "subject1"},
},
{ // Aggregate data for a multiple subjects
@@ -248,7 +248,7 @@ func TestQueryMeter(t *testing.T) {
Subject: []string{subject, "subject2"},
GroupBy: []string{"subject"},
},
wantSQL: "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value, om_events.subject FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND (om_events.subject = ? OR om_events.subject = ?) GROUP BY subject",
wantSQL: "SELECT toStartOfMinute(min(om_events.time)) AS windowstart, toStartOfMinute(max(om_events.time)) + INTERVAL 1 MINUTE AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value, om_events.subject FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND (om_events.subject = ? OR om_events.subject = ?) GROUP BY subject SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1",
wantArgs: []interface{}{"my_namespace", "event1", "subject1", "subject2"},
},
{ // Aggregate data with filtering for a single group and single value
@@ -268,7 +268,7 @@ func TestQueryMeter(t *testing.T) {
},
FilterGroupBy: map[string][]string{"g1": {"g1v1"}},
},
wantSQL: "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND (JSON_VALUE(om_events.data, '$.group1') = 'g1v1')",
wantSQL: "SELECT toStartOfMinute(min(om_events.time)) AS windowstart, toStartOfMinute(max(om_events.time)) + INTERVAL 1 MINUTE AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events PREWHERE om_events.namespace = ? AND om_events.type = ? WHERE (JSON_VALUE(om_events.data, '$.group1') = 'g1v1') SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1",
wantArgs: []interface{}{"my_namespace", "event1"},
},
{ // Aggregate data with filtering for a single group and multiple values
@@ -288,7 +288,7 @@ func TestQueryMeter(t *testing.T) {
},
FilterGroupBy: map[string][]string{"g1": {"g1v1", "g1v2"}},
},
wantSQL: "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND (JSON_VALUE(om_events.data, '$.group1') = 'g1v1' OR JSON_VALUE(om_events.data, '$.group1') = 'g1v2')",
wantSQL: "SELECT toStartOfMinute(min(om_events.time)) AS windowstart, toStartOfMinute(max(om_events.time)) + INTERVAL 1 MINUTE AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events PREWHERE om_events.namespace = ? AND om_events.type = ? WHERE (JSON_VALUE(om_events.data, '$.group1') = 'g1v1' OR JSON_VALUE(om_events.data, '$.group1') = 'g1v2') SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1",
wantArgs: []interface{}{"my_namespace", "event1"},
},
{ // Aggregate data with filtering for multiple groups and multiple values
@@ -308,7 +308,7 @@ func TestQueryMeter(t *testing.T) {
},
FilterGroupBy: map[string][]string{"g1": {"g1v1", "g1v2"}, "g2": {"g2v1", "g2v2"}},
},
wantSQL: "SELECT tumbleStart(min(om_events.time), toIntervalMinute(1)) AS windowstart, tumbleEnd(max(om_events.time), toIntervalMinute(1)) AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events WHERE om_events.namespace = ? AND om_events.type = ? AND (JSON_VALUE(om_events.data, '$.group1') = 'g1v1' OR JSON_VALUE(om_events.data, '$.group1') = 'g1v2') AND (JSON_VALUE(om_events.data, '$.group2') = 'g2v1' OR JSON_VALUE(om_events.data, '$.group2') = 'g2v2')",
wantSQL: "SELECT toStartOfMinute(min(om_events.time)) AS windowstart, toStartOfMinute(max(om_events.time)) + INTERVAL 1 MINUTE AS windowend, sum(ifNotFinite(toFloat64OrNull(JSON_VALUE(om_events.data, '$.value')), null)) AS value FROM openmeter.om_events PREWHERE om_events.namespace = ? AND om_events.type = ? WHERE (JSON_VALUE(om_events.data, '$.group1') = 'g1v1' OR JSON_VALUE(om_events.data, '$.group1') = 'g1v2') AND (JSON_VALUE(om_events.data, '$.group2') = 'g2v1' OR JSON_VALUE(om_events.data, '$.group2') = 'g2v2') SETTINGS optimize_move_to_prewhere = 1, allow_reorder_prewhere_conditions = 1",
wantArgs: []interface{}{"my_namespace", "event1"},
},
}