Skip to content

Commit 46a8b24

Browse files
committed
[core] Introduce transform predicate for complex predicate
1 parent 86e2e2f commit 46a8b24

18 files changed

Lines changed: 713 additions & 0 deletions

paimon-common/src/main/java/org/apache/paimon/data/BinaryString.java

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1012,4 +1012,89 @@ public static int decodeUTF8Strict(MemorySegment segment, int sp, int len, char[
10121012
public static String defaultDecodeUTF8(byte[] bytes, int offset, int len) {
10131013
return new String(bytes, offset, len, StandardCharsets.UTF_8);
10141014
}
1015+
1016+
/**
1017+
* Concatenates input strings together into a single string. Returns NULL if any argument is
1018+
* NULL.
1019+
*/
1020+
public static BinaryString concat(Iterable<BinaryString> inputs) {
1021+
// Compute the total length of the result.
1022+
int totalLength = 0;
1023+
for (BinaryString input : inputs) {
1024+
if (input == null) {
1025+
return null;
1026+
}
1027+
1028+
totalLength += input.getSizeInBytes();
1029+
}
1030+
1031+
// Allocate a new byte array, and copy the inputs one by one into it.
1032+
final byte[] result = new byte[totalLength];
1033+
int offset = 0;
1034+
for (BinaryString input : inputs) {
1035+
if (input != null) {
1036+
int len = input.getSizeInBytes();
1037+
MemorySegmentUtils.copyToBytes(
1038+
input.getSegments(), input.getOffset(), result, offset, len);
1039+
offset += len;
1040+
}
1041+
}
1042+
return fromBytes(result);
1043+
}
1044+
1045+
/**
1046+
* Concatenates input strings together into a single string using the separator. Returns NULL If
1047+
* the separator is NULL.
1048+
*
1049+
* <p>Note: CONCAT_WS() does not skip any empty strings, however it does skip any NULL values
1050+
* after the separator. For example, concat_ws(",", "a", null, "c") would yield "a,c".
1051+
*/
1052+
public static BinaryString concatWs(BinaryString separator, Iterable<BinaryString> inputs) {
1053+
if (null == separator) {
1054+
return null;
1055+
}
1056+
1057+
int numInputBytes = 0; // total number of bytes from the inputs
1058+
int numInputs = 0; // number of non-null inputs
1059+
for (BinaryString input : inputs) {
1060+
if (input != null) {
1061+
numInputBytes += input.getSizeInBytes();
1062+
numInputs++;
1063+
}
1064+
}
1065+
1066+
if (numInputs == 0) {
1067+
// Return an empty string if there is no input, or all the inputs are null.
1068+
return EMPTY_UTF8;
1069+
}
1070+
1071+
// Allocate a new byte array, and copy the inputs one by one into it.
1072+
// The size of the new array is the size of all inputs, plus the separators.
1073+
final byte[] result =
1074+
new byte[numInputBytes + (numInputs - 1) * separator.getSizeInBytes()];
1075+
int offset = 0;
1076+
1077+
int j = 0;
1078+
for (BinaryString input : inputs) {
1079+
if (input != null) {
1080+
int len = input.getSizeInBytes();
1081+
MemorySegmentUtils.copyToBytes(
1082+
input.getSegments(), input.getOffset(), result, offset, len);
1083+
offset += len;
1084+
1085+
j++;
1086+
// Add separator if this is not the last input.
1087+
if (j < numInputs) {
1088+
MemorySegmentUtils.copyToBytes(
1089+
separator.getSegments(),
1090+
separator.getOffset(),
1091+
result,
1092+
offset,
1093+
separator.getSizeInBytes());
1094+
offset += separator.getSizeInBytes();
1095+
}
1096+
}
1097+
}
1098+
return fromBytes(result);
1099+
}
10151100
}

paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexPredicate.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.paimon.predicate.PredicateVisitor;
3232
import org.apache.paimon.predicate.SortValue;
3333
import org.apache.paimon.predicate.TopN;
34+
import org.apache.paimon.predicate.TransformPredicate;
3435
import org.apache.paimon.types.RowType;
3536

3637
import org.slf4j.Logger;
@@ -131,6 +132,11 @@ public Set<String> visit(CompoundPredicate predicate) {
131132
}
132133
return names;
133134
}
135+
136+
@Override
137+
public Set<String> visit(TransformPredicate predicate) {
138+
throw new UnsupportedOperationException();
139+
}
134140
});
135141
}
136142

@@ -197,5 +203,10 @@ public FileIndexResult visit(CompoundPredicate predicate) {
197203
return compoundResult == null ? REMAIN : compoundResult;
198204
}
199205
}
206+
207+
@Override
208+
public FileIndexResult visit(TransformPredicate predicate) {
209+
throw new UnsupportedOperationException();
210+
}
200211
}
201212
}

