Skip to content

Commit 5ea9ddc

Browse files
cookpatearchigup
authored andcommitted
Enable closing GG IPC subscriptions
1 parent 9154953 commit 5ea9ddc

File tree

3 files changed

+123
-23
lines changed

3 files changed

+123
-23
lines changed

include/ggl/ipc/client.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ GglError ggipc_connect_with_token(GglBuffer socket_path, GglBuffer auth_token);
3636

3737
// IPC calls
3838

39+
typedef struct {
40+
uint32_t val;
41+
} DESIGNATED_INIT GglIpcSubscriptionHandle;
42+
3943
/// Publish a message to a local topic in JSON format
4044
GglError ggipc_publish_to_topic_json(GglBuffer topic, GglMap payload);
4145

@@ -125,4 +129,6 @@ GglError ggipc_update_state(GglComponentState state);
125129
/// Restart a component on the core device
126130
GglError ggipc_restart_component(GglBuffer component_name);
127131

132+
void ggipc_close_subscription(GglIpcSubscriptionHandle handle);
133+
128134
#endif

include/ggl/ipc/client_raw.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
#include <ggl/buffer.h>
99
#include <ggl/error.h>
10+
#include <ggl/ipc/client.h>
1011
#include <ggl/object.h>
1112

1213
typedef GglError GgIpcResultCallback(void *ctx, GglMap result);
@@ -35,7 +36,8 @@ GglError ggipc_subscribe(
3536
GgIpcErrorCallback *error_callback,
3637
void *response_ctx,
3738
GgIpcSubscribeCallback *sub_callback,
38-
void *sub_callback_ctx
39+
void *sub_callback_ctx,
40+
GglIpcSubscriptionHandle *out_sub_handle
3941
);
4042

4143
#endif

src/ipc/client.c

Lines changed: 114 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
#include <sys/types.h>
3434
#include <time.h>
3535
#include <stdbool.h>
36+
#include <stdint.h>
3637
#include <stdlib.h>
3738
#include <stdnoreturn.h>
3839

@@ -50,11 +51,42 @@ typedef struct {
5051
void *ctx;
5152
} StreamHandler;
5253

54+
static_assert(
55+
GGL_IPC_MAX_STREAMS <= UINT16_MAX, "Max stream count must fit in 16 bits."
56+
);
57+
58+
static uint16_t stream_state_generation[GGL_IPC_MAX_STREAMS] = { 0 };
5359
static int32_t stream_state_id[GGL_IPC_MAX_STREAMS] = { 0 };
5460
static StreamHandler stream_state_handler[GGL_IPC_MAX_STREAMS] = { 0 };
5561

5662
static pthread_mutex_t stream_state_mtx = PTHREAD_MUTEX_INITIALIZER;
5763

64+
static GglIpcSubscriptionHandle create_handle(uint16_t index) {
65+
return (GglIpcSubscriptionHandle) {
66+
.val = ((uint32_t) stream_state_generation[index] << 16U) | (index + 1U)
67+
};
68+
}
69+
70+
static GglError validate_handle(
71+
GglIpcSubscriptionHandle handle, uint16_t *out_index
72+
) {
73+
// Underflow ok; UINT16_MAX will fail bounds check
74+
uint16_t handle_index = (uint16_t) ((handle.val & UINT16_MAX) - 1U);
75+
uint16_t handle_generation = (uint16_t) (handle.val >> 16);
76+
77+
if (handle_index >= GGL_IPC_MAX_STREAMS) {
78+
return GGL_ERR_INVALID;
79+
}
80+
81+
if (handle_generation != stream_state_generation[handle_index]) {
82+
GGL_LOGE("Generation mismatch for %" PRIu32 ".", handle.val);
83+
return GGL_ERR_NOENTRY;
84+
}
85+
86+
*out_index = handle_index;
87+
return GGL_ERR_OK;
88+
}
89+
5890
static GglError init_ipc_recv_thread(void);
5991
ACCESS(none, 1)
6092
noreturn static void *recv_thread(void *args);
@@ -82,11 +114,11 @@ static GglError init_ipc_recv_thread(void) {
82114
}
83115

