@@ -31,6 +31,17 @@ __rte_ring_rts_update_tail(struct rte_ring_rts_headtail *ht)
 	 * might preceded us, then don't update tail with new value.
 	 */
 
+	/*
+	 * A0 = {A0.a, A0.b}: Synchronizes with the CAS at R0.
+	 * The CAS at R0 in a thread of the same type establishes a
+	 * happens-before relationship with this load-acquire, ensuring that
+	 * this thread observes the same or later values of h.raw/h.val.cnt
+	 * as those observed by the other thread when it updated ht->tail.raw.
+	 * If not, ht->tail.raw may get updated out of sync (e.g. getting
+	 * updated to the same value twice). A0.a makes sure this condition
+	 * holds when the CAS succeeds and A0.b when it fails.
+	 */
+	/* A0.a */
 	ot.raw = rte_atomic_load_explicit(&ht->tail.raw, rte_memory_order_acquire);
 
 	do {
@@ -40,7 +51,11 @@ __rte_ring_rts_update_tail(struct rte_ring_rts_headtail *ht)
 		nt.raw = ot.raw;
 		if (++nt.val.cnt == h.val.cnt)
 			nt.val.pos = h.val.pos;
-
+		/*
+		 * R0: Synchronizes with A2 of a different thread of the opposite
+		 * type and with A0.b of a different thread of the same type.
+		 */
+		/* A0.b */
 	} while (rte_atomic_compare_exchange_strong_explicit(&ht->tail.raw,
 			(uint64_t *)(uintptr_t)&ot.raw, nt.raw,
 			rte_memory_order_release, rte_memory_order_acquire) == 0);
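
As a side note, the A0/R0 contract above maps onto a small, self-contained C11 <stdatomic.h> pattern: an acquire load seeds the expected tail, and the CAS retries with release on success and acquire on failure. The sketch below is illustrative only; the poscnt union, the tail_update name and the 32/32 layout are simplified stand-ins for the rte_ring types, not DPDK API.

#include <stdatomic.h>
#include <stdint.h>

union poscnt {
	uint64_t raw;
	struct { uint32_t cnt; uint32_t pos; } val;
};

/*
 * Sketch of the A0/R0 pattern: acquire-load the current tail (A0.a),
 * then CAS it forward with release on success (R0) and acquire on
 * failure (A0.b), so a retry sees a tail published by another
 * same-type thread together with all of that thread's prior effects.
 */
static void
tail_update(_Atomic uint64_t *tail, union poscnt h)
{
	union poscnt ot, nt;

	/* A0.a: acquire-load of the current tail */
	ot.raw = atomic_load_explicit(tail, memory_order_acquire);

	do {
		nt.raw = ot.raw;
		/* only the thread finishing the last pending update moves pos */
		if (++nt.val.cnt == h.val.cnt)
			nt.val.pos = h.val.pos;
		/* R0 on success, A0.b on failure (ot.raw is refreshed by the CAS) */
	} while (!atomic_compare_exchange_strong_explicit(tail, &ot.raw, nt.raw,
			memory_order_release, memory_order_acquire));
}
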
@@ -50,18 +65,21 @@ __rte_ring_rts_update_tail(struct rte_ring_rts_headtail *ht)
  * @internal This function waits till head/tail distance wouldn't
  * exceed pre-defined max value.
  */
-static __rte_always_inline void
+static __rte_always_inline union __rte_ring_rts_poscnt
 __rte_ring_rts_head_wait(const struct rte_ring_rts_headtail *ht,
-	union __rte_ring_rts_poscnt *h)
+	rte_memory_order memorder)
 {
-	uint32_t max;
+	union __rte_ring_rts_poscnt h;
+	uint32_t max = ht->htd_max;
 
-	max = ht->htd_max;
+	h.raw = rte_atomic_load_explicit(&ht->head.raw, memorder);
 
-	while (h->val.pos - ht->tail.val.pos > max) {
+	while (h.val.pos - ht->tail.val.pos > max) {
 		rte_pause();
-		h->raw = rte_atomic_load_explicit(&ht->head.raw, rte_memory_order_acquire);
+		h.raw = rte_atomic_load_explicit(&ht->head.raw, memorder);
 	}
+
+	return h;
 }
 
 /**
@@ -94,12 +112,9 @@ __rte_ring_rts_move_head(struct rte_ring_rts_headtail *d,
 	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
 	uint32_t *entries)
 {
-	uint32_t n;
+	uint32_t n, stail;
 	union __rte_ring_rts_poscnt nh, oh;
 
-	oh.raw = rte_atomic_load_explicit(&d->head.raw,
-		rte_memory_order_acquire);
-
 	do {
 		/* Reset n to the initial burst count */
 		n = num;
@@ -109,15 +124,28 @@ __rte_ring_rts_move_head(struct rte_ring_rts_headtail *d,
 		 * make sure that we read prod head *before*
 		 * reading cons tail.
 		 */
-		__rte_ring_rts_head_wait(d, &oh);
+		/*
+		 * A1: Synchronizes with the CAS at R1.
+		 * Establishes a happens-before relationship with a thread of the
+		 * same type that released ht.raw, ensuring this thread observes
+		 * all of its memory effects needed to maintain a safe partial order.
+		 */
+		oh = __rte_ring_rts_head_wait(d, rte_memory_order_acquire);
+
+		/*
+		 * A2: Establishes a synchronizes-with edge with the store-release
+		 * at R0. This ensures that all memory effects from the preceding
+		 * thread of the opposite type are observed.
+		 */
+		stail = rte_atomic_load_explicit(&s->tail, rte_memory_order_acquire);
 
 		/*
 		 * The subtraction is done between two unsigned 32bits value
 		 * (the result is always modulo 32 bits even if we have
 		 * *old_head > cons_tail). So 'entries' is always between 0
 		 * and capacity (which is < size).
 		 */
-		*entries = capacity + s->tail - oh.val.pos;
+		*entries = capacity + stail - oh.val.pos;
 
 		/* check that we have enough room in ring */
 		if (unlikely(n > *entries))
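
The unsigned arithmetic the comment above relies on can be spot-checked in isolation: even after the 32-bit position counters wrap, capacity + stail - head reduces modulo 2^32 to the number of free slots. The values in this standalone check are arbitrary, chosen only to exercise the wraparound case.

#include <assert.h>
#include <stdint.h>
#include <stdio.h>

int
main(void)
{
	uint32_t capacity = 1024;

	/* head position has wrapped past UINT32_MAX, the opposite tail has not */
	uint32_t stail = UINT32_MAX - 10;	/* 4294967285 */
	uint32_t head  = 20;			/* i.e. stail + 31 modulo 2^32 */

	/* modulo-2^32 arithmetic keeps the result in [0, capacity] */
	uint32_t entries = capacity + stail - head;

	assert(entries == capacity - 31);
	printf("free entries = %u\n", entries);
	return 0;
}
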
@@ -131,14 +159,17 @@ __rte_ring_rts_move_head(struct rte_ring_rts_headtail *d,
 		nh.val.cnt = oh.val.cnt + 1;
 
 		/*
-		 * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent:
-		 * - OOO reads of cons tail value
-		 * - OOO copy of elems to the ring
+		 * R1: Establishes a synchronizes-with edge with the load-acquire
+		 * of ht.raw at A1. Ensures that the store-release to the tail by
+		 * this thread, if it was of the opposite type, becomes
+		 * visible to another thread of the current type. That thread will
+		 * then observe the updates in the same order, keeping a safe
+		 * partial order.
 		 */
 	} while (rte_atomic_compare_exchange_strong_explicit(&d->head.raw,
 			(uint64_t *)(uintptr_t)&oh.raw, nh.raw,
-			rte_memory_order_acquire,
-			rte_memory_order_acquire) == 0);
+			rte_memory_order_release,
+			rte_memory_order_relaxed) == 0);
 
 	*old_head = oh.val.pos;
 	return n;
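
Putting the pieces together, the A1/A2/R1 choreography of __rte_ring_rts_move_head boils down to: acquire the local head, acquire the opposite tail, compute the available room, then CAS the head with release on success and relaxed on failure (the retry path re-acquires both values anyway). The following is a simplified standalone sketch in plain C11 atomics; move_head, own_head and opp_tail are made-up names, and the pos/cnt packing and the htd_max wait loop are deliberately omitted.

#include <stdatomic.h>
#include <stdint.h>

/*
 * Sketch of the A1/A2/R1 shape: reserve up to 'num' slots between the
 * caller's head and the opposite side's tail, returning how many were
 * actually reserved and the old head position via 'old_head'.
 */
static uint32_t
move_head(_Atomic uint32_t *own_head, _Atomic uint32_t *opp_tail,
		uint32_t capacity, uint32_t num, uint32_t *old_head)
{
	uint32_t oh, nh, stail, entries, n;

	do {
		n = num;

		/* A1: acquire the head published by a same-type thread at R1 */
		oh = atomic_load_explicit(own_head, memory_order_acquire);

		/* A2: acquire the opposite tail published by its R0 release */
		stail = atomic_load_explicit(opp_tail, memory_order_acquire);

		/* free room, computed modulo 2^32 */
		entries = capacity + stail - oh;
		if (n > entries)
			n = entries;
		if (n == 0)
			return 0;

		nh = oh + n;

		/*
		 * R1: release the new head. Relaxed failure ordering is enough
		 * because the loop restarts with fresh acquire loads of both
		 * the head and the opposite tail.
		 */
	} while (!atomic_compare_exchange_strong_explicit(own_head, &oh, nh,
			memory_order_release, memory_order_relaxed));

	*old_head = oh;
	return n;
}
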