Commit 17a2128

prov/shm: new shm architecture
Replacement of the shm protocols with a new architecture. Significant changes:

- Turn the response queue into a return queue for local commands. Inline commands remain receive side. All commands have an inline option, but remote commands use a common pointer to the command. These commands have to be returned to the sender, but the receive side can hold onto them as long as needed for the lifetime of the message.
- shm has self and peer caps for each p2p interface (currently just CMA and xpmem). The support for each interface was saved in separate fields, which wasted memory and was confusing. Merge these into two fields (one for self and one for peer) that hold the information for all p2p interfaces and are indexed by the P2P type enums. CMA also needs a flag to record whether CMA support has been queried yet.
- Move some shm fields around for alignment.
- Simplify access to the map to remove the need for a container.
- There is a 1:1 relationship between the av and the map, so reuse the util av lock for access to the map as well. This requires some reorganizing of the locking semantics.
- There is nothing in smr_fabric. Remove it and use the util_fabric directly.
- Just like on the send side, make the progress functions an array of function pointers indexed by the command proto (a sketch of this dispatch pattern follows below). This cleans up the parameters of the progress calls and streamlines the calls.
- Merge tx and pend entries for simpler management of pending operations.
- Redefine the cmd and header for simplicity and easier reading. Also remove and add fields for the new architecture.
- Refactor the async ipc list into a generic async list that tracks asynchronous copies and can be used for any accelerator (GPU or DSA) that copies locally asynchronously.
- Clean up naming and organization for readability. Shorten some names to help with line length.
- Fix the odd header dependency smr_util.c->smr.h->smr_util.h so that smr_util.c depends only on smr_util.h and is isolated to shm region and protocol definitions. This separates the shm utilities from the provider, leaving the door open for reuse of the shm utilities if needed.

Signed-off-by: Alexia Ingerson <[email protected]>
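As a rough illustration of the dispatch-table change mentioned above, the sketch below shows progress handlers selected by a command proto instead of a switch statement. All names here (cmd_proto, progress_ops, progress_cmd, the handlers) are illustrative placeholders, not the provider's actual symbols.

#include <inttypes.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical command protos and handlers; names are illustrative only. */
enum cmd_proto { CMD_PROTO_INLINE, CMD_PROTO_IPC, CMD_PROTO_MAX };

struct cmd {
	enum cmd_proto proto;
	uint64_t len;
};

typedef int (*progress_fn)(struct cmd *cmd);

static int progress_inline(struct cmd *cmd)
{
	printf("inline cmd, %" PRIu64 " bytes\n", cmd->len);
	return 0;
}

static int progress_ipc(struct cmd *cmd)
{
	printf("ipc cmd, %" PRIu64 " bytes\n", cmd->len);
	return 0;
}

/* One table indexed by the proto replaces per-protocol branching
 * in the progress path and keeps the call signature uniform. */
static progress_fn progress_ops[CMD_PROTO_MAX] = {
	[CMD_PROTO_INLINE] = progress_inline,
	[CMD_PROTO_IPC] = progress_ipc,
};

static int progress_cmd(struct cmd *cmd)
{
	return progress_ops[cmd->proto](cmd);
}

int main(void)
{
	struct cmd c = { .proto = CMD_PROTO_IPC, .len = 4096 };
	return progress_cmd(&c);
}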
1 parent 2af907e commit 17a2128

20 files changed: +2533 −2763 lines

include/ofi_atomic_queue.h

Lines changed: 97 additions & 76 deletions
@@ -94,27 +94,38 @@ extern "C" {

 #define OFI_CACHE_LINE_SIZE (64)

+enum {
+	OFI_AQ_FREE = 0,
+	OFI_AQ_READY,
+	OFI_AQ_NOOP,
+};
+
 /*
  * Base address of atomic queue must be cache line aligned to maximize atomic
  * value perforamnce benefits
  */
 #define OFI_DECLARE_ATOMIC_Q(entrytype, name) \
 	struct name ## _entry { \
-		ofi_atomic64_t seq; \
-		bool noop; \
+		ofi_atomic64_t state; \
 		entrytype buf; \
 	} __attribute__((__aligned__(64))); \
 	\
 	struct name { \
 		ofi_atomic64_t write_pos; \
 		uint8_t pad0[OFI_CACHE_LINE_SIZE - \
 			sizeof(ofi_atomic64_t)]; \
-		ofi_atomic64_t read_pos; \
+		int64_t read_pos; \
 		uint8_t pad1[OFI_CACHE_LINE_SIZE - \
+			sizeof(int64_t)]; \
+		ofi_atomic64_t claim_avail; \
+		uint8_t pad2[OFI_CACHE_LINE_SIZE - \
+			sizeof(ofi_atomic64_t)]; \
+		ofi_atomic64_t discard_avail; \
+		uint8_t pad3[OFI_CACHE_LINE_SIZE - \
 			sizeof(ofi_atomic64_t)]; \
 		int size; \
 		int size_mask; \
-		uint8_t pad2[OFI_CACHE_LINE_SIZE - \
+		uint8_t pad4[OFI_CACHE_LINE_SIZE - \
 			(sizeof(int) * 2)]; \
 		struct name ## _entry entry[]; \
 	} __attribute__((__aligned__(64))); \

@@ -127,11 +138,11 @@ static inline void name ## _init(struct name *aq, size_t size) \
 	aq->size = size; \
 	aq->size_mask = aq->size - 1; \
 	ofi_atomic_initialize64(&aq->write_pos, 0); \
-	ofi_atomic_initialize64(&aq->read_pos, 0); \
-	for (i = 0; i < size; i++) { \
-		ofi_atomic_initialize64(&aq->entry[i].seq, i); \
-		aq->entry[i].noop = false; \
-	} \
+	aq->read_pos = 0; \
+	ofi_atomic_initialize64(&aq->discard_avail, 0); \
+	ofi_atomic_initialize64(&aq->claim_avail, size); \
+	for (i = 0; i < size; i++) \
+		ofi_atomic_initialize64(&aq->entry[i].state, OFI_AQ_FREE);\
 } \
 \
 static inline struct name * name ## _create(size_t size) \

@@ -150,94 +161,104 @@ static inline void name ## _free(struct name *aq) \
 { \
 	free(aq); \
 } \
-static inline int name ## _next(struct name *aq, \
-				entrytype **buf, int64_t *pos) \
+static inline bool name ## _claim(struct name *aq) \
 { \
-	struct name ## _entry *ce; \
-	int64_t diff, seq; \
-	*pos = ofi_atomic_load_explicit64(&aq->write_pos, \
-					  memory_order_relaxed); \
-	for (;;) { \
-		ce = &aq->entry[*pos & aq->size_mask]; \
-		seq = ofi_atomic_load_explicit64(&(ce->seq), \
-						 memory_order_acquire); \
-		diff = seq - *pos; \
-		if (diff == 0) { \
-			if (ofi_atomic_compare_exchange_weak64( \
-					&aq->write_pos, pos, \
-					*pos + 1)) \
-				break; \
-		} else if (diff < 0) { \
-			return -FI_ENOENT; \
-		} else { \
-			*pos = ofi_atomic_load_explicit64( \
-					&aq->write_pos, \
-					memory_order_relaxed); \
-		} \
+	int64_t avail, discard_avail; \
+	avail = ofi_atomic_sub_explicit64(&aq->claim_avail, 1, \
+					  memory_order_relaxed);\
+	if (avail > 0) \
+		return true; \
+	\
+	discard_avail = ofi_atomic_load_explicit64( \
+					&aq->discard_avail, \
+					memory_order_acquire); \
+	if (discard_avail) { \
+		if (!ofi_atomic_compare_exchange_weak64( \
+					&aq->discard_avail, \
+					&discard_avail, 0)) \
+			goto out; \
+		ofi_atomic_add_explicit64(&aq->claim_avail, \
+					  discard_avail, \
+					  memory_order_relaxed);\
 	} \
-	*buf = &ce->buf; \
-	return FI_SUCCESS; \
+out: \
+	ofi_atomic_add_explicit64(&aq->claim_avail, 1, \
+				  memory_order_relaxed); \
+	return false; \
+	\
+} \
+static inline entrytype *name ## _assign(struct name *aq) \
+{ \
+	int64_t pos; \
+	while (1) { \
+		pos = ofi_atomic_load_explicit64( \
+					&aq->write_pos, \
+					memory_order_acquire); \
+		if (ofi_atomic_compare_exchange_weak64( \
+					&aq->write_pos, &pos, \
+					pos + 1)) \
+			break; \
+	} \
+	return &aq->entry[pos & aq->size_mask].buf; \
+} \
+static inline entrytype *name ## _claim_assign(struct name *aq) \
+{ \
+	if (name ## _claim(aq)) { \
+		return name ## _assign(aq); \
+	} \
+	return NULL; \
 } \
 static inline void name ## _release(struct name *aq, \
-				    entrytype *buf, \
-				    int64_t pos) \
+				    entrytype *buf) \
 { \
+	int64_t state = OFI_AQ_FREE; \
 	struct name ## _entry *ce; \
 	ce = container_of(buf, struct name ## _entry, buf); \
-	ofi_atomic_store_explicit64(&ce->seq, \
-				    pos + aq->size, \
-				    memory_order_release); \
+	ofi_atomic_store_explicit64(&ce->state, state, \
+				    memory_order_release); \
+	aq->read_pos++; \
 } \
-static inline int name ## _head(struct name *aq, \
-				entrytype **buf, int64_t *pos) \
+static inline void name ## _discard(struct name *aq) \
+{ \
+	ofi_atomic_add_explicit64(&aq->discard_avail, 1, \
+				  memory_order_relaxed); \
+} \
+static inline void name ## _release_discard(struct name *aq, \
+					    entrytype *buf) \
+{ \
+	name ## _release(aq, buf); \
+	name ## _discard(aq); \
+} \
+static inline entrytype *name ## _head(struct name *aq) \
 { \
-	int64_t diff, seq; \
 	struct name ## _entry *ce; \
+	int64_t state; \
 again: \
-	*pos = ofi_atomic_load_explicit64(&aq->read_pos, \
-					  memory_order_relaxed); \
-	for (;;) { \
-		ce = &aq->entry[*pos & aq->size_mask]; \
-		seq = ofi_atomic_load_explicit64(&(ce->seq), \
-						 memory_order_acquire); \
-		diff = seq - (*pos + 1); \
-		if (diff == 0) { \
-			if (ofi_atomic_compare_exchange_weak64( \
-					&aq->read_pos, pos, \
-					*pos + 1)) \
-				break; \
-		} else if (diff < 0) { \
-			return -FI_ENOENT; \
-		} else { \
-			*pos = ofi_atomic_load_explicit64( \
-					&aq->read_pos, \
-					memory_order_relaxed); \
-		} \
-	} \
-	*buf = &ce->buf; \
-	if (ce->noop) { \
-		ce->noop = false; \
-		name ##_release(aq, *buf, *pos); \
+	ce = &aq->entry[aq->read_pos & aq->size_mask]; \
+	state = ofi_atomic_load_explicit64(&ce->state, \
+					   memory_order_acquire);\
+	if (state == OFI_AQ_FREE) \
+		return NULL; \
+	if (state == OFI_AQ_NOOP) { \
+		name ## _release_discard(aq, &ce->buf); \
 		goto again; \
 	} \
-	return FI_SUCCESS; \
+	return &ce->buf; \
 } \
-static inline void name ## _commit(entrytype *buf, \
-				   int64_t pos) \
+static inline void name ## _commit(entrytype *buf) \
 { \
 	struct name ## _entry *ce; \
+	int64_t state = OFI_AQ_READY; \
 	ce = container_of(buf, struct name ## _entry, buf); \
-	ofi_atomic_store_explicit64(&ce->seq, pos + 1, \
-				    memory_order_release); \
+	ofi_atomic_store_explicit64(&ce->state, state, \
+				    memory_order_release); \
 } \
-static inline void name ## _discard(entrytype *buf, \
-				    int64_t pos) \
+static inline void name ## _cancel(entrytype *buf) \
 { \
 	struct name ## _entry *ce; \
 	ce = container_of(buf, struct name ## _entry, buf); \
-	ce->noop = true; \
-	ofi_atomic_store_explicit64(&ce->seq, pos + 1, \
-				    memory_order_release); \
+	ofi_atomic_store_explicit64(&ce->state, OFI_AQ_NOOP, \
+				    memory_order_release); \
 } \
 void dummy ## name (void) /* work-around global ; scope */

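For reference, a minimal usage sketch of the reworked queue API shown in the diff above: the producer claims and fills a slot, then commits it; the consumer peeks the head and releases the slot when done. This assumes a libfabric build tree where ofi_atomic_queue.h and the ofi atomics are available; struct msg, msg_q, msg_post, and msg_poll are illustrative names only, not symbols from this commit.

#include <stdbool.h>
#include <stdint.h>

#include "ofi_atomic_queue.h"	/* the header changed in this commit */

/* Illustrative payload type; one queue slot holds one struct msg. */
struct msg {
	uint64_t id;
	char data[48];
};

/* Expands to struct msg_q plus the msg_q_* helpers defined by
 * OFI_DECLARE_ATOMIC_Q in the diff above. */
OFI_DECLARE_ATOMIC_Q(struct msg, msg_q);

/* Producer: claim a free slot, fill it, then publish it as OFI_AQ_READY. */
static bool msg_post(struct msg_q *q, uint64_t id)
{
	struct msg *m;

	m = msg_q_claim_assign(q);
	if (!m)
		return false;	/* queue is full */

	m->id = id;
	msg_q_commit(m);
	return true;
}

/* Consumer: peek the ready head, consume it, then free the slot. */
static bool msg_poll(struct msg_q *q, uint64_t *id)
{
	struct msg *m;

	m = msg_q_head(q);
	if (!m)
		return false;	/* head slot is still OFI_AQ_FREE */

	*id = m->id;
	msg_q_release(q, m);
	return true;
}

The queue itself would be created with msg_q_create(), using a power-of-two size, since the implementation masks positions with size - 1.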