Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feat][Connector-v2][Iceberg]support filter conditions in iceberg source #9095

Open
wants to merge 14 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions docs/en/connector-v2/source/Iceberg.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,10 @@ libfb303-xxx.jar
| use_snapshot_id | long | no | - | Instructs this scan to look for use the given snapshot ID. |
| use_snapshot_timestamp | long | no | - | Instructs this scan to look for use the most recent snapshot as of the given time in milliseconds. timestamp – the timestamp in millis since the Unix epoch |
| stream_scan_strategy | enum | no | FROM_LATEST_SNAPSHOT | Starting strategy for stream mode execution, Default to use `FROM_LATEST_SNAPSHOT` if don’t specify any value,The optional values are:<br/>TABLE_SCAN_THEN_INCREMENTAL: Do a regular table scan then switch to the incremental mode.<br/>FROM_LATEST_SNAPSHOT: Start incremental mode from the latest snapshot inclusive.<br/>FROM_EARLIEST_SNAPSHOT: Start incremental mode from the earliest snapshot inclusive.<br/>FROM_SNAPSHOT_ID: Start incremental mode from a snapshot with a specific id inclusive.<br/>FROM_SNAPSHOT_TIMESTAMP: Start incremental mode from a snapshot with a specific timestamp inclusive. |
| increment.scan-interval | long | no | 2000 | The interval of increment scan(mills) |
| common-options | | no | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. |
| increment.scan-interval | long | no | 2000 | The interval of increment scan(mills) |
| common-options | | no | - | Source plugin common parameters, please refer to [Source Common Options](../source-common-options.md) for details. |
| where_clause | String | no | - | The where clause to filter the iceberg data. It mustn't contain the table name. For example: There is an iceberg table contains an `id` column with the `int` type, now we want to filter the data that `id`>100. Then you can input `id > 100`. |


