1515import com .taosdata .jdbc .ws .FutureResponse ;
1616import com .taosdata .jdbc .ws .InFlightRequest ;
1717import com .taosdata .jdbc .ws .Transport ;
18- import com .taosdata .jdbc .ws .entity .Action ;
1918import com .taosdata .jdbc .ws .entity .Code ;
2019import com .taosdata .jdbc .ws .entity .Request ;
2120import com .taosdata .jdbc .ws .entity .Response ;
2625import java .nio .ByteOrder ;
2726import java .sql .SQLException ;
2827import java .time .Duration ;
29- import java .time .ZoneId ;
3028import java .util .*;
3129import java .util .stream .Collectors ;
3230
@@ -136,46 +134,22 @@ private boolean handleReconnect() throws SQLException {
136134 }
137135 }
138136
139- @ SuppressWarnings ("unchecked" )
140- private ConsumerRecords <V > doPoll (Duration timeout , Deserializer <V > deserializer ) throws SQLException {
141- if (param .isAutoCommit () && (0 != messageId )) {
142- long now = System .currentTimeMillis ();
143- if (now - lastCommitTime > param .getAutoCommitInterval ()) {
144- commitSync ();
145- lastCommitTime = now ;
146- }
137+ private ConsumerRecords <V > getMeta (PollResp pollResp ) throws SQLException {
138+ Request fetchJsonMetaReq = factory .generateFetchJsonMeata (pollResp .getMessageId ());
139+ FetchJsonMetaResp fetchJsonMetaResp = (FetchJsonMetaResp ) transport .send (fetchJsonMetaReq );
140+ if (Code .SUCCESS .getCode () != fetchJsonMetaResp .getCode ()) {
141+ throw new SQLException ("consumer fetch json meta error, code: (0x" + Integer .toHexString (fetchJsonMetaResp .getCode ()) + "), message: " + fetchJsonMetaResp .getMessage ());
147142 }
148143
149- Request request = factory .generatePoll (lastMessageId , timeout .toMillis ());
150- PollResp pollResp = (PollResp ) transport .send (request );
151-
152- if (Code .SUCCESS .getCode () != pollResp .getCode ()) {
153- throw new SQLException ("consumer poll error, code: (0x" + Integer .toHexString (pollResp .getCode ()) + "), message: " + pollResp .getMessage ());
154- }
155- if (!pollResp .isHaveMessage ()) {
144+ if (fetchJsonMetaResp .getData () == null || fetchJsonMetaResp .getData ().getMetas () == null ) {
156145 return ConsumerRecords .emptyRecord ();
157146 }
158147
159- messageId = pollResp .getMessageId ();
160- lastMessageId = messageId ;
161-
162- if (pollResp .getMessageType () == TmqMessageType .TMQ_RES_TABLE_META .getCode () || pollResp .getMessageType () == TmqMessageType .TMQ_RES_METADATA .getCode ()) {
163- Request fetchJsonMetaReq = factory .generateFetchJsonMeata (pollResp .getMessageId ());
164- FetchJsonMetaResp fetchJsonMetaResp = (FetchJsonMetaResp ) transport .send (fetchJsonMetaReq );
165- if (Code .SUCCESS .getCode () != fetchJsonMetaResp .getCode ()) {
166- throw new SQLException ("consumer fetch json meta error, code: (0x" + Integer .toHexString (fetchJsonMetaResp .getCode ()) + "), message: " + fetchJsonMetaResp .getMessage ());
167- }
168-
169- if (fetchJsonMetaResp .getData () == null || fetchJsonMetaResp .getData ().getMetas () == null ) {
170- return ConsumerRecords .emptyRecord ();
171- }
172-
173- ConsumerRecords <V > records = new ConsumerRecords <>();
174-
175- for (Meta meta : fetchJsonMetaResp .getData ().getMetas ()){
176- TopicPartition tp = new TopicPartition (pollResp .getTopic (), pollResp .getVgroupId ());
148+ ConsumerRecords <V > records = new ConsumerRecords <>();
149+ TopicPartition tp = new TopicPartition (pollResp .getTopic (), pollResp .getVgroupId ());
177150
178- ConsumerRecord <V > r = new ConsumerRecord .Builder <V >()
151+ for (Meta meta : fetchJsonMetaResp .getData ().getMetas ()){
152+ ConsumerRecord <V > r = new ConsumerRecord .Builder <V >()
179153 .topic (pollResp .getTopic ())
180154 .dbName (pollResp .getDatabase ())
181155 .vGroupId (pollResp .getVgroupId ())
@@ -184,27 +158,24 @@ private ConsumerRecords<V> doPoll(Duration timeout, Deserializer<V> deserializer
184158 .meta (meta )
185159 .value (null )
186160 .build ();
187- records .put (tp , r );
188- }
189- return records ;
190- }
191-
192- if (pollResp .getMessageType () != TmqMessageType .TMQ_RES_DATA .getCode ()) {
193- return ConsumerRecords .emptyRecord ();
161+ records .put (tp , r );
194162 }
163+ return records ;
164+ }
195165
166+ private ConsumerRecords <V > getData (PollResp pollResp , Deserializer <V > deserializer ) throws SQLException {
196167 ConsumerRecords <V > records = new ConsumerRecords <>();
197168 try (WSConsumerResultSet rs = new WSConsumerResultSet (transport , factory , pollResp .getMessageId (), pollResp .getDatabase (), param .getConnectionParam ().getZoneId ())) {
198169 if (deserializer instanceof MapEnhanceDeserializer ){
199170 ConsumerRecords <TMQEnhMap > resultRecords = rs .handleSubscribeDB (pollResp );
200171 return (ConsumerRecords <V >) resultRecords ;
201172 }
202- while (rs .next ()) {
203- String topic = pollResp .getTopic ();
204- String dbName = pollResp .getDatabase ();
205- int vGroupId = pollResp .getVgroupId ();
206- TopicPartition tp = new TopicPartition (topic , vGroupId );
173+ String topic = pollResp .getTopic ();
174+ String dbName = pollResp .getDatabase ();
175+ int vGroupId = pollResp .getVgroupId ();
176+ TopicPartition tp = new TopicPartition (topic , vGroupId );
207177
178+ while (rs .next ()) {
208179 V v = deserializer .deserialize (rs , topic , dbName );
209180 ConsumerRecord <V > r = new ConsumerRecord .Builder <V >()
210181 .topic (topic )
@@ -221,6 +192,58 @@ private ConsumerRecords<V> doPoll(Duration timeout, Deserializer<V> deserializer
221192 return records ;
222193 }
223194
195+ @ SuppressWarnings ("unchecked" )
196+ private ConsumerRecords <V > doPoll (Duration timeout , Deserializer <V > deserializer ) throws SQLException {
197+ if (param .isAutoCommit () && (0 != messageId )) {
198+ long now = System .currentTimeMillis ();
199+ if (now - lastCommitTime > param .getAutoCommitInterval ()) {
200+ commitSync ();
201+ lastCommitTime = now ;
202+ }
203+ }
204+
205+ Request request = factory .generatePoll (lastMessageId , timeout .toMillis ());
206+ PollResp pollResp = (PollResp ) transport .send (request );
207+
208+ if (Code .SUCCESS .getCode () != pollResp .getCode ()) {
209+ throw new SQLException ("consumer poll error, code: (0x" + Integer .toHexString (pollResp .getCode ()) + "), message: " + pollResp .getMessage ());
210+ }
211+ if (!pollResp .isHaveMessage ()) {
212+ return ConsumerRecords .emptyRecord ();
213+ }
214+
215+ messageId = pollResp .getMessageId ();
216+ lastMessageId = messageId ;
217+
218+ if (pollResp .getMessageType () == TmqMessageType .TMQ_RES_TABLE_META .getCode ()){
219+ return getMeta (pollResp );
220+ }
221+
222+ if (pollResp .getMessageType () == TmqMessageType .TMQ_RES_DATA .getCode ()){
223+ return getData (pollResp , deserializer );
224+ }
225+
226+ if (pollResp .getMessageType () == TmqMessageType .TMQ_RES_METADATA .getCode ()) {
227+ ConsumerRecords <V > metaRecords = getMeta (pollResp );
228+ ConsumerRecords <V > dataRecords = getData (pollResp , deserializer );
229+ if (metaRecords .isEmpty ()){
230+ return dataRecords ;
231+ }
232+ if (dataRecords .isEmpty ()){
233+ return metaRecords ;
234+ }
235+
236+ TopicPartition tp = new TopicPartition (pollResp .getTopic (), pollResp .getVgroupId ());
237+
238+ for (ConsumerRecord <V > r : metaRecords .get (tp )){
239+ dataRecords .put (tp , r );
240+ }
241+
242+ return dataRecords ;
243+ }
244+ return ConsumerRecords .emptyRecord ();
245+ }
246+
224247 @ Override
225248 public ConsumerRecords <V > poll (Duration timeout , Deserializer <V > deserializer ) throws SQLException {
226249
0 commit comments