Skip to content

Commit 0676902

Browse files
committed
[flink]some fix
1 parent 7e3c311 commit 0676902

File tree

9 files changed

+173
-79
lines changed

9 files changed

+173
-79
lines changed

fluss-common/src/main/java/com/alibaba/fluss/predicate/Contains.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package com.alibaba.fluss.predicate;
1919

20-
import com.alibaba.fluss.row.BinaryString;
2120
import com.alibaba.fluss.types.DataType;
2221

2322
import java.util.List;
@@ -36,8 +35,8 @@ private Contains() {}
3635

3736
@Override
3837
public boolean test(DataType type, Object field, Object patternLiteral) {
39-
BinaryString fieldString = (BinaryString) field;
40-
return fieldString.contains((BinaryString) patternLiteral);
38+
String fieldString = field.toString();
39+
return fieldString.contains((String) patternLiteral);
4140
}
4241

4342
@Override

fluss-common/src/main/java/com/alibaba/fluss/predicate/EndsWith.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package com.alibaba.fluss.predicate;
1919

20-
import com.alibaba.fluss.row.BinaryString;
2120
import com.alibaba.fluss.types.DataType;
2221

2322
import java.util.List;
@@ -39,8 +38,8 @@ private EndsWith() {}
3938

4039
@Override
4140
public boolean test(DataType type, Object field, Object patternLiteral) {
42-
BinaryString fieldString = (BinaryString) field;
43-
return fieldString.endsWith((BinaryString) patternLiteral);
41+
String fieldString = field.toString();
42+
return fieldString.endsWith((String) patternLiteral);
4443
}
4544

4645
@Override

fluss-common/src/main/java/com/alibaba/fluss/predicate/StartsWith.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717

1818
package com.alibaba.fluss.predicate;
1919

20-
import com.alibaba.fluss.row.BinaryString;
2120
import com.alibaba.fluss.types.DataType;
2221

2322
import java.util.List;
@@ -39,8 +38,8 @@ private StartsWith() {}
3938

4039
@Override
4140
public boolean test(DataType type, Object field, Object patternLiteral) {
42-
BinaryString fieldString = (BinaryString) field;
43-
return fieldString.startsWith((BinaryString) patternLiteral);
41+
String fieldString = field.toString();
42+
return fieldString.startsWith((String) patternLiteral);
4443
}
4544

4645
@Override
@@ -51,9 +50,9 @@ public boolean test(
5150
Object max,
5251
Long nullCount,
5352
Object patternLiteral) {
54-
BinaryString minStr = (BinaryString) min;
55-
BinaryString maxStr = (BinaryString) max;
56-
BinaryString pattern = (BinaryString) patternLiteral;
53+
String minStr = min.toString();
54+
String maxStr = max.toString();
55+
String pattern = patternLiteral.toString();
5756
return (minStr.startsWith(pattern) || minStr.compareTo(pattern) <= 0)
5857
&& (maxStr.startsWith(pattern) || maxStr.compareTo(pattern) >= 0);
5958
}

