Skip to content

Commit 196b609

Browse files
authored
fix: cloud tmq subscribe bug (#249)
* fix stmt sql without ? * fix cloud tmq subscribe bug * improve stmt close * update version
1 parent 3e6c5a4 commit 196b609

10 files changed

Lines changed: 98 additions & 51 deletions

File tree

deploy-pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
<groupId>com.taosdata.jdbc</groupId>
77
<artifactId>taos-jdbcdriver</artifactId>
8-
<version>3.6.1</version>
8+
<version>3.6.2</version>
99
<packaging>jar</packaging>
1010

1111
<name>JDBCDriver</name>

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
<modelVersion>4.0.0</modelVersion>
44
<groupId>com.taosdata.jdbc</groupId>
55
<artifactId>taos-jdbcdriver</artifactId>
6-
<version>3.6.1</version>
6+
<version>3.6.2</version>
77

88
<packaging>jar</packaging>
99
<name>JDBCDriver</name>

src/main/java/com/taosdata/jdbc/ws/AbsWSPreparedStatement.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,16 @@ public class AbsWSPreparedStatement extends WSStatement implements TaosPrepareSt
5353
private final HashMap<ByteBuffer, TableInfo> tableInfoMap = new HashMap<>();
5454
private TableInfo tableInfo;
5555

56+
public AbsWSPreparedStatement(Transport transport,
57+
ConnectionParam param,
58+
String database,
59+
AbstractConnection connection,
60+
String sql,
61+
Long instanceId) {
62+
super(transport, database, connection, instanceId, param.getZoneId());
63+
this.rawSql = sql;
64+
this.param = param;
65+
}
5666

5767
public AbsWSPreparedStatement(Transport transport,
5868
ConnectionParam param,
@@ -64,8 +74,6 @@ public AbsWSPreparedStatement(Transport transport,
6474
super(transport, database, connection, instanceId, param.getZoneId());
6575
this.rawSql = sql;
6676
this.param = param;
67-
if (!sql.contains("?"))
68-
return;
6977

7078
reqId = prepareResp.getReqId();
7179
stmtId = prepareResp.getStmtId();
@@ -771,7 +779,7 @@ public int[] executeBatch() throws SQLException {
771779
@Override
772780
public void close() throws SQLException {
773781
if (!isClosed()) {
774-
if (transport.isConnected()) {
782+
if (transport.isConnected() && stmtId != 0) {
775783
Request close = RequestFactory.generateClose(stmtId, reqId);
776784
transport.send(close);
777785
}
@@ -1096,7 +1104,7 @@ private int executeBatchImpl() throws SQLException {
10961104
Request request = RequestFactory.generateExec(stmtId, reqId);
10971105
Stmt2ExecResp resp = (Stmt2ExecResp) transport.send(request);
10981106
if (Code.SUCCESS.getCode() != resp.getCode()) {
1099-
throw TSDBError.createSQLException(bindResp.getCode(), "(0x" + Integer.toHexString(bindResp.getCode()) + "):" + bindResp.getMessage());
1107+
throw TSDBError.createSQLException(resp.getCode(), "(0x" + Integer.toHexString(resp.getCode()) + "):" + resp.getMessage());
11001108
}
11011109

11021110
this.affectedRows = resp.getAffected();

src/main/java/com/taosdata/jdbc/ws/TSWSPreparedStatement.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,4 +16,13 @@ public TSWSPreparedStatement(Transport transport,
1616
Stmt2PrepareResp prepareResp) throws SQLException {
1717
super(transport, param, database, connection, sql, instanceId, prepareResp);
1818
}
19+
20+
public TSWSPreparedStatement(Transport transport,
21+
ConnectionParam param,
22+
String database,
23+
AbstractConnection connection,
24+
String sql,
25+
Long instanceId) throws SQLException {
26+
super(transport, param, database, connection, sql, instanceId);
27+
}
1928
}

src/main/java/com/taosdata/jdbc/ws/WSConnection.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,15 @@ public PreparedStatement prepareStatement(String sql) throws SQLException {
6969
fastWriteSql = true;
7070
}
7171

72+
if (!sql.contains("?")){
73+
return new TSWSPreparedStatement(transport,
74+
param,
75+
database,
76+
this,
77+
sql,
78+
idGenerator.getAndIncrement());
79+
}
80+
7281
if (transport != null && !transport.isClosed()) {
7382
long reqId = ReqId.getReqID();
7483
Request request = com.taosdata.jdbc.ws.stmt2.entity.RequestFactory.generateInit(reqId, true, true);

src/main/java/com/taosdata/jdbc/ws/WSStatement.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,9 @@ public void setQueryTimeout(int seconds) throws SQLException {
121121
if (seconds < 0)
122122
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_INVALID_VARIABLE);
123123

124+
if (seconds == 0){
125+
seconds = Integer.MAX_VALUE;
126+
}
124127
this.queryTimeout = seconds;
125128
transport.setTimeout(seconds * 1000L);
126129
}

src/main/java/com/taosdata/jdbc/ws/WebSocketDriver.java

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,12 @@
11
package com.taosdata.jdbc.ws;
22

33

4-
import com.fasterxml.jackson.core.JsonProcessingException;
5-
import com.fasterxml.jackson.databind.JsonNode;
6-
import com.fasterxml.jackson.databind.ObjectMapper;
7-
import com.fasterxml.jackson.databind.ObjectReader;
8-
import com.taosdata.jdbc.*;
9-
import com.taosdata.jdbc.enums.WSFunction;
4+
import com.taosdata.jdbc.AbstractDriver;
5+
import com.taosdata.jdbc.TSDBError;
6+
import com.taosdata.jdbc.TSDBErrorNumbers;
107
import com.taosdata.jdbc.rs.ConnectionParam;
11-
import com.taosdata.jdbc.rs.RestfulConnection;
12-
import com.taosdata.jdbc.utils.HttpClientPoolUtil;
13-
import com.taosdata.jdbc.utils.JsonUtil;
14-
import com.taosdata.jdbc.utils.StringUtils;
15-
import com.taosdata.jdbc.ws.entity.*;
16-
import org.slf4j.LoggerFactory;
178

18-
import java.nio.ByteOrder;
19-
import java.nio.charset.StandardCharsets;
209
import java.sql.*;
21-
import java.util.Base64;
2210
import java.util.Properties;
2311
import java.util.logging.Logger;
2412

src/main/java/com/taosdata/jdbc/ws/tmq/entity/ConsumerParam.java

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,32 @@ public class ConsumerParam {
2020
knownKeys.add(TSDBDriver.PROPERTY_KEY_PASSWORD);
2121
knownKeys.add(TSDBDriver.PROPERTY_KEY_HOST);
2222
knownKeys.add(TSDBDriver.PROPERTY_KEY_PORT);
23+
knownKeys.add(TSDBDriver.PROPERTY_KEY_TOKEN);
24+
knownKeys.add(TSDBDriver.PROPERTY_KEY_PRODUCT_NAME);
25+
knownKeys.add(TSDBDriver.PROPERTY_KEY_DBNAME);
26+
knownKeys.add(TSDBDriver.PROPERTY_KEY_USE_SSL);
27+
knownKeys.add(TSDBDriver.PROPERTY_KEY_CONFIG_DIR);
28+
knownKeys.add(TSDBDriver.PROPERTY_KEY_LOCALE);
29+
knownKeys.add(TSDBDriver.PROPERTY_KEY_CHARSET);
30+
knownKeys.add(TSDBDriver.PROPERTY_KEY_BATCH_LOAD);
31+
knownKeys.add(TSDBDriver.PROPERTY_KEY_BATCH_ERROR_IGNORE);
32+
knownKeys.add(TSDBDriver.PROPERTY_KEY_MESSAGE_WAIT_TIMEOUT);
33+
knownKeys.add(TSDBDriver.PROPERTY_KEY_CONNECT_MODE);
34+
knownKeys.add(TSDBDriver.PROPERTY_KEY_ENABLE_COMPRESSION);
35+
knownKeys.add(TSDBDriver.PROPERTY_KEY_ENABLE_AUTO_RECONNECT);
36+
knownKeys.add(TSDBDriver.PROPERTY_KEY_SLAVE_CLUSTER_HOST);
37+
knownKeys.add(TSDBDriver.PROPERTY_KEY_SLAVE_CLUSTER_PORT);
38+
knownKeys.add(TSDBDriver.PROPERTY_KEY_RECONNECT_INTERVAL_MS);
39+
knownKeys.add(TSDBDriver.PROPERTY_KEY_RECONNECT_RETRY_COUNT);
40+
knownKeys.add(TSDBDriver.PROPERTY_KEY_DISABLE_SSL_CERT_VALIDATION);
41+
knownKeys.add(TSDBDriver.PROPERTY_KEY_APP_IP);
42+
knownKeys.add(TSDBDriver.PROPERTY_KEY_APP_NAME);
43+
knownKeys.add(TSDBDriver.PROPERTY_KEY_TIME_ZONE);
44+
45+
knownKeys.add(TSDBDriver.HTTP_POOL_SIZE);
46+
knownKeys.add(TSDBDriver.HTTP_KEEP_ALIVE);
47+
knownKeys.add(TSDBDriver.HTTP_CONNECT_TIMEOUT);
48+
knownKeys.add(TSDBDriver.HTTP_SOCKET_TIMEOUT);
2349

2450
knownKeys.add(TMQConstants.CONNECT_USER);
2551
knownKeys.add(TMQConstants.CONNECT_PASS);
@@ -37,18 +63,6 @@ public class ConsumerParam {
3763
knownKeys.add(TMQConstants.VALUE_DESERIALIZER_ENCODING);
3864
knownKeys.add(TMQConstants.CONNECT_TYPE);
3965
knownKeys.add(TMQConstants.CONNECT_URL);
40-
41-
knownKeys.add(TSDBDriver.PROPERTY_KEY_ENABLE_COMPRESSION);
42-
knownKeys.add(TSDBDriver.PROPERTY_KEY_ENABLE_AUTO_RECONNECT);
43-
knownKeys.add(TSDBDriver.PROPERTY_KEY_SLAVE_CLUSTER_HOST);
44-
knownKeys.add(TSDBDriver.PROPERTY_KEY_SLAVE_CLUSTER_PORT);
45-
knownKeys.add(TSDBDriver.PROPERTY_KEY_RECONNECT_INTERVAL_MS);
46-
knownKeys.add(TSDBDriver.PROPERTY_KEY_RECONNECT_RETRY_COUNT);
47-
knownKeys.add(TSDBDriver.PROPERTY_KEY_DISABLE_SSL_CERT_VALIDATION);
48-
49-
knownKeys.add(TSDBDriver.PROPERTY_KEY_APP_IP);
50-
knownKeys.add(TSDBDriver.PROPERTY_KEY_APP_NAME);
51-
knownKeys.add(TSDBDriver.PROPERTY_KEY_TIME_ZONE);
5266
}
5367

5468
private ConnectionParam connectionParam;

src/test/java/com/taosdata/jdbc/ws/WSConFailOverTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,10 @@ public void before() throws SQLException, InterruptedException, IOException, URI
7878
taosAdapterMock = new TaosAdapterMock(9041);
7979
taosAdapterMock.start();
8080

81+
while (!taosAdapterMock.isReady()){
82+
Thread.sleep(10);
83+
}
84+
8185
String url;
8286
url = SpecifyAddress.getInstance().getWebSocketWithoutUrl();
8387
if (url == null) {

src/test/java/com/taosdata/jdbc/ws/stmt/WsPstmtTest.java

Lines changed: 30 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import com.taosdata.jdbc.utils.SpecifyAddress;
44
import com.taosdata.jdbc.ws.TSWSPreparedStatement;
55
import org.junit.*;
6+
import org.junit.runners.MethodSorters;
67

78
import java.io.InputStream;
89
import java.io.OutputStream;
@@ -14,7 +15,7 @@
1415
import java.util.HashSet;
1516
import java.util.Properties;
1617

17-
@FixMethodOrder
18+
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
1819
public class WsPstmtTest {
1920
String host = "127.0.0.1";
2021
String db_name = "ws_prepare";
@@ -25,7 +26,7 @@ public class WsPstmtTest {
2526
PreparedStatement pstmt;
2627

2728
@Test
28-
public void testExecuteUpdate() throws SQLException {
29+
public void test001_ExecuteUpdate() throws SQLException {
2930
String sql = "insert into " + db_name + "." + tableName + " values(?, ?)";
3031
PreparedStatement statement = connection.prepareStatement(sql);
3132
statement.setTimestamp(1, new Timestamp(System.currentTimeMillis()));
@@ -37,7 +38,7 @@ public void testExecuteUpdate() throws SQLException {
3738
}
3839

3940
@Test
40-
public void testReuseStmtExecuteUpdate() throws SQLException {
41+
public void test002_ReuseStmtExecuteUpdate() throws SQLException {
4142
String sql = "insert into " + db_name + "." + tableName + " values(?, ?)";
4243
PreparedStatement statement = connection.prepareStatement(sql);
4344
statement.setTimestamp(1, new Timestamp(System.currentTimeMillis()));
@@ -55,7 +56,7 @@ public void testReuseStmtExecuteUpdate() throws SQLException {
5556
}
5657

5758
@Test
58-
public void testExecuteBatchInsert() throws SQLException {
59+
public void test003_ExecuteBatchInsert() throws SQLException {
5960
String sql = "insert into " + db_name + "." + tableName + " (ts, c1) values(?, ?)";
6061
PreparedStatement statement = connection.prepareStatement(sql);
6162
for (int i = 0; i < 10; i++) {
@@ -79,7 +80,7 @@ public void testExecuteBatchInsert() throws SQLException {
7980
}
8081

8182
@Test
82-
public void testQuery() throws SQLException {
83+
public void test004_Query() throws SQLException {
8384
String sql = "select * from " + db_name + "." + tableName + " where ts > ? and ts < ?";
8485
PreparedStatement statement = connection.prepareStatement(sql);
8586
statement.setTimestamp(1, new Timestamp(System.currentTimeMillis() - 1000));
@@ -90,17 +91,28 @@ public void testQuery() throws SQLException {
9091
}
9192
}
9293

94+
@Test
95+
public void test005_NormalQuery() throws SQLException {
96+
pstmt.execute("insert into " + db_name + "." + tableName + " values (now, 1)");
97+
98+
String sql = "select * from " + db_name + "." + tableName + " limit 1";
99+
try (PreparedStatement statement = connection.prepareStatement(sql);
100+
ResultSet resultSet = statement.executeQuery()){
101+
Assert.assertTrue(resultSet.next());
102+
}
103+
}
104+
93105
@Test (expected = SQLException.class)
94-
public void testSetNCharacterStream() throws SQLException {
106+
public void test100_SetNCharacterStream() throws SQLException {
95107
pstmt.setNCharacterStream(1, null);
96108
}
97109

98110
@Test (expected = SQLException.class)
99-
public void testSetNCharacterStream2() throws SQLException {
111+
public void test101_SetNCharacterStream2() throws SQLException {
100112
pstmt.setNCharacterStream(1, null, 0);
101113
}
102114
@Test (expected = SQLException.class)
103-
public void testSetNClob() throws SQLException {
115+
public void test012_SetNClob() throws SQLException {
104116
pstmt.setNClob(1, new NClob() {
105117
@Override
106118
public long length() throws SQLException {
@@ -169,12 +181,12 @@ public Reader getCharacterStream(long pos, long length) throws SQLException {
169181
});
170182
}
171183
@Test (expected = SQLException.class)
172-
public void testSetNClob2() throws SQLException {
184+
public void test103_SetNClob2() throws SQLException {
173185
pstmt.setNClob(1, null, 0);
174186
}
175187

176188
@Test (expected = SQLException.class)
177-
public void testSetBlob() throws SQLException {
189+
public void test104_SetBlob() throws SQLException {
178190
pstmt.setBlob(1, new Blob() {
179191
@Override
180192
public long length() throws SQLException {
@@ -233,38 +245,38 @@ public InputStream getBinaryStream(long pos, long length) throws SQLException {
233245
});
234246
}
235247
@Test (expected = SQLException.class)
236-
public void testSetBlob2() throws SQLException {
248+
public void test105_SetBlob2() throws SQLException {
237249
pstmt.setBlob(1, null, 0);
238250
}
239251

240252
@Test (expected = SQLException.class)
241-
public void testSetSQLXML() throws SQLException {
253+
public void test106_SetSQLXML() throws SQLException {
242254
pstmt.setSQLXML(1, null);
243255
}
244256

245257
@Test (expected = SQLException.class)
246-
public void testSetObject() throws SQLException {
258+
public void test107_SetObject() throws SQLException {
247259
pstmt.setObject(1, null, 0, 0);
248260
}
249261

250262
@Test (expected = SQLException.class)
251-
public void testSetAsciiStream() throws SQLException {
263+
public void test108_SetAsciiStream() throws SQLException {
252264
pstmt.setAsciiStream(1, null, 0);
253265
}
254266

255267

256268
@Test (expected = SQLException.class)
257-
public void testSetBinaryStream() throws SQLException {
269+
public void test109_SetBinaryStream() throws SQLException {
258270
pstmt.setBinaryStream(1, null, 0);
259271
}
260272

261273
@Test (expected = SQLException.class)
262-
public void testSetCharacterStream() throws SQLException {
274+
public void test110_SetCharacterStream() throws SQLException {
263275
pstmt.setCharacterStream(1, null, 0);
264276
}
265277

266278
@Test
267-
public void testSetTagNull() throws SQLException {
279+
public void test111_SetTagNull() throws SQLException {
268280
TSWSPreparedStatement wsPreparedStatement = pstmt.unwrap(TSWSPreparedStatement.class);
269281
wsPreparedStatement.setTagSqlTypeNull(1, Types.BOOLEAN);
270282
wsPreparedStatement.setTagSqlTypeNull(1, Types.TINYINT);
@@ -281,7 +293,7 @@ public void testSetTagNull() throws SQLException {
281293
}
282294

283295
@Test
284-
public void testSetObject2() throws SQLException {
296+
public void test112_SetObject2() throws SQLException {
285297
TSWSPreparedStatement wsPreparedStatement = pstmt.unwrap(TSWSPreparedStatement.class);
286298
wsPreparedStatement.setObject(1, null, Types.BOOLEAN);
287299
wsPreparedStatement.setObject(1, null, Types.TINYINT);

0 commit comments

Comments
 (0)