DPDK  23.03.0
rte_rcu_qsbr.h
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (c) 2018-2020 Arm Limited
 */

#ifndef _RTE_RCU_QSBR_H_
#define _RTE_RCU_QSBR_H_

#ifdef __cplusplus
extern "C" {
#endif

#include <inttypes.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdint.h>
#include <rte_compat.h>
#include <rte_common.h>
#include <rte_debug.h>
#include <rte_atomic.h>
#include <rte_ring.h>

extern int rte_rcu_log_type;

#if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG
#define __RTE_RCU_DP_LOG(level, fmt, args...) \
	rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
		"%s(): " fmt "\n", __func__, ## args)
#else
#define __RTE_RCU_DP_LOG(level, fmt, args...)
#endif

#if defined(RTE_LIBRTE_RCU_DEBUG)
#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...) do {\
	if (v->qsbr_cnt[thread_id].lock_cnt) \
		rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
			"%s(): " fmt "\n", __func__, ## args); \
} while (0)
#else
#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...)
#endif

/* Registered thread IDs are stored as a bitmap in an array of 64b elements.
 * A given thread id needs to be converted to an index into the array and
 * a bit position within that array element.
 */
#define __RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)
#define __RTE_QSBR_THRID_ARRAY_SIZE(max_threads) \
	RTE_ALIGN(RTE_ALIGN_MUL_CEIL(max_threads, \
		__RTE_QSBR_THRID_ARRAY_ELM_SIZE) >> 3, RTE_CACHE_LINE_SIZE)
#define __RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t *) \
	((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i)
#define __RTE_QSBR_THRID_INDEX_SHIFT 6
#define __RTE_QSBR_THRID_MASK 0x3f
#define RTE_QSBR_THRID_INVALID 0xffffffff

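/* For example, thread id 70 maps to bitmap array element
 * 70 >> __RTE_QSBR_THRID_INDEX_SHIFT = 1 and bit 70 & __RTE_QSBR_THRID_MASK = 6
 * within that element.
 */
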
/* Worker thread counter */
struct rte_rcu_qsbr_cnt {
	uint64_t cnt;
	/**< Quiescent state counter. Value 0 indicates the thread is offline. */
	uint32_t lock_cnt;
	/**< Lock counter. Used when RTE_LIBRTE_RCU_DEBUG is enabled. */
} __rte_cache_aligned;

#define __RTE_QSBR_CNT_THR_OFFLINE 0
#define __RTE_QSBR_CNT_INIT 1
#define __RTE_QSBR_CNT_MAX ((uint64_t)~0)
#define __RTE_QSBR_TOKEN_SIZE sizeof(uint64_t)

/* RTE Quiescent State variable structure.
 * This structure has two elements that vary in size based on the
 * 'max_threads' parameter.
 * 1) Quiescent state counter array
 * 2) Registered thread ID array
 */
struct rte_rcu_qsbr {
	uint64_t token __rte_cache_aligned;
	/**< Counter to allow multiple concurrent quiescent state queries */
	uint64_t acked_token;
	/**< Least token acknowledged by all the readers */

	uint32_t num_elems __rte_cache_aligned;
	/**< Number of elements in the thread ID bitmap array */
	uint32_t num_threads;
	/**< Number of threads currently using this QS variable */
	uint32_t max_threads;
	/**< Maximum number of threads this QS variable can track */

	struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned;
	/**< Quiescent state counter array of 'max_threads' elements */
} __rte_cache_aligned;

typedef void (*rte_rcu_qsbr_free_resource_t)(void *p, void *e, unsigned int n);

#define RTE_RCU_QSBR_DQ_NAMESIZE RTE_RING_NAMESIZE

#define RTE_RCU_QSBR_DQ_MT_UNSAFE 1

/* Parameters used when creating the defer queue. */
struct rte_rcu_qsbr_dq_parameters {
	const char *name;
	uint32_t flags;
	uint32_t size;
	uint32_t esize;
	rte_rcu_qsbr_free_resource_t free_fn;
	/**< Function to call to free the deferred resources */
	void *p;
	/**< Pointer passed to the free function */
	struct rte_rcu_qsbr *v;
	/**< RCU QSBR variable to use for this defer queue */
};

/* RTE defer queue structure.
 * This structure holds the defer queue. The defer queue is used to
 * hold the deleted entries from the data structure that are not
 * yet freed.
 */
struct rte_rcu_qsbr_dq;

size_t
rte_rcu_qsbr_get_memsize(uint32_t max_threads);

int
rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);

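/*
 * Illustrative sketch (not part of the original header): allocating and
 * initializing a QS variable for up to 'max_threads' readers. rte_zmalloc()
 * from <rte_malloc.h> is assumed here only as one way to get a zeroed,
 * RTE_CACHE_LINE_SIZE aligned allocation; qsbr_alloc() is a hypothetical
 * helper name.
 *
 *	static struct rte_rcu_qsbr *
 *	qsbr_alloc(uint32_t max_threads)
 *	{
 *		size_t sz = rte_rcu_qsbr_get_memsize(max_threads);
 *		struct rte_rcu_qsbr *v = rte_zmalloc(NULL, sz,
 *						     RTE_CACHE_LINE_SIZE);
 *
 *		if (v == NULL || rte_rcu_qsbr_init(v, max_threads) != 0) {
 *			rte_free(v);
 *			return NULL;
 *		}
 *		return v;
 *	}
 */
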
int
rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id);

int
rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id);

static __rte_always_inline void
rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	uint64_t t;

	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
				v->qsbr_cnt[thread_id].lock_cnt);

	/* Copy the current value of token.
	 * The fence at the end of the function will ensure that the
	 * following token load and counter store do not move down past
	 * any subsequent load of the shared data structure.
	 */
	t = __atomic_load_n(&v->token, __ATOMIC_RELAXED);

	/* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure
	 * 'cnt' (64b) is accessed atomically.
	 */
	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
		t, __ATOMIC_RELAXED);

	/* The subsequent load of the data structure should not
	 * move above the store. Hence a store-load barrier
	 * is required.
	 * If the load of the data structure moves above the store,
	 * the writer might not see that the reader is online, even
	 * though the reader is referencing the shared data structure.
	 */
	rte_atomic_thread_fence(__ATOMIC_SEQ_CST);
}

static __rte_always_inline void
rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
				v->qsbr_cnt[thread_id].lock_cnt);

	/* The reader can go offline only after the load of the
	 * data structure is completed, i.e. any load of the
	 * data structure cannot move after this store.
	 */

	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
		__RTE_QSBR_CNT_THR_OFFLINE, __ATOMIC_RELEASE);
}

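/*
 * Illustrative reader-side sketch (not part of the original header): a
 * registered reader goes online before using the shared data structure,
 * reports a quiescent state once per iteration and goes offline before
 * blocking. reader_loop(), do_lookups() and 'stop' are hypothetical
 * application names.
 *
 *	static void
 *	reader_loop(struct rte_rcu_qsbr *v, unsigned int thread_id)
 *	{
 *		rte_rcu_qsbr_thread_register(v, thread_id);
 *		rte_rcu_qsbr_thread_online(v, thread_id);
 *
 *		while (!stop) {
 *			do_lookups();	// reads RCU protected entries
 *			rte_rcu_qsbr_quiescent(v, thread_id);
 *		}
 *
 *		rte_rcu_qsbr_thread_offline(v, thread_id);
 *		rte_rcu_qsbr_thread_unregister(v, thread_id);
 *	}
 */
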
static __rte_always_inline void
rte_rcu_qsbr_lock(__rte_unused struct rte_rcu_qsbr *v,
			__rte_unused unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

#if defined(RTE_LIBRTE_RCU_DEBUG)
	/* Increment the lock counter */
	__atomic_fetch_add(&v->qsbr_cnt[thread_id].lock_cnt,
				1, __ATOMIC_ACQUIRE);
#endif
}

static __rte_always_inline void
rte_rcu_qsbr_unlock(__rte_unused struct rte_rcu_qsbr *v,
			__rte_unused unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

#if defined(RTE_LIBRTE_RCU_DEBUG)
	/* Decrement the lock counter */
	__atomic_fetch_sub(&v->qsbr_cnt[thread_id].lock_cnt,
				1, __ATOMIC_RELEASE);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, WARNING,
				"Lock counter %u. Nested locks?\n",
				v->qsbr_cnt[thread_id].lock_cnt);
#endif
}

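/*
 * Illustrative sketch (not part of the original header): in
 * RTE_LIBRTE_RCU_DEBUG builds the lock/unlock pair lets the library warn
 * if a quiescent state is reported while the reader is still inside a
 * critical section; in other builds both calls are effectively no-ops.
 * do_lookups() is a hypothetical application function.
 *
 *	rte_rcu_qsbr_lock(v, thread_id);
 *	do_lookups();	// access the RCU protected data structure
 *	rte_rcu_qsbr_unlock(v, thread_id);
 *	rte_rcu_qsbr_quiescent(v, thread_id);
 */
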
static __rte_always_inline uint64_t
rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
{
	uint64_t t;

	RTE_ASSERT(v != NULL);

	/* Release the changes to the shared data structure.
	 * This store release will ensure that changes to any data
	 * structure are visible to the workers before the token
	 * update is visible.
	 */
	t = __atomic_add_fetch(&v->token, 1, __ATOMIC_RELEASE);

	return t;
}

static __rte_always_inline void
rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	uint64_t t;

	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
				v->qsbr_cnt[thread_id].lock_cnt);

	/* Acquire the changes to the shared data structure released
	 * by rte_rcu_qsbr_start.
	 * Later loads of the shared data structure should not move
	 * above this load. Hence, use load-acquire.
	 */
	t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE);

	/* Check if there are updates available from the writer.
	 * Inform the writer that updates are visible to this reader.
	 * Prior loads of the shared data structure should not move
	 * beyond this store. Hence use store-release.
	 */
	if (t != __atomic_load_n(&v->qsbr_cnt[thread_id].cnt, __ATOMIC_RELAXED))
		__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
					 t, __ATOMIC_RELEASE);

	__RTE_RCU_DP_LOG(DEBUG, "%s: update: token = %" PRIu64 ", Thread ID = %d",
		__func__, t, thread_id);
}

/* Check the quiescent state counter for registered threads only, assuming
 * that not all threads have registered.
 */
static __rte_always_inline int
__rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	uint32_t i, j, id;
	uint64_t bmap;
	uint64_t c;
	uint64_t *reg_thread_id;
	uint64_t acked_token = __RTE_QSBR_CNT_MAX;

	for (i = 0, reg_thread_id = __RTE_QSBR_THRID_ARRAY_ELM(v, 0);
		i < v->num_elems;
		i++, reg_thread_id++) {
		/* Load the current registered thread bit map before
		 * loading the reader thread quiescent state counters.
		 */
		bmap = __atomic_load_n(reg_thread_id, __ATOMIC_ACQUIRE);
		id = i << __RTE_QSBR_THRID_INDEX_SHIFT;

		while (bmap) {
			j = __builtin_ctzl(bmap);
			__RTE_RCU_DP_LOG(DEBUG,
				"%s: check: token = %" PRIu64 ", wait = %d, Bit Map = 0x%" PRIx64 ", Thread ID = %d",
				__func__, t, wait, bmap, id + j);
			c = __atomic_load_n(
					&v->qsbr_cnt[id + j].cnt,
					__ATOMIC_ACQUIRE);
			__RTE_RCU_DP_LOG(DEBUG,
				"%s: status: token = %" PRIu64 ", wait = %d, Thread QS cnt = %" PRIu64 ", Thread ID = %d",
				__func__, t, wait, c, id + j);

			/* Counter is not checked for wrap-around condition
			 * as it is a 64b counter.
			 */
			if (unlikely(c !=
				__RTE_QSBR_CNT_THR_OFFLINE && c < t)) {
				/* This thread is not in quiescent state */
				if (!wait)
					return 0;

				rte_pause();
				/* This thread might have unregistered.
				 * Re-read the bitmap.
				 */
				bmap = __atomic_load_n(reg_thread_id,
						__ATOMIC_ACQUIRE);

				continue;
			}

			/* This thread is in quiescent state. Use the counter
			 * to find the least acknowledged token among all the
			 * readers.
			 */
			if (c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c)
				acked_token = c;

			bmap &= ~(1UL << j);
		}
	}

	/* All readers are checked, update least acknowledged token.
	 * There might be multiple writers trying to update this. There is
	 * no need to update this very accurately using compare-and-swap.
	 */
	if (acked_token != __RTE_QSBR_CNT_MAX)
		__atomic_store_n(&v->acked_token, acked_token,
			__ATOMIC_RELAXED);

	return 1;
}

585 
586 /* Check the quiescent state counter for all threads, assuming that
587  * all the threads have registered.
588  */
589 static __rte_always_inline int
590 __rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
591 {
592  uint32_t i;
593  struct rte_rcu_qsbr_cnt *cnt;
594  uint64_t c;
595  uint64_t acked_token = __RTE_QSBR_CNT_MAX;
596 
597  for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) {
598  __RTE_RCU_DP_LOG(DEBUG,
599  "%s: check: token = %" PRIu64 ", wait = %d, Thread ID = %d",
600  __func__, t, wait, i);
601  while (1) {
602  c = __atomic_load_n(&cnt->cnt, __ATOMIC_ACQUIRE);
603  __RTE_RCU_DP_LOG(DEBUG,
604  "%s: status: token = %" PRIu64 ", wait = %d, Thread QS cnt = %" PRIu64 ", Thread ID = %d",
605  __func__, t, wait, c, i);
606 
607  /* Counter is not checked for wrap-around condition
608  * as it is a 64b counter.
609  */
610  if (likely(c == __RTE_QSBR_CNT_THR_OFFLINE || c >= t))
611  break;
612 
613  /* This thread is not in quiescent state */
614  if (!wait)
615  return 0;
616 
617  rte_pause();
618  }
619 
620  /* This thread is in quiescent state. Use the counter to find
621  * the least acknowledged token among all the readers.
622  */
623  if (likely(c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c))
624  acked_token = c;
625  }
626 
627  /* All readers are checked, update least acknowledged token.
628  * There might be multiple writers trying to update this. There is
629  * no need to update this very accurately using compare-and-swap.
630  */
631  if (acked_token != __RTE_QSBR_CNT_MAX)
632  __atomic_store_n(&v->acked_token, acked_token,
633  __ATOMIC_RELAXED);
634 
635  return 1;
636 }
637 
static __rte_always_inline int
rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	RTE_ASSERT(v != NULL);

	/* Check if all the readers have already acknowledged this token */
	if (likely(t <= v->acked_token)) {
		__RTE_RCU_DP_LOG(DEBUG,
			"%s: check: token = %" PRIu64 ", wait = %d",
			__func__, t, wait);
		__RTE_RCU_DP_LOG(DEBUG,
			"%s: status: least acked token = %" PRIu64,
			__func__, v->acked_token);
		return 1;
	}

	if (likely(v->num_threads == v->max_threads))
		return __rte_rcu_qsbr_check_all(v, t, wait);
	else
		return __rte_rcu_qsbr_check_selective(v, t, wait);
}

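/*
 * Illustrative writer-side sketch (not part of the original header): the
 * writer unlinks an entry, starts a new grace period and blocks until all
 * registered readers have acknowledged the token before freeing the memory.
 * 'struct entry' and table_remove() are hypothetical; a non-blocking caller
 * can pass wait = false and retry later, or use a defer queue instead.
 *
 *	static void
 *	writer_delete(struct rte_rcu_qsbr *v, struct entry *e)
 *	{
 *		uint64_t token;
 *
 *		table_remove(e);	// unlink; readers may still hold e
 *		token = rte_rcu_qsbr_start(v);
 *		(void)rte_rcu_qsbr_check(v, token, true);	// wait for readers
 *		free(e);
 *	}
 */
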
void
rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id);

int
rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);

__rte_experimental
struct rte_rcu_qsbr_dq *
rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params);

__rte_experimental
int
rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e);

__rte_experimental
int
rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
	unsigned int *freed, unsigned int *pending, unsigned int *available);

__rte_experimental
int
rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq);

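/*
 * Illustrative defer queue sketch (not part of the original header): instead
 * of blocking in rte_rcu_qsbr_check(), deleted entries are pushed to a defer
 * queue and reclaimed later. free_entries_cb(), 'struct entry', 'entry_ptr'
 * and the sizes are hypothetical; 'qsv' is an initialized QS variable that
 * all readers report on.
 *
 *	static void
 *	free_entries_cb(void *p, void *e, unsigned int n)
 *	{
 *		struct entry **entries = e;
 *		unsigned int i;
 *
 *		for (i = 0; i < n; i++)
 *			free(entries[i]);
 *	}
 *
 *	struct rte_rcu_qsbr_dq_parameters params = {
 *		.name = "entry_dq",
 *		.size = 1024,			// max outstanding entries
 *		.esize = sizeof(struct entry *),
 *		.free_fn = free_entries_cb,
 *		.v = qsv,
 *	};
 *	struct rte_rcu_qsbr_dq *dq = rte_rcu_qsbr_dq_create(&params);
 *	unsigned int freed, pending, avail;
 *
 *	// on delete: unlink the entry from the structure, then defer the free
 *	rte_rcu_qsbr_dq_enqueue(dq, &entry_ptr);
 *
 *	// periodically, or when enqueue fails, reclaim what readers no longer use
 *	rte_rcu_qsbr_dq_reclaim(dq, 32, &freed, &pending, &avail);
 *
 *	rte_rcu_qsbr_dq_delete(dq);
 */
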
#ifdef __cplusplus
}
#endif

#endif /* _RTE_RCU_QSBR_H_ */