diff --git a/src/vma/Makefile.am b/src/vma/Makefile.am index 7da4d6a3da..53f3793a63 100644 --- a/src/vma/Makefile.am +++ b/src/vma/Makefile.am @@ -156,6 +156,7 @@ libvma_la_SOURCES := \ \ util/wakeup.cpp \ util/wakeup_pipe.cpp \ + util/wakeup_eventfd.cpp \ util/match.cpp \ util/utils.cpp \ util/instrumentation.cpp \ @@ -303,6 +304,7 @@ libvma_la_SOURCES := \ util/vtypes.h \ util/wakeup.h \ util/wakeup_pipe.h \ + util/wakeup_eventfd.h \ util/agent.h \ util/agent_def.h \ util/data_updater.h \ diff --git a/src/vma/dev/ring_bond.cpp b/src/vma/dev/ring_bond.cpp index 63de22fab8..b7025e25b6 100644 --- a/src/vma/dev/ring_bond.cpp +++ b/src/vma/dev/ring_bond.cpp @@ -186,19 +186,16 @@ void ring_bond::restart() for (j = 0; j < m_rx_flows.size(); j++) { sockinfo* si = static_cast (m_rx_flows[j].sink); for (k = 0; k < num_ring_rx_fds; k++ ) { - epfd = si->get_rx_epfd(); - if (epfd > 0) { - fd = ring_rx_fds_array[k]; - rc = orig_os_api.epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); - ring_logdbg("Remove fd=%d from epfd=%d rc=%d errno=%d", fd, epfd, rc, errno); - } + fd = ring_rx_fds_array[k]; + si->delete_fd_from_poll_array_deferred(fd); + epfd = si->get_epoll_context_fd(); if (epfd > 0) { - fd = ring_rx_fds_array[k]; rc = orig_os_api.epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL); ring_logdbg("Remove fd=%d from epfd=%d rc=%d errno=%d", fd, epfd, rc, errno); } } + si->do_wakeup(); } p_ring_tap->m_active = true; @@ -230,15 +227,7 @@ void ring_bond::restart() sockinfo* si = static_cast (m_rx_flows[j].sink); p_ring_bond_netvsc->m_vf_ring->attach_flow(m_rx_flows[j].flow, m_rx_flows[j].sink); for (k = 0; k < num_ring_rx_fds; k++ ) { - epfd = si->get_rx_epfd(); - if (epfd > 0) { - epoll_event ev = {0, {0}}; - fd = ring_rx_fds_array[k]; - ev.events = EPOLLIN; - ev.data.fd = fd; - rc = orig_os_api.epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev); - ring_logdbg("Add fd=%d from epfd=%d rc=%d errno=%d", fd, epfd, rc, errno); - } + si->add_fd_to_poll_array(ring_rx_fds_array[k]); epfd = 
si->get_epoll_context_fd(); if (epfd > 0) { #define CQ_FD_MARK 0xabcd /* see socket_fd_api */ diff --git a/src/vma/event/event_handler_manager.cpp b/src/vma/event/event_handler_manager.cpp index 107c59ec5a..9fb156be08 100644 --- a/src/vma/event/event_handler_manager.cpp +++ b/src/vma/event/event_handler_manager.cpp @@ -249,7 +249,7 @@ event_handler_manager::event_handler_manager() : m_b_continue_running = true; m_event_handler_tid = 0; - wakeup_set_epoll_fd(m_epfd); + wakeup_set_fd(m_epfd); going_to_sleep(); return; diff --git a/src/vma/iomux/epfd_info.cpp b/src/vma/iomux/epfd_info.cpp index c3886f4bd3..ad499ff0b2 100644 --- a/src/vma/iomux/epfd_info.cpp +++ b/src/vma/iomux/epfd_info.cpp @@ -88,7 +88,7 @@ epfd_info::epfd_info(int epfd, int size) : // Register this socket to read nonoffloaded data g_p_event_handler_manager->update_epfd(m_epfd, EPOLL_CTL_ADD, EPOLLIN | EPOLLPRI | EPOLLONESHOT); - wakeup_set_epoll_fd(m_epfd); + wakeup_set_fd(m_epfd); } epfd_info::~epfd_info() diff --git a/src/vma/sock/sockinfo.cpp b/src/vma/sock/sockinfo.cpp index d5891a48a7..e8e5094a7f 100644 --- a/src/vma/sock/sockinfo.cpp +++ b/src/vma/sock/sockinfo.cpp @@ -33,7 +33,6 @@ #include "sockinfo.h" -#include #include #include @@ -93,11 +92,12 @@ sockinfo::sockinfo(int fd): { m_ring_alloc_logic = ring_allocation_logic_rx(get_fd(), m_ring_alloc_log_rx, this); - m_rx_epfd = orig_os_api.epoll_create(128); - if (unlikely(m_rx_epfd == -1)) { - throw_vma_exception("create internal epoll"); - } - wakeup_set_epoll_fd(m_rx_epfd); + m_poll_fds_array_capacity = DEFAULT_FDS_ARR_SIZE; + m_poll_fds_array = (pollfd *)malloc(DEFAULT_FDS_ARR_SIZE * sizeof(pollfd)); + m_poll_fds_delete_array = (int *)malloc(DEFAULT_FDS_ARR_SIZE * sizeof(int)); + m_poll_fds_array_size = 0; + m_poll_fds_delete_array_size = 0; + add_fd_to_poll_array(wakeup_get_fd()); m_p_socket_stats = &m_socket_stats; // Save stats as local copy and allow state publisher to copy from this location 
vma_stats_instance_create_socket_block(m_p_socket_stats); @@ -133,12 +133,15 @@ sockinfo::~sockinfo() // Change to non-blocking socket so calling threads can exit m_b_blocking = false; - orig_os_api.close(m_rx_epfd); // this will wake up any blocked thread in rx() call to orig_os_api.epoll_wait() if (m_p_rings_fds) { delete[] m_p_rings_fds; m_p_rings_fds = NULL; } - vma_stats_instance_remove_socket_block(m_p_socket_stats); + + free(m_poll_fds_array); + free(m_poll_fds_delete_array); + + vma_stats_instance_remove_socket_block(m_p_socket_stats); } void sockinfo::set_blocking(bool is_blocked) @@ -1146,22 +1149,11 @@ void sockinfo::rx_add_ring_cb(flow_tuple_with_local_if &flow_key, ring* p_ring, notify_epoll = true; - // Add this new CQ channel fd to the rx epfd handle (no need to wake up any sleeping thread about this new fd) - epoll_event ev = {0, {0}}; - ev.events = EPOLLIN; size_t num_ring_rx_fds; int *ring_rx_fds_array = p_ring->get_rx_channel_fds(num_ring_rx_fds); - for (size_t i = 0; i < num_ring_rx_fds; i++) { - int cq_ch_fd = ring_rx_fds_array[i]; - - ev.data.fd = cq_ch_fd; - - BULLSEYE_EXCLUDE_BLOCK_START - if (unlikely( orig_os_api.epoll_ctl(m_rx_epfd, EPOLL_CTL_ADD, cq_ch_fd, &ev))) { - si_logerr("failed to add cq channel fd to internal epfd errno=%d (%m)", errno); - } - BULLSEYE_EXCLUDE_BLOCK_END + for (int i = 0; i < (int)num_ring_rx_fds; i++) { + add_fd_to_poll_array(ring_rx_fds_array[i]); } do_wakeup(); // A ready wce can be pending due to the drain logic (cq channel will not wake up by itself) @@ -1225,14 +1217,10 @@ void sockinfo::rx_del_ring_cb(flow_tuple_with_local_if &flow_key, ring* p_ring, size_t num_ring_rx_fds; int *ring_rx_fds_array = base_ring->get_rx_channel_fds(num_ring_rx_fds); - for (size_t i = 0; i < num_ring_rx_fds; i++) { - int cq_ch_fd = ring_rx_fds_array[i]; - BULLSEYE_EXCLUDE_BLOCK_START - if (unlikely( orig_os_api.epoll_ctl(m_rx_epfd, EPOLL_CTL_DEL, cq_ch_fd, NULL))) { - si_logerr("failed to delete cq channel fd from internal epfd 
(errno=%d %m)", errno); - } - BULLSEYE_EXCLUDE_BLOCK_END + for (int i = 0; i < (int)num_ring_rx_fds; i++) { + delete_fd_from_poll_array_deferred(ring_rx_fds_array[i]); } + do_wakeup(); notify_epoll = true; diff --git a/src/vma/sock/sockinfo.h b/src/vma/sock/sockinfo.h index 7c6e7d4581..469b4a9cc2 100644 --- a/src/vma/sock/sockinfo.h +++ b/src/vma/sock/sockinfo.h @@ -41,7 +41,7 @@ #include "vma/util/sock_addr.h" #include "vma/util/vma_stats.h" #include "vma/util/sys_vars.h" -#include "vma/util/wakeup_pipe.h" +#include "vma/util/wakeup_eventfd.h" #include "vma/proto/flow_tuple.h" #include "vma/proto/mem_buf_desc.h" #include "vma/proto/dst_entry.h" @@ -57,10 +57,11 @@ #ifndef BASE_SOCKINFO_H #define BASE_SOCKINFO_H -#define SI_RX_EPFD_EVENT_MAX 16 #define BYTE_TO_KB(byte_value) ((byte_value) / 125) #define KB_TO_BYTE(kbit_value) ((kbit_value) * 125) +#define DEFAULT_FDS_ARR_SIZE 10 + #if DEFINED_MISSING_NET_TSTAMP enum { SOF_TIMESTAMPING_TX_HARDWARE = (1<<0), @@ -154,7 +155,7 @@ const uint8_t ip_tos2prio[16] = { 4, 4, 4, 4 }; -class sockinfo : public socket_fd_api, public pkt_rcvr_sink, public pkt_sndr_source, public wakeup_pipe +class sockinfo : public socket_fd_api, public pkt_rcvr_sink, public pkt_sndr_source, public wakeup_eventfd { public: sockinfo(int fd); @@ -181,8 +182,56 @@ class sockinfo : public socket_fd_api, public pkt_rcvr_sink, public pkt_sndr_sou return false; } inline bool flow_tag_enabled(void) { return m_flow_tag_enabled; } - inline int get_rx_epfd(void) { return m_rx_epfd; } - + // Double the capacity shared by the poll array and the deferred-delete array; returns false (capacity unchanged) if either realloc fails + inline bool grow_poll_fds_arrays() { + int new_capacity = (m_poll_fds_array_capacity > 0) ? m_poll_fds_array_capacity * 2 : DEFAULT_FDS_ARR_SIZE; + pollfd* new_poll_fds_array = static_cast<pollfd*>(realloc(m_poll_fds_array, new_capacity * sizeof(pollfd))); + if (new_poll_fds_array != NULL) { + m_poll_fds_array = new_poll_fds_array; + } + int* new_poll_fds_delete_array = (new_poll_fds_array != NULL) ? static_cast<int*>(realloc(m_poll_fds_delete_array, new_capacity * sizeof(int))) : NULL; + if (new_poll_fds_delete_array == NULL) { + vlog_printf(VLOG_ERROR, "%s:%d: Realloc cannot find enough space\n", __func__, __LINE__); + return false; + } + m_poll_fds_delete_array = new_poll_fds_delete_array; + m_poll_fds_array_capacity = new_capacity; + return true; + } + // Append fd to the poll array, watched for POLLIN; the fd is dropped (error logged) if the arrays cannot grow + inline void add_fd_to_poll_array(int fd) { + if (m_poll_fds_array_size >= m_poll_fds_array_capacity && !grow_poll_fds_arrays()) { + return ; + } + m_poll_fds_array[m_poll_fds_array_size].fd = fd; + m_poll_fds_array[m_poll_fds_array_size].events = POLLIN; + m_poll_fds_array[m_poll_fds_array_size].revents = 0; + ++m_poll_fds_array_size; + }
 + // Apply the queued removals: drop the first poll entry matching each queued fd, then empty the queue + inline void delete_fds_from_poll_array() { + for (int i = 0; i < m_poll_fds_delete_array_size; i++) { + int j = 0; + while (j < m_poll_fds_array_size && m_poll_fds_array[j].fd != m_poll_fds_delete_array[i]) { + j++; + } + if (j == m_poll_fds_array_size) { + continue; + } + // memmove is safe for overlapping memory blocks; it moves zero bytes when j is the last entry + memmove(&m_poll_fds_array[j], &m_poll_fds_array[j + 1], (m_poll_fds_array_size - j - 1) * sizeof(m_poll_fds_array[0])); + m_poll_fds_array_size--; + } + m_poll_fds_delete_array_size = 0; + } + // Queue fd for removal; the poll array itself only changes on the next delete_fds_from_poll_array() call + inline void delete_fd_from_poll_array_deferred(int fd) { + if (m_poll_fds_delete_array_size >= m_poll_fds_array_capacity && !grow_poll_fds_arrays()) { + return ; + } + m_poll_fds_delete_array[m_poll_fds_delete_array_size] = fd; + ++m_poll_fds_delete_array_size; + } virtual bool flow_in_reuse(void) { return false;}; virtual int* get_rings_fds(int &res_length); virtual int get_rings_num(); @@ -191,6 +240,11 @@ class sockinfo : public socket_fd_api, public pkt_rcvr_sink, public pkt_sndr_sou virtual void statistics_print(vlog_levels_t log_level = VLOG_DEBUG); uint32_t get_flow_tag_val() { return m_flow_tag_id; } inline in_protocol_t get_protocol(void) { return m_protocol; } + virtual inline void do_wakeup() { + if (!is_socketxtreme()) { + wakeup_eventfd::do_wakeup(); + } + } 
private: int fcntl_helper(int __cmd, unsigned long int __arg, bool& bexit); @@ -217,7 +271,12 @@ class sockinfo : public socket_fd_api, public pkt_rcvr_sink, public pkt_sndr_sou socket_stats_t m_socket_stats; socket_stats_t* m_p_socket_stats; - int m_rx_epfd; + struct pollfd* m_poll_fds_array; + int m_poll_fds_array_size; + int m_poll_fds_array_capacity; + int* m_poll_fds_delete_array; + int m_poll_fds_delete_array_size; + cache_observer m_rx_nd_observer; rx_net_device_map_t m_rx_nd_map; rx_flow_map_t m_rx_flow_map; @@ -333,12 +392,6 @@ class sockinfo : public socket_fd_api, public pkt_rcvr_sink, public pkt_sndr_sou virtual bool try_un_offloading(); // un-offload the socket if possible - virtual inline void do_wakeup() { - if (!is_socketxtreme()) { - wakeup_pipe::do_wakeup(); - } - } - inline bool is_socketxtreme() { return (m_p_rx_ring && m_p_rx_ring->is_socketxtreme()); } diff --git a/src/vma/sock/sockinfo_tcp.cpp b/src/vma/sock/sockinfo_tcp.cpp index ccff41b951..10683483eb 100644 --- a/src/vma/sock/sockinfo_tcp.cpp +++ b/src/vma/sock/sockinfo_tcp.cpp @@ -2491,24 +2491,7 @@ int sockinfo_tcp::listen(int backlog) BULLSEYE_EXCLUDE_BLOCK_END // Add the user's orig fd to the rx epfd handle - epoll_event ev = {0, {0}}; - ev.events = EPOLLIN; - ev.data.fd = m_fd; - int ret = orig_os_api.epoll_ctl(m_rx_epfd, EPOLL_CTL_ADD, ev.data.fd, &ev); - BULLSEYE_EXCLUDE_BLOCK_START - if (unlikely(ret)) { - if (errno == EEXIST) { - si_tcp_logdbg("failed to add user's fd to internal epfd errno=%d (%m)", errno); - } else { - si_tcp_logerr("failed to add user's fd to internal epfd errno=%d (%m)", errno); - si_tcp_logdbg("Fallback the connection to os"); - destructor_helper(); - setPassthrough(); - unlock_tcp_con(); - return 0; - } - } - BULLSEYE_EXCLUDE_BLOCK_END + add_fd_to_poll_array(m_fd); if (m_sysvar_tcp_ctl_thread > CTL_THREAD_DISABLE) m_timer_handle = g_p_event_handler_manager->register_timer_event(safe_mce_sys().timer_resolution_msec , this, PERIODIC_TIMER, 0, NULL); @@ -4047,7 
+4030,6 @@ int sockinfo_tcp::rx_wait_helper(int &poll_count, bool is_blocking) int n; uint64_t poll_sn = 0; rx_ring_map_t::iterator rx_ring_iter; - epoll_event rx_epfd_events[SI_RX_EPFD_EVENT_MAX]; // poll for completion __log_info_func(""); @@ -4145,7 +4127,7 @@ int sockinfo_tcp::rx_wait_helper(int &poll_count, bool is_blocking) } //sleep on different CQs and OS listen socket - ret = orig_os_api.epoll_wait(m_rx_epfd, rx_epfd_events, SI_RX_EPFD_EVENT_MAX, m_loops_timer.time_left_msec()); + ret = orig_os_api.poll(m_poll_fds_array, m_poll_fds_array_size, m_loops_timer.time_left_msec()); lock_tcp_con(); return_from_sleep(); @@ -4158,28 +4140,32 @@ int sockinfo_tcp::rx_wait_helper(int &poll_count, bool is_blocking) if(m_n_rx_pkt_ready_list_count) return 0; - for (int event_idx = 0; event_idx < ret; event_idx++) + for (int event_idx = 0; event_idx < m_poll_fds_array_size; event_idx++) { - int fd = rx_epfd_events[event_idx].data.fd; - if (is_wakeup_fd(fd)) - { // wakeup event - lock_tcp_con(); - remove_wakeup_fd(); - unlock_tcp_con(); - continue; - } + if (m_poll_fds_array[event_idx].revents & POLLIN) { + m_poll_fds_array[event_idx].revents = 0; + int fd = m_poll_fds_array[event_idx].fd; + if (is_wakeup_fd(fd)) + { // wakeup event + lock_tcp_con(); + remove_wakeup_fd(); + delete_fds_from_poll_array(); + unlock_tcp_con(); + continue; + } - // Check if OS fd is ready for reading - if (fd == m_fd) { - continue; - } + // Check if OS fd is ready for reading + if (fd == m_fd) { + continue; + } - // poll cq. fd == cq channel fd. - cq_channel_info* p_cq_ch_info = g_p_fd_collection->get_cq_channel_fd(fd); - if (p_cq_ch_info) { - ring* p_ring = p_cq_ch_info->get_ring(); - if (p_ring) { - p_ring->wait_for_notification_and_process_element(fd, &poll_sn); + // poll cq. fd == cq channel fd. 
+ cq_channel_info* p_cq_ch_info = g_p_fd_collection->get_cq_channel_fd(fd); + if (p_cq_ch_info) { + ring* p_ring = p_cq_ch_info->get_ring(); + if (p_ring) { + p_ring->wait_for_notification_and_process_element(fd, &poll_sn); + } } } } diff --git a/src/vma/sock/sockinfo_udp.cpp b/src/vma/sock/sockinfo_udp.cpp index 30810af7e7..8c3be77db5 100644 --- a/src/vma/sock/sockinfo_udp.cpp +++ b/src/vma/sock/sockinfo_udp.cpp @@ -119,7 +119,6 @@ inline int sockinfo_udp::rx_wait(bool blocking) ssize_t ret = 0; int32_t loops = 0; int32_t loops_to_go = blocking ? m_loops_to_go : 1; - epoll_event rx_epfd_events[SI_RX_EPFD_EVENT_MAX]; uint64_t poll_sn = 0; m_loops_timer.start(); @@ -213,7 +212,7 @@ inline int sockinfo_udp::rx_wait(bool blocking) continue; } - ret = orig_os_api.epoll_wait(m_rx_epfd, rx_epfd_events, SI_RX_EPFD_EVENT_MAX, m_loops_timer.time_left_msec()); + ret = orig_os_api.poll(m_poll_fds_array, m_poll_fds_array_size, m_loops_timer.time_left_msec()); /* coverity[double_lock] TODO: RM#1049980 */ m_lock_rcv.lock(); @@ -251,34 +250,38 @@ inline int sockinfo_udp::rx_wait(bool blocking) } // Run through all ready fd's - for (int event_idx = 0; event_idx < ret; ++event_idx) { - int fd = rx_epfd_events[event_idx].data.fd; - if (is_wakeup_fd(fd)) { - /* coverity[double_lock] TODO: RM#1049980 */ - m_lock_rcv.lock(); - remove_wakeup_fd(); - /* coverity[double_unlock] TODO: RM#1049980 */ - m_lock_rcv.unlock(); - continue; - } + for (int event_idx = 0; event_idx < m_poll_fds_array_size; ++event_idx) { + if (m_poll_fds_array[event_idx].revents & POLLIN) { + int fd = m_poll_fds_array[event_idx].fd; + if (is_wakeup_fd(fd)) { + /* coverity[double_lock] TODO: RM#1049980 */ + m_lock_rcv.lock(); + remove_wakeup_fd(); + delete_fds_from_poll_array(); + + /* coverity[double_unlock] TODO: RM#1049980 */ + m_lock_rcv.unlock(); + continue; + } - // Check if OS fd is ready for reading - if (fd == m_fd) { - m_rx_udp_poll_os_ratio_counter = 0; - return 1; - } + // Check if OS fd is ready for 
reading + if (fd == m_fd) { + m_rx_udp_poll_os_ratio_counter = 0; + return 1; + } - // All that is left is our CQ offloading channel fd's - // poll cq. fd == cq channel fd. - // Process one wce on the relevant CQ - // The Rx CQ channel is non-blocking so this will always return quickly - cq_channel_info* p_cq_ch_info = g_p_fd_collection->get_cq_channel_fd(fd); - if (p_cq_ch_info) { - ring* p_ring = p_cq_ch_info->get_ring(); - if (p_ring) { - p_ring->wait_for_notification_and_process_element(fd, &poll_sn); + // All that is left is our CQ offloading channel fd's + // poll cq. fd == cq channel fd. + // Process one wce on the relevant CQ + // The Rx CQ channel is non-blocking so this will always return quickly + cq_channel_info* p_cq_ch_info = g_p_fd_collection->get_cq_channel_fd(fd); + if (p_cq_ch_info) { + ring* p_ring = p_cq_ch_info->get_ring(); + if (p_ring) { + p_ring->wait_for_notification_and_process_element(fd, &poll_sn); + } + } } - } } } @@ -370,17 +373,8 @@ sockinfo_udp::sockinfo_udp(int fd): si_udp_logdbg("Sockets RCVBUF = %d bytes", n_so_rcvbuf_bytes); rx_ready_byte_count_limit_update(n_so_rcvbuf_bytes); - epoll_event ev = {0, {0}}; - - ev.events = EPOLLIN; - // Add the user's orig fd to the rx epfd handle - ev.data.fd = m_fd; - - BULLSEYE_EXCLUDE_BLOCK_START - if (unlikely(orig_os_api.epoll_ctl(m_rx_epfd, EPOLL_CTL_ADD, ev.data.fd, &ev))) - si_udp_logpanic("failed to add user's fd to internal epfd errno=%d (%m)", errno); - BULLSEYE_EXCLUDE_BLOCK_END + add_fd_to_poll_array(m_fd); si_udp_logfunc("done"); } diff --git a/src/vma/util/wakeup.cpp b/src/vma/util/wakeup.cpp index c57bb28850..1ba1504d18 100644 --- a/src/vma/util/wakeup.cpp +++ b/src/vma/util/wakeup.cpp @@ -48,30 +48,29 @@ #define wkup_entry_dbg __log_entry_dbg #undef MODULE_HDR_INFO -#define MODULE_HDR_INFO MODULE_NAME "[epfd=%d]:%d:%s() " +#define MODULE_HDR_INFO MODULE_NAME "[wakeup_fd=%d]:%d:%s() " #undef __INFO__ -#define __INFO__ m_epfd +#define __INFO__ m_wakeup_fd wakeup::wakeup() { - 
m_epfd = 0; + m_wakeup_fd = 0; m_is_sleeping = 0; - memset(&m_ev, 0, sizeof(m_ev)); } void wakeup::going_to_sleep() { BULLSEYE_EXCLUDE_BLOCK_START - if(likely(m_epfd)) + if(likely(m_wakeup_fd)) m_is_sleeping++; else { - wkup_logerr(" m_epfd is not initialized - cannot use wakeup mechanism\n"); + wkup_logerr(" m_wakeup_fd is not initialized - cannot use wakeup mechanism\n"); m_is_sleeping = 0; } BULLSEYE_EXCLUDE_BLOCK_END } -void wakeup::wakeup_set_epoll_fd(int epfd) +void wakeup::wakeup_set_fd(int fd) { - m_epfd = epfd; + m_wakeup_fd = fd; } diff --git a/src/vma/util/wakeup.h b/src/vma/util/wakeup.h index d50573df06..d99f8bd0ef 100644 --- a/src/vma/util/wakeup.h +++ b/src/vma/util/wakeup.h @@ -52,13 +52,12 @@ class wakeup void return_from_sleep() { --m_is_sleeping; }; protected: - virtual void wakeup_set_epoll_fd(int epfd); + virtual void wakeup_set_fd(int fd); int m_is_sleeping; //lock_spin_recursive m_wakeup_lock; This lock is not needed for now. Maybe we will need it for epoll. - int m_epfd; - struct epoll_event m_ev; + int m_wakeup_fd; }; #endif /* WAKEUP_H */ diff --git a/src/vma/util/wakeup_eventfd.cpp b/src/vma/util/wakeup_eventfd.cpp new file mode 100644 index 0000000000..1e173d1d00 --- /dev/null +++ b/src/vma/util/wakeup_eventfd.cpp @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2001-2020 Mellanox Technologies, Ltd. All rights reserved. + * + * This software is available to you under a choice of one of two + * licenses. You may choose to be licensed under the terms of the GNU + * General Public License (GPL) Version 2, available from the file + * COPYING in the main directory of this source tree, or the + * BSD license below: + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * - Redistributions of source code must retain the above + * copyright notice, this list of conditions and the following + * disclaimer. 
+ * + * - Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +#include +#include <errno.h> +#include "vlogger/vlogger.h" +#include "wakeup_eventfd.h" + +#define MODULE_NAME "wakeup_eventfd" + +#define wkup_logpanic __log_info_panic +#define wkup_logerr __log_info_err +#define wkup_logwarn __log_info_warn +#define wkup_loginfo __log_info_info +#define wkup_logdbg __log_info_dbg +#define wkup_logfunc __log_info_func +#define wkup_logfuncall __log_info_funcall +#define wkup_entry_dbg __log_entry_dbg + +#undef MODULE_HDR_INFO +#define MODULE_HDR_INFO MODULE_NAME "[wakeup_fd=%d]:%d:%s() " +#undef __INFO__ +#define __INFO__ m_wakeup_fd + +// Create the eventfd (counter starts at 0) that is exposed through wakeup_get_fd() +wakeup_eventfd::wakeup_eventfd() +{ + // Non-blocking, so remove_wakeup_fd() can never stall on an already drained counter + m_wakeup_fd = eventfd(0, EFD_NONBLOCK); + if (m_wakeup_fd < 0) { + wkup_logerr("Failed to create wakeup eventfd (errno=%d %m)", errno); + } +} + +// Add 1 to the eventfd counter so the fd polls readable; skipped when no thread is marked as sleeping +void wakeup_eventfd::do_wakeup() +{ + wkup_logfuncall(""); + if (!m_is_sleeping) { + wkup_logfunc("There is no thread in poll_wait, therefore not calling for wakeup"); + return; + } + wkup_entry_dbg(""); + int errno_tmp = errno; //don't let wakeup affect errno + uint64_t inc = 1; + if (write(m_wakeup_fd, &inc, sizeof(uint64_t)) != sizeof(uint64_t)) { + wkup_logerr("Failed to increase counter wakeup fd"); + } + errno = errno_tmp; +} + +// Read (and thereby reset) the eventfd counter; skipped while a thread is still marked as sleeping +void wakeup_eventfd::remove_wakeup_fd() +{ + if (m_is_sleeping) return; + wkup_entry_dbg(""); + int errno_tmp = errno; //don't let the drain affect errno + uint64_t inc; + // EAGAIN means the counter is already 0 (e.g. another thread drained it first) - not an error + if (read(m_wakeup_fd, &inc, sizeof(uint64_t)) != sizeof(uint64_t) && errno != EAGAIN) { + wkup_logerr("Failed to reduce counter wakeup fd"); + } + errno = errno_tmp; +} + 
+// Close the eventfd, unless the constructor failed to create it +wakeup_eventfd::~wakeup_eventfd() +{ + if (m_wakeup_fd >= 0) { + close(m_wakeup_fd); + } +} diff --git a/src/vma/util/wakeup_eventfd.h b/src/vma/util/wakeup_eventfd.h new file mode 100644 index 0000000000..85d35a4ad4 --- /dev/null +++ b/src/vma/util/wakeup_eventfd.h @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2001-2020 Mellanox Technologies, Ltd. All rights reserved. + * + * This software is available to you under a choice of one of two + * licenses. You may choose to be licensed under the terms of the GNU + * General Public License (GPL) Version 2, available from the file + * COPYING in the main directory of this source tree, or the + * BSD license below: + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * - Redistributions of source code must retain the above + * copyright notice, this list of conditions and the following + * disclaimer. + * + * - Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, + * EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF + * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND + * NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS + * BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN + * ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN + * CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ */ + + +#ifndef WAKEUP_EVENTFD_H +#define WAKEUP_EVENTFD_H + +#include "wakeup.h" + +// Wakeup mechanism backed by an eventfd: do_wakeup() writes to the fd so it polls readable, +// remove_wakeup_fd() reads the counter back out. +class wakeup_eventfd : public wakeup +{ +public: + wakeup_eventfd(void); + ~wakeup_eventfd(); + // Write 1 to the eventfd when a thread is marked as sleeping + virtual void do_wakeup(); + // True when fd is this object's wakeup eventfd + virtual inline bool is_wakeup_fd(int fd) + { + return fd == m_wakeup_fd; + }; + // Read the eventfd counter once no thread is marked as sleeping + virtual void remove_wakeup_fd(); + // The eventfd descriptor held in m_wakeup_fd + virtual inline int wakeup_get_fd() { return m_wakeup_fd; }; +}; + +#endif /* WAKEUP_EVENTFD_H */ diff --git a/src/vma/util/wakeup_pipe.cpp b/src/vma/util/wakeup_pipe.cpp index d53c404207..2f2e89eb24 100644 --- a/src/vma/util/wakeup_pipe.cpp +++ b/src/vma/util/wakeup_pipe.cpp @@ -48,9 +48,9 @@ #define wkup_entry_dbg __log_entry_dbg #undef MODULE_HDR_INFO -#define MODULE_HDR_INFO MODULE_NAME "[epfd=%d]:%d:%s() " +#define MODULE_HDR_INFO MODULE_NAME "[wakeup_fd=%d]:%d:%s() " #undef __INFO__ -#define __INFO__ m_epfd +#define __INFO__ m_wakeup_fd #define UNINIT_PIPE_FD (-1) int wakeup_pipe::g_wakeup_pipes[2] = {UNINIT_PIPE_FD, UNINIT_PIPE_FD}; @@ -58,6 +58,7 @@ atomic_t wakeup_pipe::ref_count = ATOMIC_INIT(0); wakeup_pipe::wakeup_pipe() { + memset(&m_ev, 0, sizeof(m_ev)); int ref = atomic_fetch_and_inc(&ref_count); if (ref == 0) { BULLSEYE_EXCLUDE_BLOCK_START @@ -98,7 +99,7 @@ void wakeup_pipe::do_wakeup() int errno_tmp = errno; //don't let wakeup affect errno, as this can fail with EEXIST BULLSEYE_EXCLUDE_BLOCK_START - if ((orig_os_api.epoll_ctl(m_epfd, EPOLL_CTL_ADD, g_wakeup_pipes[0], &m_ev)) && (errno != EEXIST)) { + if ((orig_os_api.epoll_ctl(m_wakeup_fd, EPOLL_CTL_ADD, g_wakeup_pipes[0], &m_ev)) && (errno != EEXIST)) { wkup_logerr("Failed to add wakeup fd to internal epfd (errno=%d %m)", errno); } BULLSEYE_EXCLUDE_BLOCK_END @@ -113,7 +114,7 @@ void wakeup_pipe::remove_wakeup_fd() if (m_is_sleeping) return; wkup_entry_dbg(""); int tmp_errno = errno; - if (orig_os_api.epoll_ctl(m_epfd, EPOLL_CTL_DEL, g_wakeup_pipes[0], NULL)) + if (orig_os_api.epoll_ctl(m_wakeup_fd, EPOLL_CTL_DEL, g_wakeup_pipes[0], NULL)) { BULLSEYE_EXCLUDE_BLOCK_START if (errno 
== ENOENT) { diff --git a/src/vma/util/wakeup_pipe.h b/src/vma/util/wakeup_pipe.h index 78415cc087..f7d6f651fd 100644 --- a/src/vma/util/wakeup_pipe.h +++ b/src/vma/util/wakeup_pipe.h @@ -55,6 +55,7 @@ class wakeup_pipe : public wakeup private: static int g_wakeup_pipes[2]; static atomic_t ref_count; + struct epoll_event m_ev; }; #endif /* WAKEUP_PIPE_H */