#include <sys/queue.h>

#define RTE_TAILQ_RING_NAME "RTE_RING"
enum rte_ring_queue_behavior {
	RTE_RING_QUEUE_FIXED = 0, /* Enq/Deq a fixed number of items from a ring */
	RTE_RING_QUEUE_VARIABLE   /* Enq/Deq as many items as possible from ring */
};
#ifdef RTE_LIBRTE_RING_DEBUG
/**
 * A structure that stores the ring statistics (per-lcore).
 */
struct rte_ring_debug_stats {
	uint64_t enq_success_bulk; /**< Successful enqueues number. */
	uint64_t enq_success_objs; /**< Objects successfully enqueued. */
	uint64_t enq_quota_bulk;   /**< Successful enqueues above watermark. */
	uint64_t enq_quota_objs;   /**< Objects enqueued above watermark. */
	uint64_t enq_fail_bulk;    /**< Failed enqueues number. */
	uint64_t enq_fail_objs;    /**< Objects that failed to be enqueued. */
	uint64_t deq_success_bulk; /**< Successful dequeues number. */
	uint64_t deq_success_objs; /**< Objects successfully dequeued. */
	uint64_t deq_fail_bulk;    /**< Failed dequeues number. */
	uint64_t deq_fail_objs;    /**< Objects that failed to be dequeued. */
} __rte_cache_aligned;
#endif
#define RTE_RING_MZ_PREFIX "RG_"

#define RTE_RING_NAMESIZE (RTE_MEMZONE_NAMESIZE - \
			   sizeof(RTE_RING_MZ_PREFIX) + 1)
#ifndef RTE_RING_PAUSE_REP_COUNT
#define RTE_RING_PAUSE_REP_COUNT 0 /**< Yield after pause num of times, no yield
                                    *   if RTE_RING_PAUSE_REP not defined. */
#endif
#ifdef RTE_RING_SPLIT_PROD_CONS
	} cons __rte_cache_aligned;
#else
	} cons;
#endif

#ifdef RTE_LIBRTE_RING_DEBUG
	struct rte_ring_debug_stats stats[RTE_MAX_LCORE];
#endif

	void *ring[0] __rte_cache_aligned; /**< Memory space of ring starts here.
	                                    * not volatile so need to be careful
	                                    * about compiler re-ordering */
};
#define RING_F_SP_ENQ 0x0001 /**< The default enqueue is "single-producer". */
#define RING_F_SC_DEQ 0x0002 /**< The default dequeue is "single-consumer". */
#define RTE_RING_QUOT_EXCEED (1 << 31)  /**< Quota exceeded for burst ops. */
#define RTE_RING_SZ_MASK  (unsigned)(0x0fffffff) /**< Ring size mask. */
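/*
 * Illustrative note (added; not in the original header): when a watermark
 * is configured, burst enqueues that push the ring above it return the
 * count with the RTE_RING_QUOT_EXCEED bit set. Since valid counts never
 * exceed RTE_RING_SZ_MASK, the flag can be stripped as shown below; the
 * helper name is an assumption for the example.
 */
static inline unsigned
example_strip_quota_flag(unsigned burst_ret)
{
	/* mask off the quota-exceeded bit to recover the enqueued count */
	return burst_ret & RTE_RING_SZ_MASK;
}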
/**
 * @internal When debug is enabled, store ring statistics.
 */
#ifdef RTE_LIBRTE_RING_DEBUG
#define __RING_STAT_ADD(r, name, n) do {			\
		unsigned __lcore_id = rte_lcore_id();		\
		if (__lcore_id < RTE_MAX_LCORE) {		\
			r->stats[__lcore_id].name##_objs += n;	\
			r->stats[__lcore_id].name##_bulk += 1;	\
		}						\
	} while(0)
#else
#define __RING_STAT_ADD(r, name, n) do {} while(0)
#endif
struct rte_ring *rte_ring_create(const char *name, unsigned count,
				 int socket_id, unsigned flags);
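/*
 * Usage sketch (added for illustration; the ring name, size and helper
 * name are assumptions, and rte_socket_id() is assumed available from
 * rte_lcore.h with the EAL initialized):
 */
static inline struct rte_ring *
example_ring_setup(void)
{
	/* count must be a power of two; usable capacity is count - 1 */
	struct rte_ring *r = rte_ring_create("example_ring", 1024,
					     rte_socket_id(), 0);
	if (r == NULL) {
		/* rte_ring_create() sets rte_errno on failure */
		return NULL;
	}
	return r;
}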
/* The actual enqueue of pointers on the ring.
 * Placed here since identical code is needed in both
 * single- and multi-producer enqueue functions. */
#define ENQUEUE_PTRS() do { \
	const uint32_t size = r->prod.size; \
	uint32_t idx = prod_head & mask; \
	if (likely(idx + n < size)) { \
		for (i = 0; i < (n & ((~(unsigned)0x3))); i+=4, idx+=4) { \
			r->ring[idx] = obj_table[i]; \
			r->ring[idx+1] = obj_table[i+1]; \
			r->ring[idx+2] = obj_table[i+2]; \
			r->ring[idx+3] = obj_table[i+3]; \
		} \
		switch (n & 0x3) { \
			case 3: r->ring[idx++] = obj_table[i++]; \
			case 2: r->ring[idx++] = obj_table[i++]; \
			case 1: r->ring[idx++] = obj_table[i++]; \
		} \
	} else { \
		for (i = 0; idx < size; i++, idx++) \
			r->ring[idx] = obj_table[i]; \
		for (idx = 0; i < n; i++, idx++) \
			r->ring[idx] = obj_table[i]; \
	} \
} while (0)
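/*
 * The macro above expects r, prod_head, mask, n, i and obj_table to be in
 * scope. The stand-alone sketch below (an addition, not part of the
 * original header) restates the same two-phase copy in plain C: a 4-way
 * unrolled loop plus a fall-through switch for the remainder when the
 * write does not wrap, and two straight loops when it does.
 */
static inline void
example_ring_copy_in(void **ring, uint32_t size, uint32_t prod_head,
		     uint32_t mask, void * const *obj_table, unsigned n)
{
	uint32_t idx = prod_head & mask;
	unsigned i;

	if (idx + n < size) {			/* no wrap-around */
		for (i = 0; i < (n & ~(unsigned)0x3); i += 4, idx += 4) {
			ring[idx] = obj_table[i];
			ring[idx + 1] = obj_table[i + 1];
			ring[idx + 2] = obj_table[i + 2];
			ring[idx + 3] = obj_table[i + 3];
		}
		switch (n & 0x3) {		/* 0-3 leftover objects */
		case 3: ring[idx++] = obj_table[i++]; /* fallthrough */
		case 2: ring[idx++] = obj_table[i++]; /* fallthrough */
		case 1: ring[idx++] = obj_table[i++];
		}
	} else {				/* copy in two segments */
		for (i = 0; idx < size; i++, idx++)
			ring[idx] = obj_table[i];
		for (idx = 0; i < n; i++, idx++)
			ring[idx] = obj_table[i];
	}
}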
/* The actual copy of pointers on the ring to obj_table.
 * Placed here since identical code is needed in both
 * single- and multi-consumer dequeue functions. */
#define DEQUEUE_PTRS() do { \
	uint32_t idx = cons_head & mask; \
	const uint32_t size = r->cons.size; \
	if (likely(idx + n < size)) { \
		for (i = 0; i < (n & (~(unsigned)0x3)); i+=4, idx+=4) { \
			obj_table[i] = r->ring[idx]; \
			obj_table[i+1] = r->ring[idx+1]; \
			obj_table[i+2] = r->ring[idx+2]; \
			obj_table[i+3] = r->ring[idx+3]; \
		} \
		switch (n & 0x3) { \
			case 3: obj_table[i++] = r->ring[idx++]; \
			case 2: obj_table[i++] = r->ring[idx++]; \
			case 1: obj_table[i++] = r->ring[idx++]; \
		} \
	} else { \
		for (i = 0; idx < size; i++, idx++) \
			obj_table[i] = r->ring[idx]; \
		for (idx = 0; i < n; i++, idx++) \
			obj_table[i] = r->ring[idx]; \
	} \
} while (0)
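/*
 * Worked example (added for illustration) of the wrap-around arithmetic
 * shared by both macros; all values below are made up for the demo.
 */
static inline int
example_wrap_split_demo(void)
{
	const uint32_t size = 8, mask = size - 1;	/* 8-slot ring */
	const uint32_t cons_head = 6;
	const unsigned n = 4;
	uint32_t idx = cons_head & mask;		/* 6 */

	/* idx + n = 10 >= size, so the copy wraps: slots 6..7 first
	 * (size - idx = 2 objects), then slots 0..1 (the other 2) */
	return (idx + n >= size) && (size - idx == 2);
}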
static inline int __attribute__((always_inline))
__rte_ring_mp_do_enqueue(struct rte_ring *r, void * const *obj_table,
			 unsigned n, enum rte_ring_queue_behavior behavior)
{
	uint32_t prod_head, prod_next;
	uint32_t cons_tail, free_entries;
	const unsigned max = n;
	int success;
	unsigned i, rep = 0;
	uint32_t mask = r->prod.mask;
	int ret;
	/* move prod.head atomically */
	do {
		/* reset n to the initial burst count */
		n = max;

		prod_head = r->prod.head;
		cons_tail = r->cons.tail;
		/* The subtraction is done between two unsigned 32 bit values,
		 * so the result is modulo 32 bits even if prod_head >
		 * cons_tail: free_entries is always in [0, size(ring) - 1]. */
		free_entries = (mask + cons_tail - prod_head);

		/* check that we have enough room in ring */
		if (unlikely(n > free_entries)) {
			if (behavior == RTE_RING_QUEUE_FIXED) {
				__RING_STAT_ADD(r, enq_fail, n);
				return -ENOBUFS;
			}
			/* no free entry available */
			if (unlikely(free_entries == 0)) {
				__RING_STAT_ADD(r, enq_fail, n);
				return 0;
			}
			n = free_entries;
		}

		prod_next = prod_head + n;
		success = rte_atomic32_cmpset(&r->prod.head, prod_head,
					      prod_next);
	} while (unlikely(success == 0));

	/* write entries in ring */
	ENQUEUE_PTRS();
	rte_smp_wmb();
	/* if we exceed the watermark */
	if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
		ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :
				(int)(n | RTE_RING_QUOT_EXCEED);
		__RING_STAT_ADD(r, enq_quota, n);
	} else {
		ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
		__RING_STAT_ADD(r, enq_success, n);
	}
	/*
	 * If there are other enqueues in progress that preceded us,
	 * we need to wait for them to complete.
	 */
	while (unlikely(r->prod.tail != prod_head)) {
		rte_pause();

		/* Set RTE_RING_PAUSE_REP_COUNT to avoid spinning too long
		 * waiting for other threads to finish; it gives a pre-empted
		 * thread a chance to proceed with its ring operation. */
		if (RTE_RING_PAUSE_REP_COUNT &&
		    ++rep == RTE_RING_PAUSE_REP_COUNT) {
			rep = 0;
			sched_yield();
		}
	}
	r->prod.tail = prod_next;
	return ret;
}
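/*
 * The function above is a reserve/commit protocol: producers race on
 * prod.head with compare-and-set to reserve slots, copy their objects,
 * then commit by advancing prod.tail in reservation order. The sketch
 * below (an illustration using C11 atomics rather than DPDK's
 * rte_atomic32_cmpset; names are assumptions) isolates the reservation
 * step.
 */
#include <stdatomic.h>

static inline uint32_t
example_reserve_slots(_Atomic uint32_t *head, unsigned n)
{
	uint32_t old_head, new_head;

	do {
		old_head = atomic_load(head);
		new_head = old_head + n;	/* wraps modulo 2^32 */
	} while (!atomic_compare_exchange_weak(head, &old_head, new_head));

	return old_head;	/* first reserved (pre-mask) slot index */
}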
static inline int __attribute__((always_inline))
__rte_ring_sp_do_enqueue(struct rte_ring *r, void * const *obj_table,
			 unsigned n, enum rte_ring_queue_behavior behavior)
{
	uint32_t prod_head, cons_tail;
	uint32_t prod_next, free_entries;
	unsigned i;
	uint32_t mask = r->prod.mask;
	int ret;
	prod_head = r->prod.head;
	cons_tail = r->cons.tail;
	/* Same modulo-32-bit subtraction as in the MP case: free_entries is
	 * always in [0, size(ring) - 1]. */
	free_entries = mask + cons_tail - prod_head;

	/* check that we have enough room in ring */
	if (unlikely(n > free_entries)) {
		if (behavior == RTE_RING_QUEUE_FIXED) {
			__RING_STAT_ADD(r, enq_fail, n);
			return -ENOBUFS;
		}
		/* no free entry available */
		if (unlikely(free_entries == 0)) {
			__RING_STAT_ADD(r, enq_fail, n);
			return 0;
		}
		n = free_entries;
	}
	prod_next = prod_head + n;
	r->prod.head = prod_next;

	/* write entries in ring */
	ENQUEUE_PTRS();
	rte_smp_wmb();
	/* if we exceed the watermark */
	if (unlikely(((mask + 1) - free_entries + n) > r->prod.watermark)) {
		ret = (behavior == RTE_RING_QUEUE_FIXED) ? -EDQUOT :
				(int)(n | RTE_RING_QUOT_EXCEED);
		__RING_STAT_ADD(r, enq_quota, n);
	} else {
		ret = (behavior == RTE_RING_QUEUE_FIXED) ? 0 : n;
		__RING_STAT_ADD(r, enq_success, n);
	}
	r->prod.tail = prod_next;
	return ret;
}
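/*
 * Usage sketch (added for illustration; the helper name is an
 * assumption). With a single producer the head update above is a plain
 * store, so no retry loop is needed. Applications would normally call
 * rte_ring_sp_enqueue_bulk(), defined further below, rather than the
 * internal helper used here.
 */
static inline int
example_sp_enqueue_pair(struct rte_ring *r, void *a, void *b)
{
	void *objs[2] = { a, b };

	/* 0 on success, -ENOBUFS (no room) or -EDQUOT (watermark) */
	return __rte_ring_sp_do_enqueue(r, objs, 2, RTE_RING_QUEUE_FIXED);
}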
static inline int __attribute__((always_inline))
__rte_ring_mc_do_dequeue(struct rte_ring *r, void **obj_table,
			 unsigned n, enum rte_ring_queue_behavior behavior)
{
	uint32_t cons_head, prod_tail;
	uint32_t cons_next, entries;
	const unsigned max = n;
	int success;
	unsigned i, rep = 0;
	uint32_t mask = r->prod.mask; /* prod and cons masks are identical */
	/* move cons.head atomically */
	do {
		/* restore n as it may change every loop */
		n = max;

		cons_head = r->cons.head;
		prod_tail = r->prod.tail;
		/* The subtraction is done between two unsigned 32 bit values,
		 * so entries is always in [0, size(ring) - 1]. */
		entries = (prod_tail - cons_head);

		/* check that we have enough entries in the ring */
		if (n > entries) {
			if (behavior == RTE_RING_QUEUE_FIXED) {
				__RING_STAT_ADD(r, deq_fail, n);
				return -ENOENT;
			}
			if (unlikely(entries == 0)) {
				__RING_STAT_ADD(r, deq_fail, n);
				return 0;
			}
			n = entries;
		}

		cons_next = cons_head + n;
		success = rte_atomic32_cmpset(&r->cons.head, cons_head,
					      cons_next);
	} while (unlikely(success == 0));

	/* copy the objects out of the ring */
	DEQUEUE_PTRS();
	rte_smp_rmb();
	/*
	 * If there are other dequeues in progress that preceded us,
	 * we need to wait for them to complete.
	 */
	while (unlikely(r->cons.tail != cons_head)) {
		rte_pause();

		/* yield occasionally so a pre-empted peer can finish */
		if (RTE_RING_PAUSE_REP_COUNT &&
		    ++rep == RTE_RING_PAUSE_REP_COUNT) {
			rep = 0;
			sched_yield();
		}
	}
	__RING_STAT_ADD(r, deq_success, n);
	r->cons.tail = cons_next;
	return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
}
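/*
 * Sketch of the two return conventions (added for illustration; the
 * helper name is an assumption): RTE_RING_QUEUE_FIXED is all-or-nothing
 * (0 or -ENOENT), while RTE_RING_QUEUE_VARIABLE returns however many
 * objects could be dequeued, possibly 0.
 */
static inline unsigned
example_mc_dequeue_up_to(struct rte_ring *r, void **objs, unsigned n)
{
	int ret = __rte_ring_mc_do_dequeue(r, objs, n,
					   RTE_RING_QUEUE_VARIABLE);
	return (unsigned)ret;	/* count actually dequeued */
}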
static inline int __attribute__((always_inline))
__rte_ring_sc_do_dequeue(struct rte_ring *r, void **obj_table,
			 unsigned n, enum rte_ring_queue_behavior behavior)
{
	uint32_t cons_head, prod_tail;
	uint32_t cons_next, entries;
	unsigned i;
	uint32_t mask = r->prod.mask;
	cons_head = r->cons.head;
	prod_tail = r->prod.tail;
	/* modulo-32-bit subtraction: entries is in [0, size(ring) - 1] */
	entries = prod_tail - cons_head;

	/* check that we have enough entries in the ring */
	if (n > entries) {
		if (behavior == RTE_RING_QUEUE_FIXED) {
			__RING_STAT_ADD(r, deq_fail, n);
			return -ENOENT;
		}
		if (unlikely(entries == 0)) {
			__RING_STAT_ADD(r, deq_fail, n);
			return 0;
		}
		n = entries;
	}
	cons_next = cons_head + n;
	r->cons.head = cons_next;

	/* copy the objects out of the ring */
	DEQUEUE_PTRS();
	rte_smp_rmb();
	__RING_STAT_ADD(r, deq_success, n);
	r->cons.tail = cons_next;
	return behavior == RTE_RING_QUEUE_FIXED ? 0 : n;
}
/**
 * Enqueue several objects on the ring (multi-producers safe).
 * Returns 0 on success, -EDQUOT if the watermark is exceeded (objects are
 * still enqueued), -ENOBUFS if there is not enough room (nothing enqueued).
 */
static inline int __attribute__((always_inline))
rte_ring_mp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
			 unsigned n)
{
	return __rte_ring_mp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
}
/**
 * Enqueue several objects on the ring (NOT multi-producers safe).
 */
static inline int __attribute__((always_inline))
rte_ring_sp_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
			 unsigned n)
{
	return __rte_ring_sp_do_enqueue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
}
/**
 * Enqueue several objects on the ring, dispatching on the ring's default
 * producer synchronization mode.
 */
static inline int __attribute__((always_inline))
rte_ring_enqueue_bulk(struct rte_ring *r, void * const *obj_table,
		      unsigned n)
{
	if (r->prod.sp_enqueue)
		return rte_ring_sp_enqueue_bulk(r, obj_table, n);
	else
		return rte_ring_mp_enqueue_bulk(r, obj_table, n);
}
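/*
 * Error-handling sketch for the bulk enqueue family (added for
 * illustration; the helper name is an assumption): -EDQUOT still means
 * the objects were enqueued, only the watermark was exceeded.
 */
static inline int
example_enqueue_checked(struct rte_ring *r, void * const *objs, unsigned n)
{
	int ret = rte_ring_enqueue_bulk(r, objs, n);

	if (ret == -EDQUOT)
		return 0;	/* enqueued, but ring is above its watermark */
	return ret;		/* 0 on success, -ENOBUFS if no room */
}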
/**
 * Enqueue one object on the ring (multi-producers safe).
 */
static inline int __attribute__((always_inline))
rte_ring_mp_enqueue(struct rte_ring *r, void *obj)
{
	return rte_ring_mp_enqueue_bulk(r, &obj, 1);
}
/**
 * Enqueue one object on the ring (NOT multi-producers safe).
 */
static inline int __attribute__((always_inline))
rte_ring_sp_enqueue(struct rte_ring *r, void *obj)
{
	return rte_ring_sp_enqueue_bulk(r, &obj, 1);
}
/**
 * Enqueue one object on the ring, dispatching on the default mode.
 */
static inline int __attribute__((always_inline))
rte_ring_enqueue(struct rte_ring *r, void *obj)
{
	if (r->prod.sp_enqueue)
		return rte_ring_sp_enqueue(r, obj);
	else
		return rte_ring_mp_enqueue(r, obj);
}
/**
 * Dequeue several objects from the ring (multi-consumers safe).
 * Returns 0 on success, -ENOENT if there are not enough entries
 * (nothing is dequeued).
 */
static inline int __attribute__((always_inline))
rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
{
	return __rte_ring_mc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
}
/**
 * Dequeue several objects from the ring (NOT multi-consumers safe).
 */
static inline int __attribute__((always_inline))
rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
{
	return __rte_ring_sc_do_dequeue(r, obj_table, n, RTE_RING_QUEUE_FIXED);
}
/**
 * Dequeue several objects from the ring, dispatching on the ring's
 * default consumer synchronization mode.
 */
static inline int __attribute__((always_inline))
rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n)
{
	if (r->cons.sc_dequeue)
		return rte_ring_sc_dequeue_bulk(r, obj_table, n);
	else
		return rte_ring_mc_dequeue_bulk(r, obj_table, n);
}
/**
 * Dequeue one object from the ring (multi-consumers safe).
 */
static inline int __attribute__((always_inline))
rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p)
{
	return rte_ring_mc_dequeue_bulk(r, obj_p, 1);
}
/**
 * Dequeue one object from the ring (NOT multi-consumers safe).
 */
static inline int __attribute__((always_inline))
rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p)
{
	return rte_ring_sc_dequeue_bulk(r, obj_p, 1);
}
/**
 * Dequeue one object from the ring, dispatching on the default mode.
 */
static inline int __attribute__((always_inline))
rte_ring_dequeue(struct rte_ring *r, void **obj_p)
{
	if (r->cons.sc_dequeue)
		return rte_ring_sc_dequeue(r, obj_p);
	else
		return rte_ring_mc_dequeue(r, obj_p);
}
/**
 * Test if a ring is full. Returns 1 if full, 0 otherwise.
 */
static inline int
rte_ring_full(const struct rte_ring *r)
{
	uint32_t prod_tail = r->prod.tail;
	uint32_t cons_tail = r->cons.tail;
	return ((cons_tail - prod_tail - 1) & r->prod.mask) == 0;
}
/**
 * Test if a ring is empty. Returns 1 if empty, 0 otherwise.
 */
static inline int
rte_ring_empty(const struct rte_ring *r)
{
	uint32_t prod_tail = r->prod.tail;
	uint32_t cons_tail = r->cons.tail;
	return !!(cons_tail == prod_tail);
}
/**
 * Return the number of entries in a ring.
 */
static inline unsigned
rte_ring_count(const struct rte_ring *r)
{
	uint32_t prod_tail = r->prod.tail;
	uint32_t cons_tail = r->cons.tail;
	return (prod_tail - cons_tail) & r->prod.mask;
}
/**
 * Return the number of free entries in a ring.
 */
static inline unsigned
rte_ring_free_count(const struct rte_ring *r)
{
	uint32_t prod_tail = r->prod.tail;
	uint32_t cons_tail = r->cons.tail;
	return (cons_tail - prod_tail - 1) & r->prod.mask;
}
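/*
 * Illustrative invariant check (added; the helper name is an assumption):
 * count + free_count is always size - 1, i.e. r->prod.mask, because one
 * slot is kept empty to distinguish a full ring from an empty one.
 */
static inline int
example_slots_add_up(const struct rte_ring *r)
{
	return rte_ring_count(r) + rte_ring_free_count(r) == r->prod.mask;
}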
/**
 * Enqueue several objects on the ring (multi-producers safe). Returns the
 * number of objects actually enqueued (the RTE_RING_QUOT_EXCEED bit is set
 * in the return value when a configured watermark is exceeded).
 */
static inline unsigned __attribute__((always_inline))
rte_ring_mp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
			  unsigned n)
{
	return __rte_ring_mp_do_enqueue(r, obj_table, n,
					RTE_RING_QUEUE_VARIABLE);
}
/**
 * Enqueue several objects on the ring (NOT multi-producers safe).
 */
static inline unsigned __attribute__((always_inline))
rte_ring_sp_enqueue_burst(struct rte_ring *r, void * const *obj_table,
			  unsigned n)
{
	return __rte_ring_sp_do_enqueue(r, obj_table, n,
					RTE_RING_QUEUE_VARIABLE);
}
/**
 * Enqueue several objects on the ring, dispatching on the default mode.
 */
static inline unsigned __attribute__((always_inline))
rte_ring_enqueue_burst(struct rte_ring *r, void * const *obj_table,
		       unsigned n)
{
	if (r->prod.sp_enqueue)
		return rte_ring_sp_enqueue_burst(r, obj_table, n);
	else
		return rte_ring_mp_enqueue_burst(r, obj_table, n);
}
/**
 * Dequeue several objects from the ring (multi-consumers safe). Returns
 * the number of objects actually dequeued, possibly fewer than n.
 */
static inline unsigned __attribute__((always_inline))
rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
{
	return __rte_ring_mc_do_dequeue(r, obj_table, n,
					RTE_RING_QUEUE_VARIABLE);
}
/**
 * Dequeue several objects from the ring (NOT multi-consumers safe).
 */
static inline unsigned __attribute__((always_inline))
rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
{
	return __rte_ring_sc_do_dequeue(r, obj_table, n,
					RTE_RING_QUEUE_VARIABLE);
}
/**
 * Dequeue several objects from the ring, dispatching on the default mode.
 */
static inline unsigned __attribute__((always_inline))
rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n)
{
	if (r->cons.sc_dequeue)
		return rte_ring_sc_dequeue_burst(r, obj_table, n);
	else
		return rte_ring_mc_dequeue_burst(r, obj_table, n);
}
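/*
 * Typical burst-draining loop (added for illustration; the burst size,
 * helper name and process callback are assumptions):
 */
#define EXAMPLE_BURST_SZ 32

static inline void
example_drain_ring(struct rte_ring *r, void (*process_obj)(void *))
{
	void *objs[EXAMPLE_BURST_SZ];
	unsigned nb, i;

	/* pull up to EXAMPLE_BURST_SZ objects per call until the ring
	 * is observed empty */
	while ((nb = rte_ring_dequeue_burst(r, objs, EXAMPLE_BURST_SZ)) > 0) {
		for (i = 0; i < nb; i++)
			process_obj(objs[i]);
	}
}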
/*
 * Member index for this file (reorganized from the generated
 * documentation index):
 *
 * struct rte_ring fields:
 *   char name[RTE_MEMZONE_NAMESIZE];
 *   const struct rte_memzone *memzone;
 *
 * Macros:
 *   #define RTE_MEMZONE_NAMESIZE
 *   #define RTE_RING_PAUSE_REP_COUNT
 *   #define RTE_RING_QUOT_EXCEED
 *   #define __rte_cache_aligned
 *
 * Synchronization primitives used by the ring:
 *   static void rte_smp_rmb(void);
 *   static void rte_smp_wmb(void);
 *   static int rte_atomic32_cmpset(volatile uint32_t *dst, uint32_t exp, uint32_t src);
 *
 * Setup, teardown and inspection:
 *   ssize_t rte_ring_get_memsize(unsigned count);
 *   int rte_ring_init(struct rte_ring *r, const char *name, unsigned count, unsigned flags);
 *   struct rte_ring *rte_ring_create(const char *name, unsigned count, int socket_id, unsigned flags);
 *   void rte_ring_free(struct rte_ring *r);
 *   struct rte_ring *rte_ring_lookup(const char *name);
 *   int rte_ring_set_water_mark(struct rte_ring *r, unsigned count);
 *   void rte_ring_dump(FILE *f, const struct rte_ring *r);
 *   void rte_ring_list_dump(FILE *f);
 *   static int rte_ring_full(const struct rte_ring *r);
 *   static int rte_ring_empty(const struct rte_ring *r);
 *   static unsigned rte_ring_count(const struct rte_ring *r);
 *   static unsigned rte_ring_free_count(const struct rte_ring *r);
 *
 * Bulk (fixed-count) operations:
 *   static int rte_ring_mp_enqueue_bulk(struct rte_ring *r, void *const *obj_table, unsigned n);
 *   static int rte_ring_sp_enqueue_bulk(struct rte_ring *r, void *const *obj_table, unsigned n);
 *   static int rte_ring_enqueue_bulk(struct rte_ring *r, void *const *obj_table, unsigned n);
 *   static int rte_ring_mc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n);
 *   static int rte_ring_sc_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n);
 *   static int rte_ring_dequeue_bulk(struct rte_ring *r, void **obj_table, unsigned n);
 *
 * Single-object operations:
 *   static int rte_ring_mp_enqueue(struct rte_ring *r, void *obj);
 *   static int rte_ring_sp_enqueue(struct rte_ring *r, void *obj);
 *   static int rte_ring_enqueue(struct rte_ring *r, void *obj);
 *   static int rte_ring_mc_dequeue(struct rte_ring *r, void **obj_p);
 *   static int rte_ring_sc_dequeue(struct rte_ring *r, void **obj_p);
 *   static int rte_ring_dequeue(struct rte_ring *r, void **obj_p);
 *
 * Burst (variable-count) operations:
 *   static unsigned rte_ring_mp_enqueue_burst(struct rte_ring *r, void *const *obj_table, unsigned n);
 *   static unsigned rte_ring_sp_enqueue_burst(struct rte_ring *r, void *const *obj_table, unsigned n);
 *   static unsigned rte_ring_enqueue_burst(struct rte_ring *r, void *const *obj_table, unsigned n);
 *   static unsigned rte_ring_mc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n);
 *   static unsigned rte_ring_sc_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n);
 *   static unsigned rte_ring_dequeue_burst(struct rte_ring *r, void **obj_table, unsigned n);
 */