diff --git a/README.md b/README.md index f3e29051ef..42d99530d4 100644 --- a/README.md +++ b/README.md @@ -99,5 +99,5 @@ canal 作为 MySQL binlog 增量获取和解析工具,可将变更记录投递 - [阿里巴巴数据库连接池开源项目 Druid](https://github.com/alibaba/druid) - [阿里巴巴实时数据同步工具 DTS](https://www.aliyun.com/product/dts) -## 问题反馈 +## 问题反馈 - 报告 issue: [github issues](https://github.com/alibaba/canal/issues) diff --git a/deployer/src/main/resources/canal.properties b/deployer/src/main/resources/canal.properties index e6bdc4cc7d..442c4cd3de 100644 --- a/deployer/src/main/resources/canal.properties +++ b/deployer/src/main/resources/canal.properties @@ -64,9 +64,6 @@ canal.instance.filter.query.ddl = false canal.instance.filter.table.error = false canal.instance.filter.rows = false canal.instance.filter.transaction.entry = false -canal.instance.filter.dml.insert = false -canal.instance.filter.dml.update = false -canal.instance.filter.dml.delete = false # binlog format/image check canal.instance.binlog.format = ROW,STATEMENT,MIXED diff --git a/deployer/src/main/resources/example/instance.properties b/deployer/src/main/resources/example/instance.properties index fa8af1c82a..894da56c7f 100644 --- a/deployer/src/main/resources/example/instance.properties +++ b/deployer/src/main/resources/example/instance.properties @@ -45,6 +45,12 @@ canal.instance.filter.black.regex=mysql\\.slave_.* #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch +# table regex for insert-dml +canal.instance.filter.dml.insert.regex = +# table regex for update-dml +canal.instance.filter.dml.update.regex = +# table regex for delete-dml +canal.instance.filter.dml.delete.regex = # mq config canal.mq.topic=example diff --git a/deployer/src/main/resources/spring/default-instance.xml b/deployer/src/main/resources/spring/default-instance.xml index 9907ae80ba..ed1a97d410 100644 --- a/deployer/src/main/resources/spring/default-instance.xml +++ b/deployer/src/main/resources/spring/default-instance.xml @@ -174,9 +174,24 @@ - - - + + + + + + + + + + + + + + + + + + diff --git a/deployer/src/main/resources/spring/file-instance.xml b/deployer/src/main/resources/spring/file-instance.xml index 800f981284..cb1e36ea0d 100644 --- a/deployer/src/main/resources/spring/file-instance.xml +++ b/deployer/src/main/resources/spring/file-instance.xml @@ -160,9 +160,24 @@ - - - + + + + + + + + + + + + + + + + + + diff --git a/deployer/src/main/resources/spring/group-instance.xml b/deployer/src/main/resources/spring/group-instance.xml index b0b887f9fd..b412042684 100644 --- a/deployer/src/main/resources/spring/group-instance.xml +++ b/deployer/src/main/resources/spring/group-instance.xml @@ -157,9 +157,24 @@ - - - + + + + + + + + + + + + + + + + + + diff --git a/deployer/src/main/resources/spring/memory-instance.xml b/deployer/src/main/resources/spring/memory-instance.xml index a7dc634c17..7fcc6df361 100644 --- a/deployer/src/main/resources/spring/memory-instance.xml +++ b/deployer/src/main/resources/spring/memory-instance.xml @@ -148,9 +148,24 @@ - - - + + + + + + + + + + + + + + + + + + diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java index a8768131c2..10a34d52e9 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/AbstractMysqlEventParser.java @@ -39,9 +39,9 @@ public abstract class AbstractMysqlEventParser extends AbstractEventParser { protected boolean filterTableError = false; protected boolean useDruidDdlFilter = true; - protected boolean filterDmlInsert = false; - protected boolean filterDmlUpdate = false; - protected boolean filterDmlDelete = false; + protected CanalEventFilter dmlInsertTableFilter = null; + protected CanalEventFilter dmlUpdateTableFilter = null; + protected CanalEventFilter dmlDeleteTableFilter = null; // instance received binlog bytes protected final AtomicLong receivedBinlogBytes = new AtomicLong(0L); private final AtomicLong eventsPublishBlockingTime = new AtomicLong(0L); @@ -178,7 +178,10 @@ protected MultiStageCoprocessor buildMultiStageCoprocessor() { parallelThreadSize, (LogEventConvert) binlogParser, transactionBuffer, - destination, filterDmlInsert, filterDmlUpdate, filterDmlDelete); + destination, + (AviaterRegexFilter)dmlInsertTableFilter, + (AviaterRegexFilter)dmlUpdateTableFilter, + (AviaterRegexFilter)dmlDeleteTableFilter); mysqlMultiStageCoprocessor.setEventsPublishBlockingTime(eventsPublishBlockingTime); return mysqlMultiStageCoprocessor; } @@ -229,28 +232,15 @@ public void setUseDruidDdlFilter(boolean useDruidDdlFilter) { this.useDruidDdlFilter = useDruidDdlFilter; } - public boolean isFilterDmlInsert() { - return filterDmlInsert; + public void setDmlInsertTableFilter(CanalEventFilter dmlInsertTableFilter) { + this.dmlInsertTableFilter = dmlInsertTableFilter; } - - public void setFilterDmlInsert(boolean filterDmlInsert) { - this.filterDmlInsert = filterDmlInsert; - } - - public boolean isFilterDmlUpdate() { - return filterDmlUpdate; - } - - public void setFilterDmlUpdate(boolean filterDmlUpdate) { - this.filterDmlUpdate = filterDmlUpdate; - } - - public boolean isFilterDmlDelete() { - return filterDmlDelete; + public void setDmlUpdateTableFilter(CanalEventFilter dmlUpdateTableFilter) { + this.dmlUpdateTableFilter = dmlUpdateTableFilter; } - public void setFilterDmlDelete(boolean filterDmlDelete) { - this.filterDmlDelete = filterDmlDelete; + public void setDmlDeleteTableFilter(CanalEventFilter dmlDeleteTableFilter) { + this.dmlDeleteTableFilter = dmlDeleteTableFilter; } public void setEnableTsdb(boolean enableTsdb) { diff --git a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java index 06b38c7a24..0252062dd3 100644 --- a/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java +++ b/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/mysql/MysqlMultiStageCoprocessor.java @@ -37,6 +37,8 @@ import com.taobao.tddl.dbsync.binlog.event.RowsLogEvent; import com.taobao.tddl.dbsync.binlog.event.UpdateRowsLogEvent; import com.taobao.tddl.dbsync.binlog.event.WriteRowsLogEvent; +import com.alibaba.otter.canal.filter.aviater.AviaterRegexFilter; +import com.taobao.tddl.dbsync.binlog.exception.TableIdNotFoundException; /** * 针对解析器提供一个多阶段协同的处理 @@ -71,21 +73,23 @@ public class MysqlMultiStageCoprocessor extends AbstractCanalLifeCycle implement private BatchEventProcessor simpleParserStage; private BatchEventProcessor sinkStoreStage; private LogContext logContext; - protected boolean filterDmlInsert = false; - protected boolean filterDmlUpdate = false; - protected boolean filterDmlDelete = false; + protected volatile AviaterRegexFilter dmlInsertTableNameFilter = null; + protected volatile AviaterRegexFilter dmlUpdateTableNameFilter = null; + protected volatile AviaterRegexFilter dmlDeleteTableNameFilter = null; public MysqlMultiStageCoprocessor(int ringBufferSize, int parserThreadCount, LogEventConvert logEventConvert, EventTransactionBuffer transactionBuffer, String destination, - boolean filterDmlInsert, boolean filterDmlUpdate, boolean filterDmlDelete){ + AviaterRegexFilter dmlInsertTableNameFilter, + AviaterRegexFilter dmlUpdateTableNameFilter, + AviaterRegexFilter dmlDeleteTableNameFilter){ this.ringBufferSize = ringBufferSize; this.parserThreadCount = parserThreadCount; this.logEventConvert = logEventConvert; this.transactionBuffer = transactionBuffer; this.destination = destination; - this.filterDmlInsert = filterDmlInsert; - this.filterDmlUpdate = filterDmlUpdate; - this.filterDmlDelete = filterDmlDelete; + this.dmlInsertTableNameFilter = dmlInsertTableNameFilter; + this.dmlUpdateTableNameFilter = dmlUpdateTableNameFilter; + this.dmlDeleteTableNameFilter = dmlDeleteTableNameFilter; } @Override @@ -261,6 +265,14 @@ public SimpleParserStage(LogContext context){ } } + private Boolean isDmlFilterTable(AviaterRegexFilter nameFilter, RowsLogEvent event){ + if (event.getTable() == null) { + // tableId对应的记录不存在 + throw new TableIdNotFoundException("not found tableId:" + event.getTableId()); + } + return nameFilter.filter(event.getTable().getDbName() + "." + event.getTable().getTableName()); + } + public void onEvent(MessageEvent event, long sequence, boolean endOfBatch) throws Exception { try { LogEvent logEvent = event.getEvent(); @@ -276,7 +288,7 @@ public void onEvent(MessageEvent event, long sequence, boolean endOfBatch) throw switch (eventType) { case LogEvent.WRITE_ROWS_EVENT_V1: case LogEvent.WRITE_ROWS_EVENT: - if (!filterDmlInsert) { + if (!isDmlFilterTable(dmlInsertTableNameFilter, (WriteRowsLogEvent) logEvent)) { tableMeta = logEventConvert.parseRowsEventForTableMeta((WriteRowsLogEvent) logEvent); needDmlParse = true; } @@ -284,14 +296,14 @@ public void onEvent(MessageEvent event, long sequence, boolean endOfBatch) throw case LogEvent.UPDATE_ROWS_EVENT_V1: case LogEvent.PARTIAL_UPDATE_ROWS_EVENT: case LogEvent.UPDATE_ROWS_EVENT: - if (!filterDmlUpdate) { + if (!isDmlFilterTable(dmlUpdateTableNameFilter, (UpdateRowsLogEvent) logEvent)) { tableMeta = logEventConvert.parseRowsEventForTableMeta((UpdateRowsLogEvent) logEvent); needDmlParse = true; } break; case LogEvent.DELETE_ROWS_EVENT_V1: case LogEvent.DELETE_ROWS_EVENT: - if (!filterDmlDelete) { + if (!isDmlFilterTable(dmlDeleteTableNameFilter, (DeleteRowsLogEvent) logEvent)) { tableMeta = logEventConvert.parseRowsEventForTableMeta((DeleteRowsLogEvent) logEvent); needDmlParse = true; }