
Commit ba19416

Author: xubo.huster

add cdc jdbc connector base

1 parent fdec860 · commit ba19416

File tree: 9 files changed (+526 −6 lines)

bitsail-base/src/main/java/com/bytedance/bitsail/base/connector/reader/v1/SourceReader.java

+1 −1

@@ -23,7 +23,7 @@
 
 public interface SourceReader<T, SplitT extends SourceSplit> extends Serializable, AutoCloseable {
 
-  void start();
+  void start() throws Exception;
 
   void pollNext(SourcePipeline<T> pipeline) throws Exception;
 

bitsail-common/src/main/java/com/bytedance/bitsail/common/option/ReaderOptions.java

+4 −0

@@ -88,6 +88,10 @@ interface BaseReaderOptions {
       key(READER_PREFIX + "db_name")
           .noDefaultValue(String.class);
 
+  ConfigOption<String> CONNECTION_TIMEZONE =
+      key(READER_PREFIX + "connection_timezone")
+          .defaultValue("UTC");
+
   ConfigOption<String> TABLE_NAME =
       key(READER_PREFIX + "table_name")
           .noDefaultValue(String.class);
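
The new CONNECTION_TIMEZONE option resolves to the key job.reader.connection_timezone and falls back to "UTC" when the key is absent. A minimal sketch of reading it, using only accessors that appear elsewhere in this commit (illustration only; the wrapper class name is hypothetical):

import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
import com.bytedance.bitsail.connector.cdc.option.BinlogReaderOptions;

import java.time.ZoneId;

public class ConnectionTimezoneExample {
  // Resolve the session time zone the same way AbstractJdbcDebeziumConfig does below.
  public static ZoneId resolveConnectionTimezone(BitSailConfiguration jobConf) {
    // Returns "UTC" when job.reader.connection_timezone is not set in the job file.
    String timezone = jobConf.get(BinlogReaderOptions.CONNECTION_TIMEZONE);
    return ZoneId.of(timezone);
  }
}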

bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/reader/BinlogSourceReader.java

+1 −1

@@ -56,7 +56,7 @@ public BinlogSourceReader(BitSailConfiguration jobConf, SourceReader.Context rea
   public abstract BinlogSplitReader<Row> getReader();
 
   @Override
-  public void start() {
+  public void start() throws Exception {
     //start debezium streaming reader and send data to queue
   }
 

bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/reader/BinlogSplitReader.java

+2 −1

@@ -18,11 +18,12 @@
 
 import com.bytedance.bitsail.connector.cdc.source.split.BinlogSplit;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.Map;
 
 public interface BinlogSplitReader<T> extends Serializable {
-  void readSplit(BinlogSplit split);
+  void readSplit(BinlogSplit split) throws IOException, InterruptedException;
 
   Map<String, String> getOffset();
 

bitsail-connectors/connector-cdc/connector-cdc-jdbc-base/pom.xml

+33 −3

@@ -3,10 +3,9 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>bitsail-all</artifactId>
         <groupId>com.bytedance.bitsail</groupId>
-        <version>0.2.0-SNAPSHOT</version>
-        <relativePath>../../../pom.xml</relativePath>
+        <artifactId>connector-cdc</artifactId>
+        <version>${revision}</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 

@@ -15,6 +14,37 @@
     <properties>
         <maven.compiler.source>8</maven.compiler.source>
         <maven.compiler.target>8</maven.compiler.target>
+        <debezium.version>1.6.4.Final</debezium.version>
     </properties>
+    <dependencies>
+        <dependency>
+            <groupId>com.bytedance.bitsail</groupId>
+            <artifactId>connector-cdc-base</artifactId>
+            <version>${revision}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-embedded</artifactId>
+            <version>${debezium.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>jakarta.activation</groupId>
+                    <artifactId>jakarta.activation-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.kafka</groupId>
+                    <artifactId>kafka-clients</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-core</artifactId>
+            <version>${debezium.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
 
 </project>
New file: AbstractJdbcDebeziumConfig.java (+78 lines)

package com.bytedance.bitsail.connector.cdc.jdbc.source.config;

import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
import com.bytedance.bitsail.connector.cdc.error.BinlogReaderErrorCode;
import com.bytedance.bitsail.connector.cdc.model.ClusterInfo;
import com.bytedance.bitsail.connector.cdc.model.ConnectionInfo;
import com.bytedance.bitsail.connector.cdc.option.BinlogReaderOptions;

import io.debezium.config.Configuration;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import lombok.Builder;
import lombok.Getter;
import org.apache.commons.lang.StringUtils;

import java.time.ZoneId;
import java.util.List;
import java.util.Properties;

@Getter
@Builder
public abstract class AbstractJdbcDebeziumConfig {

  private static final long serialVersionUID = 1L;

  public static final String DEBEZIUM_PREFIX = "job.reader.debezium.";

  private final String hostname;
  private final int port;
  private final String username;
  private final String password;

  // debezium configuration
  private final Properties dbzProperties;
  private final Configuration dbzConfiguration;

  public static AbstractJdbcDebeziumConfig fromBitSailConf(BitSailConfiguration jobConf) {
    List<ClusterInfo> clusterInfo = jobConf.getNecessaryOption(BinlogReaderOptions.CONNECTIONS, BinlogReaderErrorCode.REQUIRED_VALUE);
    //Only support one DB
    assert (clusterInfo.size() == 1);
    ConnectionInfo connectionInfo = clusterInfo.get(0).getMaster();
    assert (connectionInfo != null);
    Properties props = extractProps(jobConf);
    String username = jobConf.getNecessaryOption(BinlogReaderOptions.USER_NAME, BinlogReaderErrorCode.REQUIRED_VALUE);
    String password = jobConf.getNecessaryOption(BinlogReaderOptions.PASSWORD, BinlogReaderErrorCode.REQUIRED_VALUE);
    String timezone = jobConf.get(BinlogReaderOptions.CONNECTION_TIMEZONE);
    fillConnectionInfo(props, connectionInfo, username, password, timezone);

    Configuration config = Configuration.from(props);
    return AbstractJdbcDebeziumConfig.builder()
        .hostname(connectionInfo.getHost())
        .port(connectionInfo.getPort())
        .username(username)
        .password(password)
        .dbzProperties(props)
        .dbzConfiguration(config)
        .build();
  }

  public static Properties extractProps(BitSailConfiguration jobConf) {
    Properties props = new Properties();
    jobConf.getKeys().stream()
        .filter(s -> s.startsWith(DEBEZIUM_PREFIX))
        .map(s -> StringUtils.substringAfter(s, DEBEZIUM_PREFIX))
        .forEach(s -> props.setProperty(s, jobConf.getString(DEBEZIUM_PREFIX + s)));
    return props;
  }

  public abstract RelationalDatabaseConnectorConfig getConnectorConfig();

  public static void fillConnectionInfo(Properties props, ConnectionInfo connectionInfo, String username, String password, String timezone) {
    props.put("database.hostname", connectionInfo.getHost());
    props.put("database.port", String.valueOf(connectionInfo.getPort()));
    props.put("database.user", username);
    props.put("database.password", password);
    props.put("database.serverTimezone", ZoneId.of(timezone).toString());
  }

}
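
The extractProps/fillConnectionInfo pair turns prefixed job options into the flat property names Debezium expects. A self-contained sketch of the prefix-stripping step, using a plain Map in place of BitSailConfiguration (an assumption for illustration only; the key names and values are examples):

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class DebeziumPrefixDemo {
  static final String DEBEZIUM_PREFIX = "job.reader.debezium.";

  public static void main(String[] args) {
    // Example job options; only keys under the debezium prefix are forwarded.
    Map<String, String> jobOptions = new HashMap<>();
    jobOptions.put("job.reader.debezium.snapshot.mode", "never");
    jobOptions.put("job.reader.db_name", "test_db");

    Properties props = new Properties();
    jobOptions.forEach((k, v) -> {
      if (k.startsWith(DEBEZIUM_PREFIX)) {
        // Strip the prefix so Debezium sees e.g. "snapshot.mode=never".
        props.setProperty(k.substring(DEBEZIUM_PREFIX.length()), v);
      }
    });
    System.out.println(props); // {snapshot.mode=never}
  }
}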
New file: AbstractJdbcChangeEventReader.java (+170 lines)

package com.bytedance.bitsail.connector.cdc.jdbc.source.reader;

import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
import com.bytedance.bitsail.connector.cdc.jdbc.source.config.AbstractJdbcDebeziumConfig;
import com.bytedance.bitsail.connector.cdc.jdbc.source.streaming.SplitChangeEventStreamingTaskContext;
import com.bytedance.bitsail.connector.cdc.jdbc.source.streaming.SplitChangeEventStreamingTaskController;
import com.bytedance.bitsail.connector.cdc.source.reader.BinlogSplitReader;
import com.bytedance.bitsail.connector.cdc.source.split.BinlogSplit;
import com.bytedance.bitsail.common.row.Row;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.common.CdcSourceTaskContext;
import io.debezium.pipeline.DataChangeEvent;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public abstract class AbstractJdbcChangeEventReader implements BinlogSplitReader<Row> {
  private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcChangeEventReader.class);

  private final AbstractJdbcDebeziumConfig jdbcDebeziumConfig;

  private ChangeEventQueue<DataChangeEvent> queue;

  private RelationalDatabaseConnectorConfig connectorConfig;

  private List<SourceRecord> batch;

  private Iterator<SourceRecord> recordIterator;

  private CdcSourceTaskContext taskContext;

  private Map<String, ?> offset;

  private SplitChangeEventStreamingTaskController splitChangeEventStreamingTaskController;

  private SplitChangeEventStreamingTaskContext splitChangeEventStreamingTaskContext;

  private final int subtaskId;

  public AbstractJdbcChangeEventReader(BitSailConfiguration jobConf, int subtaskId) {
    jdbcDebeziumConfig = getJdbcDebeziumConfig(jobConf);
    connectorConfig = jdbcDebeziumConfig.getConnectorConfig();
    this.subtaskId = subtaskId;
    this.offset = new HashMap<>();
  }

  public AbstractJdbcDebeziumConfig getJdbcDebeziumConfig(BitSailConfiguration jobConf) {
    return AbstractJdbcDebeziumConfig.fromBitSailConf(jobConf);
  }

  public abstract SplitChangeEventStreamingTaskContext getSplitReaderTaskContext();

  public abstract void testConnectionAndValidBinlogConfiguration(RelationalDatabaseConnectorConfig connectorConfig) throws IOException;

  public void inititialzeSplitReader(BinlogSplit split) {
    splitChangeEventStreamingTaskContext = getSplitReaderTaskContext();
    this.offset = new HashMap<>();
    this.queue = new ChangeEventQueue.Builder<DataChangeEvent>()
        .pollInterval(connectorConfig.getPollInterval())
        .maxBatchSize(connectorConfig.getMaxBatchSize())
        .maxQueueSize(connectorConfig.getMaxQueueSize())
        .maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes())
        .loggingContextSupplier(() -> taskContext.configureLoggingContext(splitChangeEventStreamingTaskContext.threadNamePrefix()))
        .buffering()
        .build();
    this.batch = new ArrayList<>();
    this.recordIterator = this.batch.iterator();
    splitChangeEventStreamingTaskContext.initializeSplitReaderTaskContext(connectorConfig, this.queue);
    splitChangeEventStreamingTaskController = new SplitChangeEventStreamingTaskController(splitChangeEventStreamingTaskContext, this.subtaskId);
  }

  /**
   * Try to start streaming task to drain change event into target queue
   * @param split
   * @throws IOException
   * @throws InterruptedException
   */
  @Override
  public void readSplit(BinlogSplit split) throws IOException, InterruptedException {
    inititialzeSplitReader(split);
    splitChangeEventStreamingTaskContext.testConnectionAndValidBinlogConfiguration();
    splitChangeEventStreamingTaskController.launchSplitReaderTask();
  }

  /**
   * get the binlog offset being processed
   * @return
   */
  @Override
  public Map<String, String> getOffset() {
    Map<String, String> offsetToStore = new HashMap<>();
    this.offset.forEach((k, v) -> offsetToStore.put(k, v.toString()));
    return offsetToStore;
  }

  /**
   * close task and resources
   */
  @Override
  public void close() {
    try {
      splitChangeEventStreamingTaskController.closeTask();
    } catch (Exception e) {
      LOG.error("Failed to close change event streaming task: {}", e.getMessage());
    }

    try {
      splitChangeEventStreamingTaskContext.closeContextResources();
    } catch (Exception e) {
      LOG.error("Failed to close resources of streaming task context: {}", e.getMessage());
    }
  }

  @Override
  public Row poll() {
    SourceRecord record = this.recordIterator.next();
    this.offset = record.sourceOffset();
    LOG.info("OFFSET:" + record.sourceOffset());
    LOG.info("poll one record {}", record.value());
    // TODO: Build BitSail row and return
    return null;
  }

  /**
   * To judge whether current split has next record
   * @return
   * @throws Exception
   */
  @Override
  public boolean hasNext() throws Exception {
    if (this.recordIterator.hasNext()) {
      return true;
    } else {
      return pollNextBatch();
    }
  }

  @Override
  public boolean isCompleted() {
    return !splitChangeEventStreamingTaskController.isRunning();
  }

  private boolean pollNextBatch() throws InterruptedException {
    if (splitChangeEventStreamingTaskController.isRunning()) {
      List<DataChangeEvent> dbzRecords = queue.poll();
      while (dbzRecords.isEmpty()) {
        //sleep 5s
        LOG.info("No record found, sleep for 5s in reader");
        TimeUnit.SECONDS.sleep(5);
        dbzRecords = queue.poll();
      }
      this.batch = new ArrayList<>();
      for (DataChangeEvent event : dbzRecords) {
        this.batch.add(event.getRecord());
      }
      this.recordIterator = this.batch.iterator();
      return true;
    }
    return false;
  }
}
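
Taken together, the BinlogSplitReader methods implemented above form a simple poll loop. A hedged driver sketch of how a caller might drain one split (the class and method names here are illustrative, not part of the commit; error handling is simplified, and poll() currently returns null until row conversion is implemented):

import com.bytedance.bitsail.common.row.Row;
import com.bytedance.bitsail.connector.cdc.source.reader.BinlogSplitReader;
import com.bytedance.bitsail.connector.cdc.source.split.BinlogSplit;

public final class SplitReaderDriverSketch {
  // Drive a reader over a single binlog split using the contract defined in this commit.
  public static void drain(BinlogSplitReader<Row> reader, BinlogSplit split) throws Exception {
    reader.readSplit(split);              // starts the Debezium streaming task
    try {
      while (!reader.isCompleted()) {     // false while the streaming task is still running
        if (reader.hasNext()) {           // blocks in pollNextBatch() until a batch arrives
          Row row = reader.poll();        // TODO in the commit: convert SourceRecord to Row
          // hand "row" to the downstream pipeline here
        }
      }
    } finally {
      reader.close();                     // stop the task and release context resources
    }
  }
}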
