Skip to content

Commit 45345f2

Browse files
authored
[Fix] [Clickhouse] Parallelism makes data duplicate (#8916)
1 parent 79f9d96 commit 45345f2

File tree

7 files changed

+40
-211
lines changed

7 files changed

+40
-211
lines changed

Diff for: docs/en/connector-v2/source/Clickhouse.md

-1
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,6 @@ The following example demonstrates how to create a data synchronization job that
6767
```bash
6868
# Set the basic configuration of the task to be performed
6969
env {
70-
parallelism = 10
7170
job.mode = "BATCH"
7271
}
7372

Diff for: seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java

+13-32
Original file line numberDiff line numberDiff line change
@@ -18,33 +18,30 @@
1818
package org.apache.seatunnel.connectors.seatunnel.clickhouse.source;
1919

2020
import org.apache.seatunnel.api.source.Boundedness;
21-
import org.apache.seatunnel.api.source.SeaTunnelSource;
22-
import org.apache.seatunnel.api.source.SourceReader;
23-
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
24-
import org.apache.seatunnel.api.source.SupportColumnProjection;
25-
import org.apache.seatunnel.api.source.SupportParallelism;
2621
import org.apache.seatunnel.api.table.catalog.CatalogTable;
2722
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
28-
import org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSourceState;
23+
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
24+
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
25+
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
26+
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
2927

3028
import com.clickhouse.client.ClickHouseNode;
3129

3230
import java.util.Collections;
3331
import java.util.List;
3432

35-
public class ClickhouseSource
36-
implements SeaTunnelSource<SeaTunnelRow, ClickhouseSourceSplit, ClickhouseSourceState>,
37-
SupportParallelism,
38-
SupportColumnProjection {
33+
public class ClickhouseSource extends AbstractSingleSplitSource<SeaTunnelRow> {
3934

40-
private List<ClickHouseNode> servers;
41-
private CatalogTable catalogTable;
42-
private String sql;
35+
private final List<ClickHouseNode> servers;
36+
private final CatalogTable catalogTable;
37+
private final String sql;
38+
private final SeaTunnelRowType rowTypeInfo;
4339

4440
public ClickhouseSource(List<ClickHouseNode> servers, CatalogTable catalogTable, String sql) {
4541
this.servers = servers;
4642
this.catalogTable = catalogTable;
4743
this.sql = sql;
44+
this.rowTypeInfo = catalogTable.getSeaTunnelRowType();
4845
}
4946

5047
@Override
@@ -63,24 +60,8 @@ public List<CatalogTable> getProducedCatalogTables() {
6360
}
6461

6562
@Override
66-
public SourceReader<SeaTunnelRow, ClickhouseSourceSplit> createReader(
67-
SourceReader.Context readerContext) throws Exception {
68-
return new ClickhouseSourceReader(
69-
servers, readerContext, this.catalogTable.getSeaTunnelRowType(), sql);
70-
}
71-
72-
@Override
73-
public SourceSplitEnumerator<ClickhouseSourceSplit, ClickhouseSourceState> createEnumerator(
74-
SourceSplitEnumerator.Context<ClickhouseSourceSplit> enumeratorContext)
75-
throws Exception {
76-
return new ClickhouseSourceSplitEnumerator(enumeratorContext);
77-
}
78-
79-
@Override
80-
public SourceSplitEnumerator<ClickhouseSourceSplit, ClickhouseSourceState> restoreEnumerator(
81-
SourceSplitEnumerator.Context<ClickhouseSourceSplit> enumeratorContext,
82-
ClickhouseSourceState checkpointState)
83-
throws Exception {
84-
return new ClickhouseSourceSplitEnumerator(enumeratorContext);
63+
public AbstractSingleSplitReader<SeaTunnelRow> createReader(
64+
SingleSplitReaderContext readerContext) {
65+
return new ClickhouseSourceReader(servers, readerContext, sql, rowTypeInfo);
8566
}
8667
}

Diff for: seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceFactory.java

-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ public String factoryIdentifier() {
6868
TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context) {
6969
ReadonlyConfig readonlyConfig = context.getOptions();
7070
List<ClickHouseNode> nodes = ClickhouseUtil.createNodes(readonlyConfig);
71-
7271
String sql = readonlyConfig.get(SQL);
7372
ClickHouseNode currentServer = nodes.get(ThreadLocalRandom.current().nextInt(nodes.size()));
7473
try (ClickHouseClient client = ClickHouseClient.newInstance(currentServer.getProtocol());

Diff for: seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceReader.java

+26-58
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,12 @@
1717

1818
package org.apache.seatunnel.connectors.seatunnel.clickhouse.source;
1919

20-
import org.apache.seatunnel.api.source.Boundedness;
2120
import org.apache.seatunnel.api.source.Collector;
22-
import org.apache.seatunnel.api.source.SourceReader;
2321
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2422
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
2523
import org.apache.seatunnel.connectors.seatunnel.clickhouse.util.TypeConvertUtil;
24+
import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
25+
import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
2626

2727
import com.clickhouse.client.ClickHouseClient;
2828
import com.clickhouse.client.ClickHouseFormat;
@@ -32,34 +32,28 @@
3232
import lombok.extern.slf4j.Slf4j;
3333

3434
import java.io.IOException;
35-
import java.util.ArrayList;
36-
import java.util.Collections;
3735
import java.util.List;
3836
import java.util.Random;
3937

4038
@Slf4j
41-
public class ClickhouseSourceReader implements SourceReader<SeaTunnelRow, ClickhouseSourceSplit> {
39+
public class ClickhouseSourceReader extends AbstractSingleSplitReader<SeaTunnelRow> {
4240

4341
private final List<ClickHouseNode> servers;
4442
private ClickHouseClient client;
4543
private final SeaTunnelRowType rowTypeInfo;
46-
private final SourceReader.Context readerContext;
44+
private final SingleSplitReaderContext readerContext;
4745
private ClickHouseRequest<?> request;
4846
private final String sql;
49-
private volatile boolean noMoreSplit;
50-
51-
private final List<ClickhouseSourceSplit> splits;
5247

5348
ClickhouseSourceReader(
5449
List<ClickHouseNode> servers,
55-
SourceReader.Context readerContext,
56-
SeaTunnelRowType rowTypeInfo,
57-
String sql) {
50+
SingleSplitReaderContext readerContext,
51+
String sql,
52+
SeaTunnelRowType rowTypeInfo) {
5853
this.servers = servers;
5954
this.readerContext = readerContext;
60-
this.rowTypeInfo = rowTypeInfo;
6155
this.sql = sql;
62-
this.splits = new ArrayList<>();
56+
this.rowTypeInfo = rowTypeInfo;
6357
}
6458

6559
@Override
@@ -80,57 +74,31 @@ public void close() throws IOException {
8074
@Override
8175
public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
8276
synchronized (output.getCheckpointLock()) {
83-
if (!splits.isEmpty()) {
84-
try (ClickHouseResponse response = this.request.query(sql).executeAndWait()) {
85-
response.stream()
86-
.forEach(
87-
record -> {
88-
Object[] values =
89-
new Object[this.rowTypeInfo.getFieldNames().length];
90-
for (int i = 0; i < record.size(); i++) {
91-
if (record.getValue(i).isNullOrEmpty()) {
92-
values[i] = null;
93-
} else {
94-
values[i] =
95-
TypeConvertUtil.valueUnwrap(
96-
this.rowTypeInfo.getFieldType(i),
97-
record.getValue(i));
98-
}
77+
try (ClickHouseResponse response = this.request.query(sql).executeAndWait()) {
78+
response.stream()
79+
.forEach(
80+
record -> {
81+
Object[] values =
82+
new Object[this.rowTypeInfo.getFieldNames().length];
83+
for (int i = 0; i < record.size(); i++) {
84+
if (record.getValue(i).isNullOrEmpty()) {
85+
values[i] = null;
86+
} else {
87+
values[i] =
88+
TypeConvertUtil.valueUnwrap(
89+
this.rowTypeInfo.getFieldType(i),
90+
record.getValue(i));
9991
}
100-
output.collect(new SeaTunnelRow(values));
101-
});
102-
}
103-
signalNoMoreElement();
104-
}
105-
if (noMoreSplit
106-
&& splits.isEmpty()
107-
&& Boundedness.BOUNDED.equals(readerContext.getBoundedness())) {
108-
signalNoMoreElement();
92+
}
93+
output.collect(new SeaTunnelRow(values));
94+
});
10995
}
96+
signalNoMoreElement();
11097
}
11198
}
11299

113100
private void signalNoMoreElement() {
114101
log.info("Closed the bounded ClickHouse source");
115102
this.readerContext.signalNoMoreElement();
116-
this.splits.clear();
117103
}
118-
119-
@Override
120-
public List<ClickhouseSourceSplit> snapshotState(long checkpointId) throws Exception {
121-
return Collections.emptyList();
122-
}
123-
124-
@Override
125-
public void addSplits(List<ClickhouseSourceSplit> splits) {
126-
this.splits.addAll(splits);
127-
}
128-
129-
@Override
130-
public void handleNoMoreSplits() {
131-
noMoreSplit = true;
132-
}
133-
134-
@Override
135-
public void notifyCheckpointComplete(long checkpointId) throws Exception {}
136104
}

Diff for: seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplit.java

-27
This file was deleted.

Diff for: seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSourceSplitEnumerator.java

-91
This file was deleted.

Diff for: seatunnel-e2e/seatunnel-connector-v2-e2e/connector-clickhouse-e2e/src/test/resources/clickhouse_to_console.conf

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
######
2020

2121
env {
22-
parallelism = 3
22+
parallelism = 1
2323
job.mode = "BATCH"
2424
}
2525

0 commit comments

Comments
 (0)