Skip to content

Commit 1a02c57

Browse files
[Improve] StarRocksSourceReader use the existing client (#6480)
1 parent ca4a65f commit 1a02c57

File tree

2 files changed

+41
-26
lines changed

2 files changed

+41
-26
lines changed

seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/client/source/StarRocksBeReadClient.java

+8-17
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040

4141
import java.io.Serializable;
4242
import java.util.ArrayList;
43-
import java.util.List;
43+
import java.util.Set;
4444
import java.util.concurrent.atomic.AtomicBoolean;
4545

4646
import static org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorErrorCode.CLOSE_BE_READER_FAILED;
@@ -55,21 +55,12 @@ public class StarRocksBeReadClient implements Serializable {
5555
private String contextId;
5656
private int readerOffset = 0;
5757
private final SourceConfig sourceConfig;
58-
private final SeaTunnelRowType seaTunnelRowType;
58+
private SeaTunnelRowType seaTunnelRowType;
5959
private StarRocksRowBatchReader rowBatch;
60-
61-
private final List<Long> tabletIds;
62-
63-
private final String queryPlan;
6460
protected AtomicBoolean eos = new AtomicBoolean(false);
6561

66-
public StarRocksBeReadClient(
67-
QueryPartition queryPartition,
68-
SourceConfig sourceConfig,
69-
SeaTunnelRowType seaTunnelRowType) {
62+
public StarRocksBeReadClient(String beNodeInfo, SourceConfig sourceConfig) {
7063
this.sourceConfig = sourceConfig;
71-
this.seaTunnelRowType = seaTunnelRowType;
72-
String beNodeInfo = queryPartition.getBeAddress();
7364
log.debug("Parse StarRocks BE address: '{}'.", beNodeInfo);
7465
String[] hostPort = beNodeInfo.split(":");
7566
if (hostPort.length != 2) {
@@ -79,8 +70,6 @@ public StarRocksBeReadClient(
7970
}
8071
this.ip = hostPort[0].trim();
8172
this.port = Integer.parseInt(hostPort[1].trim());
82-
this.queryPlan = queryPartition.getQueryPlan();
83-
this.tabletIds = new ArrayList<>(queryPartition.getTabletIds());
8473
TBinaryProtocol.Factory factory = new TBinaryProtocol.Factory();
8574
TSocket socket =
8675
new TSocket(
@@ -101,10 +90,12 @@ public StarRocksBeReadClient(
10190
client = new TStarrocksExternalService.Client(protocol);
10291
}
10392

104-
public void openScanner() {
93+
public void openScanner(QueryPartition partition, SeaTunnelRowType seaTunnelRowType) {
94+
this.seaTunnelRowType = seaTunnelRowType;
95+
Set<Long> tabletIds = partition.getTabletIds();
10596
TScanOpenParams params = new TScanOpenParams();
106-
params.setTablet_ids(tabletIds);
107-
params.setOpaqued_query_plan(queryPlan);
97+
params.setTablet_ids(new ArrayList<>(tabletIds));
98+
params.setOpaqued_query_plan(partition.getQueryPlan());
10899
params.setCluster(DEFAULT_CLUSTER_NAME);
109100
params.setDatabase(sourceConfig.getDatabase());
110101
params.setTable(sourceConfig.getTable());

seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/source/StarRocksSourceReader.java

+33-9
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,18 @@
2323
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
2424
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
2525
import org.apache.seatunnel.connectors.seatunnel.starrocks.client.source.StarRocksBeReadClient;
26+
import org.apache.seatunnel.connectors.seatunnel.starrocks.client.source.model.QueryPartition;
2627
import org.apache.seatunnel.connectors.seatunnel.starrocks.config.SourceConfig;
28+
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;
2729

2830
import lombok.extern.slf4j.Slf4j;
2931

3032
import java.io.IOException;
3133
import java.util.ArrayList;
34+
import java.util.HashMap;
3235
import java.util.LinkedList;
3336
import java.util.List;
37+
import java.util.Map;
3438
import java.util.Queue;
3539

3640
@Slf4j
@@ -40,6 +44,7 @@ public class StarRocksSourceReader implements SourceReader<SeaTunnelRow, StarRoc
4044
private final SourceReader.Context context;
4145
private final SourceConfig sourceConfig;
4246
private final SeaTunnelRowType seaTunnelRowType;
47+
private Map<String, StarRocksBeReadClient> clientsPools;
4348
private volatile boolean noMoreSplitsAssignment;
4449

4550
public StarRocksSourceReader(
@@ -87,26 +92,45 @@ public void handleNoMoreSplits() {
8792
}
8893

8994
private void read(StarRocksSourceSplit split, Collector<SeaTunnelRow> output) {
90-
StarRocksBeReadClient client =
91-
new StarRocksBeReadClient(split.getPartition(), sourceConfig, seaTunnelRowType);
95+
96+
QueryPartition partition = split.getPartition();
97+
String beAddress = partition.getBeAddress();
98+
StarRocksBeReadClient client = null;
99+
if (clientsPools.containsKey(beAddress)) {
100+
client = clientsPools.get(beAddress);
101+
} else {
102+
client = new StarRocksBeReadClient(beAddress, sourceConfig);
103+
clientsPools.put(beAddress, client);
104+
}
92105
// open scanner to be
93-
client.openScanner();
106+
client.openScanner(partition, seaTunnelRowType);
94107
while (client.hasNext()) {
95108
SeaTunnelRow seaTunnelRow = client.getNext();
96109
output.collect(seaTunnelRow);
97110
}
98-
// close client to be
99-
if (client != null) {
100-
client.close();
101-
}
102111
}
103112

104113
@Override
105-
public void open() throws Exception {}
114+
public void open() throws Exception {
115+
clientsPools = new HashMap<>();
116+
}
106117

107118
@Override
108119
public void close() throws IOException {
109-
// nothing to do
120+
if (!clientsPools.isEmpty()) {
121+
clientsPools
122+
.values()
123+
.forEach(
124+
client -> {
125+
if (client != null) {
126+
try {
127+
client.close();
128+
} catch (StarRocksConnectorException e) {
129+
log.error("Failed to close reader: ", e);
130+
}
131+
}
132+
});
133+
}
110134
}
111135

112136
@Override

0 commit comments

Comments
 (0)