DPDK  18.02.2
rte_ring.h
/* SPDX-License-Identifier: BSD-3-Clause
 *
 * Copyright (c) 2010-2017 Intel Corporation
 * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
 * All rights reserved.
 * Derived from FreeBSD's bufring.h
 * Used as BSD-3 Licensed with permission from Kip Macy.
 */

#ifndef _RTE_RING_H_
#define _RTE_RING_H_

#ifdef __cplusplus
extern "C" {
#endif

#include <stdio.h>
#include <stdint.h>
#include <sys/queue.h>
#include <errno.h>
#include <rte_common.h>
#include <rte_config.h>
#include <rte_memory.h>
#include <rte_lcore.h>
#include <rte_atomic.h>
#include <rte_branch_prediction.h>
#include <rte_memzone.h>
#include <rte_pause.h>

#define RTE_TAILQ_RING_NAME "RTE_RING"

enum rte_ring_queue_behavior {
    RTE_RING_QUEUE_FIXED = 0, /* Enq/Deq a fixed number of items from a ring */
    RTE_RING_QUEUE_VARIABLE   /* Enq/Deq as many items as possible from ring */
};

#define RTE_RING_MZ_PREFIX "RG_"

#define RTE_RING_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
                           sizeof(RTE_RING_MZ_PREFIX) + 1)

struct rte_memzone; /* forward declaration, so as not to require memzone.h */

#if RTE_CACHE_LINE_SIZE < 128
#define PROD_ALIGN (RTE_CACHE_LINE_SIZE * 2)
#define CONS_ALIGN (RTE_CACHE_LINE_SIZE * 2)
#else
#define PROD_ALIGN RTE_CACHE_LINE_SIZE
#define CONS_ALIGN RTE_CACHE_LINE_SIZE
#endif

/* structure to hold a pair of head/tail values and other metadata */
struct rte_ring_headtail {
    volatile uint32_t head;  /**< Prod/consumer head. */
    volatile uint32_t tail;  /**< Prod/consumer tail. */
    uint32_t single;         /**< True if single prod/cons. */
};

struct rte_ring {
    /*
     * Note: this field kept the RTE_MEMZONE_NAMESIZE size due to ABI
     * compatibility requirements, it could be changed to RTE_RING_NAMESIZE
     * next time the ABI changes
     */
    char name[RTE_MEMZONE_NAMESIZE] __rte_cache_aligned; /**< Name of the ring. */
    int flags;               /**< Flags supplied at creation. */
    const struct rte_memzone *memzone;
                             /**< Memzone, if any, containing the rte_ring */
    uint32_t size;           /**< Size of ring. */
    uint32_t mask;           /**< Mask (size - 1) of ring. */
    uint32_t capacity;       /**< Usable size of ring */

    /** Ring producer status. */
    struct rte_ring_headtail prod __rte_aligned(PROD_ALIGN);

    /** Ring consumer status. */
    struct rte_ring_headtail cons __rte_aligned(CONS_ALIGN);
};

#define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */
#define RING_F_SC_DEQ 0x0002 /**< The default dequeue is "single-consumer". */
/**
 * Ring holds exactly the requested number of entries.
 * Without this flag, the ring size must be a power of 2 and the usable
 * space is size - 1.
 */
#define RING_F_EXACT_SZ 0x0004
#define RTE_RING_SZ_MASK (0x7fffffffU) /**< Ring size mask */

/* @internal defines for passing to the enqueue dequeue worker functions */
#define __IS_SP 1
#define __IS_MP 0
#define __IS_SC 1
#define __IS_MC 0

/** Calculate the memory size needed for a ring with the given element count. */
ssize_t rte_ring_get_memsize(unsigned count);

/** Initialize a ring structure in memory already allocated by the caller. */
int rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
                  unsigned flags);

/** Create a new ring named *name*, allocated on NUMA socket *socket_id*. */
struct rte_ring *rte_ring_create(const char *name, unsigned count,
                                 int socket_id, unsigned flags);

/** De-allocate all memory used by the ring. */
void rte_ring_free(struct rte_ring *r);

/** Dump the status of the ring to a file. */
void rte_ring_dump(FILE *f, const struct rte_ring *r);

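/*
 * Example (illustrative sketch, not part of the original header): the
 * declarations above cover the ring life cycle. The ring name
 * "example_ring" and the 1024-slot size below are arbitrary, and EAL is
 * assumed to have been initialised with rte_eal_init() beforehand.
 *
 *   struct rte_ring *r;
 *
 *   // 1024 slots on the caller's NUMA socket; without RING_F_EXACT_SZ
 *   // the usable capacity is count - 1, i.e. 1023 objects.
 *   r = rte_ring_create("example_ring", 1024, rte_socket_id(), 0);
 *   if (r == NULL) {
 *           // allocation failed, e.g. out of memory or duplicate name
 *   }
 *
 *   rte_ring_dump(stdout, r);   // print ring state for debugging
 *   rte_ring_free(r);           // release the ring and its memzone
 */
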
/* the actual enqueue of pointers on the ring.
 * Placed here since identical code needed in both
 * single and multi producer enqueue functions */
#define ENQUEUE_PTRS(r, ring_start, prod_head, obj_table, n, obj_type) do { \
    unsigned int i; \
    const uint32_t size = (r)->size; \
    uint32_t idx = prod_head & (r)->mask; \
    obj_type *ring = (obj_type *)ring_start; \
    if (likely(idx + n < size)) { \
        for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \
            ring[idx] = obj_table[i]; \
            ring[idx+1] = obj_table[i+1]; \
            ring[idx+2] = obj_table[i+2]; \
            ring[idx+3] = obj_table[i+3]; \
        } \
        switch (n & 0x3) { \
        case 3: \
            ring[idx++] = obj_table[i++]; /* fallthrough */ \
        case 2: \
            ring[idx++] = obj_table[i++]; /* fallthrough */ \
        case 1: \
            ring[idx++] = obj_table[i++]; \
        } \
    } else { \
        for (i = 0; idx < size; i++, idx++)\
            ring[idx] = obj_table[i]; \
        for (idx = 0; i < n; i++, idx++) \
            ring[idx] = obj_table[i]; \
    } \
} while (0)
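
/*
 * The copy above is unrolled by four and, when a batch would run past
 * the end of the array, split into two plain loops that wrap back to
 * index 0. The slot index is the free-running head counter masked with
 * size - 1, which is why ring sizes are powers of two. A standalone
 * sketch of the same masking idea (not DPDK API, names are arbitrary):
 *
 *   // size must be a power of two; head is free-running and may wrap
 *   static inline uint32_t slot_of(uint32_t head, uint32_t size)
 *   {
 *           return head & (size - 1);
 *   }
 */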

/* the actual copy of pointers on the ring to obj_table.
 * Placed here since identical code needed in both
 * single and multi consumer dequeue functions */
#define DEQUEUE_PTRS(r, ring_start, cons_head, obj_table, n, obj_type) do { \
    unsigned int i; \
    uint32_t idx = cons_head & (r)->mask; \
    const uint32_t size = (r)->size; \
    obj_type *ring = (obj_type *)ring_start; \
    if (likely(idx + n < size)) { \
        for (i = 0; i < (n & (~(unsigned)0x3)); i+=4, idx+=4) {\
            obj_table[i] = ring[idx]; \
            obj_table[i+1] = ring[idx+1]; \
            obj_table[i+2] = ring[idx+2]; \
            obj_table[i+3] = ring[idx+3]; \
        } \
        switch (n & 0x3) { \
        case 3: \
            obj_table[i++] = ring[idx++]; /* fallthrough */ \
        case 2: \
            obj_table[i++] = ring[idx++]; /* fallthrough */ \
        case 1: \
            obj_table[i++] = ring[idx++]; \
        } \
    } else { \
        for (i = 0; idx < size; i++, idx++) \
            obj_table[i] = ring[idx]; \
        for (idx = 0; i < n; i++, idx++) \
            obj_table[i] = ring[idx]; \
    } \
} while (0)

/* Between loads, the CPU may reorder accesses on weakly ordered memory
 * models (PowerPC/ARM).
 * Users have two choices:
 * 1. use an rmb() memory barrier
 * 2. use one-direction load_acquire/store_release barriers, selected by
 *    CONFIG_RTE_RING_USE_C11_MEM_MODEL=y
 * Which is faster depends on performance test results.
 * By default, the common functions live in rte_ring_generic.h.
 */
#ifdef RTE_RING_USE_C11_MEM_MODEL
#include "rte_ring_c11_mem.h"
#else
#include "rte_ring_generic.h"
#endif

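/*
 * The choice between the two implementations above is made at build
 * time. On weakly ordered CPUs the C11 variant can be selected through
 * the build configuration (illustrative; the generic version remains
 * the default):
 *
 *   CONFIG_RTE_RING_USE_C11_MEM_MODEL=y
 */
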
static __rte_always_inline unsigned int
__rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
        unsigned int n, enum rte_ring_queue_behavior behavior,
        unsigned int is_sp, unsigned int *free_space)
{
    uint32_t prod_head, prod_next;
    uint32_t free_entries;

    n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
            &prod_head, &prod_next, &free_entries);
    if (n == 0)
        goto end;

    ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);

    update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
end:
    if (free_space != NULL)
        *free_space = free_entries - n;
    return n;
}

static __rte_always_inline unsigned int
__rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
        unsigned int n, enum rte_ring_queue_behavior behavior,
        unsigned int is_sc, unsigned int *available)
{
    uint32_t cons_head, cons_next;
    uint32_t entries;

    n = __rte_ring_move_cons_head(r, is_sc, n, behavior,
            &cons_head, &cons_next, &entries);
    if (n == 0)
        goto end;

    DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);

    update_tail(&r->cons, cons_head, cons_next, is_sc, 0);

end:
    if (available != NULL)
        *available = entries - n;
    return n;
}

static __rte_always_inline unsigned int
rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
        unsigned int n, unsigned int *free_space)
{
    return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
            __IS_MP, free_space);
}

static __rte_always_inline unsigned int
rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
        unsigned int n, unsigned int *free_space)
{
    return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
            __IS_SP, free_space);
}

static __rte_always_inline unsigned int
rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
        unsigned int n, unsigned int *free_space)
{
    return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
            r->prod.single, free_space);
}
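
/*
 * Example (illustrative): with RTE_RING_QUEUE_FIXED the bulk enqueue is
 * all-or-nothing, so the return value is either n or 0. The table and
 * its size below are arbitrary.
 *
 *   void *tbl[8];               // pointers to enqueue, filled elsewhere
 *   unsigned int free_space;
 *
 *   if (rte_ring_enqueue_bulk(r, tbl, 8, &free_space) == 0) {
 *           // the ring did not have 8 free slots; nothing was enqueued
 *           // and free_space still reports the current room in the ring
 *   }
 */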

static __rte_always_inline int
rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
{
    return rte_ring_mp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
}

static __rte_always_inline int
rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
{
    return rte_ring_sp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
}

static __rte_always_inline int
rte_ring_enqueue(struct rte_ring *r, void *obj)
{
    return rte_ring_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
}

static __rte_always_inline unsigned int
rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table,
        unsigned int n, unsigned int *available)
{
    return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
            __IS_MC, available);
}

static __rte_always_inline unsigned int
rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table,
        unsigned int n, unsigned int *available)
{
    return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
            __IS_SC, available);
}

static __rte_always_inline unsigned int
rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n,
        unsigned int *available)
{
    return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
            r->cons.single, available);
}

static __rte_always_inline int
rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p)
{
    return rte_ring_mc_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOENT;
}

static __rte_always_inline int
rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p)
{
    return rte_ring_sc_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOENT;
}

static __rte_always_inline int
rte_ring_dequeue(struct rte_ring *r, void **obj_p)
{
    return rte_ring_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOENT;
}
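
/*
 * Example (illustrative): the single-object dequeue returns 0 on
 * success and -ENOENT when the ring is empty.
 *
 *   void *obj;
 *
 *   if (rte_ring_dequeue(r, &obj) == 0) {
 *           // obj now holds the dequeued pointer
 *   } else {
 *           // the ring was empty, nothing was dequeued
 *   }
 */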

static inline unsigned
rte_ring_count(const struct rte_ring *r)
{
    uint32_t prod_tail = r->prod.tail;
    uint32_t cons_tail = r->cons.tail;
    uint32_t count = (prod_tail - cons_tail) & r->mask;
    return (count > r->capacity) ? r->capacity : count;
}

static inline unsigned
rte_ring_free_count(const struct rte_ring *r)
{
    return r->capacity - rte_ring_count(r);
}

static inline int
rte_ring_full(const struct rte_ring *r)
{
    return rte_ring_free_count(r) == 0;
}

static inline int
rte_ring_empty(const struct rte_ring *r)
{
    return rte_ring_count(r) == 0;
}
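
/*
 * Example (illustrative): the helpers above give an instantaneous view
 * of the ring; with concurrent producers or consumers the value may be
 * stale by the time it is used, so treat it as a hint only.
 *
 *   unsigned int used = rte_ring_count(r);
 *   unsigned int room = rte_ring_free_count(r);
 *
 *   if (rte_ring_full(r)) {
 *           // back off, e.g. drop the object or retry later
 *   }
 */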

static inline unsigned int
rte_ring_get_size(const struct rte_ring *r)
{
    return r->size;
}

static inline unsigned int
rte_ring_get_capacity(const struct rte_ring *r)
{
    return r->capacity;
}

/** Dump the status of all rings to a file. */
void rte_ring_list_dump(FILE *f);

/** Search a ring by its name; returns NULL if not found. */
struct rte_ring *rte_ring_lookup(const char *name);
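
/*
 * Example (illustrative): any part of the application, including a
 * secondary process, can attach to an existing ring by name.
 *
 *   struct rte_ring *r = rte_ring_lookup("example_ring");
 *   if (r == NULL) {
 *           // no ring with that name exists (rte_errno is set to ENOENT)
 *   }
 */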

static __rte_always_inline unsigned
rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
        unsigned int n, unsigned int *free_space)
{
    return __rte_ring_do_enqueue(r, obj_table, n,
            RTE_RING_QUEUE_VARIABLE, __IS_MP, free_space);
}

static __rte_always_inline unsigned
rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
        unsigned int n, unsigned int *free_space)
{
    return __rte_ring_do_enqueue(r, obj_table, n,
            RTE_RING_QUEUE_VARIABLE, __IS_SP, free_space);
}

static __rte_always_inline unsigned
rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
        unsigned int n, unsigned int *free_space)
{
    return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE,
            r->prod.single, free_space);
}

static __rte_always_inline unsigned
rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table,
        unsigned int n, unsigned int *available)
{
    return __rte_ring_do_dequeue(r, obj_table, n,
            RTE_RING_QUEUE_VARIABLE, __IS_MC, available);
}

static __rte_always_inline unsigned
rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table,
        unsigned int n, unsigned int *available)
{
    return __rte_ring_do_dequeue(r, obj_table, n,
            RTE_RING_QUEUE_VARIABLE, __IS_SC, available);
}

static __rte_always_inline unsigned
rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
        unsigned int n, unsigned int *available)
{
    return __rte_ring_do_dequeue(r, obj_table, n,
            RTE_RING_QUEUE_VARIABLE,
            r->cons.single, available);
}
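
/*
 * Example (illustrative): burst mode moves "up to n" objects, so it
 * fits a polling loop that drains whatever is currently available. The
 * batch size of 32 and the process() handler are arbitrary.
 *
 *   void *burst[32];
 *   unsigned int nb, i;
 *
 *   nb = rte_ring_dequeue_burst(r, burst, 32, NULL);
 *   for (i = 0; i < nb; i++)
 *           process(burst[i]);
 */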

#ifdef __cplusplus
}
#endif

#endif /* _RTE_RING_H_ */