Description
Proposal:
Add a function argument called maintainType to the mean aggregate function (defaulted to false to maintain backward compatibility), which, when set, guarantees that the resulting value is of the same type as the input value.
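A sketch of how the proposed argument might look in use. maintainType does not exist today, and the wrapper passed to aggregateWindow is only one hypothetical way it could be surfaced:

// Hypothetical usage of the proposed argument (not valid Flux today).
from(bucket: "data")
    |> range(start: -15m)
    |> aggregateWindow(
        every: 5m,
        // wrap mean so the extra argument can be forwarded through aggregateWindow
        fn: (column, tables=<-) => tables |> mean(column: column, maintainType: true)
    )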
Current behavior:
Unlike the other aggregate functions, the mean aggregate function is hardcoded to return a float. This is often unnecessary (when the input domain is large enough that decimal precision doesn't matter), and in many cases it creates a very hard to overcome obstacle, because it changes the type of the stream whenever it passes through an aggregateWindow.
Desired behavior:
When maintainType is set, an integer column should be downsampled to an integer result column.
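To make the type change concrete, a minimal sketch (the bucket and field layout here are assumptions about my schema, not something taken from the docs):

from(bucket: "data")
    |> range(start: -15m)
    |> aggregateWindow(fn: mean, every: 5m)
    // Today: _value is a float after this call, even when the stored field is an int.
    // With the proposed maintainType: true, an int input column would come back as an
    // int _value, so a later |> to() would not change the field type in the destination bucket.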
Alternatives considered:
As suggested by the InfluxDB documentation itself, I have considered using a types.isType filter to split my stream into multiple streams. However, types.isType is not a pushdown filter, so if I run:
import "types"  // needed for types.isType once the filter below is enabled

from(bucket: "data")
    |> range(start: -15m)
    // |> filter(fn: (r) => types.isType(v: r._value, type: "float")) // <- adding this filter breaks the pushdown
    |> aggregateWindow(fn: mean, every: 5m)
    |> set(key: "ds", value: tag)
the entire aggregate calculation is pulled into Flux and then dumped back into InfluxDB. On the current production server, doing this pushes my query from under 30 seconds to roughly 10 minutes. We have approximately 1300 fields in total.
I have found no remedy to this problem that doesn't break the pushdown optimization, short of tagging my fields with their types, which isn't DRY. Also, as far as I can tell, it isn't clear what happens when a |> to() step fails due to a type mismatch: is all of the subsequent data dropped, or only the data that isn't of the correct type? I do not know whether this is defined behaviour per the documentation.
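For reference, the per-type split I considered (mentioned above) would look roughly like the sketch below; this is not code I run in production, and both filter calls break the pushdown:

import "types"

// Float fields: mean stays float, no type conflict.
floats = from(bucket: "data")
    |> range(start: -15m)
    |> filter(fn: (r) => types.isType(v: r._value, type: "float"))
    |> aggregateWindow(fn: mean, every: 5m)

// Integer fields: mean still comes back as float, which is exactly the
// type change this issue is about.
ints = from(bucket: "data")
    |> range(start: -15m)
    |> filter(fn: (r) => types.isType(v: r._value, type: "int"))
    |> aggregateWindow(fn: mean, every: 5m)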
I am currently using a two-step process where I manually compute mean aggregates using an ephemeral (1 hour RP) bucket:
import "join"
import "types"

a = from(bucket: "data")
    |> range(start: -15m)
    |> aggregateWindow(fn: sum, every: 5m)
    |> set(key: "agg", value: "sum")
    |> set(key: "ds", value: "5m")

b = from(bucket: "data")
    |> range(start: -15m)
    |> aggregateWindow(fn: count, every: 5m)
    |> set(key: "agg", value: "count")
    |> set(key: "ds", value: "5m")

join.time(
    left: a |> drop(columns: ["agg"]),
    right: b |> drop(columns: ["agg"]),
    as: (l, r) => ({l with _value: if types.isType(v: l._value, type: "float") then l._value / float(v: r._value) else l._value / r._value})
)
(note that this method isn't completely DRY because it forces the developer to be aware of which tags exist on each field)
Use case:
This is extremely important for being able to write large, maintainable, DRY code for downsampling and other generic tasks, where the input data is either not known at design time or is too broad to hardcode into a task (since there is no iteration mechanism to run through an arbitrary set of fields).