From 8278bc5b5ee76cd22995d45e4fde2ed2ab1fec50 Mon Sep 17 00:00:00 2001 From: Zhaoshuai Date: Wed, 21 May 2025 12:08:54 +0800 Subject: [PATCH 1/3] extend variation --- .../tvf/VariationTableFunction.java | 146 ++++++++++++++++-- 1 file changed, 133 insertions(+), 13 deletions(-) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java index 284f623f3018..d50a6562500e 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java @@ -19,6 +19,7 @@ package org.apache.iotdb.commons.udf.builtin.relational.tvf; +import org.apache.iotdb.udf.api.exception.UDFArgumentNotValidException; import org.apache.iotdb.udf.api.exception.UDFException; import org.apache.iotdb.udf.api.relational.TableFunction; import org.apache.iotdb.udf.api.relational.access.Record; @@ -50,6 +51,8 @@ public class VariationTableFunction implements TableFunction { private static final String DATA_PARAMETER_NAME = "DATA"; private static final String COL_PARAMETER_NAME = "COL"; private static final String DELTA_PARAMETER_NAME = "DELTA"; + private static final String IGNORE_NULL_PARAMETER_NAME = "IGNORE_NULL"; + private static Type type = null; @Override public List getArgumentsSpecifications() { @@ -62,11 +65,18 @@ public List getArgumentsSpecifications() { ScalarParameterSpecification.builder() .name(DELTA_PARAMETER_NAME) .type(Type.DOUBLE) + .defaultValue(0.0) + .build(), + ScalarParameterSpecification.builder() + .name(IGNORE_NULL_PARAMETER_NAME) + .type(Type.BOOLEAN) + .defaultValue(false) .build()); } @Override public TableFunctionAnalysis analyze(Map arguments) throws UDFException { + double delta = (double) ((ScalarArgument) arguments.get(DELTA_PARAMETER_NAME)).getValue(); TableArgument tableArgument = (TableArgument) arguments.get(DATA_PARAMETER_NAME); String expectedFieldName = (String) ((ScalarArgument) arguments.get(COL_PARAMETER_NAME)).getValue(); @@ -74,7 +84,25 @@ public TableFunctionAnalysis analyze(Map arguments) throws UDF findColumnIndex( tableArgument, expectedFieldName, - ImmutableSet.of(Type.INT32, Type.INT64, Type.FLOAT, Type.DOUBLE)); + ImmutableSet.of( + Type.INT32, + Type.INT64, + Type.FLOAT, + Type.DOUBLE, + Type.BOOLEAN, + Type.STRING, + Type.TEXT, + Type.BLOB, + Type.TIMESTAMP)); + type = tableArgument.getFieldTypes().get(requiredIndex); + if ((type.equals(Type.BOOLEAN) + || type.equals(Type.STRING) + || type.equals(Type.TEXT) + || type.equals(Type.BLOB)) + && delta != 0.0) { + throw new UDFArgumentNotValidException( + "delta argument for function variation() must be 0.0 if the data type of the column is boolean, string, text or blob"); + } DescribedSchema properColumnSchema = new DescribedSchema.Builder().addField("window_index", Type.INT64).build(); // outputColumnSchema @@ -83,6 +111,9 @@ public TableFunctionAnalysis analyze(Map arguments) throws UDF .addProperty( DELTA_PARAMETER_NAME, ((ScalarArgument) arguments.get(DELTA_PARAMETER_NAME)).getValue()) + .addProperty( + IGNORE_NULL_PARAMETER_NAME, + ((ScalarArgument) arguments.get(IGNORE_NULL_PARAMETER_NAME)).getValue()) .build(); return TableFunctionAnalysis.builder() .properColumnSchema(properColumnSchema) @@ -102,10 +133,21 @@ public TableFunctionProcessorProvider getProcessorProvider( TableFunctionHandle tableFunctionHandle) { double delta = (double) ((MapTableFunctionHandle) tableFunctionHandle).getProperty(DELTA_PARAMETER_NAME); + boolean ignoreNull = + (boolean) + ((MapTableFunctionHandle) tableFunctionHandle).getProperty(IGNORE_NULL_PARAMETER_NAME); return new TableFunctionProcessorProvider() { @Override public TableFunctionDataProcessor getDataProcessor() { - return new VariationDataProcessor(delta); + if (type.equals(Type.INT32) + || type.equals(Type.INT64) + || type.equals(Type.FLOAT) + || type.equals(Type.DOUBLE) + || type.equals(Type.TIMESTAMP)) { + return new VariationDataProcessor(delta, ignoreNull); + } else { + return new NonNumericProcessor(ignoreNull); + } } }; } @@ -113,13 +155,77 @@ public TableFunctionDataProcessor getDataProcessor() { private static class VariationDataProcessor implements TableFunctionDataProcessor { private final double gap; + private final boolean ignoreNull; private long currentStartIndex = -1; private double baseValue = 0; private long curIndex = 0; private long windowIndex = 0; + private final double minValue = Double.MIN_VALUE; - public VariationDataProcessor(double delta) { + public VariationDataProcessor(double delta, boolean ignoreNull) { this.gap = delta; + this.ignoreNull = ignoreNull; + } + + @Override + public void process( + Record input, + List properColumnBuilders, + ColumnBuilder passThroughIndexBuilder) { + if (input.isNull(0)) { + if (baseValue != minValue) { + outputWindow(properColumnBuilders, passThroughIndexBuilder); + currentStartIndex = curIndex; + baseValue = Double.MIN_VALUE; + } + } else { + double value = input.getDouble(0); + if (currentStartIndex == -1) { + // init the first window + currentStartIndex = curIndex; + baseValue = value; + } else if (baseValue == minValue) { + if (!ignoreNull) { + outputWindow(properColumnBuilders, passThroughIndexBuilder); + } + currentStartIndex = curIndex; + baseValue = value; + } else if (Math.abs(value - baseValue) > gap) { + outputWindow(properColumnBuilders, passThroughIndexBuilder); + currentStartIndex = curIndex; + // use the first value in the window as the base value + baseValue = value; + } + } + curIndex++; + } + + @Override + public void finish( + List properColumnBuilders, ColumnBuilder passThroughIndexBuilder) { + outputWindow(properColumnBuilders, passThroughIndexBuilder); + } + + private void outputWindow( + List properColumnBuilders, ColumnBuilder passThroughIndexBuilder) { + for (long i = currentStartIndex; i < curIndex; i++) { + properColumnBuilders.get(0).writeLong(windowIndex); + passThroughIndexBuilder.writeLong(i); + } + windowIndex++; + } + } + + private static class NonNumericProcessor implements TableFunctionDataProcessor { + + private final boolean ignoreNull; + private long currentStartIndex = -1; + private Object baseValue = null; + private long curIndex = 0; + private long windowIndex = 0; + + public NonNumericProcessor(boolean ignoreNull) { + this.ignoreNull = ignoreNull; } @Override @@ -127,16 +233,30 @@ public void process( Record input, List properColumnBuilders, ColumnBuilder passThroughIndexBuilder) { - double value = input.getDouble(0); - if (currentStartIndex == -1) { - // init the first window - currentStartIndex = curIndex; - baseValue = value; - } else if (Math.abs(value - baseValue) > gap) { - outputWindow(properColumnBuilders, passThroughIndexBuilder); - currentStartIndex = curIndex; - // use the first value in the window as the base value - baseValue = value; + if (input.isNull(0)) { + if (baseValue != null) { + outputWindow(properColumnBuilders, passThroughIndexBuilder); + currentStartIndex = curIndex; + baseValue = null; + } + } else { + Object value = input.getObject(0); + if (currentStartIndex == -1) { + // init the first window + currentStartIndex = curIndex; + baseValue = value; + } else if (baseValue == null) { + if (!ignoreNull) { + outputWindow(properColumnBuilders, passThroughIndexBuilder); + } + currentStartIndex = curIndex; + baseValue = value; + } else if (!baseValue.equals(value)) { + outputWindow(properColumnBuilders, passThroughIndexBuilder); + currentStartIndex = curIndex; + // use the first value in the window as the base value + baseValue = value; + } } curIndex++; } From 1f1c1ed831df00af2e98e3104c9cb13f950af3c4 Mon Sep 17 00:00:00 2001 From: Zhaoshuai Date: Wed, 21 May 2025 12:10:18 +0800 Subject: [PATCH 2/3] extend variation --- .../udf/builtin/relational/tvf/VariationTableFunction.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java index d50a6562500e..ea08490f24f2 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java @@ -160,9 +160,8 @@ private static class VariationDataProcessor implements TableFunctionDataProcesso private double baseValue = 0; private long curIndex = 0; private long windowIndex = 0; - private final double minValue = Double.MIN_VALUE; - public VariationDataProcessor(double delta, boolean ignoreNull) { + public VariationDataProcessor(double delta, boolean ignoreNull) { this.gap = delta; this.ignoreNull = ignoreNull; } @@ -172,7 +171,8 @@ public void process( Record input, List properColumnBuilders, ColumnBuilder passThroughIndexBuilder) { - if (input.isNull(0)) { + double minValue = Double.MIN_VALUE; + if (input.isNull(0)) { if (baseValue != minValue) { outputWindow(properColumnBuilders, passThroughIndexBuilder); currentStartIndex = curIndex; From f3e570a857771f1889e30f6980f4b874268f83de Mon Sep 17 00:00:00 2001 From: Zhaoshuai Date: Wed, 21 May 2025 15:44:48 +0800 Subject: [PATCH 3/3] spotless:apply --- .../udf/builtin/relational/tvf/VariationTableFunction.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java index ea08490f24f2..a3695d8d8c9c 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/udf/builtin/relational/tvf/VariationTableFunction.java @@ -161,7 +161,7 @@ private static class VariationDataProcessor implements TableFunctionDataProcesso private long curIndex = 0; private long windowIndex = 0; - public VariationDataProcessor(double delta, boolean ignoreNull) { + public VariationDataProcessor(double delta, boolean ignoreNull) { this.gap = delta; this.ignoreNull = ignoreNull; } @@ -171,8 +171,8 @@ public void process( Record input, List properColumnBuilders, ColumnBuilder passThroughIndexBuilder) { - double minValue = Double.MIN_VALUE; - if (input.isNull(0)) { + double minValue = Double.MIN_VALUE; + if (input.isNull(0)) { if (baseValue != minValue) { outputWindow(properColumnBuilders, passThroughIndexBuilder); currentStartIndex = curIndex;