Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions ext/semian/atomic_ops.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
Lock-free atomic operations for shared memory using C11 stdatomic.h
*/
#ifndef SEMIAN_ATOMIC_OPS_H
#define SEMIAN_ATOMIC_OPS_H

#ifdef HAVE_STDATOMIC_H
#include <stdatomic.h>

static inline int
atomic_int_load(atomic_int *ptr)
{
return atomic_load(ptr);
}

static inline void
atomic_int_store(atomic_int *ptr, int val)
{
atomic_store(ptr, val);
}

static inline int
atomic_int_fetch_add(atomic_int *ptr, int val)
{
return atomic_fetch_add(ptr, val);
}

static inline int
atomic_int_exchange(atomic_int *ptr, int val)
{
return atomic_exchange(ptr, val);
}

static inline double
atomic_double_load(_Atomic double *ptr)
{
return atomic_load(ptr);
}

static inline void
atomic_double_store(_Atomic double *ptr, double val)
{
atomic_store(ptr, val);
}

static inline double
atomic_double_exchange(_Atomic double *ptr, double val)
{
return atomic_exchange(ptr, val);
}

#else
#error "stdatomic.h not available - C11 compiler required"
#endif

#endif // SEMIAN_ATOMIC_OPS_H
3 changes: 3 additions & 0 deletions ext/semian/extconf.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,14 @@

have_header "sys/ipc.h"
have_header "sys/sem.h"
have_header "sys/shm.h"
have_header "sys/types.h"

have_func "rb_thread_blocking_region"
have_func "rb_thread_call_without_gvl"

have_header "stdatomic.h"

$CFLAGS = "-D_GNU_SOURCE -Werror -Wall "
$CFLAGS += if ENV.key?("DEBUG")
"-O0 -g -DDEBUG"
Expand Down
208 changes: 208 additions & 0 deletions ext/semian/resource.c
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
#include "resource.h"
#include "sysv_shared_memory.h"
#include "atomic_ops.h"

// Ruby variables
ID id_wait_time;
Expand Down Expand Up @@ -366,6 +368,212 @@ ms_to_timespec(long ms, struct timespec *ts)
ts->tv_nsec = (ms % 1000) * 1000000;
}

VALUE
semian_resource_create_shared_memory(VALUE self, VALUE v_key, VALUE v_size)
{
key_t key;
size_t size;
int shm_id;
int created = 0;

Check_Type(v_key, T_FIXNUM);
Check_Type(v_size, T_FIXNUM);

key = (key_t)FIX2LONG(v_key);
size = (size_t)FIX2LONG(v_size);

shm_id = create_or_attach_shared_memory(key, size, &created);

if (shm_id == -1) {
raise_semian_syscall_error("shmget()", errno);
}

return rb_ary_new_from_args(2, INT2FIX(shm_id), created ? Qtrue : Qfalse);
}

VALUE
semian_resource_attach_shared_memory(VALUE self, VALUE v_shm_id)
{
int shm_id;
void *addr;

Check_Type(v_shm_id, T_FIXNUM);
shm_id = FIX2INT(v_shm_id);

addr = attach_shared_memory(shm_id);

if (addr == (void *)-1) {
raise_semian_syscall_error("shmat()", errno);
}

return ULL2NUM((unsigned long long)(uintptr_t)addr);
}

VALUE
semian_resource_detach_shared_memory(VALUE self, VALUE v_addr)
{
void *addr;

if (TYPE(v_addr) != T_FIXNUM && TYPE(v_addr) != T_BIGNUM) {
rb_raise(rb_eTypeError, "address must be an integer");
}

addr = (void *)(uintptr_t)NUM2ULL(v_addr);

if (detach_shared_memory(addr) == -1) {
raise_semian_syscall_error("shmdt()", errno);
}

return Qnil;
}

VALUE
semian_resource_destroy_shared_memory(VALUE self, VALUE v_shm_id)
{
int shm_id;

Check_Type(v_shm_id, T_FIXNUM);
shm_id = FIX2INT(v_shm_id);

if (destroy_shared_memory(shm_id) == -1) {
raise_semian_syscall_error("shmctl()", errno);
}

return Qtrue;
}

VALUE
semian_atomic_int_load(VALUE self, VALUE v_addr)
{
atomic_int *ptr;
int value;

if (TYPE(v_addr) != T_FIXNUM && TYPE(v_addr) != T_BIGNUM) {
rb_raise(rb_eTypeError, "address must be an integer");
}
ptr = (atomic_int *)(uintptr_t)NUM2ULL(v_addr);
value = atomic_int_load(ptr);

return INT2NUM(value);
}

VALUE
semian_atomic_int_store(VALUE self, VALUE v_addr, VALUE v_value)
{
atomic_int *ptr;
int value;

if (TYPE(v_addr) != T_FIXNUM && TYPE(v_addr) != T_BIGNUM) {
rb_raise(rb_eTypeError, "address must be an integer");
}
Check_Type(v_value, T_FIXNUM);

ptr = (atomic_int *)(uintptr_t)NUM2ULL(v_addr);
value = FIX2INT(v_value);

atomic_int_store(ptr, value);

return Qnil;
}

VALUE
semian_atomic_int_fetch_add(VALUE self, VALUE v_addr, VALUE v_value)
{
atomic_int *ptr;
int value;
int old_value;

if (TYPE(v_addr) != T_FIXNUM && TYPE(v_addr) != T_BIGNUM) {
rb_raise(rb_eTypeError, "address must be an integer");
}
Check_Type(v_value, T_FIXNUM);

ptr = (atomic_int *)(uintptr_t)NUM2ULL(v_addr);
value = FIX2INT(v_value);

old_value = atomic_int_fetch_add(ptr, value);

return INT2NUM(old_value);
}