fluss-common/src/test/java/com/alibaba/fluss/predicate/PredicateTest.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -237,15 +237,32 @@ public void testNotInNull() {
237237
@Test
238238
public void testEndsWith() {
239239
PredicateBuilder builder = new PredicateBuilder(RowType.of(new StringType()));
240-
Predicate predicate = builder.endsWith(0, fromString("bcc"));
240+
Predicate predicate = builder.endsWith(0, ("bcc"));
241241
GenericRow row = GenericRow.of(fromString("aabbcc"));
242242

243-
GenericRow max = GenericRow.of(fromString("aaba"));
244-
GenericRow min = GenericRow.of(fromString("aabb"));
245-
Integer[] nullCount = {null};
246243
assertThat(predicate.test(row)).isEqualTo(true);
247244
}
248245

246+
@Test
247+
public void testStartWith() {
248+
PredicateBuilder builder = new PredicateBuilder(RowType.of(new StringType()));
249+
Predicate predicate = builder.startsWith(0, ("aab"));
250+
GenericRow row = GenericRow.of(fromString("aabbcc"));
251+
252+
assertThat(predicate.test(row)).isEqualTo(true);
253+
}
254+
255+
@Test
256+
public void testContainsWith() {
257+
PredicateBuilder builder = new PredicateBuilder(RowType.of(new StringType()));
258+
Predicate predicate = builder.contains(0, ("def"));
259+
GenericRow row1 = GenericRow.of(fromString("aabbdefcc"));
260+
GenericRow row2 = GenericRow.of(fromString("aabbdcefcc"));
261+
262+
assertThat(predicate.test(row1)).isEqualTo(true);
263+
assertThat(predicate.test(row2)).isEqualTo(false);
264+
}
265+
249266
@Test
250267
public void testLargeIn() {
251268
PredicateBuilder builder = new PredicateBuilder(RowType.of(new IntType()));

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/row/FlinkAsFlussRow.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,18 @@
1717

1818
package com.alibaba.fluss.flink.row;
1919

20+
import com.alibaba.fluss.flink.utils.FlinkConversions;
2021
import com.alibaba.fluss.row.BinaryString;
2122
import com.alibaba.fluss.row.Decimal;
2223
import com.alibaba.fluss.row.InternalRow;
2324
import com.alibaba.fluss.row.TimestampLtz;
2425
import com.alibaba.fluss.row.TimestampNtz;
2526

2627
import org.apache.flink.table.data.DecimalData;
28+
import org.apache.flink.table.data.GenericRowData;
2729
import org.apache.flink.table.data.RowData;
2830
import org.apache.flink.table.data.TimestampData;
31+
import org.apache.flink.table.types.DataType;
2932

3033
/** Wraps a Flink {@link RowData} as a Fluss {@link InternalRow}. */
3134
public class FlinkAsFlussRow implements InternalRow {
@@ -132,4 +135,12 @@ public byte[] getBinary(int pos, int length) {
132135
public byte[] getBytes(int pos) {
133136
return flinkRow.getBinary(pos);
134137
}
138+
139+
public static Object fromFlinkObject(Object o, DataType type) {
140+
if (o == null) {
141+
return null;
142+
}
143+
return InternalRow.createFieldGetter(FlinkConversions.toFlussType(type), 0)
144+
.getFieldOrNull((new FlinkAsFlussRow()).replace(GenericRowData.of(o)));
145+
}
135146
}

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkSource.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public class FlinkSource<OUT>
6464
private final boolean streaming;
6565
private final FlussDeserializationSchema<OUT> deserializationSchema;
6666

67-
private Predicate predicate;
67+
private Predicate partitionFilters;
6868

6969
public FlinkSource(
7070
Configuration flussConf,
@@ -77,7 +77,7 @@ public FlinkSource(
7777
long scanPartitionDiscoveryIntervalMs,
7878
FlussDeserializationSchema<OUT> deserializationSchema,
7979
boolean streaming,
80-
Predicate predicate) {
80+
Predicate partitionFilters) {
8181
this.flussConf = flussConf;
8282
this.tablePath = tablePath;
8383
this.hasPrimaryKey = hasPrimaryKey;
@@ -88,7 +88,7 @@ public FlinkSource(
8888
this.scanPartitionDiscoveryIntervalMs = scanPartitionDiscoveryIntervalMs;
8989
this.deserializationSchema = deserializationSchema;
9090
this.streaming = streaming;
91-
this.predicate = predicate;
91+
this.partitionFilters = partitionFilters;
9292
}
9393

9494
@Override
@@ -108,7 +108,7 @@ public SplitEnumerator<SourceSplitBase, SourceEnumeratorState> createEnumerator(
108108
offsetsInitializer,
109109
scanPartitionDiscoveryIntervalMs,
110110
streaming,
111-
predicate);
111+
partitionFilters);
112112
}
113113

114114
@Override
@@ -126,7 +126,7 @@ public SplitEnumerator<SourceSplitBase, SourceEnumeratorState> restoreEnumerator
126126
offsetsInitializer,
127127
scanPartitionDiscoveryIntervalMs,
128128
streaming,
129-
predicate);
129+
partitionFilters);
130130
}
131131

