Skip to content

VariationTableFunction extension #15550

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

package org.apache.iotdb.commons.udf.builtin.relational.tvf;

import org.apache.iotdb.udf.api.exception.UDFArgumentNotValidException;
import org.apache.iotdb.udf.api.exception.UDFException;
import org.apache.iotdb.udf.api.relational.TableFunction;
import org.apache.iotdb.udf.api.relational.access.Record;
Expand Down Expand Up @@ -50,6 +51,8 @@ public class VariationTableFunction implements TableFunction {
private static final String DATA_PARAMETER_NAME = "DATA";
private static final String COL_PARAMETER_NAME = "COL";
private static final String DELTA_PARAMETER_NAME = "DELTA";
private static final String IGNORE_NULL_PARAMETER_NAME = "IGNORE_NULL";
private static Type type = null;

@Override
public List<ParameterSpecification> getArgumentsSpecifications() {
Expand All @@ -62,19 +65,44 @@ public List<ParameterSpecification> getArgumentsSpecifications() {
ScalarParameterSpecification.builder()
.name(DELTA_PARAMETER_NAME)
.type(Type.DOUBLE)
.defaultValue(0.0)
.build(),
ScalarParameterSpecification.builder()
.name(IGNORE_NULL_PARAMETER_NAME)
.type(Type.BOOLEAN)
.defaultValue(false)
.build());
}

@Override
public TableFunctionAnalysis analyze(Map<String, Argument> arguments) throws UDFException {
double delta = (double) ((ScalarArgument) arguments.get(DELTA_PARAMETER_NAME)).getValue();
TableArgument tableArgument = (TableArgument) arguments.get(DATA_PARAMETER_NAME);
String expectedFieldName =
(String) ((ScalarArgument) arguments.get(COL_PARAMETER_NAME)).getValue();
int requiredIndex =
findColumnIndex(
tableArgument,
expectedFieldName,
ImmutableSet.of(Type.INT32, Type.INT64, Type.FLOAT, Type.DOUBLE));
ImmutableSet.of(
Type.INT32,
Type.INT64,
Type.FLOAT,
Type.DOUBLE,
Type.BOOLEAN,
Type.STRING,
Type.TEXT,
Type.BLOB,
Type.TIMESTAMP));
type = tableArgument.getFieldTypes().get(requiredIndex);
if ((type.equals(Type.BOOLEAN)
|| type.equals(Type.STRING)
|| type.equals(Type.TEXT)
|| type.equals(Type.BLOB))
&& delta != 0.0) {
throw new UDFArgumentNotValidException(
"delta argument for function variation() must be 0.0 if the data type of the column is boolean, string, text or blob");
}
DescribedSchema properColumnSchema =
new DescribedSchema.Builder().addField("window_index", Type.INT64).build();
// outputColumnSchema
Expand All @@ -83,6 +111,9 @@ public TableFunctionAnalysis analyze(Map<String, Argument> arguments) throws UDF
.addProperty(
DELTA_PARAMETER_NAME,
((ScalarArgument) arguments.get(DELTA_PARAMETER_NAME)).getValue())
.addProperty(
IGNORE_NULL_PARAMETER_NAME,
((ScalarArgument) arguments.get(IGNORE_NULL_PARAMETER_NAME)).getValue())
.build();
return TableFunctionAnalysis.builder()
.properColumnSchema(properColumnSchema)
Expand All @@ -102,41 +133,130 @@ public TableFunctionProcessorProvider getProcessorProvider(
TableFunctionHandle tableFunctionHandle) {
double delta =
(double) ((MapTableFunctionHandle) tableFunctionHandle).getProperty(DELTA_PARAMETER_NAME);
boolean ignoreNull =
(boolean)
((MapTableFunctionHandle) tableFunctionHandle).getProperty(IGNORE_NULL_PARAMETER_NAME);
return new TableFunctionProcessorProvider() {
@Override
public TableFunctionDataProcessor getDataProcessor() {
return new VariationDataProcessor(delta);
if (type.equals(Type.INT32)
|| type.equals(Type.INT64)
|| type.equals(Type.FLOAT)
|| type.equals(Type.DOUBLE)
|| type.equals(Type.TIMESTAMP)) {
return new VariationDataProcessor(delta, ignoreNull);
} else {
return new NonNumericProcessor(ignoreNull);
}
}
};
}

private static class VariationDataProcessor implements TableFunctionDataProcessor {

private final double gap;
private final boolean ignoreNull;
private long currentStartIndex = -1;
private double baseValue = 0;
private long curIndex = 0;
private long windowIndex = 0;

public VariationDataProcessor(double delta) {
public VariationDataProcessor(double delta, boolean ignoreNull) {
this.gap = delta;
this.ignoreNull = ignoreNull;
}

@Override
public void process(
Record input,
List<ColumnBuilder> properColumnBuilders,
ColumnBuilder passThroughIndexBuilder) {
double minValue = Double.MIN_VALUE;
if (input.isNull(0)) {
if (baseValue != minValue) {
outputWindow(properColumnBuilders, passThroughIndexBuilder);
currentStartIndex = curIndex;
baseValue = Double.MIN_VALUE;
}
} else {
double value = input.getDouble(0);
if (currentStartIndex == -1) {
// init the first window
currentStartIndex = curIndex;
baseValue = value;
} else if (baseValue == minValue) {
if (!ignoreNull) {
outputWindow(properColumnBuilders, passThroughIndexBuilder);
}
currentStartIndex = curIndex;
baseValue = value;
} else if (Math.abs(value - baseValue) > gap) {
outputWindow(properColumnBuilders, passThroughIndexBuilder);
currentStartIndex = curIndex;
// use the first value in the window as the base value
baseValue = value;
}
}
curIndex++;
}

@Override
public void finish(
List<ColumnBuilder> properColumnBuilders, ColumnBuilder passThroughIndexBuilder) {
outputWindow(properColumnBuilders, passThroughIndexBuilder);
}

private void outputWindow(
List<ColumnBuilder> properColumnBuilders, ColumnBuilder passThroughIndexBuilder) {
for (long i = currentStartIndex; i < curIndex; i++) {
properColumnBuilders.get(0).writeLong(windowIndex);
passThroughIndexBuilder.writeLong(i);
}
windowIndex++;
}
}

private static class NonNumericProcessor implements TableFunctionDataProcessor {

private final boolean ignoreNull;
private long currentStartIndex = -1;
private Object baseValue = null;
private long curIndex = 0;
private long windowIndex = 0;

public NonNumericProcessor(boolean ignoreNull) {
this.ignoreNull = ignoreNull;
}

@Override
public void process(
Record input,
List<ColumnBuilder> properColumnBuilders,
ColumnBuilder passThroughIndexBuilder) {
double value = input.getDouble(0);
if (currentStartIndex == -1) {
// init the first window
currentStartIndex = curIndex;
baseValue = value;
} else if (Math.abs(value - baseValue) > gap) {
outputWindow(properColumnBuilders, passThroughIndexBuilder);
currentStartIndex = curIndex;
// use the first value in the window as the base value
baseValue = value;
if (input.isNull(0)) {
if (baseValue != null) {
outputWindow(properColumnBuilders, passThroughIndexBuilder);
currentStartIndex = curIndex;
baseValue = null;
}
} else {
Object value = input.getObject(0);
if (currentStartIndex == -1) {
// init the first window
currentStartIndex = curIndex;
baseValue = value;
} else if (baseValue == null) {
if (!ignoreNull) {
outputWindow(properColumnBuilders, passThroughIndexBuilder);
}
currentStartIndex = curIndex;
baseValue = value;
} else if (!baseValue.equals(value)) {
outputWindow(properColumnBuilders, passThroughIndexBuilder);
currentStartIndex = curIndex;
// use the first value in the window as the base value
baseValue = value;
}
}
curIndex++;
}
Expand Down
Loading