Skip to content

Commit d8c6ab4

Browse files
author
xubo.huster
committed
[Bitsail][PG-CDC] add postgres cdc connector
1 parent ba19416 commit d8c6ab4

File tree

26 files changed

+1573
-108
lines changed

26 files changed

+1573
-108
lines changed

bitsail-connectors/connector-cdc/connector-cdc-base/pom.xml

-1
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
</properties>
3434

3535
<dependencies>
36-
3736
</dependencies>
3837

3938
</project>

bitsail-connectors/connector-cdc/connector-cdc-base/src/main/java/com/bytedance/bitsail/connector/cdc/source/reader/BinlogSourceReader.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.slf4j.Logger;
2828
import org.slf4j.LoggerFactory;
2929

30+
import java.io.IOException;
3031
import java.util.ArrayDeque;
3132
import java.util.List;
3233
import java.util.Queue;
@@ -115,7 +116,7 @@ public void close() {
115116
}
116117
}
117118

118-
private void submitSplit() {
119+
private void submitSplit() throws IOException, InterruptedException {
119120
if (!remainSplits.isEmpty()) {
120121
BinlogSplit curSplit = remainSplits.poll();
121122
LOG.info("submit split to binlog reader: {}, size of the remaining splits: {}", curSplit.toString(), remainSplits.size());

bitsail-connectors/connector-cdc/connector-cdc-jdbc-base/pom.xml

+15-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,19 @@
11
<?xml version="1.0" encoding="UTF-8"?>
2+
<!--
3+
~ Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
4+
~
5+
~ Licensed under the Apache License, Version 2.0 (the "License");
6+
~ you may not use this file except in compliance with the License.
7+
~ You may obtain a copy of the License at
8+
~
9+
~ http://www.apache.org/licenses/LICENSE-2.0
10+
~
11+
~ Unless required by applicable law or agreed to in writing, software
12+
~ distributed under the License is distributed on an "AS IS" BASIS,
13+
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
~ See the License for the specific language governing permissions and
15+
~ limitations under the License.
16+
-->
217
<project xmlns="http://maven.apache.org/POM/4.0.0"
318
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
419
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
@@ -44,7 +59,6 @@
4459
<type>test-jar</type>
4560
<scope>test</scope>
4661
</dependency>
47-
4862
</dependencies>
4963

5064
</project>
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,29 @@
1+
/*
2+
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
116
package com.bytedance.bitsail.connector.cdc.jdbc.source.config;
217

318
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
419
import com.bytedance.bitsail.connector.cdc.error.BinlogReaderErrorCode;
20+
import com.bytedance.bitsail.connector.cdc.jdbc.source.constant.DebeziumConstant;
521
import com.bytedance.bitsail.connector.cdc.model.ClusterInfo;
622
import com.bytedance.bitsail.connector.cdc.model.ConnectionInfo;
723
import com.bytedance.bitsail.connector.cdc.option.BinlogReaderOptions;
824

925
import io.debezium.config.Configuration;
1026
import io.debezium.relational.RelationalDatabaseConnectorConfig;
11-
import lombok.Builder;
1227
import lombok.Getter;
1328
import org.apache.commons.lang.StringUtils;
1429

@@ -17,7 +32,6 @@
1732
import java.util.Properties;
1833

1934
@Getter
20-
@Builder
2135
public abstract class AbstractJdbcDebeziumConfig {
2236

2337
private static final long serialVersionUID = 1L;
@@ -32,28 +46,26 @@ public abstract class AbstractJdbcDebeziumConfig {
3246
// debezium configuration
3347
private final Properties dbzProperties;
3448
private final Configuration dbzConfiguration;
49+
private final RelationalDatabaseConnectorConfig dbzJdbcConnectorConfig;
50+
private String dbName;
3551

36-
public static AbstractJdbcDebeziumConfig fromBitSailConf(BitSailConfiguration jobConf) {
52+
public AbstractJdbcDebeziumConfig(BitSailConfiguration jobConf) {
3753
List<ClusterInfo> clusterInfo = jobConf.getNecessaryOption(BinlogReaderOptions.CONNECTIONS, BinlogReaderErrorCode.REQUIRED_VALUE);
3854
//Only support one DB
3955
assert (clusterInfo.size() == 1);
4056
ConnectionInfo connectionInfo = clusterInfo.get(0).getMaster();
4157
assert (connectionInfo != null);
42-
Properties props = extractProps(jobConf);
43-
String username = jobConf.getNecessaryOption(BinlogReaderOptions.USER_NAME, BinlogReaderErrorCode.REQUIRED_VALUE);
44-
String password = jobConf.getNecessaryOption(BinlogReaderOptions.PASSWORD, BinlogReaderErrorCode.REQUIRED_VALUE);
58+
this.dbzProperties = extractProps(jobConf);
59+
this.hostname = connectionInfo.getHost();
60+
this.port = connectionInfo.getPort();
61+
this.username = jobConf.getNecessaryOption(BinlogReaderOptions.USER_NAME, BinlogReaderErrorCode.REQUIRED_VALUE);
62+
this.password = jobConf.getNecessaryOption(BinlogReaderOptions.PASSWORD, BinlogReaderErrorCode.REQUIRED_VALUE);
63+
this.dbName = jobConf.getNecessaryOption(BinlogReaderOptions.DB_NAME, BinlogReaderErrorCode.REQUIRED_VALUE);
4564
String timezone = jobConf.get(BinlogReaderOptions.CONNECTION_TIMEZONE);
46-
fillConnectionInfo(props, connectionInfo, username, password, timezone);
65+
fillConnectionInfo(this.dbzProperties, connectionInfo, timezone);
4766

48-
Configuration config = Configuration.from(props);
49-
return AbstractJdbcDebeziumConfig.builder()
50-
.hostname(connectionInfo.getHost())
51-
.port(connectionInfo.getPort())
52-
.username(username)
53-
.password(password)
54-
.dbzProperties(props)
55-
.dbzConfiguration(config)
56-
.build();
67+
this.dbzConfiguration = Configuration.from(this.dbzProperties);
68+
this.dbzJdbcConnectorConfig = getJdbcConnectorConfig(this.dbzConfiguration);
5769
}
5870

5971
public static Properties extractProps(BitSailConfiguration jobConf) {
@@ -65,14 +77,15 @@ public static Properties extractProps(BitSailConfiguration jobConf) {
6577
return props;
6678
}
6779

68-
public abstract RelationalDatabaseConnectorConfig getConnectorConfig();
80+
public abstract RelationalDatabaseConnectorConfig getJdbcConnectorConfig(Configuration config);
6981

70-
public static void fillConnectionInfo(Properties props, ConnectionInfo connectionInfo, String username, String password, String timezone) {
71-
props.put("database.hostname", connectionInfo.getHost());
72-
props.put("database.port", String.valueOf(connectionInfo.getPort()));
73-
props.put("database.user", username);
74-
props.put("database.password", password);
75-
props.put("database.serverTimezone", ZoneId.of(timezone).toString());
82+
public void fillConnectionInfo(Properties props, ConnectionInfo connectionInfo, String timezone) {
83+
props.put(DebeziumConstant.DATABASE_HOSTNAME, connectionInfo.getHost());
84+
props.put(DebeziumConstant.DATABASE_PORT, String.valueOf(connectionInfo.getPort()));
85+
props.put(DebeziumConstant.DATABASE_USER, username);
86+
props.put(DebeziumConstant.DATABASE_PASSWORD, password);
87+
props.put(DebeziumConstant.DATABASE_NAME, dbName);
88+
props.put(DebeziumConstant.DATABASE_TIMEZONE, ZoneId.of(timezone).toString());
7689
}
7790

7891
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.bytedance.bitsail.connector.cdc.jdbc.source.constant;
17+
18+
public class DebeziumConstant {
19+
public static final String DATABASE_HOSTNAME = "database.hostname";
20+
public static final String DATABASE_PORT = "database.port";
21+
public static final String DATABASE_USER = "database.user";
22+
public static final String DATABASE_PASSWORD = "database.password";
23+
public static final String DATABASE_NAME = "database.dbname";
24+
public static final String DATABASE_TIMEZONE = "database.serverTimezone";
25+
}
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,31 @@
1+
/*
2+
* Copyright 2022-2023 Bytedance Ltd. and/or its affiliates.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
116
package com.bytedance.bitsail.connector.cdc.jdbc.source.reader;
217

318
import com.bytedance.bitsail.common.configuration.BitSailConfiguration;
419
import com.bytedance.bitsail.connector.cdc.jdbc.source.config.AbstractJdbcDebeziumConfig;
5-
import com.bytedance.bitsail.connector.cdc.jdbc.source.streaming.SplitChangeEventStreamingTaskContext;
20+
import com.bytedance.bitsail.connector.cdc.jdbc.source.streaming.AbstractSplitChangeEventStreamingTaskContext;
621
import com.bytedance.bitsail.connector.cdc.jdbc.source.streaming.SplitChangeEventStreamingTaskController;
722
import com.bytedance.bitsail.connector.cdc.source.reader.BinlogSplitReader;
823
import com.bytedance.bitsail.connector.cdc.source.split.BinlogSplit;
924
import com.bytedance.bitsail.common.row.Row;
1025
import io.debezium.connector.base.ChangeEventQueue;
11-
import io.debezium.connector.common.CdcSourceTaskContext;
1226
import io.debezium.pipeline.DataChangeEvent;
1327
import io.debezium.relational.RelationalDatabaseConnectorConfig;
28+
import lombok.Getter;
1429
import org.apache.kafka.connect.source.SourceRecord;
1530
import org.slf4j.Logger;
1631
import org.slf4j.LoggerFactory;
@@ -23,58 +38,54 @@
2338
import java.util.Map;
2439
import java.util.concurrent.TimeUnit;
2540

26-
public abstract class AbstractJdbcChangeEventReader implements BinlogSplitReader<Row> {
27-
private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcChangeEventReader.class);
41+
@Getter
42+
public abstract class AbstractJdbcChangeEventSplitReader implements BinlogSplitReader<Row> {
43+
private static final Logger LOG = LoggerFactory.getLogger(AbstractJdbcChangeEventSplitReader.class);
2844

29-
private final AbstractJdbcDebeziumConfig jdbcDebeziumConfig;
45+
protected final AbstractJdbcDebeziumConfig jdbcDebeziumConfig;
3046

31-
private ChangeEventQueue<DataChangeEvent> queue;
47+
protected ChangeEventQueue<DataChangeEvent> queue;
3248

33-
private RelationalDatabaseConnectorConfig connectorConfig;
49+
protected RelationalDatabaseConnectorConfig connectorConfig;
3450

35-
private List<SourceRecord> batch;
51+
protected List<SourceRecord> batch;
3652

37-
private Iterator<SourceRecord> recordIterator;
53+
protected Iterator<SourceRecord> recordIterator;
3854

39-
private CdcSourceTaskContext taskContext;
55+
protected Map<String, ?> offset;
4056

41-
private Map<String, ?> offset;
57+
protected SplitChangeEventStreamingTaskController splitChangeEventStreamingTaskController;
4258

43-
private SplitChangeEventStreamingTaskController splitChangeEventStreamingTaskController;
44-
45-
private SplitChangeEventStreamingTaskContext splitChangeEventStreamingTaskContext;
59+
protected AbstractSplitChangeEventStreamingTaskContext splitChangeEventStreamingTaskContext;
4660

4761
private final int subtaskId;
4862

49-
public AbstractJdbcChangeEventReader(BitSailConfiguration jobConf, int subtaskId) {
63+
public AbstractJdbcChangeEventSplitReader(BitSailConfiguration jobConf, int subtaskId) {
5064
jdbcDebeziumConfig = getJdbcDebeziumConfig(jobConf);
51-
connectorConfig = jdbcDebeziumConfig.getConnectorConfig();
65+
connectorConfig = jdbcDebeziumConfig.getDbzJdbcConnectorConfig();
5266
this.subtaskId = subtaskId;
5367
this.offset = new HashMap<>();
5468
}
5569

56-
public AbstractJdbcDebeziumConfig getJdbcDebeziumConfig(BitSailConfiguration jobConf) {
57-
return AbstractJdbcDebeziumConfig.fromBitSailConf(jobConf);
58-
}
59-
60-
public abstract SplitChangeEventStreamingTaskContext getSplitReaderTaskContext();
70+
public abstract AbstractJdbcDebeziumConfig getJdbcDebeziumConfig(BitSailConfiguration jobConf);
6171

62-
public abstract void testConnectionAndValidBinlogConfiguration(RelationalDatabaseConnectorConfig connectorConfig) throws IOException;
72+
public abstract AbstractSplitChangeEventStreamingTaskContext getSplitReaderTaskContext(BinlogSplit split, RelationalDatabaseConnectorConfig connectorConfig);
6373

6474
public void inititialzeSplitReader(BinlogSplit split) {
65-
splitChangeEventStreamingTaskContext = getSplitReaderTaskContext();
75+
splitChangeEventStreamingTaskContext = getSplitReaderTaskContext(split, connectorConfig);
6676
this.offset = new HashMap<>();
6777
this.queue = new ChangeEventQueue.Builder<DataChangeEvent>()
6878
.pollInterval(connectorConfig.getPollInterval())
6979
.maxBatchSize(connectorConfig.getMaxBatchSize())
7080
.maxQueueSize(connectorConfig.getMaxQueueSize())
7181
.maxQueueSizeInBytes(connectorConfig.getMaxQueueSizeInBytes())
72-
.loggingContextSupplier(() -> taskContext.configureLoggingContext(splitChangeEventStreamingTaskContext.threadNamePrefix()))
82+
.loggingContextSupplier(() -> splitChangeEventStreamingTaskContext.getDbzTaskContext()
83+
.configureLoggingContext(splitChangeEventStreamingTaskContext.threadNamePrefix()))
7384
.buffering()
7485
.build();
7586
this.batch = new ArrayList<>();
7687
this.recordIterator = this.batch.iterator();
77-
splitChangeEventStreamingTaskContext.initializeSplitReaderTaskContext(connectorConfig, this.queue);
88+
splitChangeEventStreamingTaskContext.attachStreamingToQueue(this.queue);
7889
splitChangeEventStreamingTaskController = new SplitChangeEventStreamingTaskController(splitChangeEventStreamingTaskContext, this.subtaskId);
7990
}
8091

@@ -136,7 +147,7 @@ public Row poll() {
136147
* @throws Exception
137148
*/
138149
@Override
139-
public boolean hasNext() throws Exception {
150+
public boolean hasNext() {
140151
if (this.recordIterator.hasNext()) {
141152
return true;
142153
} else {
@@ -149,20 +160,24 @@ public boolean isCompleted() {
149160
return !splitChangeEventStreamingTaskController.isRunning();
150161
}
151162

152-
private boolean pollNextBatch() throws InterruptedException {
163+
private boolean pollNextBatch() {
153164
if (splitChangeEventStreamingTaskController.isRunning()) {
154-
List<DataChangeEvent> dbzRecords = queue.poll();
155-
while (dbzRecords.isEmpty()) {
156-
//sleep 10s
157-
LOG.info("No record found, sleep for 5s in reader");
158-
TimeUnit.SECONDS.sleep(5);
159-
dbzRecords = queue.poll();
160-
}
161-
this.batch = new ArrayList<>();
162-
for (DataChangeEvent event : dbzRecords) {
163-
this.batch.add(event.getRecord());
165+
try {
166+
List<DataChangeEvent> dbzRecords = queue.poll();
167+
while (dbzRecords.isEmpty()) {
168+
//sleep 10s
169+
LOG.info("No record found, sleep for 5s in reader");
170+
TimeUnit.SECONDS.sleep(5);
171+
dbzRecords = queue.poll();
172+
}
173+
this.batch = new ArrayList<>();
174+
for (DataChangeEvent event : dbzRecords) {
175+
this.batch.add(event.getRecord());
176+
}
177+
this.recordIterator = this.batch.iterator();
178+
} catch (InterruptedException e) {
179+
throw new RuntimeException(e);
164180
}
165-
this.recordIterator = this.batch.iterator();
166181
return true;
167182
}
168183
return false;

0 commit comments

Comments
 (0)