@@ -100,18 +100,20 @@ inline int sockinfo_udp::poll_os()
100100 int ret;
101101 uint64_t pending_data = 0 ;
102102
103- m_rx_udp_poll_os_ratio_counter = 0 ;
104103 ret = orig_os_api.ioctl (m_fd, FIONREAD, &pending_data);
105- if (unlikely (ret == -1 )) {
104+ if (pending_data > 0 ) {
105+ return pending_data;
106+ }
107+
108+ // No data is available
109+ unset_immediate_os_sample ();
110+
111+ if (ret < 0 ) {
106112 m_p_socket_stats->counters .n_rx_os_errors ++;
107113 si_udp_logdbg (" orig_os_api.ioctl returned with error in polling loop (errno=%d %m)" , errno);
108- return -1 ;
109114 }
110- if (pending_data > 0 ) {
111- m_p_socket_stats->counters .n_rx_poll_os_hit ++;
112- return 1 ;
113- }
114- return 0 ;
115+
116+ return ret;
115117}
116118
117119inline int sockinfo_udp::rx_wait (bool blocking)
@@ -122,31 +124,26 @@ inline int sockinfo_udp::rx_wait(bool blocking)
122124 epoll_event rx_epfd_events[SI_RX_EPFD_EVENT_MAX];
123125 uint64_t poll_sn;
124126
125- m_loops_timer.start ();
127+ m_loops_timer.start ();
126128
127129 while (loops_to_go) {
128130
129131 // Multi-thread polling support - let other threads have a go on this CPU
130- if ((m_n_sysvar_rx_poll_yield_loops > 0 ) && ((loops % m_n_sysvar_rx_poll_yield_loops) == (m_n_sysvar_rx_poll_yield_loops - 1 ))) {
132+ if ((m_n_sysvar_rx_poll_yield_loops > 0 ) && ((loops++ % m_n_sysvar_rx_poll_yield_loops) == (m_n_sysvar_rx_poll_yield_loops - 1 ))) {
131133 sched_yield ();
132134 }
133135
134- // Poll socket for OS ready packets... (at a ratio of the offloaded sockets as defined in m_n_sysvar_rx_udp_poll_os_ratio)
135- if ((m_n_sysvar_rx_udp_poll_os_ratio > 0 ) && (m_rx_udp_poll_os_ratio_counter >= m_n_sysvar_rx_udp_poll_os_ratio)) {
136- ret = poll_os ();
137- if ((ret == -1 ) || (ret == 1 )) {
138- return ret;
139- }
136+ if (m_b_os_data_available) {
137+ // OS data might be available
138+ return 1 ;
140139 }
141140
142141 // Poll cq for offloaded ready packets ...
143- m_rx_udp_poll_os_ratio_counter++;
144142 if (is_readable (&poll_sn)) {
145143 m_p_socket_stats->counters .n_rx_poll_hit ++;
146144 return 0 ;
147145 }
148146
149- loops++;
150147 if (!blocking || m_n_sysvar_rx_poll_num != -1 ) {
151148 loops_to_go--;
152149 }
@@ -264,7 +261,8 @@ inline int sockinfo_udp::rx_wait(bool blocking)
264261
265262 // Check if OS fd is ready for reading
266263 if (fd == m_fd) {
267- m_rx_udp_poll_os_ratio_counter = 0 ;
264+ // OS data might be available
265+ set_immediate_os_sample ();
268266 return 1 ;
269267 }
270268
@@ -334,15 +332,13 @@ sockinfo_udp::sockinfo_udp(int fd):
334332 ,m_b_mc_tx_loop(safe_mce_sys().tx_mc_loopback_default) // default value is 'true'. User can change this with config parameter SYS_VAR_TX_MC_LOOPBACK
335333 ,m_n_mc_ttl(DEFAULT_MC_TTL)
336334 ,m_loops_to_go(safe_mce_sys().rx_poll_num_init) // Start up with a init polling loops value
337- ,m_rx_udp_poll_os_ratio_counter(0 )
338335 ,m_sock_offload(true )
339336 ,m_mc_num_grp_with_src_filter(0 )
340337 ,m_port_map_lock(" sockinfo_udp::m_ports_map_lock" )
341338 ,m_port_map_index(0 )
342339 ,m_p_last_dst_entry(NULL )
343340 ,m_tos(0 )
344341 ,m_n_sysvar_rx_poll_yield_loops(safe_mce_sys().rx_poll_yield_loops)
345- ,m_n_sysvar_rx_udp_poll_os_ratio(safe_mce_sys().rx_udp_poll_os_ratio)
346342 ,m_n_sysvar_rx_ready_byte_min_limit(safe_mce_sys().rx_ready_byte_min_limit)
347343 ,m_n_sysvar_rx_cq_drain_rate_nsec(safe_mce_sys().rx_cq_drain_rate_nsec)
348344 ,m_n_sysvar_rx_delta_tsc_between_cq_polls(safe_mce_sys().rx_delta_tsc_between_cq_polls)
@@ -351,6 +347,7 @@ sockinfo_udp::sockinfo_udp(int fd):
351347 ,m_sockopt_mapped(false )
352348 ,m_is_connected(false )
353349 ,m_multicast(false )
350+ ,m_b_os_data_available(false )
354351{
355352 si_udp_logfunc (" " );
356353
@@ -372,17 +369,17 @@ sockinfo_udp::sockinfo_udp(int fd):
372369 rx_ready_byte_count_limit_update (n_so_rcvbuf_bytes);
373370
374371 epoll_event ev = {0 , {0 }};
375-
376372 ev.events = EPOLLIN;
377-
378- // Add the user's orig fd to the rx epfd handle
379- ev.data .fd = m_fd;
373+ ev.data .fd = m_fd; // Add the user's orig fd to the rx epfd handle
380374
381375 BULLSEYE_EXCLUDE_BLOCK_START
382376 if (unlikely (orig_os_api.epoll_ctl (m_rx_epfd, EPOLL_CTL_ADD, ev.data .fd , &ev)))
383377 si_udp_logpanic (" failed to add user's fd to internal epfd errno=%d (%m)" , errno);
384378 BULLSEYE_EXCLUDE_BLOCK_END
385379
380+ // Register this socket to read nonoffloaded data
381+ g_p_event_handler_manager->update_epfd (m_fd, EPOLL_CTL_ADD, EPOLLIN | EPOLLPRI | EPOLLONESHOT);
382+
386383 si_udp_logfunc (" done" );
387384}
388385
@@ -1356,30 +1353,25 @@ ssize_t sockinfo_udp::rx(const rx_call_t call_type, iovec* p_iov,ssize_t sz_iov,
13561353
13571354 return_reuse_buffers_postponed ();
13581355
1359- // Drop lock to not starve other threads
1360- m_lock_rcv.unlock ();
1361-
1362- // Poll socket for OS ready packets... (at a ratio of the offloaded sockets as defined in m_n_sysvar_rx_udp_poll_os_ratio)
1363- if ((m_n_sysvar_rx_udp_poll_os_ratio > 0 ) && (m_rx_udp_poll_os_ratio_counter >= m_n_sysvar_rx_udp_poll_os_ratio)) {
1356+ // Check for nonoffloaded data if m_b_os_data_available is true
1357+ if (m_b_os_data_available) {
13641358 ret = poll_os ();
1365- if (ret == -1 ) {
1366- /* coverity[double_lock] TODO: RM#1049980 */
1367- m_lock_rcv.lock ();
1368- goto out;
1369- }
1370- if (ret == 1 ) {
1371- /* coverity[double_lock] TODO: RM#1049980 */
1372- m_lock_rcv.lock ();
1359+ if (ret > 0 ) {
13731360 goto os;
13741361 }
1362+ if (unlikely (ret < 0 )) {
1363+ goto out;
1364+ }
13751365 }
13761366
1367+ // Drop lock to not starve other threads
1368+ m_lock_rcv.unlock ();
1369+
13771370 // First check if we have a packet in the ready list
13781371 if ((m_n_rx_pkt_ready_list_count > 0 && m_n_sysvar_rx_cq_drain_rate_nsec == MCE_RX_CQ_DRAIN_RATE_DISABLED)
13791372 || is_readable (&poll_sn)) {
13801373 /* coverity[double_lock] TODO: RM#1049980 */
13811374 m_lock_rcv.lock ();
1382- m_rx_udp_poll_os_ratio_counter++;
13831375 if (m_n_rx_pkt_ready_list_count > 0 ) {
13841376 // Found a ready packet in the list
13851377 if (__msg) handle_cmsg (__msg);
@@ -1409,20 +1401,28 @@ ssize_t sockinfo_udp::rx(const rx_call_t call_type, iovec* p_iov,ssize_t sz_iov,
14091401 m_lock_rcv.unlock ();
14101402 goto wait;
14111403 }
1412- }
1413- else if (unlikely (rx_wait_ret < 0 )) {
1404+ } else if (unlikely (rx_wait_ret < 0 )) {
14141405 // Got < 0, means an error occurred
14151406 ret = rx_wait_ret;
14161407 goto out;
14171408 } // else - packet in OS
14181409
1410+ // Else, check for nonoffloaded data - rx_wait() returned 1.
1411+ ret = poll_os ();
1412+ if (ret == 0 ) {
1413+ m_lock_rcv.unlock ();
1414+ goto wait;
1415+ }
1416+ if (unlikely (ret < 0 )) {
1417+ goto out;
1418+ }
1419+
14191420 /*
14201421 * If we got here, either the socket is not offloaded or rx_wait() returned 1.
14211422 */
14221423os:
14231424 if (in_flags & MSG_VMA_ZCOPY_FORCE) {
14241425 // Enable the next non-blocked read to check the OS
1425- m_rx_udp_poll_os_ratio_counter = m_n_sysvar_rx_udp_poll_os_ratio;
14261426 errno = EIO;
14271427 ret = -1 ;
14281428 goto out;
@@ -1436,10 +1436,9 @@ ssize_t sockinfo_udp::rx(const rx_call_t call_type, iovec* p_iov,ssize_t sz_iov,
14361436 ret = socket_fd_api::rx_os (call_type, p_iov, sz_iov, in_flags, __from, __fromlen, __msg);
14371437 *p_flags = in_flags;
14381438 save_stats_rx_os (ret);
1439- if (ret > 0 ) {
1440- // This will cause the next non-blocked read to check the OS again.
1441- // We do this only after a successful read.
1442- m_rx_udp_poll_os_ratio_counter = m_n_sysvar_rx_udp_poll_os_ratio;
1439+ if (ret <= 0 ) {
1440+ // Do not poll the os fd on the next rx() call.
1441+ unset_immediate_os_sample ();
14431442 }
14441443
14451444out:
@@ -1480,16 +1479,17 @@ void sockinfo_udp::handle_ip_pktinfo(struct cmsg_state * cm_state)
14801479 insert_cmsg (cm_state, IPPROTO_IP, IP_PKTINFO, &in_pktinfo, sizeof (struct in_pktinfo ));
14811480}
14821481
1483- // This function is relevant only for non-blocking socket
14841482void sockinfo_udp::set_immediate_os_sample ()
14851483{
1486- m_rx_udp_poll_os_ratio_counter = m_n_sysvar_rx_udp_poll_os_ratio ;
1484+ m_b_os_data_available = true ;
14871485}
14881486
1489- // This function is relevant only for non-blocking socket
14901487void sockinfo_udp::unset_immediate_os_sample ()
14911488{
1492- m_rx_udp_poll_os_ratio_counter = 0 ;
1489+ m_b_os_data_available = false ;
1490+
1491+ // Reassign EPOLLIN event
1492+ g_p_event_handler_manager->update_epfd (m_fd, EPOLL_CTL_MOD, EPOLLIN | EPOLLPRI | EPOLLONESHOT);
14931493}
14941494
14951495bool sockinfo_udp::is_readable (uint64_t *p_poll_sn, fd_array_t * p_fd_ready_array)
@@ -1809,7 +1809,6 @@ int sockinfo_udp::rx_verify_available_data()
18091809 if (ret >= 0 ) {
18101810 // This will cause the next non-blocked read to check the OS again.
18111811 // We do this only after a successful read.
1812- m_rx_udp_poll_os_ratio_counter = m_n_sysvar_rx_udp_poll_os_ratio;
18131812 ret = pending_data;
18141813 }
18151814 } else if (errno == EAGAIN) {
@@ -2142,12 +2141,9 @@ void sockinfo_udp::rx_add_ring_cb(flow_tuple_with_local_if &flow_key, ring* p_ri
21422141 si_udp_logdbg (" " );
21432142 sockinfo::rx_add_ring_cb (flow_key, p_ring, is_migration);
21442143
2145- // Now that we got at least 1 CQ attached enable the skip os mechanism.
2146- m_rx_udp_poll_os_ratio_counter = m_n_sysvar_rx_udp_poll_os_ratio;
2147-
21482144 // Now that we got at least 1 CQ attached start polling the CQs
21492145 if (m_b_blocking) {
2150- m_loops_to_go = m_n_sysvar_rx_poll_num;
2146+ m_loops_to_go = m_n_sysvar_rx_poll_num;
21512147 }
21522148 else {
21532149 m_loops_to_go = 1 ; // Force single CQ poll in case of non-blocking socket
@@ -2610,24 +2606,30 @@ int sockinfo_udp::get_socket_tx_ring_fd(struct sockaddr *to, socklen_t tolen)
26102606 return res;
26112607}
26122608
2613- mem_buf_desc_t * sockinfo_udp::get_front_m_rx_pkt_ready_list (){
2609+ mem_buf_desc_t * sockinfo_udp::get_front_m_rx_pkt_ready_list ()
2610+ {
26142611 return m_rx_pkt_ready_list.front ();
26152612}
26162613
2617- size_t sockinfo_udp::get_size_m_rx_pkt_ready_list (){
2614+ size_t sockinfo_udp::get_size_m_rx_pkt_ready_list ()
2615+ {
26182616 return m_rx_pkt_ready_list.size ();
26192617}
26202618
2621- void sockinfo_udp::pop_front_m_rx_pkt_ready_list (){
2619+ void sockinfo_udp::pop_front_m_rx_pkt_ready_list ()
2620+ {
26222621 m_rx_pkt_ready_list.pop_front ();
26232622}
26242623
2625- void sockinfo_udp::push_back_m_rx_pkt_ready_list (mem_buf_desc_t * buff){
2624+ void sockinfo_udp::push_back_m_rx_pkt_ready_list (mem_buf_desc_t * buff)
2625+ {
26262626 m_rx_pkt_ready_list.push_back (buff);
26272627}
26282628
2629- bool sockinfo_udp::prepare_to_close (bool process_shutdown) {
2629+ bool sockinfo_udp::prepare_to_close (bool process_shutdown)
2630+ {
26302631 m_lock_rcv.lock ();
2632+ g_p_event_handler_manager->update_epfd (m_fd, EPOLL_CTL_DEL, EPOLLIN | EPOLLPRI | EPOLLONESHOT);
26312633 do_wakeup ();
26322634 m_lock_rcv.unlock ();
26332635 NOT_IN_USE (process_shutdown);
0 commit comments