
Commit d869a07

new aggregation IP Prefix (#1101)

1 parent fc99f64

9 files changed: +1051, -18 lines

docs/public/docs/limitations.md

Lines changed: 1 addition & 1 deletion

@@ -34,7 +34,7 @@ Currently supported:
  including: `boolean`, `match`, `match phrase`, `multi-match`, `query string`, `nested`, `match all`, `exists`, `prefix`, `range`, `term`, `terms`, `wildcard`
- most popular [Aggregations](https://www.elastic.co/guide/en/elasticsearch/reference/current/search-aggregations.html),
  including: `avg`, `cardinality`, `max`, `min`, `percentile ranks`, `percentiles`, `stats`, `sum`, `top hits`, `top metrics`, `value counts`,
-  `date histogram`, `date range`, `filter`, `filters`, `histogram`, `range`, `significant terms`, `terms`
+  `date histogram`, `date range`, `filter`, `filters`, `histogram`, `range`, `significant terms`, `terms`, `ip prefix`

Which as a result allows you to run Kibana/OSD queries and dashboards on data residing in ClickHouse/Hydrolix.

Lines changed: 27 additions & 0 deletions

@@ -0,0 +1,27 @@
+// Copyright Quesma, licensed under the Elastic License 2.0.
+// SPDX-License-Identifier: Elastic-2.0
+package logger
+
+import (
+	"context"
+	"quesma/util"
+	"time"
+)
+
+var throttleMap = util.SyncMap[string, time.Time]{}
+
+const throttleDuration = 30 * time.Minute
+
+// WarnWithCtxAndThrottling logs a warning message when we encounter an unexpected parameter in a query.
+// We only log once per throttleDuration for each aggrName+paramName combination, so that we don't spam the logs.
+// Very simple mechanism, good enough for this specific use case.
+// It will probably need (at least a slight) refactor if you want to use it for anything else.
+func WarnWithCtxAndThrottling(ctx context.Context, aggrName, paramName, format string, v ...any) {
+	mapKey := aggrName + paramName
+	timestamp, ok := throttleMap.Load(mapKey)
+	weThrottle := ok && time.Since(timestamp) < throttleDuration
+	if !weThrottle {
+		WarnWithCtx(ctx).Msgf(format, v...)
+		throttleMap.Store(mapKey, time.Now())
+	}
+}
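The helper above throttles on the aggrName+paramName pair: the first warning for a given pair is logged, and further warnings for the same pair are dropped for the next 30 minutes. A minimal usage sketch, mirroring how the new ip_prefix code calls it later in this commit (ctx and params are assumed to be in scope; the values are illustrative):

// First call logs; repeated calls with the same ("ip_prefix", "min_doc_count") key
// are suppressed until throttleDuration (30 minutes) has passed.
logger.WarnWithCtxAndThrottling(ctx, "ip_prefix", "min_doc_count",
	"we didn't expect %s in IP Prefix params %v", "min_doc_count", params)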

quesma/model/README.md

Lines changed: 1 addition & 1 deletion

@@ -27,7 +27,7 @@ More info: https://www.elastic.co/guide/en/elasticsearch/reference/current/searc
Max | :white_check_mark: | Geotile grid | :x: | Min bucket | :white_check_mark: |
Median absolute deviation | :x: | Global | :x: | Moving function | :wavy_dash: |
Min | :white_check_mark: | Histogram | :white_check_mark: | Moving percentiles | :x: |
-Percentile ranks | :white_check_mark: | IP prefix | :x: | Normalize | :x: |
+Percentile ranks | :white_check_mark: | IP prefix | :white_check_mark: | Normalize | :x: |
Percentiles | :white_check_mark: | IP range | :x: | Percentiles bucket | :x: |
Rate | :x: | Missing | :x: | Serial differencing | :white_check_mark: |
Scripted metric | :x: | Multi-terms | :white_check_mark: | Stats bucket | :x: |
Lines changed: 206 additions & 0 deletions

@@ -0,0 +1,206 @@
+// Copyright Quesma, licensed under the Elastic License 2.0.
+// SPDX-License-Identifier: Elastic-2.0
+package bucket_aggregations
+
+import (
+	"context"
+	"fmt"
+	"quesma/logger"
+	"quesma/model"
+	"reflect"
+)
+
+// Current limitation: we expect the Clickhouse field to be IPv4 (and not IPv6).
+
+// Clickhouse table to test the SQLs:
+// CREATE TABLE __quesma_table_name (clientip IPv4) ENGINE=Log
+// INSERT INTO __quesma_table_name VALUES ('0.0.0.0'), ('5.5.5.5'), ('90.180.90.180'), ('128.200.0.8'), ('192.168.1.67'), ('222.168.22.67')
+
+// TODO: make this part of the QueryType interface and implement it for all aggregations.
+// TODO: add bad requests to tests.
+// Doing so will ensure we see 100% of what we're interested in in our logs (now we see ~95%).
+func CheckParamsIpPrefix(ctx context.Context, paramsRaw any) error {
+	requiredParams := map[string]string{
+		"field":         "string",
+		"prefix_length": "float64", // TODO: should be int, will be fixed
+	}
+	optionalParams := map[string]string{
+		"is_ipv6":              "bool",
+		"append_prefix_length": "bool",
+		"keyed":                "bool",
+		"min_doc_count":        "int",
+	}
+	logIfYouSeeThemParams := []string{"min_doc_count"} // we don't use min_doc_count yet; we also log if "is_ipv6" == true
+
+	params, ok := paramsRaw.(model.JsonMap)
+	if !ok {
+		return fmt.Errorf("params is not a map, but %+v", paramsRaw)
+	}
+
+	// check that all required parameters are present and correctly typed
+	for paramName, paramType := range requiredParams {
+		paramVal, exists := params[paramName]
+		if !exists {
+			return fmt.Errorf("required parameter %s not found in params", paramName)
+		}
+		if reflect.TypeOf(paramVal).Name() != paramType { // TODO: small rewrite to not use reflect here
+			return fmt.Errorf("required parameter %s is not of type %s, but %T", paramName, paramType, paramVal)
+		}
+	}
+
+	// check that only required/optional parameters are present
+	for paramName := range params {
+		if _, isRequired := requiredParams[paramName]; !isRequired {
+			wantedType, isOptional := optionalParams[paramName]
+			if !isOptional {
+				return fmt.Errorf("unexpected parameter %s found in IP Prefix params %v", paramName, params)
+			}
+			if reflect.TypeOf(params[paramName]).Name() != wantedType { // TODO: small rewrite to not use reflect here
+				return fmt.Errorf("optional parameter %s is not of type %s, but %T", paramName, wantedType, params[paramName])
+			}
+		}
+	}
+
+	// warn (throttled) about parameters we accept but don't handle yet
+	for _, warnParam := range logIfYouSeeThemParams {
+		if _, exists := params[warnParam]; exists {
+			logger.WarnWithCtxAndThrottling(ctx, "ip_prefix", warnParam, "we didn't expect %s in IP Prefix params %v", warnParam, params)
+		}
+	}
+	if isIpv6, exists := params["is_ipv6"]; exists && isIpv6.(bool) {
+		logger.WarnWithCtxAndThrottling(ctx, "ip_prefix", "is_ipv6", "is_ipv6 is true in IP Prefix params %v, we don't support IPv6 yet", params)
+	}
+
+	return nil
+}
+
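A note on the float64 type required for prefix_length: the aggregation body arrives as JSON decoded into generic maps, and Go's encoding/json turns every JSON number into a float64 when the decode target is any, so an integer prefix_length shows up here as float64. A standalone sketch (the field name and values are illustrative, not taken from this commit):

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// Hypothetical ip_prefix params as they might appear in a request body.
	body := []byte(`{"field": "clientip", "prefix_length": 24, "append_prefix_length": true}`)

	var params map[string]any
	if err := json.Unmarshal(body, &params); err != nil {
		panic(err)
	}

	// JSON numbers decode to float64 when the target is `any`, which is why
	// CheckParamsIpPrefix lists "prefix_length" as "float64".
	fmt.Printf("%T %T %T\n", params["field"], params["prefix_length"], params["append_prefix_length"])
	// Output: string float64 bool
}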
+type IpPrefix struct {
+	ctx                context.Context
+	field              model.Expr
+	prefixLength       int
+	isIpv6             bool
+	appendPrefixLength bool
+	keyed              bool
+	minDocCount        int
+}
+
+func NewIpPrefix(ctx context.Context, field model.Expr, prefixLength int, isIpv6 bool, appendPrefixLength bool, keyed bool, minDocCount int) *IpPrefix {
+	return &IpPrefix{
+		ctx:                ctx,
+		field:              field,
+		prefixLength:       prefixLength,
+		isIpv6:             isIpv6,
+		appendPrefixLength: appendPrefixLength,
+		keyed:              keyed,
+		minDocCount:        minDocCount,
+	}
+}
+
+func (query *IpPrefix) AggregationType() model.AggregationType {
+	return model.BucketAggregation
+}
+
+func (query *IpPrefix) TranslateSqlResponseToJson(rows []model.QueryResultRow) model.JsonMap {
+	var netmask, keySuffix string
+	if !query.isIpv6 {
+		netmask = query.calcNetMask()
+	}
+	if query.appendPrefixLength {
+		keySuffix = fmt.Sprintf("/%d", query.prefixLength)
+	}
+	buckets := make([]model.JsonMap, 0, len(rows))
+	for _, row := range rows {
+		var docCount any
+		var originalKey uint32
+		if query.prefixLength == 0 {
+			if len(row.Cols) != 1 {
+				logger.ErrorWithCtx(query.ctx).Msgf(
+					"unexpected number of columns in ip_prefix aggregation response, len: %d, row: %v", len(row.Cols), row)
+				continue
+			}
+			docCount = row.Cols[0].Value
+		} else {
+			if len(row.Cols) != 2 {
+				logger.ErrorWithCtx(query.ctx).Msgf(
+					"unexpected number of columns in ip_prefix aggregation response, len: %d, row: %v", len(row.Cols), row)
+				continue
+			}
+			var ok bool
+			originalKey, ok = row.Cols[0].Value.(uint32)
+			if !ok {
+				logger.ErrorWithCtx(query.ctx).Msgf("unexpected type of key in ip_prefix aggregation response, got %T", row.Cols[0])
+				continue
+			}
+			docCount = row.Cols[1].Value
+		}
+
+		bucket := model.JsonMap{
+			"key":           query.calcKey(originalKey) + keySuffix,
+			"doc_count":     docCount,
+			"prefix_length": query.prefixLength,
+			"is_ipv6":       query.isIpv6,
+		}
+		if !query.isIpv6 {
+			bucket["netmask"] = netmask
+		}
+		buckets = append(buckets, bucket)
+	}
+
+	// usual case
+	if !query.keyed {
+		return model.JsonMap{
+			"buckets": buckets,
+		}
+	}
+
+	// unusual case ("keyed": true): return buckets as a map keyed by prefix instead of an array
+	keyedBuckets := make(model.JsonMap, len(buckets))
+	for _, bucket := range buckets {
+		key := bucket["key"].(string)
+		delete(bucket, "key")
+		keyedBuckets[key] = bucket
+	}
+	return model.JsonMap{
+		"buckets": keyedBuckets,
+	}
+}
+
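The keyed flag only changes the shape of the response: by default "buckets" is an array, while keyed: true turns it into a map from the prefix key to the bucket, with the "key" field removed from each bucket. A small standalone sketch of the two shapes, not part of the commit (bucket values are illustrative, roughly matching the 192.168.1.67 test row with an assumed prefix_length of 16):

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// One bucket, shaped like what TranslateSqlResponseToJson builds (illustrative values).
	bucket := map[string]any{
		"key":           "192.168.0.0",
		"doc_count":     1,
		"prefix_length": 16,
		"is_ipv6":       false,
		"netmask":       "255.255.0.0",
	}

	// keyed == false (default): "buckets" is an array of buckets.
	unkeyed, _ := json.Marshal(map[string]any{"buckets": []any{bucket}})
	fmt.Println(string(unkeyed))

	// keyed == true: "buckets" becomes a map keyed by the prefix, and "key" is dropped.
	prefix := bucket["key"].(string)
	delete(bucket, "key")
	keyed, _ := json.Marshal(map[string]any{"buckets": map[string]any{prefix: bucket}})
	fmt.Println(string(keyed))
}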
+func (query *IpPrefix) String() string {
+	return "ip_prefix"
+}
+
+// SqlSelectQuery returns the SQL query: intDiv(ip_field, some_power_of_2)
+// ipv4 only for now
+func (query *IpPrefix) SqlSelectQuery() model.Expr {
+	if query.prefixLength == 0 {
+		return nil
+	}
+	return model.NewFunction("intDiv", query.field, model.NewLiteral(query.divideByToGroupBy()))
+}
+
+func (query *IpPrefix) divideByToGroupBy() uint32 {
+	return 1 << (32 - query.prefixLength)
+}
+
+func (query *IpPrefix) calcKey(originalKey uint32) string {
+	if query.prefixLength == 0 {
+		return "0.0.0.0"
+	}
+	ipAsInt := originalKey * query.divideByToGroupBy()
+	part4 := ipAsInt % 256
+	ipAsInt /= 256
+	part3 := ipAsInt % 256
+	ipAsInt /= 256
+	part2 := ipAsInt % 256
+	ipAsInt /= 256
+	part1 := ipAsInt % 256
+	return fmt.Sprintf("%d.%d.%d.%d", part1, part2, part3, part4)
+}
+
+func (query *IpPrefix) calcNetMask() string {
+	if query.prefixLength == 0 {
+		return "0.0.0.0"
+	}
+	biggestPossibleKey := uint32(1<<query.prefixLength - 1)
+	return query.calcKey(biggestPossibleKey) // netmask is the same as ip of biggest possible key
+}
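To make the key and netmask arithmetic concrete: for prefix_length = 16, divideByToGroupBy returns 2^(32-16) = 65536, so SqlSelectQuery produces intDiv(clientip, 65536); for the 192.168.1.67 row from the test INSERT above, that query returns 49320, which calcKey maps back to "192.168.0.0", and calcNetMask yields "255.255.0.0". A standalone sketch of that derivation, not part of the commit:

package main

import "fmt"

func main() {
	const prefixLength = 16
	divisor := uint32(1) << (32 - prefixLength) // 65536 -> SQL: intDiv(clientip, 65536)

	ip := uint32(192)<<24 | uint32(168)<<16 | uint32(1)<<8 | uint32(67) // 192.168.1.67
	key := ip / divisor                                                 // 49320, the value the query returns for this row

	toDotted := func(v uint32) string {
		return fmt.Sprintf("%d.%d.%d.%d", v>>24&255, v>>16&255, v>>8&255, v&255)
	}
	fmt.Println(toDotted(key * divisor))                       // like calcKey:     192.168.0.0
	fmt.Println(toDotted(uint32(1<<prefixLength-1) * divisor)) // like calcNetMask: 255.255.0.0
}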

quesma/queryparser/aggregation_parser.go

Lines changed: 10 additions & 0 deletions

@@ -302,6 +302,16 @@ func (cw *ClickhouseQueryTranslator) parseArrayField(queryMap QueryMap, fieldNam
	return nil, fmt.Errorf("array field '%s' not found in aggregation queryMap: %v", fieldName, queryMap)
}

+func (cw *ClickhouseQueryTranslator) parseBoolField(queryMap QueryMap, fieldName string, defaultValue bool) bool {
+	if valueRaw, exists := queryMap[fieldName]; exists {
+		if asBool, ok := valueRaw.(bool); ok {
+			return asBool
+		}
+		logger.WarnWithCtx(cw.Ctx).Msgf("%s is not a bool, but %T, value: %v. Using default: %v", fieldName, valueRaw, valueRaw, defaultValue)
+	}
+	return defaultValue
+}
+
// parseFieldFieldMaybeScript is basically almost a copy of parseFieldField above, but it also handles a basic script, if "field" is missing.
func (cw *ClickhouseQueryTranslator) parseFieldFieldMaybeScript(shouldBeMap any, aggregationType string) (field model.Expr, isFromScript bool) {
	Map, ok := shouldBeMap.(QueryMap)

quesma/queryparser/pancake_aggregation_parser_buckets.go

Lines changed: 30 additions & 0 deletions

@@ -36,6 +36,7 @@ func (cw *ClickhouseQueryTranslator) pancakeTryBucketAggregation(aggregation *pa
		}},
		{"multi_terms", cw.parseMultiTerms},
		{"composite", cw.parseComposite},
+		{"ip_prefix", cw.parseIpPrefix},
	}

	for _, aggr := range aggregationHandlers {

@@ -359,6 +360,35 @@ func (cw *ClickhouseQueryTranslator) parseComposite(aggregation *pancakeAggregat
	return nil
}

+func (cw *ClickhouseQueryTranslator) parseIpPrefix(aggregation *pancakeAggregationTreeNode, params QueryMap) error {
+	const (
+		defaultIsIpv6             = false
+		defaultAppendPrefixLength = false
+		defaultKeyed              = false
+		defaultMinDocCount        = 1
+	)
+
+	if err := bucket_aggregations.CheckParamsIpPrefix(cw.Ctx, params); err != nil {
+		return err
+	}
+
+	aggr := bucket_aggregations.NewIpPrefix(
+		cw.Ctx,
+		cw.parseFieldField(params, "ip_prefix"),
+		cw.parseIntField(params, "prefix_length", 0), // default doesn't matter, it's required
+		cw.parseBoolField(params, "is_ipv6", defaultIsIpv6),
+		cw.parseBoolField(params, "append_prefix_length", defaultAppendPrefixLength),
+		cw.parseBoolField(params, "keyed", defaultKeyed),
+		cw.parseIntField(params, "min_doc_count", defaultMinDocCount),
+	)
+	if sql := aggr.SqlSelectQuery(); sql != nil {
+		aggregation.selectedColumns = append(aggregation.selectedColumns, sql)
+		aggregation.orderBy = append(aggregation.orderBy, model.NewOrderByExprWithoutOrder(sql))
+	}
+	aggregation.queryType = aggr
+	return nil
+}
+
func (cw *ClickhouseQueryTranslator) parseOrder(params QueryMap, fieldExpressions []model.Expr) ([]model.OrderByExpr, error) {
	defaultDirection := model.DescOrder
	defaultOrderBy := model.NewOrderByExpr(model.NewCountFunc(), defaultDirection)
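For reference, a minimal ip_prefix aggregation fragment that this new handler accepts could look like the sketch below; the field name and values are hypothetical, not taken from this commit, and the inner object is what reaches parseIpPrefix as params. parseIpPrefix validates it via CheckParamsIpPrefix, builds the IpPrefix query type (using the defaults above for anything omitted), and, since prefix_length is non-zero, appends the intDiv expression (here intDiv(clientip, 256), because 2^(32-24) = 256) to the node's selected columns and order-by expressions.

// Hypothetical aggregation fragment handled by parseIpPrefix (illustrative values):
const exampleIpPrefixAggregation = `{
	"ip_prefix": {
		"field": "clientip",
		"prefix_length": 24,
		"append_prefix_length": true
	}
}`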
