DPDK 23.07.0
rte_rcu_qsbr.h
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (c) 2018-2020 Arm Limited
 */

#ifndef _RTE_RCU_QSBR_H_
#define _RTE_RCU_QSBR_H_

#ifdef __cplusplus
extern "C" {
#endif

#include <inttypes.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdint.h>
#include <rte_compat.h>
#include <rte_common.h>
#include <rte_debug.h>
#include <rte_atomic.h>
#include <rte_ring.h>

extern int rte_rcu_log_type;

#if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG
#define __RTE_RCU_DP_LOG(level, fmt, args...) \
        rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
                "%s(): " fmt "\n", __func__, ## args)
#else
#define __RTE_RCU_DP_LOG(level, fmt, args...)
#endif

#if defined(RTE_LIBRTE_RCU_DEBUG)
#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...) do { \
        if (v->qsbr_cnt[thread_id].lock_cnt) \
                rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
                        "%s(): " fmt "\n", __func__, ## args); \
} while (0)
#else
#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...)
#endif

/* Registered thread IDs are stored as a bitmap in an array of 64b elements.
 * A given thread id has to be converted to an index into this array and to
 * a bit position within that array element.
 */
#define __RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)
#define __RTE_QSBR_THRID_ARRAY_SIZE(max_threads) \
        RTE_ALIGN(RTE_ALIGN_MUL_CEIL(max_threads, \
                __RTE_QSBR_THRID_ARRAY_ELM_SIZE) >> 3, RTE_CACHE_LINE_SIZE)
#define __RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t *) \
        ((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i)
#define __RTE_QSBR_THRID_INDEX_SHIFT 6
#define __RTE_QSBR_THRID_MASK 0x3f
#define RTE_QSBR_THRID_INVALID 0xffffffff

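/* Illustrative sketch (not part of the original header): how a thread id
 * maps onto the registered-thread bitmap using the macros above.
 *
 *     unsigned int thread_id = 131;
 *     uint32_t elem = thread_id >> __RTE_QSBR_THRID_INDEX_SHIFT;  // 131 / 64 = 2
 *     uint32_t bit = thread_id & __RTE_QSBR_THRID_MASK;           // 131 % 64 = 3
 *
 * Thread 131 is registered when bit 3 of the third 64b element of the
 * bitmap array is set.
 */
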
/* Worker thread counter */
struct rte_rcu_qsbr_cnt {
        uint64_t cnt;
        /**< Quiescent state counter. Value 0 indicates the thread is offline.
         *   A 64b counter is used to avoid having to handle counter overflow.
         */
        uint32_t lock_cnt;
        /**< Lock counter. Used when RTE_LIBRTE_RCU_DEBUG is enabled. */
} __rte_cache_aligned;

#define __RTE_QSBR_CNT_THR_OFFLINE 0
#define __RTE_QSBR_CNT_INIT 1
#define __RTE_QSBR_CNT_MAX ((uint64_t)~0)
#define __RTE_QSBR_TOKEN_SIZE sizeof(uint64_t)

/* RTE Quiescent State variable structure.
 * This structure has two elements that vary in size based on the
 * 'max_threads' parameter.
 * 1) Quiescent state counter array
 * 2) Registered thread ID array
 */
struct rte_rcu_qsbr {
        uint64_t token __rte_cache_aligned;
        /**< Token counter, incremented by rte_rcu_qsbr_start() */
        uint64_t acked_token;
        /**< Least token acknowledged by all the readers */

        uint32_t num_elems __rte_cache_aligned;
        /**< Number of elements in the registered thread ID bitmap array */
        uint32_t num_threads;
        /**< Number of threads currently using this QS variable */
        uint32_t max_threads;
        /**< Maximum number of threads using this QS variable */

        struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned;
        /**< Quiescent state counter array of 'max_threads' elements */
} __rte_cache_aligned;

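/* Memory layout behind a QS variable (a descriptive sketch derived from the
 * macros above, not part of the original header):
 *
 *     struct rte_rcu_qsbr                              fixed-size header
 *     struct rte_rcu_qsbr_cnt qsbr_cnt[max_threads]    per-thread QS counters
 *     uint64_t bitmap[]                                registered thread ID bitmap,
 *                                                      __RTE_QSBR_THRID_ARRAY_SIZE bytes
 *
 * rte_rcu_qsbr_get_memsize() reports the total size of this layout for a
 * given 'max_threads'; the caller allocates one cache-line aligned block of
 * that size and hands it to rte_rcu_qsbr_init().
 */
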
typedef void (*rte_rcu_qsbr_free_resource_t)(void *p, void *e, unsigned int n);

#define RTE_RCU_QSBR_DQ_NAMESIZE RTE_RING_NAMESIZE

#define RTE_RCU_QSBR_DQ_MT_UNSAFE 1

/* Parameters used when creating the defer queue. */
struct rte_rcu_qsbr_dq_parameters {
        const char *name;
        /**< Name of the defer queue */
        uint32_t flags;
        /**< Flags to control API behaviors (e.g. RTE_RCU_QSBR_DQ_MT_UNSAFE) */
        uint32_t size;
        /**< Number of entries in the defer queue */
        uint32_t esize;
        /**< Size (in bytes) of each element in the defer queue */
        uint32_t trigger_reclaim_limit;
        /**< Trigger automatic reclamation from rte_rcu_qsbr_dq_enqueue()
         *   once at least this many resources are waiting.
         */
        uint32_t max_reclaim_size;
        /**< Maximum number of resources to reclaim in one go */
        rte_rcu_qsbr_free_resource_t free_fn;
        /**< Function called to free the resources */
        void *p;
        /**< Pointer passed to the free function */
        struct rte_rcu_qsbr *v;
        /**< RCU QSBR variable to use for this defer queue */
};

/* RTE defer queue structure.
 * The defer queue holds entries that have been deleted from the data
 * structure but are not yet freed.
 */
struct rte_rcu_qsbr_dq;

size_t
rte_rcu_qsbr_get_memsize(uint32_t max_threads);

int
rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);

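/* Usage sketch (not part of the original header): allocating and
 * initializing a QS variable for up to 128 reader threads. The use of
 * rte_zmalloc() and the error handling are illustrative choices.
 *
 *     #include <rte_malloc.h>
 *
 *     struct rte_rcu_qsbr *v;
 *     uint32_t max_threads = 128;
 *     size_t sz = rte_rcu_qsbr_get_memsize(max_threads);
 *
 *     v = rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE);
 *     if (v == NULL || rte_rcu_qsbr_init(v, max_threads) != 0)
 *         rte_panic("Failed to create the QS variable\n");
 */
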
int
rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id);

int
rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id);

static __rte_always_inline void
rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
        uint64_t t;

        RTE_ASSERT(v != NULL && thread_id < v->max_threads);

        __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
                v->qsbr_cnt[thread_id].lock_cnt);

        /* Copy the current value of token.
         * The fence at the end of the function will ensure that
         * the following will not move down after the load of any shared
         * data structure.
         */
        t = __atomic_load_n(&v->token, __ATOMIC_RELAXED);

        /* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure
         * 'cnt' (64b) is accessed atomically.
         */
        __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
                t, __ATOMIC_RELAXED);

        /* The subsequent load of the data structure should not
         * move above the store. Hence a store-load barrier
         * is required.
         * If the load of the data structure moves above the store,
         * the writer might not see that the reader is online, even though
         * the reader is referencing the shared data structure.
         */
        rte_atomic_thread_fence(__ATOMIC_SEQ_CST);
}

static __rte_always_inline void
rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
        RTE_ASSERT(v != NULL && thread_id < v->max_threads);

        __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
                v->qsbr_cnt[thread_id].lock_cnt);

        /* The reader can go offline only after the load of the
         * data structure is completed. i.e. any load of the
         * data structure can not move after this store.
         */
        __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
                __RTE_QSBR_CNT_THR_OFFLINE, __ATOMIC_RELEASE);
}

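/* Usage sketch (not part of the original header): a registered reader
 * temporarily reporting itself offline around a blocking call so that it
 * does not delay writers. do_control_plane_work() is a hypothetical
 * placeholder.
 *
 *     rte_rcu_qsbr_thread_register(v, thread_id);
 *     rte_rcu_qsbr_thread_online(v, thread_id);
 *
 *     ... reference the shared data structure ...
 *
 *     rte_rcu_qsbr_thread_offline(v, thread_id);
 *     do_control_plane_work();            // may block; no QS reporting needed
 *     rte_rcu_qsbr_thread_online(v, thread_id);
 */
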
static __rte_always_inline void
rte_rcu_qsbr_lock(__rte_unused struct rte_rcu_qsbr *v,
                __rte_unused unsigned int thread_id)
{
        RTE_ASSERT(v != NULL && thread_id < v->max_threads);

#if defined(RTE_LIBRTE_RCU_DEBUG)
        /* Increment the lock counter */
        __atomic_fetch_add(&v->qsbr_cnt[thread_id].lock_cnt,
                1, __ATOMIC_ACQUIRE);
#endif
}

static __rte_always_inline void
rte_rcu_qsbr_unlock(__rte_unused struct rte_rcu_qsbr *v,
                __rte_unused unsigned int thread_id)
{
        RTE_ASSERT(v != NULL && thread_id < v->max_threads);

#if defined(RTE_LIBRTE_RCU_DEBUG)
        /* Decrement the lock counter */
        __atomic_fetch_sub(&v->qsbr_cnt[thread_id].lock_cnt,
                1, __ATOMIC_RELEASE);

        __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, WARNING,
                "Lock counter %u. Nested locks?\n",
                v->qsbr_cnt[thread_id].lock_cnt);
#endif
}

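/* Usage sketch (not part of the original header): rte_rcu_qsbr_lock() and
 * rte_rcu_qsbr_unlock() compile to no-ops unless RTE_LIBRTE_RCU_DEBUG is
 * defined; they only track nesting so a debug build can flag a thread that
 * reports a quiescent state while still inside a reader-side critical
 * section.
 *
 *     rte_rcu_qsbr_lock(v, thread_id);
 *     ... access the shared data structure ...
 *     rte_rcu_qsbr_unlock(v, thread_id);
 *
 *     // Calling rte_rcu_qsbr_quiescent() between lock and unlock would be
 *     // reported via __RTE_RCU_IS_LOCK_CNT_ZERO in a debug build.
 */
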
static __rte_always_inline uint64_t
rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
{
        uint64_t t;

        RTE_ASSERT(v != NULL);

        /* Release the changes to the shared data structure.
         * This store release will ensure that changes to any data
         * structure are visible to the workers before the token
         * update is visible.
         */
        t = __atomic_fetch_add(&v->token, 1, __ATOMIC_RELEASE) + 1;

        return t;
}

static __rte_always_inline void
rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
        uint64_t t;

        RTE_ASSERT(v != NULL && thread_id < v->max_threads);

        __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
                v->qsbr_cnt[thread_id].lock_cnt);

        /* Acquire the changes to the shared data structure released
         * by rte_rcu_qsbr_start.
         * Later loads of the shared data structure should not move
         * above this load. Hence, use load-acquire.
         */
        t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE);

        /* Check if there are updates available from the writer.
         * Inform the writer that updates are visible to this reader.
         * Prior loads of the shared data structure should not move
         * beyond this store. Hence use store-release.
         */
        if (t != __atomic_load_n(&v->qsbr_cnt[thread_id].cnt, __ATOMIC_RELAXED))
                __atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
                        t, __ATOMIC_RELEASE);

        __RTE_RCU_DP_LOG(DEBUG, "%s: update: token = %" PRIu64 ", Thread ID = %d",
                __func__, t, thread_id);
}

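/* Usage sketch (not part of the original header): the typical reader-side
 * loop on a data-plane lcore. process_packets() and keep_running are
 * hypothetical placeholders.
 *
 *     rte_rcu_qsbr_thread_register(v, thread_id);
 *     rte_rcu_qsbr_thread_online(v, thread_id);
 *
 *     while (likely(keep_running)) {
 *         process_packets();                      // reads the shared structure
 *         rte_rcu_qsbr_quiescent(v, thread_id);   // report quiescent state
 *     }
 *
 *     rte_rcu_qsbr_thread_offline(v, thread_id);
 *     rte_rcu_qsbr_thread_unregister(v, thread_id);
 */
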
/* Check the quiescent state counter for registered threads only, assuming
 * that not all threads have registered.
 */
static __rte_always_inline int
__rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
        uint32_t i, j, id;
        uint64_t bmap;
        uint64_t c;
        uint64_t *reg_thread_id;
        uint64_t acked_token = __RTE_QSBR_CNT_MAX;

        for (i = 0, reg_thread_id = __RTE_QSBR_THRID_ARRAY_ELM(v, 0);
                        i < v->num_elems;
                        i++, reg_thread_id++) {
                /* Load the current registered thread bit map before
                 * loading the reader thread quiescent state counters.
                 */
                bmap = __atomic_load_n(reg_thread_id, __ATOMIC_ACQUIRE);
                id = i << __RTE_QSBR_THRID_INDEX_SHIFT;

                while (bmap) {
                        j = __builtin_ctzl(bmap);
                        __RTE_RCU_DP_LOG(DEBUG,
                                "%s: check: token = %" PRIu64 ", wait = %d, Bit Map = 0x%" PRIx64 ", Thread ID = %d",
                                __func__, t, wait, bmap, id + j);
                        c = __atomic_load_n(
                                &v->qsbr_cnt[id + j].cnt,
                                __ATOMIC_ACQUIRE);
                        __RTE_RCU_DP_LOG(DEBUG,
                                "%s: status: token = %" PRIu64 ", wait = %d, Thread QS cnt = %" PRIu64 ", Thread ID = %d",
                                __func__, t, wait, c, id + j);

                        /* Counter is not checked for wrap-around condition
                         * as it is a 64b counter.
                         */
                        if (unlikely(c !=
                                __RTE_QSBR_CNT_THR_OFFLINE && c < t)) {
                                /* This thread is not in quiescent state */
                                if (!wait)
                                        return 0;

                                rte_pause();
                                /* This thread might have unregistered.
                                 * Re-read the bitmap.
                                 */
                                bmap = __atomic_load_n(reg_thread_id,
                                        __ATOMIC_ACQUIRE);

                                continue;
                        }

                        /* This thread is in quiescent state. Use the counter
                         * to find the least acknowledged token among all the
                         * readers.
                         */
                        if (c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c)
                                acked_token = c;

                        bmap &= ~(1UL << j);
                }
        }

        /* All readers are checked, update least acknowledged token.
         * There might be multiple writers trying to update this. There is
         * no need to update this very accurately using compare-and-swap.
         */
        if (acked_token != __RTE_QSBR_CNT_MAX)
                __atomic_store_n(&v->acked_token, acked_token,
                        __ATOMIC_RELAXED);

        return 1;
}

/* Check the quiescent state counter for all threads, assuming that
 * all the threads have registered.
 */
static __rte_always_inline int
__rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
        uint32_t i;
        struct rte_rcu_qsbr_cnt *cnt;
        uint64_t c;
        uint64_t acked_token = __RTE_QSBR_CNT_MAX;

        for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) {
                __RTE_RCU_DP_LOG(DEBUG,
                        "%s: check: token = %" PRIu64 ", wait = %d, Thread ID = %d",
                        __func__, t, wait, i);
                while (1) {
                        c = __atomic_load_n(&cnt->cnt, __ATOMIC_ACQUIRE);
                        __RTE_RCU_DP_LOG(DEBUG,
                                "%s: status: token = %" PRIu64 ", wait = %d, Thread QS cnt = %" PRIu64 ", Thread ID = %d",
                                __func__, t, wait, c, i);

                        /* Counter is not checked for wrap-around condition
                         * as it is a 64b counter.
                         */
                        if (likely(c == __RTE_QSBR_CNT_THR_OFFLINE || c >= t))
                                break;

                        /* This thread is not in quiescent state */
                        if (!wait)
                                return 0;

                        rte_pause();
                }

                /* This thread is in quiescent state. Use the counter to find
                 * the least acknowledged token among all the readers.
                 */
                if (likely(c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c))
                        acked_token = c;
        }

        /* All readers are checked, update least acknowledged token.
         * There might be multiple writers trying to update this. There is
         * no need to update this very accurately using compare-and-swap.
         */
        if (acked_token != __RTE_QSBR_CNT_MAX)
                __atomic_store_n(&v->acked_token, acked_token,
                        __ATOMIC_RELAXED);

        return 1;
}

static __rte_always_inline int
rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
        RTE_ASSERT(v != NULL);

        /* Check if all the readers have already acknowledged this token */
        if (likely(t <= v->acked_token)) {
                __RTE_RCU_DP_LOG(DEBUG,
                        "%s: check: token = %" PRIu64 ", wait = %d",
                        __func__, t, wait);
                __RTE_RCU_DP_LOG(DEBUG,
                        "%s: status: least acked token = %" PRIu64,
                        __func__, v->acked_token);
                return 1;
        }

        if (likely(v->num_threads == v->max_threads))
                return __rte_rcu_qsbr_check_all(v, t, wait);
        else
                return __rte_rcu_qsbr_check_selective(v, t, wait);
}

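/* Usage sketch (not part of the original header): the writer-side flow when
 * deleting an entry from a shared structure. remove_entry() and free_entry()
 * are hypothetical placeholders.
 *
 *     uint64_t token;
 *
 *     remove_entry(table, key);          // unlink so new readers cannot see it
 *     token = rte_rcu_qsbr_start(v);     // issue a token for this removal
 *
 *     // ... optionally do other work while readers catch up ...
 *
 *     if (rte_rcu_qsbr_check(v, token, true))   // blocking check
 *         free_entry(entry);             // no reader can still reference it
 */
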
void
rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id);

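/* Usage sketch (not part of the original header): rte_rcu_qsbr_synchronize()
 * combines issuing a token and a blocking check. A writer that is not itself
 * a registered reader passes RTE_QSBR_THRID_INVALID as the thread id; a
 * writer that is also a reader passes its own thread id so its own quiescent
 * state is reported and the call does not wait on itself.
 *
 *     remove_entry(table, key);                            // hypothetical
 *     rte_rcu_qsbr_synchronize(v, RTE_QSBR_THRID_INVALID);
 *     free_entry(entry);                                   // hypothetical
 */
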
int
rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);

__rte_experimental
struct rte_rcu_qsbr_dq *
rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params);

__rte_experimental
int
rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e);

__rte_experimental
int
rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
        unsigned int *freed, unsigned int *pending, unsigned int *available);

__rte_experimental
int
rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq);

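/* Usage sketch (not part of the original header): using the defer queue to
 * decouple deletion from freeing. free_entry_cb(), table, entry_ptr and
 * remove_entry() are hypothetical; the callback signature follows
 * rte_rcu_qsbr_free_resource_t.
 *
 *     static void
 *     free_entry_cb(void *p, void *e, unsigned int n)
 *     {
 *         // 'p' is the pointer supplied in the parameters, 'e' points to 'n'
 *         // enqueued elements of 'esize' bytes that are now safe to free.
 *     }
 *
 *     struct rte_rcu_qsbr_dq_parameters params = {
 *         .name = "demo_dq",
 *         .size = 1024,
 *         .esize = sizeof(void *),
 *         .free_fn = free_entry_cb,
 *         .p = table,
 *         .v = v,
 *     };
 *     struct rte_rcu_qsbr_dq *dq = rte_rcu_qsbr_dq_create(&params);
 *
 *     // Writer path: unlink the entry, then defer the free.
 *     remove_entry(table, key);
 *     rte_rcu_qsbr_dq_enqueue(dq, &entry_ptr);
 *
 *     // Reclaim up to 32 resources whose grace period has elapsed.
 *     unsigned int freed, pending, available;
 *     rte_rcu_qsbr_dq_reclaim(dq, 32, &freed, &pending, &available);
 *
 *     rte_rcu_qsbr_dq_delete(dq);
 */
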
#ifdef __cplusplus
}
#endif

#endif /* _RTE_RCU_QSBR_H_ */