DPDK  21.08.0
rte_rcu_qsbr.h
Go to the documentation of this file.
1 /* SPDX-License-Identifier: BSD-3-Clause
2  * Copyright (c) 2018-2020 Arm Limited
3  */
4 
5 #ifndef _RTE_RCU_QSBR_H_
6 #define _RTE_RCU_QSBR_H_
7 
28 #ifdef __cplusplus
29 extern "C" {
30 #endif
31 
32 #include <stdbool.h>
33 #include <stdio.h>
34 #include <stdint.h>
35 #include <inttypes.h>
36 #include <errno.h>
37 #include <rte_common.h>
38 #include <rte_memory.h>
39 #include <rte_lcore.h>
40 #include <rte_debug.h>
41 #include <rte_atomic.h>
42 #include <rte_ring.h>
43 
44 extern int rte_rcu_log_type;
45 
/* Data-path debug logging helper.
 *
 * When RTE_LOG_DP_LEVEL is at least RTE_LOG_DEBUG this expands to an
 * rte_log() call on rte_rcu_log_type. The message is prefixed with the
 * name of the calling function and a newline is appended, so callers
 * pass 'fmt' without a trailing newline. Otherwise it expands to
 * nothing and its arguments are not evaluated.
 */
#if RTE_LOG_DP_LEVEL < RTE_LOG_DEBUG
#define __RTE_RCU_DP_LOG(level, fmt, args...)
#else
#define __RTE_RCU_DP_LOG(level, fmt, args...) \
	rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
		"%s(): " fmt "\n", __func__, ## args)
#endif
53 
/* Debug-only check of a thread's lock counter.
 *
 * When RTE_LIBRTE_RCU_DEBUG is defined, logs the given message on
 * rte_rcu_log_type if the lock counter of 'thread_id' in 'v' is non-zero.
 * The message is prefixed with the calling function's name and a newline
 * is appended. Without RTE_LIBRTE_RCU_DEBUG the macro expands to nothing
 * and none of its arguments are evaluated.
 *
 * 'v' is parenthesized so that any pointer expression (for example
 * '&obj') can be passed safely.
 */
#if defined(RTE_LIBRTE_RCU_DEBUG)
#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...) do {\
	if ((v)->qsbr_cnt[(thread_id)].lock_cnt) { \
		rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
			"%s(): " fmt "\n", __func__, ## args); \
	} \
} while (0)
#else
#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...)
#endif
63 
/* Registered thread IDs are stored as a bitmap held in an array of 64-bit
 * elements. A thread ID therefore has to be converted into an index into
 * that array (shift by __RTE_QSBR_THRID_INDEX_SHIFT) and a bit position
 * within the selected element (mask with __RTE_QSBR_THRID_MASK).
 *
 * All macro parameters are parenthesized so that arbitrary expressions
 * (e.g. '&obj' or 'a | b') can be passed without precedence surprises.
 */

/* Number of thread IDs (bits) held by one bitmap element. */
#define __RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)

/* Size in bytes of the bitmap needed for 'max_threads' threads: the thread
 * count is rounded up to a whole number of elements, converted from bits
 * to bytes, and then aligned to RTE_CACHE_LINE_SIZE.
 */
#define __RTE_QSBR_THRID_ARRAY_SIZE(max_threads) \
	RTE_ALIGN(RTE_ALIGN_MUL_CEIL((max_threads), \
		__RTE_QSBR_THRID_ARRAY_ELM_SIZE) >> 3, RTE_CACHE_LINE_SIZE)

/* Pointer to the i-th bitmap element. The bitmap is located right after
 * the 'v->max_threads' per-thread counters that follow the structure 'v'.
 */
#define __RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t *) \
	((struct rte_rcu_qsbr_cnt *)((v) + 1) + (v)->max_threads) + (i))

