Skip to content

Commit 7b6ea97

Browse files
author
Liran Oz
committed
issue: 1117626 Move epoll poll_os logic to the internal thread
This commit is not based on the UDP poll_os commit (PR Mellanox#445). * f0adf45 issue: 1117626 Move epoll poll_os logic to the internal thread Signed-off-by: Liran Oz <lirano@mellanox.com>
1 parent 24be311 commit 7b6ea97

File tree

9 files changed

+157
-46
lines changed

9 files changed

+157
-46
lines changed

src/vma/event/event_handler_manager.cpp

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
#include <rdma/rdma_cma.h>
3838
#include <vma/dev/net_device_table_mgr.h>
3939
#include "vma/dev/ring_allocation_logic.h"
40+
#include "vma/sock/fd_collection.h"
4041
#include "vma/sock/sock-redirect.h" // calling orig_os_api.epoll()
4142
#include "vma/util/verbs_extra.h"
4243

@@ -395,11 +396,11 @@ void event_handler_manager::stop_thread()
395396
m_epfd = -1;
396397
}
397398

398-
void event_handler_manager::update_epfd(int fd, int operation)
399+
void event_handler_manager::update_epfd(int fd, int operation, int events)
399400
{
400401
epoll_event ev = {0, {0}};
401402

402-
ev.events = EPOLLIN | EPOLLPRI;
403+
ev.events = events;
403404
ev.data.fd = fd;
404405
BULLSEYE_EXCLUDE_BLOCK_START
405406
if (orig_os_api.epoll_ctl(m_epfd, operation, fd, &ev) < 0) {
@@ -520,7 +521,7 @@ void event_handler_manager::priv_register_ibverbs_events(ibverbs_reg_info_t& inf
520521

521522
priv_prepare_ibverbs_async_event_queue(i);
522523

523-
update_epfd(info.fd, EPOLL_CTL_ADD);
524+
update_epfd(info.fd, EPOLL_CTL_ADD, EPOLLIN | EPOLLPRI);
524525
evh_logdbg("%d added to event_handler_map_t!", info.fd);
525526
}
526527
BULLSEYE_EXCLUDE_BLOCK_START
@@ -582,7 +583,7 @@ void event_handler_manager::priv_unregister_ibverbs_events(ibverbs_reg_info_t& i
582583

583584
i->second.ibverbs_ev.ev_map.erase(j);
584585
if (n == 1) {
585-
update_epfd(info.fd, EPOLL_CTL_DEL);
586+
update_epfd(info.fd, EPOLL_CTL_DEL, EPOLLIN | EPOLLPRI);
586587
m_event_handler_map.erase(i);
587588
evh_logdbg("%d erased from event_handler_map_t!", info.fd);
588589
}
@@ -607,7 +608,7 @@ void event_handler_manager::priv_register_rdma_cm_events(rdma_cm_reg_info_t& inf
607608
/* cppcheck-suppress uninitStructMember */
608609
m_event_handler_map[info.fd] = map_value;
609610

610-
update_epfd(info.fd, EPOLL_CTL_ADD);
611+
update_epfd(info.fd, EPOLL_CTL_ADD, EPOLLIN | EPOLLPRI);
611612
}
612613
else {
613614
BULLSEYE_EXCLUDE_BLOCK_START
@@ -651,7 +652,7 @@ void event_handler_manager::priv_unregister_rdma_cm_events(rdma_cm_reg_info_t& i
651652
iter_fd->second.rdma_cm_ev.map_rdma_cm_id.erase(iter_id);
652653
iter_fd->second.rdma_cm_ev.n_ref_count--;
653654
if (iter_fd->second.rdma_cm_ev.n_ref_count == 0) {
654-
update_epfd(info.fd, EPOLL_CTL_DEL);
655+
update_epfd(info.fd, EPOLL_CTL_DEL, EPOLLIN | EPOLLPRI);
655656
m_event_handler_map.erase(iter_fd);
656657
evh_logdbg("Removed channel <%d %p>", info.fd, info.id);
657658
}
@@ -679,7 +680,7 @@ void event_handler_manager::priv_register_command_events(command_reg_info_t& inf
679680
/* coverity[uninit_use_in_call] */
680681
/* cppcheck-suppress uninitStructMember */
681682
m_event_handler_map[info.fd] = map_value;
682-
update_epfd(info.fd, EPOLL_CTL_ADD);
683+
update_epfd(info.fd, EPOLL_CTL_ADD, EPOLLIN | EPOLLPRI);
683684
}
684685

685686
}
@@ -696,7 +697,7 @@ void event_handler_manager::priv_unregister_command_events(command_reg_info_t& i
696697
evh_logdbg(" This fd (%d) no longer COMMAND type fd", info.fd);
697698
}
698699
else {
699-
update_epfd(info.fd, EPOLL_CTL_DEL);
700+
update_epfd(info.fd, EPOLL_CTL_DEL, EPOLLIN | EPOLLPRI);
700701
}
701702
}
702703

@@ -954,7 +955,10 @@ void* event_handler_manager::thread_loop()
954955

955956
event_handler_map_t::iterator i = m_event_handler_map.find(fd);
956957
if (i == m_event_handler_map.end()) {
957-
evh_logdbg("No event handler (fd=%d)", fd);
958+
// No event handler - this is probably a poll_os event!
959+
if (!g_p_fd_collection->set_immediate_os_sample(fd)) {
960+
evh_logdbg("No event handler (fd=%d)", fd);
961+
}
958962
continue;
959963
}
960964

src/vma/event/event_handler_manager.h

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -127,15 +127,15 @@ struct reg_action_t {
127127

128128
typedef std::deque<struct reg_action_t> reg_action_q_t;
129129

130-
enum {
130+
enum ev_type {
131131
EV_IBVERBS,
132132
EV_RDMA_CM,
133133
EV_COMMAND,
134134
};
135135

136136

137137
struct event_data_t {
138-
int type;
138+
ev_type type;
139139
ibverbs_ev_t ibverbs_ev;
140140
rdma_cm_ev_t rdma_cm_ev;
141141
command_ev_t command_ev;
@@ -174,11 +174,13 @@ class event_handler_manager : public wakeup_pipe
174174
void* thread_loop();
175175
void stop_thread();
176176

177+
void update_epfd(int fd, int operation, int events);
178+
177179
private:
178180
pthread_t m_event_handler_tid;
179181
bool m_b_continue_running;
180182
int m_cq_epfd;
181-
int m_epfd;
183+
int m_epfd;
182184

183185
// pipe for the event registration handling
184186
reg_action_q_t m_reg_action_q;
@@ -209,7 +211,6 @@ class event_handler_manager : public wakeup_pipe
209211
void process_ibverbs_event(event_handler_map_t::iterator &i);
210212
void process_rdma_cm_event(event_handler_map_t::iterator &i);
211213
int start_thread();
212-
void update_epfd(int fd, int operation);
213214

214215
void event_channel_post_process_for_rdma_events(void* p_event);
215216
void* event_channel_pre_process_for_rdma_events(void* p_event_channel_handle, void** p_event);

src/vma/iomux/epfd_info.cpp

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,8 @@ int epfd_info::remove_fd_from_epoll_os(int fd)
5353
}
5454

5555
epfd_info::epfd_info(int epfd, int size) :
56-
lock_mutex_recursive("epfd_info"), m_epfd(epfd), m_size(size), m_ring_map_lock("epfd_ring_map_lock"), m_sysvar_thread_mode(safe_mce_sys().thread_mode)
56+
lock_mutex_recursive("epfd_info"), m_epfd(epfd), m_size(size), m_ring_map_lock("epfd_ring_map_lock"),
57+
m_sysvar_thread_mode(safe_mce_sys().thread_mode), m_b_os_data_available(false)
5758
{
5859
__log_funcall("");
5960
int max_sys_fd = get_sys_max_fd_num();
@@ -82,6 +83,9 @@ epfd_info::epfd_info(int epfd, int size) :
8283

8384
vma_stats_instance_create_epoll_block(m_epfd, &(m_stats->stats));
8485

86+
// Register this epoll fd (one-shot) with the event handler manager to be notified of non-offloaded data
87+
g_p_event_handler_manager->update_epfd(m_epfd, EPOLL_CTL_ADD, EPOLLIN | EPOLLPRI | EPOLLONESHOT);
88+
8589
wakeup_set_epoll_fd(m_epfd);
8690
}
8791

@@ -120,6 +124,9 @@ epfd_info::~epfd_info()
120124
}
121125
BULLSEYE_EXCLUDE_BLOCK_END
122126
}
127+
128+
g_p_event_handler_manager->update_epfd(m_epfd, EPOLL_CTL_DEL, EPOLLIN | EPOLLPRI | EPOLLONESHOT);
129+
123130
unlock();
124131

125132
vma_stats_instance_remove_epoll_block(&m_stats->stats);
@@ -765,3 +772,28 @@ void epfd_info::statistics_print(vlog_levels_t log_level /* = VLOG_DEBUG */)
765772
}
766773
}
767774
}
775+
776+
void epfd_info::set_immediate_os_sample()
777+
{
778+
lock();
779+
m_b_os_data_available = true;
780+
unlock();
781+
}
782+
783+
void epfd_info::unset_immediate_os_sample()
784+
{
785+
lock();
786+
// Clear the flag and re-arm the one-shot (EPOLLONESHOT) registration via EPOLL_CTL_MOD
787+
m_b_os_data_available = false;
788+
g_p_event_handler_manager->update_epfd(m_epfd, EPOLL_CTL_MOD, EPOLLIN | EPOLLPRI | EPOLLONESHOT);
789+
unlock();
790+
}
791+
792+
bool epfd_info::is_os_data_available() {
793+
lock();
794+
bool ret = m_b_os_data_available;
795+
m_b_os_data_available = false;
796+
unlock();
797+
return ret;
798+
}
799+

