[FLINK-36165][source-connector/mysql] Support capturing snapshot data with conditions#4104
Mrart wants to merge 3 commits into
… with conditions
[FLINK-36165][source-connector/mysql] add docs
[FLINK-36165][source-connector/mysql] Implement snapshot filter for MySQL table source
[FLINK-36165][source-connector/mysql] Escape dot
[FLINK-36165 ] fixed supported escape like 'city != 'China:beijing''
d0738af to c3005b6
fixed checkstyle
fixed test
fixed MySqlTableSourceFactoryTest test error.
| <tr> | ||
| <td>scan.snapshot.filters</td> | ||
| <td>optional</td> | ||
| <td style="word-wrap: break-word;">(none)</td> | ||
| <td>String</td> | ||
| <td>When reading a table snapshot, the rows of captured tables will be filtered using the specified filter expression (AKA a SQL WHERE clause). <br> | ||
| By default, no filter is applied, meaning the entire table will be synchronized. <br> | ||
| A colon (:) separates table name and filter expression, while a semicolon (;) separates multiple filters, | ||
| e.g. `db1.user_table_[0-9]+:id > 100;db[1-2].[app|web]_order_\\.*:id < 0;`. | ||
| </td> | ||
| </tr> |
How about explicitly stating that the filter conditions are combined using AND and that it has nothing to do with the binlog step?
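For reference, the option format described in the doc row above could be read roughly as follows. This is a minimal sketch, not the PR's `toSelector` implementation: `SnapshotFilterParser` and `parse` are hypothetical names, each entry is split at the first colon so that a filter such as `name != 'China:beijing'` stays intact, and it assumes no semicolon appears inside a filter expression.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper for illustration; not part of the PR. */
public class SnapshotFilterParser {

    /** Parses "tablePattern:filter;tablePattern:filter;" into an insertion-ordered map. */
    public static Map<String, String> parse(String option) {
        Map<String, String> filters = new LinkedHashMap<>();
        if (option == null || option.trim().isEmpty()) {
            return filters; // no filter configured: the whole table is read
        }
        for (String entry : option.split(";")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate a trailing or doubled semicolon
            }
            // Split at the FIRST colon only, so colons inside the filter are preserved.
            int colon = trimmed.indexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException(
                        "Expected 'table:filter' but got: " + trimmed);
            }
            filters.put(
                    trimmed.substring(0, colon).trim(),
                    trimmed.substring(colon + 1).trim());
        }
        return filters;
    }

    public static void main(String[] args) {
        Map<String, String> filters =
                parse("db1.user_table_[0-9]+:id > 100;db2.city:name != 'China:beijing';");
        filters.forEach((table, filter) -> System.out.println(table + " -> " + filter));
    }
}
```

Splitting at the first colon is a design choice for this sketch; it would not work if a table pattern itself could contain a colon.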
| Map<Selectors, String> snapshotFilters = toSelector(filters); | ||
|
|
||
| String filter = null; | ||
| for (Selectors selector : snapshotFilters.keySet()) { | ||
| if (selector.isMatch( | ||
| org.apache.flink.cdc.common.event.TableId.tableId( | ||
| tableId.catalog(), tableId.table()))) { | ||
| filter = snapshotFilters.get(selector); | ||
| break; | ||
| } |
Can we fail fast on unknown/nonexistent columns in filters?
| public static Long queryRowCnt( | ||
| JdbcConnection jdbc, TableId tableId, String columnName, @Nullable String filter) | ||
| throws SQLException { | ||
|
|
||
| if (filter == null) { | ||
| return queryApproximateRowCnt(jdbc, tableId); | ||
| } | ||
|
|
||
| final String cntQuery = | ||
| String.format( | ||
| "SELECT COUNT(%s) FROM %s WHERE %s", | ||
| quote(columnName), quote(tableId), filter); | ||
| return jdbc.queryAndMap( | ||
| cntQuery, | ||
| rs -> { | ||
| if (!rs.next()) { | ||
| // this should never happen | ||
| throw new SQLException( | ||
| String.format( | ||
| "No result returned after running query [%s]", cntQuery)); | ||
| } | ||
| return rs.getLong(1); | ||
| }); | ||
| } | ||
|
|
Because the filter is applied during split planning, the distribution factor may fall outside the configured bounds (0.05 to 1,000), affecting snapshot performance. I’d appreciate your thoughts on this.
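To make the concern concrete: going by the connector documentation, the distribution factor is (MAX(key) - MIN(key) + 1) / rowCount, and a table is treated as evenly distributed only while that factor stays within the lower and upper bounds. The sketch below is illustrative only. `DistributionFactorSketch` and its methods are hypothetical names, the bounds are the values mentioned above, and it assumes MIN/MAX still span the full key range while the row count reflects the filter.

```java
/** Hypothetical illustration; not the connector's splitter code. */
public class DistributionFactorSketch {

    // Bounds as quoted in the comment above (assumed to be the configured values).
    static final double LOWER_BOUND = 0.05;
    static final double UPPER_BOUND = 1000.0;

    /** (max - min + 1) / rowCount, for a numeric chunk key. */
    static double distributionFactor(long min, long max, long rowCount) {
        if (rowCount == 0) {
            return Double.MAX_VALUE; // an empty result cannot be called evenly distributed
        }
        return (max - min + 1) / (double) rowCount;
    }

    static boolean isEvenlyDistributed(double factor) {
        return factor >= LOWER_BOUND && factor <= UPPER_BOUND;
    }

    public static void main(String[] args) {
        // Unfiltered: ids 1..1,000,000 with one row per id.
        double unfiltered = distributionFactor(1, 1_000_000, 1_000_000);
        // Filtered: same key range, but only 500 rows match the filter.
        double filtered = distributionFactor(1, 1_000_000, 500);

        System.out.println("unfiltered factor = " + unfiltered
                + ", even = " + isEvenlyDistributed(unfiltered));
        System.out.println("filtered factor = " + filtered
                + ", even = " + isEvenlyDistributed(filtered));
    }
}
```

In this example a selective filter pushes the factor from 1.0 to 2000.0, above the upper bound, so the table would no longer qualify for even-sized chunk splitting.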
SML0127 left a comment
Thanks for the PR. I've left a few minor comments.
It seems the author of #3776 is actively working on their PR, and there's some duplicated work... It would be nice if we could discuss and implement this feature in one place.
issue link: https://issues.apache.org/jira/browse/FLINK-36165
fixed #3776