DPDK 17.02.1
rte_ring.h
/*-
 *   BSD LICENSE
 *
 *   Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
 *   All rights reserved.
 *
 *   Redistribution and use in source and binary forms, with or without
 *   modification, are permitted provided that the following conditions
 *   are met:
 *
 *     * Redistributions of source code must retain the above copyright
 *       notice, this list of conditions and the following disclaimer.
 *     * Redistributions in binary form must reproduce the above copyright
 *       notice, this list of conditions and the following disclaimer in
 *       the documentation and/or other materials provided with the
 *       distribution.
 *     * Neither the name of Intel Corporation nor the names of its
 *       contributors may be used to endorse or promote products derived
 *       from this software without specific prior written permission.
 *
 *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
 *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
 *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
 *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
 *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
 *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
 *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/*
 * Derived from FreeBSD's bufring.h
 *
 **************************************************************************
 *
 * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 *
 * 1. Redistributions of source code must retain the above copyright notice,
 *    this list of conditions and the following disclaimer.
 *
 * 2. The name of Kip Macy nor the names of other
 *    contributors may be used to endorse or promote products derived from
 *    this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
 * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
 * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
 * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
 * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
 * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
 * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
 * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
 * POSSIBILITY OF SUCH DAMAGE.
 *
 ***************************************************************************/

#ifndef _RTE_RING_H_
#define _RTE_RING_H_

#ifdef __cplusplus
extern "C" {
#endif

#include <stdio.h>
#include <stdint.h>
#include <sys/queue.h>
#include <errno.h>
#include <rte_common.h>
#include <rte_memory.h>
#include <rte_lcore.h>
#include <rte_atomic.h>
#include <rte_branch_prediction.h>
#include <rte_memzone.h>

#define RTE_TAILQ_RING_NAME "RTE_RING"

enum rte_ring_queue_behavior {
        RTE_RING_QUEUE_FIXED = 0, /* Enq/Deq a fixed number of items from a ring */
        RTE_RING_QUEUE_VARIABLE   /* Enq/Deq as many items as possible from ring */
};

#ifdef RTE_LIBRTE_RING_DEBUG

/* A structure that keeps track of ring statistics (per-lcore). */
struct rte_ring_debug_stats {
        uint64_t enq_success_bulk;
        uint64_t enq_success_objs;
        uint64_t enq_quota_bulk;
        uint64_t enq_quota_objs;
        uint64_t enq_fail_bulk;
        uint64_t enq_fail_objs;
        uint64_t deq_success_bulk;
        uint64_t deq_success_objs;
        uint64_t deq_fail_bulk;
        uint64_t deq_fail_objs;
} __rte_cache_aligned;
#endif

#define RTE_RING_MZ_PREFIX "RG_"

#define RTE_RING_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
                           sizeof(RTE_RING_MZ_PREFIX) + 1)

#ifndef RTE_RING_PAUSE_REP_COUNT
#define RTE_RING_PAUSE_REP_COUNT 0
#endif
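
/*
 * Illustrative note (not part of the original header): because of the
 * #ifndef guard above, RTE_RING_PAUSE_REP_COUNT can be overridden at build
 * time. With the default of 0, the tail-wait loops in the enqueue/dequeue
 * helpers below never call sched_yield(); a non-zero value makes a spinning
 * lcore yield after that many rte_pause() iterations. A hypothetical override
 * (value chosen only for illustration) could look like:
 *
 *   // in the application's CFLAGS: -DRTE_RING_PAUSE_REP_COUNT=64
 *   // or directly before including this header:
 *   #define RTE_RING_PAUSE_REP_COUNT 64
 *   #include <rte_ring.h>
 */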

struct rte_memzone; /* forward declaration, so as not to require memzone.h */

/* An RTE ring structure: a bounded, lockless multi-producer/multi-consumer
 * FIFO of void * pointers. Head/tail indexes are free-running 32-bit counters
 * that are masked on access to ring[]. */
struct rte_ring {
        /*
         * Note: this field kept the RTE_MEMZONE_NAMESIZE size due to ABI
         * compatibility requirements; it could be changed to RTE_RING_NAMESIZE
         * next time the ABI changes.
         */
        char name[RTE_MEMZONE_NAMESIZE];   /* name of the ring */
        int flags;                         /* flags supplied at creation */
        const struct rte_memzone *memzone; /* memzone, if any, containing the ring */

        /* Ring producer status. */
        struct prod {
                uint32_t watermark;      /* maximum items before EDQUOT */
                uint32_t sp_enqueue;     /* true if single-producer enqueue */
                uint32_t size;           /* size of the ring */
                uint32_t mask;           /* mask (size - 1) of the ring */
                volatile uint32_t head;  /* producer head */
                volatile uint32_t tail;  /* producer tail */
        } prod __rte_cache_aligned;

        /* Ring consumer status. */
        struct cons {
                uint32_t sc_dequeue;     /* true if single-consumer dequeue */
                uint32_t size;           /* size of the ring */
                uint32_t mask;           /* mask (size - 1) of the ring */
                volatile uint32_t head;  /* consumer head */
                volatile uint32_t tail;  /* consumer tail */
#ifdef RTE_RING_SPLIT_PROD_CONS
        } cons __rte_cache_aligned;
#else
        } cons;
#endif

#ifdef RTE_LIBRTE_RING_DEBUG
        struct rte_ring_debug_stats stats[RTE_MAX_LCORE];
#endif

        /* Memory space of the ring starts here. Not volatile, so care is
         * needed about compiler re-ordering. */
        void *ring[] __rte_cache_aligned;
};
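
/*
 * Worked example (illustrative, not part of the original header): with a ring
 * of size 8 (mask = 7), the head/tail counters run freely over the full
 * 32-bit range and only wrap modulo 2^32. If prod.head = 0xFFFFFFFE and
 * cons.tail = 0xFFFFFFFA, the free-entry computation used by the enqueue
 * paths below is
 *
 *   free_entries = mask + cons_tail - prod_head
 *                = 7 + 0xFFFFFFFA - 0xFFFFFFFE   (unsigned 32-bit arithmetic)
 *                = 3
 *
 * i.e. the counter wrap-around is harmless, and at most size - 1 = 7 slots
 * are ever usable. The actual array index is always (counter & mask).
 */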

#define RING_F_SP_ENQ 0x0001 /* the default enqueue is single-producer */
#define RING_F_SC_DEQ 0x0002 /* the default dequeue is single-consumer */
#define RTE_RING_QUOT_EXCEED (1 << 31) /* quota exceeded for burst ops (high bit of return) */
#define RTE_RING_SZ_MASK (unsigned)(0x0fffffff) /* ring size mask */

#ifdef RTE_LIBRTE_RING_DEBUG
#define __RING_STAT_ADD(r, name, n) do {                        \
                unsigned __lcore_id = rte_lcore_id();           \
                if (__lcore_id < RTE_MAX_LCORE) {               \
                        r->stats[__lcore_id].name##_objs += n;  \
                        r->stats[__lcore_id].name##_bulk += 1;  \
                }                                               \
        } while (0)
#else
#define __RING_STAT_ADD(r, name, n) do {} while (0)
#endif

ssize_t rte_ring_get_memsize(unsigned count);

int rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
                  unsigned flags);

struct rte_ring *rte_ring_create(const char *name, unsigned count,
                                 int socket_id, unsigned flags);

void rte_ring_free(struct rte_ring *r);

int rte_ring_set_water_mark(struct rte_ring *r, unsigned count);

void rte_ring_dump(FILE *f, const struct rte_ring *r);

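/*
 * Usage sketch (illustrative, not part of the original header; error handling
 * trimmed, names are placeholders). A ring is created with a power-of-two
 * count, of which count - 1 slots are usable, and can later be retrieved by
 * name with rte_ring_lookup():
 *
 *   static struct rte_ring *example_ring;
 *
 *   static int example_setup(void)
 *   {
 *           example_ring = rte_ring_create("example_ring", 1024,
 *                                          rte_socket_id(),
 *                                          RING_F_SP_ENQ | RING_F_SC_DEQ);
 *           if (example_ring == NULL)
 *                   return -1;           // rte_errno holds the reason
 *           return 0;
 *   }
 *
 *   static void example_teardown(void)
 *   {
 *           rte_ring_free(example_ring); // only for rings from rte_ring_create()
 *   }
 */
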
/* The actual enqueue of pointers on the ring.
 * Placed here since identical code is needed in both
 * the single- and multi-producer enqueue functions. */
#define ENQUEUE_PTRS() do { \
        const uint32_t size = r->prod.size; \
        uint32_t idx = prod_head & mask; \
        if (likely(idx + n < size)) { \
                for (i = 0; i < (n & ((~(unsigned)0x3))); i += 4, idx += 4) { \
                        r->ring[idx] = obj_table[i]; \
                        r->ring[idx+1] = obj_table[i+1]; \
                        r->ring[idx+2] = obj_table[i+2]; \
                        r->ring[idx+3] = obj_table[i+3]; \
                } \
                switch (n & 0x3) { \
                case 3: r->ring[idx++] = obj_table[i++]; /* fallthrough */ \
                case 2: r->ring[idx++] = obj_table[i++]; /* fallthrough */ \
                case 1: r->ring[idx++] = obj_table[i++]; \
                } \
        } else { \
                for (i = 0; idx < size; i++, idx++) \
                        r->ring[idx] = obj_table[i]; \
                for (idx = 0; i < n; i++, idx++) \
                        r->ring[idx] = obj_table[i]; \
        } \
} while (0)

/* The actual copy of pointers on the ring to obj_table.
 * Placed here since identical code is needed in both
 * the single- and multi-consumer dequeue functions. */
#define DEQUEUE_PTRS() do { \
        uint32_t idx = cons_head & mask; \
        const uint32_t size = r->cons.size; \
        if (likely(idx + n < size)) { \
                for (i = 0; i < (n & (~(unsigned)0x3)); i += 4, idx += 4) { \
                        obj_table[i] = r->ring[idx]; \
                        obj_table[i+1] = r->ring[idx+1]; \
                        obj_table[i+2] = r->ring[idx+2]; \
                        obj_table[i+3] = r->ring[idx+3]; \
                } \
                switch (n & 0x3) { \
                case 3: obj_table[i++] = r->ring[idx++]; /* fallthrough */ \
                case 2: obj_table[i++] = r->ring[idx++]; /* fallthrough */ \
                case 1: obj_table[i++] = r->ring[idx++]; \
                } \
        } else { \
                for (i = 0; idx < size; i++, idx++) \
                        obj_table[i] = r->ring[idx]; \
                for (idx = 0; i < n; i++, idx++) \
                        obj_table[i] = r->ring[idx]; \
        } \
} while (0)
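
/*
 * Illustrative note (not part of the original header): the macros above copy
 * in two segments when the masked index would run past the end of ring[].
 * E.g. with size = 8, prod_head & mask = 6 and n = 4, ENQUEUE_PTRS() stores
 * obj_table[0..1] into ring[6..7] and obj_table[2..3] into ring[0..1]; the
 * unrolled fast path is taken only when idx + n < size.
 */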

/* Internal helper: enqueue several objects on a ring (multi-producer safe). */
static inline int __attribute__((always_inline))
__rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
                         unsigned n, enum rte_ring_queue_behavior behavior)
{
        uint32_t prod_head, prod_next;
        uint32_t cons_tail, free_entries;
        const unsigned max = n;
        int success;
        unsigned i, rep = 0;
        uint32_t mask = r->prod.mask;
        int ret;

        /* Avoid the unnecessary cmpset operation below, which is also
         * potentially harmful when n equals 0. */
        if (n == 0)
                return 0;

        /* move prod.head atomically */
        do {
                /* Reset n to the initial burst count */
                n = max;

                prod_head = r->prod.head;
                cons_tail = r->cons.tail;
                /* The subtraction is done between two unsigned 32-bit values
                 * (the result is always modulo 32 bits even if we have
                 * prod_head > cons_tail). So 'free_entries' is always between 0
                 * and size(ring)-1. */
                free_entries = (mask + cons_tail - prod_head);

                /* check that we have enough room in ring */
                if (unlikely(n > free_entries)) {
                        if (behavior == RTE_RING_QUEUE_FIXED) {
                                __RING_STAT_ADD(r, enq_fail, n);
                                return -ENOBUFS;
                        }
                        else {
                                /* No free entry available */
                                if (unlikely(free_entries == 0)) {
                                        __RING_STAT_ADD(r, enq_fail, n);
                                        return 0;
                                }

                                n = free_entries;
                        }
                }

                prod_next = prod_head + n;
                success = rte_atomic32_cmpset(&r->prod.head, prod_head,
                                              prod_next);
        } while (unlikely(success == 0));

        /* write entries in ring */
        ENQUEUE_PTRS();
        rte_smp_wmb();

        /* if we exceed the watermark */
        if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
                ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :
                                (int)(n | RTE_RING_QUOT_EXCEED);
                __RING_STAT_ADD(r, enq_quota, n);
        }
        else {
                ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
                __RING_STAT_ADD(r, enq_success, n);
        }

        /*
         * If there are other enqueues in progress that preceded us,
         * we need to wait for them to complete.
         */
        while (unlikely(r->prod.tail != prod_head)) {
                rte_pause();

                /* Set RTE_RING_PAUSE_REP_COUNT to avoid spinning too long
                 * waiting for other threads to finish. It gives a pre-empted
                 * thread a chance to proceed and finish its ring enqueue
                 * operation. */
                if (RTE_RING_PAUSE_REP_COUNT &&
                    ++rep == RTE_RING_PAUSE_REP_COUNT) {
                        rep = 0;
                        sched_yield();
                }
        }
        r->prod.tail = prod_next;
        return ret;
}

/* Internal helper: enqueue several objects on a ring (NOT multi-producer safe). */
static inline int __attribute__((always_inline))
__rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
                         unsigned n, enum rte_ring_queue_behavior behavior)
{
        uint32_t prod_head, cons_tail;
        uint32_t prod_next, free_entries;
        unsigned i;
        uint32_t mask = r->prod.mask;
        int ret;

        prod_head = r->prod.head;
        cons_tail = r->cons.tail;
        /* The subtraction is done between two unsigned 32-bit values
         * (the result is always modulo 32 bits even if we have
         * prod_head > cons_tail). So 'free_entries' is always between 0
         * and size(ring)-1. */
        free_entries = mask + cons_tail - prod_head;

        /* check that we have enough room in ring */
        if (unlikely(n > free_entries)) {
                if (behavior == RTE_RING_QUEUE_FIXED) {
                        __RING_STAT_ADD(r, enq_fail, n);
                        return -ENOBUFS;
                }
                else {
                        /* No free entry available */
                        if (unlikely(free_entries == 0)) {
                                __RING_STAT_ADD(r, enq_fail, n);
                                return 0;
                        }

                        n = free_entries;
                }
        }

        prod_next = prod_head + n;
        r->prod.head = prod_next;

        /* write entries in ring */
        ENQUEUE_PTRS();
        rte_smp_wmb();

        /* if we exceed the watermark */
        if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
                ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :
                                (int)(n | RTE_RING_QUOT_EXCEED);
                __RING_STAT_ADD(r, enq_quota, n);
        }
        else {
                ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
                __RING_STAT_ADD(r, enq_success, n);
        }

        r->prod.tail = prod_next;
        return ret;
}

/* Internal helper: dequeue several objects from a ring (multi-consumer safe). */
static inline int __attribute__((always_inline))
__rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
                         unsigned n, enum rte_ring_queue_behavior behavior)
{
        uint32_t cons_head, prod_tail;
        uint32_t cons_next, entries;
        const unsigned max = n;
        int success;
        unsigned i, rep = 0;
        uint32_t mask = r->prod.mask;

        /* Avoid the unnecessary cmpset operation below, which is also
         * potentially harmful when n equals 0. */
        if (n == 0)
                return 0;

        /* move cons.head atomically */
        do {
                /* Restore n as it may change every loop */
                n = max;

                cons_head = r->cons.head;
                prod_tail = r->prod.tail;
                /* The subtraction is done between two unsigned 32-bit values
                 * (the result is always modulo 32 bits even if we have
                 * cons_head > prod_tail). So 'entries' is always between 0
                 * and size(ring)-1. */
                entries = (prod_tail - cons_head);

                /* Set the actual entries for dequeue */
                if (n > entries) {
                        if (behavior == RTE_RING_QUEUE_FIXED) {
                                __RING_STAT_ADD(r, deq_fail, n);
                                return -ENOENT;
                        }
                        else {
                                if (unlikely(entries == 0)) {
                                        __RING_STAT_ADD(r, deq_fail, n);
                                        return 0;
                                }

                                n = entries;
                        }
                }

                cons_next = cons_head + n;
                success = rte_atomic32_cmpset(&r->cons.head, cons_head,
                                              cons_next);
        } while (unlikely(success == 0));

        /* copy in table */
        DEQUEUE_PTRS();
        rte_smp_rmb();

        /*
         * If there are other dequeues in progress that preceded us,
         * we need to wait for them to complete.
         */
        while (unlikely(r->cons.tail != cons_head)) {
                rte_pause();

                /* Set RTE_RING_PAUSE_REP_COUNT to avoid spinning too long
                 * waiting for other threads to finish. It gives a pre-empted
                 * thread a chance to proceed and finish its ring dequeue
                 * operation. */
                if (RTE_RING_PAUSE_REP_COUNT &&
                    ++rep == RTE_RING_PAUSE_REP_COUNT) {
                        rep = 0;
                        sched_yield();
                }
        }
        __RING_STAT_ADD(r, deq_success, n);
        r->cons.tail = cons_next;

        return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
}

/* Internal helper: dequeue several objects from a ring (NOT multi-consumer safe). */
static inline int __attribute__((always_inline))
__rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
                         unsigned n, enum rte_ring_queue_behavior behavior)
{
        uint32_t cons_head, prod_tail;
        uint32_t cons_next, entries;
        unsigned i;
        uint32_t mask = r->prod.mask;

        cons_head = r->cons.head;
        prod_tail = r->prod.tail;
        /* The subtraction is done between two unsigned 32-bit values
         * (the result is always modulo 32 bits even if we have
         * cons_head > prod_tail). So 'entries' is always between 0
         * and size(ring)-1. */
        entries = prod_tail - cons_head;

        if (n > entries) {
                if (behavior == RTE_RING_QUEUE_FIXED) {
                        __RING_STAT_ADD(r, deq_fail, n);
                        return -ENOENT;
                }
                else {
                        if (unlikely(entries == 0)) {
                                __RING_STAT_ADD(r, deq_fail, n);
                                return 0;
                        }

                        n = entries;
                }
        }

        cons_next = cons_head + n;
        r->cons.head = cons_next;

        /* copy in table */
        DEQUEUE_PTRS();
        rte_smp_rmb();

        __RING_STAT_ADD(r, deq_success, n);
        r->cons.tail = cons_next;
        return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
}

static inline int __attribute__((always_inline))
rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
                         unsigned n)
{
        return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
}

static inline int __attribute__((always_inline))
rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
                         unsigned n)
{
        return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
}

static inline int __attribute__((always_inline))
rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
                      unsigned n)
{
        if (r->prod.sp_enqueue)
                return rte_ring_sp_enqueue_bulk(r, obj_table, n);
        else
                return rte_ring_mp_enqueue_bulk(r, obj_table, n);
}

static inline int __attribute__((always_inline))
rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
{
        return rte_ring_mp_enqueue_bulk(r, &obj, 1);
}

static inline int __attribute__((always_inline))
rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
{
        return rte_ring_sp_enqueue_bulk(r, &obj, 1);
}

static inline int __attribute__((always_inline))
rte_ring_enqueue(struct rte_ring *r, void *obj)
{
        if (r->prod.sp_enqueue)
                return rte_ring_sp_enqueue(r, obj);
        else
                return rte_ring_mp_enqueue(r, obj);
}
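
/*
 * Usage sketch (illustrative, not part of the original header): with the
 * RTE_RING_QUEUE_FIXED behavior used by the bulk calls above, either all n
 * objects are enqueued or none are. Names below are placeholders.
 *
 *   static int produce_burst(struct rte_ring *r, void **objs, unsigned n)
 *   {
 *           int ret = rte_ring_enqueue_bulk(r, objs, n);
 *
 *           if (ret == 0 || ret == -EDQUOT)
 *                   return 0;            // enqueued (-EDQUOT: watermark crossed)
 *           // ret == -ENOBUFS: not enough room, nothing was enqueued
 *           return ret;
 *   }
 */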

static inline int __attribute__((always_inline))
rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
{
        return __rte_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
}

static inline int __attribute__((always_inline))
rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
{
        return __rte_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
}

static inline int __attribute__((always_inline))
rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
{
        if (r->cons.sc_dequeue)
                return rte_ring_sc_dequeue_bulk(r, obj_table, n);
        else
                return rte_ring_mc_dequeue_bulk(r, obj_table, n);
}

static inline int __attribute__((always_inline))
rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p)
{
        return rte_ring_mc_dequeue_bulk(r, obj_p, 1);
}

static inline int __attribute__((always_inline))
rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p)
{
        return rte_ring_sc_dequeue_bulk(r, obj_p, 1);
}

static inline int __attribute__((always_inline))
rte_ring_dequeue(struct rte_ring *r, void **obj_p)
{
        if (r->cons.sc_dequeue)
                return rte_ring_sc_dequeue(r, obj_p);
        else
                return rte_ring_mc_dequeue(r, obj_p);
}
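
/*
 * Usage sketch (illustrative, not part of the original header): the fixed-size
 * dequeue fills obj_table completely or not at all. Names are placeholders.
 *
 *   static void *consume_one(struct rte_ring *r)
 *   {
 *           void *obj = NULL;
 *
 *           if (rte_ring_dequeue(r, &obj) == -ENOENT)
 *                   return NULL;          // ring is empty, nothing dequeued
 *           return obj;
 *   }
 */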

/* Test if a ring is full. */
static inline int
rte_ring_full(const struct rte_ring *r)
{
        uint32_t prod_tail = r->prod.tail;
        uint32_t cons_tail = r->cons.tail;
        return ((cons_tail - prod_tail - 1) & r->prod.mask) == 0;
}

/* Test if a ring is empty. */
static inline int
rte_ring_empty(const struct rte_ring *r)
{
        uint32_t prod_tail = r->prod.tail;
        uint32_t cons_tail = r->cons.tail;
        return !!(cons_tail == prod_tail);
}

/* Return the number of entries in a ring. */
static inline unsigned
rte_ring_count(const struct rte_ring *r)
{
        uint32_t prod_tail = r->prod.tail;
        uint32_t cons_tail = r->cons.tail;
        return (prod_tail - cons_tail) & r->prod.mask;
}

/* Return the number of free entries in a ring. */
static inline unsigned
rte_ring_free_count(const struct rte_ring *r)
{
        uint32_t prod_tail = r->prod.tail;
        uint32_t cons_tail = r->cons.tail;
        return (cons_tail - prod_tail - 1) & r->prod.mask;
}
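
/*
 * Illustrative note (not part of the original header): because the indexes
 * are masked with size - 1, a ring of size N holds at most N - 1 objects, so
 * for any consistent snapshot of a ring r:
 *
 *   rte_ring_count(r) + rte_ring_free_count(r) == r->prod.size - 1
 *
 * Both values may be stale as soon as they are read when other lcores are
 * enqueueing or dequeueing concurrently.
 */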

void rte_ring_list_dump(FILE *f);

struct rte_ring *rte_ring_lookup(const char *name);

static inline unsigned __attribute__((always_inline))
rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
                          unsigned n)
{
        return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
}

static inline unsigned __attribute__((always_inline))
rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
                          unsigned n)
{
        return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
}

static inline unsigned __attribute__((always_inline))
rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
                       unsigned n)
{
        if (r->prod.sp_enqueue)
                return rte_ring_sp_enqueue_burst(r, obj_table, n);
        else
                return rte_ring_mp_enqueue_burst(r, obj_table, n);
}

static inline unsigned __attribute__((always_inline))
rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
{
        return __rte_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
}

static inline unsigned __attribute__((always_inline))
rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
{
        return __rte_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
}

static inline unsigned __attribute__((always_inline))
rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
{
        if (r->cons.sc_dequeue)
                return rte_ring_sc_dequeue_burst(r, obj_table, n);
        else
                return rte_ring_mc_dequeue_burst(r, obj_table, n);
}
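
/*
 * Usage sketch (illustrative, not part of the original header): unlike the
 * bulk calls, the burst variants enqueue/dequeue as many objects as possible
 * and return the number actually moved. Names are placeholders.
 *
 *   #define PIPELINE_BURST 32
 *
 *   static void forward_stage(struct rte_ring *in, struct rte_ring *out)
 *   {
 *           void *objs[PIPELINE_BURST];
 *           unsigned nb_rx, nb_tx;
 *
 *           nb_rx = rte_ring_dequeue_burst(in, objs, PIPELINE_BURST);
 *           if (nb_rx == 0)
 *                   return;
 *
 *           // assumes no watermark is set on 'out'; otherwise the
 *           // RTE_RING_QUOT_EXCEED bit must be masked off the return value
 *           nb_tx = rte_ring_enqueue_burst(out, objs, nb_rx);
 *           // nb_tx may be < nb_rx; the remaining objs[nb_tx..nb_rx-1]
 *           // must be retried or dropped by the caller
 *   }
 */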

#ifdef __cplusplus
}
#endif

#endif /* _RTE_RING_H_ */