84116
// Requires holding stream_state_mtx
85-
static bool get_stream_index_from_id(int32_t stream_id, size_t *index) {
117+
static bool get_stream_index_from_id(int32_t stream_id, uint16_t *index) {
86118
if (stream_id == 0) {
87119
return false;
88120
}
89-
for (size_t i = 0; i < GGL_IPC_MAX_STREAMS; i++) {
121+
for (uint16_t i = 0; i < GGL_IPC_MAX_STREAMS; i++) {
90122
if (stream_state_id[i] == stream_id) {
91123
*index = i;
92124
return true;
@@ -96,9 +128,10 @@ static bool get_stream_index_from_id(int32_t stream_id, size_t *index) {
96128
}
97129

98130
// Requires holding stream_state_mtx
99-
static bool claim_stream_index(size_t *index) {
100-
for (size_t i = 0; i < GGL_IPC_MAX_STREAMS; i++) {
131+
static bool claim_stream_index(uint16_t *index) {
132+
for (uint16_t i = 0; i < GGL_IPC_MAX_STREAMS; i++) {
101133
if (stream_state_id[i] == 0) {
134+
stream_state_generation[i]++;
102135
stream_state_id[i] = -1;
103136
*index = i;
104137
return true;
@@ -109,15 +142,16 @@ static bool claim_stream_index(size_t *index) {
109142

110143
// Requires holding stream_state_mtx
111144
static void set_stream_index(
112-
size_t index, int32_t stream_id, StreamHandler handler
145+
uint16_t index, int32_t stream_id, StreamHandler handler
113146
) {
114147
stream_state_id[index] = stream_id;
115148
stream_state_handler[index] = handler;
116149
}
117150

118151
// Requires holding stream_state_mtx
119-
static void clear_stream_index(size_t index, int32_t stream_id) {
152+
static void clear_stream_index(uint16_t index, int32_t stream_id) {
120153
if (stream_state_id[index] == stream_id) {
154+
stream_state_generation[index]++;
121155
stream_state_id[index] = 0;
122156
stream_state_handler[index] = (StreamHandler) { 0 };
123157
}
@@ -396,7 +430,7 @@ typedef struct {
396430

397431
// Must hold stream_state_mtx
398432
static GglError response_handler(
399-
size_t index,
433+
uint16_t index,
400434
void *ctx,
401435
EventStreamCommonHeaders common_headers,
402436
EventStreamMessage msg
@@ -414,14 +448,25 @@ static GglError response_handler(
414448
if ((call_ctx->sub_callback == NULL) || (call_ctx->ret != GGL_ERR_OK)) {
415449
clear_stream_index(index, common_headers.stream_id);
416450
} else {
417-
set_stream_index(
418-
index,
419-
common_headers.stream_id,
420-
(StreamHandler) {
421-
.fn = (void (*)(void)) call_ctx->sub_callback,
422-
.ctx = call_ctx->sub_callback_ctx,
423-
}
424-
);
451+
if ((common_headers.message_flags & EVENTSTREAM_TERMINATE_STREAM)
452+
!= 0) {
453+
GGL_LOGE(
454+
"Terminate stream received on stream_id %" PRIi32
455+
" for initial subscription response.",
456+
common_headers.stream_id
457+
);
458+
clear_stream_index(index, common_headers.stream_id);
459+
call_ctx->ret = GGL_ERR_FAILURE;
460+
} else {
461+
set_stream_index(
462+
index,
463+
common_headers.stream_id,
464+
(StreamHandler) {
465+
.fn = (void (*)(void)) call_ctx->sub_callback,
466+
.ctx = call_ctx->sub_callback_ctx,
467+
}
468+
);
469+
}
425470
}
426471

427472
call_ctx->ready = true;
@@ -446,6 +491,7 @@ GglError ggipc_call(
446491
error_callback,
447492
response_ctx,
448493
NULL,
494+
NULL,
449495
NULL
450496
);
451497
}
@@ -458,7 +504,8 @@ GglError ggipc_subscribe(
458504
GgIpcErrorCallback *error_callback,
459505
void *response_ctx,
460506
GgIpcSubscribeCallback *sub_callback,
461-
void *sub_callback_ctx
507+
void *sub_callback_ctx,
508+
GglIpcSubscriptionHandle *out_sub_handle
462509
) {
463510
if (!connected()) {
464511
return GGL_ERR_NOCONN;
@@ -483,7 +530,7 @@ GglError ggipc_subscribe(
483530
.sub_callback_ctx = sub_callback_ctx,
484531
};
485532

486-
size_t stream_index;
533+
uint16_t stream_index;
487534
int32_t stream_id = -1;
488535

489536
GGL_MTX_SCOPE_GUARD(&stream_state_mtx);
@@ -544,7 +591,16 @@ GglError ggipc_subscribe(
544591
}
545592
}
546593

547-
return response_handler_ctx.ret;
594+
if (response_handler_ctx.ret != GGL_ERR_OK) {
595+
out_sub_handle->val = 0;
596+
return response_handler_ctx.ret;
597+
}
598+
599+
if (out_sub_handle != NULL) {
600+
*out_sub_handle = create_handle(stream_index);
601+
}
602+
603+
return GGL_ERR_OK;
548604
}
549605

550606
static GglError call_sub_callback(
@@ -624,7 +680,11 @@ static GglError call_sub_callback(
624680
);
625681
}
626682

627-
static GglError dispatch_incoming_packet(int conn) {
683+
static GglError dispatch_incoming_packet(
684+
int conn, uint16_t *out_index, int32_t *out_stream_id
685+
) {
686+
assert(out_stream_id != NULL);
687+
628688
EventStreamMessage msg;
629689
GglError ret = eventsteam_get_packet(
630690
ggl_socket_reader(&conn), &msg, GGL_BUF(ipc_recv_mem)
@@ -648,7 +708,7 @@ static GglError dispatch_incoming_packet(int conn) {
648708
return GGL_ERR_FAILURE;
649709
}
650710

651-
size_t index;
711+
uint16_t index;
652712

653713
{
654714
GGL_MTX_SCOPE_GUARD(&stream_state_mtx);
@@ -664,6 +724,9 @@ static GglError dispatch_incoming_packet(int conn) {
664724
return GGL_ERR_OK;
665725
}
666726

727+
*out_stream_id = stream_id;
728+
*out_index = index;
729+
667730
assert(stream_state_handler[index].fn != NULL);
668731

669732
if (stream_state_handler[index].fn
@@ -673,6 +736,13 @@ static GglError dispatch_incoming_packet(int conn) {
673736
index, stream_state_handler[index].ctx, common_headers, msg
674737
);
675738
}
739+
740+
if ((common_headers.message_flags & EVENTSTREAM_TERMINATE_STREAM)
741+
!= 0) {
742+
GGL_LOGD("Closing stream %" PRIi32 " for %d", stream_id, conn);
743+
clear_stream_index(index, stream_state_id[index]);
744+
return GGL_ERR_OK;
745+
}
676746
}
677747

678748
return call_sub_callback(
@@ -685,20 +755,42 @@ static GglError dispatch_incoming_packet(int conn) {
685755
// TODO: Terminate stream if flag set
686756
}
687757

758+
void ggipc_close_subscription(GglIpcSubscriptionHandle handle) {
759+
uint16_t index;
760+
761+
{
762+
GGL_MTX_SCOPE_GUARD(&stream_state_mtx);
763+
GglError ret = validate_handle(handle, &index);
764+
if (ret != GGL_ERR_OK) {
765+
return;
766+
}
767+
clear_stream_index(index, stream_state_id[index]);
768+
}
769+
}
770+
688771
ACCESS(none, 1)
689772
static GglError data_ready_callback(void *ctx, uint64_t data) {
690773
(void) ctx;
691774
(void) data;
692775

693-
GglError ret = dispatch_incoming_packet(ipc_conn_fd);
776+
uint16_t index = 0;
777+
int32_t stream_id = 0;
778+
779+
GglError ret = dispatch_incoming_packet(ipc_conn_fd, &index, &stream_id);
694780

695781
if (ret != GGL_ERR_OK) {
696782
GGL_LOGE(
697783
"Error receiving from GG-IPC connection on fd %d. Closing "
698784
"connection.",
699785
ipc_conn_fd
700786
);
701-
(void) ggl_close(ipc_conn_fd);
787+
788+
if (stream_id == 0) {
789+
(void) ggl_close(ipc_conn_fd);
790+
} else if (index != 0) {
791+
GGL_MTX_SCOPE_GUARD(&stream_state_mtx);
792+
clear_stream_index(index, stream_id);
793+
}
702794
}
703795

704796
return ret;

0 commit comments

Comments
 (0)