paimon-common/src/main/java/org/apache/paimon/fileindex/FileIndexReader.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.paimon.predicate.FieldRef;
2222
import org.apache.paimon.predicate.FunctionVisitor;
2323
import org.apache.paimon.predicate.TopN;
24+
import org.apache.paimon.predicate.TransformPredicate;
2425

2526
import java.util.List;
2627

@@ -124,4 +125,9 @@ public FileIndexResult visitOr(List<FileIndexResult> children) {
124125
public FileIndexResult visitTopN(TopN topN, FileIndexResult result) {
125126
return REMAIN;
126127
}
128+
129+
@Override
130+
public FileIndexResult visit(TransformPredicate predicate) {
131+
return REMAIN;
132+
}
127133
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.predicate;
20+
21+
import org.apache.paimon.data.BinaryString;
22+
23+
import java.util.List;
24+
25+
/** Concat {@link Transform}. */
26+
public class ConcatTransform extends StringTransform {
27+
28+
private static final long serialVersionUID = 1L;
29+
30+
public ConcatTransform(List<Object> inputs) {
31+
super(inputs);
32+
}
33+
34+
@Override
35+
public BinaryString transform(List<BinaryString> inputs) {
36+
return BinaryString.concat(inputs);
37+
}
38+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.paimon.predicate;
20+
21+
import org.apache.paimon.data.BinaryString;
22+
23+
import java.util.List;
24+
25+
import static org.apache.paimon.utils.Preconditions.checkArgument;
26+
27+
/** ConcatWs {@link Transform}. */
28+
public class ConcatWsTransform extends StringTransform {
29+
30+
private static final long serialVersionUID = 1L;
31+
32+
public ConcatWsTransform(List<Object> inputs) {
33+
super(inputs);
34+
checkArgument(inputs.size() >= 2);
35+
}
36+
37+
@Override
38+
public BinaryString transform(List<BinaryString> inputs) {
39+
BinaryString separator = inputs.get(0);
40+
return BinaryString.concatWs(separator, inputs.subList(1, inputs.size()));
41+
}
42+
}

paimon-common/src/main/java/org/apache/paimon/predicate/LeafPredicateExtractor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,4 +41,9 @@ public Map<String, LeafPredicate> visit(CompoundPredicate predicate) {
4141
}
4242
return Collections.emptyMap();
4343
}
44+
45+
@Override
46+
public Map<String, LeafPredicate> visit(TransformPredicate predicate) {
47+
throw new UnsupportedOperationException();
48+
}
4449
}

paimon-common/src/main/java/org/apache/paimon/predicate/OnlyPartitionKeyEqualVisitor.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,4 +121,9 @@ public Boolean visitAnd(List<Boolean> children) {
121121
public Boolean visitOr(List<Boolean> children) {
122122
return false;
123123
}
124+
125+
@Override
126+
public Boolean visit(TransformPredicate predicate) {
127+
return false;
128+
}
124129
}

paimon-common/src/main/java/org/apache/paimon/predicate/PartitionPredicateVisitor.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,4 +45,17 @@ public Boolean visit(CompoundPredicate predicate) {
4545
}
4646
return true;
4747
}
48+
49+
@Override
50+
public Boolean visit(TransformPredicate predicate) {
51+
Transform transform = predicate.transform();
52+
for (Object input : transform.inputs()) {
53+
if (input instanceof FieldRef) {
54+
if (!partitionKeys.contains(((FieldRef) input).name())) {
55+
return false;
56+
}
57+
}
58+
}
59+
return true;
60+
}
4861
}

paimon-common/src/main/java/org/apache/paimon/predicate/PredicateProjectionConverter.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,4 +63,9 @@ public Optional<Predicate> visit(CompoundPredicate predicate) {
6363
}
6464
return Optional.of(new CompoundPredicate(predicate.function(), converted));
6565
}
66+
67+
@Override
68+
public Optional<Predicate> visit(TransformPredicate predicate) {
69+
throw new UnsupportedOperationException("TODO");
70+
}
6671
}

paimon-common/src/main/java/org/apache/paimon/predicate/PredicateVisitor.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,4 +24,6 @@ public interface PredicateVisitor<T> {
2424
/** Visits a {@link LeafPredicate} and returns this visitor's result for it. */
T visit(LeafPredicate predicate);
2525

2626
/** Visits a {@link CompoundPredicate} and returns this visitor's result for it. */
T visit(CompoundPredicate predicate);
27+
28+
/** Visits a {@link TransformPredicate} and returns this visitor's result for it. */
T visit(TransformPredicate predicate);
2729
}

0 commit comments

Comments
 (0)