Skip to content

Commit a571567

Browse files
committed
Foundational work to allow async/await on multiple file descriptors
A slightly different version of this patch has been sitting on a local branch for almost two years, so I decided to take a look this weekend and ended up rewriting it so it's compatible with the current version of Lwan's event loop. While this won't allow a request handler to await on multiple file descriptors just yet, it lays down the foundations to do so by making all the necessary plumbing changes to allow a coro_yield() from a request handler to receive a pointer to the struct lwan_connection that caused the request handler to be awoken. The actual changes are minimal (with little to no overhead), but some comments had to be added throughout to explain certain things, as the ownership of certain values isn't clear from the struct definition alone anymore. No APIs have been implemented to allow waiting for multiple file descriptors at once yet, but they will probably follow in the next few weeks. I've got to make sure it also works with the HTTP connection coroutine as well as any other file descriptor a handler might decide to use (e.g. a pubsub subscription); this would allow, for instance, a chat application to wait on both the WebSocket connection and the pubsub subscription and react as fast as it's able to. Let's see what I end up coming up with. Tested to compile only; I have not tested whether this feature works yet, but normal requests seem to work fine.
1 parent c82374b commit a571567

File tree

4 files changed

+128
-33
lines changed

4 files changed

+128
-33
lines changed

src/lib/lwan-request.c

+8-7
Original file line numberDiff line numberDiff line change
@@ -2023,27 +2023,28 @@ make_async_yield_value(int fd, enum lwan_connection_coro_yield event)
20232023
return (int64_t)(((uint64_t)fd << 32 | event));
20242024
}
20252025

2026-
static inline void async_await_fd(struct coro *coro,
2027-
int fd,
2028-
enum lwan_connection_coro_yield events)
2026+
static inline struct lwan_connection *async_await_fd(
2027+
struct coro *coro, int fd, enum lwan_connection_coro_yield events)
20292028
{
20302029
assert(events >= CONN_CORO_ASYNC_AWAIT_READ &&
20312030
events <= CONN_CORO_ASYNC_AWAIT_READ_WRITE);
20322031

2033-
return (void)coro_yield(coro, make_async_yield_value(fd, events));
2032+
int64_t from_coro = coro_yield(coro, make_async_yield_value(fd, events));
2033+
return (struct lwan_connection *)(intptr_t)from_coro;
20342034
}
20352035

2036-
void lwan_request_await_read(struct lwan_request *r, int fd)
2036+
struct lwan_connection *lwan_request_await_read(struct lwan_request *r, int fd)
20372037
{
20382038
return async_await_fd(r->conn->coro, fd, CONN_CORO_ASYNC_AWAIT_READ);
20392039
}
20402040

2041-
void lwan_request_await_write(struct lwan_request *r, int fd)
2041+
struct lwan_connection *lwan_request_await_write(struct lwan_request *r, int fd)
20422042
{
20432043
return async_await_fd(r->conn->coro, fd, CONN_CORO_ASYNC_AWAIT_WRITE);
20442044
}
20452045

2046-
void lwan_request_await_read_write(struct lwan_request *r, int fd)
2046+
struct lwan_connection *lwan_request_await_read_write(struct lwan_request *r,
2047+
int fd)
20472048
{
20482049
return async_await_fd(r->conn->coro, fd, CONN_CORO_ASYNC_AWAIT_READ_WRITE);
20492050
}

src/lib/lwan-thread.c

+79-20
Original file line numberDiff line numberDiff line change
@@ -522,11 +522,23 @@ static void update_epoll_flags(const struct timeout_queue *tq,
522522
lwan_status_perror("epoll_ctl");
523523
}
524524

525-
static void clear_async_await_flag(void *data)
525+
static void unasync_await_conn(void *data1, void *data2)
526526
{
527-
struct lwan_connection *async_fd_conn = data;
527+
struct lwan_connection *async_fd_conn = data1;
528528

529-
async_fd_conn->flags &= ~CONN_ASYNC_AWAIT;
529+
async_fd_conn->flags &= ~(CONN_ASYNC_AWAIT | CONN_HUNG_UP);
530+
async_fd_conn->thread = data2;
531+
532+
/* If this file descriptor number is used again in the future as an HTTP
533+
* connection, we need the coro pointer to be NULL so a new coroutine is
534+
* created! */
535+
async_fd_conn->coro = NULL;
536+
537+
/* While not strictly necessary, make sure that prev/next point to
538+
* something valid rather than whatever junk was left from when their
539+
* storage was used for the parent pointer. */
540+
async_fd_conn->prev = -1;
541+
async_fd_conn->next = -1;
530542
}
531543

