1515 */
1616
1717#include "node/node_client.h"
18- #include "core/message_bus.h"
18+ #include "agent_compat.h"
19+ #include "agent_config.h"
1920#include "cJSON.h"
21+ #include "core/message_bus.h"
2022#include "infra/config_store.h"
2123#include "tools/tool_registry.h"
22- #include "agent_compat.h"
23- #include "agent_config.h"
2424
2525#include <arpa/inet.h>
2626#include <fcntl.h>
@@ -59,6 +59,9 @@ typedef struct {
5959
6060static node_ws_t s_ws ;
6161static volatile bool s_running = false;
62+ static pthread_t s_thread ;
63+ static bool s_thread_valid = false;
64+ static pthread_mutex_t s_mutex = PTHREAD_MUTEX_INITIALIZER ;
6265static char s_gateway_host [128 ];
6366static int s_gateway_port = 0 ;
6467static char s_gateway_token [256 ];
@@ -73,7 +76,7 @@ static int entropy_func(void* data, unsigned char* output, size_t len)
7376 return 0 ;
7477 }
7578 syslog (LOG_ERR , "[node_client] CRITICAL: No secure entropy source available\n" );
76- return -1 ; /* Generic error - TLS handshake will fail safely */
79+ return -1 ; /* Generic error - TLS handshake will fail safely */
7780}
7881
7982/* ── Raw I/O ───────────────────────────────────────────────── */
@@ -124,9 +127,13 @@ static int read_exact(node_ws_t* ws, void* buf, size_t len)
124127
125128static int ws_send_text (node_ws_t * ws , const char * data , size_t len )
126129{
127- if (!ws -> connected )
130+ pthread_mutex_lock (& s_mutex );
131+ if (!ws -> connected ) {
132+ pthread_mutex_unlock (& s_mutex );
128133 return -1 ;
134+ }
129135
136+ int result = -1 ;
130137 uint8_t header [14 ];
131138 int hlen = 0 ;
132139 header [hlen ++ ] = 0x81 ; /* FIN + text */
@@ -144,14 +151,15 @@ static int ws_send_text(node_ws_t* ws, const char* data, size_t len)
144151 }
145152
146153 uint8_t mask [4 ];
147- entropy_func (NULL , mask , 4 );
154+ if (entropy_func (NULL , mask , 4 ) != 0 )
155+ goto out ;
148156 header [hlen ++ ] = mask [0 ];
149157 header [hlen ++ ] = mask [1 ];
150158 header [hlen ++ ] = mask [2 ];
151159 header [hlen ++ ] = mask [3 ];
152160
153161 if (raw_write (ws , header , hlen ) < 0 )
154- return -1 ;
162+ goto out ;
155163
156164 const uint8_t * src = (const uint8_t * )data ;
157165 uint8_t chunk [256 ];
@@ -162,10 +170,14 @@ static int ws_send_text(node_ws_t* ws, const char* data, size_t len)
162170 for (size_t i = 0 ; i < todo ; i ++ )
163171 chunk [i ] = src [off + i ] ^ mask [(off + i ) & 3 ];
164172 if (raw_write (ws , chunk , todo ) < 0 )
165- return -1 ;
173+ goto out ;
166174 off += todo ;
167175 }
168- return 0 ;
176+ result = 0 ;
177+
178+ out :
179+ pthread_mutex_unlock (& s_mutex );
180+ return result ;
169181}
170182
171183/* ── Protocol helpers ──────────────────────────────────────── */
@@ -196,6 +208,11 @@ static int send_json_request(const char* method, cJSON* params)
196208 params ? params : cJSON_CreateObject ());
197209
198210 char * json = cJSON_PrintUnformatted (frame );
211+ if (!json ) {
212+ syslog (LOG_ERR , "[%s] JSON serialization failed (OOM)\n" , TAG );
213+ cJSON_Delete (frame );
214+ return -1 ;
215+ }
199216 int ret = ws_send_text (& s_ws , json , strlen (json ));
200217 syslog (LOG_DEBUG , "[%s] TX: %s\n" , TAG , method );
201218 free (json );
@@ -319,7 +336,7 @@ static void handle_invoke(cJSON* payload)
319336}
320337
321338/* ── Forward declarations ──────────────────────────────────── */
322- static void do_disconnect (void );
339+ static void do_disconnect_locked (void ); /* caller must hold s_mutex */
323340
324341/* ── TCP + TLS + WS handshake ──────────────────────────────── */
325342static int do_connect (void )
@@ -399,7 +416,10 @@ static int do_connect(void)
399416
400417 /* WebSocket upgrade handshake */
401418 uint8_t raw_key [16 ];
402- entropy_func (NULL , raw_key , sizeof (raw_key ));
419+ if (entropy_func (NULL , raw_key , sizeof (raw_key )) != 0 ) {
420+ syslog (LOG_ERR , "[%s] Failed to generate WS key entropy\n" , TAG );
421+ goto fail ;
422+ }
403423 unsigned char b64_key [32 ] = { 0 };
404424 size_t b64_len = 0 ;
405425 mbedtls_base64_encode (b64_key , sizeof (b64_key ) - 1 , & b64_len , raw_key ,
@@ -431,30 +451,87 @@ static int do_connect(void)
431451 goto fail ;
432452 }
433453
454+ pthread_mutex_lock (& s_mutex );
434455 s_ws .connected = true;
456+ pthread_mutex_unlock (& s_mutex );
435457 syslog (LOG_INFO , "[%s] WebSocket connected to %s:%d\n" , TAG , s_gateway_host ,
436458 s_gateway_port );
437459 return 0 ;
438460
439461fail :
440- do_disconnect ();
462+ pthread_mutex_lock (& s_mutex );
463+ do_disconnect_locked ();
464+ pthread_mutex_unlock (& s_mutex );
441465 return -1 ;
442466}
443467
444- static void do_disconnect (void )
468+ /* Phase 1: Close the underlying fd so any blocking recv/read returns
469+ * immediately with an error. Does NOT free TLS objects — the worker
470+ * thread may still be unwinding through mbedtls_ssl_read when this
471+ * runs. Caller must hold s_mutex. */
472+ static void do_shutdown_fd_locked (void )
473+ {
474+ s_ws .connected = false;
475+ if (s_ws .tls_init ) {
476+ /* Shut down the transport fd; mbedtls_ssl_read will return error */
477+ if (s_ws .net .fd >= 0 ) {
478+ shutdown (s_ws .net .fd , SHUT_RDWR );
479+ close (s_ws .net .fd );
480+ s_ws .net .fd = -1 ;
481+ }
482+ } else if (s_ws .fd >= 0 ) {
483+ shutdown (s_ws .fd , SHUT_RDWR );
484+ close (s_ws .fd );
485+ }
486+ s_ws .fd = -1 ;
487+ }
488+
489+ /* Phase 2: Free TLS library objects. Only safe to call after the
490+ * worker thread has exited (i.e. after pthread_join) or from the
491+ * worker thread itself. Caller must hold s_mutex. */
492+ static void do_free_tls_locked (void )
445493{
446494 if (s_ws .tls_init ) {
447- mbedtls_ssl_close_notify (& s_ws .ssl );
448495 mbedtls_ssl_free (& s_ws .ssl );
449496 mbedtls_ssl_config_free (& s_ws .conf );
450497 mbedtls_ctr_drbg_free (& s_ws .ctr_drbg );
451- mbedtls_net_free (& s_ws .net );
498+ /* net fd already closed in phase 1; just reset the context */
499+ mbedtls_net_init (& s_ws .net );
452500 s_ws .tls_init = false;
453- } else if (s_ws .fd >= 0 ) {
454- close (s_ws .fd );
455501 }
456- s_ws .fd = -1 ;
457- s_ws .connected = false;
502+ }
503+
/*
 * Complete teardown: phase 1 (close the descriptor) followed immediately
 * by phase 2 (free TLS objects).
 *
 * Intended for the worker thread itself, where no concurrent reader can
 * still be using the TLS objects. Caller must hold s_mutex.
 */
