Skip to content

Commit d4c83e5

Browse files
committed
perf: improve native query performance
1 parent 157c4ec commit d4c83e5

10 files changed

Lines changed: 343 additions & 23 deletions

File tree

deploy-pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
<groupId>com.taosdata.jdbc</groupId>
77
<artifactId>taos-jdbcdriver</artifactId>
8-
<version>3.7.1</version>
8+
<version>3.7.2</version>
99
<packaging>jar</packaging>
1010

1111
<name>JDBCDriver</name>

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
<modelVersion>4.0.0</modelVersion>
44
<groupId>com.taosdata.jdbc</groupId>
55
<artifactId>taos-jdbcdriver</artifactId>
6-
<version>3.7.1</version>
6+
<version>3.7.2</version>
77

88
<packaging>jar</packaging>
99
<name>JDBCDriver</name>

src/main/java/com/taosdata/jdbc/AbstractDriver.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,8 @@ protected Connection getWSConnection(String url, ConnectionParam param, Properti
9292
Utils.retainByteBuf(byteBuf);
9393
FetchBlockNewResp fetchBlockResp = new FetchBlockNewResp(byteBuf);
9494
remove.getFuture().complete(fetchBlockResp);
95+
} else {
96+
9597
}
9698
});
9799
Transport transport = new Transport(WSFunction.WS, param, inFlightRequest);

src/main/java/com/taosdata/jdbc/AbstractResultSet.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import java.util.concurrent.ForkJoinPool;
1313

