DPDK 24.11.1
rte_ring_rts_elem_pvt.h
Go to the documentation of this file.
1/* SPDX-License-Identifier: BSD-3-Clause
2 *
3 * Copyright (c) 2010-2020 Intel Corporation
4 * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
5 * All rights reserved.
6 * Derived from FreeBSD's bufring.h
7 * Used as BSD-3 Licensed with permission from Kip Macy.
8 */
9
10#ifndef _RTE_RING_RTS_ELEM_PVT_H_
11#define _RTE_RING_RTS_ELEM_PVT_H_
12
24static __rte_always_inline void
25__rte_ring_rts_update_tail(struct rte_ring_rts_headtail *ht)
26{
27 union __rte_ring_rts_poscnt h, ot, nt;
28
29 /*
30 * If there are other enqueues/dequeues in progress that
31 * might preceded us, then don't update tail with new value.
32 */
33
34 ot.raw = rte_atomic_load_explicit(&ht->tail.raw, rte_memory_order_acquire);
35
36 do {
37 /* on 32-bit systems we have to do atomic read here */
38 h.raw = rte_atomic_load_explicit(&ht->head.raw, rte_memory_order_relaxed);
39
40 nt.raw = ot.raw;
41 if (++nt.val.cnt == h.val.cnt)
42 nt.val.pos = h.val.pos;
43
44 } while (rte_atomic_compare_exchange_strong_explicit(&ht->tail.raw,
45 (uint64_t *)(uintptr_t)&ot.raw, nt.raw,
46 rte_memory_order_release, rte_memory_order_acquire) == 0);
47}
48
53static __rte_always_inline void
54__rte_ring_rts_head_wait(const struct rte_ring_rts_headtail *ht,
55 union __rte_ring_rts_poscnt *h)
56{
57 uint32_t max;
58
59 max = ht->htd_max;
60
61 while (h->val.pos - ht->tail.val.pos > max) {
62 rte_pause();
63 h->raw = rte_atomic_load_explicit(&ht->head.raw, rte_memory_order_acquire);
64 }
65}
66
70static __rte_always_inline uint32_t
71__rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
72 enum rte_ring_queue_behavior behavior, uint32_t *old_head,
73 uint32_t *free_entries)
74{
75 uint32_t n;
76 union __rte_ring_rts_poscnt nh, oh;
77
78 const uint32_t capacity = r->capacity;
79
80 oh.raw = rte_atomic_load_explicit(&r->rts_prod.head.raw, rte_memory_order_acquire);
81
82 do {
83 /* Reset n to the initial burst count */
84 n = num;
85
86 /*
87 * wait for prod head/tail distance,
88 * make sure that we read prod head *before*
89 * reading cons tail.
90 */
91 __rte_ring_rts_head_wait(&r->rts_prod, &oh);
92
93 /*
94 * The subtraction is done between two unsigned 32bits value
95 * (the result is always modulo 32 bits even if we have
96 * *old_head > cons_tail). So 'free_entries' is always between 0
97 * and capacity (which is < size).
98 */
99 *free_entries = capacity + r->cons.tail - oh.val.pos;
100
101 /* check that we have enough room in ring */
102 if (unlikely(n > *free_entries))
103 n = (behavior == RTE_RING_QUEUE_FIXED) ?
104 0 : *free_entries;
105
106 if (n == 0)
107 break;
108
109 nh.val.pos = oh.val.pos + n;
110 nh.val.cnt = oh.val.cnt + 1;
111
112 /*
113 * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent:
114 * - OOO reads of cons tail value
115 * - OOO copy of elems to the ring
116 */
117 } while (rte_atomic_compare_exchange_strong_explicit(&r->rts_prod.head.raw,
118 (uint64_t *)(uintptr_t)&oh.raw, nh.raw,
119 rte_memory_order_acquire, rte_memory_order_acquire) == 0);
120
121 *old_head = oh.val.pos;
122 return n;
123}
124
128static __rte_always_inline unsigned int
129__rte_ring_rts_move_cons_head(struct rte_ring *r, uint32_t num,
130 enum rte_ring_queue_behavior behavior, uint32_t *old_head,
131 uint32_t *entries)
132{
133 uint32_t n;
134 union __rte_ring_rts_poscnt nh, oh;
135
136 oh.raw = rte_atomic_load_explicit(&r->rts_cons.head.raw, rte_memory_order_acquire);
137
138 /* move cons.head atomically */
139 do {
140 /* Restore n as it may change every loop */
141 n = num;
142
143 /*
144 * wait for cons head/tail distance,
145 * make sure that we read cons head *before*
146 * reading prod tail.
147 */
148 __rte_ring_rts_head_wait(&r->rts_cons, &oh);
149
150 /* The subtraction is done between two unsigned 32bits value
151 * (the result is always modulo 32 bits even if we have
152 * cons_head > prod_tail). So 'entries' is always between 0
153 * and size(ring)-1.
154 */
155 *entries = r->prod.tail - oh.val.pos;
156
157 /* Set the actual entries for dequeue */
158 if (n > *entries)
159 n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
160
161 if (unlikely(n == 0))
162 break;
163
164 nh.val.pos = oh.val.pos + n;
165 nh.val.cnt = oh.val.cnt + 1;
166
167 /*
168 * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent:
169 * - OOO reads of prod tail value
170 * - OOO copy of elems from the ring
171 */
172 } while (rte_atomic_compare_exchange_strong_explicit(&r->rts_cons.head.raw,
173 (uint64_t *)(uintptr_t)&oh.raw, nh.raw,
174 rte_memory_order_acquire, rte_memory_order_acquire) == 0);
175
176 *old_head = oh.val.pos;
177 return n;
178}
179
202static __rte_always_inline unsigned int
203__rte_ring_do_rts_enqueue_elem(struct rte_ring *r, const void *obj_table,
204 uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
205 uint32_t *free_space)
206{
207 uint32_t free, head;
208
209 n = __rte_ring_rts_move_prod_head(r, n, behavior, &head, &free);
210
211 if (n != 0) {
212 __rte_ring_enqueue_elems(r, head, obj_table, esize, n);
213 __rte_ring_rts_update_tail(&r->rts_prod);
214 }
215
216 if (free_space != NULL)
217 *free_space = free - n;
218 return n;
219}
220
243static __rte_always_inline unsigned int
244__rte_ring_do_rts_dequeue_elem(struct rte_ring *r, void *obj_table,
245 uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
246 uint32_t *available)
247{
248 uint32_t entries, head;
249
250 n = __rte_ring_rts_move_cons_head(r, n, behavior, &head, &entries);
251
252 if (n != 0) {
253 __rte_ring_dequeue_elems(r, head, obj_table, esize, n);
254 __rte_ring_rts_update_tail(&r->rts_cons);
255 }
256
257 if (available != NULL)
258 *available = entries - n;
259 return n;
260}
261
262#endif /* _RTE_RING_RTS_ELEM_PVT_H_ */
#define unlikely(x)
#define __rte_always_inline
Definition: rte_common.h:413
static void rte_pause(void)
rte_ring_queue_behavior
Definition: rte_ring_core.h:40
@ RTE_RING_QUEUE_FIXED
Definition: rte_ring_core.h:42
uint32_t capacity