Skip to content

[ISSUE #4458] Support MySQL Sink Connector feature #4771

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions eventmesh-connectors/eventmesh-connector-jdbc/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,14 @@ packageSources {
dependencies {
antlr("org.antlr:antlr4:4.13.0")
implementation 'org.antlr:antlr4-runtime:4.13.0'
implementation 'com.alibaba:druid:1.2.20'
implementation 'org.hibernate:hibernate-core:5.6.15.Final'
implementation project(":eventmesh-common")
implementation project(":eventmesh-openconnect:eventmesh-openconnect-java")
implementation project(":eventmesh-spi")
implementation 'com.zendesk:mysql-binlog-connector-java:0.28.0'
implementation 'mysql:mysql-connector-java:8.0.32'

compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,13 @@
import java.util.List;

import lombok.Data;
import lombok.NoArgsConstructor;

@Data
/**
* Represents changes in a catalog, such as schema or table modifications.
*/
@Data
@NoArgsConstructor
public class CatalogChanges {

/**
Expand All @@ -52,10 +54,10 @@ public class CatalogChanges {
// The table associated with the changes
private Table table;
// The list of columns affected by the changes
private List<? extends Column> columns;
private List<? extends Column<?>> columns;

private CatalogChanges(String type, String operationType, CatalogSchema catalog, Table table,
List<? extends Column> columns) {
List<? extends Column<?>> columns) {
this.type = type;
this.operationType = operationType;
this.catalog = catalog;
Expand All @@ -81,7 +83,7 @@ public static class Builder {
private String operationType;
private CatalogSchema catalog;
private Table table;
private List<? extends Column> columns;
private List<? extends Column<?>> columns;

/**
* Sets the operation type for the change.
Expand Down Expand Up @@ -123,7 +125,7 @@ public Builder table(Table table) {
* @param columns The list of Column instances.
* @return The Builder instance.
*/
public Builder columns(List<? extends Column> columns) {
public Builder columns(List<? extends Column<?>> columns) {
this.columns = columns;
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,52 +18,107 @@
package org.apache.eventmesh.connector.jdbc;

import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
/**
* DataChanges class representing changes in data associated with a JDBC connection.
*/
public class DataChanges {

private Object after;

private Object before;

/**
* The type of change.
* <pr>
* {@link org.apache.eventmesh.connector.jdbc.event.DataChangeEventType}
* </pr>
*/
private String type;

/**
* Constructs a DataChanges instance with 'after' and 'before' data.
*
* @param after The data after the change.
* @param before The data before the change.
*/
public DataChanges(Object after, Object before) {
this.after = after;
this.before = before;
}

/**
* Constructs a DataChanges instance with 'after', 'before' data, and a change type.
*
* @param after The data after the change.
* @param before The data before the change.
* @param type The type of change.
*/
public DataChanges(Object after, Object before, String type) {
this.after = after;
this.before = before;
this.type = type;
}

/**
* Creates a new DataChanges builder.
*
* @return The DataChanges builder.
*/
public static Builder newBuilder() {
return new Builder();
}

/**
* Builder class for constructing DataChanges instances.
*/
public static class Builder {

private String type;
private Object after;
private Object before;

/**
* Sets the change type in the builder.
*
* @param type The type of change.
* @return The DataChanges builder.
*/
public Builder withType(String type) {
this.type = type;
return this;
}

/**
* Sets the 'after' data in the builder.
*
* @param after The data after the change.
* @return The DataChanges builder.
*/
public Builder withAfter(Object after) {
this.after = after;
return this;
}

/**
* Sets the 'before' data in the builder.
*
* @param before The data before the change.
* @return The DataChanges builder.
*/
public Builder withBefore(Object before) {
this.before = before;
return this;
}

/**
* Builds the DataChanges instance.
*
* @return The constructed DataChanges.
*/
public DataChanges build() {
return new DataChanges(after, before, type);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.eventmesh.connector.jdbc;

import org.apache.eventmesh.connector.jdbc.table.catalog.Column;

import java.util.List;

import lombok.AllArgsConstructor;
Expand All @@ -28,25 +30,25 @@
@AllArgsConstructor
public class Field {

private String type;

private boolean required;

private String field;

private String name;

private Column<?> column;

private List<Field> fields;

public Field(String type, boolean required, String field, String name) {
this.type = type;
public Field(Column<?> column, boolean required, String field, String name) {
this.column = column;
this.required = required;
this.field = field;
this.name = name;
}

public Field withType(String type) {
this.type = type;
public Field withColumn(Column<?> column) {
this.column = column;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,26 @@

package org.apache.eventmesh.connector.jdbc;

/**
* Represents data associated with a JDBC connector.
*/
public final class JdbcConnectData {

/**
* Constant representing data changes in the JDBC connector.
*/
public static final byte DATA_CHANGES = 1;

/**
* Constant representing schema changes in the JDBC connector.
*/
public static final byte SCHEMA_CHANGES = 1 << 1;

private Payload payload = new Payload();

private Schema schema;

private byte type;
private byte type = 0;

public JdbcConnectData() {
}
Expand Down Expand Up @@ -67,4 +76,12 @@ public void markDataChanges() {
public void markSchemaChanges() {
this.type |= SCHEMA_CHANGES;
}

public boolean isDataChanges() {
return (this.type & DATA_CHANGES) != 0;
}

public boolean isSchemaChanges() {
return (this.type & SCHEMA_CHANGES) != 0;
}
}
Loading