src/vma/iomux/epfd_info.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,15 @@ class epfd_info : public lock_mutex_recursive, public cleanable_obj, public wake
102102

103103
void statistics_print(vlog_levels_t log_level = VLOG_DEBUG);
104104

105+
// Set/clear the "OS data available" flag of this epfd (unset also re-arms the one-shot OS notification)
106+
void set_immediate_os_sample();
107+
void unset_immediate_os_sample();
108+
105109
static inline size_t epfd_info_node_offset(void) {return NODE_OFFSET(epfd_info, epfd_info_node);}
106110
list_node<epfd_info, epfd_info::epfd_info_node_offset> epfd_info_node;
107111

112+
bool is_os_data_available();
113+
108114
private:
109115

110116
const int m_epfd;
@@ -120,6 +126,7 @@ class epfd_info : public lock_mutex_recursive, public cleanable_obj, public wake
120126
epoll_stats_t m_local_stats;
121127
epoll_stats_t *m_stats;
122128
int m_log_invalid_events;
129+
bool m_b_os_data_available; // true when not offloaded data is available
123130

124131
int add_fd(int fd, epoll_event *event);
125132
int del_fd(int fd, bool passthrough = false);

src/vma/iomux/epoll_wait_call.cpp

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,43 @@ bool epoll_wait_call::handle_epoll_event(bool is_ready, uint32_t events, socket_
364364

365365
}
366366

