DPDK 16.11.11
rte_ring.h
1 /*-
2  * BSD LICENSE
3  *
4  * Copyright(c) 2010-2014 Intel Corporation. All rights reserved.
5  * All rights reserved.
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * * Redistributions of source code must retain the above copyright
12  * notice, this list of conditions and the following disclaimer.
13  * * Redistributions in binary form must reproduce the above copyright
14  * notice, this list of conditions and the following disclaimer in
15  * the documentation and/or other materials provided with the
16  * distribution.
17  * * Neither the name of Intel Corporation nor the names of its
18  * contributors may be used to endorse or promote products derived
19  * from this software without specific prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
24  * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
25  * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
26  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
27  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
28  * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
29  * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
30  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
31  * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
32  */
33 
34 /*
35  * Derived from FreeBSD's bufring.h
36  *
37  **************************************************************************
38  *
39  * Copyright (c) 2007-2009 Kip Macy kmacy@freebsd.org
40  * All rights reserved.
41  *
42  * Redistribution and use in source and binary forms, with or without
43  * modification, are permitted provided that the following conditions are met:
44  *
45  * 1. Redistributions of source code must retain the above copyright notice,
46  * this list of conditions and the following disclaimer.
47  *
48  * 2. The name of Kip Macy nor the names of other
49  * contributors may be used to endorse or promote products derived from
50  * this software without specific prior written permission.
51  *
52  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
53  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
54  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
55  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
56  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
57  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
58  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
59  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
60  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
61  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
62  * POSSIBILITY OF SUCH DAMAGE.
63  *
64  ***************************************************************************/
65 
66 #ifndef _RTE_RING_H_
67 #define _RTE_RING_H_
68 
90 #ifdef __cplusplus
91 extern "C" {
92 #endif
93 
94 #include <stdio.h>
95 #include <stdint.h>
96 #include <sys/queue.h>
97 #include <errno.h>
98 #include <rte_common.h>
99 #include <rte_memory.h>
100 #include <rte_lcore.h>
101 #include <rte_atomic.h>
102 #include <rte_branch_prediction.h>
103 #include <rte_memzone.h>
104 
105 #define RTE_TAILQ_RING_NAME "RTE_RING"
106 
107 enum rte_ring_queue_behavior {
108  RTE_RING_QUEUE_FIXED = 0, /* Enq/Deq a fixed number of items from a ring */
109  RTE_RING_QUEUE_VARIABLE /* Enq/Deq as many items as possible from the ring */
110 };
111 
112 #ifdef RTE_LIBRTE_RING_DEBUG
113 
116 struct rte_ring_debug_stats {
117  uint64_t enq_success_bulk; /**< Successful enqueues number. */
118  uint64_t enq_success_objs; /**< Objects successfully enqueued. */
119  uint64_t enq_quota_bulk; /**< Successful enqueues above watermark. */
120  uint64_t enq_quota_objs; /**< Objects enqueued above watermark. */
121  uint64_t enq_fail_bulk; /**< Failed enqueues number. */
122  uint64_t enq_fail_objs; /**< Objects which failed to be enqueued. */
123  uint64_t deq_success_bulk; /**< Successful dequeues number. */
124  uint64_t deq_success_objs; /**< Objects successfully dequeued. */
125  uint64_t deq_fail_bulk; /**< Failed dequeues number. */
126  uint64_t deq_fail_objs; /**< Objects which failed to be dequeued. */
127 } __rte_cache_aligned;
128 #endif
129 
130 #define RTE_RING_MZ_PREFIX "RG_"
131 
132 #define RTE_RING_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
133  sizeof(RTE_RING_MZ_PREFIX) + 1)
134 
135 #ifndef RTE_RING_PAUSE_REP_COUNT
136 #define RTE_RING_PAUSE_REP_COUNT 0 /**< Yield after pause num of times, no yield
137  * if RTE_RING_PAUSE_REP_COUNT = 0. */
138 #endif
139 
140 struct rte_memzone; /* forward declaration, so as not to require memzone.h */
141 
152 struct rte_ring {
153  /*
154  * Note: this field kept the RTE_MEMZONE_NAMESIZE size due to ABI
155  * compatibility requirements, it could be changed to RTE_RING_NAMESIZE
156  * next time the ABI changes
157  */
158  char name[RTE_MEMZONE_NAMESIZE]; /**< Name of the ring. */
159  int flags; /**< Flags supplied at creation. */
160  const struct rte_memzone *memzone;
161  /**< Memzone, if any, containing the rte_ring */
162 
163  /** Ring producer status. */
164  struct prod {
165  uint32_t watermark; /**< Maximum items before EDQUOT. */
166  uint32_t sp_enqueue; /**< True, if single producer. */
167  uint32_t size; /**< Size of ring. */
168  uint32_t mask; /**< Mask (size-1) of ring. */
169  volatile uint32_t head; /**< Producer head. */
170  volatile uint32_t tail; /**< Producer tail. */
171  } prod __rte_cache_aligned;
172 
173  /** Ring consumer status. */
174  struct cons {
175  uint32_t sc_dequeue; /**< True, if single consumer. */
176  uint32_t size; /**< Size of the ring. */
177  uint32_t mask; /**< Mask (size-1) of ring. */
178  volatile uint32_t head; /**< Consumer head. */
179  volatile uint32_t tail; /**< Consumer tail. */
180 #ifdef RTE_RING_SPLIT_PROD_CONS
181  } cons __rte_cache_aligned;
182 #else
183  } cons;
184 #endif
185 
186 #ifdef RTE_LIBRTE_RING_DEBUG
187  struct rte_ring_debug_stats stats[RTE_MAX_LCORE];
188 #endif
189 
190  void *ring[] __rte_cache_aligned; /**< Memory space of ring starts here.
191  * not volatile so need to be careful
192  * about compiler re-ordering */
193 };
194 
195 #define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */
196 #define RING_F_SC_DEQ 0x0002 /**< The default dequeue is "single-consumer". */
197 #define RTE_RING_QUOT_EXCEED (1 << 31) /**< Quota exceed for burst ops */
198 #define RTE_RING_SZ_MASK (unsigned)(0x0fffffff) /**< Ring size mask */
209 #ifdef RTE_LIBRTE_RING_DEBUG
210 #define __RING_STAT_ADD(r, name, n) do { \
211  unsigned __lcore_id = rte_lcore_id(); \
212  if (__lcore_id < RTE_MAX_LCORE) { \
213  r->stats[__lcore_id].name##_objs += n; \
214  r->stats[__lcore_id].name##_bulk += 1; \
215  } \
216  } while(0)
217 #else
218 #define __RING_STAT_ADD(r, name, n) do {} while(0)
219 #endif
220 
235 ssize_t rte_ring_get_memsize(unsigned count);
236 
271 int rte_ring_init(struct rte_ring *r, const char *name, unsigned count,
272  unsigned flags);
273 
313 struct rte_ring *rte_ring_create(const char *name, unsigned count,
314  int socket_id, unsigned flags);
321 void rte_ring_free(struct rte_ring *r);
322 
341 int rte_ring_set_water_mark(struct rte_ring *r, unsigned count);
342 
351 void rte_ring_dump(FILE *f, const struct rte_ring *r);
352 
353 /* The actual enqueue of pointers on the ring.
354  * Placed here since identical code is needed in both
355  * single and multi producer enqueue functions. */
356 #define ENQUEUE_PTRS() do { \
357  const uint32_t size = r->prod.size; \
358  uint32_t idx = prod_head & mask; \
359  if (likely(idx + n < size)) { \
360  for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \
361  r->ring[idx] = obj_table[i]; \
362  r->ring[idx+1] = obj_table[i+1]; \
363  r->ring[idx+2] = obj_table[i+2]; \
364  r->ring[idx+3] = obj_table[i+3]; \
365  } \
366  switch (n & 0x3) { \
367  case 3: r->ring[idx++] = obj_table[i++]; \
368  case 2: r->ring[idx++] = obj_table[i++]; \
369  case 1: r->ring[idx++] = obj_table[i++]; \
370  } \
371  } else { \
372  for (i = 0; idx < size; i++, idx++)\
373  r->ring[idx] = obj_table[i]; \
374  for (idx = 0; i < n; i++, idx++) \
375  r->ring[idx] = obj_table[i]; \
376  } \
377 } while(0)
378 
379 /* The actual copy of pointers from the ring to obj_table.
380  * Placed here since identical code is needed in both
381  * single and multi consumer dequeue functions. */
382 #define DEQUEUE_PTRS() do { \
383  uint32_t idx = cons_head & mask; \
384  const uint32_t size = r->cons.size; \
385  if (likely(idx + n < size)) { \
386  for (i = 0; i < (n & (~(unsigned)0x3)); i+=4, idx+=4) {\
387  obj_table[i] = r->ring[idx]; \
388  obj_table[i+1] = r->ring[idx+1]; \
389  obj_table[i+2] = r->ring[idx+2]; \
390  obj_table[i+3] = r->ring[idx+3]; \
391  } \
392  switch (n & 0x3) { \
393  case 3: obj_table[i++] = r->ring[idx++]; \
394  case 2: obj_table[i++] = r->ring[idx++]; \
395  case 1: obj_table[i++] = r->ring[idx++]; \
396  } \
397  } else { \
398  for (i = 0; idx < size; i++, idx++) \
399  obj_table[i] = r->ring[idx]; \
400  for (idx = 0; i < n; i++, idx++) \
401  obj_table[i] = r->ring[idx]; \
402  } \
403 } while (0)
404 
430 static inline int __attribute__((always_inline))
431 __rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
432  unsigned n, enum rte_ring_queue_behavior behavior)
433 {
434  uint32_t prod_head, prod_next;
435  uint32_t cons_tail, free_entries;
436  const unsigned max = n;
437  int success;
438  unsigned i, rep = 0;
439  uint32_t mask = r->prod.mask;
440  int ret;
441 
442  /* Avoid the unnecessary cmpset operation below, which is also
443  * potentially harmful when n equals 0. */
444  if (n == 0)
445  return 0;
446 
447  /* move prod.head atomically */
448  do {
449  /* Reset n to the initial burst count */
450  n = max;
451 
452  prod_head = r->prod.head;
453 
454  /* add rmb barrier to avoid load/load reorder in weak
455  * memory model. It is a no-op on x86.
456  */
457  rte_smp_rmb();
458 
459  cons_tail = r->cons.tail;
460  /* The subtraction is done between two unsigned 32-bit values
461  * (the result is always modulo 32 bits even if we have
462  * prod_head > cons_tail). So 'free_entries' is always between 0
463  * and size(ring)-1. */
464  free_entries = (mask + cons_tail - prod_head);
465 
466  /* check that we have enough room in ring */
467  if (unlikely(n > free_entries)) {
468  if (behavior == RTE_RING_QUEUE_FIXED) {
469  __RING_STAT_ADD(r, enq_fail, n);
470  return -ENOBUFS;
471  }
472  else {
473  /* No free entry available */
474  if (unlikely(free_entries == 0)) {
475  __RING_STAT_ADD(r, enq_fail, n);
476  return 0;
477  }
478 
479  n = free_entries;
480  }
481  }
482 
483  prod_next = prod_head + n;
484  success = rte_atomic32_cmpset(&r->prod.head, prod_head,
485  prod_next);
486  } while (unlikely(success == 0));
487 
488  /* write entries in ring */
489  ENQUEUE_PTRS();
490  rte_smp_wmb();
491 
492  /* if we exceed the watermark */
493  if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
494  ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :
495  (int)(n | RTE_RING_QUOT_EXCEED);
496  __RING_STAT_ADD(r, enq_quota, n);
497  }
498  else {
499  ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
500  __RING_STAT_ADD(r, enq_success, n);
501  }
502 
503  /*
504  * If there are other enqueues in progress that preceded us,
505  * we need to wait for them to complete
506  */
507  while (unlikely(r->prod.tail != prod_head)) {
508  rte_pause();
509 
510  /* Set RTE_RING_PAUSE_REP_COUNT to avoid spinning too long while
511  * waiting for other threads to finish. It gives a pre-empted thread
512  * a chance to proceed and finish its ring enqueue operation. */
513  if (RTE_RING_PAUSE_REP_COUNT &&
514  ++rep == RTE_RING_PAUSE_REP_COUNT) {
515  rep = 0;
516  sched_yield();
517  }
518  }
519  r->prod.tail = prod_next;
520  return ret;
521 }
522 
545 static inline int __attribute__((always_inline))
546 __rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
547  unsigned n, enum rte_ring_queue_behavior behavior)
548 {
549  uint32_t prod_head, cons_tail;
550  uint32_t prod_next, free_entries;
551  unsigned i;
552  uint32_t mask = r->prod.mask;
553  int ret;
554 
555  prod_head = r->prod.head;
556  cons_tail = r->cons.tail;
557  /* The subtraction is done between two unsigned 32-bit values
558  * (the result is always modulo 32 bits even if we have
559  * prod_head > cons_tail). So 'free_entries' is always between 0
560  * and size(ring)-1. */
561  free_entries = mask + cons_tail - prod_head;
562 
563  /* check that we have enough room in ring */
564  if (unlikely(n > free_entries)) {
565  if (behavior == RTE_RING_QUEUE_FIXED) {
566  __RING_STAT_ADD(r, enq_fail, n);
567  return -ENOBUFS;
568  }
569  else {
570  /* No free entry available */
571  if (unlikely(free_entries == 0)) {
572  __RING_STAT_ADD(r, enq_fail, n);
573  return 0;
574  }
575 
576  n = free_entries;
577  }
578  }
579 
580  prod_next = prod_head + n;
581  r->prod.head = prod_next;
582 
583  /* write entries in ring */
584  ENQUEUE_PTRS();
585  rte_smp_wmb();
586 
587  /* if we exceed the watermark */
588  if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
589  ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :
590  (int)(n | RTE_RING_QUOT_EXCEED);
591  __RING_STAT_ADD(r, enq_quota, n);
592  }
593  else {
594  ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
595  __RING_STAT_ADD(r, enq_success, n);
596  }
597 
598  r->prod.tail = prod_next;
599  return ret;
600 }
601 
629 static inline int __attribute__((always_inline))
630 __rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
631  unsigned n, enum rte_ring_queue_behavior behavior)
632 {
633  uint32_t cons_head, prod_tail;
634  uint32_t cons_next, entries;
635  const unsigned max = n;
636  int success;
637  unsigned i, rep = 0;
638  uint32_t mask = r->prod.mask;
639 
640  /* Avoid the unnecessary cmpset operation below, which is also
641  * potentially harmful when n equals 0. */
642  if (n == 0)
643  return 0;
644 
645  /* move cons.head atomically */
646  do {
647  /* Restore n as it may change every loop */
648  n = max;
649 
650  cons_head = r->cons.head;
651 
652  /* add rmb barrier to avoid load/load reorder in weak
653  * memory model. It is a no-op on x86.
654  */
655  rte_smp_rmb();
656 
657  prod_tail = r->prod.tail;
658  /* The subtraction is done between two unsigned 32-bit values
659  * (the result is always modulo 32 bits even if we have
660  * cons_head > prod_tail). So 'entries' is always between 0
661  * and size(ring)-1. */
662  entries = (prod_tail - cons_head);
663 
664  /* Set the actual entries for dequeue */
665  if (n > entries) {
666  if (behavior == RTE_RING_QUEUE_FIXED) {
667  __RING_STAT_ADD(r, deq_fail, n);
668  return -ENOENT;
669  }
670  else {
671  if (unlikely(entries == 0)){
672  __RING_STAT_ADD(r, deq_fail, n);
673  return 0;
674  }
675 
676  n = entries;
677  }
678  }
679 
680  cons_next = cons_head + n;
681  success = rte_atomic32_cmpset(&r->cons.head, cons_head,
682  cons_next);
683  } while (unlikely(success == 0));
684 
685  /* copy in table */
686  DEQUEUE_PTRS();
687  rte_smp_rmb();
688 
689  /*
690  * If there are other dequeues in progress that preceded us,
691  * we need to wait for them to complete
692  */
693  while (unlikely(r->cons.tail != cons_head)) {
694  rte_pause();
695 
696  /* Set RTE_RING_PAUSE_REP_COUNT to avoid spinning too long while
697  * waiting for other threads to finish. It gives a pre-empted thread
698  * a chance to proceed and finish its ring dequeue operation. */
699  if (RTE_RING_PAUSE_REP_COUNT &&
700  ++rep == RTE_RING_PAUSE_REP_COUNT) {
701  rep = 0;
702  sched_yield();
703  }
704  }
705  __RING_STAT_ADD(r, deq_success, n);
706  r->cons.tail = cons_next;
707 
708  return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
709 }
710 
734 static inline int __attribute__((always_inline))
735 __rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
736  unsigned n, enum rte_ring_queue_behavior behavior)
737 {
738  uint32_t cons_head, prod_tail;
739  uint32_t cons_next, entries;
740  unsigned i;
741  uint32_t mask = r->prod.mask;
742 
743  cons_head = r->cons.head;
744  prod_tail = r->prod.tail;
745  /* The subtraction is done between two unsigned 32-bit values
746  * (the result is always modulo 32 bits even if we have
747  * cons_head > prod_tail). So 'entries' is always between 0
748  * and size(ring)-1. */
749  entries = prod_tail - cons_head;
750 
751  if (n > entries) {
752  if (behavior == RTE_RING_QUEUE_FIXED) {
753  __RING_STAT_ADD(r, deq_fail, n);
754  return -ENOENT;
755  }
756  else {
757  if (unlikely(entries == 0)){
758  __RING_STAT_ADD(r, deq_fail, n);
759  return 0;
760  }
761 
762  n = entries;
763  }
764  }
765 
766  cons_next = cons_head + n;
767  r->cons.head = cons_next;
768 
769  /* copy in table */
770  DEQUEUE_PTRS();
771  rte_smp_rmb();
772 
773  __RING_STAT_ADD(r, deq_success, n);
774  r->cons.tail = cons_next;
775  return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
776 }
777 
796 static inline int __attribute__((always_inline))
797 rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
798  unsigned n)
799 {
800  return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
801 }
802 
818 static inline int __attribute__((always_inline))
819 rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
820  unsigned n)
821 {
822  return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
823 }
824 
844 static inline int __attribute__((always_inline))
845 rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
846  unsigned n)
847 {
848  if (r->prod.sp_enqueue)
849  return rte_ring_sp_enqueue_bulk(r, obj_table, n);
850  else
851  return rte_ring_mp_enqueue_bulk(r, obj_table, n);
852 }
853 
870 static inline int __attribute__((always_inline))
871 rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
872 {
873  return rte_ring_mp_enqueue_bulk(r, &obj, 1);
874 }
875 
889 static inline int __attribute__((always_inline))
890 rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
891 {
892  return rte_ring_sp_enqueue_bulk(r, &obj, 1);
893 }
894 
912 static inline int __attribute__((always_inline))
913 rte_ring_enqueue(struct rte_ring *r, void *obj)
914 {
915  if (r->prod.sp_enqueue)
916  return rte_ring_sp_enqueue(r, obj);
917  else
918  return rte_ring_mp_enqueue(r, obj);
919 }
920 
938 static inline int __attribute__((always_inline))
939 rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
940 {
941  return __rte_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
942 }
943 
959 static inline int __attribute__((always_inline))
960 rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
961 {
962  return __rte_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
963 }
964 
983 static inline int __attribute__((always_inline))
984 rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
985 {
986  if (r->cons.sc_dequeue)
987  return rte_ring_sc_dequeue_bulk(r, obj_table, n);
988  else
989  return rte_ring_mc_dequeue_bulk(r, obj_table, n);
990 }
991 
1007 static inline int __attribute__((always_inline))
1008 rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p)
1009 {
1010  return rte_ring_mc_dequeue_bulk(r, obj_p, 1);
1011 }
1012 
1025 static inline int __attribute__((always_inline))
1026 rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p)
1027 {
1028  return rte_ring_sc_dequeue_bulk(r, obj_p, 1);
1029 }
1030 
1047 static inline int __attribute__((always_inline))
1048 rte_ring_dequeue(struct rte_ring *r, void **obj_p)
1049 {
1050  if (r->cons.sc_dequeue)
1051  return rte_ring_sc_dequeue(r, obj_p);
1052  else
1053  return rte_ring_mc_dequeue(r, obj_p);
1054 }
1055 
1065 static inline int
1066 rte_ring_full(const struct rte_ring *r)
1067 {
1068  uint32_t prod_tail = r->prod.tail;
1069  uint32_t cons_tail = r->cons.tail;
1070  return ((cons_tail - prod_tail - 1) & r->prod.mask) == 0;
1071 }
1072 
1082 static inline int
1083 rte_ring_empty(const struct rte_ring *r)
1084 {
1085  uint32_t prod_tail = r->prod.tail;
1086  uint32_t cons_tail = r->cons.tail;
1087  return !!(cons_tail == prod_tail);
1088 }
1089 
1098 static inline unsigned
1099 rte_ring_count(const struct rte_ring *r)
1100 {
1101  uint32_t prod_tail = r->prod.tail;
1102  uint32_t cons_tail = r->cons.tail;
1103  return (prod_tail - cons_tail) & r->prod.mask;
1104 }
1105 
1114 static inline unsigned
1115 rte_ring_free_count(const struct rte_ring *r)
1116 {
1117  uint32_t prod_tail = r->prod.tail;
1118  uint32_t cons_tail = r->cons.tail;
1119  return (cons_tail - prod_tail - 1) & r->prod.mask;
1120 }
1121 
1128 void rte_ring_list_dump(FILE *f);
1129 
1140 struct rte_ring *rte_ring_lookup(const char *name);
1141 
1157 static inline unsigned __attribute__((always_inline))
1158 rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
1159  unsigned n)
1160 {
1161  return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
1162 }
1163 
1176 static inline unsigned __attribute__((always_inline))
1177 rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
1178  unsigned n)
1179 {
1180  return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
1181 }
1182 
1199 static inline unsigned __attribute__((always_inline))
1200 rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
1201  unsigned n)
1202 {
1203  if (r->prod.sp_enqueue)
1204  return rte_ring_sp_enqueue_burst(r, obj_table, n);
1205  else
1206  return rte_ring_mp_enqueue_burst(r, obj_table, n);
1207 }
1208 
1226 static inline unsigned __attribute__((always_inline))
1227 rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
1228 {
1229  return __rte_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
1230 }
1231 
1246 static inline unsigned __attribute__((always_inline))
1247 rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
1248 {
1249  return __rte_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_VARIABLE);
1250 }
1251 
1268 static inline unsigned __attribute__((always_inline))
1269 rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
1270 {
1271  if (r->cons.sc_dequeue)
1272  return rte_ring_sc_dequeue_burst(r, obj_table, n);
1273  else
1274  return rte_ring_mc_dequeue_burst(r, obj_table, n);
1275 }
1276 
1277 #ifdef __cplusplus
1278 }
1279 #endif
1280 
1281 #endif /* _RTE_RING_H_ */
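
The header above only declares the ring API; the short sketch below is not part of rte_ring.h and is only meant to show how the declarations are typically used together. It assumes a DPDK application that initializes the EAL first; the ring name "example", the count of 1024, the single-producer/single-consumer flags, and the payload variable are illustrative choices, and error handling is abbreviated.

#include <rte_eal.h>
#include <rte_lcore.h>
#include <rte_ring.h>

int main(int argc, char **argv)
{
	int payload = 42;
	void *obj = NULL;
	struct rte_ring *r;

	/* The EAL must be initialized before any other DPDK call. */
	if (rte_eal_init(argc, argv) < 0)
		return -1;

	/*
	 * The ring size must be a power of two; one slot always stays
	 * empty, so a ring created with count=1024 holds at most 1023
	 * objects. RING_F_SP_ENQ and RING_F_SC_DEQ select the
	 * single-producer and single-consumer fast paths declared above.
	 */
	r = rte_ring_create("example", 1024, rte_socket_id(),
			    RING_F_SP_ENQ | RING_F_SC_DEQ);
	if (r == NULL)
		return -1;

	/* Enqueue and dequeue a single pointer. The bulk and burst
	 * variants (rte_ring_enqueue_bulk(), rte_ring_dequeue_burst(),
	 * ...) take an array of pointers instead. */
	if (rte_ring_enqueue(r, &payload) != 0)
		return -1; /* -ENOBUFS: not enough room in the ring */
	if (rte_ring_dequeue(r, &obj) != 0)
		return -1; /* -ENOENT: the ring is empty */

	rte_ring_free(r);
	return 0;
}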