1414
public abstract class AbstractResultSet extends WrapperImpl implements ResultSet {
15+
protected final int START_BACKEND_FETCH_BLOCK_NUM = 3;
1516
private int fetchSize;
1617
protected boolean wasNull;
1718
protected int timestampPrecision;

src/main/java/com/taosdata/jdbc/TSDBResultSet.java

Lines changed: 33 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ public class TSDBResultSet extends AbstractResultSet {
2222
BlockingQueue<TSDBResultSetBlockData> blockingQueueOut = new LinkedBlockingQueue<>(cacheSize);
2323
private TSDBResultSetBlockData blockData;
2424
private volatile boolean isClosed;
25+
private int fetchBlockNum = 0;
2526
ThreadPoolExecutor backFetchExecutor;
2627
ForkJoinPool dataHandleExecutor = getForkJoinPool();
2728

@@ -49,6 +50,9 @@ public TSDBResultSet(TSDBStatement statement, TSDBJNIConnector connector, long r
4950
this.timestampPrecision = timestampPrecision;
5051
this.blockData = new TSDBResultSetBlockData();
5152

53+
}
54+
55+
private void startBackendFetch() {
5256
backFetchExecutor = (ThreadPoolExecutor)Executors.newFixedThreadPool(1);
5357
backFetchExecutor.submit(() -> {
5458
try {
@@ -73,7 +77,6 @@ public TSDBResultSet(TSDBStatement statement, TSDBJNIConnector connector, long r
7377
}
7478
});
7579
}
76-
7780
public boolean next() throws SQLException {
7881
if (isClosed){
7982
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_RESULTSET_CLOSED);
@@ -82,14 +85,27 @@ public boolean next() throws SQLException {
8285
if (this.blockData.forward())
8386
return true;
8487

85-
try {
86-
this.blockData = blockingQueueOut.take();
87-
} catch (InterruptedException e) {
88-
Thread.currentThread().interrupt();
88+
fetchBlockNum++;
89+
90+
int code;
91+
if (fetchBlockNum > START_BACKEND_FETCH_BLOCK_NUM) {
92+
if (backFetchExecutor == null) {
93+
startBackendFetch();
94+
}
95+
96+
try {
97+
this.blockData = blockingQueueOut.take();
98+
} catch (InterruptedException e) {
99+
Thread.currentThread().interrupt();
100+
}
101+
this.blockData.waitTillOK();
102+
103+
code = this.blockData.returnCode;
104+
} else {
105+
code = this.jniConnector.fetchBlock(this.resultSetPointer, this.blockData);
106+
this.blockData.reset();
89107
}
90-
this.blockData.waitTillOK();
91108

92-
int code = this.blockData.returnCode;
93109
if (code == TSDBConstants.JNI_CONNECTION_NULL) {
94110
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_JNI_CONNECTION_NULL);
95111
} else if (code == TSDBConstants.JNI_RESULT_SET_NULL) {
@@ -104,15 +120,17 @@ public void close() throws SQLException {
104120
return;
105121
isClosed = true;
106122

107-
while (backFetchExecutor.getActiveCount() != 0) {
108-
try {
109-
Thread.sleep(1);
110-
} catch (InterruptedException ignored) {
111-
Thread.currentThread().interrupt();
123+
if (backFetchExecutor != null) {
124+
while (backFetchExecutor.getActiveCount() != 0) {
125+
try {
126+
Thread.sleep(1);
127+
} catch (InterruptedException ignored) {
128+
Thread.currentThread().interrupt();
129+
}
130+
}
131+
if (!backFetchExecutor.isShutdown()) {
132+
backFetchExecutor.shutdown();
112133
}
113-
}
114-
if (!backFetchExecutor.isShutdown()){
115-
backFetchExecutor.shutdown();
116134
}
117135

118136
if (this.statement == null)
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package com.taosdata.jdbc.enums;
2+
3+
public enum FetchState {
4+
PAUSED(1),
5+
FETCHING(2),
6+
COMPLETED(3)
7+
;
8+
private final long state;
9+
10+
FetchState(int state) {
11+
this.state = state;
12+
}
13+
14+
public long get() {
15+
return state;
16+
}
17+
}

src/main/java/com/taosdata/jdbc/ws/AbstractWSResultSet.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ public abstract class AbstractWSResultSet extends AbstractResultSet {
4444
ForkJoinPool dataHandleExecutor = getForkJoinPool();
4545

4646
private int fetchBlockNum = 0;
47-
private final int START_BACKEND_FETCH_BLOCK_NUM = 3;
4847
protected AbstractWSResultSet(Statement statement, Transport transport,
4948
QueryResp response, String database) throws SQLException {
5049
this.statement = statement;
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
package com.taosdata.jdbc.ws;
2+
3+
import com.taosdata.jdbc.AbstractResultSet;
4+
import com.taosdata.jdbc.BlockData;
5+
import com.taosdata.jdbc.TSDBError;
6+
import com.taosdata.jdbc.TSDBErrorNumbers;
7+
import com.taosdata.jdbc.enums.DataType;
8+
import com.taosdata.jdbc.enums.FetchState;
9+
import com.taosdata.jdbc.rs.RestfulResultSet;
10+
import com.taosdata.jdbc.rs.RestfulResultSetMetaData;
11+
import com.taosdata.jdbc.ws.entity.Action;
12+
import com.taosdata.jdbc.ws.entity.Code;
13+
import com.taosdata.jdbc.ws.entity.FetchBlockNewResp;
14+
import com.taosdata.jdbc.ws.entity.QueryResp;
15+
import org.slf4j.Logger;
16+
import org.slf4j.LoggerFactory;
17+
18+
import java.sql.SQLException;
19+
import java.sql.Statement;
20+
import java.util.Arrays;
21+
import java.util.concurrent.*;
22+
23+
public abstract class FetchBlockData extends AbstractResultSet {
24+
private static final Logger log = LoggerFactory.getLogger(FetchBlockData.class);
25+
26+
protected final Transport transport;
27+
protected final long queryId;
28+
protected final long reqId;
29+
30+
protected volatile boolean isClosed;
31+
private static final int CACHE_SIZE = 5;
32+
BlockingQueue<BlockData> blockingQueueOut = new LinkedBlockingQueue<>(CACHE_SIZE);
33+
ForkJoinPool dataHandleExecutor = getForkJoinPool();
34+
FetchState fetchState = FetchState.PAUSED;
35+
36+
protected FetchBlockData(Statement statement, Transport transport,
37+
QueryResp response, String database) throws SQLException {
38+
this.transport = transport;
39+
this.queryId = response.getId();
40+
this.reqId = response.getReqId();
41+
}
42+
43+
private void startBackendFetch(){
44+
backFetchExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
45+
backFetchExecutor.submit(() -> {
46+
try {
47+
while (!isClosed){
48+
BlockData blockData = BlockData.getEmptyBlockData(fields, timestampPrecision);
49+
50+
byte[] version = {1, 0};
51+
FetchBlockNewResp resp = (FetchBlockNewResp) transport.send(Action.FETCH_BLOCK_NEW.getAction(),
52+
reqId, queryId, 7, version);
53+
resp.init();
54+
55+
if (Code.SUCCESS.getCode() != resp.getCode()) {
56+
blockData.setReturnCode(resp.getCode());
57+
blockData.setErrorMessage(resp.getMessage());
58+
blockingQueueOut.put(blockData);
59+
break;
60+
}
61+
if (resp.isCompleted() || isClosed) {
62+
blockData.setCompleted(true);
63+
blockingQueueOut.put(blockData);
64+
break;
65+
}
66+
67+
blockData.setBuffer(resp.getBuffer());
68+
blockingQueueOut.put(blockData);
69+
70+
dataHandleExecutor.submit(blockData::handleData);
71+
}
72+
} catch (InterruptedException ignored) {
73+
Thread.currentThread().interrupt();
74+
} catch (Exception e) {
75+
log.error("fetch block error", e);
76+
BlockData blockData = BlockData.getEmptyBlockData(fields, timestampPrecision);
77+
while (!isClosed) {
78+
try {
79+
if (blockingQueueOut.offer(blockData, 10, TimeUnit.MILLISECONDS)){
80+
break;
81+
}
82+
} catch (InterruptedException ignored) {
83+
Thread.currentThread().interrupt();
84+
return;
85+
}
86+
}
87+
}
88+
});
89+
}
90+
91+
@Override
92+
public boolean next() throws SQLException {
93+
if (isClosed()) {
94+
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_RESULTSET_CLOSED);
95+
}
96+
97+
if (this.forward()) {
98+
return true;
99+
}
100+
101+
fetchBlockNum++;
102+
if (fetchBlockNum > START_BACKEND_FETCH_BLOCK_NUM) {
103+
if (backFetchExecutor == null) {
104+
startBackendFetch();
105+
}
106+
BlockData blockData;
107+
try {
108+
blockData = blockingQueueOut.take();
109+
} catch (InterruptedException e) {
110+
Thread.currentThread().interrupt();
111+
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_UNKNOWN, "FETCH DATA INTERRUPTED");
112+
}
113+
114+
if (blockData.getReturnCode() != Code.SUCCESS.getCode()) {
115+
throw TSDBError.createSQLException(blockData.getReturnCode(), "FETCH DATA ERROR:" + blockData.getErrorMessage());
116+
}
117+
this.reset();
118+
if (blockData.isCompleted()) {
119+
this.isCompleted = true;
120+
return false;
121+
}
122+
blockData.waitTillOK();
123+
this.result = blockData.getData();
124+
this.numOfRows = blockData.getNumOfRows();
125+
} else {
126+
byte[] version = {1, 0};
127+
FetchBlockNewResp resp = (FetchBlockNewResp) transport.send(Action.FETCH_BLOCK_NEW.getAction(),
128+
reqId, queryId, 7, version);
129+
resp.init();
130+
131+
if (Code.SUCCESS.getCode() != resp.getCode()) {
132+
throw TSDBError.createSQLException(resp.getCode(), "FETCH DATA ERROR:" + resp.getMessage());
133+
}
134+
this.reset();
135+
BlockData blockData = BlockData.getEmptyBlockData(fields, timestampPrecision);
136+
137+
if (resp.isCompleted() || isClosed) {
138+
blockData.setCompleted(true);
139+
isCompleted = true;
140+
return false;
141+
}
142+
143+
blockData.setBuffer(resp.getBuffer());
144+
blockData.handleData();
145+
146+
this.result = blockData.getData();
147+
this.numOfRows = blockData.getNumOfRows();
148+
}
149+
150+
return true;
151+
}
152+
153+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package com.taosdata.jdbc.ws;
2+
3+
import com.taosdata.jdbc.ws.entity.Action;
4+
import com.taosdata.jdbc.ws.schemaless.SchemalessAction;
5+
import com.taosdata.jdbc.ws.tmq.ConsumerAction;
6+
7+
import java.util.HashMap;
8+
import java.util.Map;
9+
import java.util.Optional;
10+
import java.util.concurrent.ConcurrentHashMap;
11+
import java.util.concurrent.Semaphore;
12+
import java.util.concurrent.TimeUnit;
13+
import java.util.concurrent.TimeoutException;
14+
15+
/**
16+
* Unfinished execution
17+
*/
18+
public class InFlightFetchRequest {
19+
private final int timeout;
20+
private final Semaphore semaphore;
21+
private final Map<Long, ConcurrentHashMap<Long, FutureResponse>> futureMap = new HashMap<>();
22+
23+
public InFlightFetchRequest(int timeout, int concurrentNum) {
24+
this.timeout = timeout;
25+
this.semaphore = new Semaphore(concurrentNum);
26+
for (Action value : Action.values()) {
27+
String action = value.getAction();
28+
futureMap.put(action, new ConcurrentHashMap<>());
29+
}
30+
for (ConsumerAction value : ConsumerAction.values()) {
31+
String action = value.getAction();
32+
futureMap.put(action, new ConcurrentHashMap<>());
33+
}
34+
for (SchemalessAction value : SchemalessAction.values()) {
35+
String action = value.getAction();
36+
futureMap.put(action, new ConcurrentHashMap<>());
37+
}
38+
}
39+
40+
public void put(FutureResponse rf) throws InterruptedException, TimeoutException {
41+
if (semaphore.tryAcquire(timeout, TimeUnit.MILLISECONDS)) {
42+
futureMap.get(rf.getAction()).put(rf.getId(), rf);
43+
} else {
44+
throw new TimeoutException("websocket connection reached the max number of concurrent requests");
45+
}
46+
}
47+
48+
public FutureResponse remove(String action, Long id) {
49+
if (action.equals("version") && (id == null || id == 0) && futureMap.get(action).size() == 1) {
50+
Optional<Long> optionalLong = futureMap.get(action).keySet().stream().findFirst();
51+
if (optionalLong.isPresent()) {
52+
id = optionalLong.get();
53+
} else {
54+
return null;
55+
}
56+
FutureResponse future = futureMap.get(action).remove(id);
57+
if (null != future) {
58+
semaphore.release();
59+
}
60+
return future;
61+
}
62+
FutureResponse future = futureMap.get(action).remove(id);
63+
if (null != future) {
64+
semaphore.release();
65+
}
66+
return future;
67+
}
68+
69+
public void close() {
70+
futureMap.keySet().stream()
71+
.flatMap(k -> {
72+
ConcurrentHashMap<Long, FutureResponse> futures = futureMap.get(k);
73+
futureMap.put(k, new ConcurrentHashMap<>());
74+
return futures.values().stream();
75+
})
76+
.parallel().map(FutureResponse::getFuture)
77+
.forEach(e -> e.completeExceptionally(new Exception("close all inFlightRequest")));
78+
}
79+
80+
public boolean hasInFlightRequest() {
81+
return futureMap.keySet().stream()
82+
.filter(k -> !futureMap.get(k).isEmpty()).findAny().orElse(null) != null;
83+
}
84+
}

0 commit comments

Comments
 (0)