DPDK 22.11.11-rc1
rte_ring_rts_elem_pvt.h
/* SPDX-License-Identifier: BSD-3-Clause
 *
 * Copyright (c) 2010-2020 Intel Corporation
 * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
 * All rights reserved.
 * Derived from FreeBSD's bufring.h
 * Used as BSD-3 Licensed with permission from Kip Macy.
 */

#ifndef _RTE_RING_RTS_ELEM_PVT_H_
#define _RTE_RING_RTS_ELEM_PVT_H_

static __rte_always_inline void
__rte_ring_rts_update_tail(struct rte_ring_rts_headtail *ht)
{
	union __rte_ring_rts_poscnt h, ot, nt;

	/*
	 * If there are other enqueues/dequeues in progress that
	 * might precede us, then don't update the tail with a new value.
	 */
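	/*
	 * Illustrative scenario (values are hypothetical): two producers
	 * start enqueues, moving head.pos 10 -> 14 -> 18 and head.cnt
	 * 4 -> 5 -> 6 while the tail stays at {pos = 10, cnt = 4}. Each
	 * producer that finishes bumps tail.cnt by one; only the CAS that
	 * makes tail.cnt reach head.cnt (6) also advances tail.pos to
	 * head.pos (18), so the tail never exposes entries that are still
	 * being written by the other producer.
	 */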

	/*
	 * A0 = {A0.a, A0.b}: Synchronizes with the CAS at R0.
	 * The CAS at R0 in a thread of the same type establishes a
	 * happens-before relationship with this load-acquire. It ensures
	 * that this thread observes the same or later values for
	 * h.raw/h.val.cnt as those observed by the other thread when it
	 * updated ht->tail.raw. Otherwise, ht->tail.raw may get updated
	 * out of sync (e.g. updated to the same value twice). A0.a makes
	 * sure this condition holds when the CAS succeeds and A0.b when
	 * it fails.
	 */
	/* A0.a */
	ot.raw = __atomic_load_n(&ht->tail.raw, __ATOMIC_ACQUIRE);

	do {
		/* on 32-bit systems we have to do atomic read here */
		h.raw = __atomic_load_n(&ht->head.raw, __ATOMIC_RELAXED);

		nt.raw = ot.raw;
		if (++nt.val.cnt == h.val.cnt)
			nt.val.pos = h.val.pos;
		/*
		 * R0: Synchronizes with A2 of a different thread of the
		 * opposite type and with A0.b of a different thread of the
		 * same type.
		 */
		/* A0.b */
	} while (__atomic_compare_exchange_n(&ht->tail.raw, &ot.raw, nt.raw,
			0, __ATOMIC_RELEASE, __ATOMIC_ACQUIRE) == 0);
}

static __rte_always_inline union __rte_ring_rts_poscnt
__rte_ring_rts_head_wait(const struct rte_ring_rts_headtail *ht,
	int memorder)
{
	union __rte_ring_rts_poscnt h;
	uint32_t max = ht->htd_max;

	h.raw = __atomic_load_n(&ht->head.raw, memorder);

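	/*
	 * htd_max is the maximum allowed distance between the head and the
	 * tail, i.e. how many slots may be reserved by in-flight
	 * enqueues/dequeues; spin until the distance drops back under that
	 * limit.
	 */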
	while (h.val.pos - ht->tail.val.pos > max) {
		rte_pause();
		h.raw = __atomic_load_n(&ht->head.raw, memorder);
	}

	return h;
}

static __rte_always_inline uint32_t
__rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
	uint32_t *free_entries)
{
	uint32_t n, cons_tail;
	union __rte_ring_rts_poscnt nh, oh;

	const uint32_t capacity = r->capacity;

	do {
		/* Reset n to the initial burst count */
		n = num;

		/*
		 * Wait for the prod head/tail distance;
		 * make sure that we read prod head *before*
		 * reading cons tail.
		 */
		/*
		 * A1: Synchronizes with the CAS at R1.
		 * Establishes a happens-before relationship with a thread of
		 * the same type that released ht.raw, ensuring this thread
		 * observes all of its memory effects needed to maintain a
		 * safe partial order.
		 */
		oh = __rte_ring_rts_head_wait(&r->rts_prod, __ATOMIC_ACQUIRE);

		/*
		 * A2: Establishes a synchronizes-with edge with the
		 * store-release at R0. This ensures that all memory effects
		 * from the preceding thread of the opposite type are
		 * observed.
		 */
		cons_tail = __atomic_load_n(&r->cons.tail, __ATOMIC_ACQUIRE);

		/*
		 * The subtraction is done between two unsigned 32-bit values
		 * (the result is always modulo 32 bits even if we have
		 * *old_head > cons_tail). So 'free_entries' is always between
		 * 0 and capacity (which is < size).
		 */
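		/*
		 * Worked example (illustrative values): with capacity = 16,
		 * cons_tail = 4294967294 (0xFFFFFFFE) and oh.val.pos = 2
		 * (the head has already wrapped past zero), the head is
		 * 4 slots ahead of the tail, so 16 + 4294967294 - 2 wraps
		 * modulo 2^32 to 12 = 16 - 4 free entries.
		 */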
		*free_entries = capacity + cons_tail - oh.val.pos;

		/* check that we have enough room in ring */
		if (unlikely(n > *free_entries))
			n = (behavior == RTE_RING_QUEUE_FIXED) ?
					0 : *free_entries;

		if (n == 0)
			break;

		nh.val.pos = oh.val.pos + n;
		nh.val.cnt = oh.val.cnt + 1;

		/*
		 * R1: Establishes a synchronizes-with edge with the
		 * load-acquire of ht.raw at A1. Ensures that the
		 * store-release to the tail by this thread, if it was of the
		 * opposite type, becomes visible to another thread of the
		 * current type. That thread will then observe the updates in
		 * the same order, keeping a safe partial order.
		 */
	} while (__atomic_compare_exchange_n(&r->rts_prod.head.raw,
			&oh.raw, nh.raw,
			0, __ATOMIC_RELEASE, __ATOMIC_RELAXED) == 0);

	*old_head = oh.val.pos;
	return n;
}

static __rte_always_inline unsigned int
__rte_ring_rts_move_cons_head(struct rte_ring *r, uint32_t num,
	enum rte_ring_queue_behavior behavior, uint32_t *old_head,
	uint32_t *entries)
{
	uint32_t n, prod_tail;
	union __rte_ring_rts_poscnt nh, oh;

	/* move cons.head atomically */
	do {
		/* Restore n as it may change every loop */
		n = num;

		/*
		 * Wait for the cons head/tail distance;
		 * make sure that we read cons head *before*
		 * reading prod tail.
		 */
		/*
		 * A3: Synchronizes with the CAS at R2.
		 * Establishes a happens-before relationship with a thread of
		 * the same type that released ht.raw, ensuring this thread
		 * observes all of its memory effects needed to maintain a
		 * safe partial order.
		 */
		oh = __rte_ring_rts_head_wait(&r->rts_cons, __ATOMIC_ACQUIRE);

		/*
		 * A4: Establishes a synchronizes-with edge with the
		 * store-release at R0. This ensures that all memory effects
		 * from the preceding thread of the opposite type are
		 * observed.
		 */
		prod_tail = __atomic_load_n(&r->prod.tail, __ATOMIC_ACQUIRE);

		/*
		 * The subtraction is done between two unsigned 32-bit values
		 * (the result is always modulo 32 bits even if we have
		 * cons_head > prod_tail). So 'entries' is always between 0
		 * and size(ring) - 1.
		 */
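		/*
		 * Worked example (illustrative values): prod_tail = 2 after
		 * wrapping past zero and oh.val.pos = 4294967294 give
		 * 2 - 4294967294 = 4 modulo 2^32, i.e. 4 entries ready to
		 * dequeue.
		 */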
		*entries = prod_tail - oh.val.pos;

		/* Set the actual entries for dequeue */
		if (n > *entries)
			n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;

		if (unlikely(n == 0))
			break;

		nh.val.pos = oh.val.pos + n;
		nh.val.cnt = oh.val.cnt + 1;

		/*
		 * R2: Establishes a synchronizes-with edge with the
		 * load-acquire of ht.raw at A3. Ensures that the
		 * store-release to the tail by this thread, if it was of the
		 * opposite type, becomes visible to another thread of the
		 * current type. That thread will then observe the updates in
		 * the same order, keeping a safe partial order.
		 */
	} while (__atomic_compare_exchange_n(&r->rts_cons.head.raw,
			&oh.raw, nh.raw,
			0, __ATOMIC_RELEASE, __ATOMIC_RELAXED) == 0);

	*old_head = oh.val.pos;
	return n;
}

static __rte_always_inline unsigned int
__rte_ring_do_rts_enqueue_elem(struct rte_ring *r, const void *obj_table,
	uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
	uint32_t *free_space)
{
	uint32_t free, head;

	n = __rte_ring_rts_move_prod_head(r, n, behavior, &head, &free);

	if (n != 0) {
		__rte_ring_enqueue_elems(r, head, obj_table, esize, n);
		__rte_ring_rts_update_tail(&r->rts_prod);
	}

	if (free_space != NULL)
		*free_space = free - n;
	return n;
}

static __rte_always_inline unsigned int
__rte_ring_do_rts_dequeue_elem(struct rte_ring *r, void *obj_table,
	uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
	uint32_t *available)
{
	uint32_t entries, head;

	n = __rte_ring_rts_move_cons_head(r, n, behavior, &head, &entries);

	if (n != 0) {
		__rte_ring_dequeue_elems(r, head, obj_table, esize, n);
		__rte_ring_rts_update_tail(&r->rts_cons);
	}

	if (available != NULL)
		*available = entries - n;
	return n;
}

#endif /* _RTE_RING_RTS_ELEM_PVT_H_ */
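
For context, a minimal usage sketch (not part of this header; the ring name, size, and helper names below are illustrative): the RTS paths above are reached through the public rte_ring API once a ring is created with the RTS synchronization flags, which route multi-producer enqueues and multi-consumer dequeues through __rte_ring_do_rts_enqueue_elem() and __rte_ring_do_rts_dequeue_elem().

#include <rte_ring.h>
#include <rte_memory.h>
#include <rte_pause.h>

/* Create a ring whose MP enqueue / MC dequeue paths use RTS mode. */
static struct rte_ring *
create_rts_ring(void)
{
	return rte_ring_create("rts_example", 1024, SOCKET_ID_ANY,
			RING_F_MP_RTS_ENQ | RING_F_MC_RTS_DEQ);
}

/* Producer side: retries until a slot becomes free. */
static void
rts_produce(struct rte_ring *r, void *obj)
{
	while (rte_ring_enqueue(r, obj) != 0)
		rte_pause();
}

/* Consumer side: retries until an object becomes available. */
static void *
rts_consume(struct rte_ring *r)
{
	void *obj = NULL;

	while (rte_ring_dequeue(r, &obj) != 0)
		rte_pause();
	return obj;
}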