Skip to content

Commit f2f185b

Browse files
committed
WiP
1 parent d1b6133 commit f2f185b

File tree

3 files changed

+67
-55
lines changed

3 files changed

+67
-55
lines changed

include/ggl/ipc/client.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,13 @@ GglError ggipc_connect_with_token(GglBuffer socket_path, GglBuffer auth_token);
3636

3737
// IPC calls
3838

39+
/// Handle for referring to a subscripion created by an IPC call
3940
typedef struct {
4041
uint32_t val;
41-
} DESIGNATED_INIT GglIpcSubscriptionHandle;
42+
} GgIpcSubscriptionHandle;
4243

4344
/// Close a subscription returned by an IPC call
44-
void ggipc_close_subscription(GglIpcSubscriptionHandle handle);
45+
void ggipc_close_subscription(GgIpcSubscriptionHandle handle);
4546

4647
/// Publish a message to a local topic in JSON format
4748
GglError ggipc_publish_to_topic_json(GglBuffer topic, GglMap payload);

include/ggl/ipc/client_raw.h

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,14 @@ GglError ggipc_call(
2525
);
2626

2727
typedef GglError GgIpcSubscribeCallback(
28-
void *ctx, GglBuffer service_model_type, GglMap data
28+
void *ctx,
29+
GgIpcSubscriptionHandle handle,
30+
GglBuffer service_model_type,
31+
GglMap data
32+
);
33+
34+
typedef GglError GgIpcSubCloseCallback(
35+
void *ctx, GgIpcSubscriptionHandle handle
2936
);
3037

3138
GglError ggipc_subscribe(
@@ -35,9 +42,10 @@ GglError ggipc_subscribe(
3542
GgIpcResultCallback *result_callback,
3643
GgIpcErrorCallback *error_callback,
3744
void *response_ctx,
38-
GgIpcSubscribeCallback *sub_callback,
39-
void *sub_callback_ctx,
40-
GglIpcSubscriptionHandle *out_sub_handle
45+
GgIpcSubscribeCallback *sub_resp_callback,
46+
GgIpcSubCloseCallback *sub_close_callback,
47+
void *sub_ctx,
48+
GgIpcSubscriptionHandle *sub_handle
4149
);
4250

4351
#endif

src/ipc/client.c

Lines changed: 52 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -47,46 +47,23 @@ static uint8_t
4747
static int epoll_fd = -1;
4848

4949
typedef struct {
50-
void (*fn)(void);
50+
// If null, we're expecting initial response
51+
GgIpcSubscribeCallback *resp_callback;
52+
// If null, skip close callback
53+
GgIpcSubCloseCallback *close_callback;
5154
void *ctx;
52-
} StreamHandler;
55+
} SubscriptionHandlers;
5356

5457
static_assert(
5558
GGL_IPC_MAX_STREAMS <= UINT16_MAX, "Max stream count must fit in 16 bits."
5659
);
5760

58-
static uint16_t stream_state_generation[GGL_IPC_MAX_STREAMS] = { 0 };
5961
static int32_t stream_state_id[GGL_IPC_MAX_STREAMS] = { 0 };
60-
static StreamHandler stream_state_handler[GGL_IPC_MAX_STREAMS] = { 0 };
62+
static uint16_t stream_state_generation[GGL_IPC_MAX_STREAMS] = { 0 };
63+
static SubscriptionHandlers stream_state_handler[GGL_IPC_MAX_STREAMS] = { 0 };
6164

6265
static pthread_mutex_t stream_state_mtx = PTHREAD_MUTEX_INITIALIZER;
6366

