rte_rcu_qsbr.h
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (c) 2018-2020 Arm Limited
 */

#ifndef _RTE_RCU_QSBR_H_
#define _RTE_RCU_QSBR_H_

/*
 * RTE Quiescent State Based Reclamation (QSBR).
 *
 * A quiescent state is any point in a reader thread's execution where the
 * thread does not hold a reference to shared memory. Once all registered
 * reader threads have passed through a quiescent state after an update,
 * memory deleted from the shared data structure can be freed safely.
 */

#ifdef __cplusplus
extern "C" {
#endif

#include <inttypes.h>
#include <stdalign.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdint.h>

#include <rte_common.h>
#include <rte_debug.h>
#include <rte_atomic.h>
#include <rte_ring.h>

extern int rte_rcu_log_type;
#define RTE_LOGTYPE_RCU rte_rcu_log_type

#if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG
#define __RTE_RCU_DP_LOG(level, ...) \
	RTE_LOG_DP_LINE_PREFIX(level, RCU, "%s(): ", __func__, __VA_ARGS__)
#else
#define __RTE_RCU_DP_LOG(level, ...)
#endif

#if defined(RTE_LIBRTE_RCU_DEBUG)
#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, ...) do { \
	if (v->qsbr_cnt[thread_id].lock_cnt) \
		RTE_LOG_LINE_PREFIX(level, RCU, "%s(): ", __func__, __VA_ARGS__); \
} while (0)
#else
#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, ...)
#endif

/* Registered thread IDs are stored as a bitmap in an array of 64-bit
 * elements. A given thread ID is converted into an index into this array
 * and a bit position within that array element.
 */
#define __RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(RTE_ATOMIC(uint64_t)) * 8)
#define __RTE_QSBR_THRID_ARRAY_SIZE(max_threads) \
	RTE_ALIGN(RTE_ALIGN_MUL_CEIL(max_threads, \
		__RTE_QSBR_THRID_ARRAY_ELM_SIZE) >> 3, RTE_CACHE_LINE_SIZE)
#define __RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t __rte_atomic *) \
	((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i)
#define __RTE_QSBR_THRID_INDEX_SHIFT 6
#define __RTE_QSBR_THRID_MASK 0x3f
#define RTE_QSBR_THRID_INVALID 0xffffffff

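/* A minimal sketch (illustration only, not part of the API) of how a thread
 * ID is split into a bitmap element index and a bit position:
 *
 *   unsigned int thread_id = 66;
 *   unsigned int elem = thread_id >> __RTE_QSBR_THRID_INDEX_SHIFT;  66 / 64 = 1
 *   unsigned int bit  = thread_id & __RTE_QSBR_THRID_MASK;          66 % 64 = 2
 *
 * i.e. thread 66 is tracked by bit 2 of the second 64-bit bitmap element.
 */
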
/* Worker thread counter */
struct __rte_cache_aligned rte_rcu_qsbr_cnt {
	/* Quiescent state counter. Value 0 indicates the thread is offline.
	 * A 64-bit counter is used to avoid wrap-around problems.
	 */
	RTE_ATOMIC(uint64_t) cnt;
	/* Lock counter. Used when RTE_LIBRTE_RCU_DEBUG is enabled. */
	RTE_ATOMIC(uint32_t) lock_cnt;
};

#define __RTE_QSBR_CNT_THR_OFFLINE 0
#define __RTE_QSBR_CNT_INIT 1
#define __RTE_QSBR_CNT_MAX ((uint64_t)~0)
#define __RTE_QSBR_TOKEN_SIZE sizeof(uint64_t)

/* RTE Quiescent State variable structure.
 * This structure has two elements that vary in size based on the
 * 'max_threads' parameter.
 * 1) Quiescent state counter array
 * 2) Registered thread ID array
 */
struct __rte_cache_aligned rte_rcu_qsbr {
	/* Counter to allow for multiple concurrent quiescent state queries */
	alignas(RTE_CACHE_LINE_SIZE) RTE_ATOMIC(uint64_t) token;
	/* Least token acknowledged by all readers in the last call to
	 * rte_rcu_qsbr_check.
	 */
	RTE_ATOMIC(uint64_t) acked_token;

	/* Number of elements in the thread ID bitmap array */
	alignas(RTE_CACHE_LINE_SIZE) uint32_t num_elems;
	/* Number of threads currently using this QS variable */
	RTE_ATOMIC(uint32_t) num_threads;
	/* Maximum number of threads using this QS variable */
	uint32_t max_threads;

	/* Quiescent state counter array of 'max_threads', followed in memory
	 * by the registered thread ID bitmap array.
	 */
	alignas(RTE_CACHE_LINE_SIZE) struct rte_rcu_qsbr_cnt qsbr_cnt[0];
};

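/* A sketch (illustration only) of the memory block described by this
 * structure and sized by rte_rcu_qsbr_get_memsize():
 *
 *   | struct rte_rcu_qsbr | qsbr_cnt[max_threads] | thread ID bitmap |
 *
 * __RTE_QSBR_THRID_ARRAY_ELM(v, i) indexes into the trailing bitmap by
 * skipping over the structure and the per-thread counter array.
 */
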
/* Callback function type, invoked by the defer queue to free resources.
 * 'p' is the pointer provided while creating the defer queue, 'e' points to
 * the resource data stored on the defer queue, 'n' is the number of
 * resources to free.
 */
typedef void (*rte_rcu_qsbr_free_resource_t)(void *p, void *e, unsigned int n);

#define RTE_RCU_QSBR_DQ_NAMESIZE RTE_RING_NAMESIZE

/* Create a defer queue on which only a single thread performs
 * enqueue/reclaim operations (not multi-thread safe).
 */
#define RTE_RCU_QSBR_DQ_MT_UNSAFE 1

/* Parameters used when creating the defer queue */
struct rte_rcu_qsbr_dq_parameters {
	const char *name;		/* Name of the defer queue */
	uint32_t flags;			/* Flags, e.g. RTE_RCU_QSBR_DQ_MT_UNSAFE */
	uint32_t size;			/* Number of entries in the defer queue */
	uint32_t esize;			/* Size (in bytes) of each element */
	uint32_t trigger_reclaim_limit;	/* Queue depth that triggers auto reclamation */
	uint32_t max_reclaim_size;	/* Max resources reclaimed in one go */
	rte_rcu_qsbr_free_resource_t free_fn;	/* Function called to free the resources */
	void *p;			/* Pointer passed to the free function */
	struct rte_rcu_qsbr *v;		/* RCU QSBR variable for this defer queue */
};

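/* A minimal sketch of a free callback matching rte_rcu_qsbr_free_resource_t
 * (illustration only; 'struct app_ctx', 'struct elem' and 'free_elem()' are
 * hypothetical application code, and the sketch assumes the 'n' resources
 * are stored contiguously, 'esize' bytes apart):
 *
 *   static void
 *   free_cb(void *p, void *e, unsigned int n)
 *   {
 *           struct app_ctx *ctx = p;
 *           struct elem **entries = e;     assumes esize == sizeof(struct elem *)
 *
 *           for (unsigned int i = 0; i < n; i++)
 *                   free_elem(ctx, entries[i]);
 *   }
 */
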
/* RTE defer queue structure.
 * This structure holds the defer queue, which stores entries that have been
 * deleted from the data structure but are not yet freed.
 */
struct rte_rcu_qsbr_dq;

/* Return the size of memory (in bytes) required for a QS variable used by
 * 'max_threads' reader threads.
 */
size_t
rte_rcu_qsbr_get_memsize(uint32_t max_threads);

/* Initialize a Quiescent State (QS) variable. Returns 0 on success, 1 on error. */
int
rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);

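/* A minimal sketch of allocating and initializing a QS variable for up to
 * 128 reader threads (illustration only; error handling is omitted and
 * rte_zmalloc() from rte_malloc.h is assumed to be available):
 *
 *   uint32_t max_threads = 128;
 *   size_t sz = rte_rcu_qsbr_get_memsize(max_threads);
 *   struct rte_rcu_qsbr *v = rte_zmalloc("qs_var", sz, RTE_CACHE_LINE_SIZE);
 *   rte_rcu_qsbr_init(v, max_threads);
 */
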
/* Register a reader thread, identified by 'thread_id', to report its
 * quiescent state on the QS variable 'v'. Returns 0 on success, 1 on error.
 */
int
rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id);

/* Remove a reader thread from the list of threads reporting their quiescent
 * state on 'v'. Returns 0 on success, 1 on error.
 */
int
rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id);

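/* A minimal sketch of a reader thread's lifetime (illustration only; the
 * 'thread_id' is assumed to be an application-assigned ID below
 * 'max_threads', for example the lcore ID):
 *
 *   rte_rcu_qsbr_thread_register(v, thread_id);
 *   rte_rcu_qsbr_thread_online(v, thread_id);
 *   ... reference shared data structures, report quiescent states ...
 *   rte_rcu_qsbr_thread_offline(v, thread_id);
 *   rte_rcu_qsbr_thread_unregister(v, thread_id);
 */
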
/* Add a registered reader thread to the list of threads reporting their
 * quiescent state on a QS variable, i.e. mark the thread 'online'.
 * A reader thread must call this before it starts referencing the shared
 * data structure.
 */
static __rte_always_inline void
rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	uint64_t t;

	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u",
				v->qsbr_cnt[thread_id].lock_cnt);

	/* Copy the current value of token.
	 * The fence at the end of the function will ensure that
	 * the following will not move down after the load of any shared
	 * data structure.
	 */
	t = rte_atomic_load_explicit(&v->token, rte_memory_order_relaxed);

	/* rte_atomic_store_explicit(cnt, rte_memory_order_relaxed) is used to ensure
	 * 'cnt' (64b) is accessed atomically.
	 */
	rte_atomic_store_explicit(&v->qsbr_cnt[thread_id].cnt,
		t, rte_memory_order_relaxed);

	/* The subsequent load of the data structure should not
	 * move above the store. Hence a store-load barrier
	 * is required.
	 * If the load of the data structure moves above the store,
	 * the writer might not see that the reader is online, even though
	 * the reader is referencing the shared data structure.
	 */
	rte_atomic_thread_fence(rte_memory_order_seq_cst);
}

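/* A minimal sketch of a polling reader (illustration only; 'keep_running',
 * 'lookup()', 'shared_table' and 'key' are hypothetical application code):
 *
 *   rte_rcu_qsbr_thread_online(v, thread_id);
 *   while (keep_running) {
 *           lookup(shared_table, key);             reference shared data
 *           rte_rcu_qsbr_quiescent(v, thread_id);  report quiescent state
 *   }
 *   rte_rcu_qsbr_thread_offline(v, thread_id);
 */
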
/* Remove a registered reader thread from the list of threads reporting their
 * quiescent state on a QS variable, i.e. mark the thread 'offline'.
 * The writer does not wait for an offline thread.
 */
static __rte_always_inline void
rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u",
				v->qsbr_cnt[thread_id].lock_cnt);

	/* The reader can go offline only after the load of the
	 * data structure is completed, i.e. any load of the
	 * data structure can not move after this store.
	 */
	rte_atomic_store_explicit(&v->qsbr_cnt[thread_id].cnt,
		__RTE_QSBR_CNT_THR_OFFLINE, rte_memory_order_release);
}

/* Acquire a lock for accessing a shared data structure. This is a debugging
 * aid: unless RTE_LIBRTE_RCU_DEBUG is enabled it does nothing; otherwise it
 * increments a per-thread lock counter so that reporting a quiescent state
 * while the lock is held can be flagged.
 */
static __rte_always_inline void
rte_rcu_qsbr_lock(__rte_unused struct rte_rcu_qsbr *v,
			__rte_unused unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

#if defined(RTE_LIBRTE_RCU_DEBUG)
	/* Increment the lock counter */
	rte_atomic_fetch_add_explicit(&v->qsbr_cnt[thread_id].lock_cnt,
				1, rte_memory_order_acquire);
#endif
}

/* Release a lock taken with rte_rcu_qsbr_lock. This is a debugging aid and
 * does nothing unless RTE_LIBRTE_RCU_DEBUG is enabled.
 */
static __rte_always_inline void
rte_rcu_qsbr_unlock(__rte_unused struct rte_rcu_qsbr *v,
			__rte_unused unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

#if defined(RTE_LIBRTE_RCU_DEBUG)
	/* Decrement the lock counter */
	rte_atomic_fetch_sub_explicit(&v->qsbr_cnt[thread_id].lock_cnt,
				1, rte_memory_order_release);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, WARNING,
				"Lock counter %u. Nested locks?",
				v->qsbr_cnt[thread_id].lock_cnt);
#endif
}

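/* A minimal sketch of marking a read-side critical section for debug builds
 * (illustration only; with RTE_LIBRTE_RCU_DEBUG disabled these calls are
 * effectively no-ops):
 *
 *   rte_rcu_qsbr_lock(v, thread_id);
 *   ... access the shared data structure ...
 *   rte_rcu_qsbr_unlock(v, thread_id);
 *   rte_rcu_qsbr_quiescent(v, thread_id);  OK: lock counter is zero again
 */
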
/* Ask the reader threads to report their quiescent state status. Returns a
 * token identifying this request; pass it to rte_rcu_qsbr_check to find out
 * whether all readers have since entered a quiescent state.
 */
static __rte_always_inline uint64_t
rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
{
	uint64_t t;

	RTE_ASSERT(v != NULL);

	/* Release the changes to the shared data structure.
	 * This store release will ensure that changes to any data
	 * structure are visible to the workers before the token
	 * update is visible.
	 */
	t = rte_atomic_fetch_add_explicit(&v->token, 1, rte_memory_order_release) + 1;

	return t;
}

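/* A minimal sketch of the writer side (illustration only; 'remove_entry()',
 * 'shared_table' and 'entry' are hypothetical application code):
 *
 *   remove_entry(shared_table, entry);       unlink, do not free yet
 *   uint64_t token = rte_rcu_qsbr_start(v);
 *   rte_rcu_qsbr_check(v, token, true);      wait for all readers
 *   free(entry);                             now safe to reclaim
 */
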
/* Update the quiescent state for the reader thread with the given thread ID,
 * i.e. inform the writer that this reader has passed a quiescent state.
 */
static __rte_always_inline void
rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	uint64_t t;

	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u",
				v->qsbr_cnt[thread_id].lock_cnt);

	/* Acquire the changes to the shared data structure released
	 * by rte_rcu_qsbr_start.
	 * Later loads of the shared data structure should not move
	 * above this load. Hence, use load-acquire.
	 */
	t = rte_atomic_load_explicit(&v->token, rte_memory_order_acquire);

	/* Check if there are updates available from the writer.
	 * Inform the writer that updates are visible to this reader.
	 * Prior loads of the shared data structure should not move
	 * beyond this store. Hence use store-release.
	 */
	if (t != rte_atomic_load_explicit(&v->qsbr_cnt[thread_id].cnt, rte_memory_order_relaxed))
		rte_atomic_store_explicit(&v->qsbr_cnt[thread_id].cnt,
					t, rte_memory_order_release);

	__RTE_RCU_DP_LOG(DEBUG, "%s: update: token = %" PRIu64 ", Thread ID = %d",
		__func__, t, thread_id);
}

/* Check the quiescent state counter for registered threads only, assuming
 * that not all threads have registered.
 */
static __rte_always_inline int
__rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	uint32_t i, j, id;
	uint64_t bmap;
	uint64_t c;
	RTE_ATOMIC(uint64_t) *reg_thread_id;
	uint64_t acked_token = __RTE_QSBR_CNT_MAX;

	for (i = 0, reg_thread_id = __RTE_QSBR_THRID_ARRAY_ELM(v, 0);
		i < v->num_elems;
		i++, reg_thread_id++) {
		/* Load the current registered thread bit map before
		 * loading the reader thread quiescent state counters.
		 */
		bmap = rte_atomic_load_explicit(reg_thread_id, rte_memory_order_acquire);
		id = i << __RTE_QSBR_THRID_INDEX_SHIFT;

		while (bmap) {
			j = rte_ctz64(bmap);
			__RTE_RCU_DP_LOG(DEBUG,
				"%s: check: token = %" PRIu64 ", wait = %d, Bit Map = 0x%" PRIx64 ", Thread ID = %d",
				__func__, t, wait, bmap, id + j);
			c = rte_atomic_load_explicit(
					&v->qsbr_cnt[id + j].cnt,
					rte_memory_order_acquire);
			__RTE_RCU_DP_LOG(DEBUG,
				"%s: status: token = %" PRIu64 ", wait = %d, Thread QS cnt = %" PRIu64 ", Thread ID = %d",
				__func__, t, wait, c, id + j);

			/* Counter is not checked for wrap-around condition
			 * as it is a 64b counter.
			 */
			if (unlikely(c != __RTE_QSBR_CNT_THR_OFFLINE && c < t)) {
				/* This thread is not in quiescent state */
				if (!wait)
					return 0;

				rte_pause();
				/* This thread might have unregistered.
				 * Re-read the bitmap.
				 */
				bmap = rte_atomic_load_explicit(reg_thread_id,
						rte_memory_order_acquire);

				continue;
			}

			/* This thread is in quiescent state. Use the counter
			 * to find the least acknowledged token among all the
			 * readers.
			 */
			if (c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c)
				acked_token = c;

			bmap &= ~(1UL << j);
		}
	}

	/* All readers are checked, update the least acknowledged token.
	 * There might be multiple writers trying to update this. There is
	 * no need to update this very accurately using compare-and-swap.
	 */
	if (acked_token != __RTE_QSBR_CNT_MAX)
		rte_atomic_store_explicit(&v->acked_token, acked_token,
			rte_memory_order_relaxed);

	return 1;
}

/* Check the quiescent state counter for all threads, assuming that
 * all the threads have registered.
 */
static __rte_always_inline int
__rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	uint32_t i;
	struct rte_rcu_qsbr_cnt *cnt;
	uint64_t c;
	uint64_t acked_token = __RTE_QSBR_CNT_MAX;

	for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) {
		__RTE_RCU_DP_LOG(DEBUG,
			"%s: check: token = %" PRIu64 ", wait = %d, Thread ID = %d",
			__func__, t, wait, i);
		while (1) {
			c = rte_atomic_load_explicit(&cnt->cnt, rte_memory_order_acquire);
			__RTE_RCU_DP_LOG(DEBUG,
				"%s: status: token = %" PRIu64 ", wait = %d, Thread QS cnt = %" PRIu64 ", Thread ID = %d",
				__func__, t, wait, c, i);

			/* Counter is not checked for wrap-around condition
			 * as it is a 64b counter.
			 */
			if (likely(c == __RTE_QSBR_CNT_THR_OFFLINE || c >= t))
				break;

			/* This thread is not in quiescent state */
			if (!wait)
				return 0;

			rte_pause();
		}

		/* This thread is in quiescent state. Use the counter to find
		 * the least acknowledged token among all the readers.
		 */
		if (likely(c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c))
			acked_token = c;
	}

	/* All readers are checked, update the least acknowledged token.
	 * There might be multiple writers trying to update this. There is
	 * no need to update this very accurately using compare-and-swap.
	 */
	if (acked_token != __RTE_QSBR_CNT_MAX)
		rte_atomic_store_explicit(&v->acked_token, acked_token,
			rte_memory_order_relaxed);

	return 1;
}

/* Check whether all reader threads have passed through a quiescent state
 * (or are offline) since the given token was issued by rte_rcu_qsbr_start.
 * Returns 1 if they have, 0 otherwise. If 'wait' is true, the call
 * busy-polls until all readers have acknowledged the token.
 */
static __rte_always_inline int
rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	uint64_t acked_token;

	RTE_ASSERT(v != NULL);

	/* Check if all the readers have already acknowledged this token */
	acked_token = rte_atomic_load_explicit(&v->acked_token,
		rte_memory_order_relaxed);
	if (likely(t <= acked_token)) {
		__RTE_RCU_DP_LOG(DEBUG,
			"%s: check: token = %" PRIu64 ", wait = %d",
			__func__, t, wait);
		__RTE_RCU_DP_LOG(DEBUG,
			"%s: status: least acked token = %" PRIu64,
			__func__, acked_token);
		return 1;
	}

	if (likely(v->num_threads == v->max_threads))
		return __rte_rcu_qsbr_check_all(v, t, wait);
	else
		return __rte_rcu_qsbr_check_selective(v, t, wait);
}

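/* A minimal sketch of non-blocking reclamation on the writer side
 * (illustration only; 'do_other_work()' and 'entry' are hypothetical
 * application code):
 *
 *   uint64_t token = rte_rcu_qsbr_start(v);
 *   while (rte_rcu_qsbr_check(v, token, false) == 0)
 *           do_other_work();                  readers not done yet
 *   free(entry);                              all readers acknowledged
 */
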
/* Wait until all reader threads have passed through a quiescent state for a
 * newly started token (a blocking wrapper around rte_rcu_qsbr_start and
 * rte_rcu_qsbr_check). Pass the caller's thread ID if it is a registered
 * reader, otherwise pass RTE_QSBR_THRID_INVALID.
 */
void
rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id);

/* Dump the details of a single QS variable to a file. */
int
rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);

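/* A minimal sketch of the blocking wrapper used from a control-plane thread
 * that is not a registered reader (illustration only; 'remove_entry()',
 * 'shared_table' and 'entry' are hypothetical application code):
 *
 *   remove_entry(shared_table, entry);
 *   rte_rcu_qsbr_synchronize(v, RTE_QSBR_THRID_INVALID);
 *   free(entry);
 */
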
/* Create a defer queue that stores elements deleted from a data structure
 * until they can be freed. Returns a pointer to the queue, or NULL on error.
 */
struct rte_rcu_qsbr_dq *
rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params);

/* Enqueue one resource to the defer queue and start its grace period.
 * Returns 0 on success, 1 on error.
 */
int
rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e);

/* Free up to 'n' resources whose grace period has completed, reporting the
 * freed, pending and available counts. Returns 0 on success, 1 on error.
 */
int
rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
	unsigned int *freed, unsigned int *pending, unsigned int *available);

/* Reclaim all resources on the defer queue and delete it.
 * Returns 0 on success, 1 on error.
 */
int
rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq);

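/* A minimal sketch of defer-queue based reclamation (illustration only;
 * 'free_cb' is a callback as sketched after the parameters structure above,
 * and 'table', 'remove_entry()', 'shared_table' and 'elem' are hypothetical
 * application objects; error handling is omitted):
 *
 *   struct rte_rcu_qsbr_dq_parameters params = {
 *           .name = "table_dq",
 *           .size = 1024,
 *           .esize = sizeof(struct elem *),
 *           .free_fn = free_cb,
 *           .p = table,
 *           .v = v,
 *   };
 *   struct rte_rcu_qsbr_dq *dq = rte_rcu_qsbr_dq_create(&params);
 *
 *   remove_entry(shared_table, elem);          unlink from the structure
 *   rte_rcu_qsbr_dq_enqueue(dq, &elem);        defer the free
 *
 *   unsigned int freed, pending, available;
 *   rte_rcu_qsbr_dq_reclaim(dq, 32, &freed, &pending, &available);
 *
 *   rte_rcu_qsbr_dq_delete(dq);                at teardown
 */
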
#ifdef __cplusplus
}
#endif

#endif /* _RTE_RCU_QSBR_H_ */