Skip to content

Commit 90f1420

Browse files
committed
qmp-client: add fiber-aware call paths
The synchronous qmp_client_call() pumps the event loop until its reply arrives, pinning the parsed reply on c->current so it can hand out borrowed pointers to the caller. That model only fits one in-flight sync call: a second qmp_client_call() on the same client clears c->current before issuing its own send, invalidating the first caller's borrowed pointers. On a single-threaded event loop that was fine, but with fibers two concurrent calls on the same client can interleave through the pump (json_stream_wait() suspends the running fiber) and trample each other. To fix this, make qmp_client_call() detect when it's running on a fiber whose event loop matches the client and transparently delegate to qmp_client_call_suspend(), which makes use of a new QmpFuture to allow multiple concurrent calls to qmp_client_call(). To make this work concurrently, we also change qmp_client_call() to hand out references and copies of errors so that we don't have to store the borrowed pointers we hand out in the QmpClient struct.
1 parent 924f6e8 commit 90f1420

2 files changed

Lines changed: 222 additions & 27 deletions

File tree

src/shared/qmp-client.c

Lines changed: 210 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
/* SPDX-License-Identifier: LGPL-2.1-or-later */
22

33
#include "sd-event.h"
4+
#include "sd-future.h"
45
#include "sd-json.h"
56

67
#include "alloc-util.h"
@@ -226,19 +227,23 @@ static int qmp_extract_response_id(sd_json_variant *v, uint64_t *ret) {
226227
return 1;
227228
}
228229

