DPDK  24.11.0-rc3
rte_ring_rts_elem_pvt.h
Go to the documentation of this file.
1 /* SPDX-License-Identifier: BSD-3-Clause
2  *
3  * Copyright (c) 2010-2020 Intel Corporation
4  * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
5  * All rights reserved.
6  * Derived from FreeBSD's bufring.h
7  * Used as BSD-3 Licensed with permission from Kip Macy.
8  */
9 
10 #ifndef _RTE_RING_RTS_ELEM_PVT_H_
11 #define _RTE_RING_RTS_ELEM_PVT_H_
12 
/**
 * @internal Advance the tail after one enqueue/dequeue has finished.
 *
 * The tail counter is incremented by one. If the incremented counter equals
 * the head counter read in the same iteration, the tail position is also set
 * to that head position; otherwise the position is left as it was. The new
 * value is installed with a compare-and-exchange that is retried until it
 * succeeds.
 *
 * @param ht
 *   Head/tail pair whose tail is updated.
 */
24 static __rte_always_inline void
25 __rte_ring_rts_update_tail(struct rte_ring_rts_headtail *ht)
26 {
27  union __rte_ring_rts_poscnt h, ot, nt;
28 
29  /*
30  * If there are other enqueues/dequeues in progress that
31  * might have preceded us, then don't update tail with new value.
32  */
33 
34  ot.raw = rte_atomic_load_explicit(&ht->tail.raw, rte_memory_order_acquire);
35 
36  do {
37  /* on 32-bit systems we have to do atomic read here */
38  h.raw = rte_atomic_load_explicit(&ht->head.raw, rte_memory_order_relaxed);
39 
40  nt.raw = ot.raw;
    /* counters match: move the tail position up to the head just read */
41  if (++nt.val.cnt == h.val.cnt)
42  nt.val.pos = h.val.pos;
43 
    /*
     * ot is passed by address and never reloaded inside the loop body, so
     * a failed exchange is relied upon to refresh it with the current tail
     * before the next attempt.
     */
44  } while (rte_atomic_compare_exchange_strong_explicit(&ht->tail.raw,
45  (uint64_t *)(uintptr_t)&ot.raw, nt.raw,
46  rte_memory_order_release, rte_memory_order_acquire) == 0);
47 }
48 
53 static __rte_always_inline void
54 __rte_ring_rts_head_wait(const struct rte_ring_rts_headtail *ht,
55  union __rte_ring_rts_poscnt *h)
56 {
57  uint32_t max;
58 
59  max = ht->htd_max;
60 
61  while (h->val.pos - ht->tail.val.pos > max) {
62  rte_pause();
63  h->raw = rte_atomic_load_explicit(&ht->head.raw, rte_memory_order_acquire);
64  }
65 }
66 
/**
 * @internal Reserve room for an enqueue by advancing the producer head.
 *
 * Each iteration waits on the producer head/tail distance, computes the free
 * entries against the head just observed, clamps the request, and then tries
 * to install a new head (position advanced by n, counter incremented by one)
 * with a compare-and-exchange. The loop ends when the exchange succeeds or
 * when nothing can be reserved.
 *
 * @param r
 *   Ring whose rts_prod head is moved.
 * @param num
 *   Requested number of elements.
 * @param behavior
 *   RTE_RING_QUEUE_FIXED: reserve all of num or nothing.
 *   Any other value: reserve as many as the free entries allow.
 * @param old_head
 *   Out: producer head position observed before the move.
 * @param free_entries
 *   Out: free entries computed against that head position (the reserved
 *   count is not subtracted here).
 * @return
 *   Number of elements actually reserved; 0 if none.
 */
70 static __rte_always_inline uint32_t
71 __rte_ring_rts_move_prod_head(struct rte_ring *r, uint32_t num,
72  enum rte_ring_queue_behavior behavior, uint32_t *old_head,
73  uint32_t *free_entries)
74 {
75  uint32_t n;
76  union __rte_ring_rts_poscnt nh, oh;
77 
78  const uint32_t capacity = r->capacity;
79 
80  oh.raw = rte_atomic_load_explicit(&r->rts_prod.head.raw, rte_memory_order_acquire);
81 
82  do {
83  /* Reset n to the initial burst count */
84  n = num;
85 
86  /*
87  * wait for prod head/tail distance,
88  * make sure that we read prod head *before*
89  * reading cons tail.
90  */
91  __rte_ring_rts_head_wait(&r->rts_prod, &oh);
92 
93  /*
94  * The subtraction is done between two unsigned 32-bit values
95  * (the result is always modulo 32 bits even if we have
96  * *old_head > cons_tail). So 'free_entries' is always between 0
97  * and capacity (which is < size).
98  */
    /*
     * NOTE(review): r->cons.tail is read with a plain access here, while
     * the head is read with explicit atomic loads; confirm the field's
     * declaration gives the ordering this code expects.
     */
99  *free_entries = capacity + r->cons.tail - oh.val.pos;
100 
101  /* check that we have enough room in ring */
102  if (unlikely(n > *free_entries))
103  n = (behavior == RTE_RING_QUEUE_FIXED) ?
104  0 : *free_entries;
105 
    /* nothing to reserve: leave the head untouched */
106  if (n == 0)
107  break;
108 
109  nh.val.pos = oh.val.pos + n;
110  nh.val.cnt = oh.val.cnt + 1;
111 
112  /*
113  * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent:
114  * - out-of-order (OOO) reads of cons tail value
115  * - out-of-order (OOO) copy of elems to the ring
116  */
117  } while (rte_atomic_compare_exchange_strong_explicit(&r->rts_prod.head.raw,
118  (uint64_t *)(uintptr_t)&oh.raw, nh.raw,
119  rte_memory_order_acquire, rte_memory_order_acquire) == 0);
120 
121  *old_head = oh.val.pos;
122  return n;
123 }
124 
/**
 * @internal Reserve entries for a dequeue by advancing the consumer head.
 *
 * Each iteration waits on the consumer head/tail distance, computes the
 * available entries against the head just observed, clamps the request, and
 * then tries to install a new head (position advanced by n, counter
 * incremented by one) with a compare-and-exchange. The loop ends when the
 * exchange succeeds or when nothing can be reserved.
 *
 * @param r
 *   Ring whose rts_cons head is moved.
 * @param num
 *   Requested number of elements.
 * @param behavior
 *   RTE_RING_QUEUE_FIXED: reserve all of num or nothing.
 *   Any other value: reserve as many as the available entries allow.
 * @param old_head
 *   Out: consumer head position observed before the move.
 * @param entries
 *   Out: entries computed against that head position (the reserved count is
 *   not subtracted here).
 * @return
 *   Number of elements actually reserved; 0 if none.
 */