132132
@Override

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/FlinkTableSource.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ public class FlinkTableSource
133133

134134
private long limit = -1;
135135

136-
@Nullable protected Predicate predicate;
136+
@Nullable protected Predicate partitionFilters;
137137

138138
public FlinkTableSource(
139139
TablePath tablePath,
@@ -275,7 +275,7 @@ public boolean isBounded() {
275275
scanPartitionDiscoveryIntervalMs,
276276
new RowDataDeserializationSchema(),
277277
streaming,
278-
predicate);
278+
partitionFilters);
279279

280280
if (!streaming) {
281281
// return a bounded source provide to make planner happy,
@@ -369,7 +369,7 @@ public DynamicTableSource copy() {
369369
source.projectedFields = projectedFields;
370370
source.singleRowFilter = singleRowFilter;
371371
source.modificationScanType = modificationScanType;
372-
source.predicate = predicate;
372+
source.partitionFilters = partitionFilters;
373373
return source;
374374
}
375375

@@ -466,7 +466,7 @@ && hasPrimaryKey()
466466
converted.add(p);
467467
}
468468
}
469-
predicate = converted.isEmpty() ? null : PredicateBuilder.and(converted);
469+
partitionFilters = converted.isEmpty() ? null : PredicateBuilder.and(converted);
470470
return Result.of(acceptedFilters, remainingFilters);
471471
}
472472

fluss-flink/fluss-flink-common/src/main/java/com/alibaba/fluss/flink/source/PredicateConverter.java

Lines changed: 19 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,12 @@
1818

1919
package com.alibaba.fluss.flink.source;
2020

21+
import com.alibaba.fluss.flink.row.FlinkAsFlussRow;
2122
import com.alibaba.fluss.flink.utils.FlinkConversions;
2223
import com.alibaba.fluss.predicate.Predicate;
2324
import com.alibaba.fluss.predicate.PredicateBuilder;
24-
import com.alibaba.fluss.row.BinaryString;
2525
import com.alibaba.fluss.utils.TypeUtils;
2626

27-
import org.apache.flink.table.data.conversion.DataStructureConverters;
2827
import org.apache.flink.table.expressions.CallExpression;
2928
import org.apache.flink.table.expressions.Expression;
3029
import org.apache.flink.table.expressions.ExpressionVisitor;
@@ -38,14 +37,12 @@
3837
import org.apache.flink.table.types.logical.LogicalType;
3938
import org.apache.flink.table.types.logical.LogicalTypeFamily;
4039
import org.apache.flink.table.types.logical.RowType;
41-
import org.apache.paimon.flink.FlinkRowWrapper;
4240

4341
import java.util.ArrayList;
4442
import java.util.List;
4543
import java.util.Objects;
4644
import java.util.Optional;
4745
import java.util.function.BiFunction;
48-
import java.util.regex.Matcher;
4946
import java.util.regex.Pattern;
5047

5148
import static org.apache.flink.table.types.logical.utils.LogicalTypeCasts.supportsImplicitCast;
@@ -72,7 +69,10 @@ public PredicateConverter(PredicateBuilder builder) {
7269
}
7370

7471
/** Accepts simple LIKE patterns like "abc%". */
75-
private static final Pattern BEGIN_PATTERN = Pattern.compile("([^%]+)%");
72+
private static final Pattern BEGIN_PATTERN = Pattern.compile("^[^%_]+%$");
73+
74+
private static final Pattern END_PATTERN = Pattern.compile("^%[^%_]+$");
75+
private static final Pattern CONTAINS_PATTERN = Pattern.compile("^%[^%_]+%$");
7676

