DPDK 20.08.0
rte_rcu_qsbr.h
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright (c) 2018-2020 Arm Limited
3  */
4 
5 #ifndef _RTE_RCU_QSBR_H_
6 #define _RTE_RCU_QSBR_H_
7 
28 #ifdef __cplusplus
29 extern "C" {
30 #endif
31 
32 #include <stdbool.h>
33 #include <stdio.h>
34 #include <stdint.h>
35 #include <inttypes.h>
36 #include <errno.h>
37 #include <rte_common.h>
38 #include <rte_memory.h>
39 #include <rte_lcore.h>
40 #include <rte_debug.h>
41 #include <rte_atomic.h>
42 #include <rte_ring.h>
43 
44 extern int rte_rcu_log_type;
45 
46 #if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG
47 #define __RTE_RCU_DP_LOG(level, fmt, args...) \
48  rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
49  "%s(): " fmt "\n", __func__, ## args)
50 #else
51 #define __RTE_RCU_DP_LOG(level, fmt, args...)
52 #endif
53 
54 #if defined(RTE_LIBRTE_RCU_DEBUG)
55 #define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...) do {\
56  if (v->qsbr_cnt[thread_id].lock_cnt) \
57  rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
58  "%s(): " fmt "\n", __func__, ## args); \
59 } while (0)
60 #else
61 #define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...)
62 #endif
63 
64 /* Registered thread IDs are stored as a bitmap in an array of 64b elements.
65  * A given thread id needs to be converted to an index into the array and
66  * a bit position within that array element.
67  */
68 #define __RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)
69 #define __RTE_QSBR_THRID_ARRAY_SIZE(max_threads) \
70  RTE_ALIGN(RTE_ALIGN_MUL_CEIL(max_threads, \
71  __RTE_QSBR_THRID_ARRAY_ELM_SIZE) >> 3, RTE_CACHE_LINE_SIZE)
72 #define __RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t *) \
73  ((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i)
74 #define __RTE_QSBR_THRID_INDEX_SHIFT 6
75 #define __RTE_QSBR_THRID_MASK 0x3f
76 #define RTE_QSBR_THRID_INVALID 0xffffffff
77 
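The two macros above fully determine how a thread id is decomposed. The sketch below restates them in plain C for illustration only; it is not part of this header, and the example_thrid_split() name is hypothetical.

#include <stdint.h>

/* Illustrative only: split a thread_id into the index of the 64b bitmap
 * element and the bit within that element, mirroring
 * __RTE_QSBR_THRID_INDEX_SHIFT (6) and __RTE_QSBR_THRID_MASK (0x3f).
 */
static inline void
example_thrid_split(unsigned int thread_id, unsigned int *elem, uint64_t *bit)
{
	*elem = thread_id >> 6;                  /* index of the array element */
	*bit = UINT64_C(1) << (thread_id & 0x3f); /* bit within that element */
}
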
78 /* Worker thread counter */
79 struct rte_rcu_qsbr_cnt {
80  uint64_t cnt;
86  uint32_t lock_cnt;
88 } __rte_cache_aligned;
89 
90 #define __RTE_QSBR_CNT_THR_OFFLINE 0
91 #define __RTE_QSBR_CNT_INIT 1
92 #define __RTE_QSBR_CNT_MAX ((uint64_t)~0)
93 #define __RTE_QSBR_TOKEN_SIZE sizeof(uint64_t)
94 
95 /* RTE Quiescent State variable structure.
96  * This structure has two elements that vary in size based on the
97  * 'max_threads' parameter.
98  * 1) Quiescent state counter array
99  * 2) Registered thread ID array
100  */
101 struct rte_rcu_qsbr {
102  uint64_t token __rte_cache_aligned;
104  uint64_t acked_token;
109  uint32_t num_elems __rte_cache_aligned;
111  uint32_t num_threads;
113  uint32_t max_threads;
116  struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned;
122 } __rte_cache_aligned;
123 
137 typedef void (*rte_rcu_qsbr_free_resource_t)(void *p, void *e, unsigned int n);
138 
139 #define RTE_RCU_QSBR_DQ_NAMESIZE RTE_RING_NAMESIZE
140 
149 #define RTE_RCU_QSBR_DQ_MT_UNSAFE 1
150 
154 struct rte_rcu_qsbr_dq_parameters {
155  const char *name;
157  uint32_t flags;
159  uint32_t size;
166  uint32_t esize;
186  rte_rcu_qsbr_free_resource_t free_fn;
188  void *p;
193  struct rte_rcu_qsbr *v;
195 };
196 
197 /* RTE defer queue structure.
198  * This structure holds the defer queue. The defer queue is used to
199  * hold the deleted entries from the data structure that are not
200  * yet freed.
201  */
202 struct rte_rcu_qsbr_dq;
203 
218 __rte_experimental
219 size_t
220 rte_rcu_qsbr_get_memsize(uint32_t max_threads);
221 
240 __rte_experimental
241 int
242 rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);
243 
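rte_rcu_qsbr_get_memsize() and rte_rcu_qsbr_init() are meant to be used together: the application allocates a cache-line aligned region of the reported size and then initializes it. A minimal sketch, not part of this header, assuming <rte_malloc.h> for rte_zmalloc()/rte_free() and sizing for RTE_MAX_LCORE readers; example_qsbr_alloc() is a hypothetical helper name.

#include <rte_malloc.h>
#include <rte_rcu_qsbr.h>

static struct rte_rcu_qsbr *
example_qsbr_alloc(void)
{
	/* Size the QS variable for up to RTE_MAX_LCORE reader threads. */
	size_t sz = rte_rcu_qsbr_get_memsize(RTE_MAX_LCORE);
	struct rte_rcu_qsbr *v = rte_zmalloc("rcu_qsbr", sz, RTE_CACHE_LINE_SIZE);

	if (v == NULL || rte_rcu_qsbr_init(v, RTE_MAX_LCORE) != 0) {
		rte_free(v); /* rte_free(NULL) is a no-op */
		return NULL;
	}
	return v;
}
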
267 __rte_experimental
268 int
269 rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id);
270 
289 __rte_experimental
290 int
291 rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id);
292 
321 __rte_experimental
322 static __rte_always_inline void
323 rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
324 {
325  uint64_t t;
326 
327  RTE_ASSERT(v != NULL && thread_id < v->max_threads);
328 
329  __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
330  v->qsbr_cnt[thread_id].lock_cnt);
331 
332  /* Copy the current value of token.
333  * The fence at the end of the function will ensure that
334  * the following will not move down after the load of any shared
335  * data structure.
336  */
337  t = __atomic_load_n(&v->token, __ATOMIC_RELAXED);
338 
339  /* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure
340  * 'cnt' (64b) is accessed atomically.
341  */
342  __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
343  t, __ATOMIC_RELAXED);
344 
345  /* The subsequent load of the data structure should not
346  * move above the store. Hence a store-load barrier
347  * is required.
348  * If the load of the data structure moves above the store,
349  * writer might not see that the reader is online, even though
350  * the reader is referencing the shared data structure.
351  */
352 #ifdef RTE_ARCH_X86_64
353  /* rte_smp_mb() for x86 is lighter */
354  rte_smp_mb();
355 #else
356  __atomic_thread_fence(__ATOMIC_SEQ_CST);
357 #endif
358 }
359 
383 __rte_experimental
384 static __rte_always_inline void
385 rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
386 {
387  RTE_ASSERT(v != NULL && thread_id < v->max_threads);
388 
389  __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
390  v->qsbr_cnt[thread_id].lock_cnt);
391 
392  /* The reader can go offline only after the load of the
393  * data structure is completed. i.e. any load of the
394  * data structure cannot move after this store.
395  */
396 
397  __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
398  __RTE_QSBR_CNT_THR_OFFLINE, __ATOMIC_RELEASE);
399 }
400 
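Taken together, register/online/quiescent/offline/unregister form the reader-side life cycle. A minimal reader sketch follows; it is not part of this header, and the example_reader() name, the keep_running flag and the data-structure access are application assumptions (rte_rcu_qsbr_quiescent() is defined further below in this file).

#include <stdbool.h>
#include <rte_rcu_qsbr.h>

static void
example_reader(struct rte_rcu_qsbr *v, unsigned int thread_id,
		volatile bool *keep_running)
{
	rte_rcu_qsbr_thread_register(v, thread_id);
	rte_rcu_qsbr_thread_online(v, thread_id);

	while (*keep_running) {
		/* ... read the shared, lock-free data structure ... */

		/* Report a quiescent state once no references are held. */
		rte_rcu_qsbr_quiescent(v, thread_id);
	}

	rte_rcu_qsbr_thread_offline(v, thread_id);
	rte_rcu_qsbr_thread_unregister(v, thread_id);
}
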
424 __rte_experimental
425 static __rte_always_inline void
426 rte_rcu_qsbr_lock(__rte_unused struct rte_rcu_qsbr *v,
427  __rte_unused unsigned int thread_id)
428 {
429  RTE_ASSERT(v != NULL && thread_id < v->max_threads);
430 
431 #if defined(RTE_LIBRTE_RCU_DEBUG)
432  /* Increment the lock counter */
433  __atomic_fetch_add(&v->qsbr_cnt[thread_id].lock_cnt,
434  1, __ATOMIC_ACQUIRE);
435 #endif
436 }
437 
461 __rte_experimental
462 static __rte_always_inline void
463 rte_rcu_qsbr_unlock(__rte_unused struct rte_rcu_qsbr *v,
464  __rte_unused unsigned int thread_id)
465 {
466  RTE_ASSERT(v != NULL && thread_id < v->max_threads);
467 
468 #if defined(RTE_LIBRTE_RCU_DEBUG)
469  /* Decrement the lock counter */
470  __atomic_fetch_sub(&v->qsbr_cnt[thread_id].lock_cnt,
471  1, __ATOMIC_RELEASE);
472 
473  __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, WARNING,
474  "Lock counter %u. Nested locks?\n",
475  v->qsbr_cnt[thread_id].lock_cnt);
476 #endif
477 }
478 
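The lock counter updates above compile away unless RTE_LIBRTE_RCU_DEBUG is defined; with debugging enabled, the counter lets the library log an error if a quiescent state is reported (or the thread goes offline) while a critical section is still open. A hypothetical sketch of such a guarded section, not part of this header:

static void
example_guarded_lookup(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	rte_rcu_qsbr_lock(v, thread_id);
	/* ... dereference entries of the shared data structure ... */
	rte_rcu_qsbr_unlock(v, thread_id);
}
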
495 __rte_experimental
496 static __rte_always_inline uint64_t
497 rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
498 {
499  uint64_t t;
500 
501  RTE_ASSERT(v != NULL);
502 
503  /* Release the changes to the shared data structure.
504  * This store release will ensure that changes to any data
505  * structure are visible to the workers before the token
506  * update is visible.
507  */
508  t = __atomic_add_fetch(&v->token, 1, __ATOMIC_RELEASE);
509 
510  return t;
511 }
512 
528 __rte_experimental
529 static __rte_always_inline void
530 rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)
531 {
532  uint64_t t;
533 
534  RTE_ASSERT(v != NULL && thread_id < v->max_threads);
535 
536  __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
537  v->qsbr_cnt[thread_id].lock_cnt);
538 
539  /* Acquire the changes to the shared data structure released
540  * by rte_rcu_qsbr_start.
541  * Later loads of the shared data structure should not move
542  * above this load. Hence, use load-acquire.
543  */
544  t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE);
545 
546  /* Check if there are updates available from the writer.
547  * Inform the writer that updates are visible to this reader.
548  * Prior loads of the shared data structure should not move
549  * beyond this store. Hence use store-release.
550  */
551  if (t != __atomic_load_n(&v->qsbr_cnt[thread_id].cnt, __ATOMIC_RELAXED))
552  __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
553  t, __ATOMIC_RELEASE);
554 
555  __RTE_RCU_DP_LOG(DEBUG, "%s: update: token = %"PRIu64", Thread ID = %d",
556  __func__, t, thread_id);
557 }
558 
559 /* Check the quiescent state counter for registered threads only, assuming
560  * that not all threads have registered.
561  */
562 static __rte_always_inline int
563 __rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
564 {
565  uint32_t i, j, id;
566  uint64_t bmap;
567  uint64_t c;
568  uint64_t *reg_thread_id;
569  uint64_t acked_token = __RTE_QSBR_CNT_MAX;
570 
571  for (i = 0, reg_thread_id = __RTE_QSBR_THRID_ARRAY_ELM(v, 0);
572  i < v->num_elems;
573  i++, reg_thread_id++) {
574  /* Load the current registered thread bit map before
575  * loading the reader thread quiescent state counters.
576  */
577  bmap = __atomic_load_n(reg_thread_id, __ATOMIC_ACQUIRE);
578  id = i << __RTE_QSBR_THRID_INDEX_SHIFT;
579 
580  while (bmap) {
581  j = __builtin_ctzl(bmap);
582  __RTE_RCU_DP_LOG(DEBUG,
583  "%s: check: token = %"PRIu64", wait = %d, Bit Map = 0x%"PRIx64", Thread ID = %d",
584  __func__, t, wait, bmap, id + j);
585  c = __atomic_load_n(
586  &v->qsbr_cnt[id + j].cnt,
587  __ATOMIC_ACQUIRE);
588  __RTE_RCU_DP_LOG(DEBUG,
589  "%s: status: token = %"PRIu64", wait = %d, Thread QS cnt = %"PRIu64", Thread ID = %d",
590  __func__, t, wait, c, id+j);
591 
592  /* Counter is not checked for wrap-around condition
593  * as it is a 64b counter.
594  */
595  if (unlikely(c !=
596  __RTE_QSBR_CNT_THR_OFFLINE && c < t)) {
597  /* This thread is not in quiescent state */
598  if (!wait)
599  return 0;
600 
601  rte_pause();
602  /* This thread might have unregistered.
603  * Re-read the bitmap.
604  */
605  bmap = __atomic_load_n(reg_thread_id,
606  __ATOMIC_ACQUIRE);
607 
608  continue;
609  }
610 
611  /* This thread is in quiescent state. Use the counter
612  * to find the least acknowledged token among all the
613  * readers.
614  */
615  if (c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c)
616  acked_token = c;
617 
618  bmap &= ~(1UL << j);
619  }
620  }
621 
622  /* All readers are checked, update least acknowledged token.
623  * There might be multiple writers trying to update this. There is
624  * no need to update this very accurately using compare-and-swap.
625  */
626  if (acked_token != __RTE_QSBR_CNT_MAX)
627  __atomic_store_n(&v->acked_token, acked_token,
628  __ATOMIC_RELAXED);
629 
630  return 1;
631 }
632 
633 /* Check the quiescent state counter for all threads, assuming that
634  * all the threads have registered.
635  */
636 static __rte_always_inline int
637 __rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
638 {
639  uint32_t i;
640  struct rte_rcu_qsbr_cnt *cnt;
641  uint64_t c;
642  uint64_t acked_token = __RTE_QSBR_CNT_MAX;
643 
644  for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) {
645  __RTE_RCU_DP_LOG(DEBUG,
646  "%s: check: token = %"PRIu64", wait = %d, Thread ID = %d",
647  __func__, t, wait, i);
648  while (1) {
649  c = __atomic_load_n(&cnt->cnt, __ATOMIC_ACQUIRE);
650  __RTE_RCU_DP_LOG(DEBUG,
651  "%s: status: token = %"PRIu64", wait = %d, Thread QS cnt = %"PRIu64", Thread ID = %d",
652  __func__, t, wait, c, i);
653 
654  /* Counter is not checked for wrap-around condition
655  * as it is a 64b counter.
656  */
657  if (likely(c == __RTE_QSBR_CNT_THR_OFFLINE || c >= t))
658  break;
659 
660  /* This thread is not in quiescent state */
661  if (!wait)
662  return 0;
663 
664  rte_pause();
665  }
666 
667  /* This thread is in quiescent state. Use the counter to find
668  * the least acknowledged token among all the readers.
669  */
670  if (likely(c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c))
671  acked_token = c;
672  }
673 
674  /* All readers are checked, update least acknowledged token.
675  * There might be multiple writers trying to update this. There is
676  * no need to update this very accurately using compare-and-swap.
677  */
678  if (acked_token != __RTE_QSBR_CNT_MAX)
679  __atomic_store_n(&v->acked_token, acked_token,
680  __ATOMIC_RELAXED);
681 
682  return 1;
683 }
684 
719 __rte_experimental
720 static __rte_always_inline int
721 rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
722 {
723  RTE_ASSERT(v != NULL);
724 
725  /* Check if all the readers have already acknowledged this token */
726  if (likely(t <= v->acked_token)) {
727  __RTE_RCU_DP_LOG(DEBUG,
728  "%s: check: token = %"PRIu64", wait = %d",
729  __func__, t, wait);
730  __RTE_RCU_DP_LOG(DEBUG,
731  "%s: status: least acked token = %"PRIu64"",
732  __func__, v->acked_token);
733  return 1;
734  }
735 
736  if (likely(v->num_threads == v->max_threads))
737  return __rte_rcu_qsbr_check_all(v, t, wait);
738  else
739  return __rte_rcu_qsbr_check_selective(v, t, wait);
740 }
741 
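On the writer side, rte_rcu_qsbr_start() and rte_rcu_qsbr_check() bracket the grace period for a deleted element. A minimal sketch, not part of this header; the example_writer_delete() name and the unlink/free steps are application assumptions. rte_rcu_qsbr_synchronize(), declared below, is the blocking convenience that combines both steps.

#include <rte_rcu_qsbr.h>

static void
example_writer_delete(struct rte_rcu_qsbr *v, void *entry)
{
	uint64_t token;

	/* ... unlink 'entry' from the lock-free data structure ... */

	token = rte_rcu_qsbr_start(v);       /* release the removal, get a token */
	rte_rcu_qsbr_check(v, token, true);  /* block until all readers acknowledge */

	/* ... it is now safe to free 'entry' ... */
	(void)entry;
}
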
763 __rte_experimental
764 void
765 rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id);
766 
785 __rte_experimental
786 int
787 rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);
788 
805 __rte_experimental
806 struct rte_rcu_qsbr_dq *
807 rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params);
808 
840 __rte_experimental
841 int
842 rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e);
843 
869 __rte_experimental
870 int
871 rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
872  unsigned int *freed, unsigned int *pending, unsigned int *available);
873 
895 __rte_experimental
896 int
897 rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq);
898 
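The defer queue APIs above wrap this start/check/free sequence behind a FIFO of pending frees. A minimal sketch, not part of this header; the example_* names, the element layout (a single pointer) and the parameter values are assumptions, and only the parameter fields visible in the listing above are set (the remaining fields stay zero-initialized here).

#include <rte_rcu_qsbr.h>

/* Callback invoked by the library to free reclaimed resources: 'e' points at
 * 'n' queued elements of 'esize' bytes each, 'p' is the user pointer from the
 * parameters.
 */
static void
example_free_cb(void *p, void *e, unsigned int n)
{
	(void)p; (void)e; (void)n;
	/* ... free the 'n' resources described by the queued elements ... */
}

static struct rte_rcu_qsbr_dq *
example_dq_setup(struct rte_rcu_qsbr *v)
{
	struct rte_rcu_qsbr_dq_parameters params = {
		.name = "example_dq",
		.size = 1024,             /* entries in the defer queue */
		.esize = sizeof(void *),  /* bytes per element, multiple of 4B */
		.free_fn = example_free_cb,
		.p = NULL,
		.v = v,                   /* QS variable shared with the readers */
	};

	return rte_rcu_qsbr_dq_create(&params);
}

static void
example_dq_use(struct rte_rcu_qsbr_dq *dq, void *removed_entry)
{
	unsigned int freed, pending, avail;

	/* Defer freeing: the 'esize' bytes at the given address are copied. */
	rte_rcu_qsbr_dq_enqueue(dq, &removed_entry);

	/* Later (or from a housekeeping thread): reclaim up to 32 entries. */
	rte_rcu_qsbr_dq_reclaim(dq, 32, &freed, &pending, &avail);

	/* When the defer queue is no longer needed: */
	rte_rcu_qsbr_dq_delete(dq);
}
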
899 #ifdef __cplusplus
900 }
901 #endif
902 
903 #endif /* _RTE_RCU_QSBR_H_ */