Skip to content

Commit b530ed6

Browse files
committed
Allow async/await on multiple file descriptors
Provide two new public APIs: lwan_request_awaitv_any() and lwan_request_awaitv_all(), which, respectively, will await for an operation on at least one of the awaited file descriptors, returning the one that unblocked the coroutine, and for all the awaited file descriptors. The APIs are experimental but you can already see how much it improves the chat implementation of the websockets sample: now, instead of having to poll both the websocket and the pub/sub subscription, and wait a few milliseconds, it now instantaneously wakes up when there's data in either one of them, and processes only what has data. The chat now feels like a proper chat app (well, within reason for that crude app, but you get the idea). (As a side effect: we now send websocket pings periodically.) There's a lot to clean up here, but I'm tired and this will be done eventually.
1 parent 945fd78 commit b530ed6

9 files changed

+323
-91
lines changed

src/lib/liblwan.sym

+1
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ global:
7373
lwan_handler_info_*;
7474

7575
lwan_request_await_*;
76+
lwan_request_awaitv_*;
7677
lwan_request_async_*;
7778

7879
lwan_straitjacket_enforce*;

src/lib/lwan-private.h

+2
Original file line numberDiff line numberDiff line change
@@ -283,3 +283,5 @@ void lwan_request_foreach_header_for_cgi(struct lwan_request *request,
283283
size_t value_len,
284284
void *user_data),
285285
void *user_data);
286+
287+
bool lwan_send_websocket_ping_for_tq(struct lwan_connection *conn);

src/lib/lwan-request.c

-32
Original file line numberDiff line numberDiff line change
@@ -2083,38 +2083,6 @@ __attribute__((used)) int fuzz_parse_http_request(const uint8_t *data,
20832083
}
20842084
#endif
20852085

2086-
static inline int64_t
2087-
make_async_yield_value(int fd, enum lwan_connection_coro_yield event)
2088-
{
2089-
return (int64_t)(((uint64_t)fd << 32 | event));
2090-
}
2091-
2092-
static inline struct lwan_connection *async_await_fd(
2093-
struct coro *coro, int fd, enum lwan_connection_coro_yield events)
2094-
{
2095-
assert(events >= CONN_CORO_ASYNC_AWAIT_READ &&
2096-
events <= CONN_CORO_ASYNC_AWAIT_READ_WRITE);
2097-
2098-
int64_t from_coro = coro_yield(coro, make_async_yield_value(fd, events));
2099-
return (struct lwan_connection *)(intptr_t)from_coro;
2100-
}
2101-
2102-
struct lwan_connection *lwan_request_await_read(struct lwan_request *r, int fd)
2103-
{
2104-
return async_await_fd(r->conn->coro, fd, CONN_CORO_ASYNC_AWAIT_READ);
2105-
}
2106-
2107-
struct lwan_connection *lwan_request_await_write(struct lwan_request *r, int fd)
2108-
{
2109-
return async_await_fd(r->conn->coro, fd, CONN_CORO_ASYNC_AWAIT_WRITE);
2110-
}
2111-
2112-
struct lwan_connection *lwan_request_await_read_write(struct lwan_request *r,
2113-
int fd)
2114-
{
2115-
return async_await_fd(r->conn->coro, fd, CONN_CORO_ASYNC_AWAIT_READ_WRITE);
2116-
}
2117-
21182086
ssize_t lwan_request_async_read_flags(
21192087
struct lwan_request *request, int fd, void *buf, size_t len, int flags)
21202088
{

src/lib/lwan-strbuf.c

+1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#include <stdlib.h>
2828
#include <string.h>
2929
#include <sys/stat.h>
30+
#include <unistd.h>
3031

3132
#include "lwan-private.h"
3233

src/lib/lwan-thread.c

+178-12
Original file line numberDiff line numberDiff line change
@@ -559,7 +559,7 @@ conn_flags_to_epoll_events(enum lwan_connection_flags flags)
559559
return EPOLL_EVENTS(flags);
560560
}
561561

