prov/shm: new shm architecture v2 #10907

Open · wants to merge 12 commits into main

Changes from all commits
2 changes: 2 additions & 0 deletions include/ofi.h
@@ -217,6 +217,8 @@ ofi_poll_del(struct fid_poll *pollset, struct fid *event_fid, uint64_t flags)
 	_a > _b ? _a : _b; })
 #endif
 
+#define MIN3(a, b, c) MIN(MIN(a, b), c)
+
 #define ofi_div_ceil(a, b) ((a + b - 1) / b)
 
 static inline int ofi_val64_gt(uint64_t x, uint64_t y) {
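`MIN3` composes the existing `MIN`, and `ofi_div_ceil` (unchanged context above) does not parenthesize its arguments. A minimal standalone sketch of both macros and the expansion hazard; `MIN` is simplified here from the statement-expression form in ofi.h:

```c
#include <assert.h>

#define MIN(a, b) ((a) < (b) ? (a) : (b))	/* simplified stand-in */
#define MIN3(a, b, c) MIN(MIN(a, b), c)
#define ofi_div_ceil(a, b) ((a + b - 1) / b)

int main(void)
{
	assert(MIN3(7, 3, 5) == 3);
	assert(ofi_div_ceil(10, 4) == 3);	/* ceil(10 / 4) */
	/* Hazard: a compound divisor such as (n - 2) expands to
	 * ((a + n - 2 - 1) / n - 2), which parses as
	 * ((a + n - 3) / n) - 2, so callers must pass simple
	 * expressions. */
	return 0;
}
```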
16 changes: 16 additions & 0 deletions include/ofi_atom.h
@@ -119,12 +119,28 @@ typedef atomic_long ofi_atomic_int64_t;
 							 memory_order_acq_rel) + val; \
 }									\
 static inline								\
+int##radix##_t ofi_atomic_add_explicit##radix(ofi_atomic##radix##_t *atomic, \
+					      int##radix##_t val,	\
+					      int memmodel)		\
+{									\
+	ATOMIC_IS_INITIALIZED(atomic);					\
+	return (int##radix##_t)atomic_fetch_add_explicit(&atomic->val, val, memmodel); \
+}									\
+static inline								\
 int##radix##_t ofi_atomic_sub##radix(ofi_atomic##radix##_t *atomic, int##radix##_t val) \
 {									\
 	ATOMIC_IS_INITIALIZED(atomic);					\
 	return (int##radix##_t)atomic_fetch_sub_explicit(&atomic->val, val, \
 							 memory_order_acq_rel) - val; \
 }									\
+static inline								\
+int##radix##_t ofi_atomic_sub_explicit##radix(ofi_atomic##radix##_t *atomic, \
+					      int##radix##_t val,	\
+					      int memmodel)		\
+{									\
+	ATOMIC_IS_INITIALIZED(atomic);					\
+	return (int##radix##_t)atomic_fetch_sub_explicit(&atomic->val, val, memmodel); \
+}									\
 /**									\
  * Compare and swap, strong version					\
  *									\
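The new `_explicit` variants expose a caller-chosen memory order and keep C11 fetch semantics (they return the value held before the update), unlike `ofi_atomic_add64`/`ofi_atomic_sub64`, which return the post-update value. A standalone sketch of the distinction, using a simplified stand-in for the `ofi_atomic64_t` wrapper:

```c
#include <stdatomic.h>
#include <stdint.h>
#include <stdio.h>

typedef struct { atomic_long val; } my_atomic64_t;	/* stand-in type */

/* mirrors ofi_atomic_sub_explicit64: fetch semantics, caller's order */
static int64_t my_sub_explicit64(my_atomic64_t *a, int64_t v, int mm)
{
	return (int64_t) atomic_fetch_sub_explicit(&a->val, v, mm);
}

int main(void)
{
	my_atomic64_t claim_avail;
	atomic_init(&claim_avail.val, 4);

	/* The pattern the reworked queue's _claim uses: a relaxed
	 * decrement is enough because the claim publishes no data. */
	int64_t before = my_sub_explicit64(&claim_avail, 1,
					   memory_order_relaxed);
	printf("credits before claim: %ld, claim %s\n", (long) before,
	       before > 0 ? "succeeded" : "failed");
	return 0;
}
```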
193 changes: 114 additions & 79 deletions include/ofi_atomic_queue.h
@@ -94,147 +94,182 @@ extern "C" {

 #define OFI_CACHE_LINE_SIZE (64)
 
+typedef void (*ofi_aq_init_fn)(void *);
+enum {
+	OFI_AQ_FREE = 0,
+	OFI_AQ_READY,
+	OFI_AQ_NOOP,
+};
 
 /*
  * Base address of atomic queue must be cache line aligned to maximize atomic
  * value performance benefits
  */
 #define OFI_DECLARE_ATOMIC_Q(entrytype, name)				\
 struct name ## _entry {						\
-	ofi_atomic64_t	seq;						\
-	bool		noop;						\
+	ofi_atomic64_t	state;						\
 	entrytype	buf;						\
 } __attribute__((__aligned__(64)));					\
 									\
 struct name {								\
 	ofi_atomic64_t	write_pos;					\
 	uint8_t		pad0[OFI_CACHE_LINE_SIZE -			\
 			     sizeof(ofi_atomic64_t)];			\
-	ofi_atomic64_t	read_pos;					\
+	int64_t		read_pos;					\
+	ofi_aq_init_fn	init_fn;					\
 	uint8_t		pad1[OFI_CACHE_LINE_SIZE -			\
-			     sizeof(ofi_atomic64_t)];			\
+			     (sizeof(int64_t) +				\
+			      sizeof(ofi_aq_init_fn))];			\
+	ofi_atomic64_t	claim_avail;					\
+	uint8_t		pad2[OFI_CACHE_LINE_SIZE -			\
+			     sizeof(ofi_atomic64_t)];			\
+	ofi_atomic64_t	discard_avail;					\
+	uint8_t		pad3[OFI_CACHE_LINE_SIZE -			\
+			     sizeof(ofi_atomic64_t)];			\
 	int		size;						\
 	int		size_mask;					\
-	uint8_t		pad2[OFI_CACHE_LINE_SIZE -			\
+	uint8_t		pad4[OFI_CACHE_LINE_SIZE -			\
 			     (sizeof(int) * 2)];			\
 	struct name ## _entry entry[];					\
 } __attribute__((__aligned__(64)));					\
 									\
-static inline void name ## _init(struct name *aq, size_t size)		\
+static inline void name ## _init(struct name *aq, size_t size,		\
+				 ofi_aq_init_fn init_fn)		\
 {									\
 	size_t i;							\
 	assert(size == roundup_power_of_two(size));			\
 	assert(!((uintptr_t) aq % OFI_CACHE_LINE_SIZE));		\
 	aq->size = size;						\
 	aq->size_mask = aq->size - 1;					\
+	aq->init_fn = init_fn;						\
 	ofi_atomic_initialize64(&aq->write_pos, 0);			\
-	ofi_atomic_initialize64(&aq->read_pos, 0);			\
-	for (i = 0; i < size; i++)					\
-		ofi_atomic_initialize64(&aq->entry[i].seq, i);		\
+	aq->read_pos = 0;						\
+	ofi_atomic_initialize64(&aq->discard_avail, 0);			\
+	ofi_atomic_initialize64(&aq->claim_avail, size);		\
+	for (i = 0; i < size; i++) {					\
+		if (aq->init_fn)					\
+			aq->init_fn(&aq->entry[i].buf);			\
+		ofi_atomic_initialize64(&aq->entry[i].state, OFI_AQ_FREE);\
+	}								\
 }									\
 									\
 static inline struct name * name ## _create(size_t size)		\
 {									\
 	struct name *aq;						\
-	aq = (struct name*) calloc(1, sizeof(*aq) +			\
-		sizeof(struct name ## _entry) *				\
-		(roundup_power_of_two(size)));				\
+	aq = (struct name *) aligned_alloc(				\
+		OFI_CACHE_LINE_SIZE, sizeof(*aq) +			\
+		sizeof(struct name ## _entry) *				\
+		(roundup_power_of_two(size)));				\
 	if (aq)								\
-		name ##_init(aq, roundup_power_of_two(size));		\
+		name ##_init(aq, roundup_power_of_two(size),		\
+			     NULL);					\
 	return aq;							\
 }									\
 									\
 static inline void name ## _free(struct name *aq)			\
 {									\
 	free(aq);							\
 }									\
-static inline int name ## _next(struct name *aq,			\
-				entrytype **buf, int64_t *pos)		\
+static inline bool name ## _claim(struct name *aq)			\
 {									\
-	struct name ## _entry *ce;					\
-	int64_t diff, seq;						\
-	*pos = ofi_atomic_load_explicit64(&aq->write_pos,		\
-					  memory_order_relaxed);	\
-	for (;;) {							\
-		ce = &aq->entry[*pos & aq->size_mask];			\
-		seq = ofi_atomic_load_explicit64(&(ce->seq),		\
-						 memory_order_acquire);	\
-		diff = seq - *pos;					\
-		if (diff == 0) {					\
-			if (ofi_atomic_compare_exchange_weak64(		\
-						&aq->write_pos, pos,	\
-						*pos + 1))		\
-				break;					\
-		} else if (diff < 0) {					\
-			return -FI_ENOENT;				\
-		} else {						\
-			*pos = ofi_atomic_load_explicit64(		\
-						&aq->write_pos,		\
-						memory_order_relaxed);	\
-		}							\
+	int64_t avail, discard_avail;					\
+	avail = ofi_atomic_sub_explicit64(&aq->claim_avail, 1,		\
+					  memory_order_relaxed);	\
+	if (avail > 0)							\
+		return true;						\
+									\
+	discard_avail = ofi_atomic_load_explicit64(			\
+					&aq->discard_avail,		\
+					memory_order_acquire);		\
+	if (discard_avail) {						\
+		if (!ofi_atomic_compare_exchange_weak64(		\
+					&aq->discard_avail,		\
+					&discard_avail, 0))		\
+			goto out;					\
+		ofi_atomic_add_explicit64(&aq->claim_avail,		\
+					  discard_avail,		\
+					  memory_order_relaxed);	\
 	}								\
-	*buf = &ce->buf;						\
-	return FI_SUCCESS;						\
+out:									\
+	ofi_atomic_add_explicit64(&aq->claim_avail, 1,			\
+				  memory_order_relaxed);		\
+	return false;							\
+									\
 }									\
+static inline entrytype *name ## _assign(struct name *aq)		\
+{									\
+	int64_t pos;							\
+	while (1) {							\
+		pos = ofi_atomic_load_explicit64(			\
+					&aq->write_pos,			\
+					memory_order_acquire);		\
+		if (ofi_atomic_compare_exchange_weak64(			\
+					&aq->write_pos, &pos,		\
+					pos + 1))			\
+			break;						\
+	}								\
+	return &aq->entry[pos & aq->size_mask].buf;			\
+}									\
+static inline entrytype *name ## _claim_assign(struct name *aq)	\
+{									\
+	if (name ## _claim(aq)) {					\
+		return name ## _assign(aq);				\
+	}								\
+	return NULL;							\
+}									\
 static inline void name ## _release(struct name *aq,			\
-				    entrytype *buf,			\
-				    int64_t pos)			\
+				    entrytype *buf)			\
 {									\
+	int64_t state = OFI_AQ_FREE;					\
 	struct name ## _entry *ce;					\
 	ce = container_of(buf, struct name ## _entry, buf);		\
-	ofi_atomic_store_explicit64(&ce->seq,				\
-				    pos + aq->size,			\
-				    memory_order_release);		\
+	if (aq->init_fn)						\
+		aq->init_fn(&ce->buf);					\
+	ofi_atomic_store_explicit64(&ce->state, state,			\
+				    memory_order_release);		\
+	aq->read_pos++;							\
 }									\
-static inline int name ## _head(struct name *aq,			\
-				entrytype **buf, int64_t *pos)		\
+static inline void name ## _discard(struct name *aq)			\
+{									\
+	ofi_atomic_add_explicit64(&aq->discard_avail, 1,		\
+				  memory_order_relaxed);		\
+}									\
+static inline void name ## _release_discard(struct name *aq,		\
+					    entrytype *buf)		\
+{									\
+	name ## _release(aq, buf);					\
+	name ## _discard(aq);						\
+}									\
+static inline entrytype *name ## _head(struct name *aq)		\
 {									\
-	int64_t diff, seq;						\
 	struct name ## _entry *ce;					\
+	int64_t state;							\
 again:									\
-	*pos = ofi_atomic_load_explicit64(&aq->read_pos,		\
-					  memory_order_relaxed);	\
-	for (;;) {							\
-		ce = &aq->entry[*pos & aq->size_mask];			\
-		seq = ofi_atomic_load_explicit64(&(ce->seq),		\
-						 memory_order_acquire);	\
-		diff = seq - (*pos + 1);				\
-		if (diff == 0) {					\
-			if (ofi_atomic_compare_exchange_weak64(		\
-						&aq->read_pos, pos,	\
-						*pos + 1))		\
-				break;					\
-		} else if (diff < 0) {					\
-			return -FI_ENOENT;				\
-		} else {						\
-			*pos = ofi_atomic_load_explicit64(		\
-						&aq->read_pos,		\
-						memory_order_relaxed);	\
-		}							\
-	}								\
-	*buf = &ce->buf;						\
-	if (ce->noop) {							\
-		ce->noop = false;					\
-		name ##_release(aq, *buf, *pos);			\
+	ce = &aq->entry[aq->read_pos & aq->size_mask];			\
+	state = ofi_atomic_load_explicit64(&ce->state,			\
+					   memory_order_acquire);	\
+	if (state == OFI_AQ_FREE)					\
+		return NULL;						\
+	if (state == OFI_AQ_NOOP) {					\
+		name ## _release_discard(aq, &ce->buf);			\
 		goto again;						\
 	}								\
-	return FI_SUCCESS;						\
+	return &ce->buf;						\
 }									\
-static inline void name ## _commit(entrytype *buf,			\
-				   int64_t pos)				\
+static inline void name ## _commit(entrytype *buf)			\
 {									\
 	struct name ## _entry *ce;					\
+	int64_t state = OFI_AQ_READY;					\
 	ce = container_of(buf, struct name ## _entry, buf);		\
-	ofi_atomic_store_explicit64(&ce->seq, pos + 1,			\
-				    memory_order_release);		\
+	ofi_atomic_store_explicit64(&ce->state, state,			\
+				    memory_order_release);		\
 }									\
-static inline void name ## _discard(entrytype *buf,			\
-				    int64_t pos)			\
+static inline void name ## _cancel(entrytype *buf)			\
 {									\
 	struct name ## _entry *ce;					\
 	ce = container_of(buf, struct name ## _entry, buf);		\
-	ce->noop = true;						\
-	ofi_atomic_store_explicit64(&ce->seq, pos + 1,			\
-				    memory_order_release);		\
+	ofi_atomic_store_explicit64(&ce->state, OFI_AQ_NOOP,		\
+				    memory_order_release);		\
 }									\
 void dummy ## name (void) /* work-around global ; scope */

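Putting the reworked queue API together: a producer claims a credit, takes a write slot, fills it, and commits (or cancels); the single consumer polls `_head`, then releases and discards to recycle the slot and return the credit. A sketch assuming the internal header is on the include path; `msg_queue` and `struct my_msg` are hypothetical names, not part of this PR:

```c
#include <stdint.h>
#include "ofi_atomic_queue.h"	/* libfabric internal header */

struct my_msg {
	uint64_t tag;
	char payload[48];
};
OFI_DECLARE_ATOMIC_Q(struct my_msg, msg_queue);

static void produce(struct msg_queue *q, uint64_t tag)
{
	/* claim a credit, then take a unique write slot */
	struct my_msg *tx = msg_queue_claim_assign(q);
	if (!tx)
		return;			/* queue full */
	tx->tag = tag;
	msg_queue_commit(tx);		/* publish: state -> OFI_AQ_READY */
	/* a producer abandoning the slot would instead call
	 * msg_queue_cancel(tx), so the consumer skips it */
}

static void consume(struct msg_queue *q)
{
	/* single-consumer side: NULL means no entry is READY yet */
	struct my_msg *rx = msg_queue_head(q);
	if (!rx)
		return;
	/* ... process rx->tag / rx->payload ... */
	/* free the slot and return the credit via discard_avail */
	msg_queue_release_discard(q, rx);
}

int main(void)
{
	struct msg_queue *q = msg_queue_create(8);
	if (!q)
		return 1;
	produce(q, 42);
	consume(q);
	msg_queue_free(q);
	return 0;
}
```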
3 changes: 3 additions & 0 deletions include/ofi_hmem.h
@@ -429,6 +429,9 @@ ssize_t ofi_copy_from_mr_iov(void *dest, size_t size, struct ofi_mr **mr,
 ssize_t ofi_copy_to_mr_iov(struct ofi_mr **mr, const struct iovec *iov,
 			   size_t iov_count, uint64_t iov_offset,
 			   const void *src, size_t size);
+ssize_t ofi_copy_mr_iov(struct ofi_mr **mr, const struct iovec *iov,
+			size_t iov_count, size_t offset, void *buf,
+			size_t size, int dir);
 
 int ofi_hmem_get_handle(enum fi_hmem_iface iface, void *base_addr,
 			size_t size, void **handle);
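`ofi_copy_mr_iov` folds the to/from pair above into one direction-parameterized helper. A hypothetical call-site sketch, assuming `dir` takes the existing `OFI_COPY_BUF_TO_IOV`/`OFI_COPY_IOV_TO_BUF` constants from ofi_iov.h (the PR does not spell out the expected values here):

```c
#include <stdbool.h>
#include "ofi_iov.h"	/* OFI_COPY_BUF_TO_IOV / OFI_COPY_IOV_TO_BUF */
#include "ofi_hmem.h"

/* Hypothetical call site: one path for both copy directions instead
 * of choosing between ofi_copy_to_mr_iov and ofi_copy_from_mr_iov. */
static ssize_t copy_payload(struct ofi_mr **mr, const struct iovec *iov,
			    size_t iov_count, void *bounce, size_t len,
			    bool to_iov)
{
	int dir = to_iov ? OFI_COPY_BUF_TO_IOV : OFI_COPY_IOV_TO_BUF;

	/* dir selects whether 'bounce' is the source (buffer -> iov)
	 * or the destination (iov -> buffer) */
	return ofi_copy_mr_iov(mr, iov, iov_count, 0, bounce, len, dir);
}
```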
14 changes: 14 additions & 0 deletions include/ofi_mem.h
@@ -259,6 +259,14 @@ static inline void smr_freestack_push_by_offset(struct smr_freestack *fs,
 					fs->object_size);
 }
 
+/* Get entry index in fs */
+static inline int16_t smr_freestack_get_index(struct smr_freestack *fs,
+					      char *local_p)
+{
+	uint64_t offset = ((char*) local_p - (char*) fs);
+	return (offset - fs->entry_base_offset) / fs->object_size;
+}
+
 /* Push by object */
 static inline void smr_freestack_push(struct smr_freestack *fs, void *local_p)
 {
@@ -318,6 +326,12 @@ static inline void* smr_freestack_pop(struct smr_freestack *fs)
 {
 	return (void *) ( ((char*)fs) + smr_freestack_pop_by_offset(fs) );
 }
+
+static inline int16_t smr_freestack_avail(struct smr_freestack *fs)
+{
+	return fs->free;
+}
+
 /*
  * Buffer Pool
  */
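`smr_freestack_get_index` inverts the freestack layout: an entry's index is its byte offset from the freestack base, minus `entry_base_offset`, divided by `object_size`. A standalone sketch of that arithmetic with a toy stand-in for `struct smr_freestack`:

```c
#include <assert.h>
#include <stdint.h>

struct toy_freestack {		/* stand-in for struct smr_freestack */
	uint64_t entry_base_offset;
	uint64_t object_size;
};

static int16_t toy_get_index(struct toy_freestack *fs, char *base,
			     char *local_p)
{
	uint64_t offset = (uint64_t) (local_p - base);
	return (int16_t) ((offset - fs->entry_base_offset) /
			  fs->object_size);
}

int main(void)
{
	char region[1024];
	struct toy_freestack fs = { .entry_base_offset = 64,
				    .object_size = 128 };

	/* entry 2 starts at base + 64 + 2 * 128 = base + 320 */
	assert(toy_get_index(&fs, region, region + 320) == 2);
	return 0;
}
```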
2 changes: 1 addition & 1 deletion include/ofi_xpmem.h
@@ -48,7 +48,7 @@ typedef int64_t xpmem_segid_t;
 #endif /* HAVE_XPMEM */
 
 struct ofi_xpmem_client {
-	uint8_t cap;
+	bool avail;
 	xpmem_apid_t apid;
 	uintptr_t addr_max;
 };
13 changes: 8 additions & 5 deletions man/fi_shm.7.md
@@ -146,11 +146,6 @@

 The *shm* provider checks for the following environment variables:
 
-*FI_SHM_SAR_THRESHOLD*
-: Maximum message size to use segmentation protocol before switching
-  to mmap (only valid when CMA is not available). Default: SIZE_MAX
-  (18446744073709551615)
-
 *FI_SHM_TX_SIZE*
 : Maximum number of outstanding tx operations. Default 1024
 
@@ -175,6 +170,14 @@ The *shm* provider checks for the following environment variables:
   chunks. This environment variable is provided to fine tune performance
   on different systems. Default 262144
 
+*FI_SHM_BUFFER_THRESHOLD*
+: Message size at which the sender starts requesting forced buffering of
+  unexpected messages. Once a message reaches this threshold, the sender
+  notifies the receiver to buffer the entire message if it arrives
+  unexpected; if the message is matched when received, the request has
+  no effect. Requesting unexpected-message buffering allows shm to
+  support unlimited unexpected messages (memory permitting). Default: 1
 
 # SEE ALSO
 
 [`fabric`(7)](fabric.7.html),