Skip to content

Commit e728f3a

Browse files
authored
fix(streaming): event v1 customer filter (#3260)
1 parent d672afd commit e728f3a

File tree

2 files changed

+40
-4
lines changed

2 files changed

+40
-4
lines changed

openmeter/streaming/clickhouse/event_query.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ func (d queryEventsTable) toSQL() (string, []interface{}) {
106106
query := sqlbuilder.ClickHouse.NewSelectBuilder()
107107

108108
// Select columns
109-
selectColumns := []string{
109+
query.Select(
110110
"id",
111111
"type",
112112
"subject",
@@ -116,15 +116,13 @@ func (d queryEventsTable) toSQL() (string, []interface{}) {
116116
"ingested_at",
117117
"stored_at",
118118
"store_row_id",
119-
}
119+
)
120120

121121
// Select customer_id column if customer filter is provided
122122
if d.Customers != nil {
123123
query = selectCustomerIdColumn(d.EventsTableName, *d.Customers, query)
124124
}
125125

126-
query.Select(selectColumns...)
127-
128126
query.From(tableName)
129127

130128
// Add where clauses

openmeter/streaming/clickhouse/event_query_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ import (
66

77
"github.com/stretchr/testify/assert"
88

9+
"github.com/openmeterio/openmeter/openmeter/customer"
910
"github.com/openmeterio/openmeter/openmeter/streaming"
11+
"github.com/openmeterio/openmeter/pkg/models"
1012
)
1113

1214
func TestCreateEventsTable(t *testing.T) {
@@ -91,6 +93,42 @@ func TestQueryEventsTable(t *testing.T) {
9193
wantSQL: "SELECT id, type, subject, source, time, data, ingested_at, stored_at, store_row_id FROM openmeter.om_events WHERE namespace = ? AND time >= ? AND time < ? AND id LIKE ? ORDER BY time DESC LIMIT ?",
9294
wantArgs: []interface{}{"my_namespace", from.Unix(), to.Unix(), "%event-id-1%", 100},
9395
},
96+
// Customer filter
97+
{
98+
query: queryEventsTable{
99+
Database: "openmeter",
100+
EventsTableName: "om_events",
101+
Namespace: "my_namespace",
102+
From: from,
103+
Limit: 100,
104+
Customers: &[]streaming.Customer{
105+
customer.Customer{
106+
ManagedResource: models.ManagedResource{
107+
NamespacedModel: models.NamespacedModel{
108+
Namespace: "my_namespace",
109+
},
110+
ID: "customer1",
111+
},
112+
UsageAttribution: customer.CustomerUsageAttribution{
113+
SubjectKeys: []string{"subject1", "subject2"},
114+
},
115+
},
116+
customer.Customer{
117+
ManagedResource: models.ManagedResource{
118+
NamespacedModel: models.NamespacedModel{
119+
Namespace: "my_namespace",
120+
},
121+
ID: "customer2",
122+
},
123+
UsageAttribution: customer.CustomerUsageAttribution{
124+
SubjectKeys: []string{"subject3"},
125+
},
126+
},
127+
},
128+
},
129+
wantSQL: "WITH map('subject1', 'customer1', 'subject2', 'customer1', 'subject3', 'customer2') as subject_to_customer_id SELECT id, type, subject, source, time, data, ingested_at, stored_at, store_row_id, subject_to_customer_id[om_events.subject] AS customer_id FROM openmeter.om_events WHERE namespace = ? AND time >= ? AND om_events.subject IN (?) ORDER BY time DESC LIMIT ?",
130+
wantArgs: []interface{}{"my_namespace", from.Unix(), []string{"subject1", "subject2", "subject3"}, 100},
131+
},
94132
}
95133

96134
for _, tt := range tests {

0 commit comments

Comments
 (0)