562-
static void update_epoll_flags(const struct timeout_queue *tq,
562+
static void update_epoll_flags(const struct lwan *lwan,
563563
struct lwan_connection *conn,
564564
int epoll_fd,
565565
enum lwan_connection_coro_yield yield_result)
@@ -609,7 +609,7 @@ static void update_epoll_flags(const struct timeout_queue *tq,
609609

610610
struct epoll_event event = {.events = conn_flags_to_epoll_events(conn->flags),
611611
.data.ptr = conn};
612-
int fd = lwan_connection_get_fd(tq->lwan, conn);
612+
int fd = lwan_connection_get_fd(lwan, conn);
613613

614614
if (UNLIKELY(epoll_ctl(epoll_fd, EPOLL_CTL_MOD, fd, &event) < 0))
615615
lwan_status_perror("epoll_ctl");
@@ -619,7 +619,11 @@ static void unasync_await_conn(void *data1, void *data2)
619619
{
620620
struct lwan_connection *async_fd_conn = data1;
621621

622-
async_fd_conn->flags &= ~(CONN_ASYNC_AWAIT | CONN_HUNG_UP);
622+
async_fd_conn->flags &=
623+
~(CONN_ASYNC_AWAIT | CONN_HUNG_UP | CONN_ASYNC_AWAIT_MULTIPLE);
624+
assert(async_fd_conn->parent);
625+
async_fd_conn->parent->flags &= ~CONN_ASYNC_AWAIT_MULTIPLE;
626+
623627
async_fd_conn->thread = data2;
624628

625629
/* If this file descriptor number is used again in the future as an HTTP
@@ -635,9 +639,9 @@ static void unasync_await_conn(void *data1, void *data2)
635639
}
636640

637641
static enum lwan_connection_coro_yield
638-
resume_async(const struct timeout_queue *tq,
642+
resume_async(const struct lwan *l,
639643
enum lwan_connection_coro_yield yield_result,
640-
int64_t from_coro,
644+
int await_fd,
641645
struct lwan_connection *conn,
642646
int epoll_fd)
643647
{
@@ -646,7 +650,6 @@ resume_async(const struct timeout_queue *tq,
646650
[CONN_CORO_ASYNC_AWAIT_WRITE] = CONN_EVENTS_WRITE,
647651
[CONN_CORO_ASYNC_AWAIT_READ_WRITE] = CONN_EVENTS_READ_WRITE,
648652
};
649-
int await_fd = (int)((uint64_t)from_coro >> 32);
650653
enum lwan_connection_flags flags;
651654
int op;
652655

@@ -656,7 +659,7 @@ resume_async(const struct timeout_queue *tq,
656659

657660
flags = to_connection_flags[yield_result];
658661

659-
struct lwan_connection *await_fd_conn = &tq->lwan->conns[await_fd];
662+
struct lwan_connection *await_fd_conn = &l->conns[await_fd];
660663
if (LIKELY(await_fd_conn->flags & CONN_ASYNC_AWAIT)) {
661664
if (LIKELY((await_fd_conn->flags & CONN_EVENTS_MASK) == flags))
662665
return CONN_CORO_SUSPEND;
@@ -697,6 +700,168 @@ resume_async(const struct timeout_queue *tq,
697700
return CONN_CORO_ABORT;
698701
}
699702

703+
struct flag_update {
704+
unsigned int num_awaiting;
705+
enum lwan_connection_coro_yield request_conn_yield;
706+
};
707+
708+
static struct flag_update
709+
update_flags_for_async_awaitv(struct lwan_request *r, struct lwan *l, va_list ap)
710+
{
711+
int epoll_fd = r->conn->thread->epoll_fd;
712+
struct flag_update update = {.num_awaiting = 0,
713+
.request_conn_yield = CONN_CORO_YIELD};
714+
715+
while (true) {
716+
int await_fd = va_arg(ap, int);
717+
if (await_fd < 0) {
718+
return update;
719+
}
720+
721+
enum lwan_connection_coro_yield events =
722+
va_arg(ap, enum lwan_connection_coro_yield);
723+
if (UNLIKELY(events < CONN_CORO_ASYNC_AWAIT_READ ||
724+
events > CONN_CORO_ASYNC_AWAIT_READ_WRITE)) {
725+
lwan_status_error("awaitv() called with invalid events");
726+
coro_yield(r->conn->coro, CONN_CORO_ABORT);
727+
__builtin_unreachable();
728+
}
729+
730+
struct lwan_connection *conn = &l->conns[await_fd];
731+
732+
if (UNLIKELY(conn->flags & CONN_ASYNC_AWAIT_MULTIPLE)) {
733+
lwan_status_debug("ignoring second awaitv call on same fd: %d",
734+
await_fd);
735+
continue;
736+
}
737+
738+
conn->flags |= CONN_ASYNC_AWAIT_MULTIPLE;
739+
update.num_awaiting++;
740+
741+
if (await_fd == r->fd) {
742+
static const enum lwan_connection_coro_yield to_request_yield[] = {
743+
[CONN_CORO_ASYNC_AWAIT_READ] = CONN_CORO_WANT_READ,
744+
[CONN_CORO_ASYNC_AWAIT_WRITE] = CONN_CORO_WANT_WRITE,
745+
[CONN_CORO_ASYNC_AWAIT_READ_WRITE] = CONN_CORO_WANT_READ_WRITE,
746+
};
747+
748+
update.request_conn_yield = to_request_yield[events];
749+
continue;
750+
}
751+
752+
events = resume_async(l, events, await_fd, r->conn, epoll_fd);
753+
if (UNLIKELY(events == CONN_CORO_ABORT)) {
754+
lwan_status_error("could not register fd for async operation");
755+
coro_yield(r->conn->coro, CONN_CORO_ABORT);
756+
__builtin_unreachable();
757+
}
758+
}
759+
}
760+
761+
static void reset_conn_async_await_multiple_flag(struct lwan_connection *conns,
762+
va_list ap)
763+
{
764+
while (true) {
765+
int await_fd = va_arg(ap, int);
766+
if (await_fd < 0)
767+
return;
768+
769+
struct lwan_connection *conn = &conns[await_fd];
770+
conn->flags &= ~CONN_ASYNC_AWAIT_MULTIPLE;
771+
772+
LWAN_NO_DISCARD(va_arg(ap, enum lwan_connection_coro_yield));
773+
}
774+
}
775+
776+
int lwan_request_awaitv_any(struct lwan_request *r, ...)
777+
{
778+
struct lwan *l = r->conn->thread->lwan;
779+
va_list ap;
780+
781+
va_start(ap, r);
782+
reset_conn_async_await_multiple_flag(l->conns, ap);
783+
va_end(ap);
784+
785+
va_start(ap, r);
786+
struct flag_update update = update_flags_for_async_awaitv(r, l, ap);
787+
va_end(ap);
788+
789+
while (true) {
790+
int64_t v = coro_yield(r->conn->coro, update.request_conn_yield);
791+
struct lwan_connection *conn = (struct lwan_connection *)(uintptr_t)v;
792+
793+
if (conn->flags & CONN_ASYNC_AWAIT_MULTIPLE) {
794+
va_start(ap, r);
795+
reset_conn_async_await_multiple_flag(l->conns, ap);
796+
va_end(ap);
797+
798+
return lwan_connection_get_fd(l, conn);
799+
}
800+
}
801+
}
802+
803+
void lwan_request_awaitv_all(struct lwan_request *r, ...)
804+
{
805+
struct lwan *l = r->conn->thread->lwan;
806+
va_list ap;
807+
808+
va_start(ap, r);
809+
reset_conn_async_await_multiple_flag(l->conns, ap);
810+
va_end(ap);
811+
812+
va_start(ap, r);
813+
struct flag_update update = update_flags_for_async_awaitv(r, l, ap);
814+
va_end(ap);
815+
816+
while (update.num_awaiting) {
817+
int64_t v = coro_yield(r->conn->coro, update.request_conn_yield);
818+
struct lwan_connection *conn = (struct lwan_connection *)(uintptr_t)v;
819+
820+
if (conn->flags & CONN_ASYNC_AWAIT_MULTIPLE) {
821+
conn->flags &= ~CONN_ASYNC_AWAIT_MULTIPLE;
822+
update.num_awaiting--;
823+
}
824+
}
825+
}
826+
827+
static inline int64_t
828+
make_async_yield_value(int fd, enum lwan_connection_coro_yield event)
829+
{
830+
assert(event >= CONN_CORO_ASYNC_AWAIT_READ &&
831+
event <= CONN_CORO_ASYNC_AWAIT_READ_WRITE);
832+
833+
return (int64_t)(((uint64_t)fd << 32 | event));
834+
}
835+
836+
static inline int async_await_fd(struct lwan_connection *conn,
837+
int fd,
838+
enum lwan_connection_coro_yield events)
839+
{
840+
int64_t yield_value = make_async_yield_value(fd, events);
841+
int64_t from_coro = coro_yield(conn->coro, yield_value);
842+
struct lwan_connection *conn_from_coro =
843+
(struct lwan_connection *)(intptr_t)from_coro;
844+
845+
assert(conn_from_coro->flags & CONN_ASYNC_AWAIT);
846+
847+
return lwan_connection_get_fd(conn->thread->lwan, conn_from_coro);
848+
}
849+
850+
inline int lwan_request_await_read(struct lwan_request *r, int fd)
851+
{
852+
return async_await_fd(r->conn, fd, CONN_CORO_ASYNC_AWAIT_READ);
853+
}
854+
855+
inline int lwan_request_await_write(struct lwan_request *r, int fd)
856+
{
857+
return async_await_fd(r->conn, fd, CONN_CORO_ASYNC_AWAIT_WRITE);
858+
}
859+
860+
inline int lwan_request_await_read_write(struct lwan_request *r, int fd)
861+
{
862+
return async_await_fd(r->conn, fd, CONN_CORO_ASYNC_AWAIT_READ_WRITE);
863+
}
864+
700865
static ALWAYS_INLINE void resume_coro(struct timeout_queue *tq,
701866
struct lwan_connection *conn_to_resume,
702867
struct lwan_connection *conn_to_yield,
@@ -710,14 +875,15 @@ static ALWAYS_INLINE void resume_coro(struct timeout_queue *tq,
710875
enum lwan_connection_coro_yield yield_result = from_coro & 0xffffffff;
711876

712877
if (UNLIKELY(yield_result >= CONN_CORO_ASYNC)) {
713-
yield_result =
714-
resume_async(tq, yield_result, from_coro, conn_to_resume, epoll_fd);
878+
int await_fd = (int)((uint64_t)from_coro >> 32);
879+
yield_result = resume_async(tq->lwan, yield_result, await_fd,
880+
conn_to_resume, epoll_fd);
715881
}
716882

717883
if (UNLIKELY(yield_result == CONN_CORO_ABORT)) {
718884
timeout_queue_expire(tq, conn_to_resume);
719885
} else {
720-
update_epoll_flags(tq, conn_to_resume, epoll_fd, yield_result);
886+
update_epoll_flags(tq->lwan, conn_to_resume, epoll_fd, yield_result);
721887
timeout_queue_move_to_last(tq, conn_to_resume);
722888
}
723889
}
@@ -787,7 +953,7 @@ static bool process_pending_timers(struct timeout_queue *tq,
787953
}
788954

789955
request = container_of(timeout, struct lwan_request, timeout);
790-
update_epoll_flags(tq, request->conn, epoll_fd, CONN_CORO_RESUME);
956+
update_epoll_flags(tq->lwan, request->conn, epoll_fd, CONN_CORO_RESUME);
791957
}
792958

793959
if (should_expire_timers) {
@@ -1452,7 +1618,7 @@ void lwan_thread_init(struct lwan *l)
14521618

14531619
for (unsigned int i = 0; i < l->thread.count; i++) {
14541620
struct lwan_thread *thread;
1455-
1621+
14561622
if (schedtbl) {
14571623
/* For SO_ATTACH_REUSEPORT_CBPF to work with the program
14581624
* we provide the kernel, sockets have to be added to the

src/lib/lwan-tq.c

+8-1
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,14 @@ void timeout_queue_expire_waiting(struct timeout_queue *tq)
112112
if (conn->time_to_expire > tq->current_time)
113113
return;
114114

115-
timeout_queue_expire(tq, conn);
115+
if (LIKELY(!(conn->flags & CONN_IS_WEBSOCKET))) {
116+
timeout_queue_expire(tq, conn);
117+
} else {
118+
if (LIKELY(lwan_send_websocket_ping_for_tq(conn)))
119+
timeout_queue_move_to_last(tq, conn);
120+
else
121+
timeout_queue_expire(tq, conn);
122+
}
116123
}
117124

118125
/* Timeout queue exhausted: reset epoch */

0 commit comments

Comments
 (0)