@@ -319,6 +319,7 @@ ssize_t neu_conn_send(neu_conn_t *conn, uint8_t *buf, ssize_t len)
319
319
320
320
if (conn -> is_connected ) {
321
321
switch (conn -> param .type ) {
322
+ case NEU_CONN_UDP_TO :
322
323
case NEU_CONN_TCP_SERVER :
323
324
assert (false);
324
325
break ;
@@ -378,6 +379,7 @@ ssize_t neu_conn_recv(neu_conn_t *conn, uint8_t *buf, ssize_t len)
378
379
}
379
380
380
381
switch (conn -> param .type ) {
382
+ case NEU_CONN_UDP_TO :
381
383
case NEU_CONN_TCP_SERVER :
382
384
zlog_fatal (conn -> param .log , "neu_conn_recv cann't recv tcp server msg" );
383
385
assert (1 == 0 );
@@ -432,6 +434,109 @@ ssize_t neu_conn_recv(neu_conn_t *conn, uint8_t *buf, ssize_t len)
432
434
433
435
return ret ;
434
436
}
437
+ ssize_t neu_conn_udp_sendto (neu_conn_t * conn , uint8_t * buf , ssize_t len ,
438
+ void * dst )
439
+ {
440
+ ssize_t ret = 0 ;
441
+
442
+ pthread_mutex_lock (& conn -> mtx );
443
+ if (conn -> stop ) {
444
+ pthread_mutex_unlock (& conn -> mtx );
445
+ return ret ;
446
+ }
447
+
448
+ if (!conn -> is_connected ) {
449
+ conn_connect (conn );
450
+ }
451
+
452
+ if (conn -> is_connected ) {
453
+ switch (conn -> param .type ) {
454
+ case NEU_CONN_TCP_CLIENT :
455
+ case NEU_CONN_TCP_SERVER :
456
+ case NEU_CONN_TTY_CLIENT :
457
+ case NEU_CONN_UDP :
458
+ assert (false);
459
+ break ;
460
+ case NEU_CONN_UDP_TO :
461
+ if (conn -> block ) {
462
+ ret = sendto (conn -> fd , buf , len , MSG_NOSIGNAL ,
463
+ (struct sockaddr * ) dst , sizeof (struct sockaddr ));
464
+ } else {
465
+ ret = sendto (conn -> fd , buf , len , MSG_NOSIGNAL | MSG_DONTWAIT ,
466
+ (struct sockaddr * ) dst , sizeof (struct sockaddr ));
467
+ }
468
+ break ;
469
+ }
470
+ if (ret != len ) {
471
+ zlog_error (
472
+ conn -> param .log ,
473
+ "conn udp fd: %d, sendto (%s:%d) buf len: %zd, ret: %zd, "
474
+ "errno: %s(%d)" ,
475
+ conn -> fd , inet_ntoa (((struct sockaddr_in * ) dst )-> sin_addr ),
476
+ htons (((struct sockaddr_in * ) dst )-> sin_port ), len , ret ,
477
+ strerror (errno ), errno );
478
+ }
479
+
480
+ if (ret == -1 && errno != EAGAIN ) {
481
+ conn_disconnect (conn );
482
+ }
483
+
484
+ if (ret > 0 && conn -> callback_trigger == false) {
485
+ conn -> connected (conn -> data , conn -> fd );
486
+ conn -> callback_trigger = true;
487
+ }
488
+ }
489
+
490
+ if (ret > 0 ) {
491
+ conn -> state .send_bytes += ret ;
492
+ }
493
+
494
+ pthread_mutex_unlock (& conn -> mtx );
495
+
496
+ return ret ;
497
+ }
498
+
499
+ ssize_t neu_conn_udp_recvfrom (neu_conn_t * conn , uint8_t * buf , ssize_t len ,
500
+ void * src )
501
+ {
502
+ ssize_t ret = 0 ;
503
+ socklen_t addr_len = sizeof (struct sockaddr_in );
504
+
505
+ pthread_mutex_lock (& conn -> mtx );
506
+ if (conn -> stop ) {
507
+ pthread_mutex_unlock (& conn -> mtx );
508
+ return ret ;
509
+ }
510
+
511
+ switch (conn -> param .type ) {
512
+ case NEU_CONN_TCP_SERVER :
513
+ case NEU_CONN_TCP_CLIENT :
514
+ case NEU_CONN_TTY_CLIENT :
515
+ case NEU_CONN_UDP :
516
+ assert (1 == 0 );
517
+ break ;
518
+ case NEU_CONN_UDP_TO :
519
+ ret =
520
+ recvfrom (conn -> fd , buf , len , 0 , (struct sockaddr * ) src , & addr_len );
521
+ break ;
522
+ }
523
+ if (ret <= 0 ) {
524
+ zlog_error (conn -> param .log ,
525
+ "conn udp fd: %d, recv buf len %zd, ret: %zd, errno: %s(%d)" ,
526
+ conn -> fd , len , ret , strerror (errno ), errno );
527
+ if (ret == 0 || (ret == -1 && errno != EAGAIN )) {
528
+ conn_disconnect (conn );
529
+ }
530
+ }
531
+
532
+ pthread_mutex_unlock (& conn -> mtx );
533
+
534
+ if (ret > 0 ) {
535
+ conn -> state .recv_bytes += ret ;
536
+ }
537
+
538
+ return ret ;
539
+ }
435
540
436
541
void neu_conn_connect (neu_conn_t * conn )
437
542
{
@@ -471,6 +576,9 @@ static void conn_free_param(neu_conn_t *conn)
471
576
free (conn -> param .params .udp .src_ip );
472
577
free (conn -> param .params .udp .dst_ip );
473
578
break ;
579
+ case NEU_CONN_UDP_TO :
580
+ free (conn -> param .params .udpto .src_ip );
581
+ break ;
474
582
case NEU_CONN_TTY_CLIENT :
475
583
free (conn -> param .params .tty_client .device );
476
584
break ;
@@ -501,21 +609,13 @@ static void conn_init_param(neu_conn_t *conn, neu_conn_param_t *param)
501
609
} else {
502
610
conn -> block = false;
503
611
}
504
- zlog_notice (conn -> param .log , "tcp server %s:%d, timeout: %d, block: %d" ,
505
- conn -> param .params .tcp_server .ip ,
506
- conn -> param .params .tcp_server .port ,
507
- conn -> param .params .tcp_server .timeout , conn -> block );
508
612
break ;
509
613
case NEU_CONN_TCP_CLIENT :
510
614
conn -> param .params .tcp_client .ip = strdup (param -> params .tcp_client .ip );
511
615
conn -> param .params .tcp_client .port = param -> params .tcp_client .port ;
512
616
conn -> param .params .tcp_client .timeout =
513
617
param -> params .tcp_client .timeout ;
514
618
conn -> block = conn -> param .params .tcp_client .timeout > 0 ;
515
- zlog_notice (conn -> param .log , "tcp client %s:%d, timeout: %d, block: %d" ,
516
- conn -> param .params .tcp_client .ip ,
517
- conn -> param .params .tcp_client .port ,
518
- conn -> param .params .tcp_client .timeout , conn -> block );
519
619
break ;
520
620
case NEU_CONN_UDP :
521
621
conn -> param .params .udp .src_ip = strdup (param -> params .udp .src_ip );
@@ -524,11 +624,12 @@ static void conn_init_param(neu_conn_t *conn, neu_conn_param_t *param)
524
624
conn -> param .params .udp .dst_port = param -> params .udp .dst_port ;
525
625
conn -> param .params .udp .timeout = param -> params .udp .timeout ;
526
626
conn -> block = conn -> param .params .udp .timeout > 0 ;
527
- zlog_notice (
528
- conn -> param .log , "udp %s:%d -> %s:%d, timeout: %d, block: %d" ,
529
- conn -> param .params .udp .src_ip , conn -> param .params .udp .src_port ,
530
- conn -> param .params .udp .dst_ip , conn -> param .params .udp .dst_port ,
531
- conn -> param .params .udp .timeout , conn -> block );
627
+ break ;
628
+ case NEU_CONN_UDP_TO :
629
+ conn -> param .params .udpto .src_ip = strdup (param -> params .udpto .src_ip );
630
+ conn -> param .params .udpto .src_port = param -> params .udpto .src_port ;
631
+ conn -> param .params .udpto .timeout = param -> params .udpto .timeout ;
632
+ conn -> block = conn -> param .params .udpto .timeout > 0 ;
532
633
break ;
533
634
case NEU_CONN_TTY_CLIENT :
534
635
conn -> param .params .tty_client .device =
@@ -651,9 +752,9 @@ static void conn_connect(neu_conn_t *conn)
651
752
conn -> is_connected = false;
652
753
return ;
653
754
} else {
654
- zlog_notice (conn -> param .log , "connect %s:%d, block: %d success" ,
755
+ zlog_notice (conn -> param .log , "connect %s:%d success" ,
655
756
conn -> param .params .tcp_client .ip ,
656
- conn -> param .params .tcp_client .port , conn -> block );
757
+ conn -> param .params .tcp_client .port );
657
758
conn -> is_connected = true;
658
759
conn -> fd = fd ;
659
760
}
@@ -714,10 +815,57 @@ static void conn_connect(neu_conn_t *conn)
714
815
}
715
816
break ;
716
817
}
818
+ case NEU_CONN_UDP_TO : {
819
+ if (conn -> block ) {
820
+ struct timeval tv = {
821
+ .tv_sec = conn -> param .params .udpto .timeout / 1000 ,
822
+ .tv_usec = (conn -> param .params .udpto .timeout % 1000 ) * 1000 ,
823
+ };
824
+
825
+ fd = socket (AF_INET , SOCK_DGRAM , IPPROTO_UDP );
826
+ setsockopt (fd , SOL_SOCKET , SO_RCVTIMEO , & tv , sizeof (tv ));
827
+ setsockopt (fd , SOL_SOCKET , SO_SNDTIMEO , & tv , sizeof (tv ));
828
+ } else {
829
+ fd = socket (AF_INET , SOCK_DGRAM | SOCK_NONBLOCK , IPPROTO_UDP );
830
+ }
831
+ int so_broadcast = 1 ;
832
+ setsockopt (fd , SOL_SOCKET , SO_BROADCAST , & so_broadcast ,
833
+ sizeof (so_broadcast ));
834
+ struct sockaddr_in local = {
835
+ .sin_family = AF_INET ,
836
+ .sin_port = htons (conn -> param .params .udpto .src_port ),
837
+ .sin_addr .s_addr = inet_addr (conn -> param .params .udpto .src_ip ),
838
+ };
839
+
840
+ ret = bind (fd , (struct sockaddr * ) & local , sizeof (struct sockaddr_in ));
841
+ if (ret != 0 ) {
842
+ close (fd );
843
+ zlog_error (conn -> param .log , "bind %s:%d error: %s(%d)" ,
844
+ conn -> param .params .udpto .src_ip ,
845
+ conn -> param .params .udpto .src_port , strerror (errno ),
846
+ errno );
847
+ conn -> is_connected = false;
848
+ return ;
849
+ }
850
+
851
+ conn -> is_connected = true;
852
+ conn -> fd = fd ;
853
+
854
+ break ;
855
+ }
717
856
case NEU_CONN_TTY_CLIENT : {
718
857
struct termios tty_opt = { 0 };
719
-
858
+ #ifdef NEU_SMART_LINK
859
+ #include "connection/neu_smart_link.h"
860
+ ret =
861
+ neu_conn_smart_link_auto_set (conn -> param .params .tty_client .device );
862
+ zlog_notice (conn -> param .log , "smart link ret: %d" , ret );
863
+ if (ret > 0 ) {
864
+ fd = ret ;
865
+ }
866
+ #else
720
867
fd = open (conn -> param .params .tty_client .device , O_RDWR | O_NOCTTY , 0 );
868
+ #endif
721
869
if (fd <= 0 ) {
722
870
zlog_error (conn -> param .log , "open %s error: %s(%d)" ,
723
871
conn -> param .params .tty_client .device , strerror (errno ),
@@ -894,6 +1042,7 @@ static void conn_disconnect(neu_conn_t *conn)
894
1042
break ;
895
1043
case NEU_CONN_TCP_CLIENT :
896
1044
case NEU_CONN_UDP :
1045
+ case NEU_CONN_UDP_TO :
897
1046
case NEU_CONN_TTY_CLIENT :
898
1047
if (conn -> fd > 0 ) {
899
1048
close (conn -> fd );
@@ -970,15 +1119,13 @@ int neu_conn_stream_consume(neu_conn_t *conn, void *context,
970
1119
} else if (used == -1 ) {
971
1120
neu_conn_disconnect (conn );
972
1121
break ;
1122
+ } else {
1123
+ conn -> offset -= used ;
1124
+ memmove (conn -> buf , conn -> buf + used , conn -> offset );
1125
+ neu_protocol_unpack_buf_init (& protocol_buf , conn -> buf ,
1126
+ conn -> offset );
973
1127
}
974
1128
}
975
- if (conn -> offset != 0 ) {
976
- conn -> offset -= neu_protocol_unpack_buf_used_size (& protocol_buf );
977
- memmove (conn -> buf ,
978
- conn -> buf +
979
- neu_protocol_unpack_buf_used_size (& protocol_buf ),
980
- conn -> offset );
981
- }
982
1129
}
983
1130
984
1131
return ret ;
@@ -1001,15 +1148,13 @@ int neu_conn_stream_tcp_server_consume(neu_conn_t *conn, int fd, void *context,
1001
1148
} else if (used == -1 ) {
1002
1149
neu_conn_tcp_server_close_client (conn , fd );
1003
1150
break ;
1151
+ } else {
1152
+ conn -> offset -= used ;
1153
+ memmove (conn -> buf , conn -> buf + used , conn -> offset );
1154
+ neu_protocol_unpack_buf_init (& protocol_buf , conn -> buf ,
1155
+ conn -> offset );
1004
1156
}
1005
1157
}
1006
- if (conn -> offset != 0 ) {
1007
- conn -> offset -= neu_protocol_unpack_buf_used_size (& protocol_buf );
1008
- memmove (conn -> buf ,
1009
- conn -> buf +
1010
- neu_protocol_unpack_buf_used_size (& protocol_buf ),
1011
- conn -> offset );
1012
- }
1013
1158
}
1014
1159
return ret ;
1015
1160
}
0 commit comments