Skip to content

Commit d7e092d

Browse files
committed
sd-varlink: make sd-varlink fiber-aware
Add varlink_server_bind_fiber() and varlink_server_bind_fiber_many() in varlink-util.{c,h} for registering a method handler that should run on a dedicated fiber per dispatch. The fiber-bound methods live in a separate s->fiber_methods map alongside the regular s->methods; bind_internal()/bind_many_internal() are factored out so the regular and fiber bind variants share their parsing/insertion code. Registering the same method in both maps is rejected because the dispatcher consults the regular map first and would otherwise silently shadow the fiber binding. varlink_dispatch_fiber() builds a VarlinkFiberData (refs to the connection, parameters, and method name), spawns a fiber via sd_fiber_new(), and makes the future floating so the fiber self-manages its lifetime — neither the dispatcher nor the connection has to track it. The fiber's priority is set to one below the connection's quit event source so that on graceful shutdown the fiber's exit handler fires (and runs its cleanup) before varlink's quit_callback() closes the connection underneath it; this is what lets a fiber-bound handler reply or flush its sentinel on a still-open connection during shutdown. The connection state transitions are reordered so they happen before the fiber spawn rather than after the synchronous callback returns: the fiber runs after dispatch has already moved past PROCESSING, which matches the behaviour expected for a deferred reply (the fiber may either reply immediately, or stash the connection and reply later, in which case the post-callback logic treats it as a PENDING_METHOD). Note that all the synchronous varlink APIs (sd_varlink_call() and friends) already behave properly when on a fiber because they call json_stream_wait() which calls ppoll_usec() which we already fixed to suspend when called from a fiber. The client/server varlink tests are migrated to fibers (threads → mock server fibers on the same event loop) to exercise the new paths.
1 parent 365520f commit d7e092d

4 files changed

Lines changed: 529 additions & 141 deletions

File tree

src/libsystemd/sd-varlink/sd-varlink.c

Lines changed: 257 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
#include "sd-daemon.h"
88
#include "sd-event.h"
9+
#include "sd-future.h"
910
#include "sd-varlink.h"
1011

1112
#include "alloc-util.h"
@@ -37,6 +38,7 @@
3738
#include "varlink-internal.h"
3839
#include "varlink-io.systemd.h"
3940
#include "varlink-org.varlink.service.h"
41+
#include "varlink-util.h"
4042

4143
#define VARLINK_DEFAULT_CONNECTIONS_MAX 4096U
4244
#define VARLINK_DEFAULT_CONNECTIONS_PER_UID_MAX 128U
@@ -956,6 +958,178 @@ static int generic_method_get_interface_description(
956958
SD_JSON_BUILD_PAIR_STRING("description", text));
957959
}
958960