229-
/* Returns 0 on success (ret_result = "return" value), -EIO on QMP error (reterr_desc set). */
230-
static int qmp_parse_response(sd_json_variant *v, sd_json_variant **ret_result, const char **reterr_desc) {
230+
/* Returns 0 on success (ret_result = freshly reffed "return" value), -EIO on QMP error
231+
* (ret_error_desc set to a freshly allocated string). Caller owns both outputs. */
232+
static int qmp_parse_response(sd_json_variant *v, sd_json_variant **ret_result, char **reterr_error_desc) {
231233
const char *desc;
232234

233235
desc = qmp_extract_error_description(v);
234236
if (desc) {
235-
if (reterr_desc)
236-
*reterr_desc = desc;
237+
if (reterr_error_desc) {
238+
*reterr_error_desc = strdup(desc);
239+
if (!*reterr_error_desc)
240+
return -ENOMEM;
241+
}
237242
return -EIO;
238243
}
239244

240245
if (ret_result)
241-
*ret_result = sd_json_variant_by_key(v, "return");
246+
*ret_result = sd_json_variant_ref(sd_json_variant_by_key(v, "return"));
242247
return 0;
243248
}
244249

@@ -273,8 +278,8 @@ static int qmp_client_build_command(
273278

274279
/* Route c->current to event callback or matching async slot. Returns 1 on dispatch. */
275280
static int qmp_client_dispatch(QmpClient *c) {
276-
sd_json_variant *result = NULL;
277-
const char *desc = NULL;
281+
_cleanup_(sd_json_variant_unrefp) sd_json_variant *result = NULL;
282+
_cleanup_free_ char *desc = NULL;
278283
uint64_t id;
279284
int error, r;
280285

@@ -318,8 +323,8 @@ static int qmp_client_dispatch(QmpClient *c) {
318323
}
319324

320325
/* Synchronous slot (no callback): leave c->current pinned so qmp_client_call() can
321-
* pick up the reply and hand out borrowed pointers into it. The sync caller owns a
322-
* ref on the slot and detects completion by observing slot->client turning NULL. */
326+
* pick the reply up after its pump loop. The sync caller owns a ref on the slot and
327+
* detects completion by observing slot->client turning NULL. */
323328
if (!slot->callback) {
324329
qmp_slot_disconnect(slot, /* unref= */ true);
325330
return 1;
@@ -574,6 +579,10 @@ static void qmp_client_clear(QmpClient *c) {
574579
qmp_client_detach_event(c);
575580
qmp_client_clear_current(c);
576581
json_stream_done(&c->stream);
582+
/* qmp_client_handle_disconnect() above drained every entry via qmp_client_fail_pending();
583+
* the set is borrow-only for non-floating slots, so set_free() can't safely run a
584+
* destructor over leftovers — enforce the drain invariant instead. */
585+
assert(set_isempty(c->slots));
577586
c->slots = set_free(c->slots);
578587
}
579588

@@ -745,7 +754,7 @@ DEFINE_TRIVIAL_CLEANUP_FUNC(QmpClientArgs*, qmp_client_args_close_fds);
745754

746755
/* Shared send path for qmp_client_invoke() and qmp_client_call(). A NULL callback registers
747756
* a "synchronous" slot: dispatch_reply leaves c->current pinned on match instead of invoking
748-
* a callback, so qmp_client_call() can hand out borrowed pointers into the reply. If ret_slot
757+
* a callback, so qmp_client_call() can pick the reply up after its pump loop. If ret_slot
749758
* is NULL the slot is allocated as floating (owned by c->slots); otherwise a reference is
750759
* handed back to the caller. */
751760
static int qmp_client_send(
@@ -810,21 +819,193 @@ int qmp_client_invoke(
810819
return qmp_client_send(c, command, args, callback, userdata, ret_slot);
811820
}
812821

822+
typedef struct QmpFuture {
823+
QmpSlot *slot; /* owned, non-floating; NULL once disconnected */
824+
sd_json_variant *result;
825+
char *error_desc;
826+
} QmpFuture;
827+
828+
static void* qmp_future_alloc(void) {
829+
return new0(QmpFuture, 1);
830+
}
831+
832+
static void qmp_future_free(sd_future *f) {
833+
QmpFuture *qf = ASSERT_PTR(sd_future_get_private(ASSERT_PTR(f)));
834+
qmp_slot_unref(qf->slot);
835+
sd_json_variant_unref(qf->result);
836+
free(qf->error_desc);
837+
free(qf);
838+
}
839+
840+
static int qmp_future_cancel(sd_future *f) {
841+
QmpFuture *qf = ASSERT_PTR(sd_future_get_private(ASSERT_PTR(f)));
842+
843+
/* Drop the pending slot so dispatch_reply won't try to fire our callback (and touch
844+
* freed memory) when the reply eventually arrives. */
845+
qf->slot = qmp_slot_unref(qf->slot);
846+
return sd_future_resolve(f, -ECANCELED);
847+
}
848+
849+
static const sd_future_ops qmp_call_future_ops = {
850+
.size = sizeof(sd_future_ops),
851+
.alloc = qmp_future_alloc,
852+
.free = qmp_future_free,
853+
.cancel = qmp_future_cancel,
854+
};
855+
856+
static int qmp_future_callback(
857+
QmpClient *c,
858+
sd_json_variant *result,
859+
const char *desc,
860+
int error,
861+
void *userdata) {
862+
863+
sd_future *f = ASSERT_PTR(userdata);
864+
QmpFuture *qf = ASSERT_PTR(sd_future_get_private(f));
865+
int r;
866+
867+
assert(result || desc || error);
868+
869+
if (result)
870+
qf->result = sd_json_variant_ref(result);
871+
if (desc) {
872+
qf->error_desc = strdup(desc);
873+
if (!qf->error_desc)
874+
/* No usable reply payload to surface — propagate as transport-style
875+
* failure so suspend() / sd_future_result() see the OOM. */
876+
return sd_future_resolve(f, -ENOMEM);
877+
}
878+
879+
/* Resolve with 0 on a success reply and -EIO on a QMP-level error (matching the synchronous
880+
* path's errno for the desc-without-ret_error_desc case), so a caller awaiting the future
881+
* learns about call failures from the resolution value alone. The reply payload itself
882+
* (result or error_desc) is always stashed on the QmpFuture so future_get_qmp_reply() can
883+
* hand the description string back on top of the bare -EIO. With no reply at all (transport
884+
* failure, disconnect), resolve with the propagated transport errno; cancellation surfaces
885+
* as -ECANCELED via qmp_future_cancel(). */
886+
if (result)
887+
r = 0;
888+
else if (desc)
889+
r = -EIO;
890+
else
891+
r = error;
892+
893+
return sd_future_resolve(f, r);
894+
}
895+
896+
int qmp_client_call_future(
897+
QmpClient *c,
898+
const char *command,
899+
QmpClientArgs *args,
900+
sd_future **ret) {
901+
902+
int r;
903+
904+
assert(c);
905+
assert(command);
906+
assert(ret);
907+
908+
_cleanup_(sd_future_unrefp) sd_future *f = NULL;
909+
r = sd_future_new(&qmp_call_future_ops, &f);
910+
if (r < 0)
911+
return r;
912+
913+
QmpFuture *qf = sd_future_get_private(f);
914+
915+
r = qmp_client_send(c, command, args, qmp_future_callback, f, &qf->slot);
916+
if (r < 0)
917+
return r;
918+
919+
*ret = TAKE_PTR(f);
920+
return 0;
921+
}
922+
923+
/* Extract the reply from a resolved qmp_client_call_future(). Returns 1 on success (with
924+
* *ret_result a fresh reference the caller unrefs), -EIO on a QMP-level error (with the detail
925+
* description copied into *ret_error_desc when the caller passed one to receive it), and the
926+
* future's negative resume errno when no reply landed at all (transport failure / cancellation).
927+
*/
928+
int future_get_qmp_reply(sd_future *f, sd_json_variant **ret_result, char **reterr_error_desc) {
929+
assert(f);
930+
assert(sd_future_get_ops(f) == &qmp_call_future_ops);
931+
assert(sd_future_state(f) == SD_FUTURE_RESOLVED);
932+
933+
QmpFuture *qf = ASSERT_PTR(sd_future_get_private(f));
934+
935+
/* No reply at all: transport failure or cancellation — surface the future result. */
936+
if (!qf->result && !qf->error_desc)
937+
return sd_future_result(f);
938+
939+
if (qf->error_desc) {
940+
if (reterr_error_desc) {
941+
char *desc = strdup(qf->error_desc);
942+
if (!desc)
943+
return -ENOMEM;
944+
*reterr_error_desc = desc;
945+
}
946+
return -EIO;
947+
}
948+
949+
if (reterr_error_desc)
950+
*reterr_error_desc = NULL;
951+
if (ret_result)
952+
*ret_result = sd_json_variant_ref(qf->result);
953+
954+
return 1;
955+
}
956+
957+
static int qmp_client_call_suspend(
958+
QmpClient *c,
959+
const char *command,
960+
QmpClientArgs *args,
961+
sd_json_variant **ret_result,
962+
char **ret_error_desc) {
963+
964+
int r;
965+
966+
assert(c);
967+
assert(command);
968+
assert(sd_fiber_is_running());
969+
970+
_cleanup_(sd_future_cancel_wait_unrefp) sd_future *call = NULL;
971+
r = qmp_client_call_future(c, command, args, &call);
972+
if (r < 0)
973+
return r;
974+
975+
r = sd_fiber_suspend();
976+
977+
/* If the future isn't resolved, the suspend was interrupted before a reply arrived (fiber
978+
* cancelled, fiber-wide SD_FIBER_TIMEOUT scope expired, …). There's no reply to extract,
979+
* so surface the resume error directly. When the future is resolved, future_get_qmp_reply()
980+
* already encodes success (1), QMP-level error (-EIO with the desc captured if asked for),
981+
* and no-reply (negative future result) — pass it through. */
982+
if (sd_future_state(call) != SD_FUTURE_RESOLVED)
983+
return r;
984+
985+
return future_get_qmp_reply(call, ret_result, ret_error_desc);
986+
}
987+
813988
int qmp_client_call(
814989
QmpClient *c,
815990
const char *command,
816991
QmpClientArgs *args,
817992
sd_json_variant **ret_result,
818-
const char **ret_error_desc) {
993+
char **reterr_error_desc) {
819994

820-
_cleanup_(qmp_slot_unrefp) QmpSlot *slot = NULL;
995+
_cleanup_(sd_json_variant_unrefp) sd_json_variant *result = NULL;
996+
_cleanup_free_ char *desc = NULL;
821997
int r;
822998

823999
assert_return(c, -EINVAL);
8241000
assert_return(command, -EINVAL);
8251001

826-
/* Drop any reply pinned by a previous qmp_client_call() before we pin a new one. */
827-
qmp_client_clear_current(c);
1002+
/* If we're on a fiber sharing the QMP client's event loop, use the async + suspend path so
1003+
* multiple concurrent qmp_client_call() invocations across fibers don't deadlock each other
1004+
* on the process+wait pump. */
1005+
if (sd_fiber_is_running() && qmp_client_get_event(c) == sd_fiber_get_event())
1006+
return qmp_client_call_suspend(c, command, args, ret_result, reterr_error_desc);
1007+
1008+
_cleanup_(qmp_slot_unrefp) QmpSlot *slot = NULL;
8281009

8291010
/* NULL callback marks this as a synchronous slot: dispatch_reply matches on id like
8301011
* any other slot (so stray unknown-id replies still get logged and dropped), but
@@ -855,18 +1036,24 @@ int qmp_client_call(
8551036
return r;
8561037
}
8571038

858-
sd_json_variant *result = NULL;
859-
const char *desc = NULL;
860-
int error = qmp_parse_response(c->current, &result, &desc);
1039+
_cleanup_(sd_json_variant_unrefp) sd_json_variant *current = TAKE_PTR(c->current);
1040+
r = qmp_parse_response(current, &result, &desc);
1041+
if (r < 0 && r != -EIO)
1042+
return r;
8611043

862-
/* If caller doesn't ask for the error string, surface the error as the return code. */
863-
if (!ret_error_desc && error < 0)
864-
return error;
1044+
/* QMP-level error: copy the description into *ret_error_desc when the caller asked for it,
1045+
* and surface the failure via the return value (matching qmp_client_call_suspend() /
1046+
* sd_bus_call()'s "negative on error" convention). */
1047+
if (desc) {
1048+
if (reterr_error_desc)
1049+
*reterr_error_desc = TAKE_PTR(desc);
1050+
return -EIO;
1051+
}
8651052

8661053
if (ret_result)
867-
*ret_result = result;
868-
if (ret_error_desc)
869-
*ret_error_desc = desc;
1054+
*ret_result = TAKE_PTR(result);
1055+
if (reterr_error_desc)
1056+
*reterr_error_desc = NULL;
8701057

8711058
return 1;
8721059
}

src/shared/qmp-client.h

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,15 +68,23 @@ int qmp_client_invoke(
6868
qmp_command_callback_t callback,
6969
void *userdata);
7070

71-
/* Synchronous send + receive. Pumps the event loop until the reply arrives. *ret_result and
72-
* *ret_error_desc are borrowed pointers into the last reply, valid until the next
73-
* qmp_client_call(). Same contract as sd_varlink_call(). */
7471
int qmp_client_call(
7572
QmpClient *client,
7673
const char *command,
7774
QmpClientArgs *args,
7875
sd_json_variant **ret_result,
79-
const char **ret_error_desc);
76+
char **reterr_error_desc);
77+
78+
int qmp_client_call_future(
79+
QmpClient *client,
80+
const char *command,
81+
QmpClientArgs *args,
82+
sd_future **ret);
83+
84+
int future_get_qmp_reply(
85+
sd_future *f,
86+
sd_json_variant **ret_result,
87+
char **reterr_error_desc);
8088

8189
void qmp_client_bind_event(QmpClient *c, qmp_event_callback_t callback, void *userdata);
8290
void qmp_client_bind_disconnect(QmpClient *c, qmp_disconnect_callback_t callback, void *userdata);

0 commit comments

Comments
 (0)