From 0374ca32cfe9ab1394816afe007fbde310368f07 Mon Sep 17 00:00:00 2001
From: sodaRyCN <757083350@qq.com>
Date: Wed, 19 Jun 2024 19:22:42 +0800
Subject: [PATCH 01/15] use conf instead of resource
---
build.gradle | 44 ++++++-
eventmesh-admin-server/build.gradle | 23 ++--
.../main/resources => conf}/application.yaml | 0
.../eventmesh-admin.properties | 0
.../main/resources => conf}/eventmesh.sql | 0
eventmesh-admin-server/conf/log4j2.xml | 108 ++++++++++++++++++
.../mapper/EventMeshDataSourceMapper.xml | 0
.../mapper/EventMeshJobInfoMapper.xml | 0
.../mapper/EventMeshMysqlPositionMapper.xml | 0
...EventMeshPositionReporterHistoryMapper.xml | 0
.../EventMeshRuntimeHeartbeatMapper.xml | 0
.../mapper/EventMeshRuntimeHistoryMapper.xml | 0
.../admin/server/ExampleAdminServer.java | 2 +-
13 files changed, 167 insertions(+), 10 deletions(-)
rename eventmesh-admin-server/{src/main/resources => conf}/application.yaml (100%)
rename eventmesh-admin-server/{src/main/resources => conf}/eventmesh-admin.properties (100%)
rename eventmesh-admin-server/{src/main/resources => conf}/eventmesh.sql (100%)
create mode 100644 eventmesh-admin-server/conf/log4j2.xml
rename eventmesh-admin-server/{src/main/resources => conf}/mapper/EventMeshDataSourceMapper.xml (100%)
rename eventmesh-admin-server/{src/main/resources => conf}/mapper/EventMeshJobInfoMapper.xml (100%)
rename eventmesh-admin-server/{src/main/resources => conf}/mapper/EventMeshMysqlPositionMapper.xml (100%)
rename eventmesh-admin-server/{src/main/resources => conf}/mapper/EventMeshPositionReporterHistoryMapper.xml (100%)
rename eventmesh-admin-server/{src/main/resources => conf}/mapper/EventMeshRuntimeHeartbeatMapper.xml (100%)
rename eventmesh-admin-server/{src/main/resources => conf}/mapper/EventMeshRuntimeHistoryMapper.xml (100%)
diff --git a/build.gradle b/build.gradle
index 63e9301a43..0408231fd7 100644
--- a/build.gradle
+++ b/build.gradle
@@ -210,6 +210,48 @@ tasks.register('dist') {
}
}
+tasks.register('dist-admin') {
+ subprojects.forEach { subProject ->
+ dependsOn("${subProject.path}:jar")
+ }
+ def includedProjects =
+ [
+ "eventmesh-admin-server",
+ "eventmesh-common",
+ "eventmesh-spi",
+ "eventmesh-registry:eventmesh-registry-api",
+ "eventmesh-registry:eventmesh-registry-nacos",
+ "eventmesh-openconnect:eventmesh-openconnect-offsetmgmt-plugin:eventmesh-openconnect-offsetmgmt-api"
+ ]
+ doLast {
+ includedProjects.each {
+ def subProject = findProject(it)
+ copy {
+ from subProject.jar.archivePath
+ into rootProject.file('dist/apps')
+ }
+ copy {
+ from subProject.configurations.runtimeClasspath
+ into rootProject.file('dist/lib')
+ exclude 'eventmesh-*'
+ }
+ copy {
+ from subProject.file('bin')
+ into rootProject.file('dist/bin')
+ }
+ copy {
+ from subProject.file('conf')
+ from subProject.sourceSets.main.resources.srcDirs
+ into rootProject.file('dist/conf')
+ duplicatesStrategy = DuplicatesStrategy.EXCLUDE
+ exclude 'META-INF'
+ }
+
+ }
+ }
+
+}
+
tasks.register('installPlugin') {
var pluginProjects = subprojects.findAll {
it.file('gradle.properties').exists()
@@ -754,8 +796,8 @@ subprojects {
dependency "software.amazon.awssdk:s3:2.26.3"
dependency "com.github.rholder:guava-retrying:2.0.0"
- dependency "org.mybatis.spring.boot:mybatis-spring-boot-starter:2.3.2"
dependency "com.alibaba:druid-spring-boot-starter:1.2.23"
+ dependency "com.baomidou:mybatis-plus-boot-starter:3.5.5"
dependency "org.springframework.boot:spring-boot-starter-jetty:2.7.18"
dependency "com.baomidou:mybatis-plus:3.5.7"
dependency "com.mysql:mysql-connector-j:8.4.0"
diff --git a/eventmesh-admin-server/build.gradle b/eventmesh-admin-server/build.gradle
index 6f91f48001..6de881725a 100644
--- a/eventmesh-admin-server/build.gradle
+++ b/eventmesh-admin-server/build.gradle
@@ -31,18 +31,25 @@ dependencies {
implementation "io.grpc:grpc-stub"
implementation "io.grpc:grpc-netty-shaded"
- // https://mvnrepository.com/artifact/com.baomidou/mybatis-plus-boot-starter
- implementation 'com.baomidou:mybatis-plus-boot-starter:3.5.7'
- implementation "org.reflections:reflections:0.10.2"
+ // https://mvnrepository.com/artifact/com.baomidou/mybatis-plus-boot-starter
+ implementation "com.baomidou:mybatis-plus-boot-starter"
- // https://mvnrepository.com/artifact/com.alibaba/druid-spring-boot-starter
- implementation "com.alibaba:druid-spring-boot-starter"
- compileOnly 'com.mysql:mysql-connector-j'
- compileOnly 'org.projectlombok:lombok'
- annotationProcessor 'org.projectlombok:lombok'
+ // https://mvnrepository.com/artifact/com.alibaba/druid-spring-boot-starter
+ implementation "com.alibaba:druid-spring-boot-starter"
+ implementation 'com.mysql:mysql-connector-j'
+ compileOnly 'org.projectlombok:lombok'
+ annotationProcessor 'org.projectlombok:lombok'
}
configurations.implementation {
exclude group: "org.springframework.boot", module: "spring-boot-starter-logging"
}
+sourceSets {
+ main {
+ resources {
+ srcDirs = ['src/main/resources', 'conf']
+ }
+ }
+}
+
diff --git a/eventmesh-admin-server/src/main/resources/application.yaml b/eventmesh-admin-server/conf/application.yaml
similarity index 100%
rename from eventmesh-admin-server/src/main/resources/application.yaml
rename to eventmesh-admin-server/conf/application.yaml
diff --git a/eventmesh-admin-server/src/main/resources/eventmesh-admin.properties b/eventmesh-admin-server/conf/eventmesh-admin.properties
similarity index 100%
rename from eventmesh-admin-server/src/main/resources/eventmesh-admin.properties
rename to eventmesh-admin-server/conf/eventmesh-admin.properties
diff --git a/eventmesh-admin-server/src/main/resources/eventmesh.sql b/eventmesh-admin-server/conf/eventmesh.sql
similarity index 100%
rename from eventmesh-admin-server/src/main/resources/eventmesh.sql
rename to eventmesh-admin-server/conf/eventmesh.sql
diff --git a/eventmesh-admin-server/conf/log4j2.xml b/eventmesh-admin-server/conf/log4j2.xml
new file mode 100644
index 0000000000..6341a0e629
--- /dev/null
+++ b/eventmesh-admin-server/conf/log4j2.xml
@@ -0,0 +1,108 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/eventmesh-admin-server/src/main/resources/mapper/EventMeshDataSourceMapper.xml b/eventmesh-admin-server/conf/mapper/EventMeshDataSourceMapper.xml
similarity index 100%
rename from eventmesh-admin-server/src/main/resources/mapper/EventMeshDataSourceMapper.xml
rename to eventmesh-admin-server/conf/mapper/EventMeshDataSourceMapper.xml
diff --git a/eventmesh-admin-server/src/main/resources/mapper/EventMeshJobInfoMapper.xml b/eventmesh-admin-server/conf/mapper/EventMeshJobInfoMapper.xml
similarity index 100%
rename from eventmesh-admin-server/src/main/resources/mapper/EventMeshJobInfoMapper.xml
rename to eventmesh-admin-server/conf/mapper/EventMeshJobInfoMapper.xml
diff --git a/eventmesh-admin-server/src/main/resources/mapper/EventMeshMysqlPositionMapper.xml b/eventmesh-admin-server/conf/mapper/EventMeshMysqlPositionMapper.xml
similarity index 100%
rename from eventmesh-admin-server/src/main/resources/mapper/EventMeshMysqlPositionMapper.xml
rename to eventmesh-admin-server/conf/mapper/EventMeshMysqlPositionMapper.xml
diff --git a/eventmesh-admin-server/src/main/resources/mapper/EventMeshPositionReporterHistoryMapper.xml b/eventmesh-admin-server/conf/mapper/EventMeshPositionReporterHistoryMapper.xml
similarity index 100%
rename from eventmesh-admin-server/src/main/resources/mapper/EventMeshPositionReporterHistoryMapper.xml
rename to eventmesh-admin-server/conf/mapper/EventMeshPositionReporterHistoryMapper.xml
diff --git a/eventmesh-admin-server/src/main/resources/mapper/EventMeshRuntimeHeartbeatMapper.xml b/eventmesh-admin-server/conf/mapper/EventMeshRuntimeHeartbeatMapper.xml
similarity index 100%
rename from eventmesh-admin-server/src/main/resources/mapper/EventMeshRuntimeHeartbeatMapper.xml
rename to eventmesh-admin-server/conf/mapper/EventMeshRuntimeHeartbeatMapper.xml
diff --git a/eventmesh-admin-server/src/main/resources/mapper/EventMeshRuntimeHistoryMapper.xml b/eventmesh-admin-server/conf/mapper/EventMeshRuntimeHistoryMapper.xml
similarity index 100%
rename from eventmesh-admin-server/src/main/resources/mapper/EventMeshRuntimeHistoryMapper.xml
rename to eventmesh-admin-server/conf/mapper/EventMeshRuntimeHistoryMapper.xml
diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/ExampleAdminServer.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/ExampleAdminServer.java
index c6a6e16504..7f5fa22dda 100644
--- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/ExampleAdminServer.java
+++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/ExampleAdminServer.java
@@ -23,7 +23,7 @@
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
-@SpringBootApplication
+@SpringBootApplication()
public class ExampleAdminServer {
public static void main(String[] args) throws Exception {
From f040cf7e6385109589af40ee0f084ed8b217154a Mon Sep 17 00:00:00 2001
From: sodaRyCN <757083350@qq.com>
Date: Sun, 30 Jun 2024 21:00:58 +0800
Subject: [PATCH 02/15] prepare for full and check
---
.../apache/eventmesh/admin/server/Admin.java | 1 +
.../eventmesh/admin/server/AdminServer.java | 11 +++----
.../admin/server/web/BaseServer.java | 6 ++--
.../admin/server/web/GrpcServer.java | 13 ++++----
.../eventmesh/common/AbstractComponent.java | 26 ++++++++++++++++
.../eventmesh/common}/ComponentLifeCycle.java | 6 ++--
.../connector/rdb/canal/RdbDBDefinition.java | 14 +++++++++
.../rdb/canal/RdbTableDefinition.java | 11 +++++++
.../connector/rdb/canal/RdbTableMgr.java | 30 +++++++++++++++++++
.../rdb/canal/SourceConnectorConfig.java | 6 ++--
.../rdb/canal/mysql/MysqlTableDef.java | 17 +++++++++++
.../openconnect/api/connector/Connector.java | 17 ++---------
12 files changed, 119 insertions(+), 39 deletions(-)
create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/AbstractComponent.java
rename {eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server => eventmesh-common/src/main/java/org/apache/eventmesh/common}/ComponentLifeCycle.java (89%)
create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbDBDefinition.java
create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbTableDefinition.java
create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbTableMgr.java
create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/mysql/MysqlTableDef.java
diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/Admin.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/Admin.java
index 71c6d67be2..9ee25fadb2 100644
--- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/Admin.java
+++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/Admin.java
@@ -17,6 +17,7 @@
package org.apache.eventmesh.admin.server;
+import org.apache.eventmesh.common.ComponentLifeCycle;
import org.apache.eventmesh.common.remote.Task;
import org.apache.eventmesh.common.remote.request.ReportHeartBeatRequest;
import org.apache.eventmesh.common.utils.PagedList;
diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/AdminServer.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/AdminServer.java
index 98247d19b6..ae2736aaaa 100644
--- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/AdminServer.java
+++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/AdminServer.java
@@ -17,6 +17,8 @@
package org.apache.eventmesh.admin.server;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.common.config.CommonConfiguration;
import org.apache.eventmesh.common.config.ConfigService;
@@ -28,16 +30,11 @@
import org.apache.eventmesh.registry.RegisterServerInfo;
import org.apache.eventmesh.registry.RegistryFactory;
import org.apache.eventmesh.registry.RegistryService;
-
-import org.apache.commons.lang3.StringUtils;
-
-import javax.annotation.PostConstruct;
-
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Service;
-import lombok.extern.slf4j.Slf4j;
+import javax.annotation.PostConstruct;
@Service
@Slf4j
@@ -102,7 +99,7 @@ public void start() {
}
@Override
- public void destroy() {
+ public void stop() {
if (configuration.isEventMeshRegistryPluginEnabled()) {
registryService.unRegister(adminServeInfo);
try {
diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/BaseServer.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/BaseServer.java
index 24085dd89e..9bbe4ce305 100644
--- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/BaseServer.java
+++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/BaseServer.java
@@ -17,7 +17,7 @@
package org.apache.eventmesh.admin.server.web;
-import org.apache.eventmesh.admin.server.ComponentLifeCycle;
+import org.apache.eventmesh.common.ComponentLifeCycle;
import org.apache.eventmesh.common.remote.payload.PayloadFactory;
import javax.annotation.PostConstruct;
@@ -40,9 +40,9 @@ public void init() throws Exception {
}
@PreDestroy
- public void shutdown() {
+ public void shutdown() throws Exception {
log.info("[{}] server will destroy", this.getClass().getSimpleName());
- destroy();
+ stop();
log.info("[{}] server has be destroy", this.getClass().getSimpleName());
}
diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/GrpcServer.java b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/GrpcServer.java
index 5fbb34f489..3a4bbe3d71 100644
--- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/GrpcServer.java
+++ b/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/web/GrpcServer.java
@@ -17,17 +17,14 @@
package org.apache.eventmesh.admin.server.web;
+import io.grpc.Server;
+import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
+import lombok.extern.slf4j.Slf4j;
import org.apache.eventmesh.admin.server.AdminServerProperties;
-
-import java.util.concurrent.TimeUnit;
-
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
-import io.grpc.Server;
-import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
-
-import lombok.extern.slf4j.Slf4j;
+import java.util.concurrent.TimeUnit;
@Controller
@Slf4j
@@ -52,7 +49,7 @@ public void start() throws Exception {
}
@Override
- public void destroy() {
+ public void stop() {
try {
if (server != null) {
server.shutdown();
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/AbstractComponent.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/AbstractComponent.java
new file mode 100644
index 0000000000..9a1c0a404f
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/AbstractComponent.java
@@ -0,0 +1,26 @@
+package org.apache.eventmesh.common;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * @Description:
+ */
+@Slf4j
+public abstract class AbstractComponent implements ComponentLifeCycle {
+ @Override
+ public void start() throws Exception {
+ log.info("component [{}] will start", this.getClass());
+ startup();
+ log.info("component [{}] has started successfully", this.getClass());
+ }
+
+ @Override
+ public void stop() throws Exception {
+ log.info("component [{}] will stop", this.getClass());
+ shutdown();
+ log.info("component [{}] has stopped successfully", this.getClass());
+ }
+
+ protected abstract void startup() throws Exception;
+ protected abstract void shutdown() throws Exception;
+}
diff --git a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/ComponentLifeCycle.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/ComponentLifeCycle.java
similarity index 89%
rename from eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/ComponentLifeCycle.java
rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/ComponentLifeCycle.java
index 392eebfbba..76fdd548d0 100644
--- a/eventmesh-admin-server/src/main/java/org/apache/eventmesh/admin/server/ComponentLifeCycle.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/ComponentLifeCycle.java
@@ -15,14 +15,14 @@
* limitations under the License.
*/
-package org.apache.eventmesh.admin.server;
+package org.apache.eventmesh.common;
/**
- * adminServer ComponentLifeCycle
+ * LifeCycle of EventMesh Component
*/
public interface ComponentLifeCycle {
void start() throws Exception;
- void destroy();
+ void stop() throws Exception;
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbDBDefinition.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbDBDefinition.java
new file mode 100644
index 0000000000..450d0afce0
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbDBDefinition.java
@@ -0,0 +1,14 @@
+package org.apache.eventmesh.common.config.connector.rdb.canal;
+
+import lombok.Data;
+
+import java.util.Set;
+
+/**
+ * @Description: as class name
+ */
+@Data
+public class RdbDBDefinition {
+ private String schemaName;
+ private Set tables;
+}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbTableDefinition.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbTableDefinition.java
new file mode 100644
index 0000000000..cae88fdaa9
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbTableDefinition.java
@@ -0,0 +1,11 @@
+package org.apache.eventmesh.common.config.connector.rdb.canal;
+
+import lombok.Data;
+
+/**
+ * @Description:
+ */
+@Data
+public class RdbTableDefinition {
+ protected String tableName;
+}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbTableMgr.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbTableMgr.java
new file mode 100644
index 0000000000..31607a1e4d
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbTableMgr.java
@@ -0,0 +1,30 @@
+package org.apache.eventmesh.common.config.connector.rdb.canal;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.eventmesh.common.AbstractComponent;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @Description:
+ */
+@Slf4j
+@AllArgsConstructor
+public class RdbTableMgr extends AbstractComponent {
+ private final SourceConnectorConfig config;
+ private final Map tables = new HashMap<>();
+
+ @Override
+ protected void startup() throws Exception {
+ if (config != null) {
+
+ }
+ }
+
+ @Override
+ protected void shutdown() throws Exception {
+
+ }
+}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/SourceConnectorConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/SourceConnectorConfig.java
index e9ae466079..07501625a5 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/SourceConnectorConfig.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/SourceConnectorConfig.java
@@ -19,6 +19,8 @@
import lombok.Data;
+import java.util.Set;
+
/**
* Represents the configuration for a database connector.
*/
@@ -37,8 +39,6 @@ public class SourceConnectorConfig {
private String passWord;
- private String schemaName;
-
- private String tableName;
+ private Set schemas;
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/mysql/MysqlTableDef.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/mysql/MysqlTableDef.java
new file mode 100644
index 0000000000..4b012f8245
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/mysql/MysqlTableDef.java
@@ -0,0 +1,17 @@
+package org.apache.eventmesh.common.config.connector.rdb.canal.mysql;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.eventmesh.common.config.connector.rdb.canal.RdbTableDefinition;
+
+import java.util.Set;
+
+/**
+ * @Description:
+ */
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class MysqlTableDef extends RdbTableDefinition {
+ private Set colNames;
+ private Set primaryKeys;
+}
diff --git a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/Connector.java b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/Connector.java
index 11c2b77454..8ac09eac38 100644
--- a/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/Connector.java
+++ b/eventmesh-openconnect/eventmesh-openconnect-java/src/main/java/org/apache/eventmesh/openconnect/api/connector/Connector.java
@@ -17,13 +17,14 @@
package org.apache.eventmesh.openconnect.api.connector;
+import org.apache.eventmesh.common.ComponentLifeCycle;
import org.apache.eventmesh.common.config.connector.Config;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
/**
* Connector
*/
-public interface Connector {
+public interface Connector extends ComponentLifeCycle {
/**
* Returns the class type of the configuration for this Connector.
@@ -52,13 +53,6 @@ public interface Connector {
*/
void init(ConnectorContext connectorContext) throws Exception;
- /**
- * Starts the Connector.
- *
- * @throws Exception if the start operation fails
- */
- void start() throws Exception;
-
/**
* Commits the specified ConnectRecord object.
*
@@ -73,11 +67,4 @@ public interface Connector {
*/
String name();
- /**
- * Stops the Connector.
- *
- * @throws Exception if stopping fails
- */
- void stop() throws Exception;
-
}
From 8096aa34fe8e2e38785590735f6db621591e0265 Mon Sep 17 00:00:00 2001
From: sodaRyCN <757083350@qq.com>
Date: Mon, 1 Jul 2024 20:23:24 +0800
Subject: [PATCH 03/15] more and more
---
.../eventmesh/common/AbstractComponent.java | 16 +++-
.../rdb/canal/CanalSourceFullConfig.java | 15 ++++
.../connector/rdb/canal/RdbDBDefinition.java | 4 +-
.../connector/rdb/canal/RdbFullPosition.java | 16 ++++
.../rdb/canal/RdbTableDefinition.java | 2 +-
.../connector/rdb/canal/RdbTableMgr.java | 30 --------
.../rdb/canal/SourceConnectorConfig.java | 2 +-
.../rdb/canal/mysql/MysqlTableDef.java | 2 +-
.../connector/canal/source/EntryParser.java | 15 ++--
.../connector/CanalSourceConnector.java | 10 ++-
.../connector/CanalSourceFullConnector.java | 75 +++++++++++++++++++
.../source/position/CanalFullPositionMgr.java | 65 ++++++++++++++++
.../canal/source/table/RdbSimpleTable.java | 31 ++++++++
.../canal/source/table/RdbTableMgr.java | 61 +++++++++++++++
14 files changed, 297 insertions(+), 47 deletions(-)
create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceFullConfig.java
create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbFullPosition.java
delete mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbTableMgr.java
create mode 100644 eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java
create mode 100644 eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/position/CanalFullPositionMgr.java
create mode 100644 eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/table/RdbSimpleTable.java
create mode 100644 eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/table/RdbTableMgr.java
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/AbstractComponent.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/AbstractComponent.java
index 9a1c0a404f..bca798602f 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/AbstractComponent.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/AbstractComponent.java
@@ -2,23 +2,35 @@
import lombok.extern.slf4j.Slf4j;
+import java.util.concurrent.atomic.AtomicBoolean;
+
/**
* @Description:
*/
@Slf4j
public abstract class AbstractComponent implements ComponentLifeCycle {
+ private final AtomicBoolean init = new AtomicBoolean(false);
+ private final AtomicBoolean stopped = new AtomicBoolean(false);
@Override
public void start() throws Exception {
+ if (!init.compareAndSet(false, true)){
+ log.info("component [{}] has started", this.getClass());
+ return;
+ }
log.info("component [{}] will start", this.getClass());
startup();
- log.info("component [{}] has started successfully", this.getClass());
+ log.info("component [{}] started successfully", this.getClass());
}
@Override
public void stop() throws Exception {
+ if (!stopped.compareAndSet(false, true)){
+ log.info("component [{}] has stopped", this.getClass());
+ return;
+ }
log.info("component [{}] will stop", this.getClass());
shutdown();
- log.info("component [{}] has stopped successfully", this.getClass());
+ log.info("component [{}] stopped successfully", this.getClass());
}
protected abstract void startup() throws Exception;
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceFullConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceFullConfig.java
new file mode 100644
index 0000000000..8c5f94bd14
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceFullConfig.java
@@ -0,0 +1,15 @@
+package org.apache.eventmesh.common.config.connector.rdb.canal;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.eventmesh.common.config.connector.SourceConfig;
+import org.apache.eventmesh.common.remote.offset.RecordPosition;
+
+import java.util.List;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class CanalSourceFullConfig extends SourceConfig {
+ private SourceConnectorConfig connectorConfig;
+ private List startPosition;
+}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbDBDefinition.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbDBDefinition.java
index 450d0afce0..8e8c075d54 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbDBDefinition.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbDBDefinition.java
@@ -5,10 +5,10 @@
import java.util.Set;
/**
- * @Description: as class name
+ * Description: as class name
*/
@Data
public class RdbDBDefinition {
- private String schemaName;
+ private String schema;
private Set tables;
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbFullPosition.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbFullPosition.java
new file mode 100644
index 0000000000..41267946c9
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbFullPosition.java
@@ -0,0 +1,16 @@
+package org.apache.eventmesh.common.config.connector.rdb.canal;
+
+import lombok.Data;
+
+import java.math.BigDecimal;
+
+@Data
+public class RdbFullPosition {
+ private String jobId;
+ private String schema;
+ private String tableName;
+ private String curPrimaryKey;
+ private BigDecimal minPrimaryKeyNum;
+ private BigDecimal maxPrimaryKeyNum;
+ private boolean finished;
+}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbTableDefinition.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbTableDefinition.java
index cae88fdaa9..b2bfbc6b84 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbTableDefinition.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbTableDefinition.java
@@ -3,7 +3,7 @@
import lombok.Data;
/**
- * @Description:
+ * Description: as class name
*/
@Data
public class RdbTableDefinition {
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbTableMgr.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbTableMgr.java
deleted file mode 100644
index 31607a1e4d..0000000000
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbTableMgr.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package org.apache.eventmesh.common.config.connector.rdb.canal;
-
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.eventmesh.common.AbstractComponent;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * @Description:
- */
-@Slf4j
-@AllArgsConstructor
-public class RdbTableMgr extends AbstractComponent {
- private final SourceConnectorConfig config;
- private final Map tables = new HashMap<>();
-
- @Override
- protected void startup() throws Exception {
- if (config != null) {
-
- }
- }
-
- @Override
- protected void shutdown() throws Exception {
-
- }
-}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/SourceConnectorConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/SourceConnectorConfig.java
index 07501625a5..01aa01fe09 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/SourceConnectorConfig.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/SourceConnectorConfig.java
@@ -39,6 +39,6 @@ public class SourceConnectorConfig {
private String passWord;
- private Set schemas;
+ private Set databases;
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/mysql/MysqlTableDef.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/mysql/MysqlTableDef.java
index 4b012f8245..8125c7e5d9 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/mysql/MysqlTableDef.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/mysql/MysqlTableDef.java
@@ -7,7 +7,7 @@
import java.util.Set;
/**
- * @Description:
+ * Description:
*/
@Data
@EqualsAndHashCode(callSuper = true)
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
index 32c55ec42c..3ca2f7ec2f 100644
--- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/EntryParser.java
@@ -31,6 +31,7 @@
import java.util.List;
import java.util.Map;
+import org.apache.eventmesh.connector.canal.source.table.RdbTableMgr;
import org.springframework.util.CollectionUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
@@ -47,7 +48,7 @@
@Slf4j
public class EntryParser {
- public Map> parse(CanalSourceConfig sourceConfig, List datas) {
+ public static Map> parse(CanalSourceConfig sourceConfig, List datas) {
List recordList = new ArrayList<>();
List transactionDataBuffer = new ArrayList<>();
// need check weather the entry is loopback
@@ -116,11 +117,10 @@ private Column getColumnIgnoreCase(List columns, String columName) {
return null;
}
- private List internParse(CanalSourceConfig sourceConfig, Entry entry) {
+ private static List internParse(CanalSourceConfig sourceConfig, Entry entry, RdbTableMgr tableMgr) {
String schemaName = entry.getHeader().getSchemaName();
String tableName = entry.getHeader().getTableName();
- if (!schemaName.equalsIgnoreCase(sourceConfig.getSourceConnectorConfig().getSchemaName())
- || !tableName.equalsIgnoreCase(sourceConfig.getSourceConnectorConfig().getTableName())) {
+ if (tableMgr.getTable(schemaName, tableName) == null) {
return null;
}
@@ -155,7 +155,7 @@ private List internParse(CanalSourceConfig sourceConfig, Ent
return recordList;
}
- private CanalConnectRecord internParse(CanalSourceConfig canalSourceConfig, Entry entry, RowChange rowChange, RowData rowData) {
+ private static CanalConnectRecord internParse(CanalSourceConfig canalSourceConfig, Entry entry, RowChange rowChange, RowData rowData) {
CanalConnectRecord canalConnectRecord = new CanalConnectRecord();
canalConnectRecord.setTableName(entry.getHeader().getTableName());
canalConnectRecord.setSchemaName(entry.getHeader().getSchemaName());
@@ -242,7 +242,8 @@ private CanalConnectRecord internParse(CanalSourceConfig canalSourceConfig, Entr
return canalConnectRecord;
}
- private void checkUpdateKeyColumns(Map oldKeyColumns, Map keyColumns) {
+ private static void checkUpdateKeyColumns(Map oldKeyColumns,
+ Map keyColumns) {
if (oldKeyColumns.isEmpty()) {
return;
}
@@ -264,7 +265,7 @@ private void checkUpdateKeyColumns(Map oldKeyColumns, Map configClass() {
return CanalSourceConfig.class;
@@ -146,6 +149,8 @@ protected void startEventParserInternal(CanalEventParser parser, boolean isGroup
return instance;
}
});
+ tableMgr = RdbTableMgr.getInstance();
+ tableMgr.init(sourceConfig.getSourceConnectorConfig());
}
private Canal buildCanal(CanalSourceConfig sourceConfig) {
@@ -218,6 +223,7 @@ public void start() throws Exception {
if (running) {
return;
}
+ tableMgr.start();
canalServer.start();
canalServer.start(sourceConfig.getDestination());
@@ -288,11 +294,9 @@ public List poll() {
entries = message.getEntries();
}
- EntryParser entryParser = new EntryParser();
-
List result = new ArrayList<>();
// key: Xid offset
- Map> connectorRecordMap = entryParser.parse(sourceConfig, entries);
+ Map> connectorRecordMap = EntryParser.parse(sourceConfig, entries);
if (!connectorRecordMap.isEmpty()) {
Set>> entrySet = connectorRecordMap.entrySet();
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java
new file mode 100644
index 0000000000..434c1464d4
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java
@@ -0,0 +1,75 @@
+package org.apache.eventmesh.connector.canal.source.connector;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.eventmesh.common.AbstractComponent;
+import org.apache.eventmesh.common.config.connector.Config;
+import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSourceConfig;
+import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSourceFullConfig;
+import org.apache.eventmesh.common.config.connector.rdb.canal.RdbDBDefinition;
+import org.apache.eventmesh.common.config.connector.rdb.canal.RdbTableDefinition;
+import org.apache.eventmesh.common.config.connector.rdb.canal.SourceConnectorConfig;
+import org.apache.eventmesh.openconnect.api.ConnectorCreateService;
+import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
+import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
+import org.apache.eventmesh.openconnect.api.source.Source;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.util.List;
+
+@Slf4j
+public class CanalSourceFullConnector extends AbstractComponent implements Source, ConnectorCreateService {
+ private CanalSourceFullConfig config;
+
+ @Override
+ protected void startup() throws Exception {
+ if (config.getConnectorConfig().getDatabases() != null) {
+ for (RdbDBDefinition db : config.getConnectorConfig().getDatabases()) {
+ for (RdbTableDefinition table : db.getTables()) {
+ log.info("[]");
+ }
+ }
+ }
+ }
+
+ @Override
+ protected void shutdown() throws Exception {
+
+ }
+
+ @Override
+ public Source create() {
+ return new CanalSourceFullConnector();
+ }
+
+ @Override
+ public Class extends Config> configClass() {
+ return CanalSourceFullConfig.class;
+ }
+
+ @Override
+ public void init(Config config) throws Exception {
+ this.config = (CanalSourceFullConfig)config;
+ }
+
+ @Override
+ public void init(ConnectorContext connectorContext) throws Exception {
+ SourceConnectorContext sourceConnectorContext = (SourceConnectorContext) connectorContext;
+ this.config = (CanalSourceFullConfig) sourceConnectorContext.getSourceConfig();
+
+ }
+
+ @Override
+ public void commit(ConnectRecord record) {
+
+ }
+
+ @Override
+ public String name() {
+ return this.config.getConnectorConfig().getConnectorName();
+ }
+
+ @Override
+ public List poll() {
+ return null;
+ }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/position/CanalFullPositionMgr.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/position/CanalFullPositionMgr.java
new file mode 100644
index 0000000000..70611da632
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/position/CanalFullPositionMgr.java
@@ -0,0 +1,65 @@
+package org.apache.eventmesh.connector.canal.source.position;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.eventmesh.common.AbstractComponent;
+import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSourceFullConfig;
+import org.apache.eventmesh.common.config.connector.rdb.canal.RdbDBDefinition;
+import org.apache.eventmesh.common.config.connector.rdb.canal.RdbFullPosition;
+import org.apache.eventmesh.common.config.connector.rdb.canal.RdbTableDefinition;
+import org.apache.eventmesh.common.remote.offset.RecordPosition;
+import org.apache.eventmesh.connector.canal.source.table.RdbSimpleTable;
+
+import java.util.Map;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+@AllArgsConstructor
+@Slf4j
+public class CanalFullPositionMgr extends AbstractComponent {
+ private CanalSourceFullConfig config;
+ private ScheduledThreadPoolExecutor executor;
+ private Map positions;
+
+ @Override
+ protected void startup() throws Exception {
+ if (config == null || config.getConnectorConfig() == null || config.getConnectorConfig().getDatabases() == null) {
+ log.info("config or database is null");
+ return;
+ }
+ executor = new ScheduledThreadPoolExecutor(1, r -> {
+ Thread thread = new Thread(r);
+ thread.setDaemon(true);
+ thread.setName("task-full-position-timer");
+ return thread;
+ }, new ScheduledThreadPoolExecutor.DiscardOldestPolicy());
+ if (config.getStartPosition() != null) {
+ for (RecordPosition recordPosition : config.getStartPosition()) {
+
+ }
+ }
+
+ }
+
+ private void processPositions(CanalSourceFullConfig config) {
+ for (RdbDBDefinition database : config.getConnectorConfig().getDatabases()) {
+ for (RdbTableDefinition table : database.getTables()) {
+ log.info("init position of data [{}] table [{}]", database.getSchema(), table.getTableName());
+ RdbSimpleTable simpleTable = new RdbSimpleTable(database.getSchema(), table.getTableName());
+ RdbFullPosition recordPosition = positions.get(simpleTable);
+ if (recordPosition == null || !recordPosition.isFinished()) {
+ positions.put(simpleTable,initPosition(config, database.getSchema(), table.getTableName(), recordPosition));
+ }
+ }
+ }
+ }
+
+ private RdbFullPosition initPosition(CanalSourceFullConfig config, String db, String table,
+ RdbFullPosition recordPosition) {
+ return null;
+ }
+
+ @Override
+ protected void shutdown() throws Exception {
+
+ }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/table/RdbSimpleTable.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/table/RdbSimpleTable.java
new file mode 100644
index 0000000000..cd8fa4c93b
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/table/RdbSimpleTable.java
@@ -0,0 +1,31 @@
+package org.apache.eventmesh.connector.canal.source.table;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import org.apache.eventmesh.common.config.connector.rdb.canal.RdbTableDefinition;
+
+import java.util.Objects;
+
+@Data
+public class RdbSimpleTable extends RdbTableDefinition {
+ public RdbSimpleTable(String schema, String tableName) {
+ super();
+ this.schema = schema;
+ this.tableName = tableName;
+ }
+ private String schema;
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ if (!super.equals(o)) return false;
+ RdbSimpleTable that = (RdbSimpleTable) o;
+ return Objects.equals(schema, that.schema);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(super.hashCode(), schema);
+ }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/table/RdbTableMgr.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/table/RdbTableMgr.java
new file mode 100644
index 0000000000..c7673880e7
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/table/RdbTableMgr.java
@@ -0,0 +1,61 @@
+package org.apache.eventmesh.connector.canal.source.table;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.eventmesh.common.AbstractComponent;
+import org.apache.eventmesh.common.config.connector.rdb.canal.RdbDBDefinition;
+import org.apache.eventmesh.common.config.connector.rdb.canal.RdbTableDefinition;
+import org.apache.eventmesh.common.config.connector.rdb.canal.SourceConnectorConfig;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Description:
+ */
+@Slf4j
+@AllArgsConstructor
+public class RdbTableMgr extends AbstractComponent {
+ private SourceConnectorConfig config;
+ private final Map tables = new HashMap<>();
+
+ public static String generate(String ...params) {
+ return String.join("@", params);
+ }
+
+ private RdbTableMgr(){
+ }
+
+ private static class RdbTableMgrHolder {
+ private static final RdbTableMgr INSTANCE = new RdbTableMgr();
+ }
+
+ public static RdbTableMgr getInstance() {
+ return RdbTableMgrHolder.INSTANCE;
+ }
+
+ public void init(SourceConnectorConfig config) {
+ this.config = config;
+ }
+
+ public RdbTableDefinition getTable(String dbName, String tableName) {
+ return tables.get(generate(dbName, tableName));
+ }
+
+ @Override
+ protected void startup() throws Exception {
+ if (config != null && config.getDatabases() != null) {
+ for (RdbDBDefinition db : config.getDatabases()) {
+ for (RdbTableDefinition table : db.getTables()) {
+ tables.put(generate(db.getSchema(), table.getTableName()), table);
+ }
+ }
+ }
+ }
+
+ @Override
+ protected void shutdown() throws Exception {
+
+ }
+}
From 43c1b6669ced29c917d6766d60be2c86b85b5f40 Mon Sep 17 00:00:00 2001
From: sodaRyCN <757083350@qq.com>
Date: Fri, 5 Jul 2024 18:20:50 +0800
Subject: [PATCH 04/15] mysql type for read
---
.../canal/source/connector/CanalSourceFullConnector.java | 2 --
1 file changed, 2 deletions(-)
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java
index 434c1464d4..79481a69af 100644
--- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java
@@ -3,11 +3,9 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.eventmesh.common.AbstractComponent;
import org.apache.eventmesh.common.config.connector.Config;
-import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSourceConfig;
import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSourceFullConfig;
import org.apache.eventmesh.common.config.connector.rdb.canal.RdbDBDefinition;
import org.apache.eventmesh.common.config.connector.rdb.canal.RdbTableDefinition;
-import org.apache.eventmesh.common.config.connector.rdb.canal.SourceConnectorConfig;
import org.apache.eventmesh.openconnect.api.ConnectorCreateService;
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
From 6f093d34081c946f00e16352e035f7f1c02dc1df Mon Sep 17 00:00:00 2001
From: sodaRyCN <757083350@qq.com>
Date: Fri, 5 Jul 2024 18:20:50 +0800
Subject: [PATCH 05/15] mysql type for read
---
build.gradle | 3 +-
eventmesh-common/build.gradle | 1 +
.../eventmesh/common/AbstractComponent.java | 4 +-
.../connector/rdb/canal/CanalMySQLType.java | 142 ++++++++
.../rdb/canal/CanalSourceFullConfig.java | 1 +
...lPosition.java => JobRdbFullPosition.java} | 5 +-
.../rdb/canal/RdbColumnDefinition.java | 11 +
.../connector/rdb/canal/RdbDBDefinition.java | 2 +-
.../rdb/canal/RdbTableDefinition.java | 1 +
.../connector/rdb/canal/mysql/Constants.java | 5 +
.../rdb/canal/mysql/MySQLColumnDef.java | 12 +
...{MysqlTableDef.java => MySQLTableDef.java} | 6 +-
.../eventmesh-connector-canal/build.gradle | 1 +
.../connector/canal/DatabaseConnection.java | 58 ++--
.../eventmesh/connector/canal/SqlUtils.java | 28 ++
.../canal/source/CanalFullProducer.java | 325 ++++++++++++++++++
.../connector/CanalSourceConnector.java | 4 +-
.../connector/CanalSourceFullConnector.java | 48 ++-
.../source/position/CanalFullPositionMgr.java | 181 +++++++++-
.../source/position/TableFullPosition.java | 13 +
.../canal/source/table/RdbTableMgr.java | 149 ++++++--
21 files changed, 920 insertions(+), 80 deletions(-)
create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalMySQLType.java
rename eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/{RdbFullPosition.java => JobRdbFullPosition.java} (70%)
create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbColumnDefinition.java
create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/mysql/Constants.java
create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/mysql/MySQLColumnDef.java
rename eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/mysql/{MysqlTableDef.java => MySQLTableDef.java} (59%)
create mode 100644 eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/CanalFullProducer.java
create mode 100644 eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/position/TableFullPosition.java
diff --git a/build.gradle b/build.gradle
index 0408231fd7..3d2f8604a3 100644
--- a/build.gradle
+++ b/build.gradle
@@ -799,8 +799,9 @@ subprojects {
dependency "com.alibaba:druid-spring-boot-starter:1.2.23"
dependency "com.baomidou:mybatis-plus-boot-starter:3.5.5"
dependency "org.springframework.boot:spring-boot-starter-jetty:2.7.18"
- dependency "com.baomidou:mybatis-plus:3.5.7"
dependency "com.mysql:mysql-connector-j:8.4.0"
+ dependency "org.springframework.boot:spring-boot-starter-jetty:2.7.10"
+ dependency "org.locationtech.jts:jts-core:1.19.0"
}
}
}
diff --git a/eventmesh-common/build.gradle b/eventmesh-common/build.gradle
index 70244d2299..c95e9f6c29 100644
--- a/eventmesh-common/build.gradle
+++ b/eventmesh-common/build.gradle
@@ -48,6 +48,7 @@ dependencies {
implementation "org.apache.httpcomponents:httpclient"
implementation "io.netty:netty-all"
+ compileOnly 'com.mysql:mysql-connector-j'
implementation "io.grpc:grpc-protobuf:${grpcVersion}"
implementation "io.grpc:grpc-stub:${grpcVersion}"
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/AbstractComponent.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/AbstractComponent.java
index bca798602f..bc4e9ad404 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/AbstractComponent.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/AbstractComponent.java
@@ -9,11 +9,11 @@
*/
@Slf4j
public abstract class AbstractComponent implements ComponentLifeCycle {
- private final AtomicBoolean init = new AtomicBoolean(false);
+ private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean stopped = new AtomicBoolean(false);
@Override
public void start() throws Exception {
- if (!init.compareAndSet(false, true)){
+ if (!started.compareAndSet(false, true)){
log.info("component [{}] has started", this.getClass());
return;
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalMySQLType.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalMySQLType.java
new file mode 100644
index 0000000000..969e631e93
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalMySQLType.java
@@ -0,0 +1,142 @@
+package org.apache.eventmesh.common.config.connector.rdb.canal;
+
+import com.mysql.cj.MysqlType;
+
+import java.sql.JDBCType;
+
+public enum CanalMySQLType {
+ BIT("BIT"),
+ TINYINT("TINYINT"),
+ SMALLINT("SMALLINT"),
+ MEDIUMINT("MEDIUMINT"),
+ INT("INT"),
+ BIGINT("BIGINT"),
+ DECIMAL("DECIMAL"),
+ FLOAT("FLOAT", JDBCType.REAL),
+ DOUBLE("DOUBLE", JDBCType.DOUBLE),
+ DATE("DATE", JDBCType.DATE),
+ DATETIME("DATETIME", JDBCType.TIMESTAMP),
+ TIMESTAMP("TIMESTAMP", JDBCType.TIMESTAMP),
+ TIME("TIME", JDBCType.TIME),
+ YEAR("YEAR", JDBCType.DATE),
+ CHAR("CHAR", JDBCType.CHAR),
+ VARCHAR("VARCHAR", JDBCType.VARCHAR),
+ BINARY("BINARY", JDBCType.BINARY),
+ VARBINARY("VARBINARY", JDBCType.VARBINARY),
+ TINYBLOB("TINYBLOB", JDBCType.VARBINARY),
+ BLOB("BLOB", JDBCType.LONGVARBINARY),
+ MEDIUMBLOB("MEDIUMBLOB", JDBCType.LONGVARBINARY),
+ LONGBLOB("LONGBLOB", JDBCType.LONGVARBINARY),
+ TINYTEXT("TINYTEXT", JDBCType.VARCHAR),
+ TEXT("TEXT", JDBCType.LONGVARCHAR),
+ MEDIUMTEXT("MEDIUMTEXT", JDBCType.LONGVARCHAR),
+ LONGTEXT("LONGTEXT", JDBCType.LONGVARCHAR),
+ ENUM("ENUM", JDBCType.CHAR),
+ SET("SET", JDBCType.CHAR),
+ JSON("JSON", JDBCType.LONGVARCHAR),
+ GEOMETRY("GEOMETRY", JDBCType.BINARY),
+ POINT("POINT", JDBCType.BINARY),
+ LINESTRING("LINESTRING", JDBCType.BINARY),
+ POLYGON("POLYGON", JDBCType.BINARY),
+ MULTIPOINT("MULTIPOINT", JDBCType.BINARY),
+ GEOMETRY_COLLECTION("GEOMETRYCOLLECTION", JDBCType.BINARY),
+ GEOM_COLLECTION("GEOMCOLLECTION", JDBCType.BINARY),
+ MULTILINESTRING("MULTILINESTRING", JDBCType.BINARY),
+ MULTIPOLYGON("MULTIPOLYGON", JDBCType.BINARY);
+
+ private final String codeKey;
+ private final MysqlType mysqlType;
+
+ CanalMySQLType(String codeKey) {
+ this.codeKey = codeKey;
+ this.mysqlType = MysqlType.getByName(codeKey);
+ }
+
+ public static CanalMySQLType valueOfCode(String code) {
+ CanalMySQLType[] values = values();
+ for (CanalMySQLType tableType : values) {
+ if (tableType.codeKey.equalsIgnoreCase(code)) {
+ return tableType;
+ }
+ }
+ switch (MysqlType.getByName(code)) {
+ case BOOLEAN:
+ case TINYINT:
+ case TINYINT_UNSIGNED:
+ return TINYINT;
+ case SMALLINT:
+ case SMALLINT_UNSIGNED:
+ return SMALLINT;
+ case INT:
+ case INT_UNSIGNED:
+ return INT;
+ case BIGINT:
+ case BIGINT_UNSIGNED:
+ return BIGINT;
+ case MEDIUMINT:
+ case MEDIUMINT_UNSIGNED:
+ return MEDIUMINT;
+ case DECIMAL:
+ case DECIMAL_UNSIGNED:
+ return DECIMAL;
+ case FLOAT:
+ case FLOAT_UNSIGNED:
+ return FLOAT;
+ case DOUBLE:
+ case DOUBLE_UNSIGNED:
+ return DOUBLE;
+ case BIT:
+ return BIT;
+ case BINARY:
+ return BINARY;
+ case VARBINARY:
+ return VARBINARY;
+ case TINYBLOB:
+ return TINYBLOB;
+ case MEDIUMBLOB:
+ return MEDIUMBLOB;
+ case LONGBLOB:
+ return LONGBLOB;
+ case BLOB:
+ return BLOB;
+ case CHAR:
+ return CHAR;
+ case VARCHAR:
+ return VARCHAR;
+ case TINYTEXT:
+ return TINYTEXT;
+ case MEDIUMTEXT:
+ return MEDIUMTEXT;
+ case LONGTEXT:
+ return LONGTEXT;
+ case TEXT:
+ return TEXT;
+ case DATE:
+ return DATE;
+ case TIME:
+ return TIME;
+ case TIMESTAMP:
+ return TIMESTAMP;
+ case DATETIME:
+ return DATETIME;
+ case YEAR:
+ return YEAR;
+ case JSON:
+ return JSON;
+ case ENUM:
+ return ENUM;
+ case SET:
+ return SET;
+ case GEOMETRY:
+ return GEOMETRY;
+ case NULL:
+ case UNKNOWN:
+ default:
+ throw new UnsupportedOperationException("Unsupported mysql columnType " + code);
+ }
+ }
+
+ public MysqlType getMysqlType() {
+ return mysqlType;
+ }
+}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceFullConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceFullConfig.java
index 8c5f94bd14..fcfa6a0e92 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceFullConfig.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceFullConfig.java
@@ -12,4 +12,5 @@
public class CanalSourceFullConfig extends SourceConfig {
private SourceConnectorConfig connectorConfig;
private List startPosition;
+ private int parallel;
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbFullPosition.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/JobRdbFullPosition.java
similarity index 70%
rename from eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbFullPosition.java
rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/JobRdbFullPosition.java
index 41267946c9..b4aeffd721 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbFullPosition.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/JobRdbFullPosition.java
@@ -5,12 +5,11 @@
import java.math.BigDecimal;
@Data
-public class RdbFullPosition {
+public class JobRdbFullPosition {
private String jobId;
private String schema;
private String tableName;
private String curPrimaryKey;
- private BigDecimal minPrimaryKeyNum;
- private BigDecimal maxPrimaryKeyNum;
+ private long maxCount;
private boolean finished;
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbColumnDefinition.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbColumnDefinition.java
new file mode 100644
index 0000000000..2fe4d7a2f9
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbColumnDefinition.java
@@ -0,0 +1,11 @@
+package org.apache.eventmesh.common.config.connector.rdb.canal;
+
+import lombok.Data;
+
+import java.sql.JDBCType;
+
+@Data
+public class RdbColumnDefinition {
+ protected String name;
+ protected JDBCType jdbcType;
+}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbDBDefinition.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbDBDefinition.java
index 8e8c075d54..d6b08503dd 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbDBDefinition.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbDBDefinition.java
@@ -9,6 +9,6 @@
*/
@Data
public class RdbDBDefinition {
- private String schema;
+ private String schemaName;
private Set tables;
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbTableDefinition.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbTableDefinition.java
index b2bfbc6b84..5001a8bcb8 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbTableDefinition.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/RdbTableDefinition.java
@@ -7,5 +7,6 @@
*/
@Data
public class RdbTableDefinition {
+ protected String schemaName;
protected String tableName;
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/mysql/Constants.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/mysql/Constants.java
new file mode 100644
index 0000000000..3025be245b
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/mysql/Constants.java
@@ -0,0 +1,5 @@
+package org.apache.eventmesh.common.config.connector.rdb.canal.mysql;
+
+public class Constants {
+ public static final String MySQLQuot = "`";
+}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/mysql/MySQLColumnDef.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/mysql/MySQLColumnDef.java
new file mode 100644
index 0000000000..8f1fcbd741
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/mysql/MySQLColumnDef.java
@@ -0,0 +1,12 @@
+package org.apache.eventmesh.common.config.connector.rdb.canal.mysql;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.eventmesh.common.config.connector.rdb.canal.CanalMySQLType;
+import org.apache.eventmesh.common.config.connector.rdb.canal.RdbColumnDefinition;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class MySQLColumnDef extends RdbColumnDefinition {
+ private CanalMySQLType type;
+}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/mysql/MysqlTableDef.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/mysql/MySQLTableDef.java
similarity index 59%
rename from eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/mysql/MysqlTableDef.java
rename to eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/mysql/MySQLTableDef.java
index 8125c7e5d9..439d80bcd5 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/mysql/MysqlTableDef.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/mysql/MySQLTableDef.java
@@ -2,8 +2,10 @@
import lombok.Data;
import lombok.EqualsAndHashCode;
+import org.apache.eventmesh.common.config.connector.rdb.canal.RdbColumnDefinition;
import org.apache.eventmesh.common.config.connector.rdb.canal.RdbTableDefinition;
+import java.util.Map;
import java.util.Set;
/**
@@ -11,7 +13,7 @@
*/
@Data
@EqualsAndHashCode(callSuper = true)
-public class MysqlTableDef extends RdbTableDefinition {
- private Set colNames;
+public class MySQLTableDef extends RdbTableDefinition {
private Set primaryKeys;
+ private Map columnDefinitions;
}
diff --git a/eventmesh-connectors/eventmesh-connector-canal/build.gradle b/eventmesh-connectors/eventmesh-connector-canal/build.gradle
index ccc5acf0ca..77a9968496 100644
--- a/eventmesh-connectors/eventmesh-connector-canal/build.gradle
+++ b/eventmesh-connectors/eventmesh-connector-canal/build.gradle
@@ -23,6 +23,7 @@ List canal = [
dependencies {
api project(":eventmesh-openconnect:eventmesh-openconnect-java")
+ implementation "org.locationtech.jts"
implementation project(":eventmesh-common")
implementation canal
implementation "com.alibaba:druid:1.2.23"
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/DatabaseConnection.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/DatabaseConnection.java
index 0d9da7f8be..00f9693be3 100644
--- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/DatabaseConnection.java
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/DatabaseConnection.java
@@ -36,42 +36,36 @@ public class DatabaseConnection {
public static CanalSinkConfig sinkConfig;
+ public static DruidDataSource createDruidDataSource(String url, String UserName, String passWord) {
+ DruidDataSource dataSource = new DruidDataSource();
+ dataSource.setUrl(url);
+ dataSource.setUsername(UserName);
+ dataSource.setPassword(passWord);
+ dataSource.setInitialSize(5);
+ dataSource.setMinIdle(5);
+ dataSource.setMaxActive(20);
+ dataSource.setMaxWait(60000);
+ dataSource.setTimeBetweenEvictionRunsMillis(60000);
+ dataSource.setMinEvictableIdleTimeMillis(300000);
+ dataSource.setValidationQuery("SELECT 1");
+ dataSource.setTestWhileIdle(true);
+ dataSource.setTestOnBorrow(false);
+ dataSource.setTestOnReturn(false);
+ dataSource.setPoolPreparedStatements(true);
+ dataSource.setMaxPoolPreparedStatementPerConnectionSize(20);
+ return dataSource;
+ }
+
public static void initSourceConnection() {
- sourceDataSource = new DruidDataSource();
- sourceDataSource.setUrl(sourceConfig.getSourceConnectorConfig().getUrl());
- sourceDataSource.setUsername(sourceConfig.getSourceConnectorConfig().getUserName());
- sourceDataSource.setPassword(sourceConfig.getSourceConnectorConfig().getPassWord());
- sourceDataSource.setInitialSize(5);
- sourceDataSource.setMinIdle(5);
- sourceDataSource.setMaxActive(20);
- sourceDataSource.setMaxWait(60000);
- sourceDataSource.setTimeBetweenEvictionRunsMillis(60000);
- sourceDataSource.setMinEvictableIdleTimeMillis(300000);
- sourceDataSource.setValidationQuery("SELECT 1");
- sourceDataSource.setTestWhileIdle(true);
- sourceDataSource.setTestOnBorrow(false);
- sourceDataSource.setTestOnReturn(false);
- sourceDataSource.setPoolPreparedStatements(true);
- sourceDataSource.setMaxPoolPreparedStatementPerConnectionSize(20);
+ sourceDataSource = createDruidDataSource(sourceConfig.getSourceConnectorConfig().getUrl(),
+ sourceConfig.getSourceConnectorConfig().getUserName(),
+ sourceConfig.getSourceConnectorConfig().getPassWord());
}
public static void initSinkConnection() {
- sinkDataSource = new DruidDataSource();
- sinkDataSource.setUrl(sinkConfig.getSinkConnectorConfig().getUrl());
- sinkDataSource.setUsername(sinkConfig.getSinkConnectorConfig().getUserName());
- sinkDataSource.setPassword(sinkConfig.getSinkConnectorConfig().getPassWord());
- sinkDataSource.setInitialSize(5);
- sinkDataSource.setMinIdle(5);
- sinkDataSource.setMaxActive(20);
- sinkDataSource.setMaxWait(60000);
- sinkDataSource.setTimeBetweenEvictionRunsMillis(60000);
- sinkDataSource.setMinEvictableIdleTimeMillis(300000);
- sinkDataSource.setValidationQuery("SELECT 1");
- sinkDataSource.setTestWhileIdle(true);
- sinkDataSource.setTestOnBorrow(false);
- sinkDataSource.setTestOnReturn(false);
- sinkDataSource.setPoolPreparedStatements(true);
- sinkDataSource.setMaxPoolPreparedStatementPerConnectionSize(20);
+ sinkDataSource = createDruidDataSource(sinkConfig.getSinkConnectorConfig().getUrl(),
+ sinkConfig.getSinkConnectorConfig().getUserName(),
+ sinkConfig.getSinkConnectorConfig().getPassWord());
}
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/SqlUtils.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/SqlUtils.java
index f6c4329e23..6d4ca03697 100644
--- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/SqlUtils.java
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/SqlUtils.java
@@ -20,6 +20,7 @@
import static org.apache.eventmesh.connector.canal.ByteArrayConverter.SQL_BYTES;
import static org.apache.eventmesh.connector.canal.SqlTimestampConverter.SQL_TIMESTAMP;
+import com.mysql.cj.MysqlType;
import org.apache.commons.beanutils.ConvertUtilsBean;
import org.apache.commons.lang.StringUtils;
@@ -30,12 +31,15 @@
import java.sql.Blob;
import java.sql.Clob;
import java.sql.Date;
+import java.sql.JDBCType;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
@@ -109,6 +113,25 @@ public class SqlUtils {
sqlTypeToJavaTypeMap.put(Types.CLOB, String.class);
}
+ public static String genPrepareSqlOfInClause(int size) {
+ StringBuilder sql = new StringBuilder();
+ sql.append("(");
+ for (int i = 0; i < size; i++) {
+ sql.append("?");
+ if (i < size - 1) {
+ sql.append(",");
+ }
+ }
+ sql.append(")");
+ return sql.toString();
+ }
+
+ public static void setInClauseParameters(PreparedStatement preparedStatement, List params) throws SQLException {
+ for (int i = 0; i < params.size(); i++) {
+ preparedStatement.setString(i + 1, params.get(i));
+ }
+ }
+
public static String sqlValueToString(ResultSet rs, int index, int sqlType) throws SQLException {
Class> requiredType = sqlTypeToJavaTypeMap.get(sqlType);
if (requiredType == null) {
@@ -293,4 +316,9 @@ public static boolean isTextType(int sqlType) {
|| sqlType == Types.NCHAR || sqlType == Types.NVARCHAR || sqlType == Types.NCLOB
|| sqlType == Types.LONGNVARCHAR;
}
+
+ public static JDBCType toJDBCType(String connectorDataType) {
+ MysqlType mysqlType = MysqlType.getByName(connectorDataType);
+ return JDBCType.valueOf(mysqlType.getJdbcType());
+ }
}
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/CanalFullProducer.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/CanalFullProducer.java
new file mode 100644
index 0000000000..9c3a6d07fb
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/CanalFullProducer.java
@@ -0,0 +1,325 @@
+package org.apache.eventmesh.connector.canal.source;
+
+import com.mysql.cj.MysqlType;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.eventmesh.common.config.connector.rdb.canal.RdbColumnDefinition;
+import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.Constants;
+import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.MySQLTableDef;
+import org.apache.eventmesh.common.exception.EventMeshException;
+import org.apache.eventmesh.common.utils.JsonUtils;
+import org.apache.eventmesh.connector.canal.source.position.TableFullPosition;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import org.locationtech.jts.io.WKBReader;
+
+import javax.sql.DataSource;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.LockSupport;
+
+
+
+@Slf4j
+public class CanalFullProducer {
+ private BlockingQueue> queue;
+ private final DataSource dataSource;
+ private final MySQLTableDef tableDefinition;
+ private final TableFullPosition position;
+ private static final int LIMIT = 2048;
+ private final int flushSize;
+ private final AtomicReference choosePrimaryKey = new AtomicReference<>(null);
+ private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd");
+ private static final DateTimeFormatter DATE_STAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+ private static final WKBReader WKB_READER = new wk
+
+ public CanalFullProducer(BlockingQueue> queue, DataSource dataSource,
+ MySQLTableDef tableDefinition, TableFullPosition position, int flushSize) {
+ this.queue = queue;
+ this.dataSource = dataSource;
+ this.tableDefinition = tableDefinition;
+ this.position = position;
+ this.flushSize = flushSize;
+ }
+
+ public void start(AtomicBoolean flag) {
+ boolean isNextPage = false;
+ ArrayList records = new ArrayList<>();
+ while (flag.get()) {
+ String scanSql = generateScanSql(tableDefinition, !isNextPage);
+ log.info("scan sql is [{}] , cur position [{}]", scanSql,
+ JsonUtils.toJSONString(position.getCurPrimaryKeyCols()));
+
+ try (Connection connection = dataSource.getConnection(); PreparedStatement statement =
+ connection.prepareStatement(scanSql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)) {
+ statement.setFetchSize(Integer.MIN_VALUE);
+ setPrepareStatementValue(statement, position);
+ try (ResultSet resultSet = statement.executeQuery()) {
+ while (flag.get() && resultSet.next()) {
+
+
+ if (records.size() < flushSize) {
+ continue;
+ }
+ queue.put(records);
+ records = new ArrayList<>();
+ }
+ } catch (InterruptedException ignore) {
+ }
+ } catch (SQLException e) {
+ log.error("create connection fail", e);
+ LockSupport.parkNanos(3000 * 1000L);
+ }
+ if (!isNextPage) {
+ isNextPage = true;
+ }
+ }
+
+ }
+
+ public Object readColumn(ResultSet rs, String col, MysqlType colType) throws Exception {
+ switch (colType) {
+ case TINYINT:
+ case SMALLINT:
+ case MEDIUMINT:
+ case INT:
+ Long uLong;
+ if (rs.wasNull()) {
+ return null;
+ } else {
+ uLong = rs.getLong(col);
+ }
+ if (uLong.compareTo(2147483647L) > 0) {
+ return uLong;
+ }
+ return uLong.intValue();
+ case BIGINT:
+ String v = rs.getString(col);
+ if (v == null) {
+ return null;
+ }
+ BigDecimal uBigInt = new BigDecimal(v);
+ if (uBigInt.compareTo(BigDecimal.valueOf(Long.MAX_VALUE)) > 0) {
+ return uBigInt;
+ }
+ return uBigInt.longValue();
+ case FLOAT:
+ case DOUBLE:
+ case DECIMAL:
+ return rs.getBigDecimal(col);
+ case DATE:
+ return rs.getObject(col, LocalDate.class);
+ case TIME:
+ return rs.getObject(col, LocalTime.class);
+ case DATETIME:
+ case TIMESTAMP:
+ return rs.getObject(col, LocalDateTime.class);
+ case YEAR:
+ if (rs.wasNull()) {
+ return null;
+ }
+ return rs.getInt(col);
+ case CHAR:
+ case VARCHAR:
+ case TINYTEXT:
+ case TEXT:
+ case MEDIUMTEXT:
+ case LONGTEXT:
+ case ENUM:
+ case SET:
+ case JSON:
+ return rs.getString(col);
+ case BIT:
+ case BINARY:
+ case VARBINARY:
+ case TINYBLOB:
+ case BLOB:
+ case MEDIUMBLOB:
+ case LONGBLOB:
+ return rs.getBytes(col);
+ case GEOMETRY:
+ String wkb = rs.getString(col);
+ if (wkb == null) {
+ return null;
+ }
+ return safeToGisWKT("0x" + wkb);
+ default:
+ return rs.getObject(col);
+ }
+ }
+
+ private void refreshPosition() {
+
+ }
+
+ private void setPrepareStatementValue(PreparedStatement statement, TableFullPosition position) throws SQLException {
+ String colName = choosePrimaryKey.get();
+ if (colName == null) {
+ return;
+ }
+ RdbColumnDefinition columnDefinition = tableDefinition.getColumnDefinitions().get(colName);
+ Object value = position.getCurPrimaryKeyCols().get(choosePrimaryKey);
+ String str;
+ switch (columnDefinition.getJdbcType()) {
+ case BIT:
+ case TINYINT:
+ case SMALLINT:
+ case INTEGER:
+ case BIGINT:
+ statement.setBigDecimal(1, new BigDecimal(String.valueOf(value)));
+ break;
+ case DECIMAL:
+ case FLOAT:
+ case DOUBLE:
+ case NUMERIC:
+ statement.setDouble(1, new BigDecimal(String.valueOf(value)).doubleValue());
+ break;
+ case CHAR:
+ case VARCHAR:
+ case LONGNVARCHAR:
+ case NCHAR:
+ case NVARCHAR:
+ case LONGVARCHAR:
+ case CLOB:
+ case NCLOB:
+ statement.setString(1, String.valueOf(value));
+ break;
+ case BLOB:
+ case VARBINARY:
+ case BINARY:
+ str = String.valueOf(value);
+ String hexStr = str;
+ if (str.startsWith("0x")) {
+ hexStr = str.substring(str.indexOf("0x"));
+ }
+ byte[] bytes = hex2bytes(hexStr);
+ statement.setBytes(1, bytes);
+ break;
+ case DATE:
+ Instant d;
+ if (value instanceof Long) {
+ Long val = (Long)value;
+ d = Instant.ofEpochMilli(val);
+ str = d.atZone(ZoneId.systemDefault()).toLocalDateTime().format(DATE_FORMATTER);
+ } else if (value instanceof Integer) {
+ Integer val = (Integer)value;
+ d = Instant.ofEpochMilli((long)val);
+ str = d.atZone(ZoneId.systemDefault()).toLocalDateTime().format(DATE_FORMATTER);
+ } else if (value instanceof String) {
+ str = (String) value;
+ } else {
+ if (!(value instanceof LocalDate)) {
+ throw new IllegalArgumentException("unsupported date class type:" + value.getClass().getSimpleName());
+ }
+ str = ((LocalDate)value).format(DATE_FORMATTER);
+ }
+ statement.setString(1, str);
+ break;
+ case TIMESTAMP:
+ if (value instanceof String) {
+ str = (String)value;
+ } else {
+ if (!(value instanceof LocalDateTime)) {
+ throw new IllegalArgumentException("unsupported timestamp class type:" + value.getClass().getSimpleName());
+ }
+ str = ((LocalDateTime)value).format(DATE_STAMP_FORMATTER);
+ }
+ statement.setString(1, str);
+ break;
+ default:
+ throw new EventMeshException(String.format("not support the primary key type [%s]", value.getClass()));
+ }
+ }
+
+ public static byte[] hex2bytes(String hexStr) {
+ if (hexStr == null)
+ return null;
+ if (StringUtils.isBlank(hexStr)) {
+ return new byte[0];
+ }
+
+ if (hexStr.length() % 2 == 1) {
+ hexStr = "0" + hexStr;
+ }
+
+ int count = hexStr.length() / 2;
+ byte[] ret = new byte[count];
+ for (int i = 0; i < count; i++) {
+ int index = i * 2;
+ char c1 = hexStr.charAt(index);
+ char c2 = hexStr.charAt(index + 1);
+ if (c1 < '0' || c1 > 'F' || c2 < '0' || c2 > 'F') {
+ throw new EventMeshException(String.format("illegal byte [%s], [%s]" , c1, c2));
+ }
+ ret[i] = (byte) ((byte)c1 << 4);
+ ret[i] = (byte) (ret[i] | (byte)(c2));
+ }
+ return ret;
+ }
+
+ private void generateQueryColumnsSql(StringBuilder builder, Collection rdbColDefs) {
+ if (rdbColDefs == null || rdbColDefs.isEmpty()) {
+ builder.append("*");
+ return;
+ }
+ boolean first = true;
+ for (RdbColumnDefinition colDef : rdbColDefs) {
+ if (first) {
+ first = false;
+ } else {
+ builder.append(",");
+ }
+ builder.append(Constants.MySQLQuot);
+ builder.append(colDef.getName());
+ builder.append(Constants.MySQLQuot);
+ }
+ }
+
+ private String generateScanSql(MySQLTableDef tableDefinition, boolean isEquals) {
+ StringBuilder builder = new StringBuilder();
+ builder.append("select ");
+ generateQueryColumnsSql(builder, tableDefinition.getColumnDefinitions().values());
+ buildWhereSql(builder, tableDefinition, isEquals);
+ builder.append(" limit " + LIMIT);
+ return builder.toString();
+ }
+
+ private void buildWhereSql(StringBuilder builder, MySQLTableDef tableDefinition, boolean isEquals) {
+ String colName = null;
+ for (RdbColumnDefinition col : tableDefinition.getColumnDefinitions().values()) {
+ if (position.getCurPrimaryKeyCols().get(col.getName()) != null) {
+ colName = col.getName();
+ choosePrimaryKey.set(colName);
+ break;
+ }
+ }
+ if (colName == null) {
+ throw new EventMeshException("not support only have one primary key of the timestamp type");
+ }
+ builder.append(" where ")
+ .append(Constants.MySQLQuot)
+ .append(colName)
+ .append(Constants.MySQLQuot);
+ if (isEquals) {
+ builder.append(" >= ? ");
+ } else {
+ builder.append(" > ? ");
+ }
+ builder.append(" order by ").append(Constants.MySQLQuot).append(colName).append(Constants.MySQLQuot)
+ .append(" asc ");
+ }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
index efdb4e4c95..c1c381c91f 100644
--- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java
@@ -17,6 +17,7 @@
package org.apache.eventmesh.connector.canal.source.connector;
+import com.alibaba.otter.canal.parse.inbound.TableMeta;
import org.apache.eventmesh.common.config.connector.Config;
import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSourceConfig;
import org.apache.eventmesh.connector.canal.source.table.RdbTableMgr;
@@ -149,8 +150,7 @@ protected void startEventParserInternal(CanalEventParser parser, boolean isGroup
return instance;
}
});
- tableMgr = RdbTableMgr.getInstance();
- tableMgr.init(sourceConfig.getSourceConnectorConfig());
+ tableMgr = new RdbTableMgr(sourceConfig.getSourceConnectorConfig());
}
private Canal buildCanal(CanalSourceConfig sourceConfig) {
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java
index 79481a69af..b7e15157f0 100644
--- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceFullConnector.java
@@ -2,28 +2,71 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.eventmesh.common.AbstractComponent;
+import org.apache.eventmesh.common.EventMeshThreadFactory;
import org.apache.eventmesh.common.config.connector.Config;
import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSourceFullConfig;
+import org.apache.eventmesh.common.config.connector.rdb.canal.JobRdbFullPosition;
import org.apache.eventmesh.common.config.connector.rdb.canal.RdbDBDefinition;
import org.apache.eventmesh.common.config.connector.rdb.canal.RdbTableDefinition;
+import org.apache.eventmesh.common.config.connector.rdb.canal.SourceConnectorConfig;
+import org.apache.eventmesh.common.exception.EventMeshException;
+import org.apache.eventmesh.connector.canal.DatabaseConnection;
+import org.apache.eventmesh.connector.canal.source.position.CanalFullPositionMgr;
+import org.apache.eventmesh.connector.canal.source.table.RdbSimpleTable;
+import org.apache.eventmesh.connector.canal.source.table.RdbTableMgr;
import org.apache.eventmesh.openconnect.api.ConnectorCreateService;
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
import org.apache.eventmesh.openconnect.api.source.Source;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import javax.sql.DataSource;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
@Slf4j
public class CanalSourceFullConnector extends AbstractComponent implements Source, ConnectorCreateService {
private CanalSourceFullConfig config;
+ private CanalFullPositionMgr positionMgr;
+ private RdbTableMgr tableMgr;
+ private ThreadPoolExecutor executor;
+ private final Map dataSources = new HashMap<>();
+ private final BlockingQueue> queue = new LinkedBlockingQueue<>();
@Override
protected void startup() throws Exception {
+ this.tableMgr.start();
+ this.positionMgr.start();
+ if (positionMgr.isFinished()) {
+ log.info("connector [{}] has finished the job", config.getConnectorConfig().getConnectorName());
+ return;
+ }
+ executor = new ThreadPoolExecutor(config.getParallel(), config.getParallel(),0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(), new EventMeshThreadFactory("canal-source-full"));
if (config.getConnectorConfig().getDatabases() != null) {
for (RdbDBDefinition db : config.getConnectorConfig().getDatabases()) {
for (RdbTableDefinition table : db.getTables()) {
- log.info("[]");
+ log.info("it will create producer of db [{}] table [{}]", db.getSchemaName(), table.getTableName());
+ DataSource dataSource = dataSources.computeIfAbsent(db.getSchemaName(),
+ k -> DatabaseConnection.createDruidDataSource(null, config.getConnectorConfig().getUserName(),
+ config.getConnectorConfig().getPassWord()));
+ RdbSimpleTable simpleTable = new RdbSimpleTable(db.getSchemaName(), table.getTableName());
+ JobRdbFullPosition position = positionMgr.getPosition(simpleTable);
+ if (position == null) {
+ throw new EventMeshException(String.format("db [%s] table [%s] have none position info",
+ db.getSchemaName(), table.getTableName()));
+ }
+ RdbTableDefinition tableDefinition = tableMgr.getTable(simpleTable);
+ if (tableDefinition == null) {
+ throw new EventMeshException(String.format("db [%s] table [%s] have none table definition info",
+ db.getSchemaName(), table.getTableName()));
+ }
+
}
}
}
@@ -53,7 +96,8 @@ public void init(Config config) throws Exception {
public void init(ConnectorContext connectorContext) throws Exception {
SourceConnectorContext sourceConnectorContext = (SourceConnectorContext) connectorContext;
this.config = (CanalSourceFullConfig) sourceConnectorContext.getSourceConfig();
-
+ this.tableMgr = new RdbTableMgr(config.getConnectorConfig());
+ this.positionMgr = new CanalFullPositionMgr(config, tableMgr);
}
@Override
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/position/CanalFullPositionMgr.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/position/CanalFullPositionMgr.java
index 70611da632..8003ef1784 100644
--- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/position/CanalFullPositionMgr.java
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/position/CanalFullPositionMgr.java
@@ -1,24 +1,46 @@
package org.apache.eventmesh.connector.canal.source.position;
+import com.alibaba.druid.pool.DruidDataSource;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.AbstractComponent;
import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSourceFullConfig;
+import org.apache.eventmesh.common.config.connector.rdb.canal.RdbColumnDefinition;
import org.apache.eventmesh.common.config.connector.rdb.canal.RdbDBDefinition;
-import org.apache.eventmesh.common.config.connector.rdb.canal.RdbFullPosition;
+import org.apache.eventmesh.common.config.connector.rdb.canal.JobRdbFullPosition;
import org.apache.eventmesh.common.config.connector.rdb.canal.RdbTableDefinition;
+import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.Constants;
+import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.MySQLTableDef;
import org.apache.eventmesh.common.remote.offset.RecordPosition;
+import org.apache.eventmesh.common.utils.JsonUtils;
+import org.apache.eventmesh.connector.canal.DatabaseConnection;
import org.apache.eventmesh.connector.canal.source.table.RdbSimpleTable;
+import org.apache.eventmesh.connector.canal.source.table.RdbTableMgr;
+import javax.sql.DataSource;
+import java.sql.JDBCType;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledThreadPoolExecutor;
-@AllArgsConstructor
@Slf4j
public class CanalFullPositionMgr extends AbstractComponent {
+
private CanalSourceFullConfig config;
private ScheduledThreadPoolExecutor executor;
- private Map positions;
+ private Map positions = new LinkedHashMap<>();
+ private RdbTableMgr tableMgr;
+
+ public CanalFullPositionMgr(CanalSourceFullConfig config, RdbTableMgr tableMgr) {
+ this.config = config;
+ this.tableMgr = tableMgr;
+ }
@Override
protected void startup() throws Exception {
@@ -33,31 +55,166 @@ protected void startup() throws Exception {
return thread;
}, new ScheduledThreadPoolExecutor.DiscardOldestPolicy());
if (config.getStartPosition() != null) {
- for (RecordPosition recordPosition : config.getStartPosition()) {
+ processPositions(config.getStartPosition());
+ }
+
+ }
+ public JobRdbFullPosition getPosition(RdbSimpleTable table) {
+ return positions.get(table);
+ }
+
+ public boolean isFinished() {
+ for (JobRdbFullPosition position : positions.values()) {
+ if (!position.isFinished()) {
+ log.info("schema [{}] table [{}] is not finish", position.getSchema(), position.getTableName());
+ return false;
}
}
-
+ return true;
}
- private void processPositions(CanalSourceFullConfig config) {
+ private void processPositions(List startPosition) throws SQLException {
for (RdbDBDefinition database : config.getConnectorConfig().getDatabases()) {
for (RdbTableDefinition table : database.getTables()) {
- log.info("init position of data [{}] table [{}]", database.getSchema(), table.getTableName());
- RdbSimpleTable simpleTable = new RdbSimpleTable(database.getSchema(), table.getTableName());
- RdbFullPosition recordPosition = positions.get(simpleTable);
+ RdbSimpleTable simpleTable = new RdbSimpleTable(database.getSchemaName(), table.getTableName());
+ RdbTableDefinition tableDefinition;
+ if ((tableDefinition = tableMgr.getTable(simpleTable)) == null) {
+ log.error("db [{}] table [{}] definition is null", database.getSchemaName(), table.getTableName());
+ continue;
+ }
+ log.info("init position of data [{}] table [{}]", database.getSchemaName(), table.getTableName());
+
+ JobRdbFullPosition recordPosition = positions.get(simpleTable);
if (recordPosition == null || !recordPosition.isFinished()) {
- positions.put(simpleTable,initPosition(config, database.getSchema(), table.getTableName(), recordPosition));
+ try (DruidDataSource dataSource = DatabaseConnection.createDruidDataSource(null,
+ config.getConnectorConfig().getUserName(), config.getConnectorConfig().getPassWord())) {
+ positions.put(simpleTable, initPosition(dataSource, (MySQLTableDef)tableDefinition, recordPosition));
+ }
}
}
}
}
- private RdbFullPosition initPosition(CanalSourceFullConfig config, String db, String table,
- RdbFullPosition recordPosition) {
+ private JobRdbFullPosition initPosition(DataSource dataSource, MySQLTableDef tableDefinition,
+ JobRdbFullPosition recordPosition) throws SQLException {
+ TableFullPosition position = new TableFullPosition();
+ Map preMinPrimaryKeys = new LinkedHashMap<>();
+ Map preMaxPrimaryKeys = new LinkedHashMap<>();
+ for (String pk : tableDefinition.getPrimaryKeys()) {
+ Object min = fetchMinPrimaryKey(dataSource, tableDefinition, preMinPrimaryKeys, pk);
+ Object max = fetchMaxPrimaryKey(dataSource, tableDefinition, preMaxPrimaryKeys, pk);
+ preMinPrimaryKeys.put(pk, min);
+ preMaxPrimaryKeys.put(pk, max);
+ position.getCurPrimaryKeyCols().put(pk, min);
+ position.getMinPrimaryKeyCols().put(pk, min);
+ position.getMaxPrimaryKeyCols().put(pk, max);
+ }
+ long rowCount = queryCurTableRowCount(dataSource, tableDefinition);
+ JobRdbFullPosition jobRdbFullPosition = new JobRdbFullPosition();
+ if (recordPosition != null ) {
+ if (StringUtils.isNotBlank(recordPosition.getCurPrimaryKey())) {
+ TableFullPosition record = JsonUtils.parseObject(recordPosition.getCurPrimaryKey(), TableFullPosition.class);
+ if (record != null && record.getCurPrimaryKeyCols() != null && !record.getCurPrimaryKeyCols().isEmpty()) {
+ position.setCurPrimaryKeyCols(record.getCurPrimaryKeyCols());
+ }
+ }
+ }
+ jobRdbFullPosition.setSchema(tableDefinition.getSchemaName());
+ jobRdbFullPosition.setTableName(tableDefinition.getTableName());
+ jobRdbFullPosition.setMaxCount(rowCount);
+ jobRdbFullPosition.setCurPrimaryKey(JsonUtils.toJSONString(position.getCurPrimaryKeyCols()));
+ return jobRdbFullPosition;
+ }
+
+
+ private long queryCurTableRowCount(DataSource datasource, MySQLTableDef tableDefinition) throws SQLException {
+ String sql =
+ "select AVG_ROW_LENGTH,DATA_LENGTH from information_schema.TABLES where TABLE_SCHEMA=" + Constants.MySQLQuot + tableDefinition.getSchemaName() + Constants.MySQLQuot + " and TABLE_NAME="+ Constants.MySQLQuot + tableDefinition.getTableName() + Constants.MySQLQuot;
+ try (Statement statement = datasource.getConnection().createStatement(); ResultSet resultSet = statement.executeQuery(sql)) {
+ long result = 0L;
+ if (resultSet.next()) {
+ long avgRowLength = resultSet.getLong("AVG_ROW_LENGTH");
+ long dataLength = resultSet.getLong("DATA_LENGTH");
+ result = dataLength / avgRowLength;
+ }
+ return result;
+ }
+ }
+
+ private void appendPrePrimaryKey(Map preMap, StringBuilder sql) {
+ if (preMap != null && !preMap.isEmpty()) {
+ sql.append(" WHERE ");
+ boolean first = true;
+ for (Map.Entry entry : preMap.entrySet()) {
+ if (first) {
+ first = false;
+ } else {
+ sql.append(" AND ");
+ }
+ sql.append(Constants.MySQLQuot).append(entry.getKey()).append(Constants.MySQLQuot).append("=?");
+ }
+ }
+ }
+
+ private void setValue2Statement(PreparedStatement ps, Map preMap, MySQLTableDef tableDefinition) throws SQLException {
+ if (preMap != null && !preMap.isEmpty()) {
+ int index = 1;
+ for (Map.Entry entry : preMap.entrySet()) {
+ RdbColumnDefinition def = tableDefinition.getColumnDefinitions().get(entry.getKey());
+ ps.setObject(index, entry.getValue(), def.getJdbcType().getVendorTypeNumber());
+ ++index;
+ }
+ }
+ }
+
+ private Object fetchMinPrimaryKey(DataSource dataSource, MySQLTableDef tableDefinition,
+ Map prePrimary, String curPrimaryKeyCol) throws SQLException {
+ StringBuilder builder = new StringBuilder();
+ builder.append("SELECT MIN(").append(Constants.MySQLQuot).append(curPrimaryKeyCol).append(Constants.MySQLQuot).append(") min_primary_key FROM")
+ .append(Constants.MySQLQuot).append(tableDefinition.getSchemaName()).append(Constants.MySQLQuot).append(".").append(Constants.MySQLQuot).append(tableDefinition.getTableName()).append(Constants.MySQLQuot);
+ appendPrePrimaryKey(prePrimary, builder);
+ String sql = builder.toString();
+ try (PreparedStatement statement = dataSource.getConnection().prepareStatement(sql)){
+ setValue2Statement(statement, prePrimary, tableDefinition);
+ try (ResultSet resultSet = statement.executeQuery()) {
+ if (resultSet.next()) {
+ RdbColumnDefinition columnDefinition = tableDefinition.getColumnDefinitions().get(curPrimaryKeyCol);
+ if (columnDefinition.getJdbcType() == JDBCType.TIMESTAMP) {
+ return resultSet.getString("min_primary_key");
+ } else {
+ return resultSet.getObject("min_primary_key");
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ private Object fetchMaxPrimaryKey(DataSource dataSource, MySQLTableDef tableDefinition,
+ Map prePrimary, String curPrimaryKeyCol) throws SQLException {
+ StringBuilder builder = new StringBuilder();
+ builder.append("SELECT MAX(").append(Constants.MySQLQuot).append(curPrimaryKeyCol).append(Constants.MySQLQuot).append(") max_primary_key FROM")
+ .append(Constants.MySQLQuot).append(tableDefinition.getSchemaName()).append(Constants.MySQLQuot).append(".").append(Constants.MySQLQuot).append(tableDefinition.getTableName()).append(Constants.MySQLQuot);
+ appendPrePrimaryKey(prePrimary, builder);
+ String sql = builder.toString();
+ try (PreparedStatement statement = dataSource.getConnection().prepareStatement(sql)){
+ setValue2Statement(statement, prePrimary, tableDefinition);
+ try (ResultSet resultSet = statement.executeQuery()) {
+ if (resultSet.next()) {
+ RdbColumnDefinition columnDefinition = tableDefinition.getColumnDefinitions().get(curPrimaryKeyCol);
+ if (columnDefinition.getJdbcType() == JDBCType.TIMESTAMP) {
+ return resultSet.getString("max_primary_key");
+ } else {
+ return resultSet.getObject("max_primary_key");
+ }
+ }
+ }
+ }
return null;
}
+
@Override
protected void shutdown() throws Exception {
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/position/TableFullPosition.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/position/TableFullPosition.java
new file mode 100644
index 0000000000..b4b30e7a24
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/position/TableFullPosition.java
@@ -0,0 +1,13 @@
+package org.apache.eventmesh.connector.canal.source.position;
+
+import lombok.Data;
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+@Data
+public class TableFullPosition {
+ private Map curPrimaryKeyCols = new LinkedHashMap<>();
+ private Map minPrimaryKeyCols = new LinkedHashMap<>();
+ private Map maxPrimaryKeyCols = new LinkedHashMap<>();
+}
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/table/RdbTableMgr.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/table/RdbTableMgr.java
index c7673880e7..6cacc56122 100644
--- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/table/RdbTableMgr.java
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/table/RdbTableMgr.java
@@ -1,57 +1,160 @@
package org.apache.eventmesh.connector.canal.source.table;
-import lombok.AllArgsConstructor;
+import com.alibaba.druid.pool.DruidDataSource;
+import com.mysql.cj.MysqlType;
import lombok.extern.slf4j.Slf4j;
import org.apache.eventmesh.common.AbstractComponent;
+import org.apache.eventmesh.common.config.connector.rdb.canal.RdbColumnDefinition;
import org.apache.eventmesh.common.config.connector.rdb.canal.RdbDBDefinition;
import org.apache.eventmesh.common.config.connector.rdb.canal.RdbTableDefinition;
import org.apache.eventmesh.common.config.connector.rdb.canal.SourceConnectorConfig;
+import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.MySQLColumnDef;
+import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.MySQLTableDef;
+import org.apache.eventmesh.connector.canal.DatabaseConnection;
+import org.apache.eventmesh.connector.canal.SqlUtils;
+import javax.sql.DataSource;
+import java.sql.JDBCType;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
/**
* Description:
*/
@Slf4j
-@AllArgsConstructor
public class RdbTableMgr extends AbstractComponent {
private SourceConnectorConfig config;
- private final Map tables = new HashMap<>();
+ private final Map tables = new HashMap<>();
- public static String generate(String ...params) {
- return String.join("@", params);
- }
-
- private RdbTableMgr(){
- }
-
- private static class RdbTableMgrHolder {
- private static final RdbTableMgr INSTANCE = new RdbTableMgr();
- }
-
- public static RdbTableMgr getInstance() {
- return RdbTableMgrHolder.INSTANCE;
+ public RdbTableMgr(SourceConnectorConfig config) {
+ this.config = config;
}
- public void init(SourceConnectorConfig config) {
- this.config = config;
+ public RdbTableDefinition getTable(String schema, String tableName) {
+ return getTable(new RdbSimpleTable(schema, tableName));
}
- public RdbTableDefinition getTable(String dbName, String tableName) {
- return tables.get(generate(dbName, tableName));
+ public RdbTableDefinition getTable(RdbSimpleTable table) {
+ return tables.get(table);
}
@Override
protected void startup() throws Exception {
if (config != null && config.getDatabases() != null) {
for (RdbDBDefinition db : config.getDatabases()) {
- for (RdbTableDefinition table : db.getTables()) {
- tables.put(generate(db.getSchema(), table.getTableName()), table);
+ if (db.getTables() == null) {
+ log.warn("init db [{}] position, but it's tables are null", db.getSchemaName());
+ continue;
}
+ try (DruidDataSource dataSource = DatabaseConnection.createDruidDataSource(null,
+ config.getUserName(), config.getPassWord())) {
+ List tableNames =
+ db.getTables().stream().map(RdbTableDefinition::getTableName).collect(Collectors.toList());
+ Map> primaryKeys = queryTablePrimaryKey(dataSource, tableNames);
+ Map> columns = queryColumns(dataSource, tableNames);
+ for (RdbTableDefinition table : db.getTables()) {
+ MySQLTableDef mysqlTable = new MySQLTableDef();
+ mysqlTable.setSchemaName(db.getSchemaName());
+ mysqlTable.setTableName(table.getTableName());
+ if (primaryKeys == null || primaryKeys.isEmpty() || primaryKeys.get(table.getTableName()) == null) {
+ log.warn("init db [{}] table [{}] info, and primary keys are empty", db.getSchemaName(),
+ table.getTableName());
+ } else {
+ mysqlTable.setPrimaryKeys(new HashSet<>(primaryKeys.get(table.getTableName())));
+ }
+ if (columns == null || columns.isEmpty() || columns.get(table.getTableName()) == null) {
+ log.warn("init db [{}] table [{}] info, and columns are empty", db.getSchemaName(),
+ table.getTableName());
+ } else {
+ LinkedHashMap cols = new LinkedHashMap<>();
+ columns.get(table.getTableName()).forEach(x -> cols.put(x.getName(), x));
+ mysqlTable.setColumnDefinitions(cols);
+ }
+
+ tables.put(new RdbSimpleTable(db.getSchemaName(), table.getTableName()), mysqlTable);
+ }
+ } catch (Exception e) {
+ log.error("init db [{}] tables info fail", db.getSchemaName(), e);
+ }
+
+ }
+ }
+ }
+
+ private Map> queryTablePrimaryKey(DruidDataSource dataSource, List tables) throws SQLException {
+ Map> primaryKeys = new LinkedHashMap<>();
+ String prepareTables = SqlUtils.genPrepareSqlOfInClause(tables.size());
+ String sql = "select L.TABLE_NAME,L.COLUMN_NAME,R.CONSTRAINT_TYPE from " +
+ "INFORMATION_SCHEMA.KEY_COLUMN_USAGE L left join INFORMATION_SCHEMA.TABLE_CONSTRAINTS R on C" +
+ ".TABLE_SCHEMA = R.TABLE_SCHEMA and L.TABLE_NAME = R.TABLE_NAME and L.CONSTRAINT_CATALOG = R" +
+ ".CONSTRAINT_CATALOG and L.CONSTRAINT_SCHEMA = R.CONSTRAINT_SCHEMA and L.CONSTRAINT_NAME = R" +
+ ".CONSTRAINT_NAME where L.TABLE_SCHEMA = ? and L.TABLE_NAME in " + prepareTables + " and R" +
+ ".CONSTRAINT_TYPE IN ('PRIMARY KEY') order by L.ORDINAL_POSITION asc";
+ try (PreparedStatement statement = dataSource.getConnection().prepareStatement(sql)) {
+ SqlUtils.setInClauseParameters(statement, tables);
+ ResultSet resultSet = statement.executeQuery();
+ if (resultSet == null) {
+ return null;
+ }
+ while (resultSet.next()) {
+ String tableName = resultSet.getString("TABLE_NAME");
+ String colName = resultSet.getString("COLUMN_NAME");
+ primaryKeys.compute(tableName, (k, v) -> {
+ if (v == null) {
+ v = new LinkedList<>();
+ }
+ v.add(colName);
+ return v;
+ });
+ }
+ resultSet.close();
+ }
+ return primaryKeys;
+ }
+
+ private Map> queryColumns(DataSource dataSource, List tables) throws SQLException {
+ String prepareTables = SqlUtils.genPrepareSqlOfInClause(tables.size());
+ String sql = "select TABLE_SCHEMA,TABLE_NAME,COLUMN_NAME,IS_NULLABLE,DATA_TYPE,CHARACTER_MAXIMUM_LENGTH," +
+ "CHARACTER_OCTET_LENGTH,NUMERIC_SCALE,NUMERIC_PRECISION,DATETIME_PRECISION,CHARACTER_SET_NAME," +
+ "COLLATION_NAME,COLUMN_TYPE,COLUMN_DEFAULT,COLUMN_COMMENT,ORDINAL_POSITION,EXTRA from " +
+ "INFORMATION_SCHEMA.COLUMNS where TABLE_SCHEMA = ? and TABLE_NAME in " + prepareTables + " order by " +
+ "ORDINAL_POSITION asc";
+ Map> cols = new LinkedHashMap<>();
+ try (PreparedStatement statement = dataSource.getConnection().prepareStatement(sql)) {
+ SqlUtils.setInClauseParameters(statement, tables);
+ ResultSet resultSet = statement.executeQuery();
+ if (resultSet == null) {
+ return null;
+ }
+ while (resultSet.next()) {
+ String tableName = resultSet.getString("TABLE_NAME");
+ String colName = resultSet.getString("COLUMN_NAME");
+ String dataType = resultSet.getString("DATA_TYPE");
+ JDBCType jdbcType = SqlUtils.toJDBCType(dataType);
+ MysqlType type = MysqlType.getByName(dataType);
+ MySQLColumnDef col = new MySQLColumnDef();
+ col.setJdbcType(jdbcType);
+ col.setType(type);
+ col.setName(colName);
+ cols.compute(tableName, (k, v) -> {
+ if (v == null) {
+ v = new LinkedList<>();
+ }
+ v.add(col);
+ return v;
+ });
}
+ resultSet.close();
}
+ return cols;
}
@Override
From f29dd7be70b13d22b0ce6709272095ccfd499062 Mon Sep 17 00:00:00 2001
From: sodaRyCN <757083350@qq.com>
Date: Thu, 11 Jul 2024 16:49:14 +0800
Subject: [PATCH 06/15] close to finish full read and begin full write
---
.../connector/rdb/canal/CanalMySQLType.java | 81 ++++----
.../rdb/canal/CanalSinkFullConfig.java | 12 ++
.../rdb/canal/CanalSourceFullConfig.java | 1 +
.../rdb/canal/JobRdbFullPosition.java | 5 +-
.../offset/canal/CanalFullRecordOffset.java | 18 ++
.../canal/CanalFullRecordPartition.java | 37 ++++
.../connector/canal/DatabaseConnection.java | 4 +-
.../eventmesh/connector/canal/SqlUtils.java | 16 +-
.../connector/CanalSinkFullConnector.java | 59 ++++++
.../canal/source/CanalFullProducer.java | 188 +++++++++++++-----
.../connector/canal/source/EntryParser.java | 32 +--
.../connector/CanalSourceConnector.java | 5 +-
.../connector/CanalSourceFullConnector.java | 76 +++++--
.../source/position/CanalFullPositionMgr.java | 89 ++++++---
.../canal/source/table/RdbTableMgr.java | 78 ++++----
15 files changed, 501 insertions(+), 200 deletions(-)
create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkFullConfig.java
create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalFullRecordOffset.java
create mode 100644 eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalFullRecordPartition.java
create mode 100644 eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkFullConnector.java
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalMySQLType.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalMySQLType.java
index 969e631e93..257822810e 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalMySQLType.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalMySQLType.java
@@ -2,7 +2,8 @@
import com.mysql.cj.MysqlType;
-import java.sql.JDBCType;
+import java.util.HashMap;
+import java.util.Map;
public enum CanalMySQLType {
BIT("BIT"),
@@ -12,37 +13,38 @@ public enum CanalMySQLType {
INT("INT"),
BIGINT("BIGINT"),
DECIMAL("DECIMAL"),
- FLOAT("FLOAT", JDBCType.REAL),
- DOUBLE("DOUBLE", JDBCType.DOUBLE),
- DATE("DATE", JDBCType.DATE),
- DATETIME("DATETIME", JDBCType.TIMESTAMP),
- TIMESTAMP("TIMESTAMP", JDBCType.TIMESTAMP),
- TIME("TIME", JDBCType.TIME),
- YEAR("YEAR", JDBCType.DATE),
- CHAR("CHAR", JDBCType.CHAR),
- VARCHAR("VARCHAR", JDBCType.VARCHAR),
- BINARY("BINARY", JDBCType.BINARY),
- VARBINARY("VARBINARY", JDBCType.VARBINARY),
- TINYBLOB("TINYBLOB", JDBCType.VARBINARY),
- BLOB("BLOB", JDBCType.LONGVARBINARY),
- MEDIUMBLOB("MEDIUMBLOB", JDBCType.LONGVARBINARY),
- LONGBLOB("LONGBLOB", JDBCType.LONGVARBINARY),
- TINYTEXT("TINYTEXT", JDBCType.VARCHAR),
- TEXT("TEXT", JDBCType.LONGVARCHAR),
- MEDIUMTEXT("MEDIUMTEXT", JDBCType.LONGVARCHAR),
- LONGTEXT("LONGTEXT", JDBCType.LONGVARCHAR),
- ENUM("ENUM", JDBCType.CHAR),
- SET("SET", JDBCType.CHAR),
- JSON("JSON", JDBCType.LONGVARCHAR),
- GEOMETRY("GEOMETRY", JDBCType.BINARY),
- POINT("POINT", JDBCType.BINARY),
- LINESTRING("LINESTRING", JDBCType.BINARY),
- POLYGON("POLYGON", JDBCType.BINARY),
- MULTIPOINT("MULTIPOINT", JDBCType.BINARY),
- GEOMETRY_COLLECTION("GEOMETRYCOLLECTION", JDBCType.BINARY),
- GEOM_COLLECTION("GEOMCOLLECTION", JDBCType.BINARY),
- MULTILINESTRING("MULTILINESTRING", JDBCType.BINARY),
- MULTIPOLYGON("MULTIPOLYGON", JDBCType.BINARY);
+ FLOAT("FLOAT"),
+ DOUBLE("DOUBLE"),
+ DATE("DATE"),
+ DATETIME("DATETIME"),
+ TIMESTAMP("TIMESTAMP"),
+ TIME("TIME"),
+ YEAR("YEAR"),
+ CHAR("CHAR"),
+ VARCHAR("VARCHAR"),
+ BINARY("BINARY"),
+ VARBINARY("VARBINARY"),
+ TINYBLOB("TINYBLOB"),
+ BLOB("BLOB"),
+ MEDIUMBLOB("MEDIUMBLOB"),
+ LONGBLOB("LONGBLOB"),
+ TINYTEXT("TINYTEXT"),
+ TEXT("TEXT"),
+ MEDIUMTEXT("MEDIUMTEXT"),
+ LONGTEXT("LONGTEXT"),
+ ENUM("ENUM"),
+ SET("SET"),
+ JSON("JSON"),
+ GEOMETRY("GEOMETRY"),
+ // MysqlType not include the following type
+ POINT("POINT"),
+ LINESTRING("LINESTRING"),
+ POLYGON("POLYGON"),
+ MULTIPOINT("MULTIPOINT"),
+ GEOMETRY_COLLECTION("GEOMETRYCOLLECTION"),
+ GEOM_COLLECTION("GEOMCOLLECTION"),
+ MULTILINESTRING("MULTILINESTRING"),
+ MULTIPOLYGON("MULTIPOLYGON");
private final String codeKey;
private final MysqlType mysqlType;
@@ -51,13 +53,18 @@ public enum CanalMySQLType {
this.codeKey = codeKey;
this.mysqlType = MysqlType.getByName(codeKey);
}
-
- public static CanalMySQLType valueOfCode(String code) {
+ private static final Map TYPES = new HashMap<>();
+ static {
CanalMySQLType[] values = values();
for (CanalMySQLType tableType : values) {
- if (tableType.codeKey.equalsIgnoreCase(code)) {
- return tableType;
- }
+ TYPES.put(tableType.codeKey, tableType);
+ }
+ }
+
+ public static CanalMySQLType valueOfCode(String code) {
+ CanalMySQLType type = TYPES.get(code.toUpperCase());
+ if (type != null) {
+ return type;
}
switch (MysqlType.getByName(code)) {
case BOOLEAN:
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkFullConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkFullConfig.java
new file mode 100644
index 0000000000..5a6deb77cd
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSinkFullConfig.java
@@ -0,0 +1,12 @@
+package org.apache.eventmesh.common.config.connector.rdb.canal;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import org.apache.eventmesh.common.config.connector.SinkConfig;
+
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+public class CanalSinkFullConfig extends SinkConfig {
+
+}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceFullConfig.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceFullConfig.java
index fcfa6a0e92..6508e49fdb 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceFullConfig.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/CanalSourceFullConfig.java
@@ -13,4 +13,5 @@ public class CanalSourceFullConfig extends SourceConfig {
private SourceConnectorConfig connectorConfig;
private List startPosition;
private int parallel;
+ private int flushSize;
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/JobRdbFullPosition.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/JobRdbFullPosition.java
index b4aeffd721..5f0f5326ed 100644
--- a/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/JobRdbFullPosition.java
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/config/connector/rdb/canal/JobRdbFullPosition.java
@@ -1,15 +1,18 @@
package org.apache.eventmesh.common.config.connector.rdb.canal;
import lombok.Data;
+import lombok.ToString;
import java.math.BigDecimal;
@Data
+@ToString
public class JobRdbFullPosition {
private String jobId;
private String schema;
private String tableName;
- private String curPrimaryKey;
+ private String primaryKeyRecords;
private long maxCount;
private boolean finished;
+ private BigDecimal percent;
}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalFullRecordOffset.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalFullRecordOffset.java
new file mode 100644
index 0000000000..f1f3a7c132
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalFullRecordOffset.java
@@ -0,0 +1,18 @@
+package org.apache.eventmesh.common.remote.offset.canal;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.eventmesh.common.config.connector.rdb.canal.JobRdbFullPosition;
+import org.apache.eventmesh.common.remote.offset.RecordOffset;
+
+@Data
+@EqualsAndHashCode(callSuper = true)
+@ToString
+public class CanalFullRecordOffset extends RecordOffset {
+ private JobRdbFullPosition position;
+ @Override
+ public Class extends RecordOffset> getRecordOffsetClass() {
+ return CanalFullRecordOffset.class;
+ }
+}
diff --git a/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalFullRecordPartition.java b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalFullRecordPartition.java
new file mode 100644
index 0000000000..a325444be5
--- /dev/null
+++ b/eventmesh-common/src/main/java/org/apache/eventmesh/common/remote/offset/canal/CanalFullRecordPartition.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eventmesh.common.remote.offset.canal;
+
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
+import org.apache.eventmesh.common.remote.offset.RecordPartition;
+
+
+@Data
+@ToString
+@EqualsAndHashCode(callSuper = true)
+public class CanalFullRecordPartition extends RecordPartition {
+ private String schema;
+ private String table;
+
+ @Override
+ public Class extends RecordPartition> getRecordPartitionClass() {
+ return CanalFullRecordPartition.class;
+ }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/DatabaseConnection.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/DatabaseConnection.java
index 00f9693be3..dc576186ea 100644
--- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/DatabaseConnection.java
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/DatabaseConnection.java
@@ -18,14 +18,13 @@
package org.apache.eventmesh.connector.canal;
+import com.alibaba.druid.pool.DruidDataSource;
import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSinkConfig;
import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSourceConfig;
import java.sql.Connection;
import java.sql.SQLException;
-import com.alibaba.druid.pool.DruidDataSource;
-
public class DatabaseConnection {
public static DruidDataSource sourceDataSource;
@@ -42,6 +41,7 @@ public static DruidDataSource createDruidDataSource(String url, String UserName,
dataSource.setUsername(UserName);
dataSource.setPassword(passWord);
dataSource.setInitialSize(5);
+ dataSource.setDriverClassName("com.mysql.cj.jdbc.Driver");
dataSource.setMinIdle(5);
dataSource.setMaxActive(20);
dataSource.setMaxWait(60000);
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/SqlUtils.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/SqlUtils.java
index 6d4ca03697..ed72a9eb4d 100644
--- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/SqlUtils.java
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/SqlUtils.java
@@ -17,12 +17,11 @@
package org.apache.eventmesh.connector.canal;
-import static org.apache.eventmesh.connector.canal.ByteArrayConverter.SQL_BYTES;
-import static org.apache.eventmesh.connector.canal.SqlTimestampConverter.SQL_TIMESTAMP;
-
import com.mysql.cj.MysqlType;
import org.apache.commons.beanutils.ConvertUtilsBean;
import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
@@ -42,8 +41,8 @@
import java.util.List;
import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.eventmesh.connector.canal.ByteArrayConverter.SQL_BYTES;
+import static org.apache.eventmesh.connector.canal.SqlTimestampConverter.SQL_TIMESTAMP;
public class SqlUtils {
@@ -127,8 +126,13 @@ public static String genPrepareSqlOfInClause(int size) {
}
public static void setInClauseParameters(PreparedStatement preparedStatement, List params) throws SQLException {
+ setInClauseParameters(preparedStatement, 0, params);
+ }
+
+ public static void setInClauseParameters(PreparedStatement preparedStatement, int paramIndexStart,
+ List params) throws SQLException {
for (int i = 0; i < params.size(); i++) {
- preparedStatement.setString(i + 1, params.get(i));
+ preparedStatement.setString(paramIndexStart + i, params.get(i));
}
}
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkFullConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkFullConnector.java
new file mode 100644
index 0000000000..a43f6a4413
--- /dev/null
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkFullConnector.java
@@ -0,0 +1,59 @@
+package org.apache.eventmesh.connector.canal.sink.connector;
+
+import org.apache.eventmesh.common.config.connector.Config;
+import org.apache.eventmesh.common.config.connector.rdb.canal.CanalSinkFullConfig;
+import org.apache.eventmesh.openconnect.api.ConnectorCreateService;
+import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
+import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
+import org.apache.eventmesh.openconnect.api.sink.Sink;
+import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+
+import java.util.List;
+
+public class CanalSinkFullConnector implements Sink, ConnectorCreateService {
+ private CanalSinkFullConfig config;
+ @Override
+ public void start() throws Exception {
+
+ }
+
+ @Override
+ public void stop() throws Exception {
+
+ }
+
+ @Override
+ public Sink create() {
+ return new CanalSinkFullConnector();
+ }
+
+ @Override
+ public Class extends Config> configClass() {
+ return CanalSinkFullConfig.class;
+ }
+
+ @Override
+ public void init(Config config) throws Exception {
+ this.config = (CanalSinkFullConfig) config;
+ }
+
+ @Override
+ public void init(ConnectorContext connectorContext) throws Exception {
+ this.config = (CanalSinkFullConfig)((SinkConnectorContext)connectorContext).getSinkConfig();
+ }
+
+ @Override
+ public void commit(ConnectRecord record) {
+
+ }
+
+ @Override
+ public String name() {
+ return null;
+ }
+
+ @Override
+ public void put(List sinkRecords) {
+
+ }
+}
diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/CanalFullProducer.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/CanalFullProducer.java
index 9c3a6d07fb..979346512c 100644
--- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/CanalFullProducer.java
+++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/CanalFullProducer.java
@@ -1,15 +1,20 @@
package org.apache.eventmesh.connector.canal.source;
-import com.mysql.cj.MysqlType;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
+import org.apache.eventmesh.common.config.connector.rdb.canal.CanalMySQLType;
+import org.apache.eventmesh.common.config.connector.rdb.canal.JobRdbFullPosition;
import org.apache.eventmesh.common.config.connector.rdb.canal.RdbColumnDefinition;
import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.Constants;
+import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.MySQLColumnDef;
import org.apache.eventmesh.common.config.connector.rdb.canal.mysql.MySQLTableDef;
import org.apache.eventmesh.common.exception.EventMeshException;
+import org.apache.eventmesh.common.remote.offset.canal.CanalFullRecordOffset;
+import org.apache.eventmesh.common.remote.offset.canal.CanalFullRecordPartition;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.connector.canal.source.position.TableFullPosition;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
+import org.locationtech.jts.geom.GeometryFactory;
import org.locationtech.jts.io.WKBReader;
import javax.sql.DataSource;
@@ -26,14 +31,16 @@
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
-
@Slf4j
public class CanalFullProducer {
private BlockingQueue> queue;
@@ -45,7 +52,7 @@ public class CanalFullProducer {
private final AtomicReference choosePrimaryKey = new AtomicReference<>(null);
private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd");
private static final DateTimeFormatter DATE_STAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
- private static final WKBReader WKB_READER = new wk
+ private static final WKBReader WKB_READER = new WKBReader(new GeometryFactory());
public CanalFullProducer(BlockingQueue> queue, DataSource dataSource,
MySQLTableDef tableDefinition, TableFullPosition position, int flushSize) {
@@ -56,42 +63,108 @@ public CanalFullProducer(BlockingQueue> queue, DataSource da
this.flushSize = flushSize;
}
+ public void choosePrimaryKey() {
+ for (RdbColumnDefinition col : tableDefinition.getColumnDefinitions().values()) {
+ if (position.getCurPrimaryKeyCols().get(col.getName()) != null) {
+ choosePrimaryKey.set(col.getName());
+ return;
+ }
+ }
+ throw new EventMeshException("illegal: can't pick any primary key");
+ }
+
+
public void start(AtomicBoolean flag) {
+ choosePrimaryKey();
boolean isNextPage = false;
- ArrayList records = new ArrayList<>();
+ List