VALUE
semian_atomic_int_exchange(VALUE self, VALUE v_addr, VALUE v_value)
{
atomic_int *ptr;
int value;
int old_value;

if (TYPE(v_addr) != T_FIXNUM && TYPE(v_addr) != T_BIGNUM) {
rb_raise(rb_eTypeError, "address must be an integer");
}
Check_Type(v_value, T_FIXNUM);

ptr = (atomic_int *)(uintptr_t)NUM2ULL(v_addr);
value = FIX2INT(v_value);

old_value = atomic_int_exchange(ptr, value);

return INT2NUM(old_value);
}

VALUE
semian_atomic_double_load(VALUE self, VALUE v_addr)
{
_Atomic double *ptr;
double value;

if (TYPE(v_addr) != T_FIXNUM && TYPE(v_addr) != T_BIGNUM) {
rb_raise(rb_eTypeError, "address must be an integer");
}
ptr = (_Atomic double *)(uintptr_t)NUM2ULL(v_addr);
value = atomic_double_load(ptr);

return DBL2NUM(value);
}

VALUE
semian_atomic_double_store(VALUE self, VALUE v_addr, VALUE v_value)
{
_Atomic double *ptr;
double value;

if (TYPE(v_addr) != T_FIXNUM && TYPE(v_addr) != T_BIGNUM) {
rb_raise(rb_eTypeError, "address must be an integer");
}
if (TYPE(v_value) != T_FLOAT && TYPE(v_value) != T_FIXNUM) {
rb_raise(rb_eTypeError, "value must be a number");
}

ptr = (_Atomic double *)(uintptr_t)NUM2ULL(v_addr);
value = NUM2DBL(v_value);

atomic_double_store(ptr, value);

return Qnil;
}

VALUE
semian_atomic_double_exchange(VALUE self, VALUE v_addr, VALUE v_value)
{
_Atomic double *ptr;
double value;
double old_value;

if (TYPE(v_addr) != T_FIXNUM && TYPE(v_addr) != T_BIGNUM) {
rb_raise(rb_eTypeError, "address must be an integer");
}
if (TYPE(v_value) != T_FLOAT && TYPE(v_value) != T_FIXNUM) {
rb_raise(rb_eTypeError, "value must be a number");
}

ptr = (_Atomic double *)(uintptr_t)NUM2ULL(v_addr);
value = NUM2DBL(v_value);

old_value = atomic_double_exchange(ptr, value);

return DBL2NUM(old_value);
}

static void
semian_resource_free(void *ptr)
{
Expand Down
35 changes: 35 additions & 0 deletions ext/semian/resource.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,4 +159,39 @@ semian_resource_alloc(VALUE klass);
VALUE
semian_resource_in_use(VALUE self);

// Shared memory methods
VALUE
semian_resource_create_shared_memory(VALUE self, VALUE key, VALUE size);

VALUE
semian_resource_attach_shared_memory(VALUE self, VALUE shm_id);

VALUE
semian_resource_detach_shared_memory(VALUE self, VALUE addr);

VALUE
semian_resource_destroy_shared_memory(VALUE self, VALUE shm_id);

// Atomic operations methods
VALUE
semian_atomic_int_load(VALUE self, VALUE addr);

VALUE
semian_atomic_int_store(VALUE self, VALUE addr, VALUE value);

VALUE
semian_atomic_int_fetch_add(VALUE self, VALUE addr, VALUE value);

VALUE
semian_atomic_int_exchange(VALUE self, VALUE addr, VALUE value);

VALUE
semian_atomic_double_load(VALUE self, VALUE addr);

VALUE
semian_atomic_double_store(VALUE self, VALUE addr, VALUE value);

VALUE
semian_atomic_double_exchange(VALUE self, VALUE addr, VALUE value);

#endif //SEMIAN_RESOURCE_H
11 changes: 11 additions & 0 deletions ext/semian/semian.c
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,17 @@ void Init_semian()
rb_define_method(cResource, "reset_registered_workers!", semian_resource_reset_workers, 0);
rb_define_method(cResource, "unregister_worker", semian_resource_unregister_worker, 0);
rb_define_method(cResource, "in_use?", semian_resource_in_use, 0);
rb_define_method(cResource, "create_shared_memory", semian_resource_create_shared_memory, 2);
rb_define_method(cResource, "attach_shared_memory", semian_resource_attach_shared_memory, 1);
rb_define_method(cResource, "detach_shared_memory", semian_resource_detach_shared_memory, 1);
rb_define_method(cResource, "destroy_shared_memory", semian_resource_destroy_shared_memory, 1);
rb_define_method(cResource, "atomic_int_load", semian_atomic_int_load, 1);
rb_define_method(cResource, "atomic_int_store", semian_atomic_int_store, 2);
rb_define_method(cResource, "atomic_int_fetch_add", semian_atomic_int_fetch_add, 2);
rb_define_method(cResource, "atomic_int_exchange", semian_atomic_int_exchange, 2);
rb_define_method(cResource, "atomic_double_load", semian_atomic_double_load, 1);
rb_define_method(cResource, "atomic_double_store", semian_atomic_double_store, 2);
rb_define_method(cResource, "atomic_double_exchange", semian_atomic_double_exchange, 2);

id_wait_time = rb_intern("wait_time");
id_timeout = rb_intern("timeout");
Expand Down
Loading
Loading