128 static __rte_always_inline unsigned int
129 __rte_ring_rts_move_cons_head(struct rte_ring *r, uint32_t num,
130  enum rte_ring_queue_behavior behavior, uint32_t *old_head,
131  uint32_t *entries)
132 {
133  uint32_t n;
134  union __rte_ring_rts_poscnt nh, oh;
135 
136  oh.raw = rte_atomic_load_explicit(&r->rts_cons.head.raw, rte_memory_order_acquire);
137 
138  /* move cons.head atomically */
139  do {
140  /* Restore n as it may change every loop */
141  n = num;
142 
143  /*
144  * wait for cons head/tail distance,
145  * make sure that we read cons head *before*
146  * reading prod tail.
147  */
148  __rte_ring_rts_head_wait(&r->rts_cons, &oh);
149 
150  /* The subtraction is done between two unsigned 32-bit values
151  * (the result is always modulo 32 bits even if we have
152  * cons_head > prod_tail). So 'entries' is always between 0
153  * and size(ring)-1.
154  */
     /*
      * NOTE(review): r->prod.tail is read with a plain access here, while
      * the head is read with explicit atomic loads; confirm the field's
      * declaration gives the ordering this code expects.
      */
155  *entries = r->prod.tail - oh.val.pos;
156 
157  /* Set the actual entries for dequeue */
158  if (n > *entries)
159  n = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : *entries;
160 
     /* nothing to reserve: leave the head untouched */
161  if (unlikely(n == 0))
162  break;
163 
164  nh.val.pos = oh.val.pos + n;
165  nh.val.cnt = oh.val.cnt + 1;
166 
167  /*
168  * this CAS(ACQUIRE, ACQUIRE) serves as a hoist barrier to prevent:
169  * - out-of-order (OOO) reads of prod tail value
170  * - out-of-order (OOO) copy of elems from the ring
171  */
172  } while (rte_atomic_compare_exchange_strong_explicit(&r->rts_cons.head.raw,
173  (uint64_t *)(uintptr_t)&oh.raw, nh.raw,
174  rte_memory_order_acquire, rte_memory_order_acquire) == 0);
175 
176  *old_head = oh.val.pos;
177  return n;
178 }
179 
202 static __rte_always_inline unsigned int
203 __rte_ring_do_rts_enqueue_elem(struct rte_ring *r, const void *obj_table,
204  uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
205  uint32_t *free_space)
206 {
207  uint32_t free, head;
208 
209  n = __rte_ring_rts_move_prod_head(r, n, behavior, &head, &free);
210 
211  if (n != 0) {
212  __rte_ring_enqueue_elems(r, head, obj_table, esize, n);
213  __rte_ring_rts_update_tail(&r->rts_prod);
214  }
215 
216  if (free_space != NULL)
217  *free_space = free - n;
218  return n;
219 }
220 
243 static __rte_always_inline unsigned int
244 __rte_ring_do_rts_dequeue_elem(struct rte_ring *r, void *obj_table,
245  uint32_t esize, uint32_t n, enum rte_ring_queue_behavior behavior,
246  uint32_t *available)
247 {
248  uint32_t entries, head;
249 
250  n = __rte_ring_rts_move_cons_head(r, n, behavior, &head, &entries);
251 
252  if (n != 0) {
253  __rte_ring_dequeue_elems(r, head, obj_table, esize, n);
254  __rte_ring_rts_update_tail(&r->rts_cons);
255  }
256 
257  if (available != NULL)
258  *available = entries - n;
259  return n;
260 }
261 
262 #endif /* _RTE_RING_RTS_ELEM_PVT_H_ */
#define __rte_always_inline
Definition: rte_common.h:413
rte_ring_queue_behavior
Definition: rte_ring_core.h:40
#define unlikely(x)
static void rte_pause(void)
uint32_t capacity