DPDK 24.11.4-rc1
rte_ring_c11_pvt.h
/* SPDX-License-Identifier: BSD-3-Clause
 *
 * Copyright (c) 2017,2018 HXT-semitech Corporation.
 * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
 * Copyright (c) 2021 Arm Limited
 * All rights reserved.
 * Derived from FreeBSD's bufring.h
 * Used as BSD-3 Licensed with permission from Kip Macy.
 */

#ifndef _RTE_RING_C11_PVT_H_
#define _RTE_RING_C11_PVT_H_

static __rte_always_inline void
__rte_ring_update_tail(struct rte_ring_headtail *ht, uint32_t old_val,
                uint32_t new_val, uint32_t single, uint32_t enqueue)
{
        RTE_SET_USED(enqueue);

        /*
         * If there are other enqueues/dequeues in progress that preceded us,
         * we need to wait for them to complete.
         */
        if (!single)
                rte_wait_until_equal_32((uint32_t *)(uintptr_t)&ht->tail,
                                old_val, rte_memory_order_relaxed);
        /*
         * R0: Establishes a synchronizing edge with the load-acquire of
         * cons_tail at A1 or of prod_tail at A4.
         * Ensures that this thread's memory effects on the ring elements
         * array are observed by a thread of the other type.
         */
        rte_atomic_store_explicit(&ht->tail, new_val, rte_memory_order_release);
}
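
The release store at R0 and the acquire loads at A1/A4 implement the classic
C11 message-passing idiom: every write to the ring slots made before the
release store is visible to a thread whose acquire load observes the new tail
value. A minimal standalone sketch of that guarantee using plain stdatomic.h
rather than DPDK's atomics wrappers (all names here are illustrative, not
part of DPDK):

#include <stdatomic.h>
#include <stdint.h>

static uint32_t slot;            /* stands in for one ring element */
static _Atomic uint32_t tail;    /* stands in for ht->tail */

/* Producer: fill the slot, then publish it with a release store,
 * just as R0 publishes the slots written before it. */
static void producer(void)
{
        slot = 42;
        atomic_store_explicit(&tail, 1, memory_order_release);
}

/* Consumer: once the acquire load (the analogue of A1/A4) observes
 * the new tail value, the write to 'slot' is guaranteed visible. */
static uint32_t consumer(void)
{
        while (atomic_load_explicit(&tail, memory_order_acquire) == 0)
                ;
        return slot;    /* always reads 42 after the loop exits */
}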

static __rte_always_inline unsigned int
__rte_ring_move_prod_head(struct rte_ring *r, unsigned int is_sp,
                unsigned int n, enum rte_ring_queue_behavior behavior,
                uint32_t *old_head, uint32_t *new_head,
                uint32_t *free_entries)
{
        const uint32_t capacity = r->capacity;
        uint32_t cons_tail;
        unsigned int max = n;
        int success;

        /*
         * A0: Establishes a synchronizing edge with R1.
         * Ensures that this thread observes the same value of cons_tail
         * that was observed by the thread which last updated r->prod.head.
         * If not, an unsafe partial order may ensue.
         */
        *old_head = rte_atomic_load_explicit(&r->prod.head, rte_memory_order_acquire);
        do {
                /* Reset n to the initial burst count */
                n = max;

                /*
                 * A1: Establishes a synchronizing edge with R0.
                 * Ensures that the other thread's memory effects on the
                 * ring elements array are visible to this thread once it
                 * observes that thread's tail update.
                 */
                cons_tail = rte_atomic_load_explicit(&r->cons.tail,
                                rte_memory_order_acquire);

                /* The subtraction is done between two unsigned 32-bit
                 * values (the result is always modulo 32 bits even if
                 * *old_head > cons_tail). So 'free_entries' is always
                 * between 0 and capacity (which is < size).
                 */
                *free_entries = (capacity + cons_tail - *old_head);

                /* Check that we have enough room in the ring. */
                if (unlikely(n > *free_entries))
                        n = (behavior == RTE_RING_QUEUE_FIXED) ?
                                        0 : *free_entries;

                if (n == 0)
                        return 0;

                *new_head = *old_head + n;
                if (is_sp) {
                        r->prod.head = *new_head;
                        success = 1;
                } else
                        /*
                         * R1/A2: on failure, the CAS writes the current
                         * value of r->prod.head back into *old_head.
                         * R1: Establishes a synchronizing edge with A0 of a
                         * different thread.
                         * A2: Establishes a synchronizing edge with R1 of a
                         * different thread, so that this thread observes the
                         * same value of cons_tail that thread observed on
                         * CAS failure (to retry with an updated *old_head).
                         */
                        success = rte_atomic_compare_exchange_strong_explicit(
                                        &r->prod.head, old_head, *new_head,
                                        rte_memory_order_release,
                                        rte_memory_order_acquire);
        } while (unlikely(success == 0));
        return n;
}
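
A sketch of how a caller combines __rte_ring_move_prod_head() with the tail
update above; this mirrors the multi-producer enqueue path in
rte_ring_elem_pvt.h, assuming the slot-copy helper __rte_ring_enqueue_elems()
from that header (the function name example_do_enqueue is hypothetical):

static __rte_always_inline unsigned int
example_do_enqueue(struct rte_ring *r, const void *obj_table,
                unsigned int esize, unsigned int n, int is_sp)
{
        uint32_t prod_head, prod_next, free_entries;

        /* Step 1: reserve slots [prod_head, prod_next) by moving prod.head. */
        n = __rte_ring_move_prod_head(r, is_sp, n, RTE_RING_QUEUE_VARIABLE,
                        &prod_head, &prod_next, &free_entries);
        if (n == 0)
                return 0;

        /* Step 2: copy the objects into the reserved slots. */
        __rte_ring_enqueue_elems(r, prod_head, obj_table, esize, n);

        /* Step 3: publish the slots; the release store at R0 makes the
         * copies visible to any consumer whose A4 load sees the new tail. */
        __rte_ring_update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
        return n;
}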

static __rte_always_inline unsigned int
__rte_ring_move_cons_head(struct rte_ring *r, int is_sc,
                unsigned int n, enum rte_ring_queue_behavior behavior,
                uint32_t *old_head, uint32_t *new_head,
                uint32_t *entries)
{
        unsigned int max = n;
        uint32_t prod_tail;
        int success;

        /*
         * A3: Establishes a synchronizing edge with R2.
         * Ensures that this thread observes the same value of prod_tail
         * that was observed by the thread which last updated r->cons.head.
         * If not, an unsafe partial order may ensue.
         */
        *old_head = rte_atomic_load_explicit(&r->cons.head, rte_memory_order_acquire);
        do {
                /* Restore n as it may change on every loop iteration */
                n = max;

                /*
                 * A4: Establishes a synchronizing edge with R0.
                 * Ensures that the other thread's memory effects on the
                 * ring elements array are visible to this thread once it
                 * observes that thread's tail update.
                 */
                prod_tail = rte_atomic_load_explicit(&r->prod.tail,
                                rte_memory_order_acquire);

                /* The subtraction is done between two unsigned 32-bit
                 * values (the result is always modulo 32 bits even if
                 * *old_head > prod_tail). So 'entries' is always between 0
                 * and size(ring)-1.
                 */
                *entries = (prod_tail - *old_head);

                /* Set the actual number of entries to dequeue. */
                if (n > *entries)
                        n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;

                if (unlikely(n == 0))
                        return 0;

                *new_head = *old_head + n;
                if (is_sc) {
                        r->cons.head = *new_head;
                        success = 1;
                } else
                        /*
                         * R2/A5: on failure, the CAS writes the current
                         * value of r->cons.head back into *old_head.
                         * R2: Establishes a synchronizing edge with A3 of a
                         * different thread.
                         * A5: Establishes a synchronizing edge with R2 of a
                         * different thread, so that this thread observes the
                         * same value of prod_tail that thread observed on
                         * CAS failure (to retry with an updated *old_head).
                         */
                        success = rte_atomic_compare_exchange_strong_explicit(
                                        &r->cons.head, old_head, *new_head,
                                        rte_memory_order_release,
                                        rte_memory_order_acquire);
        } while (unlikely(success == 0));
        return n;
}
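
The consumer side composes symmetrically; a sketch mirroring the dequeue path
in rte_ring_elem_pvt.h, assuming the slot-copy helper
__rte_ring_dequeue_elems() from that header (the function name
example_do_dequeue is hypothetical):

static __rte_always_inline unsigned int
example_do_dequeue(struct rte_ring *r, void *obj_table,
                unsigned int esize, unsigned int n, int is_sc)
{
        uint32_t cons_head, cons_next, entries;

        /* Step 1: claim slots [cons_head, cons_next) by moving cons.head. */
        n = __rte_ring_move_cons_head(r, is_sc, n, RTE_RING_QUEUE_VARIABLE,
                        &cons_head, &cons_next, &entries);
        if (n == 0)
                return 0;

        /* Step 2: copy the objects out of the claimed slots. */
        __rte_ring_dequeue_elems(r, cons_head, obj_table, esize, n);

        /* Step 3: return the slots to producers; R0's release store pairs
         * with a producer's A1 acquire load of cons.tail. */
        __rte_ring_update_tail(&r->cons, cons_head, cons_next, is_sc, 0);
        return n;
}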

#endif /* _RTE_RING_C11_PVT_H_ */
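
None of these ordering details are visible through the public API. A minimal
single-producer/single-consumer use of the ring, assuming rte_eal_init() has
already run so that rte_ring_create() can allocate its memzone:

#include <rte_lcore.h>
#include <rte_ring.h>

static void ring_demo(void)
{
        void *objs[8], *got[8];
        unsigned int i, sent, rcvd;

        struct rte_ring *r = rte_ring_create("demo", 1024, rte_socket_id(),
                        RING_F_SP_ENQ | RING_F_SC_DEQ);
        if (r == NULL)
                return;

        for (i = 0; i < 8; i++)
                objs[i] = &objs[i];     /* dummy payload pointers */

        /* Enqueue up to 8 pointers; 'sent' may be less if the ring fills. */
        sent = rte_ring_enqueue_burst(r, objs, 8, NULL);

        /* Dequeue what was enqueued; 'rcvd' equals 'sent' here since no
         * other thread touches the ring. */
        rcvd = rte_ring_dequeue_burst(r, got, sent, NULL);
        RTE_SET_USED(rcvd);

        rte_ring_free(r);
}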