Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 18 additions & 19 deletions src/runtime/run_chan.c
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

#include "run_scheduler.h"

#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
Expand All @@ -12,7 +11,7 @@
* ======================================================================== */

struct run_chan {
pthread_mutex_t lock;
run_mutex_t lock;

size_t elem_size; /* size of each element in bytes */
size_t buffer_cap; /* buffer capacity (0 = unbuffered) */
Expand All @@ -38,7 +37,7 @@ run_chan_t *run_chan_new(size_t elem_size, size_t buffer_cap) {
abort();
}

pthread_mutex_init(&ch->lock, NULL);
run_mutex_init(&ch->lock);
ch->elem_size = elem_size;
ch->buffer_cap = buffer_cap;
ch->buffer_len = 0;
Expand All @@ -63,7 +62,7 @@ run_chan_t *run_chan_new(size_t elem_size, size_t buffer_cap) {
void run_chan_free(run_chan_t *ch) {
if (!ch)
return;
pthread_mutex_destroy(&ch->lock);
run_mutex_destroy(&ch->lock);
free(ch->buffer);
free(ch);
}
Expand All @@ -79,10 +78,10 @@ void run_chan_free(run_chan_t *ch) {
* ======================================================================== */

void run_chan_send(run_chan_t *ch, const void *data) {
pthread_mutex_lock(&ch->lock);
run_mutex_lock(&ch->lock);

if (ch->closed) {
pthread_mutex_unlock(&ch->lock);
run_mutex_unlock(&ch->lock);
fprintf(stderr, "run: send on closed channel\n");
abort();
}
Expand All @@ -92,7 +91,7 @@ void run_chan_send(run_chan_t *ch, const void *data) {
run_g_t *receiver = run_g_queue_pop(&ch->recv_q);
/* Direct copy: data -> receiver's waiting slot */
memcpy(receiver->chan_data_ptr, data, ch->elem_size);
pthread_mutex_unlock(&ch->lock);
run_mutex_unlock(&ch->lock);
run_g_ready(receiver);
return;
}
Expand All @@ -103,7 +102,7 @@ void run_chan_send(run_chan_t *ch, const void *data) {
memcpy(slot, data, ch->elem_size);
ch->send_idx = (ch->send_idx + 1) % ch->buffer_cap;
ch->buffer_len++;
pthread_mutex_unlock(&ch->lock);
run_mutex_unlock(&ch->lock);
return;
}

Expand All @@ -112,7 +111,7 @@ void run_chan_send(run_chan_t *ch, const void *data) {
if (!g) {
/* Called from main thread before scheduler is running --
* this would deadlock. */
pthread_mutex_unlock(&ch->lock);
run_mutex_unlock(&ch->lock);
fprintf(stderr, "run: channel send would block on main thread\n");
abort();
}
Expand All @@ -121,7 +120,7 @@ void run_chan_send(run_chan_t *ch, const void *data) {
g->chan_data_ptr = (void *)data; /* sender's data stays in place */
g->chan_panic = false;
run_g_queue_push(&ch->send_q, g);
pthread_mutex_unlock(&ch->lock);
run_mutex_unlock(&ch->lock);

run_schedule(); /* context switch to scheduler */
/* Resumed here after a receiver copies our data */
Expand All @@ -138,7 +137,7 @@ void run_chan_send(run_chan_t *ch, const void *data) {
* ======================================================================== */

void run_chan_recv(run_chan_t *ch, void *data) {
pthread_mutex_lock(&ch->lock);
run_mutex_lock(&ch->lock);

/* Fast path: waiting sender exists */
if (ch->send_q.len > 0) {
Expand All @@ -158,7 +157,7 @@ void run_chan_recv(run_chan_t *ch, void *data) {
/* Unbuffered: direct copy from sender */
memcpy(data, sender->chan_data_ptr, ch->elem_size);
}
pthread_mutex_unlock(&ch->lock);
run_mutex_unlock(&ch->lock);
run_g_ready(sender);
return;
}
Expand All @@ -169,21 +168,21 @@ void run_chan_recv(run_chan_t *ch, void *data) {
memcpy(data, slot, ch->elem_size);
ch->recv_idx = (ch->recv_idx + 1) % ch->buffer_cap;
ch->buffer_len--;
pthread_mutex_unlock(&ch->lock);
run_mutex_unlock(&ch->lock);
return;
}

/* Channel is closed and empty */
if (ch->closed) {
memset(data, 0, ch->elem_size); /* zero value */
pthread_mutex_unlock(&ch->lock);
run_mutex_unlock(&ch->lock);
return;
}

/* Must block: buffer empty (or unbuffered with no sender) */
run_g_t *g = run_current_g();
if (!g) {
pthread_mutex_unlock(&ch->lock);
run_mutex_unlock(&ch->lock);
fprintf(stderr, "run: channel recv would block on main thread\n");
abort();
}
Expand All @@ -192,7 +191,7 @@ void run_chan_recv(run_chan_t *ch, void *data) {
g->chan_data_ptr = data; /* receiver provides the destination */
g->chan_panic = false;
run_g_queue_push(&ch->recv_q, g);
pthread_mutex_unlock(&ch->lock);
run_mutex_unlock(&ch->lock);

run_schedule(); /* context switch to scheduler */
/* Resumed here after a sender copies data to our slot */
Expand All @@ -203,10 +202,10 @@ void run_chan_recv(run_chan_t *ch, void *data) {
* ======================================================================== */

void run_chan_close(run_chan_t *ch) {
pthread_mutex_lock(&ch->lock);
run_mutex_lock(&ch->lock);

if (ch->closed) {
pthread_mutex_unlock(&ch->lock);
run_mutex_unlock(&ch->lock);
fprintf(stderr, "run: close of closed channel\n");
abort();
}
Expand All @@ -232,7 +231,7 @@ void run_chan_close(run_chan_t *ch) {
run_g_queue_push(&wake_list, g);
}

pthread_mutex_unlock(&ch->lock);
run_mutex_unlock(&ch->lock);

/* Make all collected Gs runnable (outside the channel lock) */
run_g_t *g;
Expand Down
230 changes: 230 additions & 0 deletions src/runtime/run_platform.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
#ifndef RUN_PLATFORM_H
#define RUN_PLATFORM_H

#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>

#if defined(_WIN32)

#ifndef WIN32_LEAN_AND_MEAN
#define WIN32_LEAN_AND_MEAN
#endif
#ifndef NOMINMAX
#define NOMINMAX
#endif
#include <process.h>
#include <windows.h>

#define RUN_THREAD_LOCAL __declspec(thread)

typedef HANDLE run_thread_t;
typedef SRWLOCK run_mutex_t;
typedef CONDITION_VARIABLE run_cond_t;
typedef void (*run_timer_fn)(void *);

typedef struct {
HANDLE handle;
run_timer_fn fn;
void *arg;
} run_platform_timer_t;

#define RUN_MUTEX_INITIALIZER SRWLOCK_INIT

typedef void *(*run_thread_fn)(void *);

typedef struct {
run_thread_fn fn;
void *arg;
} run_thread_start_t;

static unsigned __stdcall run_thread_trampoline(void *arg) {
run_thread_start_t *start = (run_thread_start_t *)arg;
run_thread_fn fn = start->fn;
void *fn_arg = start->arg;
free(start);
(void)fn(fn_arg);
return 0;
}

static VOID CALLBACK run_timer_trampoline(PVOID arg, BOOLEAN fired) {
(void)fired;
run_platform_timer_t *timer = (run_platform_timer_t *)arg;
timer->fn(timer->arg);
}

static inline void run_mutex_init(run_mutex_t *m) {
InitializeSRWLock(m);
}

static inline void run_mutex_destroy(run_mutex_t *m) {
(void)m;
}

static inline void run_mutex_lock(run_mutex_t *m) {
AcquireSRWLockExclusive(m);
}

static inline void run_mutex_unlock(run_mutex_t *m) {
ReleaseSRWLockExclusive(m);
}

static inline void run_cond_init(run_cond_t *c) {
InitializeConditionVariable(c);
}

static inline void run_cond_destroy(run_cond_t *c) {
(void)c;
}

static inline void run_cond_wait(run_cond_t *c, run_mutex_t *m) {
SleepConditionVariableSRW(c, m, INFINITE, 0);
}

static inline void run_cond_signal(run_cond_t *c) {
WakeConditionVariable(c);
}

static inline int run_thread_create(run_thread_t *thread, run_thread_fn fn, void *arg) {
run_thread_start_t *start = (run_thread_start_t *)malloc(sizeof(run_thread_start_t));
if (start == NULL)
return -1;
start->fn = fn;
start->arg = arg;

uintptr_t handle = _beginthreadex(NULL, 0, run_thread_trampoline, start, 0, NULL);
if (handle == 0) {
free(start);
return -1;
}
*thread = (HANDLE)handle;
return 0;
}

static inline void run_thread_detach(run_thread_t thread) {
CloseHandle(thread);
}

static inline run_thread_t run_thread_self(void) {
return GetCurrentThread();
}

static inline uintptr_t run_thread_seed(void) {
return (uintptr_t)GetCurrentThreadId();
}

static inline long run_cpu_count(void) {
SYSTEM_INFO info;
GetSystemInfo(&info);
return (long)info.dwNumberOfProcessors;
}

static inline bool run_timer_start(run_platform_timer_t *timer, uint32_t interval_us,
run_timer_fn fn, void *arg) {
timer->handle = NULL;
timer->fn = fn;
timer->arg = arg;

DWORD interval_ms = (DWORD)((interval_us + 999u) / 1000u);
if (interval_ms == 0)
interval_ms = 1;

return CreateTimerQueueTimer(&timer->handle, NULL, run_timer_trampoline, timer, interval_ms,
interval_ms, WT_EXECUTEDEFAULT) != 0;
}

static inline void run_timer_stop(run_platform_timer_t *timer) {
if (timer->handle != NULL) {
DeleteTimerQueueTimer(NULL, timer->handle, INVALID_HANDLE_VALUE);
timer->handle = NULL;
}
}

#else

#include <pthread.h>
#include <unistd.h>

#define RUN_THREAD_LOCAL __thread

typedef pthread_t run_thread_t;
typedef pthread_mutex_t run_mutex_t;
typedef pthread_cond_t run_cond_t;
typedef void (*run_timer_fn)(void *);

typedef struct {
int unused;
} run_platform_timer_t;

#define RUN_MUTEX_INITIALIZER PTHREAD_MUTEX_INITIALIZER

typedef void *(*run_thread_fn)(void *);

static inline void run_mutex_init(run_mutex_t *m) {
pthread_mutex_init(m, NULL);
}

static inline void run_mutex_destroy(run_mutex_t *m) {
pthread_mutex_destroy(m);
}

static inline void run_mutex_lock(run_mutex_t *m) {
pthread_mutex_lock(m);
}

static inline void run_mutex_unlock(run_mutex_t *m) {
pthread_mutex_unlock(m);
}

static inline void run_cond_init(run_cond_t *c) {
pthread_cond_init(c, NULL);
}

static inline void run_cond_destroy(run_cond_t *c) {
pthread_cond_destroy(c);
}

static inline void run_cond_wait(run_cond_t *c, run_mutex_t *m) {
pthread_cond_wait(c, m);
}

static inline void run_cond_signal(run_cond_t *c) {
pthread_cond_signal(c);
}

static inline int run_thread_create(run_thread_t *thread, run_thread_fn fn, void *arg) {
return pthread_create(thread, NULL, fn, arg);
}

static inline void run_thread_detach(run_thread_t thread) {
pthread_detach(thread);
}

static inline run_thread_t run_thread_self(void) {
return pthread_self();
}

static inline uintptr_t run_thread_seed(void) {
return (uintptr_t)pthread_self();
}

static inline long run_cpu_count(void) {
return sysconf(_SC_NPROCESSORS_ONLN);
}

static inline bool run_timer_start(run_platform_timer_t *timer, uint32_t interval_us,
run_timer_fn fn, void *arg) {
(void)timer;
(void)interval_us;
(void)fn;
(void)arg;
return false;
}

static inline void run_timer_stop(run_platform_timer_t *timer) {
(void)timer;
}

#endif

#endif
Loading
Loading