@@ -32,22 +32,40 @@ __rte_ring_hts_update_tail(struct rte_ring_hts_headtail *ht, uint32_t old_tail,
 	RTE_SET_USED(enqueue);
 
 	tail = old_tail + num;
+
+	/*
+	 * R0: Release the tail update. Establishes a synchronization edge with
+	 * the load-acquire at A1. This release ensures that all updates to *ht
+	 * and the ring array made by this thread become visible to the opposing
+	 * thread once the tail value written here is observed.
+	 */
 	rte_atomic_store_explicit(&ht->ht.pos.tail, tail, rte_memory_order_release);
 }
 
 /**
- * @internal waits till tail will become equal to head.
- * Means no writer/reader is active for that ring.
- * Suppose to work as serialization point.
+ * @internal
+ * Waits until the tail becomes equal to the head.
+ * This indicates that another thread has finished its transaction, and there
+ * is a chance that we could be the next writer or reader in line.
+ *
+ * Returns ht.raw at this point. The value may be imprecise, since another
+ * thread might change the state before we observe ht.raw, but that does not
+ * matter. The function __rte_ring_hts_move_head() can detect and recall this
+ * function when it reaches the linearization point (CAS).
  */
-static __rte_always_inline void
+static __rte_always_inline union __rte_ring_hts_pos
 __rte_ring_hts_head_wait(const struct rte_ring_hts_headtail *ht,
-	union __rte_ring_hts_pos *p)
+	rte_memory_order memorder)
 {
-	while (p->pos.head != p->pos.tail) {
+	union __rte_ring_hts_pos p;
+	p.raw = rte_atomic_load_explicit(&ht->ht.raw, memorder);
+
+	while (p.pos.head != p.pos.tail) {
 		rte_pause();
-		p->raw = rte_atomic_load_explicit(&ht->ht.raw, rte_memory_order_acquire);
+		p.raw = rte_atomic_load_explicit(&ht->ht.raw, memorder);
 	}
+
+	return p;
 }
 
 /**
@@ -80,11 +98,9 @@ __rte_ring_hts_move_head(struct rte_ring_hts_headtail *d,
 	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
 	uint32_t *entries)
 {
-	uint32_t n;
+	uint32_t n, stail;
 	union __rte_ring_hts_pos np, op;
 
-	op.raw = rte_atomic_load_explicit(&d->ht.raw, rte_memory_order_acquire);
-
 	do {
 		/* Reset n to the initial burst count */
 		n = num;
@@ -94,15 +110,28 @@ __rte_ring_hts_move_head(struct rte_ring_hts_headtail *d,
 		 * make sure that we read prod head/tail *before*
 		 * reading cons tail.
 		 */
-		__rte_ring_hts_head_wait(d, &op);
+		/*
+		 * A0: Synchronizes with the CAS at R1.
+		 * Establishes a happens-before relationship with a thread of the same
+		 * type that released the ht.raw, ensuring this thread observes all of
+		 * its memory effects needed to maintain a safe partial order.
+		 */
+		op = __rte_ring_hts_head_wait(d, rte_memory_order_acquire);
+
+		/*
+		 * A1: Establish a synchronizes-with edge using a store-release at R0.
+		 * This ensures that all memory effects from the preceding opposing
+		 * thread are observed.
+		 */
+		stail = rte_atomic_load_explicit(&s->tail, rte_memory_order_acquire);
 
 		/*
 		 * The subtraction is done between two unsigned 32bits value
 		 * (the result is always modulo 32 bits even if we have
 		 * *old_head > cons_tail). So 'entries' is always between 0
 		 * and capacity (which is < size).
 		 */
-		*entries = capacity + s->tail - op.pos.head;
+		*entries = capacity + stail - op.pos.head;
 
 		/* check that we have enough room in ring */
 		if (unlikely(n > *entries))
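
The pre-existing comment about the unsigned subtraction still holds once the cached stail is used: head and the opposing tail are free-running 32-bit counters, so capacity + stail - op.pos.head is evaluated modulo 2^32 and always lands in [0, capacity], even across wrap-around. A small self-contained check with made-up values (not taken from the ring code) illustrates the wrap-around case:

/* Worked instance of the modulo-2^32 free-space computation. */
#include <assert.h>
#include <stdint.h>

int main(void)
{
	uint32_t capacity = 1024;
	uint32_t stail = 0xFFFFFFF0u;  /* opposing tail, about to wrap      */
	uint32_t head  = 0x00000010u;  /* our head, 32 elements ahead of it */

	/* Wraps modulo 2^32, exactly like the expression in the ring code. */
	uint32_t entries = capacity + stail - head;

	/* 32 slots are in use, so 1024 - 32 = 992 remain. */
	assert(entries == 992);
	return 0;
}
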
@@ -116,14 +145,17 @@ __rte_ring_hts_move_head(struct rte_ring_hts_headtail *d,
 		np.pos.head = op.pos.head + n;
 
 		/*
-		 * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent:
-		 * - OOO reads of cons tail value
-		 * - OOO copy of elems from the ring
+		 * R1: Establishes a synchronizes-with edge with the load-acquire
+		 * of ht.raw at A0. This makes sure that the store-release to the
+		 * tail by this thread, if it was of the opposite type, becomes
+		 * visible to another thread of the current type. That thread will
+		 * then observe the updates in the same order, keeping a safe
+		 * partial order.
 		 */
 	} while (rte_atomic_compare_exchange_strong_explicit(&d->ht.raw,
 		(uint64_t *)(uintptr_t)&op.raw, np.raw,
-		rte_memory_order_acquire,
-		rte_memory_order_acquire) == 0);
+		rte_memory_order_release,
+		rte_memory_order_relaxed) == 0);
 
 	*old_head = op.pos.head;
 	return n;
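
Switching the CAS success ordering from acquire to release is what creates the R1 edge: the thread that wins the CAS publishes everything it did before it, and the next thread of the same type observes that through the acquire load of ht.raw in __rte_ring_hts_head_wait() (A0). The failure ordering can be relaxed because a failed CAS simply loops back to that acquire load before retrying. A reduced sketch of the pairing in plain C11 atomics (hypothetical names, with a 64-bit packed word standing in for ht.raw):

/* Sketch of the R1/A0 pairing, assuming plain C11 atomics. */
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

static _Atomic uint64_t ht_raw;  /* packed head/tail word, as in ht.raw */

/* R1-like: release on success publishes prior writes; relaxed on
 * failure is enough because the caller re-reads with acquire. */
static bool try_move_head(uint64_t expected, uint64_t desired)
{
	return atomic_compare_exchange_strong_explicit(&ht_raw,
			&expected, desired,
			memory_order_release,
			memory_order_relaxed);
}

/* A0-like: the acquire load pairs with the release CAS above. */
static uint64_t wait_and_load(void)
{
	return atomic_load_explicit(&ht_raw, memory_order_acquire);
}
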