@@ -186,9 +186,10 @@ wstran_pipe_recv_cb(void *arg)
186186 bool need_more = false;
187187 ws_pipe * p = arg ;
188188 nni_iov iov [2 ];
189- uint8_t rv , pos = 1 ;
190- uint64_t len = 0 ;
189+ int rv = 0 ;
190+ uint8_t pos = 1 ;
191191 uint8_t * ptr ;
192+ uint64_t len = 0 ;
192193 nni_msg * smsg = NULL , * msg = NULL , * conn_msg = NULL ;
193194 nni_msg * * msg_vec = NULL ;
194195 nni_aio * raio = p -> rxaio ;
@@ -265,16 +266,17 @@ wstran_pipe_recv_cb(void *arg)
265266 }
266267 if (p -> wantrxhead > p -> gotrxhead )
267268 goto recv ;
269+
270+ // Parse as many complete MQTT packets as possible from tmp_msg without
271+ // trusting Remaining Length blindly (prevents OOB reads).
272+ size_t index = 0 ;
268273 // Negotiation shall be processed alone
269274 if ((* ptr & 0xF0 ) == CMD_CONNECT ) {
270275 conn_msg = p -> tmp_msg ;
271276 p -> tmp_msg = NULL ;
272277 goto done ;
273278 }
274279
275- // Parse as many complete MQTT packets as possible from tmp_msg without
276- // trusting Remaining Length blindly (prevents OOB reads).
277- size_t index = 0 ;
278280 uint8_t * baseptr = ptr ;
279281 // At least one msg is ready when we got here.
280282 while (index < p -> gotrxhead ) {
@@ -298,7 +300,7 @@ wstran_pipe_recv_cb(void *arg)
298300 }
299301 if (len > (uint64_t ) p -> conf -> max_packet_size ) {
300302 rv = NNG_EMSGSIZE ;
301- p -> err_code = NMQ_PACKET_TOO_LARGE ;
303+ p -> err_code = PACKET_TOO_LARGE ;
302304 goto reset ;
303305 }
304306 if (len > (uint64_t ) SIZE_MAX - (size_t ) pos ) {
@@ -377,7 +379,7 @@ wstran_pipe_recv_cb(void *arg)
377379 }
378380 if (p -> gotrxhead + p -> wantrxhead > p -> conf -> max_packet_size ) {
379381 log_trace ("size error 0x95\n" );
380- rv = NMQ_PACKET_TOO_LARGE ;
382+ rv = NNG_ECLOSED ;
381383 p -> err_code = PACKET_TOO_LARGE ;
382384 goto skip ;
383385 }
@@ -518,7 +520,7 @@ wstran_pipe_recv_cb(void *arg)
518520 if (nmq_unsubinfo_decode (vmsg , p -> npipe -> subinfol ,
519521 p -> ws_param -> pro_ver ) < 0 ) {
520522 log_error ("Invalid unsubscribe packet!" );
521- p -> err_code = PROTOCOL_ERROR ;
523+ rv = PROTOCOL_ERROR ;
522524 goto skip ;
523525 }
524526 } else if (cmd == CMD_SUBSCRIBE ) {
@@ -527,7 +529,7 @@ wstran_pipe_recv_cb(void *arg)
527529 if (nmq_subinfo_decode (vmsg , p -> npipe -> subinfol ,
528530 p -> ws_param -> pro_ver ) < 0 ) {
529531 log_error ("Invalid subscribe packet!" );
530- p -> err_code = PROTOCOL_ERROR ;
532+ rv = PROTOCOL_ERROR ;
531533 goto skip ;
532534 }
533535 }
@@ -596,7 +598,7 @@ wstran_pipe_recv_cb(void *arg)
596598 else
597599 log_warn ("WebSocket uaio is aborted already!" );
598600 nni_mtx_unlock (& p -> mtx );
599- if (p -> err_code != SUCCESS && msg_vec != NULL ) {
601+ if (rv != SUCCESS && msg_vec != NULL ) {
600602 // Cannot trust the rest msgs
601603 nni_lmq_flush (& p -> recvlmq );
602604 nni_msg_free (smsg );
@@ -668,7 +670,6 @@ wstran_pipe_recv(void *arg, nni_aio *aio)
668670 nni_mtx_lock (& p -> mtx );
669671 // Get msg from recv lmq
670672 if (nni_lmq_get (& p -> recvlmq , & msg ) == 0 ) {
671- log_error ("out lmq msg %p" , msg );
672673 nni_aio_set_msg (aio , msg );
673674 nni_mtx_unlock (& p -> mtx );
674675 nni_aio_finish (aio , 0 , nni_msg_len (msg ));
0 commit comments