From 6b5db968bb77d9cf74c443b708d6a08ac67aafdc Mon Sep 17 00:00:00 2001 From: Yohanan Betsis Date: Thu, 22 Oct 2020 13:40:49 +0000 Subject: [PATCH 1/3] issue: 2342345 Add new wakeup mechanism using eventfd Add class wakeup_eventfd which hold fd created from eventfd() (represented counter in kernel). This fd using as an event wait/notify mechanism. Call read() for this fd will increment counter and fd will be ready to read. Call write() will decrement counter to zero. Signed-off-by: Yohanan Betsis Reviewed-by: Igor Ivanov --- src/vma/Makefile.am | 2 + src/vma/event/event_handler_manager.cpp | 2 +- src/vma/iomux/epfd_info.cpp | 2 +- src/vma/sock/sockinfo.cpp | 2 +- src/vma/util/wakeup.cpp | 15 ++--- src/vma/util/wakeup.h | 5 +- src/vma/util/wakeup_eventfd.cpp | 85 +++++++++++++++++++++++++ src/vma/util/wakeup_eventfd.h | 53 +++++++++++++++ src/vma/util/wakeup_pipe.cpp | 9 +-- src/vma/util/wakeup_pipe.h | 1 + 10 files changed, 158 insertions(+), 18 deletions(-) create mode 100644 src/vma/util/wakeup_eventfd.cpp create mode 100644 src/vma/util/wakeup_eventfd.h diff --git a/src/vma/Makefile.am b/src/vma/Makefile.am index 7da4d6a3da..53f3793a63 100644 --- a/src/vma/Makefile.am +++ b/src/vma/Makefile.am @@ -156,6 +156,7 @@ libvma_la_SOURCES := \ \ util/wakeup.cpp \ util/wakeup_pipe.cpp \ + util/wakeup_eventfd.cpp \ util/match.cpp \ util/utils.cpp \ util/instrumentation.cpp \ @@ -303,6 +304,7 @@ libvma_la_SOURCES := \ util/vtypes.h \ util/wakeup.h \ util/wakeup_pipe.h \ + util/wakeup_eventfd.h \ util/agent.h \ util/agent_def.h \ util/data_updater.h \ diff --git a/src/vma/event/event_handler_manager.cpp b/src/vma/event/event_handler_manager.cpp index 107c59ec5a..9fb156be08 100644 --- a/src/vma/event/event_handler_manager.cpp +++ b/src/vma/event/event_handler_manager.cpp @@ -249,7 +249,7 @@ event_handler_manager::event_handler_manager() : m_b_continue_running = true; m_event_handler_tid = 0; - wakeup_set_epoll_fd(m_epfd); + wakeup_set_fd(m_epfd); going_to_sleep(); return; diff --git a/src/vma/iomux/epfd_info.cpp b/src/vma/iomux/epfd_info.cpp index c3886f4bd3..ad499ff0b2 100644 --- a/src/vma/iomux/epfd_info.cpp +++ b/src/vma/iomux/epfd_info.cpp @@ -88,7 +88,7 @@ epfd_info::epfd_info(int epfd, int size) : // Register this socket to read nonoffloaded data g_p_event_handler_manager->update_epfd(m_epfd, EPOLL_CTL_ADD, EPOLLIN | EPOLLPRI | EPOLLONESHOT); - wakeup_set_epoll_fd(m_epfd); + wakeup_set_fd(m_epfd); } epfd_info::~epfd_info() diff --git a/src/vma/sock/sockinfo.cpp b/src/vma/sock/sockinfo.cpp index d5891a48a7..f070383dbc 100644 --- a/src/vma/sock/sockinfo.cpp +++ b/src/vma/sock/sockinfo.cpp @@ -97,7 +97,7 @@ sockinfo::sockinfo(int fd): if (unlikely(m_rx_epfd == -1)) { throw_vma_exception("create internal epoll"); } - wakeup_set_epoll_fd(m_rx_epfd); + wakeup_set_fd(m_rx_epfd); m_p_socket_stats = &m_socket_stats; // Save stats as local copy and allow state publisher to copy from this location vma_stats_instance_create_socket_block(m_p_socket_stats); diff --git a/src/vma/util/wakeup.cpp b/src/vma/util/wakeup.cpp index c57bb28850..1ba1504d18 100644 --- a/src/vma/util/wakeup.cpp +++ b/src/vma/util/wakeup.cpp @@ -48,30 +48,29 @@ #define wkup_entry_dbg __log_entry_dbg #undef MODULE_HDR_INFO -#define MODULE_HDR_INFO MODULE_NAME "[epfd=%d]:%d:%s() " +#define MODULE_HDR_INFO MODULE_NAME "[wakeup_fd=%d]:%d:%s() " #undef __INFO__ -#define __INFO__ m_epfd +#define __INFO__ m_wakeup_fd wakeup::wakeup() { - m_epfd = 0; + m_wakeup_fd = 0; m_is_sleeping = 0; - memset(&m_ev, 0, sizeof(m_ev)); } void wakeup::going_to_sleep() { BULLSEYE_EXCLUDE_BLOCK_START - if(likely(m_epfd)) + if(likely(m_wakeup_fd)) m_is_sleeping++; else { - wkup_logerr(" m_epfd is not initialized - cannot use wakeup mechanism\n"); + wkup_logerr(" m_wakeup_fd is not initialized - cannot use wakeup mechanism\n"); m_is_sleeping = 0; } BULLSEYE_EXCLUDE_BLOCK_END } -void wakeup::wakeup_set_epoll_fd(int epfd) +void wakeup::wakeup_set_fd(int fd) { - m_epfd = epfd; + m_wakeup_fd = fd; } diff --git a/src/vma/util/wakeup.h b/src/vma/util/wakeup.h index d50573df06..d99f8bd0ef 100644 --- a/src/vma/util/wakeup.h +++ b/src/vma/util/wakeup.h @@ -52,13 +52,12 @@ class wakeup void return_from_sleep() { --m_is_sleeping; }; protected: - virtual void wakeup_set_epoll_fd(int epfd); + virtual void wakeup_set_fd(int fd); int m_is_sleeping; //lock_spin_recursive m_wakeup_lock; This lock is not needed for now. Maybe we will need it for epoll. - int m_epfd; - struct epoll_event m_ev; + int m_wakeup_fd; }; #endif /* WAKEUP_H */ diff --git a/src/vma/util/wakeup_eventfd.cpp b/src/vma/util/wakeup_eventfd.cpp new file mode 100644 index 0000000000..1e173d1d00 --- /dev/null +++ b/src/vma/util/wakeup_eventfd.cpp @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2001-2020 Mellanox Technologies, Ltd. All rights reserved. + * + * This software is available to you under a choice of one of two + * licenses. You may choose to be licensed under the terms of the GNU + * General Public License (GPL) Version 2, available from the file + * COPYING in the main directory of this source tree, or the + * BSD license below: + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * - Redistributions of source code must retain the above + * copyright notice, this list of conditions and the following + * disclaimer. + * + * - Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +#include +#include "vlogger/vlogger.h" +#include "wakeup_eventfd.h" + +#define MODULE_NAME "wakeup_eventfd" + +#define wkup_logpanic __log_info_panic +#define wkup_logerr __log_info_err +#define wkup_logwarn __log_info_warn +#define wkup_loginfo __log_info_info +#define wkup_logdbg __log_info_dbg +#define wkup_logfunc __log_info_func +#define wkup_logfuncall __log_info_funcall +#define wkup_entry_dbg __log_entry_dbg + +#undef MODULE_HDR_INFO +#define MODULE_HDR_INFO MODULE_NAME "[wakeup_fd=%d]:%d:%s() " +#undef __INFO__ +#define __INFO__ m_wakeup_fd + +wakeup_eventfd::wakeup_eventfd() +{ + m_wakeup_fd = eventfd(0, 0); +} + +void wakeup_eventfd::do_wakeup() +{ + wkup_logfuncall(""); + if (!m_is_sleeping) { + wkup_logfunc("There is no thread in poll_wait, therefore not calling for wakeup"); + return; + } + wkup_entry_dbg(""); + uint64_t inc = 1; + if (write(m_wakeup_fd, &inc, sizeof(uint64_t)) != sizeof(uint64_t)) { + wkup_logerr("Failed to increase counter wakeup fd"); + } +} + +void wakeup_eventfd::remove_wakeup_fd() +{ + if (m_is_sleeping) return; + wkup_entry_dbg(""); + uint64_t inc; + if (read(m_wakeup_fd, &inc, sizeof(uint64_t)) != sizeof(uint64_t)) { + wkup_logerr("Failed to reduce counter wakeup fd"); + } +} + +wakeup_eventfd::~wakeup_eventfd() +{ + close(m_wakeup_fd); +} diff --git a/src/vma/util/wakeup_eventfd.h b/src/vma/util/wakeup_eventfd.h new file mode 100644 index 0000000000..85d35a4ad4 --- /dev/null +++ b/src/vma/util/wakeup_eventfd.h @@ -0,0 +1,53 @@ +/* + * Copyright (c) 2001-2020 Mellanox Technologies, Ltd. All rights reserved. + * + * This software is available to you under a choice of one of two + * licenses. You may choose to be licensed under the terms of the GNU + * General Public License (GPL) Version 2, available from the file + * COPYING in the main directory of this source tree, or the + * BSD license below: + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * - Redistributions of source code must retain the above + * copyright notice, this list of conditions and the following + * disclaimer. + * + * - Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + + +#ifndef WAKEUP_EVENTFD_H +#define WAKEUP_EVENTFD_H + +#include "wakeup.h" + +class wakeup_eventfd : public wakeup +{ +public: + wakeup_eventfd(void); + ~wakeup_eventfd(); + virtual void do_wakeup(); + virtual inline bool is_wakeup_fd(int fd) + { + return fd == m_wakeup_fd; + }; + virtual void remove_wakeup_fd(); + virtual inline int wakeup_get_fd() { return m_wakeup_fd; }; +}; + +#endif /* WAKEUP_EVENTFD_H */ diff --git a/src/vma/util/wakeup_pipe.cpp b/src/vma/util/wakeup_pipe.cpp index d53c404207..2f2e89eb24 100644 --- a/src/vma/util/wakeup_pipe.cpp +++ b/src/vma/util/wakeup_pipe.cpp @@ -48,9 +48,9 @@ #define wkup_entry_dbg __log_entry_dbg #undef MODULE_HDR_INFO -#define MODULE_HDR_INFO MODULE_NAME "[epfd=%d]:%d:%s() " +#define MODULE_HDR_INFO MODULE_NAME "[wakeup_fd=%d]:%d:%s() " #undef __INFO__ -#define __INFO__ m_epfd +#define __INFO__ m_wakeup_fd #define UNINIT_PIPE_FD (-1) int wakeup_pipe::g_wakeup_pipes[2] = {UNINIT_PIPE_FD, UNINIT_PIPE_FD}; @@ -58,6 +58,7 @@ atomic_t wakeup_pipe::ref_count = ATOMIC_INIT(0); wakeup_pipe::wakeup_pipe() { + memset(&m_ev, 0, sizeof(m_ev)); int ref = atomic_fetch_and_inc(&ref_count); if (ref == 0) { BULLSEYE_EXCLUDE_BLOCK_START @@ -98,7 +99,7 @@ void wakeup_pipe::do_wakeup() int errno_tmp = errno; //don't let wakeup affect errno, as this can fail with EEXIST BULLSEYE_EXCLUDE_BLOCK_START - if ((orig_os_api.epoll_ctl(m_epfd, EPOLL_CTL_ADD, g_wakeup_pipes[0], &m_ev)) && (errno != EEXIST)) { + if ((orig_os_api.epoll_ctl(m_wakeup_fd, EPOLL_CTL_ADD, g_wakeup_pipes[0], &m_ev)) && (errno != EEXIST)) { wkup_logerr("Failed to add wakeup fd to internal epfd (errno=%d %m)", errno); } BULLSEYE_EXCLUDE_BLOCK_END @@ -113,7 +114,7 @@ void wakeup_pipe::remove_wakeup_fd() if (m_is_sleeping) return; wkup_entry_dbg(""); int tmp_errno = errno; - if (orig_os_api.epoll_ctl(m_epfd, EPOLL_CTL_DEL, g_wakeup_pipes[0], NULL)) + if (orig_os_api.epoll_ctl(m_wakeup_fd, EPOLL_CTL_DEL, g_wakeup_pipes[0], NULL)) { BULLSEYE_EXCLUDE_BLOCK_START if (errno == ENOENT) { diff --git a/src/vma/util/wakeup_pipe.h b/src/vma/util/wakeup_pipe.h index 78415cc087..f7d6f651fd 100644 --- a/src/vma/util/wakeup_pipe.h +++ b/src/vma/util/wakeup_pipe.h @@ -55,6 +55,7 @@ class wakeup_pipe : public wakeup private: static int g_wakeup_pipes[2]; static atomic_t ref_count; + struct epoll_event m_ev; }; #endif /* WAKEUP_PIPE_H */ From 292e792ce148d8c543cda089fff133c0245b1db7 Mon Sep 17 00:00:00 2001 From: Yohanan Betsis Date: Thu, 22 Oct 2020 13:49:20 +0000 Subject: [PATCH 2/3] issue: 2342345 Change poll instead of epoll for wait CQ event There issue with big CPU consumption in softirq when we use epoll and have large number of waiters for CQ fd. This poll() implementation using wakeup_eventfd() for wakeup mechanism. Signed-off-by: Yohanan Betsis Reviewed-by: Igor Ivanov --- src/vma/dev/ring_bond.cpp | 20 +++-------- src/vma/sock/sockinfo.cpp | 36 +++++-------------- src/vma/sock/sockinfo.h | 38 ++++++++++++++++---- src/vma/sock/sockinfo_tcp.cpp | 63 +++++++++++++-------------------- src/vma/sock/sockinfo_udp.cpp | 66 +++++++++++++++-------------------- 5 files changed, 98 insertions(+), 125 deletions(-) diff --git a/src/vma/dev/ring_bond.cpp b/src/vma/dev/ring_bond.cpp index 63de22fab8..d34b3c034c 100644 --- a/src/vma/dev/ring_bond.cpp +++ b/src/vma/dev/ring_bond.cpp @@ -186,12 +186,10 @@ void ring_bond::restart() for (j = 0; j < m_rx_flows.size(); j++) { sockinfo* si = static_cast (m_rx_flows[j].sink); for (k = 0; k < num_ring_rx_fds; k++ ) { - epfd = si->get_rx_epfd(); - if (epfd > 0) { - fd = ring_rx_fds_array[k]; - rc = orig_os_api.epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); - ring_logdbg("Remove fd=%d from epfd=%d rc=%d errno=%d", fd, epfd, rc, errno); - } + fd = ring_rx_fds_array[k]; + //it may not work with concurrent poll() + si->delete_fd_from_poll_array(fd); + epfd = si->get_epoll_context_fd(); if (epfd > 0) { fd = ring_rx_fds_array[k]; @@ -230,15 +228,7 @@ void ring_bond::restart() sockinfo* si = static_cast (m_rx_flows[j].sink); p_ring_bond_netvsc->m_vf_ring->attach_flow(m_rx_flows[j].flow, m_rx_flows[j].sink); for (k = 0; k < num_ring_rx_fds; k++ ) { - epfd = si->get_rx_epfd(); - if (epfd > 0) { - epoll_event ev = {0, {0}}; - fd = ring_rx_fds_array[k]; - ev.events = EPOLLIN; - ev.data.fd = fd; - rc = orig_os_api.epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev); - ring_logdbg("Add fd=%d from epfd=%d rc=%d errno=%d", fd, epfd, rc, errno); - } + si->add_fd_to_poll_array(ring_rx_fds_array[k]); epfd = si->get_epoll_context_fd(); if (epfd > 0) { #define CQ_FD_MARK 0xabcd /* see socket_fd_api */ diff --git a/src/vma/sock/sockinfo.cpp b/src/vma/sock/sockinfo.cpp index f070383dbc..1dd5d99622 100644 --- a/src/vma/sock/sockinfo.cpp +++ b/src/vma/sock/sockinfo.cpp @@ -33,7 +33,6 @@ #include "sockinfo.h" -#include #include #include @@ -93,11 +92,10 @@ sockinfo::sockinfo(int fd): { m_ring_alloc_logic = ring_allocation_logic_rx(get_fd(), m_ring_alloc_log_rx, this); - m_rx_epfd = orig_os_api.epoll_create(128); - if (unlikely(m_rx_epfd == -1)) { - throw_vma_exception("create internal epoll"); - } - wakeup_set_fd(m_rx_epfd); + m_poll_fds_array_capacity = DEFAULT_FDS_ARR_SIZE; + m_poll_fds_array = (pollfd *)malloc(DEFAULT_FDS_ARR_SIZE * sizeof(pollfd)); + m_poll_fds_array_size = 0; + add_fd_to_poll_array(wakeup_get_fd()); m_p_socket_stats = &m_socket_stats; // Save stats as local copy and allow state publisher to copy from this location vma_stats_instance_create_socket_block(m_p_socket_stats); @@ -133,7 +131,6 @@ sockinfo::~sockinfo() // Change to non-blocking socket so calling threads can exit m_b_blocking = false; - orig_os_api.close(m_rx_epfd); // this will wake up any blocked thread in rx() call to orig_os_api.epoll_wait() if (m_p_rings_fds) { delete[] m_p_rings_fds; m_p_rings_fds = NULL; @@ -1146,22 +1143,11 @@ void sockinfo::rx_add_ring_cb(flow_tuple_with_local_if &flow_key, ring* p_ring, notify_epoll = true; - // Add this new CQ channel fd to the rx epfd handle (no need to wake up any sleeping thread about this new fd) - epoll_event ev = {0, {0}}; - ev.events = EPOLLIN; size_t num_ring_rx_fds; int *ring_rx_fds_array = p_ring->get_rx_channel_fds(num_ring_rx_fds); - for (size_t i = 0; i < num_ring_rx_fds; i++) { - int cq_ch_fd = ring_rx_fds_array[i]; - - ev.data.fd = cq_ch_fd; - - BULLSEYE_EXCLUDE_BLOCK_START - if (unlikely( orig_os_api.epoll_ctl(m_rx_epfd, EPOLL_CTL_ADD, cq_ch_fd, &ev))) { - si_logerr("failed to add cq channel fd to internal epfd errno=%d (%m)", errno); - } - BULLSEYE_EXCLUDE_BLOCK_END + for (int i = 0; i < (int)num_ring_rx_fds; i++) { + add_fd_to_poll_array(ring_rx_fds_array[i]); } do_wakeup(); // A ready wce can be pending due to the drain logic (cq channel will not wake up by itself) @@ -1225,13 +1211,9 @@ void sockinfo::rx_del_ring_cb(flow_tuple_with_local_if &flow_key, ring* p_ring, size_t num_ring_rx_fds; int *ring_rx_fds_array = base_ring->get_rx_channel_fds(num_ring_rx_fds); - for (size_t i = 0; i < num_ring_rx_fds; i++) { - int cq_ch_fd = ring_rx_fds_array[i]; - BULLSEYE_EXCLUDE_BLOCK_START - if (unlikely( orig_os_api.epoll_ctl(m_rx_epfd, EPOLL_CTL_DEL, cq_ch_fd, NULL))) { - si_logerr("failed to delete cq channel fd from internal epfd (errno=%d %m)", errno); - } - BULLSEYE_EXCLUDE_BLOCK_END + // it may not work with concurrent poll() + for (int i = 0; i < (int)num_ring_rx_fds; i++) { + delete_fd_from_poll_array(ring_rx_fds_array[i]); } notify_epoll = true; diff --git a/src/vma/sock/sockinfo.h b/src/vma/sock/sockinfo.h index 7c6e7d4581..1795433ec1 100644 --- a/src/vma/sock/sockinfo.h +++ b/src/vma/sock/sockinfo.h @@ -41,7 +41,7 @@ #include "vma/util/sock_addr.h" #include "vma/util/vma_stats.h" #include "vma/util/sys_vars.h" -#include "vma/util/wakeup_pipe.h" +#include "vma/util/wakeup_eventfd.h" #include "vma/proto/flow_tuple.h" #include "vma/proto/mem_buf_desc.h" #include "vma/proto/dst_entry.h" @@ -57,10 +57,11 @@ #ifndef BASE_SOCKINFO_H #define BASE_SOCKINFO_H -#define SI_RX_EPFD_EVENT_MAX 16 #define BYTE_TO_KB(byte_value) ((byte_value) / 125) #define KB_TO_BYTE(kbit_value) ((kbit_value) * 125) +#define DEFAULT_FDS_ARR_SIZE 10 + #if DEFINED_MISSING_NET_TSTAMP enum { SOF_TIMESTAMPING_TX_HARDWARE = (1<<0), @@ -154,7 +155,7 @@ const uint8_t ip_tos2prio[16] = { 4, 4, 4, 4 }; -class sockinfo : public socket_fd_api, public pkt_rcvr_sink, public pkt_sndr_source, public wakeup_pipe +class sockinfo : public socket_fd_api, public pkt_rcvr_sink, public pkt_sndr_source, public wakeup_eventfd { public: sockinfo(int fd); @@ -181,8 +182,28 @@ class sockinfo : public socket_fd_api, public pkt_rcvr_sink, public pkt_sndr_sou return false; } inline bool flow_tag_enabled(void) { return m_flow_tag_enabled; } - inline int get_rx_epfd(void) { return m_rx_epfd; } - + inline pollfd* get_poll_fds_array(void) { return m_poll_fds_array; } + inline int add_fd_to_poll_array(int fd) { + if (m_poll_fds_array_size >= m_poll_fds_array_capacity) { + m_poll_fds_array_capacity *= 2; + m_poll_fds_array = (pollfd *)realloc(m_poll_fds_array, m_poll_fds_array_capacity * sizeof(pollfd)); + } + m_poll_fds_array[m_poll_fds_array_size].fd = fd; + m_poll_fds_array[m_poll_fds_array_size].events = POLLIN; + return ++m_poll_fds_array_size; + } + inline int delete_fd_from_poll_array(int fd) { + int i; + if (unlikely(m_poll_fds_array_size == 0)) { return 0; } + for (i = 0; i < m_poll_fds_array_size && m_poll_fds_array[i].fd != fd; i++) ; + //safe for overlapping memory blocks + if (i < m_poll_fds_array_size - 1) { + memmove(&m_poll_fds_array[i], &m_poll_fds_array[i + 1], (m_poll_fds_array_size - i - 1) * sizeof(m_poll_fds_array[0])); + } + if (i < m_poll_fds_array_size) { m_poll_fds_array_size--; } + return m_poll_fds_array_size; + } + inline int get_poll_fds_array_size() { return m_poll_fds_array_size; } virtual bool flow_in_reuse(void) { return false;}; virtual int* get_rings_fds(int &res_length); virtual int get_rings_num(); @@ -217,7 +238,10 @@ class sockinfo : public socket_fd_api, public pkt_rcvr_sink, public pkt_sndr_sou socket_stats_t m_socket_stats; socket_stats_t* m_p_socket_stats; - int m_rx_epfd; + struct pollfd* m_poll_fds_array; + int m_poll_fds_array_size; + int m_poll_fds_array_capacity; + cache_observer m_rx_nd_observer; rx_net_device_map_t m_rx_nd_map; rx_flow_map_t m_rx_flow_map; @@ -335,7 +359,7 @@ class sockinfo : public socket_fd_api, public pkt_rcvr_sink, public pkt_sndr_sou virtual inline void do_wakeup() { if (!is_socketxtreme()) { - wakeup_pipe::do_wakeup(); + wakeup_eventfd::do_wakeup(); } } diff --git a/src/vma/sock/sockinfo_tcp.cpp b/src/vma/sock/sockinfo_tcp.cpp index ccff41b951..1be057a2d7 100644 --- a/src/vma/sock/sockinfo_tcp.cpp +++ b/src/vma/sock/sockinfo_tcp.cpp @@ -2491,24 +2491,7 @@ int sockinfo_tcp::listen(int backlog) BULLSEYE_EXCLUDE_BLOCK_END // Add the user's orig fd to the rx epfd handle - epoll_event ev = {0, {0}}; - ev.events = EPOLLIN; - ev.data.fd = m_fd; - int ret = orig_os_api.epoll_ctl(m_rx_epfd, EPOLL_CTL_ADD, ev.data.fd, &ev); - BULLSEYE_EXCLUDE_BLOCK_START - if (unlikely(ret)) { - if (errno == EEXIST) { - si_tcp_logdbg("failed to add user's fd to internal epfd errno=%d (%m)", errno); - } else { - si_tcp_logerr("failed to add user's fd to internal epfd errno=%d (%m)", errno); - si_tcp_logdbg("Fallback the connection to os"); - destructor_helper(); - setPassthrough(); - unlock_tcp_con(); - return 0; - } - } - BULLSEYE_EXCLUDE_BLOCK_END + add_fd_to_poll_array(m_fd); if (m_sysvar_tcp_ctl_thread > CTL_THREAD_DISABLE) m_timer_handle = g_p_event_handler_manager->register_timer_event(safe_mce_sys().timer_resolution_msec , this, PERIODIC_TIMER, 0, NULL); @@ -4047,7 +4030,6 @@ int sockinfo_tcp::rx_wait_helper(int &poll_count, bool is_blocking) int n; uint64_t poll_sn = 0; rx_ring_map_t::iterator rx_ring_iter; - epoll_event rx_epfd_events[SI_RX_EPFD_EVENT_MAX]; // poll for completion __log_info_func(""); @@ -4145,7 +4127,7 @@ int sockinfo_tcp::rx_wait_helper(int &poll_count, bool is_blocking) } //sleep on different CQs and OS listen socket - ret = orig_os_api.epoll_wait(m_rx_epfd, rx_epfd_events, SI_RX_EPFD_EVENT_MAX, m_loops_timer.time_left_msec()); + ret = orig_os_api.poll(m_poll_fds_array, m_poll_fds_array_size, m_loops_timer.time_left_msec()); lock_tcp_con(); return_from_sleep(); @@ -4158,28 +4140,31 @@ int sockinfo_tcp::rx_wait_helper(int &poll_count, bool is_blocking) if(m_n_rx_pkt_ready_list_count) return 0; - for (int event_idx = 0; event_idx < ret; event_idx++) + for (int event_idx = 0; event_idx < m_poll_fds_array_size; event_idx++) { - int fd = rx_epfd_events[event_idx].data.fd; - if (is_wakeup_fd(fd)) - { // wakeup event - lock_tcp_con(); - remove_wakeup_fd(); - unlock_tcp_con(); - continue; - } + if (m_poll_fds_array[event_idx].revents & POLLIN) { + m_poll_fds_array[event_idx].revents = 0; + int fd = m_poll_fds_array[event_idx].fd; + if (is_wakeup_fd(fd)) + { // wakeup event + lock_tcp_con(); + remove_wakeup_fd(); + unlock_tcp_con(); + continue; + } - // Check if OS fd is ready for reading - if (fd == m_fd) { - continue; - } + // Check if OS fd is ready for reading + if (fd == m_fd) { + continue; + } - // poll cq. fd == cq channel fd. - cq_channel_info* p_cq_ch_info = g_p_fd_collection->get_cq_channel_fd(fd); - if (p_cq_ch_info) { - ring* p_ring = p_cq_ch_info->get_ring(); - if (p_ring) { - p_ring->wait_for_notification_and_process_element(fd, &poll_sn); + // poll cq. fd == cq channel fd. + cq_channel_info* p_cq_ch_info = g_p_fd_collection->get_cq_channel_fd(fd); + if (p_cq_ch_info) { + ring* p_ring = p_cq_ch_info->get_ring(); + if (p_ring) { + p_ring->wait_for_notification_and_process_element(fd, &poll_sn); + } } } } diff --git a/src/vma/sock/sockinfo_udp.cpp b/src/vma/sock/sockinfo_udp.cpp index 30810af7e7..40e910fc9a 100644 --- a/src/vma/sock/sockinfo_udp.cpp +++ b/src/vma/sock/sockinfo_udp.cpp @@ -119,7 +119,6 @@ inline int sockinfo_udp::rx_wait(bool blocking) ssize_t ret = 0; int32_t loops = 0; int32_t loops_to_go = blocking ? m_loops_to_go : 1; - epoll_event rx_epfd_events[SI_RX_EPFD_EVENT_MAX]; uint64_t poll_sn = 0; m_loops_timer.start(); @@ -213,7 +212,7 @@ inline int sockinfo_udp::rx_wait(bool blocking) continue; } - ret = orig_os_api.epoll_wait(m_rx_epfd, rx_epfd_events, SI_RX_EPFD_EVENT_MAX, m_loops_timer.time_left_msec()); + ret = orig_os_api.poll(m_poll_fds_array, m_poll_fds_array_size, m_loops_timer.time_left_msec()); /* coverity[double_lock] TODO: RM#1049980 */ m_lock_rcv.lock(); @@ -251,34 +250,36 @@ inline int sockinfo_udp::rx_wait(bool blocking) } // Run through all ready fd's - for (int event_idx = 0; event_idx < ret; ++event_idx) { - int fd = rx_epfd_events[event_idx].data.fd; - if (is_wakeup_fd(fd)) { - /* coverity[double_lock] TODO: RM#1049980 */ - m_lock_rcv.lock(); - remove_wakeup_fd(); - /* coverity[double_unlock] TODO: RM#1049980 */ - m_lock_rcv.unlock(); - continue; - } + for (int event_idx = 0; event_idx < m_poll_fds_array_size; ++event_idx) { + if (m_poll_fds_array[event_idx].revents & POLLIN) { + int fd = m_poll_fds_array[event_idx].fd; + if (is_wakeup_fd(fd)) { + /* coverity[double_lock] TODO: RM#1049980 */ + m_lock_rcv.lock(); + remove_wakeup_fd(); + /* coverity[double_unlock] TODO: RM#1049980 */ + m_lock_rcv.unlock(); + continue; + } - // Check if OS fd is ready for reading - if (fd == m_fd) { - m_rx_udp_poll_os_ratio_counter = 0; - return 1; - } + // Check if OS fd is ready for reading + if (fd == m_fd) { + m_rx_udp_poll_os_ratio_counter = 0; + return 1; + } - // All that is left is our CQ offloading channel fd's - // poll cq. fd == cq channel fd. - // Process one wce on the relevant CQ - // The Rx CQ channel is non-blocking so this will always return quickly - cq_channel_info* p_cq_ch_info = g_p_fd_collection->get_cq_channel_fd(fd); - if (p_cq_ch_info) { - ring* p_ring = p_cq_ch_info->get_ring(); - if (p_ring) { - p_ring->wait_for_notification_and_process_element(fd, &poll_sn); + // All that is left is our CQ offloading channel fd's + // poll cq. fd == cq channel fd. + // Process one wce on the relevant CQ + // The Rx CQ channel is non-blocking so this will always return quickly + cq_channel_info* p_cq_ch_info = g_p_fd_collection->get_cq_channel_fd(fd); + if (p_cq_ch_info) { + ring* p_ring = p_cq_ch_info->get_ring(); + if (p_ring) { + p_ring->wait_for_notification_and_process_element(fd, &poll_sn); + } + } } - } } } @@ -370,17 +371,8 @@ sockinfo_udp::sockinfo_udp(int fd): si_udp_logdbg("Sockets RCVBUF = %d bytes", n_so_rcvbuf_bytes); rx_ready_byte_count_limit_update(n_so_rcvbuf_bytes); - epoll_event ev = {0, {0}}; - - ev.events = EPOLLIN; - // Add the user's orig fd to the rx epfd handle - ev.data.fd = m_fd; - - BULLSEYE_EXCLUDE_BLOCK_START - if (unlikely(orig_os_api.epoll_ctl(m_rx_epfd, EPOLL_CTL_ADD, ev.data.fd, &ev))) - si_udp_logpanic("failed to add user's fd to internal epfd errno=%d (%m)", errno); - BULLSEYE_EXCLUDE_BLOCK_END + add_fd_to_poll_array(m_fd); si_udp_logfunc("done"); } From d14501d22b253b0cbfb39ab0ce13ce6c24a13ef5 Mon Sep 17 00:00:00 2001 From: Igor Ivanov Date: Thu, 4 Mar 2021 15:29:47 +0200 Subject: [PATCH 3/3] issue: 2342345 Handle concurrent delete from fds poll array Add fds which we want to delete in array and wakeup. After wake up we handle its event and delete fds from delete fds' array from poll array. Signed-off-by: Yohanan Betsis Reviewed-by: Igor Ivanov --- src/vma/dev/ring_bond.cpp | 5 +-- src/vma/sock/sockinfo.cpp | 12 ++++-- src/vma/sock/sockinfo.h | 69 +++++++++++++++++++++++++---------- src/vma/sock/sockinfo_tcp.cpp | 1 + src/vma/sock/sockinfo_udp.cpp | 2 + 5 files changed, 63 insertions(+), 26 deletions(-) diff --git a/src/vma/dev/ring_bond.cpp b/src/vma/dev/ring_bond.cpp index d34b3c034c..b7025e25b6 100644 --- a/src/vma/dev/ring_bond.cpp +++ b/src/vma/dev/ring_bond.cpp @@ -187,16 +187,15 @@ void ring_bond::restart() sockinfo* si = static_cast (m_rx_flows[j].sink); for (k = 0; k < num_ring_rx_fds; k++ ) { fd = ring_rx_fds_array[k]; - //it may not work with concurrent poll() - si->delete_fd_from_poll_array(fd); + si->delete_fd_from_poll_array_deferred(fd); epfd = si->get_epoll_context_fd(); if (epfd > 0) { - fd = ring_rx_fds_array[k]; rc = orig_os_api.epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); ring_logdbg("Remove fd=%d from epfd=%d rc=%d errno=%d", fd, epfd, rc, errno); } } + si->do_wakeup(); } p_ring_tap->m_active = true; diff --git a/src/vma/sock/sockinfo.cpp b/src/vma/sock/sockinfo.cpp index 1dd5d99622..e8e5094a7f 100644 --- a/src/vma/sock/sockinfo.cpp +++ b/src/vma/sock/sockinfo.cpp @@ -94,7 +94,9 @@ sockinfo::sockinfo(int fd): m_ring_alloc_logic = ring_allocation_logic_rx(get_fd(), m_ring_alloc_log_rx, this); m_poll_fds_array_capacity = DEFAULT_FDS_ARR_SIZE; m_poll_fds_array = (pollfd *)malloc(DEFAULT_FDS_ARR_SIZE * sizeof(pollfd)); + m_poll_fds_delete_array = (int *)malloc(DEFAULT_FDS_ARR_SIZE * sizeof(int)); m_poll_fds_array_size = 0; + m_poll_fds_delete_array_size = 0; add_fd_to_poll_array(wakeup_get_fd()); m_p_socket_stats = &m_socket_stats; // Save stats as local copy and allow state publisher to copy from this location @@ -135,7 +137,11 @@ sockinfo::~sockinfo() delete[] m_p_rings_fds; m_p_rings_fds = NULL; } - vma_stats_instance_remove_socket_block(m_p_socket_stats); + + free(m_poll_fds_array); + free(m_poll_fds_delete_array); + + vma_stats_instance_remove_socket_block(m_p_socket_stats); } void sockinfo::set_blocking(bool is_blocked) @@ -1211,10 +1217,10 @@ void sockinfo::rx_del_ring_cb(flow_tuple_with_local_if &flow_key, ring* p_ring, size_t num_ring_rx_fds; int *ring_rx_fds_array = base_ring->get_rx_channel_fds(num_ring_rx_fds); - // it may not work with concurrent poll() for (int i = 0; i < (int)num_ring_rx_fds; i++) { - delete_fd_from_poll_array(ring_rx_fds_array[i]); + delete_fd_from_poll_array_deferred(ring_rx_fds_array[i]); } + do_wakeup(); notify_epoll = true; diff --git a/src/vma/sock/sockinfo.h b/src/vma/sock/sockinfo.h index 1795433ec1..469b4a9cc2 100644 --- a/src/vma/sock/sockinfo.h +++ b/src/vma/sock/sockinfo.h @@ -182,28 +182,56 @@ class sockinfo : public socket_fd_api, public pkt_rcvr_sink, public pkt_sndr_sou return false; } inline bool flow_tag_enabled(void) { return m_flow_tag_enabled; } - inline pollfd* get_poll_fds_array(void) { return m_poll_fds_array; } - inline int add_fd_to_poll_array(int fd) { + inline void add_fd_to_poll_array(int fd) { if (m_poll_fds_array_size >= m_poll_fds_array_capacity) { m_poll_fds_array_capacity *= 2; - m_poll_fds_array = (pollfd *)realloc(m_poll_fds_array, m_poll_fds_array_capacity * sizeof(pollfd)); + pollfd* new_m_poll_fds_array = static_cast(realloc(m_poll_fds_array, m_poll_fds_array_capacity * sizeof(pollfd))); + if (new_m_poll_fds_array == NULL) { + vlog_printf(VLOG_ERROR, "%s:%d: Realloc cannot find enough space\n", __func__, __LINE__); + return ; + } else { + m_poll_fds_array = new_m_poll_fds_array; + } } m_poll_fds_array[m_poll_fds_array_size].fd = fd; m_poll_fds_array[m_poll_fds_array_size].events = POLLIN; - return ++m_poll_fds_array_size; + ++m_poll_fds_array_size; } - inline int delete_fd_from_poll_array(int fd) { - int i; - if (unlikely(m_poll_fds_array_size == 0)) { return 0; } - for (i = 0; i < m_poll_fds_array_size && m_poll_fds_array[i].fd != fd; i++) ; - //safe for overlapping memory blocks - if (i < m_poll_fds_array_size - 1) { - memmove(&m_poll_fds_array[i], &m_poll_fds_array[i + 1], (m_poll_fds_array_size - i - 1) * sizeof(m_poll_fds_array[0])); + inline void delete_fds_from_poll_array() { + int j; + if (unlikely(m_poll_fds_array_size == 0)) { + return ; + } + if (unlikely(m_poll_fds_delete_array_size != 0)) { + for (int i = 0; i < m_poll_fds_delete_array_size; i++) { + for (j = 0; j < m_poll_fds_array_size && m_poll_fds_array[j].fd != m_poll_fds_delete_array[i]; j++) ; + //safe for overlapping memory blocks + if (j < m_poll_fds_array_size - 1) { + memmove(&m_poll_fds_array[j], &m_poll_fds_array[j + 1], (m_poll_fds_array_size - j - 1) * sizeof(m_poll_fds_array[0])); + } + if (j < m_poll_fds_array_size) { + m_poll_fds_array_size--; + } + } + memset(m_poll_fds_delete_array, 0, m_poll_fds_delete_array_size * sizeof(int)); + m_poll_fds_delete_array_size = 0; } - if (i < m_poll_fds_array_size) { m_poll_fds_array_size--; } - return m_poll_fds_array_size; } - inline int get_poll_fds_array_size() { return m_poll_fds_array_size; } + inline void delete_fd_from_poll_array_deferred(int fd) { + if (m_poll_fds_delete_array_size >= m_poll_fds_array_capacity) { + m_poll_fds_array_capacity *= 2; + int* new_m_poll_fds_delete_array = static_cast(realloc(m_poll_fds_delete_array, m_poll_fds_array_capacity * sizeof(int))); + if (new_m_poll_fds_delete_array == NULL) { + vlog_printf(VLOG_ERROR, "%s:%d: Realloc cannot find enough space\n", __func__, __LINE__); + return ; + } else { + m_poll_fds_delete_array = new_m_poll_fds_delete_array; + } + + } + m_poll_fds_delete_array[m_poll_fds_delete_array_size] = fd; + ++m_poll_fds_delete_array_size; + } virtual bool flow_in_reuse(void) { return false;}; virtual int* get_rings_fds(int &res_length); virtual int get_rings_num(); @@ -212,6 +240,11 @@ class sockinfo : public socket_fd_api, public pkt_rcvr_sink, public pkt_sndr_sou virtual void statistics_print(vlog_levels_t log_level = VLOG_DEBUG); uint32_t get_flow_tag_val() { return m_flow_tag_id; } inline in_protocol_t get_protocol(void) { return m_protocol; } + virtual inline void do_wakeup() { + if (!is_socketxtreme()) { + wakeup_eventfd::do_wakeup(); + } + } private: int fcntl_helper(int __cmd, unsigned long int __arg, bool& bexit); @@ -241,6 +274,8 @@ class sockinfo : public socket_fd_api, public pkt_rcvr_sink, public pkt_sndr_sou struct pollfd* m_poll_fds_array; int m_poll_fds_array_size; int m_poll_fds_array_capacity; + int* m_poll_fds_delete_array; + int m_poll_fds_delete_array_size; cache_observer m_rx_nd_observer; rx_net_device_map_t m_rx_nd_map; @@ -357,12 +392,6 @@ class sockinfo : public socket_fd_api, public pkt_rcvr_sink, public pkt_sndr_sou virtual bool try_un_offloading(); // un-offload the socket if possible - virtual inline void do_wakeup() { - if (!is_socketxtreme()) { - wakeup_eventfd::do_wakeup(); - } - } - inline bool is_socketxtreme() { return (m_p_rx_ring && m_p_rx_ring->is_socketxtreme()); } diff --git a/src/vma/sock/sockinfo_tcp.cpp b/src/vma/sock/sockinfo_tcp.cpp index 1be057a2d7..10683483eb 100644 --- a/src/vma/sock/sockinfo_tcp.cpp +++ b/src/vma/sock/sockinfo_tcp.cpp @@ -4149,6 +4149,7 @@ int sockinfo_tcp::rx_wait_helper(int &poll_count, bool is_blocking) { // wakeup event lock_tcp_con(); remove_wakeup_fd(); + delete_fds_from_poll_array(); unlock_tcp_con(); continue; } diff --git a/src/vma/sock/sockinfo_udp.cpp b/src/vma/sock/sockinfo_udp.cpp index 40e910fc9a..8c3be77db5 100644 --- a/src/vma/sock/sockinfo_udp.cpp +++ b/src/vma/sock/sockinfo_udp.cpp @@ -257,6 +257,8 @@ inline int sockinfo_udp::rx_wait(bool blocking) /* coverity[double_lock] TODO: RM#1049980 */ m_lock_rcv.lock(); remove_wakeup_fd(); + delete_fds_from_poll_array(); + /* coverity[double_unlock] TODO: RM#1049980 */ m_lock_rcv.unlock(); continue;