DPDK 20.05.0
rte_rcu_qsbr.h
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright (c) 2018-2020 Arm Limited
3  */
4 
5 #ifndef _RTE_RCU_QSBR_H_
6 #define _RTE_RCU_QSBR_H_
7 
23 #ifdef __cplusplus
24 extern "C" {
25 #endif
26 
27 #include <stdbool.h>
28 #include <stdio.h>
29 #include <stdint.h>
30 #include <inttypes.h>
31 #include <errno.h>
32 #include <rte_common.h>
33 #include <rte_memory.h>
34 #include <rte_lcore.h>
35 #include <rte_debug.h>
36 #include <rte_atomic.h>
37 #include <rte_ring.h>
38 
39 extern int rte_rcu_log_type;
40 
41 #if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG
42 #define __RTE_RCU_DP_LOG(level, fmt, args...) \
43  rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
44  "%s(): " fmt "\n", __func__, ## args)
45 #else
46 #define __RTE_RCU_DP_LOG(level, fmt, args...)
47 #endif
48 
49 #if defined(RTE_LIBRTE_RCU_DEBUG)
50 #define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...) do {\
51  if (v->qsbr_cnt[thread_id].lock_cnt) \
52  rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
53  "%s(): " fmt "\n", __func__, ## args); \
54 } while (0)
55 #else
56 #define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...)
57 #endif
58 
59 /* Registered thread IDs are stored as a bitmap in an array of 64b elements.
60  * A given thread id needs to be converted to an index into the array and
61  * a bit position within that array element.
62  */
63 #define __RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)
64 #define __RTE_QSBR_THRID_ARRAY_SIZE(max_threads) \
65  RTE_ALIGN(RTE_ALIGN_MUL_CEIL(max_threads, \
66  __RTE_QSBR_THRID_ARRAY_ELM_SIZE) >> 3, RTE_CACHE_LINE_SIZE)
67 #define __RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t *) \
68  ((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i)
69 #define __RTE_QSBR_THRID_INDEX_SHIFT 6
70 #define __RTE_QSBR_THRID_MASK 0x3f
71 #define RTE_QSBR_THRID_INVALID 0xffffffff
72 
73 /* Worker thread counter */
74 struct rte_rcu_qsbr_cnt {
75  uint64_t cnt;
81  uint32_t lock_cnt;
83 } __rte_cache_aligned;
84 
85 #define __RTE_QSBR_CNT_THR_OFFLINE 0
86 #define __RTE_QSBR_CNT_INIT 1
87 #define __RTE_QSBR_CNT_MAX ((uint64_t)~0)
88 #define __RTE_QSBR_TOKEN_SIZE sizeof(uint64_t)
89 
90 /* RTE Quiescent State variable structure.
91  * This structure has two elements that vary in size based on the
92  * 'max_threads' parameter.
93  * 1) Quiescent state counter array
94  * 2) Register thread ID array
95  */
96 struct rte_rcu_qsbr {
97  uint64_t token __rte_cache_aligned;
99  uint64_t acked_token;
104  uint32_t num_elems __rte_cache_aligned;
106  uint32_t num_threads;
108  uint32_t max_threads;
111  struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned;
117 } __rte_cache_aligned;
118 
132 typedef void (*rte_rcu_qsbr_free_resource_t)(void *p, void *e, unsigned int n);
133 
134 #define RTE_RCU_QSBR_DQ_NAMESIZE RTE_RING_NAMESIZE
135 
144 #define RTE_RCU_QSBR_DQ_MT_UNSAFE 1
145 
149 struct rte_rcu_qsbr_dq_parameters {
150  const char *name;
152  uint32_t flags;
154  uint32_t size;
161  uint32_t esize;
181  rte_rcu_qsbr_free_resource_t free_fn;
183  void *p;
188  struct rte_rcu_qsbr *v;
190 };
191 
192 /* RTE defer queue structure.
193  * This structure holds the defer queue. The defer queue is used to
194  * hold the deleted entries from the data structure that are not
195  * yet freed.
196  */
197 struct rte_rcu_qsbr_dq;
198 
213 __rte_experimental
214 size_t
215 rte_rcu_qsbr_get_memsize(uint32_t max_threads);
216 
235 __rte_experimental
236 int
237 rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);
238 
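The two declarations above are normally used together: the application asks for the required size, allocates a cache-line aligned block, and initializes it. A minimal sketch, not part of this header; the helper name, thread count handling and error handling are illustrative:

#include <rte_malloc.h>   /* rte_zmalloc(), rte_free() */
#include <rte_rcu_qsbr.h>

static struct rte_rcu_qsbr *
example_qsbr_create(uint32_t max_threads)
{
	struct rte_rcu_qsbr *v;
	size_t sz = rte_rcu_qsbr_get_memsize(max_threads);

	v = rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE);
	if (v == NULL)
		return NULL;

	if (rte_rcu_qsbr_init(v, max_threads) != 0) {
		rte_free(v);
		return NULL;
	}

	return v;
}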
262 __rte_experimental
263 int
264 rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id);
265 
284 __rte_experimental
285 int
286 rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id);
287 
316 __rte_experimental
317 static __rte_always_inline void
318 rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
319 {
320  uint64_t t;
321 
322  RTE_ASSERT(v != NULL && thread_id < v->max_threads);
323 
324  __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
325  v->qsbr_cnt[thread_id].lock_cnt);
326 
327  /* Copy the current value of token.
328  * The fence at the end of the function will ensure that
329  * the following will not move down after the load of any shared
330  * data structure.
331  */
332  t = __atomic_load_n(&v->token, __ATOMIC_RELAXED);
333 
334  /* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure
335  * 'cnt' (64b) is accessed atomically.
336  */
337  __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
338  t, __ATOMIC_RELAXED);
339 
340  /* The subsequent load of the data structure should not
341  * move above the store. Hence a store-load barrier
342  * is required.
343  * If the load of the data structure moves above the store,
344  * the writer might not see that the reader is online, even though
345  * the reader is referencing the shared data structure.
346  */
347 #ifdef RTE_ARCH_X86_64
348  /* rte_smp_mb() for x86 is lighter */
349  rte_smp_mb();
350 #else
351  __atomic_thread_fence(__ATOMIC_SEQ_CST);
352 #endif
353 }
354 
378 __rte_experimental
379 static __rte_always_inline void
380 rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
381 {
382  RTE_ASSERT(v != NULL && thread_id < v->max_threads);
383 
384  __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
385  v->qsbr_cnt[thread_id].lock_cnt);
386 
387  /* The reader can go offline only after the load of the
388  * data structure is completed, i.e. any load of the
389  * data structure cannot move after this store.
390  */
391 
392  __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
393  __RTE_QSBR_CNT_THR_OFFLINE, __ATOMIC_RELEASE);
394 }
395 
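A hedged reader-side sketch of the online/offline pattern, not part of this header: a registered reader goes online before touching the shared structure and offline before blocking or idling, so that writers do not wait on it in the meantime. The burst count and the data-structure access are placeholders.

#include <rte_rcu_qsbr.h>

static void
example_reader_bursts(struct rte_rcu_qsbr *v, unsigned int thread_id,
		int n_bursts)
{
	rte_rcu_qsbr_thread_register(v, thread_id);

	while (n_bursts-- > 0) {
		/* Tell writers this thread may hold references */
		rte_rcu_qsbr_thread_online(v, thread_id);

		/* ... read the shared, lock-free data structure ... */

		/* About to block or idle: writers stop waiting on us */
		rte_rcu_qsbr_thread_offline(v, thread_id);
	}

	rte_rcu_qsbr_thread_unregister(v, thread_id);
}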
419 __rte_experimental
420 static __rte_always_inline void
421 rte_rcu_qsbr_lock(__rte_unused struct rte_rcu_qsbr *v,
422  __rte_unused unsigned int thread_id)
423 {
424  RTE_ASSERT(v != NULL && thread_id < v->max_threads);
425 
426 #if defined(RTE_LIBRTE_RCU_DEBUG)
427  /* Increment the lock counter */
428  __atomic_fetch_add(&v->qsbr_cnt[thread_id].lock_cnt,
429  1, __ATOMIC_ACQUIRE);
430 #endif
431 }
432 
456 __rte_experimental
457 static __rte_always_inline void
458 rte_rcu_qsbr_unlock(__rte_unused struct rte_rcu_qsbr *v,
459  __rte_unused unsigned int thread_id)
460 {
461  RTE_ASSERT(v != NULL && thread_id < v->max_threads);
462 
463 #if defined(RTE_LIBRTE_RCU_DEBUG)
464  /* Decrement the lock counter */
465  __atomic_fetch_sub(&v->qsbr_cnt[thread_id].lock_cnt,
466  1, __ATOMIC_RELEASE);
467 
468  __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, WARNING,
469  "Lock counter %u. Nested locks?\n",
470  v->qsbr_cnt[thread_id].lock_cnt);
471 #endif
472 }
473 
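rte_rcu_qsbr_lock()/rte_rcu_qsbr_unlock() are no-ops unless RTE_LIBRTE_RCU_DEBUG is defined, in which case they track nesting so that reporting a quiescent state from inside a reader critical section can be caught. An illustrative sketch; the function name and body are placeholders:

#include <rte_rcu_qsbr.h>

static void
example_read_section(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	rte_rcu_qsbr_lock(v, thread_id);

	/* ... dereference entries of the shared structure ... */

	rte_rcu_qsbr_unlock(v, thread_id);

	/* A quiescent state should only be reported outside the
	 * lock/unlock pair; doing it inside is flagged when
	 * RTE_LIBRTE_RCU_DEBUG is enabled.
	 */
}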
490 __rte_experimental
491 static __rte_always_inline uint64_t
492 rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
493 {
494  uint64_t t;
495 
496  RTE_ASSERT(v != NULL);
497 
498  /* Release the changes to the shared data structure.
499  * This store release will ensure that changes to any data
500  * structure are visible to the workers before the token
501  * update is visible.
502  */
503  t = __atomic_add_fetch(&v->token, 1, __ATOMIC_RELEASE);
504 
505  return t;
506 }
507 
523 __rte_experimental
524 static __rte_always_inline void
525 rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)
526 {
527  uint64_t t;
528 
529  RTE_ASSERT(v != NULL && thread_id < v->max_threads);
530 
531  __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
532  v->qsbr_cnt[thread_id].lock_cnt);
533 
534  /* Acquire the changes to the shared data structure released
535  * by rte_rcu_qsbr_start.
536  * Later loads of the shared data structure should not move
537  * above this load. Hence, use load-acquire.
538  */
539  t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE);
540 
541  /* Check if there are updates available from the writer.
542  * Inform the writer that updates are visible to this reader.
543  * Prior loads of the shared data structure should not move
544  * beyond this store. Hence use store-release.
545  */
546  if (t != __atomic_load_n(&v->qsbr_cnt[thread_id].cnt, __ATOMIC_RELAXED))
547  __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
548  t, __ATOMIC_RELEASE);
549 
550  __RTE_RCU_DP_LOG(DEBUG, "%s: update: token = %"PRIu64", Thread ID = %d",
551  __func__, t, thread_id);
552 }
553 
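A common data-plane pattern, sketched here for illustration only, is to stay online and report a quiescent state once per polling-loop iteration instead of toggling online/offline around every burst. The iteration count and packet processing are placeholders.

#include <rte_rcu_qsbr.h>

static void
example_poll_loop(struct rte_rcu_qsbr *v, unsigned int thread_id,
		int n_iterations)
{
	rte_rcu_qsbr_thread_register(v, thread_id);
	rte_rcu_qsbr_thread_online(v, thread_id);

	while (n_iterations-- > 0) {
		/* ... receive packets and look up the shared structure ... */

		/* No references are held past this point in the iteration */
		rte_rcu_qsbr_quiescent(v, thread_id);
	}

	rte_rcu_qsbr_thread_offline(v, thread_id);
	rte_rcu_qsbr_thread_unregister(v, thread_id);
}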
554 /* Check the quiescent state counter for registered threads only, assuming
555  * that not all threads have registered.
556  */
557 static __rte_always_inline int
558 __rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
559 {
560  uint32_t i, j, id;
561  uint64_t bmap;
562  uint64_t c;
563  uint64_t *reg_thread_id;
564  uint64_t acked_token = __RTE_QSBR_CNT_MAX;
565 
566  for (i = 0, reg_thread_id = __RTE_QSBR_THRID_ARRAY_ELM(v, 0);
567  i < v->num_elems;
568  i++, reg_thread_id++) {
569  /* Load the current registered thread bit map before
570  * loading the reader thread quiescent state counters.
571  */
572  bmap = __atomic_load_n(reg_thread_id, __ATOMIC_ACQUIRE);
573  id = i << __RTE_QSBR_THRID_INDEX_SHIFT;
574 
575  while (bmap) {
576  j = __builtin_ctzl(bmap);
577  __RTE_RCU_DP_LOG(DEBUG,
578  "%s: check: token = %"PRIu64", wait = %d, Bit Map = 0x%"PRIx64", Thread ID = %d",
579  __func__, t, wait, bmap, id + j);
580  c = __atomic_load_n(
581  &v->qsbr_cnt[id + j].cnt,
582  __ATOMIC_ACQUIRE);
583  __RTE_RCU_DP_LOG(DEBUG,
584  "%s: status: token = %"PRIu64", wait = %d, Thread QS cnt = %"PRIu64", Thread ID = %d",
585  __func__, t, wait, c, id+j);
586 
587  /* Counter is not checked for wrap-around condition
588  * as it is a 64b counter.
589  */
590  if (unlikely(c !=
591  __RTE_QSBR_CNT_THR_OFFLINE && c < t)) {
592  /* This thread is not in quiescent state */
593  if (!wait)
594  return 0;
595 
596  rte_pause();
597  /* This thread might have unregistered.
598  * Re-read the bitmap.
599  */
600  bmap = __atomic_load_n(reg_thread_id,
601  __ATOMIC_ACQUIRE);
602 
603  continue;
604  }
605 
606  /* This thread is in quiescent state. Use the counter
607  * to find the least acknowledged token among all the
608  * readers.
609  */
610  if (c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c)
611  acked_token = c;
612 
613  bmap &= ~(1UL << j);
614  }
615  }
616 
617  /* All readers are checked, update least acknowledged token.
618  * There might be multiple writers trying to update this. There is
619  * no need to update this very accurately using compare-and-swap.
620  */
621  if (acked_token != __RTE_QSBR_CNT_MAX)
622  __atomic_store_n(&v->acked_token, acked_token,
623  __ATOMIC_RELAXED);
624 
625  return 1;
626 }
627 
628 /* Check the quiescent state counter for all threads, assuming that
629  * all the threads have registered.
630  */
631 static __rte_always_inline int
632 __rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
633 {
634  uint32_t i;
635  struct rte_rcu_qsbr_cnt *cnt;
636  uint64_t c;
637  uint64_t acked_token = __RTE_QSBR_CNT_MAX;
638 
639  for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) {
640  __RTE_RCU_DP_LOG(DEBUG,
641  "%s: check: token = %"PRIu64", wait = %d, Thread ID = %d",
642  __func__, t, wait, i);
643  while (1) {
644  c = __atomic_load_n(&cnt->cnt, __ATOMIC_ACQUIRE);
645  __RTE_RCU_DP_LOG(DEBUG,
646  "%s: status: token = %"PRIu64", wait = %d, Thread QS cnt = %"PRIu64", Thread ID = %d",
647  __func__, t, wait, c, i);
648 
649  /* Counter is not checked for wrap-around condition
650  * as it is a 64b counter.
651  */
652  if (likely(c == __RTE_QSBR_CNT_THR_OFFLINE || c >= t))
653  break;
654 
655  /* This thread is not in quiescent state */
656  if (!wait)
657  return 0;
658 
659  rte_pause();
660  }
661 
662  /* This thread is in quiescent state. Use the counter to find
663  * the least acknowledged token among all the readers.
664  */
665  if (likely(c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c))
666  acked_token = c;
667  }
668 
669  /* All readers are checked, update least acknowledged token.
670  * There might be multiple writers trying to update this. There is
671  * no need to update this very accurately using compare-and-swap.
672  */
673  if (acked_token != __RTE_QSBR_CNT_MAX)
674  __atomic_store_n(&v->acked_token, acked_token,
675  __ATOMIC_RELAXED);
676 
677  return 1;
678 }
679 
714 __rte_experimental
715 static __rte_always_inline int
716 rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
717 {
718  RTE_ASSERT(v != NULL);
719 
720  /* Check if all the readers have already acknowledged this token */
721  if (likely(t <= v->acked_token)) {
722  __RTE_RCU_DP_LOG(DEBUG,
723  "%s: check: token = %"PRIu64", wait = %d",
724  __func__, t, wait);
725  __RTE_RCU_DP_LOG(DEBUG,
726  "%s: status: least acked token = %"PRIu64"",
727  __func__, v->acked_token);
728  return 1;
729  }
730 
731  if (likely(v->num_threads == v->max_threads))
732  return __rte_rcu_qsbr_check_all(v, t, wait);
733  else
734  return __rte_rcu_qsbr_check_selective(v, t, wait);
735 }
736 
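A hedged writer-side sketch tying rte_rcu_qsbr_start() and rte_rcu_qsbr_check() together: the writer unlinks an element, records a token, and frees the element only after all registered readers have acknowledged that token. The unlink step is a placeholder and 'entry' is assumed to have been allocated with rte_malloc().

#include <rte_malloc.h>   /* rte_free() */
#include <rte_rcu_qsbr.h>

static void
example_writer_delete(struct rte_rcu_qsbr *v, void *entry)
{
	uint64_t token;

	/* 1. Unlink 'entry' from the shared structure (placeholder). */

	/* 2. Record the point after which readers cannot find it. */
	token = rte_rcu_qsbr_start(v);

	/* 3. Wait until every registered reader has gone through a
	 *    quiescent state (or offline) after the token was issued.
	 */
	rte_rcu_qsbr_check(v, token, true);

	/* 4. No reader can still hold a reference; reclaim the memory. */
	rte_free(entry);
}

rte_rcu_qsbr_synchronize(), declared below, wraps this start/check sequence for writers that do not need to overlap other work between grabbing the token and waiting on it.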
758 __rte_experimental
759 void
760 rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id);
761 
780 __rte_experimental
781 int
782 rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);
783 
800 __rte_experimental
801 struct rte_rcu_qsbr_dq *
802 rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params);
803 
835 __rte_experimental
836 int
837 rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e);
838 
864 __rte_experimental
865 int
866 rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
867  unsigned int *freed, unsigned int *pending, unsigned int *available);
868 
890 __rte_experimental
891 int
892 rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq);
893 
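The defer queue packages this grace-period handling so the writer does not block: a deleted resource is enqueued together with the current token, and reclamation frees only entries whose token has been acknowledged. A hedged end-to-end sketch; the callback, queue name, sizes and element layout (one pointer per entry) are illustrative choices, and fields of rte_rcu_qsbr_dq_parameters not shown in this listing may also need valid values depending on the DPDK version.

#include <rte_malloc.h>   /* rte_free() */
#include <rte_rcu_qsbr.h>

/* Illustrative free callback: each queue element holds one pointer. */
static void
example_free_fn(void *p, void *e, unsigned int n)
{
	void **entries = e;
	unsigned int i;

	RTE_SET_USED(p);
	for (i = 0; i < n; i++)
		rte_free(entries[i]);
}

static int
example_dq_usage(struct rte_rcu_qsbr *v, void *deleted_entry)
{
	struct rte_rcu_qsbr_dq *dq;
	unsigned int freed, pending, avail;
	struct rte_rcu_qsbr_dq_parameters params = {
		.name = "example_dq",        /* illustrative name */
		.size = 1024,                /* queue depth */
		.esize = sizeof(void *),     /* one pointer per entry */
		.free_fn = example_free_fn,
		.p = NULL,
		.v = v,
	};

	dq = rte_rcu_qsbr_dq_create(&params);
	if (dq == NULL)
		return -1;

	/* The writer enqueues the element it has just unlinked ... */
	rte_rcu_qsbr_dq_enqueue(dq, &deleted_entry);

	/* ... and later reclaims entries whose grace period has ended. */
	rte_rcu_qsbr_dq_reclaim(dq, 32, &freed, &pending, &avail);

	return rte_rcu_qsbr_dq_delete(dq);
}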
894 #ifdef __cplusplus
895 }
896 #endif
897 
898 #endif /* _RTE_RCU_QSBR_H_ */