55import com .github .shyiko .mysql .binlog .event .deserialization .EventDeserializer ;
66import io .tapdata .common .concurrent .ConcurrentProcessor ;
77import io .tapdata .common .concurrent .TapExecutors ;
8+ import io .tapdata .common .ddl .DDLFactory ;
9+ import io .tapdata .common .ddl .ccj .CCJBaseDDLWrapper ;
10+ import io .tapdata .common .ddl .type .DDLParserType ;
11+ import io .tapdata .common .ddl .wrapper .DDLWrapperConfig ;
812import io .tapdata .connector .mysql .config .MysqlConfig ;
913import io .tapdata .connector .mysql .entity .MysqlBinlogPosition ;
1014import io .tapdata .connector .mysql .util .MySQLJsonParser ;
1115import io .tapdata .entity .event .TapEvent ;
16+ import io .tapdata .entity .event .ddl .TapDDLEvent ;
17+ import io .tapdata .entity .event .ddl .TapDDLUnknownEvent ;
1218import io .tapdata .entity .event .dml .TapDeleteRecordEvent ;
1319import io .tapdata .entity .event .dml .TapInsertRecordEvent ;
1420import io .tapdata .entity .event .dml .TapUpdateRecordEvent ;
@@ -46,6 +52,8 @@ public class MysqlReaderV2 {
4652 private final Map <String , LinkedHashMap <String , String >> dataTypeMap = new ConcurrentHashMap <>();
4753 private final Map <String , Map <String , Object []>> enumDataTypeMap = new ConcurrentHashMap <>();
4854 private final TimeZone timeZone ;
55+ private final DDLWrapperConfig DDL_WRAPPER_CONFIG = CCJBaseDDLWrapper .CCJDDLWrapperConfig .create ().split ("`" );
56+ private final DDLParserType ddlParserType = DDLParserType .MYSQL_CCJ_SQL_PARSER ;
4957
5058 public MysqlReaderV2 (MysqlJdbcContextV2 mysqlJdbcContext , Log tapLogger , TimeZone timeZone ) {
5159 this .tapLogger = tapLogger ;
@@ -94,6 +102,8 @@ public void startMiner(Supplier<Boolean> isAlive) throws Throwable {
94102 tapLogger .info ("Binlog rotated to: {}/{}" , rotateEventData .getBinlogFilename (), rotateEventData .getBinlogPosition ());
95103 } else if (eventType == EventType .TABLE_MAP ) {
96104 handleTableMapEvent (event );
105+ } else if (eventType == EventType .QUERY ) {
106+ concurrentProcessor .runAsyncWithBlocking (new ScanEvent (event , currentBinlogFile .get ()), this ::emit );
97107 }
98108
99109 // 异步处理事件
@@ -153,6 +163,39 @@ private OffsetEvent emit(ScanEvent scanEvent) {
153163 return null ;
154164 }
155165
166+ if (eventType == EventType .QUERY ) {
167+ QueryEventData queryEventData = event .getData ();
168+ long eventTime = header .getTimestamp ();
169+ String ddl = StringKit .removeSqlNote (queryEventData .getSql ());
170+ OffsetEvent offsetEvent = new OffsetEvent ();
171+ List <TapEvent > ddlEvents = new ArrayList <>();
172+ try {
173+ DDLFactory .ddlToTapDDLEvent (
174+ ddlParserType ,
175+ ddl ,
176+ DDL_WRAPPER_CONFIG ,
177+ tableMap ,
178+ tapDDLEvent -> {
179+ tapDDLEvent .setTime (System .currentTimeMillis ());
180+ tapDDLEvent .setReferenceTime (eventTime );
181+ tapDDLEvent .setOriginDDL (ddl );
182+ ddlEvents .add (tapDDLEvent );
183+ tapLogger .info ("Read DDL: " + ddl + ", about to be packaged as some event(s)" );
184+ }
185+ );
186+ } catch (Throwable e ) {
187+ TapDDLEvent tapDDLEvent = new TapDDLUnknownEvent ();
188+ tapDDLEvent .setTime (System .currentTimeMillis ());
189+ tapDDLEvent .setReferenceTime (eventTime );
190+ tapDDLEvent .setOriginDDL (ddl );
191+ ddlEvents .add (tapDDLEvent );
192+ }
193+ offsetEvent .setTapEvent (ddlEvents );
194+ offsetEvent .setMysqlBinlogPosition (extractBinlogPosition (event , scanEvent .getFileName ()));
195+ ddlEvents .forEach (e -> ddlFlush (((TapDDLEvent ) e ).getTableId ()));
196+ return offsetEvent ;
197+ }
198+
156199 // 处理数据变更事件
157200 List <TapEvent > tapEvents ;
158201 MysqlBinlogPosition position ;
@@ -198,7 +241,14 @@ private void handleTableMapEvent(Event event) {
198241
199242 // 保存映射关系
200243 tableMapEventByTableId .put (tableId , tableMapEventData );
244+ ddlFlush (table );
245+ tapLogger .debug ("Table map event: tableId={}, database={}, table={}" , tableId , database , table );
246+ }
201247
248+ private void ddlFlush (String table ) {
249+ if (EmptyKit .isBlank (table )) {
250+ return ;
251+ }
202252 LinkedHashMap <String , String > dataTypes = tableMap .get (table ).getNameFieldMap ().entrySet ().stream ()
203253 .collect (Collectors .toMap (Map .Entry ::getKey , e -> StringKit .removeParentheses (e .getValue ().getDataType ()),
204254 (existing , replacement ) -> existing , LinkedHashMap ::new ));
@@ -218,7 +268,6 @@ private void handleTableMapEvent(Event event) {
218268 return enumValues ;
219269 }));
220270 enumDataTypeMap .put (table , enumMap );
221- tapLogger .debug ("Table map event: tableId={}, database={}, table={}" , tableId , database , table );
222271 }
223272
224273 /**
@@ -459,6 +508,9 @@ static class OffsetEvent {
459508 private List <TapEvent > tapEvents ;
460509 private MysqlBinlogPosition mysqlBinlogPosition ;
461510
511+ public OffsetEvent () {
512+ }
513+
462514 public OffsetEvent (List <TapEvent > tapEvents , MysqlBinlogPosition mysqlBinlogPosition ) {
463515 this .tapEvents = tapEvents ;
464516 this .mysqlBinlogPosition = mysqlBinlogPosition ;
0 commit comments