961+
static int varlink_dispatch_sentinel(sd_varlink *v) {
962+
int r;
963+
964+
assert(v);
965+
assert(v->sentinel);
966+
967+
if (v->previous) {
968+
r = json_stream_enqueue_full(&v->stream, v->previous, v->previous_fds, v->n_previous_fds);
969+
if (r >= 0) {
970+
v->previous = sd_json_variant_unref(v->previous);
971+
v->previous_fds = mfree(v->previous_fds);
972+
v->n_previous_fds = 0;
973+
/* Mirror sd_varlink_reply()'s post-enqueue state machine: PENDING_* means we're
974+
* outside the dispatch stack frame (e.g. called from varlink_fiber_entry after
975+
* the fiber returned), so we go straight to IDLE_SERVER ourselves. PROCESSING_*
976+
* means we're inside varlink_dispatch_method(), which will transition us. */
977+
if (IN_SET(v->state, VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE)) {
978+
varlink_clear_current(v);
979+
varlink_set_state(v, VARLINK_IDLE_SERVER);
980+
} else
981+
varlink_set_state(v, VARLINK_PROCESSED_METHOD);
982+
}
983+
984+
return r;
985+
}
986+
987+
char *sentinel = TAKE_PTR(v->sentinel);
988+
989+
/* Propagate the sentinel to the client if one was configured and no replies were enqueued by
990+
* the callback. */
991+
if (sentinel == POINTER_MAX)
992+
r = sd_varlink_reply(v, NULL);
993+
else {
994+
r = sd_varlink_error(v, sentinel, NULL);
995+
/* sd_varlink_error() deliberately returns a negative
996+
* errno mapped from the error id on success (so method
997+
* callbacks can `return sd_varlink_error(...);` to
998+
* enqueue a reply and propagate a matching errno in one
999+
* go). For sentinel dispatch we don't care about that
1000+
* mapping — the reply is either enqueued or not, which
1001+
* we detect via the state transition instead. */
1002+
if (IN_SET(v->state, VARLINK_PROCESSED_METHOD, VARLINK_IDLE_SERVER))
1003+
r = 0;
1004+
}
1005+
1006+
if (sentinel != POINTER_MAX)
1007+
free(sentinel);
1008+
1009+
return r;
1010+
}
1011+
1012+
typedef struct VarlinkFiberData {
1013+
sd_varlink *link;
1014+
sd_json_variant *parameters;
1015+
sd_varlink_method_flags_t flags;
1016+
void *userdata;
1017+
sd_varlink_method_t callback;
1018+
} VarlinkFiberData;
1019+
1020+
static VarlinkFiberData* varlink_fiber_data_free(VarlinkFiberData *d) {
1021+
if (!d)
1022+
return NULL;
1023+
1024+
sd_json_variant_unref(d->parameters);
1025+
sd_varlink_unref(d->link);
1026+
return mfree(d);
1027+
}
1028+
1029+
DEFINE_TRIVIAL_CLEANUP_FUNC(VarlinkFiberData*, varlink_fiber_data_free);
1030+
1031+
static void varlink_fiber_data_destroy(void *userdata) {
1032+
varlink_fiber_data_free(userdata);
1033+
}
1034+
1035+
static int varlink_fiber_entry(void *userdata) {
1036+
VarlinkFiberData *d = ASSERT_PTR(userdata);
1037+
sd_varlink *v = d->link;
1038+
int r;
1039+
1040+
r = d->callback(v, d->parameters, d->flags, d->userdata);
1041+
1042+
/* The fiber runs after varlink_dispatch_method() has already transitioned the state from
1043+
* VARLINK_PROCESSING_METHOD{,_MORE} to VARLINK_PENDING_METHOD{,_MORE}, so that's what we match
1044+
* here to decide whether the call still needs a reply. Any other state (e.g. IDLE_SERVER after
1045+
* the callback replied, or DISCONNECTED after sd_varlink_close()) means no fixup is needed. */
1046+
if (!IN_SET(v->state, VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE))
1047+
return r;
1048+
1049+
if (r < 0) {
1050+
varlink_log_errno(v, r, "Fiber returned error: %m");
1051+
1052+
/* Propagate error to the client if the method call remains unanswered. */
1053+
r = sd_varlink_error_errno(v, r);
1054+
} else if (v->sentinel) {
1055+
r = varlink_dispatch_sentinel(v);
1056+
if (r < 0)
1057+
varlink_log_errno(v, r, "Failed to process sentinel: %m");
1058+
} else if (v->n_ref <= 2) {
1059+
/* Bare minimum refs (server + fiber data) means the connection wasn't stashed
1060+
* to reply later, so the fiber was supposed to reply itself but didn't. */
1061+
r = varlink_log_errno(v, SYNTHETIC_ERRNO(EPROTO),
1062+
"Fiber returned without enqueuing a reply or stashing connection, failing.");
1063+
goto fail;
1064+
} else
1065+
r = 0;
1066+
1067+
/* If we didn't manage to enqueue a response, then fail the connection completely. */
1068+
if (r < 0 && IN_SET(v->state, VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE))
1069+
goto fail;
1070+
1071+
return r;
1072+
1073+
fail:
1074+
varlink_set_state(v, VARLINK_PROCESSING_FAILURE);
1075+
varlink_dispatch_local_error(v, SD_VARLINK_ERROR_PROTOCOL);
1076+
sd_varlink_close(v);
1077+
1078+
return r;
1079+
}
1080+
1081+
static int varlink_dispatch_fiber(sd_varlink *v, const char *method, sd_varlink_method_t callback, sd_json_variant *parameters, sd_varlink_method_flags_t flags) {
1082+
int r;
1083+
1084+
assert(v);
1085+
assert(v->server);
1086+
assert(method);
1087+
assert(callback);
1088+
1089+
if (!v->server->event)
1090+
return varlink_log_errno(v, SYNTHETIC_ERRNO(EDEADLK),
1091+
"Cannot dispatch fiber method without event loop.");
1092+
1093+
_cleanup_(varlink_fiber_data_freep) VarlinkFiberData *d = new(VarlinkFiberData, 1);
1094+
if (!d)
1095+
return log_oom_debug();
1096+
1097+
*d = (VarlinkFiberData) {
1098+
.link = sd_varlink_ref(v),
1099+
.parameters = sd_json_variant_ref(parameters),
1100+
.flags = flags,
1101+
.userdata = v->userdata,
1102+
.callback = callback,
1103+
};
1104+
1105+
_cleanup_(sd_future_unrefp) sd_future *f = NULL;
1106+
r = sd_fiber_new(v->server->event, method, varlink_fiber_entry, d, varlink_fiber_data_destroy, &f);
1107+
if (r < 0)
1108+
return r;
1109+
1110+
TAKE_PTR(d); /* The fiber owns the data now. */
1111+
1112+
/* Run the fiber at a higher priority than the connection's quit event source, so that on event
1113+
* loop exit the fiber's exit source (which cancels it and drives its cleanup) fires before
1114+
* varlink's quit_callback closes the connection. This lets a fiber handler reply with an error
1115+
* or flush its sentinel on a still-open connection during graceful shutdown. */
1116+
int64_t priority;
1117+
r = sd_event_source_get_priority(v->quit_event_source, &priority);
1118+
if (r < 0)
1119+
return r;
1120+
1121+
r = sd_future_set_priority(f, priority > INT64_MIN ? priority - 1 : priority);
1122+
if (r < 0)
1123+
return r;
1124+
1125+
/* Hand the future's lifetime over to the event loop: it'll auto-unref on resolve. */
1126+
r = sd_fiber_set_floating(f, true);
1127+
if (r < 0)
1128+
return r;
1129+
1130+
return 0;
1131+
}
1132+
9591133
static int varlink_dispatch_method(sd_varlink *v) {
9601134
_cleanup_(sd_json_variant_unrefp) sd_json_variant *parameters = NULL;
9611135
sd_varlink_method_flags_t flags = 0;
@@ -1053,7 +1227,13 @@ static int varlink_dispatch_method(sd_varlink *v) {
10531227
v->protocol_upgrade || FLAGS_SET(v->server->flags, SD_VARLINK_SERVER_UPGRADABLE));
10541228

10551229
/* First consult user supplied method implementations */
1230+
bool is_fiber = false;
10561231
callback = hashmap_get(v->server->methods, method);
1232+
if (!callback) {
1233+
callback = hashmap_get(v->server->fiber_methods, method);
1234+
if (callback)
1235+
is_fiber = true;
1236+
}
10571237
if (!callback) {
10581238
if (streq(method, "org.varlink.service.GetInfo"))
10591239
callback = generic_method_get_info;
@@ -1105,7 +1285,13 @@ static int varlink_dispatch_method(sd_varlink *v) {
11051285
}
11061286

11071287
if (!invalid) {
1108-
r = callback(v, parameters, flags, v->userdata);
1288+
if (is_fiber)
1289+
/* Spawn a fiber to run the callback. The VarlinkFiberData takes a ref on the
1290+
* connection (bumping n_ref above 2), so the post-callback logic below treats
1291+
* this as a deferred reply and moves state to PENDING_METHOD. */
1292+
r = varlink_dispatch_fiber(v, method, callback, parameters, flags);
1293+
else
1294+
r = callback(v, parameters, flags, v->userdata);
11091295
if (VARLINK_STATE_WANTS_REPLY(v->state)) {
11101296
if (r < 0) {
11111297
varlink_log_errno(v, r, "Callback for '%s' returned error: %m", method);
@@ -1114,37 +1300,7 @@ static int varlink_dispatch_method(sd_varlink *v) {
11141300
* if the method call remains unanswered. */
11151301
r = sd_varlink_error_errno(v, r);
11161302
} else if (v->sentinel) {
1117-
if (v->previous) {
1118-
r = json_stream_enqueue_full(&v->stream, v->previous, v->previous_fds, v->n_previous_fds);
1119-
if (r >= 0) {
1120-
v->previous = sd_json_variant_unref(v->previous);
1121-
v->previous_fds = mfree(v->previous_fds);
1122-
v->n_previous_fds = 0;
1123-
varlink_set_state(v, VARLINK_PROCESSED_METHOD);
1124-
}
1125-
} else {
1126-
char *sentinel = TAKE_PTR(v->sentinel);
1127-
1128-
/* Propagate the sentinel to the client if one was configured
1129-
* and no replies were enqueued by the callback. */
1130-
if (sentinel == POINTER_MAX)
1131-
r = sd_varlink_reply(v, NULL);
1132-
else {
1133-
r = sd_varlink_error(v, sentinel, NULL);
1134-
/* sd_varlink_error() deliberately returns a negative
1135-
* errno mapped from the error id on success (so method
1136-
* callbacks can `return sd_varlink_error(...);` to
1137-
* enqueue a reply and propagate a matching errno in one
1138-
* go). For sentinel dispatch we don't care about that
1139-
* mapping — the reply is either enqueued or not, which
1140-
* we detect via the state transition instead. */
1141-
if (v->state == VARLINK_PROCESSED_METHOD)
1142-
r = 0;
1143-
}
1144-
1145-
if (sentinel != POINTER_MAX)
1146-
free(sentinel);
1147-
}
1303+
r = varlink_dispatch_sentinel(v);
11481304
if (r < 0)
11491305
varlink_log_errno(v, r, "Failed to process sentinel for method '%s': %m", method);
11501306
} else {
@@ -2596,8 +2752,12 @@ _public_ int sd_varlink_set_sentinel(sd_varlink *v, const char *error_id) {
25962752
if (v->state == VARLINK_PROCESSING_METHOD_ONEWAY)
25972753
return 0;
25982754

2599-
/* This has to be called during a callback, and not after it has exited. */
2600-
assert_return(IN_SET(v->state, VARLINK_PROCESSING_METHOD, VARLINK_PROCESSING_METHOD_MORE),
2755+
/* This has to be called during a callback, and not after it has exited. The PENDING states
2756+
* apply to fiber callbacks, which run after varlink_dispatch_method() has already transitioned
2757+
* the state from PROCESSING to PENDING. */
2758+
assert_return(IN_SET(v->state,
2759+
VARLINK_PROCESSING_METHOD, VARLINK_PROCESSING_METHOD_MORE,
2760+
VARLINK_PENDING_METHOD, VARLINK_PENDING_METHOD_MORE),
26012761
-EUCLEAN);
26022762

26032763
char *s = NULL;
@@ -2899,7 +3059,11 @@ static sd_varlink_server* varlink_server_destroy(sd_varlink_server *s) {
28993059
while ((m = hashmap_steal_first_key(s->methods)))
29003060
free(m);
29013061

3062+
while ((m = hashmap_steal_first_key(s->fiber_methods)))
3063+
free(m);
3064+
29023065
hashmap_free(s->methods);
3066+
hashmap_free(s->fiber_methods);
29033067
hashmap_free(s->interfaces);
29043068
hashmap_free(s->symbols);
29053069
hashmap_free(s->by_uid);
@@ -3590,23 +3754,32 @@ static bool varlink_symbol_in_interface(const char *method, const char *interfac
35903754
return !strchr(p+1, '.');
35913755
}
35923756

3593-
_public_ int sd_varlink_server_bind_method(sd_varlink_server *s, const char *method, sd_varlink_method_t callback) {
3757+
static int varlink_server_bind_internal(sd_varlink_server *s, Hashmap **methods, const char *method, sd_varlink_method_t callback) {
35943758
_cleanup_free_ char *m = NULL;
35953759
int r;
35963760

3597-
assert_return(s, -EINVAL);
3598-
assert_return(method, -EINVAL);
3599-
assert_return(callback, -EINVAL);
3761+
assert(s);
3762+
assert(methods);
3763+
assert(method);
3764+
assert(callback);
36003765

36013766
if (varlink_symbol_in_interface(method, "org.varlink.service") ||
36023767
varlink_symbol_in_interface(method, "io.systemd"))
36033768
return varlink_server_log_errno(s, SYNTHETIC_ERRNO(EEXIST), "Cannot bind server to '%s'.", method);
36043769

3770+
/* Refuse to register the same method in both the regular and fiber method maps: the dispatcher
3771+
* always consults methods first and would silently ignore a shadowed fiber_methods entry (or vice
3772+
* versa), hiding the misconfiguration. */
3773+
Hashmap *other = methods == &s->methods ? s->fiber_methods : s->methods;
3774+
if (hashmap_contains(other, method))
3775+
return varlink_server_log_errno(s, SYNTHETIC_ERRNO(EEXIST),
3776+
"Method '%s' is already bound in the other method map.", method);
3777+
36053778
m = strdup(method);
36063779
if (!m)
36073780
return log_oom_debug();
36083781

3609-
r = hashmap_ensure_put(&s->methods, &string_hash_ops, m, callback);
3782+
r = hashmap_ensure_put(methods, &string_hash_ops, m, callback);
36103783
if (r == -ENOMEM)
36113784
return log_oom_debug();
36123785
if (r < 0)
@@ -3617,13 +3790,12 @@ _public_ int sd_varlink_server_bind_method(sd_varlink_server *s, const char *met
36173790
return 0;
36183791
}
36193792

3620-
_public_ int sd_varlink_server_bind_method_many_internal(sd_varlink_server *s, ...) {
3621-
va_list ap;
3793+
static int varlink_server_bind_many_internal(sd_varlink_server *s, Hashmap **methods, va_list ap) {
36223794
int r = 0;
36233795

3624-
assert_return(s, -EINVAL);
3796+
assert(s);
3797+
assert(methods);
36253798

3626-
va_start(ap, s);
36273799
for (;;) {
36283800
sd_varlink_method_t callback;
36293801
const char *method;
@@ -3634,10 +3806,51 @@ _public_ int sd_varlink_server_bind_method_many_internal(sd_varlink_server *s, .
36343806

36353807
callback = va_arg(ap, sd_varlink_method_t);
36363808

3637-
r = sd_varlink_server_bind_method(s, method, callback);
3809+
r = varlink_server_bind_internal(s, methods, method, callback);
36383810
if (r < 0)
36393811
break;
36403812
}
3813+
3814+
return r;
3815+
}
3816+
3817+
_public_ int sd_varlink_server_bind_method(sd_varlink_server *s, const char *method, sd_varlink_method_t callback) {
3818+
assert_return(s, -EINVAL);
3819+
assert_return(method, -EINVAL);
3820+
assert_return(callback, -EINVAL);
3821+
3822+
return varlink_server_bind_internal(s, &s->methods, method, callback);
3823+
}
3824+
3825+
_public_ int sd_varlink_server_bind_method_many_internal(sd_varlink_server *s, ...) {
3826+
va_list ap;
3827+
int r;
3828+
3829+
assert_return(s, -EINVAL);
3830+
3831+
va_start(ap, s);
3832+
r = varlink_server_bind_many_internal(s, &s->methods, ap);
3833+
va_end(ap);
3834+
3835+
return r;
3836+
}
3837+
3838+
int varlink_server_bind_fiber(sd_varlink_server *s, const char *method, sd_varlink_method_t callback) {
3839+
assert_return(s, -EINVAL);
3840+
assert_return(method, -EINVAL);
3841+
assert_return(callback, -EINVAL);
3842+
3843+
return varlink_server_bind_internal(s, &s->fiber_methods, method, callback);
3844+
}
3845+
3846+
int varlink_server_bind_fiber_many_internal(sd_varlink_server *s, ...) {
3847+
va_list ap;
3848+
int r;
3849+
3850+
assert_return(s, -EINVAL);
3851+
3852+
va_start(ap, s);
3853+
r = varlink_server_bind_many_internal(s, &s->fiber_methods, ap);
36413854
va_end(ap);
36423855

36433856
return r;

0 commit comments

Comments
 (0)