Skip to content

Commit 07d9f9f

Browse files
committed
prov/shm: modify queue for claim
Signed-off-by: Alexia Ingerson <[email protected]>
1 parent 3761e31 commit 07d9f9f

File tree

8 files changed

+167
-175
lines changed

8 files changed

+167
-175
lines changed

include/ofi_atomic_queue.h

Lines changed: 96 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -95,29 +95,40 @@ extern "C" {
9595
#define OFI_CACHE_LINE_SIZE (64)
9696

9797
typedef void (*ofi_aq_init_fn)(void *);
98+
enum {
99+
OFI_AQ_FREE = 0,
100+
OFI_AQ_READY,
101+
OFI_AQ_NOOP,
102+
};
103+
98104
/*
99105
* Base address of atomic queue must be cache line aligned to maximize atomic
100106
* value perforamnce benefits
101107
*/
102108
#define OFI_DECLARE_ATOMIC_Q(entrytype, name) \
103109
struct name ## _entry { \
104-
ofi_atomic64_t seq; \
105-
bool noop; \
110+
ofi_atomic64_t state; \
106111
entrytype buf; \
107112
} __attribute__((__aligned__(64))); \
108113
\
109114
struct name { \
110115
ofi_atomic64_t write_pos; \
111116
uint8_t pad0[OFI_CACHE_LINE_SIZE - \
112117
sizeof(ofi_atomic64_t)]; \
113-
ofi_atomic64_t read_pos; \
118+
int64_t read_pos; \
114119
ofi_aq_init_fn init_fn; \
115120
uint8_t pad1[OFI_CACHE_LINE_SIZE - \
116-
(sizeof(ofi_atomic64_t) + \
121+
(sizeof(int64_t) + \
117122
sizeof(ofi_aq_init_fn))]; \
123+
ofi_atomic64_t claim_avail; \
124+
uint8_t pad2[OFI_CACHE_LINE_SIZE - \
125+
sizeof(ofi_atomic64_t)]; \
126+
ofi_atomic64_t discard_avail; \
127+
uint8_t pad3[OFI_CACHE_LINE_SIZE - \
128+
sizeof(ofi_atomic64_t)]; \
118129
int size; \
119130
int size_mask; \
120-
uint8_t pad2[OFI_CACHE_LINE_SIZE - \
131+
uint8_t pad4[OFI_CACHE_LINE_SIZE - \
121132
(sizeof(int) * 2)]; \
122133
struct name ## _entry entry[]; \
123134
} __attribute__((__aligned__(64))); \
@@ -132,12 +143,13 @@ static inline void name ## _init(struct name *aq, size_t size, \
132143
aq->size_mask = aq->size - 1; \
133144
aq->init_fn = init_fn; \
134145
ofi_atomic_initialize64(&aq->write_pos, 0); \
135-
ofi_atomic_initialize64(&aq->read_pos, 0); \
146+
aq->read_pos = 0; \
147+
ofi_atomic_initialize64(&aq->discard_avail, 0); \
148+
ofi_atomic_initialize64(&aq->claim_avail, size); \
136149
for (i = 0; i < size; i++) { \
137-
ofi_atomic_initialize64(&aq->entry[i].seq, i); \
138150
if (aq->init_fn) \
139151
aq->init_fn(&aq->entry[i].buf); \
140-
aq->entry[i].noop = false; \
152+
ofi_atomic_initialize64(&aq->entry[i].state, OFI_AQ_FREE);\
141153
} \
142154
} \
143155
\
@@ -158,96 +170,106 @@ static inline void name ## _free(struct name *aq) \
158170
{ \
159171
free(aq); \
160172
} \
161-
static inline int name ## _next(struct name *aq, \
162-
entrytype **buf, int64_t *pos) \
173+
static inline bool name ## _claim(struct name *aq) \
163174
{ \
164-
struct name ## _entry *ce; \
165-
int64_t diff, seq; \
166-
*pos = ofi_atomic_load_explicit64(&aq->write_pos, \
167-
memory_order_relaxed); \
168-
for (;;) { \
169-
ce = &aq->entry[*pos & aq->size_mask]; \
170-
seq = ofi_atomic_load_explicit64(&(ce->seq), \
171-
memory_order_acquire); \
172-
diff = seq - *pos; \
173-
if (diff == 0) { \
174-
if (ofi_atomic_compare_exchange_weak64( \
175-
&aq->write_pos, pos, \
176-
*pos + 1)) \
177-
break; \
178-
} else if (diff < 0) { \
179-
return -FI_ENOENT; \
180-
} else { \
181-
*pos = ofi_atomic_load_explicit64( \
182-
&aq->write_pos, \
183-
memory_order_relaxed); \
184-
} \
175+
int64_t avail, discard_avail; \
176+
avail = ofi_atomic_sub_explicit64(&aq->claim_avail, 1, \
177+
memory_order_relaxed);\
178+
if (avail > 0) \
179+
return true; \
180+
\
181+
discard_avail = ofi_atomic_load_explicit64( \
182+
&aq->discard_avail, \
183+
memory_order_acquire); \
184+
if (discard_avail) { \
185+
if (!ofi_atomic_compare_exchange_weak64( \
186+
&aq->discard_avail, \
187+
&discard_avail, 0)) \
188+
goto out; \
189+
ofi_atomic_add_explicit64(&aq->claim_avail, \
190+
discard_avail, \
191+
memory_order_relaxed);\
192+
} \
193+
out: \
194+
ofi_atomic_add_explicit64(&aq->claim_avail, 1, \
195+
memory_order_relaxed); \
196+
return false; \
197+
\
198+
} \
199+
static inline entrytype *name ## _assign(struct name *aq) \
200+
{ \
201+
int64_t pos; \
202+
while (1) { \
203+
pos = ofi_atomic_load_explicit64( \
204+
&aq->write_pos, \
205+
memory_order_acquire); \
206+
if (ofi_atomic_compare_exchange_weak64( \
207+
&aq->write_pos, &pos, \
208+
pos + 1)) \
209+
break; \
185210
} \
186-
*buf = &ce->buf; \
187-
return FI_SUCCESS; \
211+
return &aq->entry[pos & aq->size_mask].buf; \
212+
} \
213+
static inline entrytype *name ## _claim_assign(struct name *aq) \
214+
{ \
215+
if (name ## _claim(aq)) { \
216+
return name ## _assign(aq); \
217+
} \
218+
return NULL; \
188219
} \
189220
static inline void name ## _release(struct name *aq, \
190-
entrytype *buf, \
191-
int64_t pos) \
221+
entrytype *buf) \
192222
{ \
223+
int64_t state = OFI_AQ_FREE; \
193224
struct name ## _entry *ce; \
194225
ce = container_of(buf, struct name ## _entry, buf); \
195226
if (aq->init_fn) \
196227
aq->init_fn(&ce->buf); \
197-
ofi_atomic_store_explicit64(&ce->seq, \
198-
pos + aq->size, \
228+
ofi_atomic_store_explicit64(&ce->state, state, \
199229
memory_order_release); \
230+
aq->read_pos++; \
231+
} \
232+
static inline void name ## _discard(struct name *aq) \
233+
{ \
234+
ofi_atomic_add_explicit64(&aq->discard_avail, 1, \
235+
memory_order_relaxed); \
200236
} \
201-
static inline int name ## _head(struct name *aq, \
202-
entrytype **buf, int64_t *pos) \
237+
static inline void name ## _release_discard(struct name *aq, \
238+
entrytype *buf) \
239+
{ \
240+
name ## _release(aq, buf); \
241+
name ## _discard(aq); \
242+
} \
243+
static inline entrytype *name ## _head(struct name *aq) \
203244
{ \
204-
int64_t diff, seq; \
205245
struct name ## _entry *ce; \
246+
int64_t state; \
206247
again: \
207-
*pos = ofi_atomic_load_explicit64(&aq->read_pos, \
208-
memory_order_relaxed); \
209-
for (;;) { \
210-
ce = &aq->entry[*pos & aq->size_mask]; \
211-
seq = ofi_atomic_load_explicit64(&(ce->seq), \
212-
memory_order_acquire); \
213-
diff = seq - (*pos + 1); \
214-
if (diff == 0) { \
215-
if (ofi_atomic_compare_exchange_weak64( \
216-
&aq->read_pos, pos, \
217-
*pos + 1)) \
218-
break; \
219-
} else if (diff < 0) { \
220-
return -FI_ENOENT; \
221-
} else { \
222-
*pos = ofi_atomic_load_explicit64( \
223-
&aq->read_pos, \
224-
memory_order_relaxed); \
225-
} \
226-
} \
227-
*buf = &ce->buf; \
228-
if (ce->noop) { \
229-
ce->noop = false; \
230-
name ##_release(aq, *buf, *pos); \
248+
ce = &aq->entry[aq->read_pos & aq->size_mask]; \
249+
state = ofi_atomic_load_explicit64(&ce->state, \
250+
memory_order_acquire);\
251+
if (state == OFI_AQ_FREE) \
252+
return NULL; \
253+
if (state == OFI_AQ_NOOP) { \
254+
name ## _release_discard(aq, &ce->buf); \
231255
goto again; \
232256
} \
233-
return FI_SUCCESS; \
257+
return &ce->buf; \
234258
} \
235-
static inline void name ## _commit(entrytype *buf, \
236-
int64_t pos) \
259+
static inline void name ## _commit(entrytype *buf) \
237260
{ \
238261
struct name ## _entry *ce; \
262+
int64_t state = OFI_AQ_READY; \
239263
ce = container_of(buf, struct name ## _entry, buf); \
240-
ofi_atomic_store_explicit64(&ce->seq, pos + 1, \
241-
memory_order_release); \
264+
ofi_atomic_store_explicit64(&ce->state, state, \
265+
memory_order_release); \
242266
} \
243-
static inline void name ## _discard(entrytype *buf, \
244-
int64_t pos) \
267+
static inline void name ## _cancel(entrytype *buf) \
245268
{ \
246269
struct name ## _entry *ce; \
247270
ce = container_of(buf, struct name ## _entry, buf); \
248-
ce->noop = true; \
249-
ofi_atomic_store_explicit64(&ce->seq, pos + 1, \
250-
memory_order_release); \
271+
ofi_atomic_store_explicit64(&ce->state, OFI_AQ_NOOP, \
272+
memory_order_release); \
251273
} \
252274
void dummy ## name (void) /* work-around global ; scope */
253275

prov/shm/src/smr.h

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@ struct smr_ep {
5050
struct ofi_bufpool *unexp_buf_pool;
5151
struct ofi_bufpool *pend_pool;
5252

53-
struct slist overflow_list;
5453
struct dlist_entry sar_list;
5554
struct dlist_entry async_cpy_list;
5655
struct dlist_entry unexp_cmd_list;
@@ -121,13 +120,11 @@ static inline void smr_return_cmd(struct smr_ep *ep, struct smr_cmd *cmd)
121120
{
122121
struct smr_region *peer_smr = smr_peer_region(ep, cmd->hdr.rx_id);
123122
uintptr_t peer_ptr;
124-
int64_t pos;
125123
struct smr_return_entry *queue_entry;
126-
int ret;
127124

128-
ret = smr_return_queue_next(smr_return_queue(peer_smr), &queue_entry,
129-
&pos);
130-
if (ret == -FI_ENOENT) {
125+
//return queue has built in claim
126+
queue_entry = smr_return_queue_assign(smr_return_queue(peer_smr));
127+
if (!queue_entry) {
131128
/* return queue runs in parallel to command stack
132129
* ie we will never run out of space
133130
*/
@@ -141,9 +138,11 @@ static inline void smr_return_cmd(struct smr_ep *ep, struct smr_cmd *cmd)
141138
peer_smr->total_size);
142139
queue_entry->ptr = peer_ptr;
143140

144-
smr_return_queue_commit(queue_entry, pos);
141+
smr_return_queue_commit(queue_entry);
145142
}
146143

144+
void smr_resend_cmd(struct smr_ep *ep, struct smr_cmd *cmd);
145+
147146
struct smr_env {
148147
int disable_cma;
149148
int use_dsa_sar;
@@ -339,8 +338,6 @@ static inline struct smr_freestack *smr_pend_sar_pool(
339338
void smr_free_sar_bufs(struct smr_ep *ep, struct smr_cmd *cmd,
340339
struct smr_pend_entry *pending);
341340

342-
void smr_try_send_cmd(struct smr_ep *ep, struct smr_cmd *cmd);
343-
344341
int smr_unexp_start(struct fi_peer_rx_entry *rx_entry);
345342

346343
void smr_progress_async(struct smr_ep *ep);

prov/shm/src/smr_atomic.c

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ static ssize_t smr_generic_atomic(
175175
struct iovec compare_iov[SMR_IOV_LIMIT];
176176
struct iovec result_iov[SMR_IOV_LIMIT];
177177
uint16_t smr_flags = 0;
178-
int64_t tx_id, rx_id, pos;
178+
int64_t tx_id, rx_id;
179179
int proto;
180180
ssize_t ret = -FI_EAGAIN;
181181
size_t total_len;
@@ -196,8 +196,8 @@ static ssize_t smr_generic_atomic(
196196
if (smr_peer_data(ep->region)[tx_id].sar_status)
197197
goto unlock;
198198

199-
ret = smr_cmd_queue_next(smr_cmd_queue(peer_smr), &ce, &pos);
200-
if (ret == -FI_ENOENT) {
199+
ce = smr_cmd_queue_claim_assign(smr_cmd_queue(peer_smr));
200+
if (!ce) {
201201
ret = -FI_EAGAIN;
202202
goto unlock;
203203
}
@@ -240,7 +240,7 @@ static ssize_t smr_generic_atomic(
240240
total_len, cmd);
241241
} else {
242242
if (smr_freestack_isempty(smr_cmd_stack(ep->region))) {
243-
smr_cmd_queue_discard(ce, pos);
243+
smr_cmd_queue_cancel(ce);
244244
ret = -FI_EAGAIN;
245245
goto unlock;
246246
}
@@ -258,7 +258,7 @@ static ssize_t smr_generic_atomic(
258258
compare_iov, compare_count,
259259
total_len, context, smr_flags, cmd);
260260
if (ret) {
261-
smr_cmd_queue_discard(ce, pos);
261+
smr_cmd_queue_cancel(ce);
262262
goto unlock;
263263
}
264264
}
@@ -272,7 +272,7 @@ static ssize_t smr_generic_atomic(
272272
}
273273

274274
smr_format_rma_ioc(cmd, rma_ioc, rma_count);
275-
smr_cmd_queue_commit(ce, pos);
275+
smr_cmd_queue_commit(ce);
276276
unlock:
277277
ofi_genlock_unlock(&ep->util_ep.lock);
278278
return ret;
@@ -350,7 +350,7 @@ static ssize_t smr_atomic_inject(
350350
struct smr_region *peer_smr;
351351
struct iovec iov;
352352
struct fi_rma_ioc rma_ioc;
353-
int64_t id, peer_id, pos;
353+
int64_t id, peer_id;
354354
ssize_t ret = -FI_EAGAIN;
355355
size_t total_len;
356356
int proto;
@@ -368,8 +368,9 @@ static ssize_t smr_atomic_inject(
368368
if (smr_peer_data(ep->region)[id].sar_status)
369369
goto unlock;
370370

371-
ret = smr_cmd_queue_next(smr_cmd_queue(peer_smr), &ce, &pos);
372-
if (ret == -FI_ENOENT) {
371+
372+
ce = smr_cmd_queue_claim_assign(smr_cmd_queue(peer_smr));
373+
if (!ce) {
373374
ret = -FI_EAGAIN;
374375
goto unlock;
375376
}
@@ -393,7 +394,7 @@ static ssize_t smr_atomic_inject(
393394
} else {
394395
proto = smr_proto_inject;
395396
if (smr_freestack_isempty(smr_cmd_stack(ep->region))) {
396-
smr_cmd_queue_discard(ce, pos);
397+
smr_cmd_queue_cancel(ce);
397398
ret = -FI_EAGAIN;
398399
goto unlock;
399400
}
@@ -407,13 +408,13 @@ static ssize_t smr_atomic_inject(
407408
&iov, 1, NULL, NULL, 0, NULL, NULL,
408409
0, total_len, NULL, 0, cmd);
409410
if (ret) {
410-
smr_cmd_queue_discard(ce, pos);
411+
smr_cmd_queue_cancel(ce);
411412
goto unlock;
412413
}
413414
}
414415

415416
smr_format_rma_ioc(cmd, &rma_ioc, 1);
416-
smr_cmd_queue_commit(ce, pos);
417+
smr_cmd_queue_commit(ce);
417418

418419
if (proto == smr_proto_inline)
419420
ofi_ep_peer_tx_cntr_inc(&ep->util_ep, ofi_op_atomic);

prov/shm/src/smr_dsa.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -645,7 +645,7 @@ static void dsa_complete_tx_work(struct smr_ep *ep, struct smr_pend_entry *pend)
645645
ofi_buf_free(pend);
646646
return;
647647
} else {
648-
smr_try_send_cmd(ep, pend->cmd);
648+
smr_resend_cmd(ep, pend->cmd);
649649
}
650650
}
651651

0 commit comments

Comments
 (0)