367+
bool epoll_wait_call::handle_os_countdown(int &poll_os_countdown)
368+
{
369+
NOT_IN_USE(poll_os_countdown);
370+
371+
if (!m_epfd_info->is_os_data_available()) {
372+
return false;
373+
}
374+
375+
/*
376+
* OS data was flagged as available (checked above), so poll the OS now.
377+
* The poll_os_countdown / CQ-OS ratio argument is not used by this override.
378+
*/
379+
bool cq_ready = wait_os(true);
380+
381+
m_epfd_info->unset_immediate_os_sample();
382+
383+
if (cq_ready) {
384+
// This will empty the cqepfd
385+
// (most likely in case of a wakeup and probably only under epoll_wait (Not select/poll))
386+
ring_wait_for_notification_and_process_element(&m_poll_sn, NULL);
387+
}
388+
/* Before we exit with ready OS fd's we'll check the CQs once more and exit
389+
* below after calling check_all_offloaded_sockets();
390+
* IMPORTANT : We cannot do an opposite with current code,
391+
* means we cannot poll cq and then poll os (for epoll) - because poll os
392+
* will delete ready offloaded fds.
393+
*/
394+
if (m_n_all_ready_fds) {
395+
m_p_stats->n_iomux_os_rx_ready += m_n_all_ready_fds; // TODO: fix it - we only know all counter, not read counter
396+
ring_poll_and_process_element(&m_poll_sn, NULL);
397+
check_all_offloaded_sockets(&m_poll_sn);
398+
return true;
399+
}
400+
401+
return false;
402+
}
403+
367404
int epoll_wait_call::ring_poll_and_process_element(uint64_t *p_poll_sn, void* pv_fd_ready_array/* = NULL*/)
368405
{
369406
return m_epfd_info->ring_poll_and_process_element(p_poll_sn, pv_fd_ready_array);

src/vma/iomux/epoll_wait_call.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,8 @@ class epoll_wait_call : public io_mux_call
113113

114114
virtual int ring_wait_for_notification_and_process_element(uint64_t *p_poll_sn, void* pv_fd_ready_array = NULL);
115115

116+
virtual bool handle_os_countdown(int &poll_os_countdown);
117+
116118
private:
117119
bool _wait(int timeout);
118120

src/vma/iomux/io_mux_call.cpp

Lines changed: 37 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -245,14 +245,45 @@ void io_mux_call::check_offloaded_rsockets(uint64_t *p_poll_sn)
245245
//return false;
246246
}
247247

