DPDK  19.08.2
rte_rcu_qsbr.h
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (c) 2018 Arm Limited
 */

#ifndef _RTE_RCU_QSBR_H_
#define _RTE_RCU_QSBR_H_

#ifdef __cplusplus
extern "C" {
#endif

#include <stdbool.h>
#include <stdio.h>
#include <stdint.h>
#include <inttypes.h>
#include <errno.h>
#include <rte_common.h>
#include <rte_memory.h>
#include <rte_lcore.h>
#include <rte_debug.h>
#include <rte_atomic.h>

extern int rte_rcu_log_type;

#if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG
#define __RTE_RCU_DP_LOG(level, fmt, args...) \
	rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
		"%s(): " fmt "\n", __func__, ## args)
#else
#define __RTE_RCU_DP_LOG(level, fmt, args...)
#endif

#if defined(RTE_LIBRTE_RCU_DEBUG)
#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...) do {\
	if (v->qsbr_cnt[thread_id].lock_cnt) \
		rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
			"%s(): " fmt "\n", __func__, ## args); \
} while (0)
#else
#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...)
#endif

/* Registered thread IDs are stored in an array of 64-bit bitmap elements.
 * A given thread ID must be converted into an index into this array and
 * a bit position within that array element.
 */
#define __RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)
#define __RTE_QSBR_THRID_ARRAY_SIZE(max_threads) \
	RTE_ALIGN(RTE_ALIGN_MUL_CEIL(max_threads, \
		__RTE_QSBR_THRID_ARRAY_ELM_SIZE) >> 3, RTE_CACHE_LINE_SIZE)
#define __RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t *) \
	((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i)
#define __RTE_QSBR_THRID_INDEX_SHIFT 6
#define __RTE_QSBR_THRID_MASK 0x3f
#define RTE_QSBR_THRID_INVALID 0xffffffff

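/* Illustrative sketch (not part of the DPDK header): how a thread ID is
 * split into an index into the 64-bit bitmap array and a bit position
 * within that element, using the shift/mask constants above. The helper
 * name is hypothetical.
 */
static inline void
__example_thrid_split(unsigned int thread_id, uint32_t *idx, uint32_t *bit)
{
	*idx = thread_id >> __RTE_QSBR_THRID_INDEX_SHIFT;	/* thread_id / 64 */
	*bit = thread_id & __RTE_QSBR_THRID_MASK;		/* thread_id % 64 */
}
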
/* Worker thread counter */
struct rte_rcu_qsbr_cnt {
	uint64_t cnt;
	/**< Quiescent state counter. Value 0 indicates the thread is offline.
	 *   A 64-bit counter is used so that wrap-around does not need to be
	 *   handled.
	 */
	uint32_t lock_cnt;
	/**< Lock counter. Used only when RTE_LIBRTE_RCU_DEBUG is enabled. */
} __rte_cache_aligned;

#define __RTE_QSBR_CNT_THR_OFFLINE 0
#define __RTE_QSBR_CNT_INIT 1

/* RTE Quiescent State variable structure.
 * This structure has two elements that vary in size based on the
 * 'max_threads' parameter.
 * 1) Quiescent state counter array
 * 2) Registered thread ID array
 */
struct rte_rcu_qsbr {
	uint64_t token __rte_cache_aligned;
	/**< Counter to allow for multiple concurrent quiescent state queries */

	uint32_t num_elems __rte_cache_aligned;
	/**< Number of elements in the thread ID bitmap array */
	uint32_t num_threads;
	/**< Number of threads currently using this QS variable */
	uint32_t max_threads;
	/**< Maximum number of threads using this QS variable */

	struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned;
	/**< Quiescent state counter array of 'max_threads' elements.
	 *   The registered thread ID bitmap array follows this array in memory.
	 */
} __rte_cache_aligned;

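/* Illustrative sketch (not part of the DPDK header): the memory layout of a
 * QS variable is the fixed-size header above, followed by 'max_threads'
 * per-thread counters, followed by the registered thread ID bitmap. The
 * helper below mirrors, as an assumption, what rte_rcu_qsbr_get_memsize()
 * accounts for; use the real API in application code.
 */
static inline size_t
__example_qsbr_layout_size(uint32_t max_threads)
{
	return sizeof(struct rte_rcu_qsbr)
		+ max_threads * sizeof(struct rte_rcu_qsbr_cnt)
		+ __RTE_QSBR_THRID_ARRAY_SIZE(max_threads);
}
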
/* Return the size of the memory occupied by a Quiescent State variable. */
__rte_experimental
size_t
rte_rcu_qsbr_get_memsize(uint32_t max_threads);

/* Initialize a Quiescent State variable. */
__rte_experimental
int
rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);

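/* Illustrative sketch (not part of the DPDK header): allocating and
 * initialising a QS variable. rte_zmalloc()/rte_free() come from
 * <rte_malloc.h>, which this header does not include; the helper name and
 * the error handling are illustrative only.
 */
static inline struct rte_rcu_qsbr *
__example_qsbr_alloc(uint32_t max_threads)
{
	size_t sz = rte_rcu_qsbr_get_memsize(max_threads);
	struct rte_rcu_qsbr *v = rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE);

	if (v == NULL)
		return NULL;

	if (rte_rcu_qsbr_init(v, max_threads) != 0) {
		rte_free(v);
		return NULL;
	}

	return v;
}
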
/* Register a reader thread to report its quiescent state on a QS variable. */
__rte_experimental
int
rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id);

/* Remove a reader thread from the list of threads reporting their
 * quiescent state on a QS variable.
 */
__rte_experimental
int
rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id);

/* Add a registered reader thread to the list of threads reporting their
 * quiescent state on a QS variable.
 */
__rte_experimental
static __rte_always_inline void
rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	uint64_t t;

	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
				v->qsbr_cnt[thread_id].lock_cnt);

	/* Copy the current value of token.
	 * The fence at the end of the function will ensure that
	 * the following will not move down after the load of any shared
	 * data structure.
	 */
	t = __atomic_load_n(&v->token, __ATOMIC_RELAXED);

	/* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure
	 * 'cnt' (64b) is accessed atomically.
	 */
	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
		t, __ATOMIC_RELAXED);

	/* The subsequent load of the data structure should not
	 * move above the store. Hence a store-load barrier
	 * is required.
	 * If the load of the data structure moves above the store,
	 * the writer might not see that the reader is online, even though
	 * the reader is referencing the shared data structure.
	 */
#ifdef RTE_ARCH_X86_64
	/* rte_smp_mb() for x86 is lighter */
	rte_smp_mb();
#else
	__atomic_thread_fence(__ATOMIC_SEQ_CST);
#endif
}

/* Remove a registered reader thread from the list of threads reporting
 * their quiescent state on a QS variable.
 */
__rte_experimental
static __rte_always_inline void
rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
				v->qsbr_cnt[thread_id].lock_cnt);

	/* The reader can go offline only after the load of the
	 * data structure is completed, i.e. any load of the
	 * data structure cannot move after this store.
	 */
	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
		__RTE_QSBR_CNT_THR_OFFLINE, __ATOMIC_RELEASE);
}

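/* Illustrative sketch (not part of the DPDK header): a reader thread that
 * registers once and then brackets each burst of lookups with
 * online/offline, so that a writer does not wait for it while it is idle.
 * The helper name and the lookup step are illustrative only.
 */
static inline void
__example_reader_burst(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	rte_rcu_qsbr_thread_register(v, thread_id);

	rte_rcu_qsbr_thread_online(v, thread_id);
	/* ... read the shared data structure ... */
	rte_rcu_qsbr_thread_offline(v, thread_id);

	rte_rcu_qsbr_thread_unregister(v, thread_id);
}
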
/* Mark the start of a reader critical section. This is a debugging aid:
 * the per-thread lock counter is maintained only when RTE_LIBRTE_RCU_DEBUG
 * is defined.
 */
__rte_experimental
static __rte_always_inline void
rte_rcu_qsbr_lock(__rte_unused struct rte_rcu_qsbr *v,
			__rte_unused unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

#if defined(RTE_LIBRTE_RCU_DEBUG)
	/* Increment the lock counter */
	__atomic_fetch_add(&v->qsbr_cnt[thread_id].lock_cnt,
				1, __ATOMIC_ACQUIRE);
#endif
}

/* Mark the end of a reader critical section started with
 * rte_rcu_qsbr_lock(). The per-thread lock counter is maintained only when
 * RTE_LIBRTE_RCU_DEBUG is defined.
 */
__rte_experimental
static __rte_always_inline void
rte_rcu_qsbr_unlock(__rte_unused struct rte_rcu_qsbr *v,
			__rte_unused unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

#if defined(RTE_LIBRTE_RCU_DEBUG)
	/* Decrement the lock counter */
	__atomic_fetch_sub(&v->qsbr_cnt[thread_id].lock_cnt,
				1, __ATOMIC_RELEASE);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, WARNING,
				"Lock counter %u. Nested locks?\n",
				v->qsbr_cnt[thread_id].lock_cnt);
#endif
}

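/* Illustrative sketch (not part of the DPDK header): marking a reader
 * critical section with lock/unlock. The lock counter is maintained only
 * when RTE_LIBRTE_RCU_DEBUG is defined, where it helps catch a quiescent
 * state being reported from inside a critical section. The helper name is
 * illustrative only.
 */
static inline void
__example_reader_critical_section(struct rte_rcu_qsbr *v,
		unsigned int thread_id)
{
	rte_rcu_qsbr_lock(v, thread_id);
	/* ... dereference pointers into the shared data structure ... */
	rte_rcu_qsbr_unlock(v, thread_id);
}
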
/* Ask the reader threads to report their quiescent state. Returns a token
 * to be passed to rte_rcu_qsbr_check() to query the status.
 */
__rte_experimental
static __rte_always_inline uint64_t
rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
{
	uint64_t t;

	RTE_ASSERT(v != NULL);

	/* Release the changes to the shared data structure.
	 * This store release will ensure that changes to any data
	 * structure are visible to the workers before the token
	 * update is visible.
	 */
	t = __atomic_add_fetch(&v->token, 1, __ATOMIC_RELEASE);

	return t;
}

/* Update the quiescent state for a reader thread: the thread no longer
 * holds any references into the shared data structure that were acquired
 * before this call.
 */
__rte_experimental
static __rte_always_inline void
rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	uint64_t t;

	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
				v->qsbr_cnt[thread_id].lock_cnt);

	/* Acquire the changes to the shared data structure released
	 * by rte_rcu_qsbr_start.
	 * Later loads of the shared data structure should not move
	 * above this load. Hence, use load-acquire.
	 */
	t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE);

	/* Inform the writer that updates are visible to this reader.
	 * Prior loads of the shared data structure should not move
	 * beyond this store. Hence use store-release.
	 */
	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
		t, __ATOMIC_RELEASE);

	__RTE_RCU_DP_LOG(DEBUG, "%s: update: token = %"PRIu64", Thread ID = %d",
		__func__, t, thread_id);
}

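/* Illustrative sketch (not part of the DPDK header): one iteration of a
 * reader's processing loop. The quiescent state is reported only after all
 * references into the shared data structure taken during the iteration have
 * been dropped. The helper name is illustrative only.
 */
static inline void
__example_reader_iteration(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	/* ... lookups into the shared data structure ... */

	/* No references are held beyond this point */
	rte_rcu_qsbr_quiescent(v, thread_id);
}
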
/* Check the quiescent state counter for registered threads only, assuming
 * that not all threads have registered.
 */
static __rte_always_inline int
__rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	uint32_t i, j, id;
	uint64_t bmap;
	uint64_t c;
	uint64_t *reg_thread_id;

	for (i = 0, reg_thread_id = __RTE_QSBR_THRID_ARRAY_ELM(v, 0);
		i < v->num_elems;
		i++, reg_thread_id++) {
		/* Load the current registered thread bit map before
		 * loading the reader thread quiescent state counters.
		 */
		bmap = __atomic_load_n(reg_thread_id, __ATOMIC_ACQUIRE);
		id = i << __RTE_QSBR_THRID_INDEX_SHIFT;

		while (bmap) {
			j = __builtin_ctzl(bmap);
			__RTE_RCU_DP_LOG(DEBUG,
				"%s: check: token = %"PRIu64", wait = %d, Bit Map = 0x%"PRIx64", Thread ID = %d",
				__func__, t, wait, bmap, id + j);
			c = __atomic_load_n(
				&v->qsbr_cnt[id + j].cnt,
				__ATOMIC_ACQUIRE);
			__RTE_RCU_DP_LOG(DEBUG,
				"%s: status: token = %"PRIu64", wait = %d, Thread QS cnt = %"PRIu64", Thread ID = %d",
				__func__, t, wait, c, id + j);
			/* Counter is not checked for wrap-around condition
			 * as it is a 64b counter.
			 */
			if (unlikely(c !=
				__RTE_QSBR_CNT_THR_OFFLINE && c < t)) {
				/* This thread is not in quiescent state */
				if (!wait)
					return 0;

				rte_pause();
				/* This thread might have unregistered.
				 * Re-read the bitmap.
				 */
				bmap = __atomic_load_n(reg_thread_id,
						__ATOMIC_ACQUIRE);

				continue;
			}

			bmap &= ~(1UL << j);
		}
	}

	return 1;
}

/* Check the quiescent state counter for all threads, assuming that
 * all the threads have registered.
 */
static __rte_always_inline int
__rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	uint32_t i;
	struct rte_rcu_qsbr_cnt *cnt;
	uint64_t c;

	for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) {
		__RTE_RCU_DP_LOG(DEBUG,
			"%s: check: token = %"PRIu64", wait = %d, Thread ID = %d",
			__func__, t, wait, i);
		while (1) {
			c = __atomic_load_n(&cnt->cnt, __ATOMIC_ACQUIRE);
			__RTE_RCU_DP_LOG(DEBUG,
				"%s: status: token = %"PRIu64", wait = %d, Thread QS cnt = %"PRIu64", Thread ID = %d",
				__func__, t, wait, c, i);
			/* Counter is not checked for wrap-around condition
			 * as it is a 64b counter.
			 */
			if (likely(c == __RTE_QSBR_CNT_THR_OFFLINE || c >= t))
				break;

			/* This thread is not in quiescent state */
			if (!wait)
				return 0;

			rte_pause();
		}
	}

	return 1;
}

/* Check if all the registered reader threads have entered the quiescent
 * state referenced by the token. If 'wait' is true, block until they have.
 * Returns 1 if all threads have passed through the quiescent state,
 * 0 otherwise.
 */
__rte_experimental
static __rte_always_inline int
rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	RTE_ASSERT(v != NULL);

	if (likely(v->num_threads == v->max_threads))
		return __rte_rcu_qsbr_check_all(v, t, wait);
	else
		return __rte_rcu_qsbr_check_selective(v, t, wait);
}

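/* Illustrative sketch (not part of the DPDK header): a writer that has
 * unlinked an element from the shared data structure and waits for all
 * registered readers to pass through a quiescent state before freeing it.
 * The helper name and the unlink/free steps are illustrative only.
 */
static inline void
__example_writer_delete(struct rte_rcu_qsbr *v)
{
	uint64_t token;

	/* ... unlink the element from the shared data structure ... */

	/* Start the grace period: the token identifies this update */
	token = rte_rcu_qsbr_start(v);

	/* Block until every registered reader has either gone offline or
	 * reported a quiescent state after the token was issued.
	 */
	rte_rcu_qsbr_check(v, token, true);

	/* ... it is now safe to free the unlinked element ... */
}
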
/* Wait until all the registered reader threads have entered the quiescent
 * state. Internally issues a new token and polls until all readers have
 * reported it. Pass RTE_QSBR_THRID_INVALID as 'thread_id' if the caller is
 * not itself a registered reader of this QS variable.
 */
__rte_experimental
void
rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id);

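/* Illustrative sketch (not part of the DPDK header): the blocking variant
 * of the writer flow above, using rte_rcu_qsbr_synchronize(). Here the
 * caller is assumed not to be a registered reader, so it passes
 * RTE_QSBR_THRID_INVALID. The helper name and the unlink/free steps are
 * illustrative only.
 */
static inline void
__example_writer_synchronize(struct rte_rcu_qsbr *v)
{
	/* ... unlink the element from the shared data structure ... */

	rte_rcu_qsbr_synchronize(v, RTE_QSBR_THRID_INVALID);

	/* ... it is now safe to free the unlinked element ... */
}
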
/* Dump the details of a single QS variable to a file. */
__rte_experimental
int
rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);

#ifdef __cplusplus
}
#endif

#endif /* _RTE_RCU_QSBR_H_ */