/* log2 of the number of bits per bitmap element. */
#define __RTE_QSBR_THRID_INDEX_SHIFT 6
/* Mask selecting the bit position inside a bitmap element. */
#define __RTE_QSBR_THRID_MASK 0x3f
#define RTE_QSBR_THRID_INVALID 0xffffffff
77 
78 /* Worker thread counter */
79 struct rte_rcu_qsbr_cnt {
80  uint64_t cnt;
86  uint32_t lock_cnt;
89 
/* Special quiescent state counter / token values. */

/* Counter value stored by rte_rcu_qsbr_thread_offline(). */
#define __RTE_QSBR_CNT_THR_OFFLINE 0
/* NOTE(review): presumably the initial token/counter value; it is not
 * used in this header - confirm against the library sources.
 */
#define __RTE_QSBR_CNT_INIT 1
/* All-ones 64-bit value; used as the "nothing acknowledged yet" sentinel
 * when the least acknowledged token is computed.
 */
#define __RTE_QSBR_CNT_MAX (~(uint64_t)0)
/* Size in bytes of one token. */
#define __RTE_QSBR_TOKEN_SIZE sizeof(uint64_t)
94 
95 /* RTE Quiescent State variable structure.
96  * This structure has two elements that vary in size based on the
97  * 'max_threads' parameter.
98  * 1) Quiescent state counter array
99  * 2) Register thread ID array
100  */
101 struct rte_rcu_qsbr {
102  uint64_t token __rte_cache_aligned;
104  uint64_t acked_token;
109  uint32_t num_elems __rte_cache_aligned;
111  uint32_t num_threads;
113  uint32_t max_threads;
116  struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned;
123 
137 typedef void (*rte_rcu_qsbr_free_resource_t)(void *p, void *e, unsigned int n);
138 
139 #define RTE_RCU_QSBR_DQ_NAMESIZE RTE_RING_NAMESIZE
140 
149 #define RTE_RCU_QSBR_DQ_MT_UNSAFE 1
150 
155  const char *name;
157  uint32_t flags;
159  uint32_t size;
166  uint32_t esize;
188  void *p;
193  struct rte_rcu_qsbr *v;
195 };
196 
197 /* RTE defer queue structure.
198  * This structure holds the defer queue. The defer queue is used to
199  * hold the deleted entries from the data structure that are not
200  * yet freed.
201  */
202 struct rte_rcu_qsbr_dq;
203 
215 size_t
216 rte_rcu_qsbr_get_memsize(uint32_t max_threads);
217 
233 int
234 rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);
235 
256 int
257 rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id);
258 
274 int
275 rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id);
276 
302 static __rte_always_inline void
303 rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
304 {
305  uint64_t t;
306 
307  RTE_ASSERT(v != NULL && thread_id < v->max_threads);
308 
309  __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
310  v->qsbr_cnt[thread_id].lock_cnt);
311 
312  /* Copy the current value of token.
313  * The fence at the end of the function will ensure that
314  * the following will not move down after the load of any shared
315  * data structure.
316  */
317  t = __atomic_load_n(&v->token, __ATOMIC_RELAXED);
318 
319  /* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure
320  * 'cnt' (64b) is accessed atomically.
321  */
322  __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
323  t, __ATOMIC_RELAXED);
324 
325  /* The subsequent load of the data structure should not
326  * move above the store. Hence a store-load barrier
327  * is required.
328  * If the load of the data structure moves above the store,
329  * writer might not see that the reader is online, even though
330  * the reader is referencing the shared data structure.
331  */
332  rte_atomic_thread_fence(__ATOMIC_SEQ_CST);
333 }
334 
355 static __rte_always_inline void
356 rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
357 {
358  RTE_ASSERT(v != NULL && thread_id < v->max_threads);
359 
360  __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
361  v->qsbr_cnt[thread_id].lock_cnt);
362 
363  /* The reader can go offline only after the load of the
364  * data structure is completed. i.e. any load of the
365  * data strcture can not move after this store.
366  */
367 
368  __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
369  __RTE_QSBR_CNT_THR_OFFLINE, __ATOMIC_RELEASE);
370 }
371 
392 static __rte_always_inline void
393 rte_rcu_qsbr_lock(__rte_unused struct rte_rcu_qsbr *v,
394  __rte_unused unsigned int thread_id)
395 {
396  RTE_ASSERT(v != NULL && thread_id < v->max_threads);
397 
398 #if defined(RTE_LIBRTE_RCU_DEBUG)
399  /* Increment the lock counter */
400  __atomic_fetch_add(&v->qsbr_cnt[thread_id].lock_cnt,
401  1, __ATOMIC_ACQUIRE);
402 #endif
403 }
404 
425 static __rte_always_inline void
426 rte_rcu_qsbr_unlock(__rte_unused struct rte_rcu_qsbr *v,
427  __rte_unused unsigned int thread_id)
428 {
429  RTE_ASSERT(v != NULL && thread_id < v->max_threads);
430 
431 #if defined(RTE_LIBRTE_RCU_DEBUG)
432  /* Decrement the lock counter */
433  __atomic_fetch_sub(&v->qsbr_cnt[thread_id].lock_cnt,
434  1, __ATOMIC_RELEASE);
435 
436  __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, WARNING,
437  "Lock counter %u. Nested locks?\n",
438  v->qsbr_cnt[thread_id].lock_cnt);
439 #endif
440 }
441 
455 static __rte_always_inline uint64_t
456 rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
457 {
458  uint64_t t;
459 
460  RTE_ASSERT(v != NULL);
461 
462  /* Release the changes to the shared data structure.
463  * This store release will ensure that changes to any data
464  * structure are visible to the workers before the token
465  * update is visible.
466  */
467  t = __atomic_add_fetch(&v->token, 1, __ATOMIC_RELEASE);
468 
469  return t;
470 }
471 
484 static __rte_always_inline void
485 rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)
486 {
487  uint64_t t;
488 
489  RTE_ASSERT(v != NULL && thread_id < v->max_threads);
490 
491  __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
492  v->qsbr_cnt[thread_id].lock_cnt);
493 
494  /* Acquire the changes to the shared data structure released
495  * by rte_rcu_qsbr_start.
496  * Later loads of the shared data structure should not move
497  * above this load. Hence, use load-acquire.
498  */
499  t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE);
500 
501  /* Check if there are updates available from the writer.
502  * Inform the writer that updates are visible to this reader.
503  * Prior loads of the shared data structure should not move
504  * beyond this store. Hence use store-release.
505  */
506  if (t != __atomic_load_n(&v->qsbr_cnt[thread_id].cnt, __ATOMIC_RELAXED))
507  __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
508  t, __ATOMIC_RELEASE);
509 
510  __RTE_RCU_DP_LOG(DEBUG, "%s: update: token = %" PRIu64 ", Thread ID = %d",
511  __func__, t, thread_id);
512 }
513 
514 /* Check the quiescent state counter for registered threads only, assuming
515  * that not all threads have registered.
516  */
517 static __rte_always_inline int
518 __rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
519 {
520  uint32_t i, j, id;
521  uint64_t bmap;
522  uint64_t c;
523  uint64_t *reg_thread_id;
524  uint64_t acked_token = __RTE_QSBR_CNT_MAX;
525 
526  for (i = 0, reg_thread_id = __RTE_QSBR_THRID_ARRAY_ELM(v, 0);
527  i < v->num_elems;
528  i++, reg_thread_id++) {
529  /* Load the current registered thread bit map before
530  * loading the reader thread quiescent state counters.
531  */
532  bmap = __atomic_load_n(reg_thread_id, __ATOMIC_ACQUIRE);
533  id = i << __RTE_QSBR_THRID_INDEX_SHIFT;
534 
535  while (bmap) {
536  j = __builtin_ctzl(bmap);
537  __RTE_RCU_DP_LOG(DEBUG,
538  "%s: check: token = %" PRIu64 ", wait = %d, Bit Map = 0x%" PRIx64 ", Thread ID = %d",
539  __func__, t, wait, bmap, id + j);
540  c = __atomic_load_n(
541  &v->qsbr_cnt[id + j].cnt,
542  __ATOMIC_ACQUIRE);
543  __RTE_RCU_DP_LOG(DEBUG,
544  "%s: status: token = %" PRIu64 ", wait = %d, Thread QS cnt = %" PRIu64 ", Thread ID = %d",
545  __func__, t, wait, c, id+j);
546 
547  /* Counter is not checked for wrap-around condition
548  * as it is a 64b counter.
549  */
550  if (unlikely(c !=
551  __RTE_QSBR_CNT_THR_OFFLINE && c < t)) {
552  /* This thread is not in quiescent state */
553  if (!wait)
554  return 0;
555 
556  rte_pause();
557  /* This thread might have unregistered.
558  * Re-read the bitmap.
559  */
560  bmap = __atomic_load_n(reg_thread_id,
561  __ATOMIC_ACQUIRE);
562 
563  continue;
564  }
565 
566  /* This thread is in quiescent state. Use the counter
567  * to find the least acknowledged token among all the
568  * readers.
569  */
570  if (c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c)
571  acked_token = c;
572 
573  bmap &= ~(1UL << j);
574  }
575  }
576 
577  /* All readers are checked, update least acknowledged token.
578  * There might be multiple writers trying to update this. There is
579  * no need to update this very accurately using compare-and-swap.
580  */
581  if (acked_token != __RTE_QSBR_CNT_MAX)
582  __atomic_store_n(&v->acked_token, acked_token,
583  __ATOMIC_RELAXED);
584 
585  return 1;
586 }
587 
588 /* Check the quiescent state counter for all threads, assuming that
589  * all the threads have registered.
590  */
591 static __rte_always_inline int
592 __rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
593 {
594  uint32_t i;
595  struct rte_rcu_qsbr_cnt *cnt;
596  uint64_t c;
597  uint64_t acked_token = __RTE_QSBR_CNT_MAX;
598 
599  for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) {
600  __RTE_RCU_DP_LOG(DEBUG,
601  "%s: check: token = %" PRIu64 ", wait = %d, Thread ID = %d",
602  __func__, t, wait, i);
603  while (1) {
604  c = __atomic_load_n(&cnt->cnt, __ATOMIC_ACQUIRE);
605  __RTE_RCU_DP_LOG(DEBUG,
606  "%s: status: token = %" PRIu64 ", wait = %d, Thread QS cnt = %" PRIu64 ", Thread ID = %d",
607  __func__, t, wait, c, i);
608 
609  /* Counter is not checked for wrap-around condition
610  * as it is a 64b counter.
611  */
612  if (likely(c == __RTE_QSBR_CNT_THR_OFFLINE || c >= t))
613  break;
614 
615  /* This thread is not in quiescent state */
616  if (!wait)
617  return 0;
618 
619  rte_pause();
620  }
621 
622  /* This thread is in quiescent state. Use the counter to find
623  * the least acknowledged token among all the readers.
624  */
625  if (likely(c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c))
626  acked_token = c;
627  }
628 
629  /* All readers are checked, update least acknowledged token.
630  * There might be multiple writers trying to update this. There is
631  * no need to update this very accurately using compare-and-swap.
632  */
633  if (acked_token != __RTE_QSBR_CNT_MAX)
634  __atomic_store_n(&v->acked_token, acked_token,
635  __ATOMIC_RELAXED);
636 
637  return 1;
638 }
639 
671 static __rte_always_inline int
672 rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
673 {
674  RTE_ASSERT(v != NULL);
675 
676  /* Check if all the readers have already acknowledged this token */
677  if (likely(t <= v->acked_token)) {
678  __RTE_RCU_DP_LOG(DEBUG,
679  "%s: check: token = %" PRIu64 ", wait = %d",
680  __func__, t, wait);
681  __RTE_RCU_DP_LOG(DEBUG,
682  "%s: status: least acked token = %" PRIu64,
683  __func__, v->acked_token);
684  return 1;
685  }
686 
687  if (likely(v->num_threads == v->max_threads))
688  return __rte_rcu_qsbr_check_all(v, t, wait);
689  else
690  return __rte_rcu_qsbr_check_selective(v, t, wait);
691 }
692 
711 void
712 rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id);
713 
729 int
730 rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);
731 
748 __rte_experimental
749 struct rte_rcu_qsbr_dq *
751 
783 __rte_experimental
784 int
785 rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e);
786 
812 __rte_experimental
813 int
814 rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
815  unsigned int *freed, unsigned int *pending, unsigned int *available);
816 
838 __rte_experimental
839 int
840 rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq);
841 
842 #ifdef __cplusplus
843 }
844 #endif
845 
846 #endif /* _RTE_RCU_QSBR_H_ */
int rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id)
#define __rte_always_inline
Definition: rte_common.h:228
__rte_experimental int rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n, unsigned int *freed, unsigned int *pending, unsigned int *available)
__rte_experimental int rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq)
#define likely(x)
int rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads)
#define __rte_unused
Definition: rte_common.h:118
static __rte_always_inline int rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
Definition: rte_rcu_qsbr.h:672
static __rte_always_inline void rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)
Definition: rte_rcu_qsbr.h:485
struct rte_rcu_qsbr * v
Definition: rte_rcu_qsbr.h:193
static __rte_always_inline void rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
Definition: rte_rcu_qsbr.h:356
int rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v)
#define unlikely(x)
static __rte_always_inline void rte_rcu_qsbr_unlock(__rte_unused struct rte_rcu_qsbr *v, __rte_unused unsigned int thread_id)
Definition: rte_rcu_qsbr.h:426
void(* rte_rcu_qsbr_free_resource_t)(void *p, void *e, unsigned int n)
Definition: rte_rcu_qsbr.h:137
rte_rcu_qsbr_free_resource_t free_fn
Definition: rte_rcu_qsbr.h:186
static void rte_pause(void)
#define __rte_cache_aligned
Definition: rte_common.h:402
static __rte_always_inline void rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
Definition: rte_rcu_qsbr.h:303
static void rte_atomic_thread_fence(int memorder)
static __rte_always_inline void rte_rcu_qsbr_lock(__rte_unused struct rte_rcu_qsbr *v, __rte_unused unsigned int thread_id)
Definition: rte_rcu_qsbr.h:393
__rte_experimental struct rte_rcu_qsbr_dq * rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params)
static __rte_always_inline uint64_t rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
Definition: rte_rcu_qsbr.h:456
int rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id)
size_t rte_rcu_qsbr_get_memsize(uint32_t max_threads)
__rte_experimental int rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e)
void rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id)