DPDK 20.05.0
rte_ring.h
/* SPDX-License-Identifier: BSD-3-Clause
 *
 * Copyright (c) 2010-2020 Intel Corporation
 * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
 * All rights reserved.
 * Derived from FreeBSD's bufring.h
 * Used as BSD-3 Licensed with permission from Kip Macy.
 */

#ifndef _RTE_RING_H_
#define _RTE_RING_H_

#ifdef __cplusplus
extern "C" {
#endif

#include <rte_ring_core.h>

ssize_t rte_ring_get_memsize(unsigned count);

int rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
	unsigned flags);

struct rte_ring *rte_ring_create(const char *name, unsigned count,
	int socket_id, unsigned flags);

void rte_ring_free(struct rte_ring *r);

void rte_ring_dump(FILE *f, const struct rte_ring *r);

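/*
 * Usage sketch (illustrative, not part of the original header): create a
 * ring, pass one object through it, and release it.  Assumes the EAL is
 * initialized; `obj` is a hypothetical payload pointer.
 *
 *   struct rte_ring *r = rte_ring_create("example_ring", 1024,
 *                                        SOCKET_ID_ANY, 0);
 *   if (r == NULL)
 *       rte_panic("cannot create ring\n");
 *
 *   if (rte_ring_enqueue(r, obj) != 0)
 *       ;                           // ring full, nothing was enqueued
 *
 *   void *out;
 *   if (rte_ring_dequeue(r, &out) != 0)
 *       ;                           // ring empty, nothing was dequeued
 *
 *   rte_ring_free(r);
 */
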
/* The actual enqueue of pointers on the ring.
 * Placed here since identical code is needed in both
 * single- and multi-producer enqueue functions. */
#define ENQUEUE_PTRS(r, ring_start, prod_head, obj_table, n, obj_type) do { \
	unsigned int i; \
	const uint32_t size = (r)->size; \
	uint32_t idx = prod_head & (r)->mask; \
	obj_type *ring = (obj_type *)ring_start; \
	if (likely(idx + n < size)) { \
		for (i = 0; i < (n & ((~(unsigned)0x3))); i += 4, idx += 4) { \
			ring[idx] = obj_table[i]; \
			ring[idx + 1] = obj_table[i + 1]; \
			ring[idx + 2] = obj_table[i + 2]; \
			ring[idx + 3] = obj_table[i + 3]; \
		} \
		switch (n & 0x3) { \
		case 3: \
			ring[idx++] = obj_table[i++]; /* fallthrough */ \
		case 2: \
			ring[idx++] = obj_table[i++]; /* fallthrough */ \
		case 1: \
			ring[idx++] = obj_table[i++]; \
		} \
	} else { \
		for (i = 0; idx < size; i++, idx++) \
			ring[idx] = obj_table[i]; \
		for (idx = 0; i < n; i++, idx++) \
			ring[idx] = obj_table[i]; \
	} \
} while (0)

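/*
 * Worked example (illustrative): with size = 8 (mask = 7), prod_head = 6
 * and n = 4, idx starts at 6, so idx + n >= size and the wrap branch
 * runs: the first loop stores obj_table[0..1] into ring[6..7], the
 * second stores obj_table[2..3] into ring[0..1].  When the batch fits
 * (say prod_head = 0, n = 6), the unrolled loop copies four entries per
 * iteration and the switch drains the remaining n & 0x3 = 2 entries.
 */
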
/* The actual copy of pointers on the ring to obj_table.
 * Placed here since identical code is needed in both
 * single- and multi-consumer dequeue functions. */
#define DEQUEUE_PTRS(r, ring_start, cons_head, obj_table, n, obj_type) do { \
	unsigned int i; \
	uint32_t idx = cons_head & (r)->mask; \
	const uint32_t size = (r)->size; \
	obj_type *ring = (obj_type *)ring_start; \
	if (likely(idx + n < size)) { \
		for (i = 0; i < (n & (~(unsigned)0x3)); i += 4, idx += 4) { \
			obj_table[i] = ring[idx]; \
			obj_table[i + 1] = ring[idx + 1]; \
			obj_table[i + 2] = ring[idx + 2]; \
			obj_table[i + 3] = ring[idx + 3]; \
		} \
		switch (n & 0x3) { \
		case 3: \
			obj_table[i++] = ring[idx++]; /* fallthrough */ \
		case 2: \
			obj_table[i++] = ring[idx++]; /* fallthrough */ \
		case 1: \
			obj_table[i++] = ring[idx++]; \
		} \
	} else { \
		for (i = 0; idx < size; i++, idx++) \
			obj_table[i] = ring[idx]; \
		for (idx = 0; i < n; i++, idx++) \
			obj_table[i] = ring[idx]; \
	} \
} while (0)

/* Between two loads, the CPU may reorder memory accesses on weakly
 * ordered architectures (PowerPC/Arm).
 * Users have two choices:
 * 1. use an rmb() memory barrier, or
 * 2. use one-direction load-acquire/store-release barriers, enabled by
 *    building with CONFIG_RTE_USE_C11_MEM_MODEL=y.
 * Which is faster depends on performance test results, so by default
 * the common functions live in rte_ring_generic.h.
 */
#ifdef RTE_USE_C11_MEM_MODEL
#include "rte_ring_c11_mem.h"
#else
#include "rte_ring_generic.h"
#endif

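/*
 * Illustrative sketch (not the actual rte_ring_c11_mem.h code) of the
 * load-acquire/store-release pairing the C11 variant builds on, written
 * with standard C11 atomics; `new_tail` and the surrounding logic are
 * hypothetical.
 *
 *   #include <stdatomic.h>
 *   #include <stdint.h>
 *
 *   _Atomic uint32_t tail;
 *
 *   // Producer: write the ring entries first, then release-store the
 *   // tail so the entry stores cannot be reordered past the update.
 *   atomic_store_explicit(&tail, new_tail, memory_order_release);
 *
 *   // Consumer: acquire-load the tail before reading entries, so all
 *   // stores made before the matching release are guaranteed visible.
 *   uint32_t t = atomic_load_explicit(&tail, memory_order_acquire);
 */
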
static __rte_always_inline unsigned int
__rte_ring_do_enqueue(struct rte_ring *r, void * const *obj_table,
		unsigned int n, enum rte_ring_queue_behavior behavior,
		unsigned int is_sp, unsigned int *free_space)
{
	uint32_t prod_head, prod_next;
	uint32_t free_entries;

	n = __rte_ring_move_prod_head(r, is_sp, n, behavior,
			&prod_head, &prod_next, &free_entries);
	if (n == 0)
		goto end;

	ENQUEUE_PTRS(r, &r[1], prod_head, obj_table, n, void *);

	update_tail(&r->prod, prod_head, prod_next, is_sp, 1);
end:
	if (free_space != NULL)
		*free_space = free_entries - n;
	return n;
}

static __rte_always_inline unsigned int
__rte_ring_do_dequeue(struct rte_ring *r, void **obj_table,
		unsigned int n, enum rte_ring_queue_behavior behavior,
		unsigned int is_sc, unsigned int *available)
{
	uint32_t cons_head, cons_next;
	uint32_t entries;

	n = __rte_ring_move_cons_head(r, (int)is_sc, n, behavior,
			&cons_head, &cons_next, &entries);
	if (n == 0)
		goto end;

	DEQUEUE_PTRS(r, &r[1], cons_head, obj_table, n, void *);

	update_tail(&r->cons, cons_head, cons_next, is_sc, 0);

end:
	if (available != NULL)
		*available = entries - n;
	return n;
}

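/*
 * Worked trace (illustrative): a single-producer enqueue of n = 2 with
 * prod.head == prod.tail == 10 proceeds in three steps:
 * __rte_ring_move_prod_head() checks free space and advances r->prod.head
 * to 12 (prod_head = 10, prod_next = 12); ENQUEUE_PTRS() copies the two
 * pointers into the ring storage at &r[1]; update_tail() then moves
 * r->prod.tail to 12, publishing the entries to consumers.  Under
 * multi-producer sync the head move becomes a compare-and-set loop, and
 * update_tail() waits for earlier producers to update the tail first.
 */
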
static __rte_always_inline unsigned int
rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
		unsigned int n, unsigned int *free_space)
{
	return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
			RTE_RING_SYNC_MT, free_space);
}

static __rte_always_inline unsigned int
rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
		unsigned int n, unsigned int *free_space)
{
	return __rte_ring_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
			RTE_RING_SYNC_ST, free_space);
}

#ifdef ALLOW_EXPERIMENTAL_API
#include <rte_ring_elem.h>
#endif

static __rte_always_inline unsigned int
rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
		unsigned int n, unsigned int *free_space)
{
	switch (r->prod.sync_type) {
	case RTE_RING_SYNC_MT:
		return rte_ring_mp_enqueue_bulk(r, obj_table, n, free_space);
	case RTE_RING_SYNC_ST:
		return rte_ring_sp_enqueue_bulk(r, obj_table, n, free_space);
#ifdef ALLOW_EXPERIMENTAL_API
	case RTE_RING_SYNC_MT_RTS:
		return rte_ring_mp_rts_enqueue_bulk(r, obj_table, n,
			free_space);
	case RTE_RING_SYNC_MT_HTS:
		return rte_ring_mp_hts_enqueue_bulk(r, obj_table, n,
			free_space);
#endif
	}

	/* valid ring should never reach this point */
	RTE_ASSERT(0);
	return 0;
}

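/*
 * Usage sketch (illustrative): bulk enqueue is all-or-nothing, so the
 * caller only needs to test for zero.  `pkts`, `nb` and
 * handle_backpressure() are hypothetical.
 *
 *   if (rte_ring_enqueue_bulk(r, (void **)pkts, nb, NULL) == 0) {
 *       // no room for all nb objects; none were enqueued
 *       handle_backpressure(pkts, nb);
 *   }
 */
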
static __rte_always_inline int
rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
{
	return rte_ring_mp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
}

static __rte_always_inline int
rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
{
	return rte_ring_sp_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
}

static __rte_always_inline int
rte_ring_enqueue(struct rte_ring *r, void *obj)
{
	return rte_ring_enqueue_bulk(r, &obj, 1, NULL) ? 0 : -ENOBUFS;
}

static __rte_always_inline unsigned int
rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table,
		unsigned int n, unsigned int *available)
{
	return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
			RTE_RING_SYNC_MT, available);
}

static __rte_always_inline unsigned int
rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table,
		unsigned int n, unsigned int *available)
{
	return __rte_ring_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED,
			RTE_RING_SYNC_ST, available);
}

static __rte_always_inline unsigned int
rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned int n,
		unsigned int *available)
{
	switch (r->cons.sync_type) {
	case RTE_RING_SYNC_MT:
		return rte_ring_mc_dequeue_bulk(r, obj_table, n, available);
	case RTE_RING_SYNC_ST:
		return rte_ring_sc_dequeue_bulk(r, obj_table, n, available);
#ifdef ALLOW_EXPERIMENTAL_API
	case RTE_RING_SYNC_MT_RTS:
		return rte_ring_mc_rts_dequeue_bulk(r, obj_table, n, available);
	case RTE_RING_SYNC_MT_HTS:
		return rte_ring_mc_hts_dequeue_bulk(r, obj_table, n, available);
#endif
	}

	/* valid ring should never reach this point */
	RTE_ASSERT(0);
	return 0;
}

static __rte_always_inline int
rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p)
{
	return rte_ring_mc_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOENT;
}

static __rte_always_inline int
rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p)
{
	return rte_ring_sc_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOENT;
}

static __rte_always_inline int
rte_ring_dequeue(struct rte_ring *r, void **obj_p)
{
	return rte_ring_dequeue_bulk(r, obj_p, 1, NULL) ? 0 : -ENOENT;
}

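/*
 * Usage sketch (illustrative): the single-object wrappers map the bulk
 * result onto errno-style codes, keeping caller checks terse.
 *
 *   if (rte_ring_enqueue(r, obj) == -ENOBUFS)
 *       ;                            // not enough room in the ring
 *
 *   void *out;
 *   if (rte_ring_dequeue(r, &out) == -ENOENT)
 *       ;                            // ring was empty
 */
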
__rte_experimental
void
rte_ring_reset(struct rte_ring *r);

static inline unsigned
rte_ring_count(const struct rte_ring *r)
{
	uint32_t prod_tail = r->prod.tail;
	uint32_t cons_tail = r->cons.tail;
	uint32_t count = (prod_tail - cons_tail) & r->mask;
	return (count > r->capacity) ? r->capacity : count;
}

static inline unsigned
rte_ring_free_count(const struct rte_ring *r)
{
	return r->capacity - rte_ring_count(r);
}

static inline int
rte_ring_full(const struct rte_ring *r)
{
	return rte_ring_free_count(r) == 0;
}

static inline int
rte_ring_empty(const struct rte_ring *r)
{
	return rte_ring_count(r) == 0;
}

static inline unsigned int
rte_ring_get_size(const struct rte_ring *r)
{
	return r->size;
}

static inline unsigned int
rte_ring_get_capacity(const struct rte_ring *r)
{
	return r->capacity;
}

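/*
 * Example (illustrative): by default one slot is kept unused to tell a
 * full ring from an empty one, so rte_ring_create("r", 1024, ...) gives
 * size == 1024 but capacity == 1023.  With the RING_F_EXACT_SZ flag the
 * requested count is fully usable: count == 1000 yields capacity 1000
 * inside a power-of-two size of 1024.
 */
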
static inline enum rte_ring_sync_type
rte_ring_get_prod_sync_type(const struct rte_ring *r)
{
	return r->prod.sync_type;
}

static inline int
rte_ring_is_prod_single(const struct rte_ring *r)
{
	return (rte_ring_get_prod_sync_type(r) == RTE_RING_SYNC_ST);
}

static inline enum rte_ring_sync_type
rte_ring_get_cons_sync_type(const struct rte_ring *r)
{
	return r->cons.sync_type;
}

static inline int
rte_ring_is_cons_single(const struct rte_ring *r)
{
	return (rte_ring_get_cons_sync_type(r) == RTE_RING_SYNC_ST);
}

void rte_ring_list_dump(FILE *f);

struct rte_ring *rte_ring_lookup(const char *name);

static __rte_always_inline unsigned
rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
		unsigned int n, unsigned int *free_space)
{
	return __rte_ring_do_enqueue(r, obj_table, n,
			RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, free_space);
}

static __rte_always_inline unsigned
rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
		unsigned int n, unsigned int *free_space)
{
	return __rte_ring_do_enqueue(r, obj_table, n,
			RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, free_space);
}

static __rte_always_inline unsigned
rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
		unsigned int n, unsigned int *free_space)
{
	switch (r->prod.sync_type) {
	case RTE_RING_SYNC_MT:
		return rte_ring_mp_enqueue_burst(r, obj_table, n, free_space);
	case RTE_RING_SYNC_ST:
		return rte_ring_sp_enqueue_burst(r, obj_table, n, free_space);
#ifdef ALLOW_EXPERIMENTAL_API
	case RTE_RING_SYNC_MT_RTS:
		return rte_ring_mp_rts_enqueue_burst(r, obj_table, n,
			free_space);
	case RTE_RING_SYNC_MT_HTS:
		return rte_ring_mp_hts_enqueue_burst(r, obj_table, n,
			free_space);
#endif
	}

	/* valid ring should never reach this point */
	RTE_ASSERT(0);
	return 0;
}

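/*
 * Usage sketch (illustrative): unlike bulk, burst enqueue returns however
 * many objects fit, so a producer can handle the remainder itself.
 * `pkts`, `nb` and drop_or_retry() are hypothetical.
 *
 *   unsigned int sent = rte_ring_enqueue_burst(r, (void **)pkts, nb, NULL);
 *   if (sent < nb)
 *       drop_or_retry(&pkts[sent], nb - sent);
 */
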
static __rte_always_inline unsigned
rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table,
		unsigned int n, unsigned int *available)
{
	return __rte_ring_do_dequeue(r, obj_table, n,
			RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_MT, available);
}

static __rte_always_inline unsigned
rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table,
		unsigned int n, unsigned int *available)
{
	return __rte_ring_do_dequeue(r, obj_table, n,
			RTE_RING_QUEUE_VARIABLE, RTE_RING_SYNC_ST, available);
}

static __rte_always_inline unsigned
rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table,
		unsigned int n, unsigned int *available)
{
	switch (r->cons.sync_type) {
	case RTE_RING_SYNC_MT:
		return rte_ring_mc_dequeue_burst(r, obj_table, n, available);
	case RTE_RING_SYNC_ST:
		return rte_ring_sc_dequeue_burst(r, obj_table, n, available);
#ifdef ALLOW_EXPERIMENTAL_API
	case RTE_RING_SYNC_MT_RTS:
		return rte_ring_mc_rts_dequeue_burst(r, obj_table, n,
			available);
	case RTE_RING_SYNC_MT_HTS:
		return rte_ring_mc_hts_dequeue_burst(r, obj_table, n,
			available);
#endif
	}

	/* valid ring should never reach this point */
	RTE_ASSERT(0);
	return 0;
}

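/*
 * Usage sketch (illustrative): a consumer can drain the ring in fixed
 * size chunks; BURST_SZ and process() are hypothetical.
 *
 *   void *objs[BURST_SZ];
 *   unsigned int nb;
 *
 *   while ((nb = rte_ring_dequeue_burst(r, objs, BURST_SZ, NULL)) > 0)
 *       process(objs, nb);
 */
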
#ifdef __cplusplus
}
#endif

#endif /* _RTE_RING_H_ */