DPDK 19.11.14
rte_rcu_qsbr.h
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (c) 2018 Arm Limited
 */

#ifndef _RTE_RCU_QSBR_H_
#define _RTE_RCU_QSBR_H_

/**
 * @file
 * RTE Quiescent State Based Reclamation (QSBR).
 *
 * A quiescent state is any point in a thread's execution where it holds
 * no reference to shared data. Readers report quiescent states; writers
 * use those reports to determine when memory removed from a lock-free
 * data structure can safely be freed.
 */

#ifdef __cplusplus
extern "C" {
#endif

#include <stdbool.h>
#include <stdio.h>
#include <stdint.h>
#include <inttypes.h>
#include <errno.h>
#include <rte_common.h>
#include <rte_memory.h>
#include <rte_lcore.h>
#include <rte_debug.h>
#include <rte_atomic.h>

extern int rte_rcu_log_type;

#if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG
#define __RTE_RCU_DP_LOG(level, fmt, args...) \
	rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
		"%s(): " fmt "\n", __func__, ## args)
#else
#define __RTE_RCU_DP_LOG(level, fmt, args...)
#endif

#if defined(RTE_LIBRTE_RCU_DEBUG)
#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...) do {\
	if (v->qsbr_cnt[thread_id].lock_cnt) \
		rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
			"%s(): " fmt "\n", __func__, ## args); \
} while (0)
#else
#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...)
#endif

/* Registered thread IDs are stored as a bitmap in an array of 64-bit
 * elements. A given thread ID is converted into an index into this array
 * and a bit position within the selected element.
 */
#define __RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)
#define __RTE_QSBR_THRID_ARRAY_SIZE(max_threads) \
	RTE_ALIGN(RTE_ALIGN_MUL_CEIL(max_threads, \
		__RTE_QSBR_THRID_ARRAY_ELM_SIZE) >> 3, RTE_CACHE_LINE_SIZE)
#define __RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t *) \
	((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i)
#define __RTE_QSBR_THRID_INDEX_SHIFT 6
#define __RTE_QSBR_THRID_MASK 0x3f
#define RTE_QSBR_THRID_INVALID 0xffffffff

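/* Worked example (illustrative only; thread ID 130 is an arbitrary value,
 * not anything defined by this header): the bitmap slot for a thread is
 * derived from its ID as
 *
 *	uint32_t elem = 130 >> __RTE_QSBR_THRID_INDEX_SHIFT;	(= 2)
 *	uint64_t bit  = 1UL << (130 & __RTE_QSBR_THRID_MASK);	(= 1UL << 2)
 *
 * i.e. registering thread 130 sets bit 2 of the third 64-bit element of
 * the thread ID array.
 */
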
/* Worker thread counter */
struct rte_rcu_qsbr_cnt {
	uint64_t cnt;
	/**< Quiescent state counter. Value 0 indicates the thread is
	 *   offline; a 64-bit counter is used so that wrap-around does
	 *   not need to be handled.
	 */
	uint32_t lock_cnt;
	/**< Lock counter. Used when RTE_LIBRTE_RCU_DEBUG is enabled. */
} __rte_cache_aligned;

#define __RTE_QSBR_CNT_THR_OFFLINE 0
#define __RTE_QSBR_CNT_INIT 1
#define __RTE_QSBR_CNT_MAX ((uint64_t)~0)

/* RTE Quiescent State variable structure.
 * This structure has two elements that vary in size based on the
 * 'max_threads' parameter:
 * 1) Quiescent state counter array
 * 2) Registered thread ID array
 */
struct rte_rcu_qsbr {
	uint64_t token __rte_cache_aligned;
	/**< Counter to allow multiple concurrent quiescent state queries */
	uint64_t acked_token;
	/**< Least token acknowledged by all readers in the last call to
	 *   rte_rcu_qsbr_check.
	 */

	uint32_t num_elems __rte_cache_aligned;
	/**< Number of elements in the thread ID bitmap array */
	uint32_t num_threads;
	/**< Number of threads currently using this QS variable */
	uint32_t max_threads;
	/**< Maximum number of threads using this QS variable */

	struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned;
	/**< Quiescent state counter array of 'max_threads' elements,
	 *   followed by the registered thread ID bitmap array.
	 */
} __rte_cache_aligned;

/* Return the size of memory, in bytes, required for a QS variable that
 * supports 'max_threads' reader threads.
 */
__rte_experimental
size_t
rte_rcu_qsbr_get_memsize(uint32_t max_threads);

/* Initialize a Quiescent State (QS) variable. */
__rte_experimental
int
rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);

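/* Allocation sketch (illustrative; 'max_readers' and the error handling
 * are assumptions, and rte_zmalloc() from <rte_malloc.h> is used only as
 * an example allocator). A writer typically sizes and initializes the QS
 * variable before any reader registers:
 *
 *	size_t sz = rte_rcu_qsbr_get_memsize(max_readers);
 *	struct rte_rcu_qsbr *v = rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE);
 *	if (v == NULL || rte_rcu_qsbr_init(v, max_readers) != 0)
 *		rte_panic("cannot create QS variable\n");
 *
 * Any cache-line aligned allocation of the reported size works equally
 * well.
 */
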
/* Register a reader thread to report its quiescent state on the
 * QS variable.
 */
__rte_experimental
int
rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id);

/* Remove a registered reader thread from the list of threads reporting
 * their quiescent state on the QS variable.
 */
__rte_experimental
int
rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id);

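/* Reader lifetime sketch (illustrative; 'v' and 'worker_id' are assumed
 * to be provided by the application):
 *
 *	rte_rcu_qsbr_thread_register(v, worker_id);
 *	rte_rcu_qsbr_thread_online(v, worker_id);
 *	... access shared data structures, report quiescent states ...
 *	rte_rcu_qsbr_thread_offline(v, worker_id);
 *	rte_rcu_qsbr_thread_unregister(v, worker_id);
 *
 * Registration alone only reserves the thread ID slot; a writer starts
 * waiting on the thread once it reports itself online.
 */
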
/* Add a registered reader thread to the set of threads a writer waits
 * on. A reader must be online while it references the shared data
 * structure and before it reports its quiescent state.
 */
__rte_experimental
static __rte_always_inline void
rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	uint64_t t;

	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
				v->qsbr_cnt[thread_id].lock_cnt);

	/* Copy the current value of token.
	 * The fence at the end of the function will ensure that
	 * the following will not move down after the load of any shared
	 * data structure.
	 */
	t = __atomic_load_n(&v->token, __ATOMIC_RELAXED);

	/* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure
	 * 'cnt' (64b) is accessed atomically.
	 */
	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
		t, __ATOMIC_RELAXED);

	/* The subsequent load of the data structure should not
	 * move above the store. Hence a store-load barrier
	 * is required.
	 * If the load of the data structure moves above the store,
	 * the writer might not see that the reader is online, even though
	 * the reader is referencing the shared data structure.
	 */
#ifdef RTE_ARCH_X86_64
	/* rte_smp_mb() for x86 is lighter */
	rte_smp_mb();
#else
	__atomic_thread_fence(__ATOMIC_SEQ_CST);
#endif
}

/* Remove a registered reader thread from the set of threads a writer
 * waits on. The reader must not reference the shared data structure
 * while it is offline.
 */
__rte_experimental
static __rte_always_inline void
rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
				v->qsbr_cnt[thread_id].lock_cnt);

	/* The reader can go offline only after the load of the
	 * data structure is completed, i.e. any load of the
	 * data structure cannot move after this store.
	 */
	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
		__RTE_QSBR_CNT_THR_OFFLINE, __ATOMIC_RELEASE);
}

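/* Online/offline sketch (illustrative; lookup() and use() stand in for
 * whatever the application does with the shared data): a registered
 * reader that touches the shared data only intermittently can stay
 * offline while idle so writers do not have to wait for it:
 *
 *	rte_rcu_qsbr_thread_online(v, worker_id);
 *	entry = lookup(shared_structure, key);
 *	use(entry);
 *	rte_rcu_qsbr_thread_offline(v, worker_id);
 *
 * No reference to RCU-protected memory may be held while offline.
 */
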
/* Acquire a lock for accessing a shared data structure (debug aid).
 * This is a no-op unless RTE_LIBRTE_RCU_DEBUG is defined, in which case
 * it counts critical-section nesting for the calling reader thread.
 */
__rte_experimental
static __rte_always_inline void
rte_rcu_qsbr_lock(__rte_unused struct rte_rcu_qsbr *v,
			__rte_unused unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

#if defined(RTE_LIBRTE_RCU_DEBUG)
	/* Increment the lock counter */
	__atomic_fetch_add(&v->qsbr_cnt[thread_id].lock_cnt,
				1, __ATOMIC_ACQUIRE);
#endif
}

/* Release a lock taken with rte_rcu_qsbr_lock (debug aid).
 * This is a no-op unless RTE_LIBRTE_RCU_DEBUG is defined.
 */
__rte_experimental
static __rte_always_inline void
rte_rcu_qsbr_unlock(__rte_unused struct rte_rcu_qsbr *v,
			__rte_unused unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

#if defined(RTE_LIBRTE_RCU_DEBUG)
	/* Decrement the lock counter */
	__atomic_fetch_sub(&v->qsbr_cnt[thread_id].lock_cnt,
				1, __ATOMIC_RELEASE);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, WARNING,
				"Lock counter %u. Nested locks?\n",
				v->qsbr_cnt[thread_id].lock_cnt);
#endif
}

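/* Debug sketch (illustrative; lookup() and use() are hypothetical): when
 * RTE_LIBRTE_RCU_DEBUG is defined, wrapping critical sections in
 * lock/unlock lets the library flag a quiescent-state report made while
 * a reference is still held:
 *
 *	rte_rcu_qsbr_lock(v, worker_id);
 *	entry = lookup(shared_structure, key);
 *	use(entry);
 *	rte_rcu_qsbr_unlock(v, worker_id);
 *	rte_rcu_qsbr_quiescent(v, worker_id);	(safe: lock count is zero)
 *
 * In non-debug builds both calls are empty apart from the RTE_ASSERT.
 */
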
/* Ask the reader threads to report their quiescent state. The returned
 * token is passed to rte_rcu_qsbr_check.
 */
__rte_experimental
static __rte_always_inline uint64_t
rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
{
	uint64_t t;

	RTE_ASSERT(v != NULL);

	/* Release the changes to the shared data structure.
	 * This store release will ensure that changes to any data
	 * structure are visible to the workers before the token
	 * update is visible.
	 */
	t = __atomic_add_fetch(&v->token, 1, __ATOMIC_RELEASE);

	return t;
}

/* Update the quiescent state for the calling reader thread. The reader
 * must not be referencing the shared data structure when calling this.
 */
__rte_experimental
static __rte_always_inline void
rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	uint64_t t;

	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
				v->qsbr_cnt[thread_id].lock_cnt);

	/* Acquire the changes to the shared data structure released
	 * by rte_rcu_qsbr_start.
	 * Later loads of the shared data structure should not move
	 * above this load. Hence, use load-acquire.
	 */
	t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE);

	/* Check if there are updates available from the writer.
	 * Inform the writer that updates are visible to this reader.
	 * Prior loads of the shared data structure should not move
	 * beyond this store. Hence use store-release.
	 */
	if (t != __atomic_load_n(&v->qsbr_cnt[thread_id].cnt, __ATOMIC_RELAXED))
		__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
					t, __ATOMIC_RELEASE);

	__RTE_RCU_DP_LOG(DEBUG, "%s: update: token = %" PRIu64 ", Thread ID = %d",
		__func__, t, thread_id);
}

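/* Data-plane sketch (illustrative; the burst receive and
 * process_packets() call are placeholders for the application's own
 * work): a reader loop reports a quiescent state once per iteration,
 * after it has dropped all references to shared data:
 *
 *	while (!quit) {
 *		nb = rte_eth_rx_burst(port, queue, pkts, BURST_SIZE);
 *		process_packets(pkts, nb);	(reads RCU-protected data)
 *		rte_rcu_qsbr_quiescent(v, worker_id);
 *	}
 *
 * The report is cheap when no new token has been issued, since the store
 * above is skipped if the thread's counter already matches the token.
 */
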
/* Check the quiescent state counter for registered threads only, assuming
 * that not all threads have registered.
 */
static __rte_always_inline int
__rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	uint32_t i, j, id;
	uint64_t bmap;
	uint64_t c;
	uint64_t *reg_thread_id;
	uint64_t acked_token = __RTE_QSBR_CNT_MAX;

	for (i = 0, reg_thread_id = __RTE_QSBR_THRID_ARRAY_ELM(v, 0);
		i < v->num_elems;
		i++, reg_thread_id++) {
		/* Load the current registered thread bit map before
		 * loading the reader thread quiescent state counters.
		 */
		bmap = __atomic_load_n(reg_thread_id, __ATOMIC_ACQUIRE);
		id = i << __RTE_QSBR_THRID_INDEX_SHIFT;

		while (bmap) {
			j = __builtin_ctzl(bmap);
			__RTE_RCU_DP_LOG(DEBUG,
				"%s: check: token = %" PRIu64 ", wait = %d, Bit Map = 0x%" PRIx64 ", Thread ID = %d",
				__func__, t, wait, bmap, id + j);
			c = __atomic_load_n(
					&v->qsbr_cnt[id + j].cnt,
					__ATOMIC_ACQUIRE);
			__RTE_RCU_DP_LOG(DEBUG,
				"%s: status: token = %" PRIu64 ", wait = %d, Thread QS cnt = %" PRIu64 ", Thread ID = %d",
				__func__, t, wait, c, id + j);

			/* Counter is not checked for wrap-around condition
			 * as it is a 64b counter.
			 */
			if (unlikely(c !=
				__RTE_QSBR_CNT_THR_OFFLINE && c < t)) {
				/* This thread is not in quiescent state */
				if (!wait)
					return 0;

				rte_pause();
				/* This thread might have unregistered.
				 * Re-read the bitmap.
				 */
				bmap = __atomic_load_n(reg_thread_id,
						__ATOMIC_ACQUIRE);

				continue;
			}

			/* This thread is in quiescent state. Use the counter
			 * to find the least acknowledged token among all the
			 * readers.
			 */
			if (c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c)
				acked_token = c;

			bmap &= ~(1UL << j);
		}
	}

	/* All readers are checked, update least acknowledged token.
	 * There might be multiple writers trying to update this. There is
	 * no need to update this very accurately using compare-and-swap.
	 */
	if (acked_token != __RTE_QSBR_CNT_MAX)
		__atomic_store_n(&v->acked_token, acked_token,
			__ATOMIC_RELAXED);

	return 1;
}

/* Check the quiescent state counter for all threads, assuming that
 * all the threads have registered.
 */
static __rte_always_inline int
__rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	uint32_t i;
	struct rte_rcu_qsbr_cnt *cnt;
	uint64_t c;
	uint64_t acked_token = __RTE_QSBR_CNT_MAX;

	for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) {
		__RTE_RCU_DP_LOG(DEBUG,
			"%s: check: token = %" PRIu64 ", wait = %d, Thread ID = %d",
			__func__, t, wait, i);
		while (1) {
			c = __atomic_load_n(&cnt->cnt, __ATOMIC_ACQUIRE);
			__RTE_RCU_DP_LOG(DEBUG,
				"%s: status: token = %" PRIu64 ", wait = %d, Thread QS cnt = %" PRIu64 ", Thread ID = %d",
				__func__, t, wait, c, i);

			/* Counter is not checked for wrap-around condition
			 * as it is a 64b counter.
			 */
			if (likely(c == __RTE_QSBR_CNT_THR_OFFLINE || c >= t))
				break;

			/* This thread is not in quiescent state */
			if (!wait)
				return 0;

			rte_pause();
		}

		/* This thread is in quiescent state. Use the counter to find
		 * the least acknowledged token among all the readers.
		 */
		if (likely(c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c))
			acked_token = c;
	}

	/* All readers are checked, update least acknowledged token.
	 * There might be multiple writers trying to update this. There is
	 * no need to update this very accurately using compare-and-swap.
	 */
	if (acked_token != __RTE_QSBR_CNT_MAX)
		__atomic_store_n(&v->acked_token, acked_token,
			__ATOMIC_RELAXED);

	return 1;
}

/* Check whether all registered reader threads have acknowledged the
 * quiescent state referenced by token 't'. Returns 1 if they have,
 * 0 otherwise; with 'wait' set, the call blocks until they have.
 */
__rte_experimental
static __rte_always_inline int
rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	RTE_ASSERT(v != NULL);

	/* Check if all the readers have already acknowledged this token */
	if (likely(t <= v->acked_token))
		return 1;

	if (likely(v->num_threads == v->max_threads))
		return __rte_rcu_qsbr_check_all(v, t, wait);
	else
		return __rte_rcu_qsbr_check_selective(v, t, wait);
}

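/* Writer-side sketch (illustrative helper names): after unlinking an
 * element from the shared structure, take a token and wait for the
 * readers before reclaiming the memory:
 *
 *	remove_from_shared_structure(entry);
 *	uint64_t token = rte_rcu_qsbr_start(v);
 *	rte_rcu_qsbr_check(v, token, true);	(blocks until acknowledged)
 *	free_entry(entry);
 *
 * With wait == false the check can instead be polled, allowing the writer
 * to batch other work while the readers catch up.
 */
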
/* Wait until all the reader threads have entered a quiescent state.
 * Pass RTE_QSBR_THRID_INVALID as 'thread_id' if the caller is not a
 * registered reader of 'v'.
 */
__rte_experimental
void
rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id);

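/* Sketch (the API calls are real, the helpers are illustrative):
 * rte_rcu_qsbr_synchronize() collapses the start/check pair above into a
 * single blocking call:
 *
 *	remove_from_shared_structure(entry);
 *	rte_rcu_qsbr_synchronize(v, RTE_QSBR_THRID_INVALID);
 *	free_entry(entry);
 *
 * A caller that is itself a registered reader passes its own thread ID
 * instead of RTE_QSBR_THRID_INVALID.
 */
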
/* Dump the details of the QS variable 'v' to the file 'f'. */
__rte_experimental
int
rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);

#ifdef __cplusplus
}
#endif

#endif /* _RTE_RCU_QSBR_H_ */