Skip to content

Commit 9ac3dd8

Browse files
authored
[ISSUE apache#4458] Support MySQL Sink Connector feature (apache#4771)
* [ISSUE apache#4458] Support mysql Sink Connector feature * remove pg jdbc import * update dependencies
1 parent 2881b83 commit 9ac3dd8

File tree

133 files changed

+5805
-812
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

133 files changed

+5805
-812
lines changed

eventmesh-connectors/eventmesh-connector-jdbc/build.gradle

+3
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,14 @@ packageSources {
3636
dependencies {
3737
antlr("org.antlr:antlr4:4.13.0")
3838
implementation 'org.antlr:antlr4-runtime:4.13.0'
39+
implementation 'com.alibaba:druid:1.2.20'
40+
implementation 'org.hibernate:hibernate-core:5.6.15.Final'
3941
implementation project(":eventmesh-common")
4042
implementation project(":eventmesh-openconnect:eventmesh-openconnect-java")
4143
implementation project(":eventmesh-spi")
4244
implementation 'com.zendesk:mysql-binlog-connector-java:0.28.0'
4345
implementation 'mysql:mysql-connector-java:8.0.32'
46+
4447
compileOnly 'org.projectlombok:lombok'
4548
annotationProcessor 'org.projectlombok:lombok'
4649

eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/CatalogChanges.java

+7-5
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,13 @@
2525
import java.util.List;
2626

2727
import lombok.Data;
28+
import lombok.NoArgsConstructor;
2829

29-
@Data
3030
/**
3131
* Represents changes in a catalog, such as schema or table modifications.
3232
*/
33+
@Data
34+
@NoArgsConstructor
3335
public class CatalogChanges {
3436

3537
/**
@@ -52,10 +54,10 @@ public class CatalogChanges {
5254
// The table associated with the changes
5355
private Table table;
5456
// The list of columns affected by the changes
55-
private List<? extends Column> columns;
57+
private List<? extends Column<?>> columns;
5658

5759
private CatalogChanges(String type, String operationType, CatalogSchema catalog, Table table,
58-
List<? extends Column> columns) {
60+
List<? extends Column<?>> columns) {
5961
this.type = type;
6062
this.operationType = operationType;
6163
this.catalog = catalog;
@@ -81,7 +83,7 @@ public static class Builder {
8183
private String operationType;
8284
private CatalogSchema catalog;
8385
private Table table;
84-
private List<? extends Column> columns;
86+
private List<? extends Column<?>> columns;
8587

8688
/**
8789
* Sets the operation type for the change.
@@ -123,7 +125,7 @@ public Builder table(Table table) {
123125
* @param columns The list of Column instances.
124126
* @return The Builder instance.
125127
*/
126-
public Builder columns(List<? extends Column> columns) {
128+
public Builder columns(List<? extends Column<?>> columns) {
127129
this.columns = columns;
128130
return this;
129131
}

eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/DataChanges.java

+55
Original file line numberDiff line numberDiff line change
@@ -18,52 +18,107 @@
1818
package org.apache.eventmesh.connector.jdbc;
1919

2020
import lombok.Data;
21+
import lombok.NoArgsConstructor;
2122

2223
@Data
24+
@NoArgsConstructor
25+
/**
26+
* DataChanges class representing changes in data associated with a JDBC connection.
27+
*/
2328
public class DataChanges {
2429

2530
private Object after;
2631

2732
private Object before;
2833

34+
/**
35+
* The type of change.
36+
* <pr>
37+
* {@link org.apache.eventmesh.connector.jdbc.event.DataChangeEventType}
38+
* </pr>
39+
*/
2940
private String type;
3041

42+
/**
43+
* Constructs a DataChanges instance with 'after' and 'before' data.
44+
*
45+
* @param after The data after the change.
46+
* @param before The data before the change.
47+
*/
3148
public DataChanges(Object after, Object before) {
3249
this.after = after;
3350
this.before = before;
3451
}
3552

53+
/**
54+
* Constructs a DataChanges instance with 'after', 'before' data, and a change type.
55+
*
56+
* @param after The data after the change.
57+
* @param before The data before the change.
58+
* @param type The type of change.
59+
*/
3660
public DataChanges(Object after, Object before, String type) {
3761
this.after = after;
3862
this.before = before;
3963
this.type = type;
4064
}
4165

66+
/**
67+
* Creates a new DataChanges builder.
68+
*
69+
* @return The DataChanges builder.
70+
*/
4271
public static Builder newBuilder() {
4372
return new Builder();
4473
}
4574

75+
/**
76+
* Builder class for constructing DataChanges instances.
77+
*/
4678
public static class Builder {
4779

4880
private String type;
4981
private Object after;
5082
private Object before;
5183

84+
/**
85+
* Sets the change type in the builder.
86+
*
87+
* @param type The type of change.
88+
* @return The DataChanges builder.
89+
*/
5290
public Builder withType(String type) {
5391
this.type = type;
5492
return this;
5593
}
5694

95+
/**
96+
* Sets the 'after' data in the builder.
97+
*
98+
* @param after The data after the change.
99+
* @return The DataChanges builder.
100+
*/
57101
public Builder withAfter(Object after) {
58102
this.after = after;
59103
return this;
60104
}
61105

106+
/**
107+
* Sets the 'before' data in the builder.
108+
*
109+
* @param before The data before the change.
110+
* @return The DataChanges builder.
111+
*/
62112
public Builder withBefore(Object before) {
63113
this.before = before;
64114
return this;
65115
}
66116

117+
/**
118+
* Builds the DataChanges instance.
119+
*
120+
* @return The constructed DataChanges.
121+
*/
67122
public DataChanges build() {
68123
return new DataChanges(after, before, type);
69124
}

eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/Field.java

+8-6
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.eventmesh.connector.jdbc;
1919

20+
import org.apache.eventmesh.connector.jdbc.table.catalog.Column;
21+
2022
import java.util.List;
2123

2224
import lombok.AllArgsConstructor;
@@ -28,25 +30,25 @@
2830
@AllArgsConstructor
2931
public class Field {
3032

31-
private String type;
32-
3333
private boolean required;
3434

3535
private String field;
3636

3737
private String name;
3838

39+
private Column<?> column;
40+
3941
private List<Field> fields;
4042

41-
public Field(String type, boolean required, String field, String name) {
42-
this.type = type;
43+
public Field(Column<?> column, boolean required, String field, String name) {
44+
this.column = column;
4345
this.required = required;
4446
this.field = field;
4547
this.name = name;
4648
}
4749

48-
public Field withType(String type) {
49-
this.type = type;
50+
public Field withColumn(Column<?> column) {
51+
this.column = column;
5052
return this;
5153
}
5254

eventmesh-connectors/eventmesh-connector-jdbc/src/main/java/org/apache/eventmesh/connector/jdbc/JdbcConnectData.java

+18-1
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,26 @@
1717

1818
package org.apache.eventmesh.connector.jdbc;
1919

20+
/**
21+
* Represents data associated with a JDBC connector.
22+
*/
2023
public final class JdbcConnectData {
2124

25+
/**
26+
* Constant representing data changes in the JDBC connector.
27+
*/
2228
public static final byte DATA_CHANGES = 1;
2329

30+
/**
31+
* Constant representing schema changes in the JDBC connector.
32+
*/
2433
public static final byte SCHEMA_CHANGES = 1 << 1;
2534

2635
private Payload payload = new Payload();
2736

2837
private Schema schema;
2938

30-
private byte type;
39+
private byte type = 0;
3140

3241
public JdbcConnectData() {
3342
}
@@ -67,4 +76,12 @@ public void markDataChanges() {
6776
public void markSchemaChanges() {
6877
this.type |= SCHEMA_CHANGES;
6978
}
79+
80+
public boolean isDataChanges() {
81+
return (this.type & DATA_CHANGES) != 0;
82+
}
83+
84+
public boolean isSchemaChanges() {
85+
return (this.type & SCHEMA_CHANGES) != 0;
86+
}
7087
}

0 commit comments

Comments
 (0)