248+
bool io_mux_call::handle_os_countdown(int &poll_os_countdown)
249+
{
250+
/*
251+
* Poll OS when count down reaches zero. This honors CQ-OS ratio.
252+
* This also handles the 0 ratio case - do not poll OS at all.
253+
*/
254+
if (poll_os_countdown-- == 0 && m_n_sysvar_select_poll_os_ratio > 0) {
255+
bool cq_ready = wait_os(true);
256+
if (cq_ready) {
257+
// This will empty the cqepfd
258+
// (most likely in case of a wakeup and probably only under epoll_wait (Not select/poll))
259+
ring_wait_for_notification_and_process_element(&m_poll_sn, NULL);
260+
}
261+
/* Before we exit with ready OS fd's we'll check the CQs once more and exit
262+
* below after calling check_all_offloaded_sockets();
263+
* IMPORTANT : We cannot do an opposite with current code,
264+
* means we cannot poll cq and then poll os (for epoll) - because poll os
265+
* will delete ready offloaded fds.
266+
*/
267+
if (m_n_all_ready_fds) {
268+
269+
m_p_stats->n_iomux_os_rx_ready += m_n_all_ready_fds; // TODO: fix it - we only know all counter, not read counter
270+
ring_poll_and_process_element(&m_poll_sn, NULL);
271+
check_all_offloaded_sockets(&m_poll_sn);
272+
return true;
273+
}
274+
poll_os_countdown = m_n_sysvar_select_poll_os_ratio - 1;
275+
}
276+
277+
return false;
278+
}
279+
248280
void io_mux_call::polling_loops()
249281
{
250282
int poll_counter;
251-
int check_timer_countdown;
283+
int check_timer_countdown = 1; // Poll once before checking the time
252284
int poll_os_countdown;
253285
bool multiple_polling_loops, finite_polling;
254286
timeval before_polling_timer = TIMEVAL_INITIALIZER, after_polling_timer = TIMEVAL_INITIALIZER, delta;
255-
int delta_time; // in usec
256287

257288
prepare_to_poll();
258289

@@ -262,9 +293,6 @@ void io_mux_call::polling_loops()
262293
TAKE_T_POLL_START;
263294
ZERO_POLL_COUNT;
264295
#endif
265-
266-
// Poll once before checking the time
267-
check_timer_countdown = 1;
268296

269297
/*
270298
* Give OS priority in 1 of SELECT_SKIP_OS times
@@ -308,31 +336,9 @@ void io_mux_call::polling_loops()
308336
poll_os_countdown, m_n_sysvar_select_poll_os_ratio, check_timer_countdown, *m_p_num_all_offloaded_fds,
309337
m_n_all_ready_fds, m_n_ready_rfds, m_n_ready_wfds, m_n_ready_efds, multiple_polling_loops);
310338

311-
/*
312-
* Poll OS when count down reaches zero. This honors CQ-OS ratio.
313-
* This also handles the 0 ratio case - do not poll OS at all.
314-
*/
315-
if (poll_os_countdown-- == 0 && m_n_sysvar_select_poll_os_ratio > 0) {
316-
bool cq_ready = wait_os(true);
317-
if (cq_ready) {
318-
// This will empty the cqepfd
319-
// (most likely in case of a wakeup and probably only under epoll_wait (Not select/poll))
320-
ring_wait_for_notification_and_process_element(&m_poll_sn, NULL);
321-
}
322-
/* Before we exit with ready OS fd's we'll check the CQs once more and exit
323-
* below after calling check_all_offloaded_sockets();
324-
* IMPORTANT : We cannot do an opposite with current code,
325-
* means we cannot poll cq and then poll os (for epoll) - because poll os
326-
* will delete ready offloaded fds.
327-
*/
328-
if (m_n_all_ready_fds) {
329-
330-
m_p_stats->n_iomux_os_rx_ready += m_n_all_ready_fds; // TODO: fix it - we only know all counter, not read counter
331-
ring_poll_and_process_element(&m_poll_sn, NULL);
332-
check_all_offloaded_sockets(&m_poll_sn);
333-
break;
334-
}
335-
poll_os_countdown = m_n_sysvar_select_poll_os_ratio - 1;
339+
// handle_os_countdown() returns true when the OS poll found ready fds - stop polling and report them
340+
if (handle_os_countdown(poll_os_countdown)) {
341+
break;
336342
}
337343

338344
/*
@@ -385,8 +391,7 @@ void io_mux_call::polling_loops()
385391

386392
//calc accumulated polling time
387393
tv_sub(&after_polling_timer, &before_polling_timer, &delta);
388-
delta_time=tv_to_usec(&delta);
389-
g_polling_time_usec += delta_time ;
394+
g_polling_time_usec += tv_to_usec(&delta);
390395

391396
zero_polling_cpu(after_polling_timer);
392397
}

src/vma/iomux/io_mux_call.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,8 @@ class io_mux_call
253253

254254
virtual int ring_wait_for_notification_and_process_element(uint64_t *p_poll_sn, void* pv_fd_ready_array = NULL);
255255

256+
virtual bool handle_os_countdown(int &poll_os_countdown);
257+
256258
/// Pointer to an array of all offloaded fd's
257259
int *m_p_all_offloaded_fds;
258260
offloaded_mode_t *m_p_offloaded_modes;

0 commit comments

Comments
 (0)