7777
@Override
7878
public Predicate visit(CallExpression call) {
@@ -137,51 +137,20 @@ public Predicate visit(CallExpression call) {
137137
fieldRefExpr.getOutputDataType(),
138138
children.get(2)))
139139
.toString();
140-
String escapedSqlPattern = sqlPattern;
141-
boolean allowQuick = false;
142-
if (escape == null && !sqlPattern.contains("_")) {
143-
allowQuick = true;
144-
} else if (escape != null) {
145-
if (escape.length() != 1) {
146-
throw new UnsupportedExpression();
147-
}
148-
char escapeChar = escape.charAt(0);
149-
boolean matched = true;
150-
int i = 0;
151-
StringBuilder sb = new StringBuilder();
152-
while (i < sqlPattern.length() && matched) {
153-
char c = sqlPattern.charAt(i);
154-
if (c == escapeChar) {
155-
if (i == (sqlPattern.length() - 1)) {
156-
throw new UnsupportedExpression();
157-
}
158-
char nextChar = sqlPattern.charAt(i + 1);
159-
if (nextChar == '%') {
160-
matched = false;
161-
} else if ((nextChar == '_') || (nextChar == escapeChar)) {
162-
sb.append(nextChar);
163-
i += 1;
164-
} else {
165-
throw new UnsupportedExpression();
166-
}
167-
} else if (c == '_') {
168-
matched = false;
169-
} else {
170-
sb.append(c);
171-
}
172-
i = i + 1;
140+
141+
if (escape == null) {
142+
if (BEGIN_PATTERN.matcher(sqlPattern).matches()) {
143+
String prefix = sqlPattern.substring(0, sqlPattern.length() - 1);
144+
return builder.startsWith(builder.indexOf(fieldRefExpr.getName()), prefix);
173145
}
174-
if (matched) {
175-
allowQuick = true;
176-
escapedSqlPattern = sb.toString();
146+
if (END_PATTERN.matcher(sqlPattern).matches()) {
147+
String suffix = sqlPattern.substring(1);
148+
return builder.endsWith(builder.indexOf(fieldRefExpr.getName()), suffix);
177149
}
178-
}
179-
if (allowQuick) {
180-
Matcher beginMatcher = BEGIN_PATTERN.matcher(escapedSqlPattern);
181-
if (beginMatcher.matches()) {
182-
return builder.startsWith(
183-
builder.indexOf(fieldRefExpr.getName()),
184-
BinaryString.fromString(beginMatcher.group(1)));
150+
if (CONTAINS_PATTERN.matcher(sqlPattern).matches()
151+
&& sqlPattern.indexOf('%', 1) == sqlPattern.length() - 1) {
152+
String mid = sqlPattern.substring(1, sqlPattern.length() - 1);
153+
return builder.contains(builder.indexOf(fieldRefExpr.getName()), mid);
185154
}
186155
}
187156
}
@@ -197,7 +166,7 @@ private Predicate visitBiFunction(
197166
BiFunction<Integer, Object, Predicate> visit1,
198167
BiFunction<Integer, Object, Predicate> visit2) {
199168
Optional<FieldReferenceExpression> fieldRefExpr = extractFieldReference(children.get(0));
200-
if (fieldRefExpr.isPresent()) {
169+
if (fieldRefExpr.isPresent() && builder.indexOf(fieldRefExpr.get().getName()) != -1) {
201170
Object literal =
202171
extractLiteral(fieldRefExpr.get().getOutputDataType(), children.get(1));
203172
return visit1.apply(builder.indexOf(fieldRefExpr.get().getName()), literal);
@@ -239,10 +208,7 @@ private Object extractLiteral(DataType expectedType, Expression expression) {
239208
Object value = valueOpt.get();
240209
if (actualLogicalType.getTypeRoot().equals(expectedLogicalType.getTypeRoot())
241210
&& !isStringType(expectedLogicalType)) {
242-
return FlinkRowWrapper.fromFlinkObject(
243-
DataStructureConverters.getConverter(expectedType)
244-
.toInternalOrNull(value),
245-
expectedLogicalType);
211+
return FlinkAsFlussRow.fromFlinkObject(value, expectedType);
246212
} else if (isStringType(actualLogicalType) || isStringType(expectedLogicalType)) {
247213
return value.toString();
248214
} else if (supportsImplicitCast(actualLogicalType, expectedLogicalType)) {

0 commit comments

Comments
 (0)