DPDK  22.03.0
rte_rcu_qsbr.h
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (c) 2018-2020 Arm Limited
 */

#ifndef _RTE_RCU_QSBR_H_
#define _RTE_RCU_QSBR_H_

#ifdef __cplusplus
extern "C" {
#endif

#include <stdbool.h>
#include <stdio.h>
#include <stdint.h>
#include <rte_common.h>
#include <rte_debug.h>
#include <rte_atomic.h>
#include <rte_ring.h>

extern int rte_rcu_log_type;

#if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG
#define __RTE_RCU_DP_LOG(level, fmt, args...) \
	rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
		"%s(): " fmt "\n", __func__, ## args)
#else
#define __RTE_RCU_DP_LOG(level, fmt, args...)
#endif

#if defined(RTE_LIBRTE_RCU_DEBUG)
#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...) do {\
	if (v->qsbr_cnt[thread_id].lock_cnt) \
		rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
			"%s(): " fmt "\n", __func__, ## args); \
} while (0)
#else
#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...)
#endif

/* Registered thread IDs are stored as a bitmap held in an array of
 * 64b elements. A given thread ID is converted to an index into the
 * array and a bit position within that array element.
 */
#define __RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)
#define __RTE_QSBR_THRID_ARRAY_SIZE(max_threads) \
	RTE_ALIGN(RTE_ALIGN_MUL_CEIL(max_threads, \
		__RTE_QSBR_THRID_ARRAY_ELM_SIZE) >> 3, RTE_CACHE_LINE_SIZE)
#define __RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t *) \
	((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i)
#define __RTE_QSBR_THRID_INDEX_SHIFT 6
#define __RTE_QSBR_THRID_MASK 0x3f
#define RTE_QSBR_THRID_INVALID 0xffffffff

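/*
 * Illustrative sketch (not part of the DPDK API): how a thread ID maps
 * onto the bitmap defined above. For example, thread_id = 70 lands in
 * array element 70 >> 6 = 1, bit 70 & 0x3f = 6 of that 64b element.
 */
static inline void
example_thread_id_decompose(unsigned int thread_id,
		uint32_t *elem, uint32_t *bit)
{
	*elem = thread_id >> __RTE_QSBR_THRID_INDEX_SHIFT;	/* which 64b element */
	*bit = thread_id & __RTE_QSBR_THRID_MASK;		/* which bit within it */
}
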
/* Worker thread counter */
struct rte_rcu_qsbr_cnt {
	uint64_t cnt;
	uint32_t lock_cnt;
} __rte_cache_aligned;

#define __RTE_QSBR_CNT_THR_OFFLINE 0
#define __RTE_QSBR_CNT_INIT 1
#define __RTE_QSBR_CNT_MAX ((uint64_t)~0)
#define __RTE_QSBR_TOKEN_SIZE sizeof(uint64_t)

/* RTE Quiescent State variable structure.
 * This structure has two elements that vary in size based on the
 * 'max_threads' parameter.
 * 1) Quiescent state counter array
 * 2) Registered thread ID array
 */
struct rte_rcu_qsbr {
	uint64_t token __rte_cache_aligned;
	uint64_t acked_token;

	uint32_t num_elems __rte_cache_aligned;
	uint32_t num_threads;
	uint32_t max_threads;

	struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned;
} __rte_cache_aligned;

/* Callback invoked to free resources whose grace period has expired:
 * 'p' is the user pointer from the defer queue parameters and 'e' points
 * to 'n' resources to be freed.
 */
typedef void (*rte_rcu_qsbr_free_resource_t)(void *p, void *e, unsigned int n);

/* Maximum length of a defer queue name (same as the ring name size). */
#define RTE_RCU_QSBR_DQ_NAMESIZE RTE_RING_NAMESIZE

/* Create the defer queue for single-writer use; the queue APIs are then
 * not multi-thread safe.
 */
#define RTE_RCU_QSBR_DQ_MT_UNSAFE 1

/* Parameters used when creating the defer queue. */
struct rte_rcu_qsbr_dq_parameters {
	const char *name;
	uint32_t flags;
	uint32_t size;
	uint32_t esize;

	uint32_t trigger_reclaim_limit;
	uint32_t max_reclaim_size;

	rte_rcu_qsbr_free_resource_t free_fn;
	void *p;
	struct rte_rcu_qsbr *v;
};

/* RTE defer queue structure.
 * This structure holds the defer queue. The defer queue is used to
 * hold the deleted entries from the data structure that are not
 * yet freed.
 */
struct rte_rcu_qsbr_dq;

size_t
rte_rcu_qsbr_get_memsize(uint32_t max_threads);

int
rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);

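/*
 * Illustrative sketch (not part of the DPDK API): initializing a QSBR
 * variable in caller-provided memory. 'buf' must be cache-line aligned;
 * the allocator is the application's choice (for instance rte_zmalloc()
 * from <rte_malloc.h>).
 */
static inline struct rte_rcu_qsbr *
example_qsbr_setup(void *buf, size_t buf_sz, uint32_t max_threads)
{
	/* The required size grows with max_threads: the fixed part is
	 * followed by the per-thread counter array and the registered
	 * thread ID bitmap.
	 */
	if (buf == NULL || buf_sz < rte_rcu_qsbr_get_memsize(max_threads))
		return NULL;

	/* rte_rcu_qsbr_init() returns 0 on success. */
	if (rte_rcu_qsbr_init((struct rte_rcu_qsbr *)buf, max_threads) != 0)
		return NULL;

	return (struct rte_rcu_qsbr *)buf;
}
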
int
rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id);

int
rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id);

static __rte_always_inline void
rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	uint64_t t;

	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
		v->qsbr_cnt[thread_id].lock_cnt);

	/* Copy the current value of token.
	 * The fence at the end of the function will ensure that
	 * the following will not move down after the load of any shared
	 * data structure.
	 */
	t = __atomic_load_n(&v->token, __ATOMIC_RELAXED);

	/* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure
	 * 'cnt' (64b) is accessed atomically.
	 */
	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
		t, __ATOMIC_RELAXED);

	/* The subsequent load of the data structure should not
	 * move above the store. Hence a store-load barrier
	 * is required.
	 * If the load of the data structure moves above the store,
	 * the writer might not see that the reader is online, even though
	 * the reader is referencing the shared data structure.
	 */
	rte_atomic_thread_fence(__ATOMIC_SEQ_CST);
}

static __rte_always_inline void
rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
		v->qsbr_cnt[thread_id].lock_cnt);

	/* The reader can go offline only after the load of the
	 * data structure is completed, i.e. no load of the
	 * data structure can move below this store.
	 */
	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
		__RTE_QSBR_CNT_THR_OFFLINE, __ATOMIC_RELEASE);
}

static __rte_always_inline void
rte_rcu_qsbr_lock(__rte_unused struct rte_rcu_qsbr *v,
		__rte_unused unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

#if defined(RTE_LIBRTE_RCU_DEBUG)
	/* Increment the lock counter */
	__atomic_fetch_add(&v->qsbr_cnt[thread_id].lock_cnt,
		1, __ATOMIC_ACQUIRE);
#endif
}

static __rte_always_inline void
rte_rcu_qsbr_unlock(__rte_unused struct rte_rcu_qsbr *v,
		__rte_unused unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

#if defined(RTE_LIBRTE_RCU_DEBUG)
	/* Decrement the lock counter */
	__atomic_fetch_sub(&v->qsbr_cnt[thread_id].lock_cnt,
		1, __ATOMIC_RELEASE);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, WARNING,
		"Lock counter %u. Nested locks?\n",
		v->qsbr_cnt[thread_id].lock_cnt);
#endif
}

static __rte_always_inline uint64_t
rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
{
	uint64_t t;

	RTE_ASSERT(v != NULL);

	/* Release the changes to the shared data structure.
	 * This store release will ensure that changes to any data
	 * structure are visible to the workers before the token
	 * update is visible.
	 */
	t = __atomic_add_fetch(&v->token, 1, __ATOMIC_RELEASE);

	return t;
}

static __rte_always_inline void
rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	uint64_t t;

	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
		v->qsbr_cnt[thread_id].lock_cnt);

	/* Acquire the changes to the shared data structure released
	 * by rte_rcu_qsbr_start.
	 * Later loads of the shared data structure should not move
	 * above this load. Hence, use load-acquire.
	 */
	t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE);

	/* Check if there are updates available from the writer.
	 * Inform the writer that updates are visible to this reader.
	 * Prior loads of the shared data structure should not move
	 * beyond this store. Hence use store-release.
	 */
	if (t != __atomic_load_n(&v->qsbr_cnt[thread_id].cnt, __ATOMIC_RELAXED))
		__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
			t, __ATOMIC_RELEASE);

	__RTE_RCU_DP_LOG(DEBUG, "%s: update: token = %" PRIu64 ", Thread ID = %d",
		__func__, t, thread_id);
}

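/*
 * Illustrative sketch (not part of the DPDK API): the usual reader-side
 * sequence for a thread that has already been registered with
 * rte_rcu_qsbr_thread_register(). The lock/unlock calls are debug-only
 * markers (compiled out unless RTE_LIBRTE_RCU_DEBUG is defined) that let
 * the library flag a quiescent state reported from inside a critical
 * section.
 */
static inline void
example_reader_iteration(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	/* Start participating in quiescent state reporting. */
	rte_rcu_qsbr_thread_online(v, thread_id);

	rte_rcu_qsbr_lock(v, thread_id);
	/* ... look up entries in the shared, lock-free data structure ... */
	rte_rcu_qsbr_unlock(v, thread_id);

	/* No references to shared entries are held past this point. */
	rte_rcu_qsbr_quiescent(v, thread_id);

	/* Stop reporting, e.g. before blocking or doing unrelated work. */
	rte_rcu_qsbr_thread_offline(v, thread_id);
}
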
/* Check the quiescent state counter for registered threads only, assuming
 * that not all threads have registered.
 */
static __rte_always_inline int
__rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	uint32_t i, j, id;
	uint64_t bmap;
	uint64_t c;
	uint64_t *reg_thread_id;
	uint64_t acked_token = __RTE_QSBR_CNT_MAX;

	for (i = 0, reg_thread_id = __RTE_QSBR_THRID_ARRAY_ELM(v, 0);
		i < v->num_elems;
		i++, reg_thread_id++) {
		/* Load the current registered thread bit map before
		 * loading the reader thread quiescent state counters.
		 */
		bmap = __atomic_load_n(reg_thread_id, __ATOMIC_ACQUIRE);
		id = i << __RTE_QSBR_THRID_INDEX_SHIFT;

		while (bmap) {
			j = __builtin_ctzl(bmap);
			__RTE_RCU_DP_LOG(DEBUG,
				"%s: check: token = %" PRIu64 ", wait = %d, Bit Map = 0x%" PRIx64 ", Thread ID = %d",
				__func__, t, wait, bmap, id + j);
			c = __atomic_load_n(
				&v->qsbr_cnt[id + j].cnt,
				__ATOMIC_ACQUIRE);
			__RTE_RCU_DP_LOG(DEBUG,
				"%s: status: token = %" PRIu64 ", wait = %d, Thread QS cnt = %" PRIu64 ", Thread ID = %d",
				__func__, t, wait, c, id + j);

			/* Counter is not checked for wrap-around condition
			 * as it is a 64b counter.
			 */
			if (unlikely(c !=
				__RTE_QSBR_CNT_THR_OFFLINE && c < t)) {
				/* This thread is not in quiescent state */
				if (!wait)
					return 0;

				rte_pause();
				/* This thread might have unregistered.
				 * Re-read the bitmap.
				 */
				bmap = __atomic_load_n(reg_thread_id,
						__ATOMIC_ACQUIRE);

				continue;
			}

			/* This thread is in quiescent state. Use the counter
			 * to find the least acknowledged token among all the
			 * readers.
			 */
			if (c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c)
				acked_token = c;

			bmap &= ~(1UL << j);
		}
	}

	/* All readers are checked, update least acknowledged token.
	 * There might be multiple writers trying to update this. There is
	 * no need to update this very accurately using compare-and-swap.
	 */
	if (acked_token != __RTE_QSBR_CNT_MAX)
		__atomic_store_n(&v->acked_token, acked_token,
			__ATOMIC_RELAXED);

	return 1;
}

/* Check the quiescent state counter for all threads, assuming that
 * all the threads have registered.
 */
static __rte_always_inline int
__rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	uint32_t i;
	struct rte_rcu_qsbr_cnt *cnt;
	uint64_t c;
	uint64_t acked_token = __RTE_QSBR_CNT_MAX;

	for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) {
		__RTE_RCU_DP_LOG(DEBUG,
			"%s: check: token = %" PRIu64 ", wait = %d, Thread ID = %d",
			__func__, t, wait, i);
		while (1) {
			c = __atomic_load_n(&cnt->cnt, __ATOMIC_ACQUIRE);
			__RTE_RCU_DP_LOG(DEBUG,
				"%s: status: token = %" PRIu64 ", wait = %d, Thread QS cnt = %" PRIu64 ", Thread ID = %d",
				__func__, t, wait, c, i);

			/* Counter is not checked for wrap-around condition
			 * as it is a 64b counter.
			 */
			if (likely(c == __RTE_QSBR_CNT_THR_OFFLINE || c >= t))
				break;

			/* This thread is not in quiescent state */
			if (!wait)
				return 0;

			rte_pause();
		}

		/* This thread is in quiescent state. Use the counter to find
		 * the least acknowledged token among all the readers.
		 */
		if (likely(c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c))
			acked_token = c;
	}

	/* All readers are checked, update least acknowledged token.
	 * There might be multiple writers trying to update this. There is
	 * no need to update this very accurately using compare-and-swap.
	 */
	if (acked_token != __RTE_QSBR_CNT_MAX)
		__atomic_store_n(&v->acked_token, acked_token,
			__ATOMIC_RELAXED);

	return 1;
}

static __rte_always_inline int
rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	RTE_ASSERT(v != NULL);

	/* Check if all the readers have already acknowledged this token */
	if (likely(t <= v->acked_token)) {
		__RTE_RCU_DP_LOG(DEBUG,
			"%s: check: token = %" PRIu64 ", wait = %d",
			__func__, t, wait);
		__RTE_RCU_DP_LOG(DEBUG,
			"%s: status: least acked token = %" PRIu64,
			__func__, v->acked_token);
		return 1;
	}

	if (likely(v->num_threads == v->max_threads))
		return __rte_rcu_qsbr_check_all(v, t, wait);
	else
		return __rte_rcu_qsbr_check_selective(v, t, wait);
}

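/*
 * Illustrative sketch (not part of the DPDK API): writer-side handling
 * after removing an element from the shared data structure. The removed
 * element must not be freed until every registered reader has reported a
 * quiescent state after the token was issued; rte_rcu_qsbr_synchronize(),
 * declared below, provides a similar blocking wait.
 */
static inline void
example_writer_wait(struct rte_rcu_qsbr *v)
{
	uint64_t token;

	/* Announce a new grace period; readers acknowledge it the next
	 * time they report a quiescent state.
	 */
	token = rte_rcu_qsbr_start(v);

	/* With wait == true this blocks (polling with rte_pause()) until
	 * all registered readers are quiescent or offline; only then is it
	 * safe to free the removed element.
	 */
	(void)rte_rcu_qsbr_check(v, token, true);
}
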
void
rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id);

int
rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);

__rte_experimental
struct rte_rcu_qsbr_dq *
rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params);

__rte_experimental
int
rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e);

__rte_experimental
int
rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
	unsigned int *freed, unsigned int *pending, unsigned int *available);

__rte_experimental
int
rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq);

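/*
 * Illustrative sketch (not part of the DPDK API): creating a defer queue
 * that frees deleted elements only after all readers tracked by 'v' have
 * gone through a quiescent state. The callback, queue name and sizing
 * values below are placeholders chosen for the example, not values
 * required by the library.
 */
static void
example_free_elem(void *p, void *e, unsigned int n)
{
	/* Return the 'n' resources whose grace period expired, starting at
	 * 'e', back to the data structure / allocator identified by 'p'.
	 */
	(void)p;
	(void)e;
	(void)n;
}

static inline struct rte_rcu_qsbr_dq *
example_dq_create(struct rte_rcu_qsbr *v)
{
	struct rte_rcu_qsbr_dq_parameters params = {
		.name = "example_dq",
		.size = 1024,			/* entries the queue can hold */
		.esize = 8,			/* bytes per enqueued element */
		.trigger_reclaim_limit = 256,	/* backlog that triggers auto-reclaim */
		.max_reclaim_size = 32,		/* reclaim at most this many at once */
		.free_fn = example_free_elem,
		.p = NULL,			/* passed back to free_fn */
		.v = v,				/* QSBR variable tracking the readers */
	};

	return rte_rcu_qsbr_dq_create(&params);
}
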
#ifdef __cplusplus
}
#endif

#endif /* _RTE_RCU_QSBR_H_ */