@@ -130,18 +130,20 @@ inline int sockinfo_udp::poll_os()
130130 int ret;
131131 uint64_t pending_data = 0 ;
132132
133- m_rx_udp_poll_os_ratio_counter = 0 ;
134133 ret = orig_os_api.ioctl (m_fd, FIONREAD, &pending_data);
135- if (unlikely (ret == -1 )) {
134+ if (pending_data > 0 ) {
135+ return pending_data;
136+ }
137+
138+ // No data is available
139+ unset_immediate_os_sample ();
140+
141+ if (ret < 0 ) {
136142 m_p_socket_stats->counters .n_rx_os_errors ++;
137143 si_udp_logdbg (" orig_os_api.ioctl returned with error in polling loop (errno=%d %m)" , errno);
138- return -1 ;
139144 }
140- if (pending_data > 0 ) {
141- m_p_socket_stats->counters .n_rx_poll_os_hit ++;
142- return 1 ;
143- }
144- return 0 ;
145+
146+ return ret;
145147}
146148
147149inline int sockinfo_udp::rx_wait (bool blocking)
@@ -152,31 +154,26 @@ inline int sockinfo_udp::rx_wait(bool blocking)
152154 epoll_event rx_epfd_events[SI_RX_EPFD_EVENT_MAX];
153155 uint64_t poll_sn;
154156
155- m_loops_timer.start ();
157+ m_loops_timer.start ();
156158
157159 while (loops_to_go) {
158160
159161 // Multi-thread polling support - let other threads have a go on this CPU
160- if ((m_n_sysvar_rx_poll_yield_loops > 0 ) && ((loops % m_n_sysvar_rx_poll_yield_loops) == (m_n_sysvar_rx_poll_yield_loops - 1 ))) {
162+ if ((m_n_sysvar_rx_poll_yield_loops > 0 ) && ((loops++ % m_n_sysvar_rx_poll_yield_loops) == (m_n_sysvar_rx_poll_yield_loops - 1 ))) {
161163 sched_yield ();
162164 }
163165
164- // Poll socket for OS ready packets... (at a ratio of the offloaded sockets as defined in m_n_sysvar_rx_udp_poll_os_ratio)
165- if ((m_n_sysvar_rx_udp_poll_os_ratio > 0 ) && (m_rx_udp_poll_os_ratio_counter >= m_n_sysvar_rx_udp_poll_os_ratio)) {
166- ret = poll_os ();
167- if ((ret == -1 ) || (ret == 1 )) {
168- return ret;
169- }
166+ if (m_b_os_data_available) {
167+ // OS data might be available
168+ return 1 ;
170169 }
171170
172171 // Poll cq for offloaded ready packets ...
173- m_rx_udp_poll_os_ratio_counter++;
174172 if (is_readable (&poll_sn)) {
175173 m_p_socket_stats->counters .n_rx_poll_hit ++;
176174 return 0 ;
177175 }
178176
179- loops++;
180177 if (!blocking || m_n_sysvar_rx_poll_num != -1 ) {
181178 loops_to_go--;
182179 }
@@ -294,7 +291,8 @@ inline int sockinfo_udp::rx_wait(bool blocking)
294291
295292 // Check if OS fd is ready for reading
296293 if (fd == m_fd) {
297- m_rx_udp_poll_os_ratio_counter = 0 ;
294+ // OS data might be available
295+ set_immediate_os_sample ();
298296 return 1 ;
299297 }
300298
@@ -385,7 +383,6 @@ sockinfo_udp::sockinfo_udp(int fd):
385383 ,m_b_mc_tx_loop(safe_mce_sys().tx_mc_loopback_default) // default value is 'true'. User can change this with config parameter SYS_VAR_TX_MC_LOOPBACK
386384 ,m_n_mc_ttl(DEFAULT_MC_TTL)
387385 ,m_loops_to_go(safe_mce_sys().rx_poll_num_init) // Start up with a init polling loops value
388- ,m_rx_udp_poll_os_ratio_counter(0 )
389386 ,m_sock_offload(true )
390387 ,m_mc_num_grp_with_src_filter(0 )
391388 ,m_port_map_lock(" sockinfo_udp::m_ports_map_lock" )
@@ -397,7 +394,6 @@ sockinfo_udp::sockinfo_udp(int fd):
397394 ,m_n_tsing_flags(0 )
398395 ,m_tos(0 )
399396 ,m_n_sysvar_rx_poll_yield_loops(safe_mce_sys().rx_poll_yield_loops)
400- ,m_n_sysvar_rx_udp_poll_os_ratio(safe_mce_sys().rx_udp_poll_os_ratio)
401397 ,m_n_sysvar_rx_ready_byte_min_limit(safe_mce_sys().rx_ready_byte_min_limit)
402398 ,m_n_sysvar_rx_cq_drain_rate_nsec(safe_mce_sys().rx_cq_drain_rate_nsec)
403399 ,m_n_sysvar_rx_delta_tsc_between_cq_polls(safe_mce_sys().rx_delta_tsc_between_cq_polls)
@@ -406,6 +402,7 @@ sockinfo_udp::sockinfo_udp(int fd):
406402 ,m_sockopt_mapped(false )
407403 ,m_is_connected(false )
408404 ,m_multicast(false )
405+ ,m_b_os_data_available(false )
409406{
410407 si_udp_logfunc (" " );
411408
@@ -427,17 +424,17 @@ sockinfo_udp::sockinfo_udp(int fd):
427424 rx_ready_byte_count_limit_update (n_so_rcvbuf_bytes);
428425
429426 epoll_event ev = {0 , {0 }};
430-
431427 ev.events = EPOLLIN;
432-
433- // Add the user's orig fd to the rx epfd handle
434- ev.data .fd = m_fd;
428+ ev.data .fd = m_fd; // Add the user's orig fd to the rx epfd handle
435429
436430 BULLSEYE_EXCLUDE_BLOCK_START
437431 if (unlikely (orig_os_api.epoll_ctl (m_rx_epfd, EPOLL_CTL_ADD, ev.data .fd , &ev)))
438432 si_udp_logpanic (" failed to add user's fd to internal epfd errno=%d (%m)" , errno);
439433 BULLSEYE_EXCLUDE_BLOCK_END
440434
435+ // Register this socket to read nonoffloaded data
436+ g_p_event_handler_manager->update_epfd (m_fd, EPOLL_CTL_ADD, EPOLLIN | EPOLLPRI | EPOLLONESHOT);
437+
441438 si_udp_logfunc (" done" );
442439}
443440
@@ -1426,30 +1423,25 @@ ssize_t sockinfo_udp::rx(const rx_call_t call_type, iovec* p_iov,ssize_t sz_iov,
14261423
14271424 return_reuse_buffers_postponed ();
14281425
1429- // Drop lock to not starve other threads
1430- m_lock_rcv.unlock ();
1431-
1432- // Poll socket for OS ready packets... (at a ratio of the offloaded sockets as defined in m_n_sysvar_rx_udp_poll_os_ratio)
1433- if ((m_n_sysvar_rx_udp_poll_os_ratio > 0 ) && (m_rx_udp_poll_os_ratio_counter >= m_n_sysvar_rx_udp_poll_os_ratio)) {
1426+ // Check for nonoffloaded data if m_b_os_data_available is true
1427+ if (m_b_os_data_available) {
14341428 ret = poll_os ();
1435- if (ret == -1 ) {
1436- /* coverity[double_lock] TODO: RM#1049980 */
1437- m_lock_rcv.lock ();
1438- goto out;
1439- }
1440- if (ret == 1 ) {
1441- /* coverity[double_lock] TODO: RM#1049980 */
1442- m_lock_rcv.lock ();
1429+ if (ret > 0 ) {
14431430 goto os;
14441431 }
1432+ if (unlikely (ret < 0 )) {
1433+ goto out;
1434+ }
14451435 }
14461436
1437+ // Drop lock to not starve other threads
1438+ m_lock_rcv.unlock ();
1439+
14471440 // First check if we have a packet in the ready list
14481441 if ((m_n_rx_pkt_ready_list_count > 0 && m_n_sysvar_rx_cq_drain_rate_nsec == MCE_RX_CQ_DRAIN_RATE_DISABLED)
14491442 || is_readable (&poll_sn)) {
14501443 /* coverity[double_lock] TODO: RM#1049980 */
14511444 m_lock_rcv.lock ();
1452- m_rx_udp_poll_os_ratio_counter++;
14531445 if (m_n_rx_pkt_ready_list_count > 0 ) {
14541446 // Found a ready packet in the list
14551447 if (__msg) handle_cmsg (__msg);
@@ -1479,20 +1471,28 @@ ssize_t sockinfo_udp::rx(const rx_call_t call_type, iovec* p_iov,ssize_t sz_iov,
14791471 m_lock_rcv.unlock ();
14801472 goto wait;
14811473 }
1482- }
1483- else if (unlikely (rx_wait_ret < 0 )) {
1474+ } else if (unlikely (rx_wait_ret < 0 )) {
14841475 // Got < 0, means an error occurred
14851476 ret = rx_wait_ret;
14861477 goto out;
14871478 } // else - packet in OS
14881479
1480+ // Else, check for nonoffloaded data - rx_wait() returned 1.
1481+ ret = poll_os ();
1482+ if (ret == 0 ) {
1483+ m_lock_rcv.unlock ();
1484+ goto wait;
1485+ }
1486+ if (unlikely (ret < 0 )) {
1487+ goto out;
1488+ }
1489+
14891490 /*
14901491 * If we got here, either the socket is not offloaded or rx_wait() returned 1.
14911492 */
14921493os:
14931494 if (in_flags & MSG_VMA_ZCOPY_FORCE) {
14941495 // Enable the next non-blocked read to check the OS
1495- m_rx_udp_poll_os_ratio_counter = m_n_sysvar_rx_udp_poll_os_ratio;
14961496 errno = EIO;
14971497 ret = -1 ;
14981498 goto out;
@@ -1506,10 +1506,9 @@ ssize_t sockinfo_udp::rx(const rx_call_t call_type, iovec* p_iov,ssize_t sz_iov,
15061506 ret = socket_fd_api::rx_os (call_type, p_iov, sz_iov, in_flags, __from, __fromlen, __msg);
15071507 *p_flags = in_flags;
15081508 save_stats_rx_os (ret);
1509- if (ret > 0 ) {
1510- // This will cause the next non-blocked read to check the OS again.
1511- // We do this only after a successful read.
1512- m_rx_udp_poll_os_ratio_counter = m_n_sysvar_rx_udp_poll_os_ratio;
1509+ if (ret <= 0 ) {
1510+ // Do not poll the os fd on the next rx() call.
1511+ unset_immediate_os_sample ();
15131512 }
15141513
15151514out:
@@ -1644,16 +1643,17 @@ void sockinfo_udp::handle_cmsg(struct msghdr * msg)
16441643 cm_state.mhdr ->msg_controllen = cm_state.cmsg_bytes_consumed ;
16451644}
16461645
1647- // This function is relevant only for non-blocking socket
16481646void sockinfo_udp::set_immediate_os_sample ()
16491647{
1650- m_rx_udp_poll_os_ratio_counter = m_n_sysvar_rx_udp_poll_os_ratio ;
1648+ m_b_os_data_available = true ;
16511649}
16521650
1653- // This function is relevant only for non-blocking socket
16541651void sockinfo_udp::unset_immediate_os_sample ()
16551652{
1656- m_rx_udp_poll_os_ratio_counter = 0 ;
1653+ m_b_os_data_available = false ;
1654+
1655+ // Reassign EPOLLIN event
1656+ g_p_event_handler_manager->update_epfd (m_fd, EPOLL_CTL_MOD, EPOLLIN | EPOLLPRI | EPOLLONESHOT);
16571657}
16581658
16591659bool sockinfo_udp::is_readable (uint64_t *p_poll_sn, fd_array_t * p_fd_ready_array)
@@ -1973,7 +1973,6 @@ int sockinfo_udp::rx_verify_available_data()
19731973 if (ret >= 0 ) {
19741974 // This will cause the next non-blocked read to check the OS again.
19751975 // We do this only after a successful read.
1976- m_rx_udp_poll_os_ratio_counter = m_n_sysvar_rx_udp_poll_os_ratio;
19771976 ret = pending_data;
19781977 }
19791978 } else if (errno == EAGAIN) {
@@ -2328,12 +2327,9 @@ void sockinfo_udp::rx_add_ring_cb(flow_tuple_with_local_if &flow_key, ring* p_ri
23282327 si_udp_logdbg (" " );
23292328 sockinfo::rx_add_ring_cb (flow_key, p_ring, is_migration);
23302329
2331- // Now that we got at least 1 CQ attached enable the skip os mechanism.
2332- m_rx_udp_poll_os_ratio_counter = m_n_sysvar_rx_udp_poll_os_ratio;
2333-
23342330 // Now that we got at least 1 CQ attached start polling the CQs
23352331 if (m_b_blocking) {
2336- m_loops_to_go = m_n_sysvar_rx_poll_num;
2332+ m_loops_to_go = m_n_sysvar_rx_poll_num;
23372333 }
23382334 else {
23392335 m_loops_to_go = 1 ; // Force single CQ poll in case of non-blocking socket
@@ -2787,24 +2783,30 @@ int sockinfo_udp::get_socket_tx_ring_fd(struct sockaddr *to, socklen_t tolen)
27872783 return res;
27882784}
27892785
2790- mem_buf_desc_t * sockinfo_udp::get_front_m_rx_pkt_ready_list (){
2786+ mem_buf_desc_t * sockinfo_udp::get_front_m_rx_pkt_ready_list ()
2787+ {
27912788 return m_rx_pkt_ready_list.front ();
27922789}
27932790
2794- size_t sockinfo_udp::get_size_m_rx_pkt_ready_list (){
2791+ size_t sockinfo_udp::get_size_m_rx_pkt_ready_list ()
2792+ {
27952793 return m_rx_pkt_ready_list.size ();
27962794}
27972795
2798- void sockinfo_udp::pop_front_m_rx_pkt_ready_list (){
2796+ void sockinfo_udp::pop_front_m_rx_pkt_ready_list ()
2797+ {
27992798 m_rx_pkt_ready_list.pop_front ();
28002799}
28012800
2802- void sockinfo_udp::push_back_m_rx_pkt_ready_list (mem_buf_desc_t * buff){
2801+ void sockinfo_udp::push_back_m_rx_pkt_ready_list (mem_buf_desc_t * buff)
2802+ {
28032803 m_rx_pkt_ready_list.push_back (buff);
28042804}
28052805
2806- bool sockinfo_udp::prepare_to_close (bool process_shutdown) {
2806+ bool sockinfo_udp::prepare_to_close (bool process_shutdown)
2807+ {
28072808 m_lock_rcv.lock ();
2809+ g_p_event_handler_manager->update_epfd (m_fd, EPOLL_CTL_DEL, EPOLLIN | EPOLLPRI | EPOLLONESHOT);
28082810 do_wakeup ();
28092811 m_lock_rcv.unlock ();
28102812 NOT_IN_USE (process_shutdown);
0 commit comments