Rsocket upstream epoll#1728
Introduce repoll_create as the rsocket equivalent of epoll_create. A background monitor thread polls registered fds via rpoll and sleeps when user events are ready, letting repoll_wait collect them without contention. Signed-off-by: Batsheva Black <bblack@nvidia.com>
Implement repoll_ctl as the rsocket equivalent of epoll_ctl. Supports EPOLL_CTL_ADD, EPOLL_CTL_MOD, and EPOLL_CTL_DEL operations with dynamic fd array growth. After modifying the fd list, sends a reload command to the monitor thread and waits for acknowledgment. Signed-off-by: Batsheva Black <bblack@nvidia.com>
bc7f366 to 14a6371
The first commit in this series is literally titled "git push --force batsheva rsocket_upstream_epolllibrdmacm: Add support for epoll_wait". Can you please check that the patches are correct, with proper titles and commit messages?
Implement repoll_wait as the rsocket equivalent of epoll_wait. Snapshots the fd array under the lock, calls rpoll without holding it, then collects events back under the lock. Wakes the monitor thread on timeout so it can resume background polling. Future option: the monitor thread can be disabled entirely for a threadless mode where repoll_wait calls rpoll directly each time. Signed-off-by: Batsheva Black <bblack@nvidia.com>
14a6371 to b6497b7
@shefty Fixed, sorry about that.
The atomic_compare_exchange_strong_explicit macro used 'r' instead of '__r' and '__val' instead of '__v', causing build failures under sparse analysis when code uses atomic_compare_exchange_strong/weak. Signed-off-by: Batsheva Black <bblack@nvidia.com>
shefty left a comment
I didn't get to epoll_wait, but see comments on how we might restructure the fds allocation and monitor state. I'm particularly concerned that the states are racy in a way that will be difficult to debug.
	memcpy(*fds, ri->fds, sizeof(**fds) * ri->capacity);
	*nfds = ri->slot_count;
	return 0;
}
I don't understand why we're making a copy of ri->fds, rather than using ri->fds directly. If we ever hit capacity, simply grow the array by some chunk amount.
We keep a private snapshot for the monitor on purpose.
The monitor does a blocking rpoll(), and for performance we don’t want to hold ri->lock across that call. Holding the lock there would serialize repoll_ctl/repoll_wait and hurt the hot path.
So repoll_ctl updates ri->fds under the lock, then asks the monitor to reload; the monitor refreshes its local copy and polls lock-free. This avoids lock contention and also avoids races with array growth/reallocation.
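The snapshot pattern described in this reply can be sketched as follows. This is a minimal illustration, not the PR's code: `struct fd_table`, `fd_table_snapshot`, and `fd_table_poll` are hypothetical names, and plain `poll()` stands in for `rpoll()` so the example builds without librdmacm.

```c
#include <assert.h>
#include <poll.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical stand-in for the PR's repoll_info; field names are assumed. */
struct fd_table {
	pthread_mutex_t lock;
	struct pollfd *fds;
	int count;
};

/* Copy the shared table into a caller-owned buffer while holding the lock.
 * The buffer is grown only when it is too small, so steady-state calls do
 * not allocate. Returns the number of entries copied, or -1 on failure. */
static int fd_table_snapshot(struct fd_table *t, struct pollfd **buf, int *cap)
{
	int n;

	pthread_mutex_lock(&t->lock);
	n = t->count;
	assert(n >= 0);
	if (n > *cap) {
		struct pollfd *tmp = realloc(*buf, sizeof(*tmp) * n);

		if (!tmp) {
			pthread_mutex_unlock(&t->lock);
			return -1;
		}
		*buf = tmp;
		*cap = n;
	}
	if (n > 0)
		memcpy(*buf, t->fds, sizeof(**buf) * n);
	pthread_mutex_unlock(&t->lock);
	return n;
}

/* Poll the private snapshot. The lock is not held while blocked, so other
 * threads can update the shared table, but this call will not see those
 * updates until the next snapshot. */
static int fd_table_poll(struct fd_table *t, struct pollfd **buf, int *cap,
			 int timeout_ms)
{
	int n = fd_table_snapshot(t, buf, cap);

	if (n < 0)
		return -1;
	return poll(*buf, n, timeout_ms);
}
```

The trade-off is visible in the second function: the blocking call is lock-free, but the copy goes stale as soon as the shared table changes.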
	case REPOLL_CMD_REFRESH_FDS:
		if (repoll_monitor_refresh_fds(ri, &fds, &nfds))
			goto out;
		repoll_monitor_ack_reload(ri);
The code is maintaining two copies of the fds: one local to the repoll thread and the other part of the repoll_info. We only need one copy. Pass a command to add, delete, or modify a given fd; that is, write a small command message to the pipe with the needed data. This thread can then wake up and process it. There is no need to free and malloc a new array of fds every time we report an event.
Partly addressed; I commented about the copy earlier.
	if (!new_fds)
		return -ENOMEM;
	for (int i = 0; i < new_max; ++i)
		new_fds[i].fd = -1;
The loop index i can start at the previous capacity, since the memcpy below immediately overwrites the earlier values.
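A minimal sketch of that suggestion follows. It uses `realloc` rather than the allocate-and-memcpy sequence in the diff, and `grow_fds` is a hypothetical helper name; only the slots beyond the old capacity are initialized.

```c
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <stdlib.h>

/* Grow *fds from old_cap to new_cap entries. Existing entries are kept by
 * realloc, so only the newly added tail is marked unused (fd == -1). */
static int grow_fds(struct pollfd **fds, int old_cap, int new_cap)
{
	struct pollfd *tmp;
	int i;

	assert(old_cap >= 0 && new_cap > old_cap);
	tmp = realloc(*fds, sizeof(*tmp) * new_cap);
	if (!tmp)
		return -ENOMEM;

	for (i = old_cap; i < new_cap; i++) {
		tmp[i].fd = -1;
		tmp[i].events = 0;
		tmp[i].revents = 0;
	}
	*fds = tmp;
	return 0;
}
```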
	return ri->slot_count++;

	for (int i = 0; i < ri->slot_count; ++i) {
		if (ri->fds[i].fd <= 0)
This check should be fd == -1. But we shouldn't need to search for a free slot. Always add new entries to the end (active_fds). When removing an entry, swap the current one at the end to the location of the removed fd and decrement active_fds. We don't need to maintain order on the fds.
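The append-and-swap scheme described above can be sketched like this. `struct fd_list` and its helpers are hypothetical names; `active_fds` follows the reviewer's wording.

```c
#include <assert.h>
#include <poll.h>

/* Dense, unordered fd list: entries [0, active_fds) are always in use. */
struct fd_list {
	struct pollfd *fds;
	int active_fds;
	int capacity;
};

/* Append at the end. Returns the slot index, or -1 if the caller must
 * grow the array first. */
static int fd_list_add(struct fd_list *l, int fd, short events)
{
	if (l->active_fds == l->capacity)
		return -1;
	l->fds[l->active_fds].fd = fd;
	l->fds[l->active_fds].events = events;
	l->fds[l->active_fds].revents = 0;
	return l->active_fds++;
}

/* Remove slot idx in O(1) by moving the last active entry into it.
 * Order is not preserved, which is fine for a poll set. */
static void fd_list_del(struct fd_list *l, int idx)
{
	assert(idx >= 0 && idx < l->active_fds);
	l->active_fds--;
	l->fds[idx] = l->fds[l->active_fds];
	l->fds[l->active_fds].fd = -1;
}
```

Because a removal moves another entry, any per-fd bookkeeping that stores slot indices has to be updated for the moved entry as well.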
	state = REPOLL_MONITOR_RELOADED;
	while (atomic_load(&ri->monitor_state) != REPOLL_MONITOR_SLEEPING &&
	       !atomic_compare_exchange_weak(&ri->monitor_state, &state,
					     REPOLL_MONITOR_RUNNING))
MONITOR_RELOADED shouldn't really be a state of the monitor. Reloading is a command.
I think the states are ending up racy. We have threads on both sides of the monitor trying to direct the state without some higher-level coordination. The monitor state should really be driven entirely by the monitor thread. Threads on this side should basically be communicating by writing commands to the pipe. The only time the threads should worry about the state is during startup and shutdown.
Thanks - agreed that RELOADED is better modeled as a command than a monitor state.
I tested such a change but it introduced a large throughput regression, so the downside appears to be synchronization/wake-path overhead.
Thinking it over, the remaining race risk with the current approach seems bounded in practice: epoll_wait re-polls anyway, and the monitor only exists to drive event progress, so the impact is mostly occasional latency rather than functional breakage. Do you agree?
Remove unnecessary explicit enum assignments and use clearer symbolic constants for repoll internals so monitor state handling is easier to read and maintain. Signed-off-by: bblack <bblack@nvidia.com>
Rename monitor lifecycle tracking fields to explicit lifecycle names instead of boolean-style state so startup/shutdown transitions are clear and less error-prone. Signed-off-by: bblack <bblack@nvidia.com>
Keep and grow the monitor pollfd buffer with capacity tracking rather than reallocating from scratch on every refresh to reduce allocation churn in steady-state operation. Signed-off-by: bblack <bblack@nvidia.com>
Simplify monitor sleep/wake flow and lifecycle checks so monitor loop control is easier to follow while preserving existing behavior. Signed-off-by: bblack <bblack@nvidia.com>
Start the monitor thread only after repoll instance registration is complete and align unwind paths with that order to avoid partial startup visibility. Signed-off-by: bblack <bblack@nvidia.com>
Use append/swap compaction for repoll fd slots and guard monitor wake transitions under lock so slot management and wake logic stay consistent under concurrent updates. Signed-off-by: bblack <bblack@nvidia.com>
	memcpy(local_fds, ri->fds + 1, sizeof(*local_fds) * nfds);
	pthread_mutex_unlock(&ri->lock);

	ret = rpoll(local_fds, nfds, timeout);
I think it will be easier to discuss the changes if we focus on specific aspects of API correctness. I don't want to focus on a single application use case.
epoll_wait() may be called with a timeout of 0 or non-zero. For a timeout of 0, we need something like this:
    if (!timeout) {
        lock(ri->lock);
        rpoll(ri->fds, ri->nfds, 0);
        repoll_collect_events(ri, events, maxevents);
        unlock(ri->lock);
    }
I'm holding the lock around rpoll() and collect_events() to protect against changes to the ri->fds array.
The problem is handling a non-zero timeout, especially if it's infinite. The !timeout flow above doesn't work because it blocks changes to the epoll fd list (ri->fds). Making a copy of ri->fds (current patch) also doesn't work because the copy is not updated with changes to ri->fds. That is, if another thread wants to delete an fd or add a new one, the thread in repoll_wait() will not see those changes. This is a behavior difference from epoll.
Fixing that behavior requires reworking the design. This is why the libfabric implementation uses signaling from the epoll_ctl calls to unblock a waiting thread:
https://github.com/ofiwg/libfabric/blob/main/src/common.c#L1727
In that implementation, all changes to the epoll fd array are done by whichever thread is calling epoll_wait. It doesn't have a separate epoll 'progress' thread.
To align with the general design being proposed here, I think a flow like this might work:
    do {
        /* always go through !timeout flow for fast check */
        lock(ri->lock);
        rpoll(ri->fds, ri->nfds, 0);
        repoll_collect_events(ri, events, maxevents);
        unlock(ri->lock);
        if (!timeout || we have events to return)
            return;
        wakeup_monitor(ri);
        wait on signal from monitor with for timeout duration
        on wake-up, adjust timeout by time spent waiting
    } while (1);
The monitor thread will call rpoll until it has events to report. Once it finds any events, it signals any waiting thread to wakeup and recheck.
Changes to the fd's being monitored (epoll_ctl calls) should be processed by the monitor thread, similar to the code linked to above. This ensures that the fd's are monitored correctly.
Rework repoll wait/monitor synchronization to use generation-based wakeups and coalesced nonblocking reload commands so epoll_ctl does not block on command pipe backpressure under benchmark load. Signed-off-by: bblack <bblack@nvidia.com>
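The commit itself is not shown in this thread, so the following is only a generic illustration of what a generation-based wakeup usually means, not a description of the actual change. All names are hypothetical. The producer bumps a counter under a lock and broadcasts; a waiter remembers the value it last saw and sleeps only while the counter is unchanged, so a wakeup that happens between the check and the sleep is not lost.

```c
#include <assert.h>
#include <pthread.h>

struct event_gen {
	pthread_mutex_t lock;
	pthread_cond_t changed;
	unsigned long gen;	/* incremented each time new events appear */
};

/* Read the current generation so a later wait can detect changes. */
static unsigned long event_gen_read(struct event_gen *g)
{
	unsigned long v;

	pthread_mutex_lock(&g->lock);
	v = g->gen;
	pthread_mutex_unlock(&g->lock);
	return v;
}

/* Producer side: publish a new generation and wake all waiters. */
static void event_gen_bump(struct event_gen *g)
{
	pthread_mutex_lock(&g->lock);
	g->gen++;
	pthread_cond_broadcast(&g->changed);
	pthread_mutex_unlock(&g->lock);
}

/* Consumer side: block until the generation differs from 'seen'.
 * Returns immediately if a bump already happened. */
static unsigned long event_gen_wait(struct event_gen *g, unsigned long seen)
{
	unsigned long now;

	pthread_mutex_lock(&g->lock);
	while (g->gen == seen)
		pthread_cond_wait(&g->changed, &g->lock);
	now = g->gen;
	pthread_mutex_unlock(&g->lock);
	assert(now != seen);
	return now;
}
```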
Add epoll support for rsocket via repoll_create, repoll_ctl, and repoll_wait.
A single monitor thread per epoll instance uses rpoll internally to poll rsocket fds. The flow:
1. The monitor thread blocks in rpoll on all registered fds (idle, zero CPU).
2. When rpoll returns with events, the thread goes to sleep.
3. When the user calls epoll_wait, it collects the events, returns them, and wakes the thread.
4. epoll_ctl changes notify the thread via a command pipe.
Preload wrappers (epoll_create, epoll_ctl, epoll_wait) are included so existing applications work transparently with LD_PRELOAD.
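For context, the sketch below shows the kind of ordinary epoll caller that preload wrappers are meant to intercept. It uses only the standard Linux epoll API; `wait_readable` is a hypothetical helper, and nothing here depends on rsocket. How the preload library is located and loaded depends on the installation and is not shown.

```c
#include <assert.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Returns 1 if fd becomes readable within timeout_ms, 0 on timeout,
 * -1 on error. Uses the same three calls the preload wrappers cover. */
static int wait_readable(int fd, int timeout_ms)
{
	struct epoll_event ev = { .events = EPOLLIN, .data.fd = fd };
	struct epoll_event out;
	int epfd, n;

	assert(fd >= 0);
	epfd = epoll_create(1);		/* size is ignored but must be > 0 */
	if (epfd < 0)
		return -1;

	if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev) < 0) {
		close(epfd);
		return -1;
	}

	n = epoll_wait(epfd, &out, 1, timeout_ms);
	close(epfd);
	if (n < 0)
		return -1;
	return (n == 1 && (out.events & EPOLLIN)) ? 1 : 0;
}
```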