Skip to content

Commit 0d30948

Browse files
xwm1992sodaRyCNPil0tXiacnzakiikarsonto
authored
Merge EventMesh Function branch to master (apache#5019)
* EventMesh function admin (apache#4851) * own * dependency * finish registry * EventMesh function admin (apache#4853) * own * dependency * finish registry * init * Eventmesh function admin (apache#4854) * own * dependency * finish registry * init * 0419 * 0419 * more discovery and move gRPC * fix dependency * EventMesh function connector runtime (apache#4858) * [ISSUE apache#4812] Set up Admin Endpoints v2 (apache#4813) * Remove redundant overloaded methods * Simplify write() result param * Add writeJson(); Add PUT; Add JavaDoc * Rename EventHttpHandler to EventMeshHttpHandler * Correct server thread name * Clean up messy & non-hierarchical overloading * No need to set headers manually any more * Set up v1&v2 endpoints * Set up v1&v2 response dto * Introduce fastjson2 * Fix fastjson2 "level too large : 2048" error caused by IPAddress * Correct @ConfigField naming * Return properties format json key * Add format option to query string * Introduce Result * Reduce duplicate builder code * Fix all checkstyle warnings in eventmesh-runtime * Add known dependency * [ISSUE apache#4814] Migrate from fastjson 1.2.83 to fastjson2 (apache#4819) * [Enhancement] Migrate from fastjson 1.2.83 to fastjson2 apache#4814 * fix_dependencies_problem * fix_check * [ISSUE apache#4551] modify the logic of time-consumption statistics (apache#4822) * init connector runtime v2 * [ISSUE apache#4804] Fix SubStreamHandler exception loop by closeOnError (apache#4807) * Handle exception loop by closeOnError * Lombok optimization * some format optimization * Avoid closing multiple times * Remove redundant set null * Revert "Avoid closing multiple times" This reverts commit 767bc59. * Use synchronized latch to keep senderOnComplete called once * Use boolean to prevent latch called by somebody else * Remove the unique callee/caller close() of onCompleted() * [ISSUE apache#4838] Deprecate unused `eventMesh.connector.plugin.type` etc. properties (apache#4839) * Remove all references of `eventMesh.connector.plugin.type` * Deprecate `eventMesh.connector.plugin.type` and sort properties * Remove misconfigured & not-used `registerIntervalInMills`, `fetchRegistryAddrIntervalInMills` * Remove 'defibus' related un-used usages * Supplement apache#4809 for `null != object` * [ISSUE apache#4832] Downgrade stale bot to v8 to resolve state cache reserving error (apache#4833) * Revert stale bot to v8 to resolve state cache reserving error * Reduce operations-per-run to default value to ease pressure * Unify yaml to yml * [ISSUE apache#4820] Bug fix EventHandler not return json (apache#4821) * bug fix * bug fix * bug fix * update runtime v2 * update connector runtime * update connector runtime * update connector runtime * update connector runtime * update connector runtime --------- Co-authored-by: Pil0tXia <[email protected]> Co-authored-by: Zaki <[email protected]> Co-authored-by: Karson <[email protected]> * [ISSUE apache#4931]Add Registry Module for Discovery AdminServer * [ISSUES apache#4933]Add Admin Module * [ISSUE apache#4935] Add and Move the Pojo Used By Both Runtime and Admin to Common * [ISSUE apache#4937]fix gradle dependecy and add runtime v2 * [ISSUES apache#4939]add canal connector * fix missing apache header * fix missing apache header * fix missing apache header * update gradle dependencies * fix admin server ci check error * fix admin server ci check error * fix ci checkStyle error * fix ci check error * [ISSUE apache#4979]Canal Connector supports bidirectional data synchronization * add bash files for admin & runtime-v2 * fix ack offset read & persist * fix checkStyle error * [ISSUE apache#4979] Canal Connector supports bidirectional data synchronization (apache#5011) * [ISSUE apache#4979]Canal Connector supports bidirectional data synchronization * add bash files for admin & runtime-v2 * fix ack offset read & persist * fix checkStyle error * fix http source connector test error --------- Co-authored-by: sodaRyCN <[email protected]> Co-authored-by: Pil0tXia <[email protected]> Co-authored-by: Zaki <[email protected]> Co-authored-by: Karson <[email protected]>
1 parent d76ee25 commit 0d30948

File tree

24 files changed

+701
-149
lines changed

24 files changed

+701
-149
lines changed

build.gradle

+6
Original file line numberDiff line numberDiff line change
@@ -163,9 +163,13 @@ tasks.register('dist') {
163163
["eventmesh-common",
164164
"eventmesh-meta:eventmesh-meta-api",
165165
"eventmesh-metrics-plugin:eventmesh-metrics-api",
166+
"eventmesh-openconnect:eventmesh-openconnect-java",
167+
"eventmesh-openconnect:eventmesh-openconnect-offsetmgmt-plugin:eventmesh-openconnect-offsetmgmt-api",
166168
"eventmesh-protocol-plugin:eventmesh-protocol-api",
169+
"eventmesh-registry:eventmesh-registry-api",
167170
"eventmesh-retry:eventmesh-retry-api",
168171
"eventmesh-runtime",
172+
"eventmesh-runtime-v2",
169173
"eventmesh-security-plugin:eventmesh-security-api",
170174
"eventmesh-spi",
171175
"eventmesh-starter",
@@ -750,8 +754,10 @@ subprojects {
750754
dependency "software.amazon.awssdk:s3:2.26.3"
751755
dependency "com.github.rholder:guava-retrying:2.0.0"
752756

757+
dependency "org.mybatis.spring.boot:mybatis-spring-boot-starter:2.3.1"
753758
dependency "com.alibaba:druid-spring-boot-starter:1.2.23"
754759
dependency "org.springframework.boot:spring-boot-starter-jetty:2.7.18"
760+
dependency "com.baomidou:mybatis-plus:3.5.6"
755761
dependency "com.mysql:mysql-connector-j:8.4.0"
756762
}
757763
}
+201
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
#!/bin/bash
2+
#
3+
# Licensed to Apache Software Foundation (ASF) under one or more contributor
4+
# license agreements. See the NOTICE file distributed with
5+
# this work for additional information regarding copyright
6+
# ownership. Apache Software Foundation (ASF) licenses this file to you under
7+
# the Apache License, Version 2.0 (the "License"); you may
8+
# not use this file except in compliance with the License.
9+
# You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing,
14+
# software distributed under the License is distributed on an
15+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
# KIND, either express or implied. See the License for the
17+
# specific language governing permissions and limitations
18+
# under the License.
19+
20+
#===========================================================================================
21+
# Java Environment Setting
22+
#===========================================================================================
23+
set -e
24+
# Server configuration may be inconsistent, add these configurations to avoid garbled code problems
25+
export LANG=en_US.UTF-8
26+
export LC_CTYPE=en_US.UTF-8
27+
export LC_ALL=en_US.UTF-8
28+
29+
TMP_JAVA_HOME="/customize/your/java/home/here"
30+
31+
# Detect operating system.
32+
OS=$(uname)
33+
34+
function is_java8_or_11 {
35+
local _java="$1"
36+
[[ -x "$_java" ]] || return 1
37+
[[ "$("$_java" -version 2>&1)" =~ 'java version "1.8' || "$("$_java" -version 2>&1)" =~ 'openjdk version "1.8' || "$("$_java" -version 2>&1)" =~ 'java version "11' || "$("$_java" -version 2>&1)" =~ 'openjdk version "11' ]] || return 2
38+
return 0
39+
}
40+
41+
function extract_java_version {
42+
local _java="$1"
43+
local version=$("$_java" -version 2>&1 | awk -F '"' '/version/ {print $2}' | awk -F '.' '{if ($1 == 1 && $2 == 8) print "8"; else if ($1 == 11) print "11"; else print "unknown"}')
44+
echo "$version"
45+
}
46+
47+
# 0(not running), 1(is running)
48+
#function is_proxyRunning {
49+
# local _pid="$1"
50+
# local pid=`ps ax | grep -i 'org.apache.eventmesh.runtime.boot.EventMeshStartup' |grep java | grep -v grep | awk '{print $1}'|grep $_pid`
51+
# if [ -z "$pid" ] ; then
52+
# return 0
53+
# else
54+
# return 1
55+
# fi
56+
#}
57+
58+
function get_pid {
59+
local ppid=""
60+
if [ -f ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file ]; then
61+
ppid=$(cat ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file)
62+
# If the process does not exist, it indicates that the previous process terminated abnormally.
63+
if [ ! -d /proc/$ppid ]; then
64+
# Remove the residual file.
65+
rm ${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file
66+
echo -e "ERROR\t EventMesh process had already terminated unexpectedly before, please check log output."
67+
ppid=""
68+
fi
69+
else
70+
if [[ $OS =~ Msys ]]; then
71+
# There is a Bug on Msys that may not be able to kill the identified process
72+
ppid=`jps -v | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep java | grep -v grep | awk -F ' ' {'print $1'}`
73+
elif [[ $OS =~ Darwin ]]; then
74+
# Known problem: grep Java may not be able to accurately identify Java processes
75+
ppid=$(/bin/ps -o user,pid,command | grep "java" | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" |awk -F ' ' {'print $2'})
76+
else
77+
if [ $DOCKER ]; then
78+
# No need to exclude root user in Docker containers.
79+
ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $EVENTMESH_ADMIN_HOME | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | awk -F ' ' {'print $2'})
80+
else
81+
# It is required to identify the process as accurately as possible on Linux.
82+
ppid=$(ps -C java -o user,pid,command --cols 99999 | grep -w $EVENTMESH_ADMIN_HOME | grep -i "org.apache.eventmesh.admin.server.ExampleAdminServer" | grep -Ev "^root" | awk -F ' ' {'print $2'})
83+
fi
84+
fi
85+
fi
86+
echo "$ppid";
87+
}
88+
89+
#===========================================================================================
90+
# Locate Java Executable
91+
#===========================================================================================
92+
93+
if [[ -d "$TMP_JAVA_HOME" ]] && is_java8_or_11 "$TMP_JAVA_HOME/bin/java"; then
94+
JAVA="$TMP_JAVA_HOME/bin/java"
95+
JAVA_VERSION=$(extract_java_version "$TMP_JAVA_HOME/bin/java")
96+
elif [[ -d "$JAVA_HOME" ]] && is_java8_or_11 "$JAVA_HOME/bin/java"; then
97+
JAVA="$JAVA_HOME/bin/java"
98+
JAVA_VERSION=$(extract_java_version "$JAVA_HOME/bin/java")
99+
elif is_java8_or_11 "$(which java)"; then
100+
JAVA="$(which java)"
101+
JAVA_VERSION=$(extract_java_version "$(which java)")
102+
else
103+
echo -e "ERROR\t Java 8 or 11 not found, operation abort."
104+
exit 9;
105+
fi
106+
107+
echo "EventMesh using Java version: $JAVA_VERSION, path: $JAVA"
108+
109+
EVENTMESH_ADMIN_HOME=$(cd "$(dirname "$0")/.." && pwd)
110+
export EVENTMESH_ADMIN_HOME
111+
112+
EVENTMESH_ADMIN_LOG_HOME="${EVENTMESH_ADMIN_HOME}/logs"
113+
export EVENTMESH_ADMIN_LOG_HOME
114+
115+
echo -e "EVENTMESH_ADMIN_HOME : ${EVENTMESH_ADMIN_HOME}\nEVENTMESH_ADMIN_LOG_HOME : ${EVENTMESH_ADMIN_LOG_HOME}"
116+
117+
function make_logs_dir {
118+
if [ ! -e "${EVENTMESH_ADMIN_LOG_HOME}" ]; then mkdir -p "${EVENTMESH_ADMIN_LOG_HOME}"; fi
119+
}
120+
121+
error_exit ()
122+
{
123+
echo -e "ERROR\t $1 !!"
124+
exit 1
125+
}
126+
127+
export JAVA_HOME
128+
129+
#===========================================================================================
130+
# JVM Configuration
131+
#===========================================================================================
132+
#if [ $1 = "prd" -o $1 = "benchmark" ]; then JAVA_OPT="${JAVA_OPT} -server -Xms2048M -Xmx4096M -Xmn2048m -XX:SurvivorRatio=4"
133+
#elif [ $1 = "sit" ]; then JAVA_OPT="${JAVA_OPT} -server -Xms256M -Xmx512M -Xmn256m -XX:SurvivorRatio=4"
134+
#elif [ $1 = "dev" ]; then JAVA_OPT="${JAVA_OPT} -server -Xms128M -Xmx256M -Xmn128m -XX:SurvivorRatio=4"
135+
#fi
136+
137+
GC_LOG_FILE="${EVENTMESH_ADMIN_LOG_HOME}/eventmesh_admin_gc_%p.log"
138+
139+
#JAVA_OPT="${JAVA_OPT} -server -Xms2048M -Xmx4096M -Xmn2048m -XX:SurvivorRatio=4"
140+
JAVA_OPT=`cat ${EVENTMESH_ADMIN_HOME}/conf/server.env | grep APP_START_JVM_OPTION::: | awk -F ':::' {'print $2'}`
141+
JAVA_OPT="${JAVA_OPT} -XX:+UseG1GC -XX:G1HeapRegionSize=16m -XX:G1ReservePercent=25 -XX:InitiatingHeapOccupancyPercent=30 -XX:SoftRefLRUPolicyMSPerMB=0 -XX:SurvivorRatio=8 -XX:MaxGCPauseMillis=50"
142+
JAVA_OPT="${JAVA_OPT} -verbose:gc"
143+
if [[ "$JAVA_VERSION" == "8" ]]; then
144+
# Set JAVA_OPT for Java 8
145+
JAVA_OPT="${JAVA_OPT} -Xloggc:${GC_LOG_FILE} -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=5 -XX:GCLogFileSize=30m"
146+
JAVA_OPT="${JAVA_OPT} -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCApplicationStoppedTime -XX:+PrintAdaptiveSizePolicy"
147+
elif [[ "$JAVA_VERSION" == "11" ]]; then
148+
# Set JAVA_OPT for Java 11
149+
XLOG_PARAM="time,level,tags:filecount=5,filesize=30m"
150+
JAVA_OPT="${JAVA_OPT} -Xlog:gc*:${GC_LOG_FILE}:${XLOG_PARAM}"
151+
JAVA_OPT="${JAVA_OPT} -Xlog:safepoint:${GC_LOG_FILE}:${XLOG_PARAM} -Xlog:ergo*=debug:${GC_LOG_FILE}:${XLOG_PARAM}"
152+
fi
153+
JAVA_OPT="${JAVA_OPT} -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${EVENTMESH_ADMIN_LOG_HOME} -XX:ErrorFile=${EVENTMESH_ADMIN_LOG_HOME}/hs_err_%p.log"
154+
JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow"
155+
JAVA_OPT="${JAVA_OPT} -XX:+AlwaysPreTouch"
156+
JAVA_OPT="${JAVA_OPT} -XX:MaxDirectMemorySize=8G"
157+
JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages -XX:-UseBiasedLocking"
158+
JAVA_OPT="${JAVA_OPT} -Dio.netty.leakDetectionLevel=advanced"
159+
JAVA_OPT="${JAVA_OPT} -Dio.netty.allocator.type=pooled"
160+
JAVA_OPT="${JAVA_OPT} -Djava.security.egd=file:/dev/./urandom"
161+
JAVA_OPT="${JAVA_OPT} -Dlog4j.configurationFile=${EVENTMESH_ADMIN_HOME}/conf/log4j2.xml"
162+
JAVA_OPT="${JAVA_OPT} -Deventmesh.log.home=${EVENTMESH_ADMIN_LOG_HOME}"
163+
JAVA_OPT="${JAVA_OPT} -DconfPath=${EVENTMESH_ADMIN_HOME}/conf"
164+
JAVA_OPT="${JAVA_OPT} -DconfigurationPath=${EVENTMESH_ADMIN_HOME}/conf"
165+
JAVA_OPT="${JAVA_OPT} -Dlog4j2.AsyncQueueFullPolicy=Discard"
166+
JAVA_OPT="${JAVA_OPT} -Drocketmq.client.logUseSlf4j=true"
167+
JAVA_OPT="${JAVA_OPT} -DeventMeshPluginDir=${EVENTMESH_ADMIN_HOME}/plugin"
168+
169+
#if [ -f "pid.file" ]; then
170+
# pid=`cat pid.file`
171+
# if ! is_proxyRunning "$pid"; then
172+
# echo "proxy is running already"
173+
# exit 9;
174+
# else
175+
# echo "err pid$pid, rm pid.file"
176+
# rm pid.file
177+
# fi
178+
#fi
179+
180+
pid=$(get_pid)
181+
if [[ $pid == "ERROR"* ]]; then
182+
echo -e "${pid}"
183+
exit 9
184+
fi
185+
if [ -n "$pid" ]; then
186+
echo -e "ERROR\t The server is already running (pid=$pid), there is no need to execute start.sh again."
187+
exit 9
188+
fi
189+
190+
make_logs_dir
191+
192+
echo "Using Java version: $JAVA_VERSION, path: $JAVA" >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out
193+
194+
EVENTMESH_ADMIN_MAIN=org.apache.eventmesh.admin.server.ExampleAdminServer
195+
if [ $DOCKER ]; then
196+
$JAVA $JAVA_OPT -classpath ${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/* $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out
197+
else
198+
$JAVA $JAVA_OPT -classpath ${EVENTMESH_ADMIN_HOME}/conf:${EVENTMESH_ADMIN_HOME}/apps/*:${EVENTMESH_ADMIN_HOME}/lib/* $EVENTMESH_ADMIN_MAIN >> ${EVENTMESH_ADMIN_LOG_HOME}/eventmesh-admin.out 2>&1 &
199+
echo $!>${EVENTMESH_ADMIN_HOME}/bin/pid-admin.file
200+
fi
201+
exit 0

eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/db/entity/EventMeshJobDetail.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.eventmesh.common.remote.job.JobTransportType;
2222
import org.apache.eventmesh.common.remote.offset.RecordPosition;
2323

24+
import java.util.List;
2425
import java.util.Map;
2526

2627
import lombok.Data;
@@ -42,7 +43,7 @@ public class EventMeshJobDetail {
4243

4344
private String sinkConnectorDesc;
4445

45-
private RecordPosition position;
46+
private List<RecordPosition> position;
4647

4748
private JobState state;
4849
}

eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/EventMeshPositionBizService.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import org.apache.eventmesh.common.remote.request.FetchPositionRequest;
2626
import org.apache.eventmesh.common.remote.request.ReportPositionRequest;
2727

28+
import java.util.List;
29+
2830
import org.springframework.beans.factory.annotation.Autowired;
2931
import org.springframework.stereotype.Service;
3032

@@ -38,7 +40,7 @@ public class EventMeshPositionBizService {
3840
PositionHandlerFactory factory;
3941

4042
// called isValidateReportRequest before call this
41-
public RecordPosition getPosition(FetchPositionRequest request, Metadata metadata) {
43+
public List<RecordPosition> getPosition(FetchPositionRequest request, Metadata metadata) {
4244
if (request == null) {
4345
return null;
4446
}
@@ -68,7 +70,7 @@ public boolean reportPosition(ReportPositionRequest request, Metadata metadata)
6870
return handler.handler(request, metadata);
6971
}
7072

71-
public RecordPosition getPositionByJobID(Integer jobID, DataSourceType type) {
73+
public List<RecordPosition> getPositionByJobID(Integer jobID, DataSourceType type) {
7274
if (jobID == null || type == null) {
7375
return null;
7476
}

eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/IFetchPositionHandler.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,12 @@
2121
import org.apache.eventmesh.common.remote.offset.RecordPosition;
2222
import org.apache.eventmesh.common.remote.request.FetchPositionRequest;
2323

24+
import java.util.List;
25+
2426
/**
2527
* IFetchPositionHandler
2628
*/
2729
public interface IFetchPositionHandler {
2830

29-
RecordPosition handler(FetchPositionRequest request, Metadata metadata);
31+
List<RecordPosition> handler(FetchPositionRequest request, Metadata metadata);
3032
}

eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/service/position/impl/MysqlPositionHandler.java

+9-7
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.eventmesh.common.remote.request.ReportPositionRequest;
3232
import org.apache.eventmesh.common.utils.JsonUtils;
3333

34+
import java.util.ArrayList;
3435
import java.util.List;
3536

3637
import org.springframework.beans.factory.annotation.Autowired;
@@ -142,20 +143,21 @@ public boolean handler(ReportPositionRequest request, Metadata metadata) {
142143
}
143144

144145
@Override
145-
public RecordPosition handler(FetchPositionRequest request, Metadata metadata) {
146-
EventMeshMysqlPosition position = positionService.getOne(Wrappers.<EventMeshMysqlPosition>query().eq("jobID",
146+
public List<RecordPosition> handler(FetchPositionRequest request, Metadata metadata) {
147+
List<EventMeshMysqlPosition> positionList = positionService.list(Wrappers.<EventMeshMysqlPosition>query().eq("jobID",
147148
request.getJobID()));
148-
RecordPosition recordPosition = null;
149-
if (position != null) {
149+
List<RecordPosition> recordPositionList = new ArrayList<>();
150+
for (EventMeshMysqlPosition position : positionList) {
151+
RecordPosition recordPosition = new RecordPosition();
150152
CanalRecordPartition partition = new CanalRecordPartition();
151153
partition.setTimeStamp(position.getTimestamp());
152154
partition.setJournalName(position.getJournalName());
155+
recordPosition.setRecordPartition(partition);
153156
CanalRecordOffset offset = new CanalRecordOffset();
154157
offset.setOffset(position.getPosition());
155-
recordPosition = new RecordPosition();
156-
recordPosition.setRecordPartition(partition);
157158
recordPosition.setRecordOffset(offset);
159+
recordPositionList.add(recordPosition);
158160
}
159-
return recordPosition;
161+
return recordPositionList;
160162
}
161163
}

eventmesh-admin-server/src/main/resources/application.yaml

+2-2
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@
1818
spring:
1919
datasource:
2020
url: jdbc:mysql://localhost:3306/eventmesh?serverTimezone=GMT%2B8&characterEncoding=utf-8&useSSL=false&allowPublicKeyRetrieval=true
21-
username: root
22-
password: mike920830
21+
username: //db_username
22+
password: //db_password
2323
driver-class-name: com.mysql.cj.jdbc.Driver
2424
mybatis-plus:
2525
mapper-locations: classpath:mapper/*.xml

eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkConfig.java

+10-5
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,20 @@
2727
@EqualsAndHashCode(callSuper = true)
2828
public class CanalSinkConfig extends SinkConfig {
2929

30-
private Integer batchSize = 50; // batchSize
30+
// batchSize
31+
private Integer batchSize = 50;
3132

32-
private Boolean useBatch = true; // enable batch
33+
// enable batch
34+
private Boolean useBatch = true;
3335

34-
private Integer poolSize = 5; // sink thread size for single channel
36+
// sink thread size for single channel
37+
private Integer poolSize = 5;
3538

36-
private SyncMode syncMode; // sync mode: field/row
39+
// sync mode: field/row
40+
private SyncMode syncMode;
3741

38-
private Boolean skipException = false; // skip sink process exception
42+
// skip sink process exception
43+
private Boolean skipException = false;
3944

4045
public SinkConnectorConfig sinkConnectorConfig;
4146

0 commit comments

Comments
 (0)