532544
static enum lwan_connection_coro_yield
@@ -558,13 +570,31 @@ resume_async(const struct timeout_queue *tq,
558570

559571
op = EPOLL_CTL_MOD;
560572
} else {
573+
await_fd_conn->parent = conn;
574+
575+
/* We assert() in the timeout queue that we're not freeing a
576+
* coroutine when CONN_ASYNC_AWAIT is set in the connection, and are
577+
* careful to not ever do that. This makes us get away with struct
578+
* coro not being refcounted, even though this kinda feels like
579+
* running with scissors. */
580+
assert(!await_fd_conn->coro);
581+
await_fd_conn->coro = conn->coro;
582+
583+
/* Since scheduling is performed during startup, we gotta take note
584+
* of which thread was originally supposed to handle this particular
585+
* file descriptor once we're done borrowing this lwan_connection
586+
* for the awaited file descriptor. */
587+
struct lwan_thread *old_thread = await_fd_conn->thread;
588+
await_fd_conn->thread = conn->thread;
589+
561590
op = EPOLL_CTL_ADD;
562591
flags |= CONN_ASYNC_AWAIT;
563-
coro_defer(conn->coro, clear_async_await_flag, await_fd_conn);
592+
593+
coro_defer2(conn->coro, unasync_await_conn, await_fd_conn, old_thread);
564594
}
565595

