DPDK 20.11.10
rte_rcu_qsbr.h
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (c) 2018-2020 Arm Limited
 */

#ifndef _RTE_RCU_QSBR_H_
#define _RTE_RCU_QSBR_H_

#ifdef __cplusplus
extern "C" {
#endif

#include <stdbool.h>
#include <stdio.h>
#include <stdint.h>
#include <inttypes.h>
#include <errno.h>
#include <rte_common.h>
#include <rte_memory.h>
#include <rte_lcore.h>
#include <rte_debug.h>
#include <rte_atomic.h>
#include <rte_ring.h>

extern int rte_rcu_log_type;

#if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG
#define __RTE_RCU_DP_LOG(level, fmt, args...) \
	rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
		"%s(): " fmt "\n", __func__, ## args)
#else
#define __RTE_RCU_DP_LOG(level, fmt, args...)
#endif

#if defined(RTE_LIBRTE_RCU_DEBUG)
#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...) do {\
	if (v->qsbr_cnt[thread_id].lock_cnt) \
		rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
			"%s(): " fmt "\n", __func__, ## args); \
} while (0)
#else
#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...)
#endif


/* Registered thread IDs are stored as a bitmap in an array of 64b elements.
 * A given thread ID needs to be converted to an index into the array and
 * a bit position within that array element.
 */
#define __RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)
#define __RTE_QSBR_THRID_ARRAY_SIZE(max_threads) \
	RTE_ALIGN(RTE_ALIGN_MUL_CEIL(max_threads, \
		__RTE_QSBR_THRID_ARRAY_ELM_SIZE) >> 3, RTE_CACHE_LINE_SIZE)
#define __RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t *) \
	((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i)
#define __RTE_QSBR_THRID_INDEX_SHIFT 6
#define __RTE_QSBR_THRID_MASK 0x3f
#define RTE_QSBR_THRID_INVALID 0xffffffff

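/* Illustrative sketch (not part of the original header), showing how the
 * macros above split a thread ID into a bitmap element and a bit position.
 * For example, thread_id = 70 lands in element 70 >> 6 = 1, bit 70 & 0x3f = 6.
 * Here 'v' is assumed to point at an initialized struct rte_rcu_qsbr and
 * 'thread_id' to be a registered thread's ID.
 *
 *   uint32_t idx = thread_id >> __RTE_QSBR_THRID_INDEX_SHIFT;
 *   uint64_t bit = 1UL << (thread_id & __RTE_QSBR_THRID_MASK);
 *   uint64_t *elem = __RTE_QSBR_THRID_ARRAY_ELM(v, idx);
 *   bool registered = (__atomic_load_n(elem, __ATOMIC_ACQUIRE) & bit) != 0;
 */
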
/* Worker thread counter */
struct rte_rcu_qsbr_cnt {
	uint64_t cnt;
	/**< Quiescent state counter; 0 (__RTE_QSBR_CNT_THR_OFFLINE) indicates
	 *   the thread is offline.
	 */
	uint32_t lock_cnt;
	/**< Lock counter, used only when RTE_LIBRTE_RCU_DEBUG is enabled. */
} __rte_cache_aligned;

#define __RTE_QSBR_CNT_THR_OFFLINE 0
#define __RTE_QSBR_CNT_INIT 1
#define __RTE_QSBR_CNT_MAX ((uint64_t)~0)
#define __RTE_QSBR_TOKEN_SIZE sizeof(uint64_t)

/* RTE Quiescent State variable structure.
 * This structure has two elements that vary in size based on the
 * 'max_threads' parameter.
 * 1) Quiescent state counter array
 * 2) Registered thread ID array
 */
struct rte_rcu_qsbr {
	uint64_t token __rte_cache_aligned;
	/**< Counter allowing multiple writers to track reader quiescence. */
	uint64_t acked_token;
	/**< Least token acknowledged by all the readers. */

	uint32_t num_elems __rte_cache_aligned;
	/**< Number of elements in the thread ID bitmap array. */
	uint32_t num_threads;
	/**< Number of threads currently using this QS variable. */
	uint32_t max_threads;
	/**< Maximum number of threads using this QS variable. */

	struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned;
	/**< Per-thread quiescent state counters, followed in memory by the
	 *   registered thread ID bitmap.
	 */
} __rte_cache_aligned;

typedef void (*rte_rcu_qsbr_free_resource_t)(void *p, void *e, unsigned int n);

#define RTE_RCU_QSBR_DQ_NAMESIZE RTE_RING_NAMESIZE

/* Create the defer queue such that it is not multi-thread safe. */
#define RTE_RCU_QSBR_DQ_MT_UNSAFE 1

/* Parameters used when creating the defer queue. */
struct rte_rcu_qsbr_dq_parameters {
	const char *name;       /**< Name of the defer queue. */
	uint32_t flags;         /**< Flags to control API behaviour. */
	uint32_t size;          /**< Number of entries in the defer queue. */
	uint32_t esize;         /**< Size (in bytes) of each element. */
	rte_rcu_qsbr_free_resource_t free_fn;
	/**< Function called to free resources once it is safe to do so. */
	void *p;                /**< Pointer passed to the free function. */
	struct rte_rcu_qsbr *v; /**< RCU QSBR variable for this defer queue. */
};

/* RTE defer queue structure.
 * This structure holds the defer queue. The defer queue is used to
 * hold the deleted entries from the data structure that are not
 * yet freed.
 */
struct rte_rcu_qsbr_dq;

size_t
rte_rcu_qsbr_get_memsize(uint32_t max_threads);

int
rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);

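/* Usage sketch (illustrative, not part of the original header): allocating
 * and initializing a QS variable for up to 128 reader threads. It assumes
 * rte_zmalloc() from rte_malloc.h is available; error handling is minimal.
 *
 *   uint32_t max_threads = 128;
 *   size_t sz = rte_rcu_qsbr_get_memsize(max_threads);
 *   struct rte_rcu_qsbr *v = rte_zmalloc("qsbr", sz, RTE_CACHE_LINE_SIZE);
 *   if (v == NULL || rte_rcu_qsbr_init(v, max_threads) != 0)
 *       rte_panic("failed to set up the QS variable\n");
 */
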
int
rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id);

int
rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id);

static __rte_always_inline void
rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	uint64_t t;

	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
				v->qsbr_cnt[thread_id].lock_cnt);

	/* Copy the current value of the token.
	 * The fence at the end of this function ensures that this load and
	 * the store below complete before any subsequent load of the shared
	 * data structure.
	 */
	t = __atomic_load_n(&v->token, __ATOMIC_RELAXED);

	/* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure
	 * 'cnt' (64b) is accessed atomically.
	 */
	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
		t, __ATOMIC_RELAXED);

	/* The subsequent load of the data structure should not
	 * move above the store. Hence a store-load barrier
	 * is required.
	 * If the load of the data structure moves above the store,
	 * the writer might not see that the reader is online, even though
	 * the reader is referencing the shared data structure.
	 */
#ifdef RTE_ARCH_X86_64
	/* rte_smp_mb() for x86 is lighter */
	rte_smp_mb();
#else
	__atomic_thread_fence(__ATOMIC_SEQ_CST);
#endif
}

static __rte_always_inline void
rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
				v->qsbr_cnt[thread_id].lock_cnt);

	/* The reader can go offline only after the load of the
	 * data structure is completed, i.e. any load of the
	 * data structure cannot move below this store.
	 */
	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
		__RTE_QSBR_CNT_THR_OFFLINE, __ATOMIC_RELEASE);
}

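/* Illustrative sketch (not part of the original header): a registered reader
 * that is about to block, e.g. in the hypothetical wait_for_work(), reports
 * itself offline so that writers do not wait on it, and comes back online
 * before touching the shared data structure again.
 *
 *   rte_rcu_qsbr_thread_offline(v, thread_id);
 *   wait_for_work();            // may block for a long time
 *   rte_rcu_qsbr_thread_online(v, thread_id);
 *   // safe to dereference entries of the shared data structure again
 */
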
static __rte_always_inline void
rte_rcu_qsbr_lock(__rte_unused struct rte_rcu_qsbr *v,
			__rte_unused unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

#if defined(RTE_LIBRTE_RCU_DEBUG)
	/* Increment the lock counter */
	__atomic_fetch_add(&v->qsbr_cnt[thread_id].lock_cnt,
				1, __ATOMIC_ACQUIRE);
#endif
}

static __rte_always_inline void
rte_rcu_qsbr_unlock(__rte_unused struct rte_rcu_qsbr *v,
			__rte_unused unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

#if defined(RTE_LIBRTE_RCU_DEBUG)
	/* Decrement the lock counter */
	__atomic_fetch_sub(&v->qsbr_cnt[thread_id].lock_cnt,
				1, __ATOMIC_RELEASE);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, WARNING,
				"Lock counter %u. Nested locks?\n",
				v->qsbr_cnt[thread_id].lock_cnt);
#endif
}

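/* Illustrative sketch (not part of the original header): bracketing a reader
 * critical section with the debug helpers above. Both calls compile to
 * nothing unless RTE_LIBRTE_RCU_DEBUG is defined; with it enabled, reporting
 * a quiescent state while the lock counter is non-zero is logged.
 * lookup_entry() and use() are hypothetical accessors of the shared data.
 *
 *   rte_rcu_qsbr_lock(v, thread_id);
 *   e = lookup_entry(ds, key);
 *   use(e);
 *   rte_rcu_qsbr_unlock(v, thread_id);
 *   rte_rcu_qsbr_quiescent(v, thread_id);
 */
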
static __rte_always_inline uint64_t
rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
{
	uint64_t t;

	RTE_ASSERT(v != NULL);

	/* Release the changes to the shared data structure.
	 * This store release will ensure that changes to any data
	 * structure are visible to the workers before the token
	 * update is visible.
	 */
	t = __atomic_add_fetch(&v->token, 1, __ATOMIC_RELEASE);

	return t;
}

static __rte_always_inline void
rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	uint64_t t;

	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
				v->qsbr_cnt[thread_id].lock_cnt);

	/* Acquire the changes to the shared data structure released
	 * by rte_rcu_qsbr_start.
	 * Later loads of the shared data structure should not move
	 * above this load. Hence, use load-acquire.
	 */
	t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE);

	/* Check if there are updates available from the writer.
	 * Inform the writer that updates are visible to this reader.
	 * Prior loads of the shared data structure should not move
	 * beyond this store. Hence use store-release.
	 */
	if (t != __atomic_load_n(&v->qsbr_cnt[thread_id].cnt, __ATOMIC_RELAXED))
		__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
					 t, __ATOMIC_RELEASE);

	__RTE_RCU_DP_LOG(DEBUG, "%s: update: token = %" PRIu64 ", Thread ID = %d",
		__func__, t, thread_id);
}

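/* Usage sketch for the reader side (illustrative, not part of the original
 * header). process_burst() is a hypothetical routine that dereferences
 * entries of the shared, lock-free data structure protected by 'v'.
 *
 *   rte_rcu_qsbr_thread_register(v, thread_id);
 *   rte_rcu_qsbr_thread_online(v, thread_id);
 *   while (!stop) {
 *       process_burst();                       // reads shared entries
 *       rte_rcu_qsbr_quiescent(v, thread_id);  // no references held here
 *   }
 *   rte_rcu_qsbr_thread_offline(v, thread_id);
 *   rte_rcu_qsbr_thread_unregister(v, thread_id);
 */
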
/* Check the quiescent state counter for registered threads only, assuming
 * that not all threads have registered.
 */
static __rte_always_inline int
__rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	uint32_t i, j, id;
	uint64_t bmap;
	uint64_t c;
	uint64_t *reg_thread_id;
	uint64_t acked_token = __RTE_QSBR_CNT_MAX;

	for (i = 0, reg_thread_id = __RTE_QSBR_THRID_ARRAY_ELM(v, 0);
		i < v->num_elems;
		i++, reg_thread_id++) {
		/* Load the current registered thread bit map before
		 * loading the reader thread quiescent state counters.
		 */
		bmap = __atomic_load_n(reg_thread_id, __ATOMIC_ACQUIRE);
		id = i << __RTE_QSBR_THRID_INDEX_SHIFT;

		while (bmap) {
			j = __builtin_ctzl(bmap);
			__RTE_RCU_DP_LOG(DEBUG,
				"%s: check: token = %" PRIu64 ", wait = %d, Bit Map = 0x%" PRIx64 ", Thread ID = %d",
				__func__, t, wait, bmap, id + j);
			c = __atomic_load_n(
					&v->qsbr_cnt[id + j].cnt,
					__ATOMIC_ACQUIRE);
			__RTE_RCU_DP_LOG(DEBUG,
				"%s: status: token = %" PRIu64 ", wait = %d, Thread QS cnt = %" PRIu64 ", Thread ID = %d",
				__func__, t, wait, c, id + j);

			/* Counter is not checked for wrap-around condition
			 * as it is a 64b counter.
			 */
			if (unlikely(c !=
				__RTE_QSBR_CNT_THR_OFFLINE && c < t)) {
				/* This thread is not in quiescent state */
				if (!wait)
					return 0;

				rte_pause();
				/* This thread might have unregistered.
				 * Re-read the bitmap.
				 */
				bmap = __atomic_load_n(reg_thread_id,
						__ATOMIC_ACQUIRE);

				continue;
			}

			/* This thread is in quiescent state. Use the counter
			 * to find the least acknowledged token among all the
			 * readers.
			 */
			if (c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c)
				acked_token = c;

			bmap &= ~(1UL << j);
		}
	}

	/* All readers are checked, update least acknowledged token.
	 * There might be multiple writers trying to update this. There is
	 * no need to update this very accurately using compare-and-swap.
	 */
	if (acked_token != __RTE_QSBR_CNT_MAX)
		__atomic_store_n(&v->acked_token, acked_token,
			__ATOMIC_RELAXED);

	return 1;
}

/* Check the quiescent state counter for all threads, assuming that
 * all the threads have registered.
 */
static __rte_always_inline int
__rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	uint32_t i;
	struct rte_rcu_qsbr_cnt *cnt;
	uint64_t c;
	uint64_t acked_token = __RTE_QSBR_CNT_MAX;

	for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) {
		__RTE_RCU_DP_LOG(DEBUG,
			"%s: check: token = %" PRIu64 ", wait = %d, Thread ID = %d",
			__func__, t, wait, i);
		while (1) {
			c = __atomic_load_n(&cnt->cnt, __ATOMIC_ACQUIRE);
			__RTE_RCU_DP_LOG(DEBUG,
				"%s: status: token = %" PRIu64 ", wait = %d, Thread QS cnt = %" PRIu64 ", Thread ID = %d",
				__func__, t, wait, c, i);

			/* Counter is not checked for wrap-around condition
			 * as it is a 64b counter.
			 */
			if (likely(c == __RTE_QSBR_CNT_THR_OFFLINE || c >= t))
				break;

			/* This thread is not in quiescent state */
			if (!wait)
				return 0;

			rte_pause();
		}

		/* This thread is in quiescent state. Use the counter to find
		 * the least acknowledged token among all the readers.
		 */
		if (likely(c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c))
			acked_token = c;
	}

	/* All readers are checked, update least acknowledged token.
	 * There might be multiple writers trying to update this. There is
	 * no need to update this very accurately using compare-and-swap.
	 */
	if (acked_token != __RTE_QSBR_CNT_MAX)
		__atomic_store_n(&v->acked_token, acked_token,
			__ATOMIC_RELAXED);

	return 1;
}

static __rte_always_inline int
rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	RTE_ASSERT(v != NULL);

	/* Check if all the readers have already acknowledged this token */
	if (likely(t <= v->acked_token)) {
		__RTE_RCU_DP_LOG(DEBUG,
			"%s: check: token = %" PRIu64 ", wait = %d",
			__func__, t, wait);
		__RTE_RCU_DP_LOG(DEBUG,
			"%s: status: least acked token = %" PRIu64,
			__func__, v->acked_token);
		return 1;
	}

	if (likely(v->num_threads == v->max_threads))
		return __rte_rcu_qsbr_check_all(v, t, wait);
	else
		return __rte_rcu_qsbr_check_selective(v, t, wait);
}

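/* Usage sketch for the writer side (illustrative, not part of the original
 * header): after unlinking an element from the shared structure, take a token
 * with rte_rcu_qsbr_start() and poll rte_rcu_qsbr_check() until every
 * registered reader has passed through a quiescent state. remove_entry(),
 * do_other_work() and free_entry() are hypothetical helpers.
 *
 *   remove_entry(ds, entry);                  // element no longer reachable
 *   uint64_t token = rte_rcu_qsbr_start(v);
 *   while (rte_rcu_qsbr_check(v, token, false) == 0)
 *       do_other_work();                      // grace period not yet over
 *   free_entry(entry);                        // no reader can still hold it
 */
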
void
rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id);

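/* Illustrative sketch (not part of the original header): the blocking
 * alternative to the start/check polling shown above. A writer that is not
 * itself registered as a reader passes RTE_QSBR_THRID_INVALID as thread_id.
 * remove_entry() and free_entry() are hypothetical helpers.
 *
 *   remove_entry(ds, entry);                        // unlink the element
 *   rte_rcu_qsbr_synchronize(v, RTE_QSBR_THRID_INVALID);
 *   free_entry(entry);                              // grace period elapsed
 */
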
int
rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);

__rte_experimental
struct rte_rcu_qsbr_dq *
rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params);

__rte_experimental
int
rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e);

__rte_experimental
int
rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
	unsigned int *freed, unsigned int *pending, unsigned int *available);

__rte_experimental
int
rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq);

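/* Usage sketch for the defer queue helpers (illustrative, not part of the
 * original header). free_resource_cb is a hypothetical callback matching
 * rte_rcu_qsbr_free_resource_t, and 'ds'/'v' are the protected data structure
 * and its QS variable; error handling is elided.
 *
 *   struct rte_rcu_qsbr_dq_parameters params = {
 *       .name = "demo_dq",
 *       .size = 1024,                   // entries in the defer queue
 *       .esize = sizeof(void *),        // bytes per deferred element
 *       .free_fn = free_resource_cb,    // called once an entry is safe to free
 *       .p = ds,
 *       .v = v,
 *   };
 *   struct rte_rcu_qsbr_dq *dq = rte_rcu_qsbr_dq_create(&params);
 *
 *   rte_rcu_qsbr_dq_enqueue(dq, &entry);     // defer freeing this entry
 *
 *   unsigned int freed, pending, available;
 *   rte_rcu_qsbr_dq_reclaim(dq, 32, &freed, &pending, &available);
 *
 *   rte_rcu_qsbr_dq_delete(dq);   // tries to reclaim what is still pending
 */
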
#ifdef __cplusplus
}
#endif

#endif /* _RTE_RCU_QSBR_H_ */