Skip to content

Commit 1f6623e

Browse files
authored
[ISSUE apache#5052] Enhancement for source\sink connector (apache#5066)
* [ISSUE apache#5040] Support gtid mode for sync data with mysql * fix conflicts with master * fix checkstyle error * [ISSUE apache#5044] Data synchronization strong verification in mariadb gtid mode * fix checkstyle error * [ISSUE apache#5048] Add report verify request to admin for connector runtime * fix checkstyle error * [ISSUE apache#5052] Enhancement for source\sink connector * fix checkstyle error * fix checkstyle error
1 parent 9de7dc3 commit 1f6623e

File tree

52 files changed

+344
-77
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

52 files changed

+344
-77
lines changed

eventmesh-admin-server/conf/eventmesh.sql

+18-34
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@
2323
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;
2424

2525

26-
-- 导出 eventmesh 的数据库结构
26+
-- export eventmesh database
2727
CREATE DATABASE IF NOT EXISTS `eventmesh` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci */ /*!80016 DEFAULT ENCRYPTION='N' */;
2828
USE `eventmesh`;
2929

30-
-- 导出 表 eventmesh.event_mesh_data_source 结构
30+
-- export table eventmesh.event_mesh_data_source structure
3131
CREATE TABLE IF NOT EXISTS `event_mesh_data_source` (
3232
`id` int unsigned NOT NULL AUTO_INCREMENT,
3333
`dataType` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
@@ -39,11 +39,9 @@ CREATE TABLE IF NOT EXISTS `event_mesh_data_source` (
3939
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
4040
`updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
4141
PRIMARY KEY (`id`) USING BTREE
42-
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
43-
44-
-- 数据导出被取消选择。
42+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
4543

46-
-- 导出 表 eventmesh.event_mesh_job_info 结构
44+
-- export table eventmesh.event_mesh_job_info structure
4745
CREATE TABLE IF NOT EXISTS `event_mesh_job_info` (
4846
`id` int unsigned NOT NULL AUTO_INCREMENT,
4947
`jobID` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
@@ -61,11 +59,9 @@ CREATE TABLE IF NOT EXISTS `event_mesh_job_info` (
6159
`updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
6260
PRIMARY KEY (`id`) USING BTREE,
6361
UNIQUE KEY `jobID` (`jobID`)
64-
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
65-
66-
-- 数据导出被取消选择。
62+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
6763

68-
-- 导出 表 eventmesh.event_mesh_mysql_position 结构
64+
-- export table eventmesh.event_mesh_mysql_position structure
6965
CREATE TABLE IF NOT EXISTS `event_mesh_mysql_position` (
7066
`id` int unsigned NOT NULL AUTO_INCREMENT,
7167
`jobID` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
@@ -80,11 +76,9 @@ CREATE TABLE IF NOT EXISTS `event_mesh_mysql_position` (
8076
`updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
8177
PRIMARY KEY (`id`),
8278
UNIQUE KEY `jobID` (`jobID`)
83-
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;
84-
85-
-- 数据导出被取消选择。
79+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;
8680

87-
-- 导出 表 eventmesh.event_mesh_position_reporter_history 结构
81+
-- export table eventmesh.event_mesh_position_reporter_history structure
8882
CREATE TABLE IF NOT EXISTS `event_mesh_position_reporter_history` (
8983
`id` bigint NOT NULL AUTO_INCREMENT,
9084
`job` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
@@ -94,57 +88,49 @@ CREATE TABLE IF NOT EXISTS `event_mesh_position_reporter_history` (
9488
PRIMARY KEY (`id`),
9589
KEY `job` (`job`),
9690
KEY `address` (`address`)
97-
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='记录position上报者变更时,老记录';
98-
99-
-- 数据导出被取消选择。
91+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='record position reporter changes';
10092

101-
-- 导出 表 eventmesh.event_mesh_runtime_heartbeat 结构
93+
-- export table eventmesh.event_mesh_runtime_heartbeat structure
10294
CREATE TABLE IF NOT EXISTS `event_mesh_runtime_heartbeat` (
10395
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
10496
`adminAddr` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
10597
`runtimeAddr` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
10698
`jobID` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
107-
`reportTime` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'runtime本地上报时间',
99+
`reportTime` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'runtime local report time',
108100
`updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
109101
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
110102
PRIMARY KEY (`id`),
111103
UNIQUE KEY `runtimeAddr` (`runtimeAddr`),
112104
KEY `jobID` (`jobID`)
113-
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
114-
115-
-- 数据导出被取消选择。
105+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
116106

117-
-- 导出 表 eventmesh.event_mesh_runtime_history 结构
107+
-- export table eventmesh.event_mesh_runtime_history structure
118108
CREATE TABLE IF NOT EXISTS `event_mesh_runtime_history` (
119109
`id` bigint NOT NULL AUTO_INCREMENT,
120110
`job` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
121111
`address` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
122112
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
123113
PRIMARY KEY (`id`),
124114
KEY `address` (`address`)
125-
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC COMMENT='记录runtime上运行任务的变更';
126-
127-
-- 数据导出被取消选择。
115+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC COMMENT='record runtime task change history';
128116

129-
-- 导出 表 eventmesh.event_mesh_task_info 结构
117+
-- export table eventmesh.event_mesh_task_info structure
130118
CREATE TABLE IF NOT EXISTS `event_mesh_task_info` (
131119
`id` int unsigned NOT NULL AUTO_INCREMENT,
132120
`taskID` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
133121
`name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
134122
`desc` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
135-
`state` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT 'TaskState',
123+
`state` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT 'taskstate',
136124
`fromRegion` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
137125
`createUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
138126
`updateUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
139127
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
140128
`updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
141129
PRIMARY KEY (`id`) USING BTREE,
142130
UNIQUE KEY `taskID` (`taskID`)
143-
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
144-
145-
-- 数据导出被取消选择。
131+
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
146132

147-
-- 导出 表 eventmesh.event_mesh_verify 结构
133+
-- export table eventmesh.event_mesh_verify structure
148134
CREATE TABLE IF NOT EXISTS `event_mesh_verify` (
149135
`id` int NOT NULL,
150136
`taskID` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
@@ -157,8 +143,6 @@ CREATE TABLE IF NOT EXISTS `event_mesh_verify` (
157143
PRIMARY KEY (`id`)
158144
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
159145

160-
-- 数据导出被取消选择。
161-
162146
/*!40101 SET SQL_MODE=IFNULL(@OLD_SQL_MODE, '') */;
163147
/*!40014 SET FOREIGN_KEY_CHECKS=IFNULL(@OLD_FOREIGN_KEY_CHECKS, 1) */;
164148
/*!40101 SET CHARACTER_SET_CLIENT=@OLD_CHARACTER_SET_CLIENT */;

eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/job/JobInfoBizService.java

-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050

5151
/**
5252
* for table 'event_mesh_job_info' db operation
53-
* 2024-05-09 15:51:45
5453
*/
5554
@Service
5655
@Slf4j

eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java

+38-3
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@
3838
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
3939
import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
4040
import org.apache.eventmesh.openconnect.api.sink.Sink;
41+
import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext;
42+
import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult;
4143
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
4244

4345
import org.apache.commons.lang.StringUtils;
@@ -146,6 +148,11 @@ public String name() {
146148
return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
147149
}
148150

151+
@Override
152+
public void onException(ConnectRecord record) {
153+
154+
}
155+
149156
@Override
150157
public void stop() {
151158
executor.shutdown();
@@ -159,7 +166,7 @@ public void put(List<ConnectRecord> sinkRecords) {
159166
List<CanalConnectRecord> canalConnectRecordList = (List<CanalConnectRecord>) connectRecord.getData();
160167
canalConnectRecordList = filterRecord(canalConnectRecordList);
161168
if (isDdlDatas(canalConnectRecordList)) {
162-
doDdl(context, canalConnectRecordList);
169+
doDdl(context, canalConnectRecordList, connectRecord);
163170
} else if (sinkConfig.isGTIDMode()) {
164171
doLoadWithGtid(context, sinkConfig, connectRecord);
165172
} else {
@@ -197,7 +204,7 @@ private List<CanalConnectRecord> filterRecord(List<CanalConnectRecord> canalConn
197204
.collect(Collectors.toList());
198205
}
199206

200-
private void doDdl(DbLoadContext context, List<CanalConnectRecord> canalConnectRecordList) {
207+
private void doDdl(DbLoadContext context, List<CanalConnectRecord> canalConnectRecordList, ConnectRecord connectRecord) {
201208
for (final CanalConnectRecord record : canalConnectRecordList) {
202209
try {
203210
Boolean result = jdbcTemplate.execute(new StatementCallback<Boolean>() {
@@ -217,9 +224,30 @@ public Boolean doInStatement(Statement stmt) throws SQLException, DataAccessExce
217224
context.getFailedRecords().add(record);
218225
}
219226
} catch (Throwable e) {
227+
connectRecord.getCallback().onException(buildSendExceptionContext(connectRecord, e));
220228
throw new RuntimeException(e);
221229
}
222230
}
231+
connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord));
232+
}
233+
234+
private SendExceptionContext buildSendExceptionContext(ConnectRecord record, Throwable e) {
235+
SendExceptionContext sendExceptionContext = new SendExceptionContext();
236+
sendExceptionContext.setMessageId(record.getRecordId());
237+
sendExceptionContext.setCause(e);
238+
if (org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic"))) {
239+
sendExceptionContext.setTopic(record.getExtension("topic"));
240+
}
241+
return sendExceptionContext;
242+
}
243+
244+
private SendResult convertToSendResult(ConnectRecord record) {
245+
SendResult result = new SendResult();
246+
result.setMessageId(record.getRecordId());
247+
if (org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic"))) {
248+
result.setTopic(record.getExtension("topic"));
249+
}
250+
return result;
223251
}
224252

225253
private void doBefore(List<CanalConnectRecord> canalConnectRecordList, final DbLoadData loadData) {
@@ -291,21 +319,26 @@ private void doLoadWithGtid(DbLoadContext context, CanalSinkConfig sinkConfig, C
291319
Exception ex = null;
292320
try {
293321
ex = result.get();
322+
if (ex == null) {
323+
connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord));
324+
}
294325
} catch (Exception e) {
295326
ex = e;
296327
}
297328
Boolean skipException = sinkConfig.getSkipException();
298329
if (skipException != null && skipException) {
299330
if (ex != null) {
300331
// do skip
301-
log.warn("skip exception for data : {} , caused by {}",
332+
log.warn("skip exception will ack data : {} , caused by {}",
302333
filteredRows,
303334
ExceptionUtils.getFullStackTrace(ex));
304335
GtidBatchManager.removeGtidBatch(gtid);
336+
connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord));
305337
}
306338
} else {
307339
if (ex != null) {
308340
log.error("sink connector will shutdown by " + ex.getMessage(), ExceptionUtils.getFullStackTrace(ex));
341+
connectRecord.getCallback().onException(buildSendExceptionContext(connectRecord, ex));
309342
gtidSingleExecutor.shutdown();
310343
System.exit(1);
311344
} else {
@@ -314,6 +347,8 @@ private void doLoadWithGtid(DbLoadContext context, CanalSinkConfig sinkConfig, C
314347
}
315348
} else {
316349
log.info("Batch received, waiting for other batches.");
350+
// ack this record
351+
connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord));
317352
}
318353
}
319354

eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkFullConnector.java

+5
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,11 @@ public String name() {
109109
return null;
110110
}
111111

112+
@Override
113+
public void onException(ConnectRecord record) {
114+
115+
}
116+
112117
@Override
113118
public void put(List<ConnectRecord> sinkRecords) {
114119
if (sinkRecords == null || sinkRecords.isEmpty() || sinkRecords.get(0) == null) {

eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java

+5
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,11 @@ public String name() {
267267
return this.sourceConfig.getSourceConnectorConfig().getConnectorName();
268268
}
269269

270+
@Override
271+
public void onException(ConnectRecord record) {
272+
273+
}
274+
270275
@Override
271276
public void stop() {
272277
if (!running) {

eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java

+5
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,11 @@ public String name() {
159159
return this.config.getConnectorConfig().getConnectorName();
160160
}
161161

162+
@Override
163+
public void onException(ConnectRecord record) {
164+
165+
}
166+
162167
@Override
163168
public List<ConnectRecord> poll() {
164169
while (flag.get()) {

eventmesh-connectors/eventmesh-connector-chatgpt/src/main/java/org/apache/eventmesh/connector/chatgpt/source/connector/ChatGPTSourceConnector.java

+5
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,11 @@ public String name() {
224224
return this.sourceConfig.getConnectorConfig().getConnectorName();
225225
}
226226

227+
@Override
228+
public void onException(ConnectRecord record) {
229+
230+
}
231+
227232
@Override
228233
public void stop() {
229234
Throwable t = this.server.close().cause();

eventmesh-connectors/eventmesh-connector-dingtalk/src/main/java/org/apache/eventmesh/connector/dingtalk/sink/connector/DingDingSinkConnector.java

+5
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,11 @@ public String name() {
103103
return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
104104
}
105105

106+
@Override
107+
public void onException(ConnectRecord record) {
108+
109+
}
110+
106111
@Override
107112
public void stop() {
108113
isRunning = false;

eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/sink/connector/FileSinkConnector.java

+5
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,11 @@ public String name() {
103103
return this.sinkConfig.getConnectorConfig().getConnectorName();
104104
}
105105

106+
@Override
107+
public void onException(ConnectRecord record) {
108+
109+
}
110+
106111
@Override
107112
public void stop() {
108113
outputStream.flush();

eventmesh-connectors/eventmesh-connector-file/src/main/java/org/apache/eventmesh/connector/file/source/connector/FileSourceConnector.java

+5
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,11 @@ public String name() {
8686
return this.sourceConfig.getConnectorConfig().getConnectorName();
8787
}
8888

89+
@Override
90+
public void onException(ConnectRecord record) {
91+
92+
}
93+
8994
@Override
9095
public void stop() {
9196
try {

eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/sink/HttpSinkConnector.java

+5
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,11 @@ public String name() {
107107
return this.httpSinkConfig.connectorConfig.getConnectorName();
108108
}
109109

110+
@Override
111+
public void onException(ConnectRecord record) {
112+
113+
}
114+
110115
@Override
111116
public void stop() throws Exception {
112117
this.sinkHandler.stop();

eventmesh-connectors/eventmesh-connector-http/src/main/java/org/apache/eventmesh/connector/http/source/HttpSourceConnector.java

+5
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,11 @@ public String name() {
144144
return this.sourceConfig.getConnectorConfig().getConnectorName();
145145
}
146146

147+
@Override
148+
public void onException(ConnectRecord record) {
149+
150+
}
151+
147152
@Override
148153
public void stop() {
149154
if (this.server != null) {

eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/sink/JdbcSinkConnector.java

+5
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,11 @@ public String name() {
139139
return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
140140
}
141141

142+
@Override
143+
public void onException(ConnectRecord record) {
144+
145+
}
146+
142147
/**
143148
* Stops the Connector.
144149
*

eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/source/JdbcSourceConnector.java

+5
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,11 @@ public String name() {
192192
return "JDBC Source Connector";
193193
}
194194

195+
@Override
196+
public void onException(ConnectRecord record) {
197+
198+
}
199+
195200
/**
196201
* Stops the Connector.
197202
*

eventmesh-connectors/eventmesh-connector-kafka/src/main/java/org/apache/eventmesh/connector/kafka/sink/connector/KafkaSinkConnector.java

+5
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,11 @@ public String name() {
9494
return this.sinkConfig.getConnectorConfig().getConnectorName();
9595
}
9696

97+
@Override
98+
public void onException(ConnectRecord record) {
99+
100+
}
101+
97102
@Override
98103
public void stop() {
99104
producer.close();

0 commit comments

Comments
 (0)