## Task Example

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,4 +76,10 @@ public class IcebergSourceOptions extends IcebergCommonOptions {
.longType()
.defaultValue(2000L)
.withDescription(" the interval of increment scan(mills)");

public static final Option<String> WHERE_CLAUSE =
Options.key("where_clause")
.stringType()
.noDefaultValue()
.withDescription("the iceberg where clause");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,222 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.iceberg.config;

import org.apache.commons.lang3.StringUtils;
import org.apache.iceberg.expressions.Expressions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.expression.DoubleValue;
import net.sf.jsqlparser.expression.Expression;
import net.sf.jsqlparser.expression.LongValue;
import net.sf.jsqlparser.expression.NotExpression;
import net.sf.jsqlparser.expression.NullValue;
import net.sf.jsqlparser.expression.Parenthesis;
import net.sf.jsqlparser.expression.StringValue;
import net.sf.jsqlparser.expression.operators.conditional.AndExpression;
import net.sf.jsqlparser.expression.operators.conditional.OrExpression;
import net.sf.jsqlparser.expression.operators.relational.ComparisonOperator;
import net.sf.jsqlparser.expression.operators.relational.EqualsTo;
import net.sf.jsqlparser.expression.operators.relational.GreaterThan;
import net.sf.jsqlparser.expression.operators.relational.GreaterThanEquals;
import net.sf.jsqlparser.expression.operators.relational.InExpression;
import net.sf.jsqlparser.expression.operators.relational.IsNullExpression;
import net.sf.jsqlparser.expression.operators.relational.LikeExpression;
import net.sf.jsqlparser.expression.operators.relational.MinorThan;
import net.sf.jsqlparser.expression.operators.relational.MinorThanEquals;
import net.sf.jsqlparser.expression.operators.relational.NotEqualsTo;
import net.sf.jsqlparser.expression.operators.relational.ParenthesedExpressionList;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.schema.Column;
import net.sf.jsqlparser.statement.select.PlainSelect;
import net.sf.jsqlparser.statement.select.Select;

import java.util.ArrayList;
import java.util.List;

class SQLToIcebergExprConverter {
private static final Logger log = LoggerFactory.getLogger(SQLToIcebergExprConverter.class);

public static org.apache.iceberg.expressions.Expression parseWhereClauseToIcebergExpression(
String whereClauseStr) {
if (StringUtils.isNotBlank(whereClauseStr)) {
try {
// use the JsqlParser to parse the where clause
Select select =
(Select) CCJSqlParserUtil.parse("SELECT * FROM t WHERE " + whereClauseStr);

PlainSelect plainSelect = (PlainSelect) select.getSelectBody();
net.sf.jsqlparser.expression.Expression whereClause = plainSelect.getWhere();

org.apache.iceberg.expressions.Expression icebergExpr =
convertToIcebergExpression(whereClause);
return icebergExpr;
} catch (JSQLParserException e) {
log.error("Failed to parse where clause: {}", whereClauseStr, e);
}
}
return Expressions.alwaysTrue();
}

private static org.apache.iceberg.expressions.Expression convertToIcebergExpression(
net.sf.jsqlparser.expression.Expression expr) {
if (expr instanceof Parenthesis) {
return convertToIcebergExpression(((Parenthesis) expr).getExpression());
}
if (expr instanceof AndExpression) {
return handleAndExpression((AndExpression) expr);
}
if (expr instanceof OrExpression) {
return handleOrExpression((OrExpression) expr);
}
if (expr instanceof NotExpression) {
return handleNotExpression((NotExpression) expr);
}
if (expr instanceof IsNullExpression) {
return handleIsNullExpression((IsNullExpression) expr);
}
if (expr instanceof InExpression) {
return handleInExpression((InExpression) expr);
}
if (expr instanceof LikeExpression) {
return handleLikeExpression((LikeExpression) expr);
}
if (expr instanceof ComparisonOperator) {
return handleComparisonOperator((ComparisonOperator) expr);
} else {
throw new UnsupportedOperationException(
"Unsupported expression type: " + expr.getClass());
}
}

private static org.apache.iceberg.expressions.Expression handleLikeExpression(
LikeExpression expr) {
String columnName = ((Column) expr.getLeftExpression()).getColumnName();
String value = ((StringValue) expr.getRightExpression()).getValue();
LikeExpression.KeyWord keyWord = expr.getLikeKeyWord();
if (keyWord == LikeExpression.KeyWord.LIKE) {
return Expressions.startsWith(columnName, value);
} else {
throw new UnsupportedOperationException("Unsupported like keyword: " + keyWord);
}
}

private static org.apache.iceberg.expressions.Expression handleInExpression(InExpression expr) {
String columnName = ((Column) expr.getLeftExpression()).getColumnName();
ParenthesedExpressionList<Expression> list =
(ParenthesedExpressionList) expr.getRightExpression();
List<Object> values = extractValuesFromItemsList(list);

if (expr.isNot()) {
// handle NOT IN
return Expressions.notIn(columnName, values.toArray());
} else {
// handle IN
return Expressions.in(columnName, values.toArray());
}
}

private static List<Object> extractValuesFromItemsList(
ParenthesedExpressionList<Expression> list) {
List<Object> res = new ArrayList<>();
for (Expression expression : list) {
res.add(getValueFromExpression(expression));
}
return res;
}

private static org.apache.iceberg.expressions.Expression handleAndExpression(
AndExpression expr) {
return Expressions.and(
convertToIcebergExpression(expr.getLeftExpression()),
convertToIcebergExpression(expr.getRightExpression()));
}

private static org.apache.iceberg.expressions.Expression handleOrExpression(OrExpression expr) {
return Expressions.or(
convertToIcebergExpression(expr.getLeftExpression()),
convertToIcebergExpression(expr.getRightExpression()));
}

private static org.apache.iceberg.expressions.Expression handleNotExpression(
NotExpression expr) {
return Expressions.not(convertToIcebergExpression(expr.getExpression()));
}

private static org.apache.iceberg.expressions.Expression handleComparisonOperator(
ComparisonOperator expr) {
String columnName = ((Column) expr.getLeftExpression()).getColumnName();
Object value = getValueFromExpression(expr.getRightExpression());

if (expr instanceof EqualsTo) {
return Expressions.equal(columnName, value);
} else if (expr instanceof NotEqualsTo) {
return Expressions.notEqual(columnName, value);
} else if (expr instanceof GreaterThan) {
return Expressions.greaterThan(columnName, value);
} else if (expr instanceof GreaterThanEquals) {
return Expressions.greaterThanOrEqual(columnName, value);
} else if (expr instanceof MinorThan) {
return Expressions.lessThan(columnName, value);
} else if (expr instanceof MinorThanEquals) {
return Expressions.lessThanOrEqual(columnName, value);
} else {
throw new UnsupportedOperationException(
"Unsupported comparison operator: " + expr.getClass());
}
}

private static org.apache.iceberg.expressions.Expression handleIsNullExpression(
IsNullExpression expr) {
String columnName = ((Column) expr.getLeftExpression()).getColumnName();
if (expr.isNot()) {
// handle IS NOT NULL
return Expressions.notNull(columnName);
} else {
// handle IS NULL
return Expressions.isNull(columnName);
}
}

private static Object getValueFromExpression(net.sf.jsqlparser.expression.Expression expr) {
if (expr instanceof LongValue) {
return ((LongValue) expr).getValue();
} else if (expr instanceof NullValue) {
return null;
} else if (expr instanceof StringValue) {
return ((StringValue) expr).getValue();
} else if (expr instanceof DoubleValue) {
return ((DoubleValue) expr).getValue();
} else if (expr instanceof Column) {
String columnName = ((Column) expr).getColumnName();
if (Boolean.TRUE.equals(Boolean.parseBoolean(columnName))) {
return Boolean.TRUE;
} else if (Boolean.FALSE.equals(Boolean.parseBoolean(columnName))) {
return Boolean.FALSE;
}
return ((Column) expr).getColumnName();
} else {
throw new UnsupportedOperationException("Unsupported value type: " + expr.getClass());
}
}
}
Loading
Loading