static void do_disconnect_locked(void)
{
    do_shutdown_fd_locked(); /* phase 1 */
    do_free_tls_locked();    /* phase 2 */
}
511+
512+ /* Thread-safe full disconnect for the worker thread's own use */
513+ static void do_disconnect (void )
514+ {
515+ pthread_mutex_lock (& s_mutex );
516+ do_disconnect_locked ();
517+ pthread_mutex_unlock (& s_mutex );
518+ }
519+
520+ /* Interrupt-only disconnect for external callers (stop/start).
521+ * Closes the fd to unblock the worker, but does NOT free TLS objects.
522+ * Caller must pthread_join the worker, then call do_free_tls. */
523+ static void do_shutdown_fd (void )
524+ {
525+ pthread_mutex_lock (& s_mutex );
526+ do_shutdown_fd_locked ();
527+ pthread_mutex_unlock (& s_mutex );
528+ }
529+
530+ static void do_free_tls (void )
531+ {
532+ pthread_mutex_lock (& s_mutex );
533+ do_free_tls_locked ();
534+ pthread_mutex_unlock (& s_mutex );
458535}
459536
460537/* ── WS recv + message dispatch ────────────────────────────── */
@@ -513,7 +590,8 @@ static int ws_recv_frame(node_ws_t* ws, char* buf, size_t buf_size)
513590 pong_hdr [0 ] = 0x8A ; /* FIN + pong */
514591 pong_hdr [1 ] = 0x80 | (uint8_t )read_len ; /* masked, same payload */
515592 uint8_t pmask [4 ];
516- entropy_func (NULL , pmask , 4 );
593+ if (entropy_func (NULL , pmask , 4 ) != 0 )
594+ return -1 ;
517595 memcpy (pong_hdr + 2 , pmask , 4 );
518596 raw_write (ws , pong_hdr , 6 );
519597 if (read_len > 0 ) {
@@ -657,7 +735,8 @@ static void* node_client_thread(void* arg)
657735 uint8_t ping [6 ];
658736 ping [0 ] = 0x89 ; /* FIN + ping */
659737 ping [1 ] = 0x80 ; /* masked, 0 payload */
660- entropy_func (NULL , ping + 2 , 4 ); /* mask key */
738+ if (entropy_func (NULL , ping + 2 , 4 ) != 0 ) /* mask key */
739+ break ;
661740 if (raw_write (& s_ws , ping , 6 ) < 0 )
662741 break ;
663742 last_ping = now ;
@@ -697,13 +776,13 @@ int node_client_start(void)
697776 if (s_running ) {
698777 syslog (LOG_INFO , "[%s] Stopping previous client...\n" , TAG );
699778 s_running = false;
700- do_disconnect ();
701- /* Give the old thread time to exit */
702- for (int i = 0 ; i < 20 ; i ++ ) {
703- usleep (100000 ); /* 100ms */
704- if (!s_ws .connected )
705- break ;
779+ do_shutdown_fd ();
780+ /* Wait for old thread to actually exit */
781+ if (s_thread_valid ) {
782+ pthread_join (s_thread , NULL );
783+ s_thread_valid = false;
706784 }
785+ do_free_tls ();
707786 }
708787
709788 /* Read gateway config */
@@ -720,20 +799,35 @@ int node_client_start(void)
720799 return OK ; /* not an error — just not configured */
721800 }
722801
802+ memset (s_gateway_host , 0 , sizeof (s_gateway_host ));
723803 strncpy (s_gateway_host , host_buf , sizeof (s_gateway_host ) - 1 );
724804 s_gateway_port = port_buf [0 ] ? atoi (port_buf ) : 8080 ;
805+ memset (s_gateway_token , 0 , sizeof (s_gateway_token ));
725806 if (token_buf [0 ])
726807 strncpy (s_gateway_token , token_buf , sizeof (s_gateway_token ) - 1 );
727808 s_use_tls = (s_gateway_port == 443 );
728809
729810 s_running = true;
730- int ret = agent_task_create (node_client_thread , "node_client" ,
731- NODE_CLIENT_STACK , NULL , NODE_CLIENT_PRIO );
732- if (ret != OK ) {
733- syslog (LOG_ERR , "[%s] Failed to create node client thread\n" , TAG );
811+
812+ /* Create joinable thread (not detached) so we can synchronize on stop */
813+ pthread_attr_t attr ;
814+ pthread_attr_init (& attr );
815+ pthread_attr_setstacksize (& attr , NODE_CLIENT_STACK < 4096 ? 4096 : NODE_CLIENT_STACK );
816+
817+ struct sched_param sp ;
818+ sp .sched_priority = NODE_CLIENT_PRIO ;
819+ pthread_attr_setschedparam (& attr , & sp );
820+ pthread_attr_setschedpolicy (& attr , SCHED_FIFO );
821+ pthread_attr_setinheritsched (& attr , PTHREAD_EXPLICIT_SCHED );
822+
823+ int ret = pthread_create (& s_thread , & attr , node_client_thread , NULL );
824+ pthread_attr_destroy (& attr );
825+ if (ret != 0 ) {
826+ syslog (LOG_ERR , "[%s] Failed to create node client thread: %d\n" , TAG , ret );
734827 s_running = false;
735828 return ERROR ;
736829 }
830+ s_thread_valid = true;
737831
738832 syslog (LOG_INFO , "[%s] Node client started → %s:%d\n" , TAG , s_gateway_host ,
739833 s_gateway_port );
@@ -743,17 +837,28 @@ int node_client_start(void)
743837void node_client_stop (void )
744838{
745839 s_running = false;
746- do_disconnect ();
840+ /* Phase 1: close fd to unblock any blocking recv in the worker */
841+ do_shutdown_fd ();
842+ /* Wait for worker thread to fully exit */
843+ if (s_thread_valid ) {
844+ pthread_join (s_thread , NULL );
845+ s_thread_valid = false;
846+ }
847+ /* Phase 2: now safe to free TLS objects — worker is gone */
848+ do_free_tls ();
747849 syslog (LOG_INFO , "[%s] Node client stopped\n" , TAG );
748850}
749851
750852int node_client_send_chat_message (const char * channel , const char * chat_id ,
751853 const char * content )
752854{
855+ pthread_mutex_lock (& s_mutex );
753856 if (!s_ws .connected ) {
857+ pthread_mutex_unlock (& s_mutex );
754858 syslog (LOG_WARNING , "[%s] send_chat_message: not connected\n" , TAG );
755859 return ERROR ;
756860 }
861+ pthread_mutex_unlock (& s_mutex );
757862
758863 /* Use chat.forward — the same event name the gateway uses to push
759864 * messages to nodes, so the gateway already understands this payload
0 commit comments