64-
static GglIpcSubscriptionHandle create_handle(uint16_t index) {
65-
return (GglIpcSubscriptionHandle) {
66-
.val = ((uint32_t) stream_state_generation[index] << 16U) | (index + 1U)
67-
};
68-
}
69-
70-
static GglError validate_handle(
71-
GglIpcSubscriptionHandle handle, uint16_t *out_index
72-
) {
73-
// Underflow ok; UINT16_MAX will fail bounds check
74-
uint16_t handle_index = (uint16_t) ((handle.val & UINT16_MAX) - 1U);
75-
uint16_t handle_generation = (uint16_t) (handle.val >> 16);
76-
77-
if (handle_index >= GGL_IPC_MAX_STREAMS) {
78-
return GGL_ERR_INVALID;
79-
}
80-
81-
if (handle_generation != stream_state_generation[handle_index]) {
82-
GGL_LOGE("Generation mismatch for %" PRIu32 ".", handle.val);
83-
return GGL_ERR_NOENTRY;
84-
}
85-
86-
*out_index = handle_index;
87-
return GGL_ERR_OK;
88-
}
89-
9067
static GglError init_ipc_recv_thread(void);
9168
ACCESS(none, 1)
9269
noreturn static void *recv_thread(void *args);
@@ -114,8 +91,34 @@ static GglError init_ipc_recv_thread(void) {
11491
}
11592

