Skip to content

Commit e8a7fcf

Browse files
committed
Add runtime platform thread primitives
1 parent ee8930f commit e8a7fcf

5 files changed

Lines changed: 357 additions & 88 deletions

File tree

src/runtime/run_chan.c

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
#include "run_scheduler.h"
44

5-
#include <pthread.h>
65
#include <stdio.h>
76
#include <stdlib.h>
87
#include <string.h>
@@ -12,7 +11,7 @@
1211
* ======================================================================== */
1312

1413
struct run_chan {
15-
pthread_mutex_t lock;
14+
run_mutex_t lock;
1615

1716
size_t elem_size; /* size of each element in bytes */
1817
size_t buffer_cap; /* buffer capacity (0 = unbuffered) */
@@ -38,7 +37,7 @@ run_chan_t *run_chan_new(size_t elem_size, size_t buffer_cap) {
3837
abort();
3938
}
4039

41-
pthread_mutex_init(&ch->lock, NULL);
40+
run_mutex_init(&ch->lock);
4241
ch->elem_size = elem_size;
4342
ch->buffer_cap = buffer_cap;
4443
ch->buffer_len = 0;
@@ -63,7 +62,7 @@ run_chan_t *run_chan_new(size_t elem_size, size_t buffer_cap) {
6362
void run_chan_free(run_chan_t *ch) {
6463
if (!ch)
6564
return;
66-
pthread_mutex_destroy(&ch->lock);
65+
run_mutex_destroy(&ch->lock);
6766
free(ch->buffer);
6867
free(ch);
6968
}
@@ -79,10 +78,10 @@ void run_chan_free(run_chan_t *ch) {
7978
* ======================================================================== */
8079

8180
void run_chan_send(run_chan_t *ch, const void *data) {
82-
pthread_mutex_lock(&ch->lock);
81+
run_mutex_lock(&ch->lock);
8382

8483
if (ch->closed) {
85-
pthread_mutex_unlock(&ch->lock);
84+
run_mutex_unlock(&ch->lock);
8685
fprintf(stderr, "run: send on closed channel\n");
8786
abort();
8887
}
@@ -92,7 +91,7 @@ void run_chan_send(run_chan_t *ch, const void *data) {
9291
run_g_t *receiver = run_g_queue_pop(&ch->recv_q);
9392
/* Direct copy: data -> receiver's waiting slot */
9493
memcpy(receiver->chan_data_ptr, data, ch->elem_size);
95-
pthread_mutex_unlock(&ch->lock);
94+
run_mutex_unlock(&ch->lock);
9695
run_g_ready(receiver);
9796
return;
9897
}
@@ -103,7 +102,7 @@ void run_chan_send(run_chan_t *ch, const void *data) {
103102
memcpy(slot, data, ch->elem_size);
104103
ch->send_idx = (ch->send_idx + 1) % ch->buffer_cap;
105104
ch->buffer_len++;
106-
pthread_mutex_unlock(&ch->lock);
105+
run_mutex_unlock(&ch->lock);
107106
return;
108107
}
109108

@@ -112,7 +111,7 @@ void run_chan_send(run_chan_t *ch, const void *data) {
112111
if (!g) {
113112
/* Called from main thread before scheduler is running --
114113
* this would deadlock. */
115-
pthread_mutex_unlock(&ch->lock);
114+
run_mutex_unlock(&ch->lock);
116115
fprintf(stderr, "run: channel send would block on main thread\n");
117116
abort();
118117
}
@@ -121,7 +120,7 @@ void run_chan_send(run_chan_t *ch, const void *data) {
121120
g->chan_data_ptr = (void *)data; /* sender's data stays in place */
122121
g->chan_panic = false;
123122
run_g_queue_push(&ch->send_q, g);
124-
pthread_mutex_unlock(&ch->lock);
123+
run_mutex_unlock(&ch->lock);
125124

126125
run_schedule(); /* context switch to scheduler */
127126
/* Resumed here after a receiver copies our data */
@@ -138,7 +137,7 @@ void run_chan_send(run_chan_t *ch, const void *data) {
138137
* ======================================================================== */
139138

140139
void run_chan_recv(run_chan_t *ch, void *data) {
141-
pthread_mutex_lock(&ch->lock);
140+
run_mutex_lock(&ch->lock);
142141

143142
/* Fast path: waiting sender exists */
144143
if (ch->send_q.len > 0) {
@@ -158,7 +157,7 @@ void run_chan_recv(run_chan_t *ch, void *data) {
158157
/* Unbuffered: direct copy from sender */
159158
memcpy(data, sender->chan_data_ptr, ch->elem_size);
160159
}
161-
pthread_mutex_unlock(&ch->lock);
160+
run_mutex_unlock(&ch->lock);
162161
run_g_ready(sender);
163162
return;
164163
}
@@ -169,21 +168,21 @@ void run_chan_recv(run_chan_t *ch, void *data) {
169168
memcpy(data, slot, ch->elem_size);
170169
ch->recv_idx = (ch->recv_idx + 1) % ch->buffer_cap;
171170
ch->buffer_len--;
172-
pthread_mutex_unlock(&ch->lock);
171+
run_mutex_unlock(&ch->lock);
173172
return;
174173
}
175174

176175
/* Channel is closed and empty */
177176
if (ch->closed) {
178177
memset(data, 0, ch->elem_size); /* zero value */
179-
pthread_mutex_unlock(&ch->lock);
178+
run_mutex_unlock(&ch->lock);
180179
return;
181180
}
182181

183182
/* Must block: buffer empty (or unbuffered with no sender) */
184183
run_g_t *g = run_current_g();
185184
if (!g) {
186-
pthread_mutex_unlock(&ch->lock);
185+
run_mutex_unlock(&ch->lock);
187186
fprintf(stderr, "run: channel recv would block on main thread\n");
188187
abort();
189188
}
@@ -192,7 +191,7 @@ void run_chan_recv(run_chan_t *ch, void *data) {
192191
g->chan_data_ptr = data; /* receiver provides the destination */
193192
g->chan_panic = false;
194193
run_g_queue_push(&ch->recv_q, g);
195-
pthread_mutex_unlock(&ch->lock);
194+
run_mutex_unlock(&ch->lock);
196195

197196
run_schedule(); /* context switch to scheduler */
198197
/* Resumed here after a sender copies data to our slot */
@@ -203,10 +202,10 @@ void run_chan_recv(run_chan_t *ch, void *data) {
203202
* ======================================================================== */
204203

205204
void run_chan_close(run_chan_t *ch) {
206-
pthread_mutex_lock(&ch->lock);
205+
run_mutex_lock(&ch->lock);
207206

208207
if (ch->closed) {
209-
pthread_mutex_unlock(&ch->lock);
208+
run_mutex_unlock(&ch->lock);
210209
fprintf(stderr, "run: close of closed channel\n");
211210
abort();
212211
}
@@ -232,7 +231,7 @@ void run_chan_close(run_chan_t *ch) {
232231
run_g_queue_push(&wake_list, g);
233232
}
234233

235-
pthread_mutex_unlock(&ch->lock);
234+
run_mutex_unlock(&ch->lock);
236235

237236
/* Make all collected Gs runnable (outside the channel lock) */
238237
run_g_t *g;

src/runtime/run_platform.h

Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
#ifndef RUN_PLATFORM_H
2+
#define RUN_PLATFORM_H
3+
4+
#include <stdbool.h>
5+
#include <stdint.h>
6+
#include <stdlib.h>
7+
8+
#if defined(_WIN32)
9+
10+
#ifndef WIN32_LEAN_AND_MEAN
11+
#define WIN32_LEAN_AND_MEAN
12+
#endif
13+
#ifndef NOMINMAX
14+
#define NOMINMAX
15+
#endif
16+
#include <process.h>
17+
#include <windows.h>
18+
19+
#define RUN_THREAD_LOCAL __declspec(thread)
20+
21+
typedef HANDLE run_thread_t;
22+
typedef SRWLOCK run_mutex_t;
23+
typedef CONDITION_VARIABLE run_cond_t;
24+
typedef void (*run_timer_fn)(void *);
25+
26+
typedef struct {
27+
HANDLE handle;
28+
run_timer_fn fn;
29+
void *arg;
30+
} run_platform_timer_t;
31+
32+
#define RUN_MUTEX_INITIALIZER SRWLOCK_INIT
33+
34+
typedef void *(*run_thread_fn)(void *);
35+
36+
typedef struct {
37+
run_thread_fn fn;
38+
void *arg;
39+
} run_thread_start_t;
40+
41+
static unsigned __stdcall run_thread_trampoline(void *arg) {
42+
run_thread_start_t *start = (run_thread_start_t *)arg;
43+
run_thread_fn fn = start->fn;
44+
void *fn_arg = start->arg;
45+
free(start);
46+
(void)fn(fn_arg);
47+
return 0;
48+
}
49+
50+
static VOID CALLBACK run_timer_trampoline(PVOID arg, BOOLEAN fired) {
51+
(void)fired;
52+
run_platform_timer_t *timer = (run_platform_timer_t *)arg;
53+
timer->fn(timer->arg);
54+
}
55+
56+
static inline void run_mutex_init(run_mutex_t *m) {
57+
InitializeSRWLock(m);
58+
}
59+
60+
static inline void run_mutex_destroy(run_mutex_t *m) {
61+
(void)m;
62+
}
63+
64+
static inline void run_mutex_lock(run_mutex_t *m) {
65+
AcquireSRWLockExclusive(m);
66+
}
67+
68+
static inline void run_mutex_unlock(run_mutex_t *m) {
69+
ReleaseSRWLockExclusive(m);
70+
}
71+
72+
static inline void run_cond_init(run_cond_t *c) {
73+
InitializeConditionVariable(c);
74+
}
75+
76+
static inline void run_cond_destroy(run_cond_t *c) {
77+
(void)c;
78+
}
79+
80+
static inline void run_cond_wait(run_cond_t *c, run_mutex_t *m) {
81+
SleepConditionVariableSRW(c, m, INFINITE, 0);
82+
}
83+
84+
static inline void run_cond_signal(run_cond_t *c) {
85+
WakeConditionVariable(c);
86+
}
87+
88+
static inline int run_thread_create(run_thread_t *thread, run_thread_fn fn, void *arg) {
89+
run_thread_start_t *start = (run_thread_start_t *)malloc(sizeof(run_thread_start_t));
90+
if (start == NULL)
91+
return -1;
92+
start->fn = fn;
93+
start->arg = arg;
94+
95+
uintptr_t handle = _beginthreadex(NULL, 0, run_thread_trampoline, start, 0, NULL);
96+
if (handle == 0) {
97+
free(start);
98+
return -1;
99+
}
100+
*thread = (HANDLE)handle;
101+
return 0;
102+
}
103+
104+
static inline void run_thread_detach(run_thread_t thread) {
105+
CloseHandle(thread);
106+
}
107+
108+
static inline run_thread_t run_thread_self(void) {
109+
return GetCurrentThread();
110+
}
111+
112+
static inline uintptr_t run_thread_seed(void) {
113+
return (uintptr_t)GetCurrentThreadId();
114+
}
115+
116+
static inline long run_cpu_count(void) {
117+
SYSTEM_INFO info;
118+
GetSystemInfo(&info);
119+
return (long)info.dwNumberOfProcessors;
120+
}
121+
122+
static inline bool run_timer_start(run_platform_timer_t *timer, uint32_t interval_us,
123+
run_timer_fn fn, void *arg) {
124+
timer->handle = NULL;
125+
timer->fn = fn;
126+
timer->arg = arg;
127+
128+
DWORD interval_ms = (DWORD)((interval_us + 999u) / 1000u);
129+
if (interval_ms == 0)
130+
interval_ms = 1;
131+
132+
return CreateTimerQueueTimer(&timer->handle, NULL, run_timer_trampoline, timer, interval_ms,
133+
interval_ms, WT_EXECUTEDEFAULT) != 0;
134+
}
135+
136+
static inline void run_timer_stop(run_platform_timer_t *timer) {
137+
if (timer->handle != NULL) {
138+
DeleteTimerQueueTimer(NULL, timer->handle, INVALID_HANDLE_VALUE);
139+
timer->handle = NULL;
140+
}
141+
}
142+
143+
#else
144+
145+
#include <pthread.h>
146+
#include <unistd.h>
147+
148+
#define RUN_THREAD_LOCAL __thread
149+
150+
typedef pthread_t run_thread_t;
151+
typedef pthread_mutex_t run_mutex_t;
152+
typedef pthread_cond_t run_cond_t;
153+
typedef void (*run_timer_fn)(void *);
154+
155+
typedef struct {
156+
int unused;
157+
} run_platform_timer_t;
158+
159+
#define RUN_MUTEX_INITIALIZER PTHREAD_MUTEX_INITIALIZER
160+
161+
typedef void *(*run_thread_fn)(void *);
162+
163+
static inline void run_mutex_init(run_mutex_t *m) {
164+
pthread_mutex_init(m, NULL);
165+
}
166+
167+
static inline void run_mutex_destroy(run_mutex_t *m) {
168+
pthread_mutex_destroy(m);
169+
}
170+
171+
static inline void run_mutex_lock(run_mutex_t *m) {
172+
pthread_mutex_lock(m);
173+
}
174+
175+
static inline void run_mutex_unlock(run_mutex_t *m) {
176+
pthread_mutex_unlock(m);
177+
}
178+
179+
static inline void run_cond_init(run_cond_t *c) {
180+
pthread_cond_init(c, NULL);
181+
}
182+
183+
static inline void run_cond_destroy(run_cond_t *c) {
184+
pthread_cond_destroy(c);
185+
}
186+
187+
static inline void run_cond_wait(run_cond_t *c, run_mutex_t *m) {
188+
pthread_cond_wait(c, m);
189+
}
190+
191+
static inline void run_cond_signal(run_cond_t *c) {
192+
pthread_cond_signal(c);
193+
}
194+
195+
static inline int run_thread_create(run_thread_t *thread, run_thread_fn fn, void *arg) {
196+
return pthread_create(thread, NULL, fn, arg);
197+
}
198+
199+
static inline void run_thread_detach(run_thread_t thread) {
200+
pthread_detach(thread);
201+
}
202+
203+
static inline run_thread_t run_thread_self(void) {
204+
return pthread_self();
205+
}
206+
207+
static inline uintptr_t run_thread_seed(void) {
208+
return (uintptr_t)pthread_self();
209+
}
210+
211+
static inline long run_cpu_count(void) {
212+
return sysconf(_SC_NPROCESSORS_ONLN);
213+
}
214+
215+
static inline bool run_timer_start(run_platform_timer_t *timer, uint32_t interval_us,
216+
run_timer_fn fn, void *arg) {
217+
(void)timer;
218+
(void)interval_us;
219+
(void)fn;
220+
(void)arg;
221+
return false;
222+
}
223+
224+
static inline void run_timer_stop(run_platform_timer_t *timer) {
225+
(void)timer;
226+
}
227+
228+
#endif
229+
230+
#endif

0 commit comments

Comments
 (0)