77import com .oceanbase .oms .logmessage .DataMessage ;
88import com .oceanbase .oms .logmessage .LogMessage ;
99import io .netty .util .BooleanSupplier ;
10+ import io .tapdata .common .concurrent .ConcurrentProcessor ;
11+ import io .tapdata .common .concurrent .TapExecutors ;
1012import io .tapdata .common .ddl .DDLFactory ;
1113import io .tapdata .common .ddl .ccj .CCJBaseDDLWrapper ;
1214import io .tapdata .common .ddl .type .DDLParserType ;
3133import java .math .BigDecimal ;
3234import java .time .LocalDate ;
3335import java .time .ZoneOffset ;
34- import java .util .ArrayList ;
35- import java .util .HashMap ;
36- import java .util .List ;
37- import java .util .Map ;
38- import java .util .concurrent .atomic .AtomicInteger ;
36+ import java .util .*;
37+ import java .util .concurrent .TimeUnit ;
3938import java .util .concurrent .atomic .AtomicReference ;
4039
40+ import static io .tapdata .base .ConnectorBase .list ;
41+
4142public class OceanbaseReader {
4243
4344 private OceanbaseConfig oceanbaseConfig ;
@@ -71,97 +72,74 @@ public void start(BooleanSupplier isAlive) throws Throwable {
7172 config .setTableWhiteList (oceanbaseConfig .getTenant () + "." + oceanbaseConfig .getDatabase () + ".*" );
7273 LogProxyClient client = new LogProxyClient (oceanbaseConfig .getLogProxyHost (), oceanbaseConfig .getLogProxyPort (), config );
7374 AtomicReference <Throwable > throwable = new AtomicReference <>();
74- AtomicReference <List <TapEvent >> eventList = new AtomicReference <>(new ArrayList <>());
75- AtomicInteger heartbeatCount = new AtomicInteger (0 );
76- client .addListener (new RecordListener () {
77- @ Override
78- public void notify (LogMessage message ) {
75+ try (
76+ ConcurrentProcessor <LogMessage , MessageEvent > concurrentProcessor = TapExecutors .createSimple (8 , 32 , "OceanBaseReader-Processor" )
77+ ) {
78+ Thread t = new Thread (() -> {
79+ AtomicReference <List <TapEvent >> events = new AtomicReference <>(list ());
80+ int heartbeat = 0 ;
81+ long lastTimestamp = 0L ;
7982 try {
80- String op = message .getOpt ().name ();
81- if (!tableList .contains (message .getTableName ())) {
82- switch (op ) {
83- case "INSERT" :
84- case "UPDATE" :
85- case "DELETE" :
86- return ;
87- }
88- }
89- Map <String , Object > after = DataMap .create ();
90- Map <String , Object > before = DataMap .create ();
91- analyzeMessage (message , after , before );
92- switch (op ) {
93- case "INSERT" :
94- eventList .get ().add (new TapInsertRecordEvent ().init ().table (message .getTableName ()).after (after ).referenceTime (Long .parseLong (message .getTimestamp ()) * 1000 ));
95- break ;
96- case "UPDATE" :
97- eventList .get ().add (new TapUpdateRecordEvent ().init ().table (message .getTableName ()).after (after ).before (before ).referenceTime (Long .parseLong (message .getTimestamp ()) * 1000 ));
98- break ;
99- case "DELETE" :
100- eventList .get ().add (new TapDeleteRecordEvent ().init ().table (message .getTableName ()).before (before ).referenceTime (Long .parseLong (message .getTimestamp ()) * 1000 ));
101- break ;
102- case "HEARTBEAT" :
103- if (heartbeatCount .incrementAndGet () >= 10 ) {
104- eventList .get ().add (new HeartbeatEvent ().init ().referenceTime (Long .parseLong (message .getTimestamp ()) * 1000 ));
105- heartbeatCount .set (0 );
106- consumer .accept (eventList .get (), Long .valueOf (message .getTimestamp ()));
107- eventList .set (new ArrayList <>());
83+ while (isAlive .get ()) {
84+ MessageEvent messageEvent = concurrentProcessor .get (100 , TimeUnit .MILLISECONDS );
85+ if (EmptyKit .isNotNull (messageEvent )) {
86+ if (EmptyKit .isNull (messageEvent .getEvent ())) {
87+ continue ;
10888 }
109- break ;
110- case "DDL" : {
111- String ddlStr = message .getFieldList ().get (0 ).getValue ().toString ();
112- if (StringUtils .isNotBlank (ddlStr )) {
113- try {
114- DDLFactory .ddlToTapDDLEvent (
115- ddlParserType ,
116- ddlStr ,
117- DDL_WRAPPER_CONFIG ,
118- tableMap ,
119- tapDDLEvent -> {
120- tapDDLEvent .setTime (System .currentTimeMillis ());
121- tapDDLEvent .setReferenceTime (Long .parseLong (message .getTimestamp ()) * 1000 );
122- tapDDLEvent .setOriginDDL (ddlStr );
123- eventList .get ().add (tapDDLEvent );
124- }, (ddl , wrapper ) -> {
125- boolean unIgnoreTable = true ;
126- if (wrapper instanceof MysqlDDLWrapper ) {
127- String tableName = ((MysqlDDLWrapper ) wrapper ).getTableName (ddl );
128- unIgnoreTable = null == tableList || tableList .contains (tableName );
129- }
130- return unIgnoreTable ;
131- }
132- );
133- } catch (Throwable e ) {
134- TapDDLEvent tapDDLEvent = new TapDDLUnknownEvent ();
135- tapDDLEvent .setTime (System .currentTimeMillis ());
136- tapDDLEvent .setReferenceTime (Long .parseLong (message .getTimestamp ()) * 1000 );
137- tapDDLEvent .setOriginDDL (ddlStr );
138- eventList .get ().add (tapDDLEvent );
89+ if ("HEARTBEAT" .equals (messageEvent .getMessage ().getOpt ().name ())) {
90+ if (heartbeat ++ > 3 ) {
91+ consumer .accept (Collections .singletonList (new HeartbeatEvent ().init ().referenceTime (Long .parseLong (messageEvent .getMessage ().getTimestamp ()) * 1000 )), Long .parseLong (messageEvent .getMessage ().getTimestamp ()));
92+ heartbeat = 0 ;
13993 }
94+ continue ;
95+ }
96+ events .get ().addAll (messageEvent .getEvent ());
97+ lastTimestamp = Long .parseLong (messageEvent .getMessage ().getTimestamp ());
98+ if (events .get ().size () >= recordSize ) {
99+ consumer .accept (events .get (), lastTimestamp );
100+ events .set (new ArrayList <>());
101+ }
102+ } else {
103+ if (events .get ().size () > 0 ) {
104+ consumer .accept (events .get (), lastTimestamp );
105+ events .set (new ArrayList <>());
140106 }
141- break ;
142107 }
143- default :
144- break ;
145- }
146- if (eventList .get ().size () >= recordSize ) {
147- consumer .accept (eventList .get (), Long .valueOf (message .getTimestamp ()));
148- eventList .set (new ArrayList <>());
149108 }
150109 } catch (Exception e ) {
151110 throwable .set (e );
152111 }
153- }
112+ });
113+ t .setName ("OceanBaseReader-Consumer" );
114+ t .start ();
115+ client .addListener (new RecordListener () {
116+ @ Override
117+ public void notify (LogMessage message ) {
118+ try {
119+ concurrentProcessor .runAsync (message , e -> {
120+ try {
121+ return emit (e );
122+ } catch (Exception er ) {
123+ throwable .set (er );
124+ return null ;
125+ }
126+ });
127+ } catch (Exception e ) {
128+ throwable .set (e );
129+ }
130+ }
154131
155- @ Override
156- public void onException (LogProxyClientException e ) {
157- if (e .needStop ()) {
158- client .stop ();
132+ @ Override
133+ public void onException (LogProxyClientException e ) {
134+ if (e .needStop ()) {
135+ client .stop ();
136+ }
159137 }
160- }
161- } );
162- client . start ();
163- consumer . streamReadStarted ();
164- client . join ();
138+ });
139+ client . start ( );
140+ consumer . streamReadStarted ();
141+ client . join ();
142+ }
165143 consumer .streamReadEnded ();
166144 if (EmptyKit .isNotNull (throwable .get ())) {
167145 throw throwable .get ();
@@ -171,6 +149,72 @@ public void onException(LogProxyClientException e) {
171149 }
172150 }
173151
152+ private MessageEvent emit (LogMessage message ) {
153+ String op = message .getOpt ().name ();
154+ if (!tableList .contains (message .getTableName ())) {
155+ switch (op ) {
156+ case "INSERT" :
157+ case "UPDATE" :
158+ case "DELETE" :
159+ return null ;
160+ }
161+ }
162+ List <TapEvent > tapEvents = new ArrayList <>();
163+ Map <String , Object > after = DataMap .create ();
164+ Map <String , Object > before = DataMap .create ();
165+ analyzeMessage (message , after , before );
166+ switch (op ) {
167+ case "INSERT" :
168+ tapEvents .add (new TapInsertRecordEvent ().init ().table (message .getTableName ()).after (after ).referenceTime (Long .parseLong (message .getTimestamp ()) * 1000 ));
169+ break ;
170+ case "UPDATE" :
171+ tapEvents .add (new TapUpdateRecordEvent ().init ().table (message .getTableName ()).after (after ).before (before ).referenceTime (Long .parseLong (message .getTimestamp ()) * 1000 ));
172+ break ;
173+ case "DELETE" :
174+ tapEvents .add (new TapDeleteRecordEvent ().init ().table (message .getTableName ()).before (before ).referenceTime (Long .parseLong (message .getTimestamp ()) * 1000 ));
175+ break ;
176+ case "HEARTBEAT" :
177+ tapEvents .add (new HeartbeatEvent ().init ().referenceTime (Long .parseLong (message .getTimestamp ()) * 1000 ));
178+ break ;
179+ case "DDL" : {
180+ String ddlStr = message .getFieldList ().get (0 ).getValue ().toString ();
181+ if (StringUtils .isNotBlank (ddlStr )) {
182+ try {
183+ DDLFactory .ddlToTapDDLEvent (
184+ ddlParserType ,
185+ ddlStr ,
186+ DDL_WRAPPER_CONFIG ,
187+ tableMap ,
188+ tapDDLEvent -> {
189+ tapDDLEvent .setTime (System .currentTimeMillis ());
190+ tapDDLEvent .setReferenceTime (Long .parseLong (message .getTimestamp ()) * 1000 );
191+ tapDDLEvent .setOriginDDL (ddlStr );
192+ tapEvents .add (tapDDLEvent );
193+ }, (ddl , wrapper ) -> {
194+ boolean unIgnoreTable = true ;
195+ if (wrapper instanceof MysqlDDLWrapper ) {
196+ String tableName = ((MysqlDDLWrapper ) wrapper ).getTableName (ddl );
197+ unIgnoreTable = null == tableList || tableList .contains (tableName );
198+ }
199+ return unIgnoreTable ;
200+ }
201+ );
202+ } catch (Throwable e ) {
203+ TapDDLEvent tapDDLEvent = new TapDDLUnknownEvent ();
204+ tapDDLEvent .setTime (System .currentTimeMillis ());
205+ tapDDLEvent .setReferenceTime (Long .parseLong (message .getTimestamp ()) * 1000 );
206+ tapDDLEvent .setOriginDDL (ddlStr );
207+ tapEvents .add (tapDDLEvent );
208+ }
209+ }
210+ break ;
211+ }
212+ default :
213+ return null ;
214+ }
215+ return new MessageEvent (tapEvents , message );
216+ }
217+
174218 private void analyzeMessage (LogMessage message , Map <String , Object > after , Map <String , Object > before ) {
175219 String table = message .getTableName ();
176220 switch (message .getOpt ().name ()) {
@@ -245,4 +289,22 @@ private Object parseField(String table, DataMessage.Record.Field field) {
245289 return field .getValue ().toString ();
246290 }
247291 }
292+
293+ static class MessageEvent {
294+ private final LogMessage message ;
295+ private final List <TapEvent > event ;
296+
297+ public MessageEvent (List <TapEvent > event , LogMessage message ) {
298+ this .message = message ;
299+ this .event = event ;
300+ }
301+
302+ public LogMessage getMessage () {
303+ return message ;
304+ }
305+
306+ public List <TapEvent > getEvent () {
307+ return event ;
308+ }
309+ }
248310}
0 commit comments