Skip to content

Commit 2ead789

Browse files
authored
fix: database subscription data type conversion bug (#250)
* fix: data type conversion in database subscription * test: added database subscription data type tests
1 parent 196b609 commit 2ead789

10 files changed

Lines changed: 127 additions & 43 deletions

File tree

deploy-pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
<groupId>com.taosdata.jdbc</groupId>
77
<artifactId>taos-jdbcdriver</artifactId>
8-
<version>3.6.2</version>
8+
<version>3.6.3</version>
99
<packaging>jar</packaging>
1010

1111
<name>JDBCDriver</name>

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
<modelVersion>4.0.0</modelVersion>
44
<groupId>com.taosdata.jdbc</groupId>
55
<artifactId>taos-jdbcdriver</artifactId>
6-
<version>3.6.2</version>
6+
<version>3.6.3</version>
77

88
<packaging>jar</packaging>
99
<name>JDBCDriver</name>

src/main/java/com/taosdata/jdbc/ws/BlockResultSet.java

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,6 @@ public Object parseValue(int columnIndex) {
4242
return DataTypeConverUtil.parseValue(type, source);
4343
}
4444

45-
public Object parseValueWithZoneId(int columnIndex, ZoneId zoneId) {
46-
Object source = result.get(columnIndex - 1).get(rowIndex);
47-
if (null == source)
48-
return null;
49-
50-
int type = fields.get(columnIndex - 1).getTaosType();
51-
return DataTypeConverUtil.parseValue(type, source);
52-
}
53-
5445
@Override
5546
public String getString(int columnIndex) throws SQLException {
5647
checkAvailability(columnIndex, fields.size());
@@ -505,15 +496,4 @@ public Statement getStatement() throws SQLException {
505496
public Timestamp getTimestamp(int columnIndex, Calendar cal) throws SQLException {
506497
return getTimestamp(columnIndex);
507498
}
508-
509-
// ceil(numOfRows/8.0)
510-
private int BitmapLen(int n) {
511-
return (n + 0x7) >> 3;
512-
}
513-
514-
private boolean isNull(byte[] c, int n) {
515-
int position = n >>> 3;
516-
int index = n & 0x7;
517-
return (c[position] & (1 << (7 - index))) == (1 << (7 - index));
518-
}
519499
}

src/main/java/com/taosdata/jdbc/ws/Transport.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ public void checkServerTrusted(X509Certificate[] certs, String authType) {
101101
this.connectionParam = param;
102102
this.wsFunction = function;
103103

104-
this.timeout = param.getRequestTimeout();
104+
setTimeout(param.getRequestTimeout());
105105
}
106106

107107
public void setTextMessageHandler(Consumer<String> textMessageHandler) {
@@ -117,6 +117,11 @@ public void setBinaryMessageHandler(Consumer<ByteBuffer> binaryMessageHandler) {
117117
}
118118

119119
public void setTimeout(long timeout) {
120+
if (timeout < 0){
121+
timeout = DEFAULT_MESSAGE_WAIT_TIMEOUT;
122+
} else if (timeout == 0){
123+
timeout = Integer.MAX_VALUE;
124+
}
120125
this.timeout = timeout;
121126
}
122127

src/main/java/com/taosdata/jdbc/ws/WSStatement.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -121,9 +121,6 @@ public void setQueryTimeout(int seconds) throws SQLException {
121121
if (seconds < 0)
122122
throw TSDBError.createSQLException(TSDBErrorNumbers.ERROR_INVALID_VARIABLE);
123123

124-
if (seconds == 0){
125-
seconds = Integer.MAX_VALUE;
126-
}
127124
this.queryTimeout = seconds;
128125
transport.setTimeout(seconds * 1000L);
129126
}

src/main/java/com/taosdata/jdbc/ws/tmq/WSConsumer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ private ConsumerRecords<V> doPoll(Duration timeout, Deserializer<V> deserializer
177177
}
178178

179179
ConsumerRecords<V> records = new ConsumerRecords<>();
180-
try (WSConsumerResultSet rs = new WSConsumerResultSet(transport, factory, pollResp.getMessageId(), pollResp.getDatabase())) {
180+
try (WSConsumerResultSet rs = new WSConsumerResultSet(transport, factory, pollResp.getMessageId(), pollResp.getDatabase(), param.getConnectionParam().getZoneId())) {
181181
if (deserializer instanceof MapEnhanceDeserializer){
182182
ConsumerRecords<TMQEnhMap> resultRecords = rs.handleSubscribeDB(pollResp);
183183
return (ConsumerRecords<V>) resultRecords;

src/main/java/com/taosdata/jdbc/ws/tmq/WSConsumerResultSet.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,14 @@ public class WSConsumerResultSet extends AbstractResultSet {
5151

5252
protected int numOfRows = 0;
5353
protected int rowIndex = 0;
54+
protected final ZoneId zoneId;
5455

55-
public WSConsumerResultSet(Transport transport, TMQRequestFactory factory, long messageId, String database) {
56+
public WSConsumerResultSet(Transport transport, TMQRequestFactory factory, long messageId, String database, ZoneId zoneId) {
5657
this.transport = transport;
5758
this.factory = factory;
5859
this.messageId = messageId;
5960
this.database = database;
61+
this.zoneId = zoneId;
6062
}
6163

6264
private boolean forward() {
@@ -114,7 +116,7 @@ public ConsumerRecords<TMQEnhMap> handleSubscribeDB(PollResp pollResp) throws SQ
114116
if (Code.SUCCESS.getCode() != fetchResp.getCode())
115117
throw TSDBError.createSQLException(fetchResp.getCode(), fetchResp.getMessage());
116118

117-
return fetchResp.getEhnMapList(pollResp);
119+
return fetchResp.getEhnMapList(pollResp, zoneId);
118120
}
119121

120122
@Override
@@ -286,7 +288,7 @@ public Timestamp getTimestamp(int columnIndex) throws SQLException {
286288
return (Timestamp) value;
287289
if (value instanceof Long) {
288290
Instant instant = DateTimeUtils.parseTimestampColumnData((long) value, this.timestampPrecision);
289-
return DateTimeUtils.getTimestamp(instant, null);
291+
return DateTimeUtils.getTimestamp(instant, zoneId);
290292
}
291293
Timestamp ret;
292294
try {
@@ -491,11 +493,8 @@ public Object parseValue(int columnIndex) {
491493
if (type == TSDB_DATA_TYPE_TIMESTAMP){
492494
Long o = (Long)DataTypeConverUtil.parseValue(type, source);
493495
Instant instant = DateTimeUtils.parseTimestampColumnData(o, this.timestampPrecision);
494-
return DateTimeUtils.getTimestamp(instant, null);
496+
return DateTimeUtils.getTimestamp(instant, zoneId);
495497
}
496498
return DataTypeConverUtil.parseValue(type, source);
497499
}
498-
499-
// ceil(numOfRows/8.0)
500-
501500
}

src/main/java/com/taosdata/jdbc/ws/tmq/entity/FetchRawBlockResp.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
import com.taosdata.jdbc.rs.RestfulResultSet;
77
import com.taosdata.jdbc.rs.RestfulResultSetMetaData;
88
import com.taosdata.jdbc.tmq.*;
9+
import com.taosdata.jdbc.utils.DataTypeConverUtil;
10+
import com.taosdata.jdbc.utils.DateTimeUtils;
911
import com.taosdata.jdbc.utils.DecimalUtil;
1012
import com.taosdata.jdbc.ws.tmq.ConsumerAction;
1113
import com.taosdata.jdbc.ws.entity.Response;
@@ -14,6 +16,9 @@
1416
import java.nio.ByteBuffer;
1517
import java.nio.charset.StandardCharsets;
1618
import java.sql.SQLException;
19+
import java.sql.Timestamp;
20+
import java.time.Instant;
21+
import java.time.ZoneId;
1722
import java.util.ArrayList;
1823
import java.util.HashMap;
1924
import java.util.LinkedList;
@@ -163,7 +168,7 @@ public void parseBlockInfos() throws SQLException {
163168
rows = resultData.get(0).size();
164169
}
165170
}
166-
public ConsumerRecords<TMQEnhMap> getEhnMapList(PollResp pollResp) throws SQLException {
171+
public ConsumerRecords<TMQEnhMap> getEhnMapList(PollResp pollResp, ZoneId zoneId) throws SQLException {
167172
skipHead();
168173
int blockNum = buffer.getInt();
169174
int cols = 0;
@@ -219,7 +224,15 @@ public ConsumerRecords<TMQEnhMap> getEhnMapList(PollResp pollResp) throws SQLExc
219224
for (int j = 0; j < lineNum; j++) {
220225
HashMap<String, Object> lineDataMap = new HashMap<>();
221226
for (int k = 0; k < cols; k++) {
222-
lineDataMap.put(columnNames.get(k), resultData.get(k).get(j));
227+
if (fields.get(k).getTaosType() == TSDB_DATA_TYPE_TIMESTAMP){
228+
Long o = (Long) DataTypeConverUtil.parseValue(TSDB_DATA_TYPE_TIMESTAMP, resultData.get(k).get(j));
229+
Instant instant = DateTimeUtils.parseTimestampColumnData(o, precision);
230+
Timestamp t = DateTimeUtils.getTimestamp(instant, zoneId);
231+
lineDataMap.put(columnNames.get(k), t);
232+
continue;
233+
}
234+
Object o = DataTypeConverUtil.parseValue(fields.get(k).getTaosType(), resultData.get(k).get(j));
235+
lineDataMap.put(columnNames.get(k), o);
223236
}
224237
TMQEnhMap map = new TMQEnhMap(tableName, lineDataMap);
225238
ConsumerRecord<TMQEnhMap> r = new ConsumerRecord.Builder<TMQEnhMap>()

src/test/java/com/taosdata/jdbc/ws/WSConsumerResultSetTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ public class WSConsumerResultSetTest {
1616

1717
@Before
1818
public void setUp() {
19-
wsConsumerResultSet = new WSConsumerResultSet(null, null, 0, null);
19+
wsConsumerResultSet = new WSConsumerResultSet(null, null, 0, null, null);
2020
}
2121

2222
@Test

src/test/java/com/taosdata/jdbc/ws/WSConsumerSubscribeDBTest.java

Lines changed: 96 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,9 @@
99
import org.junit.Before;
1010
import org.junit.Test;
1111

12-
import java.sql.Connection;
13-
import java.sql.DriverManager;
14-
import java.sql.SQLException;
15-
import java.sql.Statement;
12+
import java.math.BigDecimal;
13+
import java.math.BigInteger;
14+
import java.sql.*;
1615
import java.time.Duration;
1716
import java.util.Collections;
1817
import java.util.Properties;
@@ -24,6 +23,7 @@ public class WSConsumerSubscribeDBTest {
2423
private static final String dbName = "tmq_ws_enh_test";
2524
private static final String superTable1 = "st1";
2625
private static final String superTable2 = "st2";
26+
private static final String superTableFullType = "st3";
2727
private static Connection connection;
2828
private static Statement statement;
2929
private static String[] topics = {"topic_ws_map"};
@@ -41,7 +41,7 @@ public void testWSEhnMapDB() throws Exception {
4141
properties.setProperty(TMQConstants.BOOTSTRAP_SERVERS, "127.0.0.1:6041");
4242
properties.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true");
4343
properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true");
44-
properties.setProperty(TMQConstants.GROUP_ID, "ws_map");
44+
properties.setProperty(TMQConstants.GROUP_ID, "ws_map1");
4545
properties.setProperty(TMQConstants.VALUE_DESERIALIZER, "com.taosdata.jdbc.tmq.MapEnhanceDeserializer");
4646
properties.setProperty(TMQConstants.CONNECT_TYPE, "ws");
4747
properties.setProperty("fetch.max.wait.ms", "5000");
@@ -84,6 +84,90 @@ public void testWSEhnMapDB() throws Exception {
8484
}
8585
}
8686

87+
@Test
88+
public void testWSEhnMapDBAllType() throws Exception {
89+
AtomicInteger a = new AtomicInteger(1);
90+
String topic = topics[0];
91+
// create topic
92+
statement.executeUpdate("create topic if not exists " + topic + " as database " + dbName);
93+
94+
Properties properties = new Properties();
95+
properties.setProperty(TMQConstants.CONNECT_USER, "root");
96+
properties.setProperty(TMQConstants.CONNECT_PASS, "taosdata");
97+
properties.setProperty(TMQConstants.BOOTSTRAP_SERVERS, "127.0.0.1:6041");
98+
properties.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true");
99+
properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true");
100+
properties.setProperty(TMQConstants.GROUP_ID, "ws_map2");
101+
properties.setProperty(TMQConstants.VALUE_DESERIALIZER, "com.taosdata.jdbc.tmq.MapEnhanceDeserializer");
102+
properties.setProperty(TMQConstants.CONNECT_TYPE, "ws");
103+
properties.setProperty("fetch.max.wait.ms", "5000");
104+
properties.setProperty("min.poll.rows", "1000");
105+
106+
boolean pass = false;
107+
try (TaosConsumer<TMQEnhMap> consumer = new TaosConsumer<>(properties)) {
108+
consumer.subscribe(Collections.singletonList(topic));
109+
for (int i = 0; i < 10; i++) {
110+
ConsumerRecords<TMQEnhMap> consumerRecords = consumer.poll(Duration.ofMillis(100));
111+
if (i == 0){
112+
statement.executeUpdate("insert into " + dbName + ".ct0 values(1747474225447, 1, 100, 2.2, 2.3, '1', 12, 2, true, '一', 'POINT(1 1)', '\\x0101', 1.2234, 1747474225448, 255, 65535, 4294967295, 18446744073709551615, -12345678901234567890123.4567890000, 12345678.901234)");
113+
}
114+
if (consumerRecords.isEmpty()){
115+
continue;
116+
}
117+
for (ConsumerRecord<TMQEnhMap> r : consumerRecords) {
118+
if (r.value().getTableName().equalsIgnoreCase("ct0")){
119+
Assert.assertEquals(20, r.value().getMap().size());
120+
121+
Assert.assertTrue(r.value().getMap().get("ts") instanceof Timestamp);
122+
Assert.assertTrue(r.value().getMap().get("c1") instanceof Integer);
123+
Assert.assertTrue(r.value().getMap().get("c2") instanceof Long);
124+
Assert.assertTrue(r.value().getMap().get("c3") instanceof Float);
125+
Assert.assertTrue(r.value().getMap().get("c4") instanceof Double);
126+
Assert.assertTrue(r.value().getMap().get("c5") instanceof byte[]);
127+
Assert.assertTrue(r.value().getMap().get("c6") instanceof Short);
128+
Assert.assertTrue(r.value().getMap().get("c7") instanceof Byte);
129+
Assert.assertTrue(r.value().getMap().get("c8") instanceof Boolean);
130+
Assert.assertTrue(r.value().getMap().get("c9") instanceof String);
131+
Assert.assertTrue(r.value().getMap().get("c10") instanceof byte[]);
132+
Assert.assertTrue(r.value().getMap().get("c11") instanceof byte[]);
133+
Assert.assertTrue(r.value().getMap().get("c12") instanceof Double);
134+
Assert.assertTrue(r.value().getMap().get("c13") instanceof Timestamp);
135+
Assert.assertTrue(r.value().getMap().get("c14") instanceof Short);
136+
Assert.assertTrue(r.value().getMap().get("c15") instanceof Integer);
137+
Assert.assertTrue(r.value().getMap().get("c16") instanceof Long);
138+
Assert.assertTrue(r.value().getMap().get("c17") instanceof BigInteger);
139+
Assert.assertTrue(r.value().getMap().get("c18") instanceof BigDecimal);
140+
Assert.assertTrue(r.value().getMap().get("c19") instanceof BigDecimal);
141+
142+
Assert.assertEquals(new Timestamp(1747474225447L), r.value().getMap().get("ts"));
143+
Assert.assertEquals(r.value().getMap().get("c1"), 1);
144+
Assert.assertEquals(100L, r.value().getMap().get("c2"));
145+
Assert.assertEquals(2.2F, r.value().getMap().get("c3"));
146+
Assert.assertEquals(2.3, r.value().getMap().get("c4"));
147+
Assert.assertArrayEquals("1".getBytes(), (byte[]) r.value().getMap().get("c5"));
148+
Assert.assertEquals((short) 12, r.value().getMap().get("c6"));
149+
Assert.assertEquals((byte) 2, r.value().getMap().get("c7"));
150+
Assert.assertEquals(true, r.value().getMap().get("c8"));
151+
Assert.assertEquals("一", r.value().getMap().get("c9"));
152+
Assert.assertArrayEquals(new byte[]{1, 1}, (byte[]) r.value().getMap().get("c11"));
153+
Assert.assertEquals(1.2234, r.value().getMap().get("c12"));
154+
Assert.assertEquals(new Timestamp(1747474225448L), r.value().getMap().get("c13"));
155+
Assert.assertEquals((short) 255, r.value().getMap().get("c14"));
156+
Assert.assertEquals(65535, r.value().getMap().get("c15"));
157+
Assert.assertEquals(4294967295L, r.value().getMap().get("c16"));
158+
Assert.assertEquals(new BigInteger("18446744073709551615"), r.value().getMap().get("c17"));
159+
Assert.assertEquals(new BigDecimal("-12345678901234567890123.4567890000"), r.value().getMap().get("c18"));
160+
Assert.assertEquals(new BigDecimal("12345678.901234"), r.value().getMap().get("c19"));
161+
pass = true;
162+
}
163+
}
164+
}
165+
166+
consumer.unsubscribe();
167+
Assert.assertTrue(pass);
168+
}
169+
}
170+
87171
@Test
88172
public void testWSEhnMapStable() throws Exception {
89173
AtomicInteger a = new AtomicInteger(1);
@@ -97,7 +181,7 @@ public void testWSEhnMapStable() throws Exception {
97181
properties.setProperty(TMQConstants.BOOTSTRAP_SERVERS, "127.0.0.1:6041");
98182
properties.setProperty(TMQConstants.MSG_WITH_TABLE_NAME, "true");
99183
properties.setProperty(TMQConstants.ENABLE_AUTO_COMMIT, "true");
100-
properties.setProperty(TMQConstants.GROUP_ID, "ws_map");
184+
properties.setProperty(TMQConstants.GROUP_ID, "ws_map3");
101185
properties.setProperty(TMQConstants.VALUE_DESERIALIZER, "com.taosdata.jdbc.tmq.MapEnhanceDeserializer");
102186
properties.setProperty(TMQConstants.CONNECT_TYPE, "ws");
103187
properties.setProperty("fetch.max.wait.ms", "5000");
@@ -164,6 +248,12 @@ public void before() throws SQLException {
164248

165249
statement.execute("create stable if not exists " + superTable2
166250
+ " (ts timestamp, cc1 int) tags(t1 int, t2 int)");
251+
252+
statement.execute("create stable if not exists " + superTableFullType
253+
+ " (ts timestamp, c1 int, c2 bigint, c3 float, c4 double, c5 binary(10), c6 SMALLINT, c7 TINYINT, " +
254+
"c8 BOOL, c9 nchar(100), c10 GEOMETRY(100), c11 VARBINARY(100), c12 double, c13 timestamp, " +
255+
"c14 tinyint unsigned, c15 smallint unsigned, c16 int unsigned, c17 bigint unsigned, c18 decimal(38, 10), c19 decimal(18, 6)) tags(t1 int)");
256+
statement.execute("create table if not exists ct0 using " + superTableFullType + " tags(1000)");
167257
}
168258

169259
@After

0 commit comments

Comments
 (0)