566596
struct epoll_event event = {.events = conn_flags_to_epoll_events(flags),
567-
.data.ptr = conn};
597+
.data.ptr = await_fd_conn};
568598
if (LIKELY(!epoll_ctl(epoll_fd, op, await_fd, &event))) {
569599
await_fd_conn->flags &= ~CONN_EVENTS_MASK;
570600
await_fd_conn->flags |= flags;
@@ -575,24 +605,27 @@ resume_async(const struct timeout_queue *tq,
575605
}
576606

577607
static ALWAYS_INLINE void resume_coro(struct timeout_queue *tq,
578-
struct lwan_connection *conn,
608+
struct lwan_connection *conn_to_resume,
609+
struct lwan_connection *conn_to_yield,
579610
int epoll_fd)
580611
{
581-
assert(conn->coro);
612+
assert(conn_to_resume->coro);
613+
assert(conn_to_yield->coro);
582614

583-
int64_t from_coro = coro_resume(conn->coro);
615+
int64_t from_coro = coro_resume_value(conn_to_resume->coro,
616+
(int64_t)(intptr_t)conn_to_yield);
584617
enum lwan_connection_coro_yield yield_result = from_coro & 0xffffffff;
585618

586619
if (UNLIKELY(yield_result >= CONN_CORO_ASYNC)) {
587620
yield_result =
588-
resume_async(tq, yield_result, from_coro, conn, epoll_fd);
621+
resume_async(tq, yield_result, from_coro, conn_to_resume, epoll_fd);
589622
}
590623

591624
if (UNLIKELY(yield_result == CONN_CORO_ABORT)) {
592-
timeout_queue_expire(tq, conn);
625+
timeout_queue_expire(tq, conn_to_resume);
593626
} else {
594-
update_epoll_flags(tq, conn, epoll_fd, yield_result);
595-
timeout_queue_move_to_last(tq, conn);
627+
update_epoll_flags(tq, conn_to_resume, epoll_fd, yield_result);
628+
timeout_queue_move_to_last(tq, conn_to_resume);
596629
}
597630
}
598631

@@ -696,8 +729,7 @@ static ALWAYS_INLINE bool spawn_coro(struct lwan_connection *conn,
696729
#endif
697730

698731
assert(!conn->coro);
699-
assert(!(conn->flags & CONN_ASYNC_AWAIT));
700-
assert(!(conn->flags & CONN_AWAITED_FD));
732+
assert(!(conn->flags & (CONN_ASYNC_AWAIT | CONN_HUNG_UP)));
701733
assert(!(conn->flags & CONN_LISTENER));
702734
assert(t);
703735
assert((uintptr_t)t >= (uintptr_t)tq->lwan->thread.threads);
@@ -978,7 +1010,36 @@ static void *thread_io_loop(void *data)
9781010
for (struct epoll_event *event = events; n_fds--; event++) {
9791011
struct lwan_connection *conn = event->data.ptr;
9801012

981-
assert(!(conn->flags & CONN_ASYNC_AWAIT));
1013+
if (conn->flags & CONN_ASYNC_AWAIT) {
1014+
/* Assert that the connection is part of the conns array,
1015+
* since the storage for conn->parent is shared with
1016+
* prev/next. */
1017+
assert(conn->parent >= lwan->conns);
1018+
assert(conn->parent <= &lwan->conns[lwan->thread.max_fd]);
1019+
1020+
/* Also validate that conn->parent is in fact a HTTP client
1021+
* connection and not an awaited fd! */
1022+
assert(!(conn->parent->flags & CONN_ASYNC_AWAIT));
1023+
1024+
/* CONN_ASYNC_AWAIT conns *must* have a coro and thread as
1025+
* it's the same as the HTTP client coro for API
1026+
* consistency, as struct lwan_connection isn't opaque. (If
1027+
* it were opaque, or at least a private API, though, we
1028+
* might be able to get away with reusing the space for
1029+
* these two pointers for something else in some cases.
1030+
* This has not been necessary yet, but might become useful
1031+
* in the future.) */
1032+
assert(conn->coro);
1033+
assert(conn->coro == conn->parent->coro);
1034+
assert(conn->thread == conn->parent->thread);
1035+
1036+
if (UNLIKELY(events->events & (EPOLLRDHUP | EPOLLHUP)))
1037+
conn->flags |= CONN_HUNG_UP;
1038+
1039+
resume_coro(&tq, conn->parent, conn, epoll_fd);
1040+
1041+
continue;
1042+
}
9821043

9831044
if (conn->flags & CONN_LISTENER) {
9841045
if (LIKELY(accept_waiting_clients(t, conn)))
@@ -989,10 +1050,8 @@ static void *thread_io_loop(void *data)
9891050
}
9901051

9911052
if (UNLIKELY(event->events & (EPOLLRDHUP | EPOLLHUP))) {
992-
if ((conn->flags & CONN_AWAITED_FD) != CONN_SUSPENDED_MASK) {
993-
timeout_queue_expire(&tq, conn);
994-
continue;
995-
}
1053+
timeout_queue_expire(&tq, conn);
1054+
continue;
9961055
}
9971056

9981057
if (!conn->coro) {
@@ -1004,7 +1063,7 @@ static void *thread_io_loop(void *data)
10041063
created_coros = true;
10051064
}
10061065

1007-
resume_coro(&tq, conn, epoll_fd);
1066+
resume_coro(&tq, conn, conn, epoll_fd);
10081067
}
10091068

10101069
if (created_coros)

src/lib/lwan-tq.c

+4
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ timeout_queue_idx_to_node(struct timeout_queue *tq, int idx)
3838
inline void timeout_queue_insert(struct timeout_queue *tq,
3939
struct lwan_connection *new_node)
4040
{
41+
assert(!(new_node->flags & (CONN_HUNG_UP | CONN_ASYNC_AWAIT)));
42+
4143
new_node->next = -1;
4244
new_node->prev = tq->head.prev;
4345
struct lwan_connection *prev = timeout_queue_idx_to_node(tq, tq->head.prev);
@@ -87,6 +89,8 @@ void timeout_queue_init(struct timeout_queue *tq, const struct lwan *lwan)
8789
void timeout_queue_expire(struct timeout_queue *tq,
8890
struct lwan_connection *conn)
8991
{
92+
assert(!(conn->flags & (CONN_HUNG_UP | CONN_ASYNC_AWAIT)));
93+
9094
timeout_queue_remove(tq, conn);
9195

9296
if (LIKELY(conn->coro)) {

src/lib/lwan.h

+37-6
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,6 @@ enum lwan_connection_flags {
275275
CONN_SUSPENDED_MASK = 1 << 5,
276276
CONN_SUSPENDED = (EPOLLRDHUP << CONN_EPOLL_EVENT_SHIFT) | CONN_SUSPENDED_MASK,
277277
CONN_HAS_REMOVE_SLEEP_DEFER = 1 << 6,
278-
CONN_AWAITED_FD = CONN_SUSPENDED_MASK | CONN_HAS_REMOVE_SLEEP_DEFER,
279278

280279
/* Used when HTTP pipelining has been detected. This enables usage of the
281280
* MSG_MORE flags when sending responses to batch as many short responses
@@ -298,7 +297,12 @@ enum lwan_connection_flags {
298297

299298
CONN_USE_DYNAMIC_BUFFER = 1 << 12,
300299

301-
CONN_FLAG_LAST = CONN_USE_DYNAMIC_BUFFER,
300+
/* Only valid when CONN_ASYNC_AWAIT is set. Set on file descriptors that
301+
* got (EPOLLHUP|EPOLLRDHUP) events from epoll so that request handlers
302+
* can deal with this fact. */
303+
CONN_HUNG_UP = 1 << 13,
304+
305+
CONN_FLAG_LAST = CONN_HUNG_UP,
302306
};
303307

304308
static_assert(CONN_FLAG_LAST < ((1 << 15) - 1),
@@ -373,10 +377,35 @@ struct lwan_connection {
373377
/* This structure is exactly 32-bytes on x86-64. If it is changed,
374378
* make sure the scheduler (lwan-thread.c) is updated as well. */
375379
enum lwan_connection_flags flags;
380+
376381
unsigned int time_to_expire;
382+
377383
struct coro *coro;
378384
struct lwan_thread *thread;
379-
int prev, next; /* for timeout queue */
385+
386+
/* This union is here to support async/await when a handler is waiting
387+
* on multiple file descriptors. By storing a pointer to the parent
388+
* connection here, we're able to register the awaited file descriptor
389+
* in epoll using a pointer to the awaited file descriptor struct,
390+
* allowing us to yield to the handler this information and signal which
391+
* file descriptor caused the handler to be awoken. (We can yield just
392+
* the file descriptor plus another integer with values to signal things
393+
* like timeouts and whatnot. Future problems!)
394+
*
395+
* Also, when CONN_ASYNC_AWAIT is set, `coro` points to parent->coro,
396+
* so that conn->coro is consistently usable. Gotta be careful though,
397+
* because struct coros are not refcounted and this could explode with
398+
* a double free. */
399+
union {
400+
/* For HTTP client connections handling inside the timeout queue */
401+
struct {
402+
int prev;
403+
int next;
404+
};
405+
406+
/* For awaited file descriptor, only valid if flags&CONN_ASYNC_AWAIT */
407+
struct lwan_connection *parent;
408+
};
380409
};
381410

382411
struct lwan_proxy {
@@ -639,9 +668,11 @@ void lwan_response_websocket_write_binary(struct lwan_request *request);
639668
int lwan_response_websocket_read(struct lwan_request *request);
640669
int lwan_response_websocket_read_hint(struct lwan_request *request, size_t size_hint);
641670

642-
void lwan_request_await_read(struct lwan_request *r, int fd);
643-
void lwan_request_await_write(struct lwan_request *r, int fd);
644-
void lwan_request_await_read_write(struct lwan_request *r, int fd);
671+
struct lwan_connection *lwan_request_await_read(struct lwan_request *r, int fd);
672+
struct lwan_connection *lwan_request_await_write(struct lwan_request *r,
673+
int fd);
674+
struct lwan_connection *lwan_request_await_read_write(struct lwan_request *r,
675+
int fd);
645676
ssize_t lwan_request_async_read(struct lwan_request *r, int fd, void *buf, size_t len);
646677
ssize_t lwan_request_async_read_flags(struct lwan_request *request, int fd, void *buf, size_t len, int flags);
647678
ssize_t lwan_request_async_write(struct lwan_request *r, int fd, const void *buf, size_t len);

0 commit comments

Comments
 (0)