|
| 1 | +/* |
| 2 | + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one |
| 3 | + * or more contributor license agreements. Licensed under the Elastic License |
| 4 | + * 2.0; you may not use this file except in compliance with the Elastic License |
| 5 | + * 2.0. |
| 6 | + */ |
| 7 | + |
| 8 | +package org.elasticsearch.xpack.esql.analysis; |
| 9 | + |
| 10 | +import org.elasticsearch.xpack.esql.core.expression.Alias; |
| 11 | +import org.elasticsearch.xpack.esql.core.expression.Expression; |
| 12 | +import org.elasticsearch.xpack.esql.core.expression.Literal; |
| 13 | +import org.elasticsearch.xpack.esql.core.expression.NamedExpression; |
| 14 | +import org.elasticsearch.xpack.esql.core.expression.TypedAttribute; |
| 15 | +import org.elasticsearch.xpack.esql.core.type.DataType; |
| 16 | +import org.elasticsearch.xpack.esql.core.util.Holder; |
| 17 | +import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction; |
| 18 | +import org.elasticsearch.xpack.esql.expression.function.aggregate.FilteredExpression; |
| 19 | +import org.elasticsearch.xpack.esql.expression.function.aggregate.HistogramMergeOverTime; |
| 20 | +import org.elasticsearch.xpack.esql.expression.function.aggregate.LastOverTime; |
| 21 | +import org.elasticsearch.xpack.esql.expression.function.aggregate.TimeSeriesAggregateFunction; |
| 22 | +import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; |
| 23 | +import org.elasticsearch.xpack.esql.plan.logical.TimeSeriesAggregate; |
| 24 | +import org.elasticsearch.xpack.esql.rule.Rule; |
| 25 | + |
| 26 | +import java.util.List; |
| 27 | + |
| 28 | +/** |
| 29 | + * Ensures that {@link TypedAttribute}s used inside a {@link TimeSeriesAggregate} are wrapped in a |
| 30 | + * {@link TimeSeriesAggregateFunction}. |
| 31 | + * Examples: |
| 32 | + * <pre> |
| 33 | + * foo + bar -> |
| 34 | + * LAST_OVER_TIME(foo) + LAST_OVER_TIME(bar) |
| 35 | + * |
| 36 | + * SUM(foo + LAST_OVER_TIME(bar)) -> |
| 37 | + * SUM(LAST_OVER_TIME(foo) + LAST_OVER_TIME(bar)) |
| 38 | + * |
| 39 | + * foo / 2 + bar * 2 -> |
| 40 | + * LAST_OVER_TIME(foo) / 2 + LAST_OVER_TIME(bar) * 2 |
| 41 | + * </pre> |
| 42 | + */ |
| 43 | +public class InsertDefaultInnerTimeSeriesAggregate extends Rule<LogicalPlan, LogicalPlan> { |
| 44 | + @Override |
| 45 | + public LogicalPlan apply(LogicalPlan logicalPlan) { |
| 46 | + return logicalPlan.transformUp(node -> node instanceof TimeSeriesAggregate, this::rule); |
| 47 | + } |
| 48 | + |
| 49 | + public LogicalPlan rule(TimeSeriesAggregate aggregate) { |
| 50 | + Holder<Boolean> changed = new Holder<>(false); |
| 51 | + List<NamedExpression> newAggregates = aggregate.aggregates().stream().map(agg -> { |
| 52 | + if (agg instanceof Alias alias) { |
| 53 | + return alias.replaceChild(addDefaultInnerAggs(alias.child(), aggregate.timestamp(), changed)); |
| 54 | + } else { |
| 55 | + return agg; |
| 56 | + } |
| 57 | + }).toList(); |
| 58 | + if (changed.get() == false) { |
| 59 | + return aggregate; |
| 60 | + } |
| 61 | + return aggregate.with(aggregate.groupings(), newAggregates); |
| 62 | + } |
| 63 | + |
| 64 | + private static Expression addDefaultInnerAggs(Expression expression, Expression timestamp, Holder<Boolean> changed) { |
| 65 | + return expression.transformDownSkipBranch((expr, skipBranch) -> { |
| 66 | + // the default is to end the traversal here as we're either done or a recursive call will handle it |
| 67 | + skipBranch.set(true); |
| 68 | + return switch (expr) { |
| 69 | + // this is already a time series aggregation, no need to go deeper |
| 70 | + case TimeSeriesAggregateFunction ts -> ts; |
| 71 | + // only transform field, not all children (such as inline filter or window) |
| 72 | + case AggregateFunction af -> af.withField(addDefaultInnerAggs(af.field(), timestamp, changed)); |
| 73 | + // avoid modifying filter conditions, just the delegate |
| 74 | + case FilteredExpression filtered -> filtered.withDelegate(addDefaultInnerAggs(filtered.delegate(), timestamp, changed)); |
| 75 | + // if we reach a TypedAttribute, it hasn't been wrapped in a TimeSeriesAggregateFunction yet |
| 76 | + // (otherwise the traversal would have stopped earlier) |
| 77 | + // so we wrap it with a default one |
| 78 | + case TypedAttribute ta -> insertDefaultInnerAggregation(ta, timestamp, changed); |
| 79 | + default -> { |
| 80 | + // for other expressions, continue the traversal |
| 81 | + skipBranch.set(false); |
| 82 | + yield expr; |
| 83 | + } |
| 84 | + }; |
| 85 | + }); |
| 86 | + } |
| 87 | + |
| 88 | + private static TimeSeriesAggregateFunction insertDefaultInnerAggregation( |
| 89 | + TypedAttribute attr, |
| 90 | + Expression timestamp, |
| 91 | + Holder<Boolean> changed |
| 92 | + ) { |
| 93 | + changed.set(true); |
| 94 | + if (attr.dataType() == DataType.EXPONENTIAL_HISTOGRAM || attr.dataType() == DataType.TDIGEST) { |
| 95 | + return new HistogramMergeOverTime(attr.source(), attr, Literal.TRUE, AggregateFunction.NO_WINDOW); |
| 96 | + } else { |
| 97 | + return new LastOverTime(attr.source(), attr, AggregateFunction.NO_WINDOW, timestamp); |
| 98 | + } |
| 99 | + } |
| 100 | +} |
0 commit comments