11693
// Requires holding stream_state_mtx
117-
static bool get_stream_index_from_id(int32_t stream_id, uint16_t *index) {
118-
if (stream_id == 0) {
94+
static GglError validate_handle(
95+
GgIpcSubscriptionHandle handle, uint16_t *index, const char *location
96+
) {
97+
// Underflow ok; UINT16_MAX will fail bounds check
98+
uint16_t handle_index = (uint16_t) ((handle.val & UINT16_MAX) - 1U);
99+
uint16_t handle_generation = (uint16_t) (handle.val >> 16);
100+
101+
if (handle_index >= GGL_IPC_MAX_STREAMS) {
102+
GGL_LOGE("Invalid handle %u in %s.", handle.val, location);
103+
return GGL_ERR_INVALID;
104+
}
105+
106+
if (handle_generation != stream_state_generation[handle_index]) {
107+
GGL_LOGE(
108+
"Generation mismatch for handle %" PRIu32 " in %s.",
109+
handle.val,
110+
location
111+
);
112+
return GGL_ERR_NOENTRY;
113+
}
114+
115+
*index = handle_index;
116+
return GGL_ERR_OK;
117+
}
118+
119+
// Requires holding stream_state_mtx
120+
static bool get_state_index_from_stream_id(int32_t stream_id, uint16_t *index) {
121+
if (stream_id <= 0) {
119122
return false;
120123
}
121124
for (uint16_t i = 0; i < GGL_IPC_MAX_STREAMS; i++) {
@@ -128,11 +131,11 @@ static bool get_stream_index_from_id(int32_t stream_id, uint16_t *index) {
128131
}
129132

130133
// Requires holding stream_state_mtx
131-
static bool claim_stream_index(uint16_t *index) {
134+
static bool claim_state_index(uint16_t *index) {
132135
for (uint16_t i = 0; i < GGL_IPC_MAX_STREAMS; i++) {
133136
if (stream_state_id[i] == 0) {
134-
stream_state_generation[i]++;
135137
stream_state_id[i] = -1;
138+
stream_state_generation[i] += 1;
136139
*index = i;
137140
return true;
138141
}
@@ -141,17 +144,17 @@ static bool claim_stream_index(uint16_t *index) {
141144
}
142145

143146
// Requires holding stream_state_mtx
144-
static void set_stream_index(
145-
uint16_t index, int32_t stream_id, StreamHandler handler
147+
static void set_state_index(
148+
uint16_t index, int32_t stream_id, SubscriptionHandlers handler
146149
) {
147150
stream_state_id[index] = stream_id;
148151
stream_state_handler[index] = handler;
149152
}
150153

151154
// Requires holding stream_state_mtx
152-
static void clear_stream_index(uint16_t index, int32_t stream_id) {
155+
static void clear_state_index(uint16_t index, int32_t stream_id) {
153156
if (stream_state_id[index] == stream_id) {
154-
stream_state_generation[index]++;
157+
stream_state_generation[index] += 1;
155158
stream_state_id[index] = 0;
156159
stream_state_handler[index] = (StreamHandler) { 0 };
157160
}
@@ -446,7 +449,7 @@ static GglError response_handler(
446449
);
447450

448451
if ((call_ctx->sub_callback == NULL) || (call_ctx->ret != GGL_ERR_OK)) {
449-
clear_stream_index(index, common_headers.stream_id);
452+
clear_state_index(index, common_headers.stream_id);
450453
} else {
451454
if ((common_headers.message_flags & EVENTSTREAM_TERMINATE_STREAM)
452455
!= 0) {
@@ -455,7 +458,7 @@ static GglError response_handler(
455458
" for initial subscription response.",
456459
common_headers.stream_id
457460
);
458-
clear_stream_index(index, common_headers.stream_id);
461+
clear_state_index(index, common_headers.stream_id);
459462
call_ctx->ret = GGL_ERR_FAILURE;
460463
} else {
461464
set_stream_index(
@@ -505,7 +508,7 @@ GglError ggipc_subscribe(
505508
void *response_ctx,
506509
GgIpcSubscribeCallback *sub_callback,
507510
void *sub_callback_ctx,
508-
GglIpcSubscriptionHandle *out_sub_handle
511+
GglIpcClientSubHandle *out_sub_handle
509512
) {
510513
if (!connected()) {
511514
return GGL_ERR_NOCONN;
@@ -535,7 +538,7 @@ GglError ggipc_subscribe(
535538

536539
GGL_MTX_SCOPE_GUARD(&stream_state_mtx);
537540

538-
bool index_available = claim_stream_index(&stream_index);
541+
bool index_available = claim_state_index(&stream_index);
539542
if (!index_available) {
540543
GGL_LOGE("GG-IPC request failed to get available stream slot.");
541544
return GGL_ERR_NOMEM;
@@ -572,7 +575,7 @@ GglError ggipc_subscribe(
572575

573576
if (ret != GGL_ERR_OK) {
574577
GGL_LOGE("Failed to send EventStream packet.");
575-
clear_stream_index(stream_index, stream_id);
578+
clear_state_index(stream_index, stream_id);
576579
return ret;
577580
}
578581

@@ -586,7 +589,7 @@ GglError ggipc_subscribe(
586589
if ((cond_ret != 0) && (cond_ret != EINTR)) {
587590
assert(cond_ret == ETIMEDOUT);
588591
GGL_LOGW("Timed out waiting for a response.");
589-
clear_stream_index(stream_index, stream_id);
592+
clear_state_index(stream_index, stream_id);
590593
return GGL_ERR_TIMEOUT;
591594
}
592595
}
@@ -713,7 +716,7 @@ static GglError dispatch_incoming_packet(
713716
{
714717
GGL_MTX_SCOPE_GUARD(&stream_state_mtx);
715718

716-
bool found = get_stream_index_from_id(stream_id, &index);
719+
bool found = get_state_index_from_stream_id(stream_id, &index);
717720

718721
if (!found) {
719722
GGL_LOGE(
@@ -740,7 +743,7 @@ static GglError dispatch_incoming_packet(
740743
if ((common_headers.message_flags & EVENTSTREAM_TERMINATE_STREAM)
741744
!= 0) {
742745
GGL_LOGD("Closing stream %" PRIi32 " for %d", stream_id, conn);
743-
clear_stream_index(index, stream_state_id[index]);
746+
clear_state_index(index, stream_state_id[index]);
744747
return GGL_ERR_OK;
745748
}
746749
}
@@ -755,7 +758,7 @@ static GglError dispatch_incoming_packet(
755758
// TODO: Terminate stream if flag set
756759
}
757760

758-
void ggipc_close_subscription(GglIpcSubscriptionHandle handle) {
761+
void ggipc_close_subscription(GglIpcClientSubHandle handle) {
759762
uint16_t index;
760763

761764
{
@@ -764,7 +767,7 @@ void ggipc_close_subscription(GglIpcSubscriptionHandle handle) {
764767
if (ret != GGL_ERR_OK) {
765768
return;
766769
}
767-
clear_stream_index(index, stream_state_id[index]);
770+
clear_state_index(index, stream_state_id[index]);
768771
}
769772
}
770773

@@ -789,7 +792,7 @@ static GglError data_ready_callback(void *ctx, uint64_t data) {
789792
(void) ggl_close(ipc_conn_fd);
790793
} else if (index != 0) {
791794
GGL_MTX_SCOPE_GUARD(&stream_state_mtx);
792-
clear_stream_index(index, stream_id);
795+
clear_state_index(index, stream_id);
793796
}
794797
}
795798

0 commit comments

Comments
 (0)