DPDK 21.11.9
rte_rcu_qsbr.h
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (c) 2018-2020 Arm Limited
 */

#ifndef _RTE_RCU_QSBR_H_
#define _RTE_RCU_QSBR_H_

#ifdef __cplusplus
extern "C" {
#endif

#include <stdbool.h>
#include <stdio.h>
#include <stdint.h>
#include <inttypes.h>
#include <errno.h>
#include <rte_common.h>
#include <rte_memory.h>
#include <rte_lcore.h>
#include <rte_debug.h>
#include <rte_atomic.h>
#include <rte_ring.h>

extern int rte_rcu_log_type;

#if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG
#define __RTE_RCU_DP_LOG(level, fmt, args...) \
	rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
		"%s(): " fmt "\n", __func__, ## args)
#else
#define __RTE_RCU_DP_LOG(level, fmt, args...)
#endif

#if defined(RTE_LIBRTE_RCU_DEBUG)
#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...) do {\
	if (v->qsbr_cnt[thread_id].lock_cnt) \
		rte_log(RTE_LOG_ ## level, rte_rcu_log_type, \
			"%s(): " fmt "\n", __func__, ## args); \
} while (0)
#else
#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, fmt, args...)
#endif

/* Registered thread IDs are stored as a bitmap in an array of 64-bit
 * elements. A given thread ID is converted into an index into this array
 * and a bit position within that array element.
 */
#define __RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(uint64_t) * 8)
#define __RTE_QSBR_THRID_ARRAY_SIZE(max_threads) \
	RTE_ALIGN(RTE_ALIGN_MUL_CEIL(max_threads, \
		__RTE_QSBR_THRID_ARRAY_ELM_SIZE) >> 3, RTE_CACHE_LINE_SIZE)
#define __RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t *) \
	((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i)
#define __RTE_QSBR_THRID_INDEX_SHIFT 6
#define __RTE_QSBR_THRID_MASK 0x3f
#define RTE_QSBR_THRID_INVALID 0xffffffff

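/* Illustrative worked example (not part of the original header): how a
 * thread ID maps onto the registration bitmap using the shift and mask
 * macros above. For thread_id = 70:
 *   array element index = 70 >> __RTE_QSBR_THRID_INDEX_SHIFT = 70 / 64 = 1
 *   bit within element  = 70 & __RTE_QSBR_THRID_MASK         = 70 % 64 = 6
 * i.e. thread 70 is tracked by bit 6 of the second 64-bit bitmap element.
 */
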
/* Worker thread counter */
struct rte_rcu_qsbr_cnt {
	uint64_t cnt;
	uint32_t lock_cnt;
} __rte_cache_aligned;

#define __RTE_QSBR_CNT_THR_OFFLINE 0
#define __RTE_QSBR_CNT_INIT 1
#define __RTE_QSBR_CNT_MAX ((uint64_t)~0)
#define __RTE_QSBR_TOKEN_SIZE sizeof(uint64_t)

/* RTE Quiescent State variable structure.
 * This structure has two elements that vary in size based on the
 * 'max_threads' parameter.
 * 1) Quiescent state counter array
 * 2) Registered thread ID array
 */
struct rte_rcu_qsbr {
	uint64_t token __rte_cache_aligned;
	uint64_t acked_token;

	uint32_t num_elems __rte_cache_aligned;
	uint32_t num_threads;
	uint32_t max_threads;

	struct rte_rcu_qsbr_cnt qsbr_cnt[0] __rte_cache_aligned;
} __rte_cache_aligned;

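/* Illustrative memory-layout note (not part of the original header),
 * describing the allocation sized by rte_rcu_qsbr_get_memsize():
 *
 *   [struct rte_rcu_qsbr]                        fixed-size header
 *   [struct rte_rcu_qsbr_cnt * max_threads]      per-thread QS counters
 *   [__RTE_QSBR_THRID_ARRAY_SIZE(max_threads)]   registered-thread bitmap
 *
 * This is the layout that __RTE_QSBR_THRID_ARRAY_ELM(v, i) walks with its
 * pointer arithmetic: it skips the header and the counter array to reach
 * the i-th 64-bit bitmap element.
 */
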
typedef void (*rte_rcu_qsbr_free_resource_t)(void *p, void *e, unsigned int n);

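/* Illustrative sketch (not part of the original header): a minimal
 * rte_rcu_qsbr_free_resource_t callback, assuming the application enqueues
 * plain heap pointers (esize == sizeof(void *)). 'p' is the opaque pointer
 * supplied in rte_rcu_qsbr_dq_parameters::p; 'e' points to 'n' queued
 * elements. free() requires <stdlib.h>, which this header does not pull in.
 */
static void
free_deleted_ptrs(void *p, void *e, unsigned int n)
{
	void **ptrs = e;	/* entries copied at enqueue time */
	unsigned int i;

	RTE_SET_USED(p);	/* context not needed in this sketch */
	for (i = 0; i < n; i++)
		free(ptrs[i]);
}
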
#define RTE_RCU_QSBR_DQ_NAMESIZE RTE_RING_NAMESIZE

#define RTE_RCU_QSBR_DQ_MT_UNSAFE 1

/* Parameters used when creating the defer queue. */
struct rte_rcu_qsbr_dq_parameters {
	const char *name;
	uint32_t flags;
	uint32_t size;
	uint32_t esize;
	uint32_t trigger_reclaim_limit;
	uint32_t max_reclaim_size;
	rte_rcu_qsbr_free_resource_t free_fn;
	void *p;
	struct rte_rcu_qsbr *v;
};

/* RTE defer queue structure.
 * This structure holds the defer queue. The defer queue is used to
 * hold the deleted entries from the data structure that are not
 * yet freed.
 */
struct rte_rcu_qsbr_dq;

size_t
rte_rcu_qsbr_get_memsize(uint32_t max_threads);

int
rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);

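/* Illustrative sketch (not part of the original header): allocating and
 * initializing a QS variable sized for RTE_MAX_LCORE reader threads.
 * rte_zmalloc()/rte_free() assume <rte_malloc.h> is included by the
 * application; error handling is reduced to simple checks.
 */
static struct rte_rcu_qsbr *
create_qs_variable(void)
{
	struct rte_rcu_qsbr *v;
	size_t sz;

	sz = rte_rcu_qsbr_get_memsize(RTE_MAX_LCORE);
	v = rte_zmalloc("qs_variable", sz, RTE_CACHE_LINE_SIZE);
	if (v == NULL)
		return NULL;

	if (rte_rcu_qsbr_init(v, RTE_MAX_LCORE) != 0) {
		rte_free(v);
		return NULL;
	}

	return v;
}
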
int
rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id);

int
rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id);

static __rte_always_inline void
rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	uint64_t t;

	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
				v->qsbr_cnt[thread_id].lock_cnt);

	/* Copy the current value of token.
	 * The fence at the end of the function will ensure that
	 * the following will not move down after the load of any shared
	 * data structure.
	 */
	t = __atomic_load_n(&v->token, __ATOMIC_RELAXED);

	/* __atomic_store_n(cnt, __ATOMIC_RELAXED) is used to ensure
	 * 'cnt' (64b) is accessed atomically.
	 */
	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
		t, __ATOMIC_RELAXED);

	/* The subsequent load of the data structure should not
	 * move above the store. Hence a store-load barrier
	 * is required.
	 * If the load of the data structure moves above the store,
	 * writer might not see that the reader is online, even though
	 * the reader is referencing the shared data structure.
	 */
	rte_atomic_thread_fence(__ATOMIC_SEQ_CST);
}

static __rte_always_inline void
rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
				v->qsbr_cnt[thread_id].lock_cnt);

	/* The reader can go offline only after the load of the
	 * data structure is completed. i.e. any load of the
	 * data structure can not move after this store.
	 */

	__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
		__RTE_QSBR_CNT_THR_OFFLINE, __ATOMIC_RELEASE);
}

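/* Illustrative sketch (not part of the original header): a reader that
 * blocks for long periods reports itself offline first so that writers do
 * not wait on it, and comes back online before touching the shared data
 * structure again. wait_for_work() is a hypothetical blocking call provided
 * by the application.
 */
void wait_for_work(void);	/* hypothetical */

static void
reader_wait_pattern(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	/* Done accessing shared data for a while: go offline. */
	rte_rcu_qsbr_thread_offline(v, thread_id);

	wait_for_work();

	/* About to access shared data again: come back online. */
	rte_rcu_qsbr_thread_online(v, thread_id);
}
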
static __rte_always_inline void
rte_rcu_qsbr_lock(__rte_unused struct rte_rcu_qsbr *v,
			__rte_unused unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

#if defined(RTE_LIBRTE_RCU_DEBUG)
	/* Increment the lock counter */
	__atomic_fetch_add(&v->qsbr_cnt[thread_id].lock_cnt,
				1, __ATOMIC_ACQUIRE);
#endif
}

static __rte_always_inline void
rte_rcu_qsbr_unlock(__rte_unused struct rte_rcu_qsbr *v,
			__rte_unused unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

#if defined(RTE_LIBRTE_RCU_DEBUG)
	/* Decrement the lock counter */
	__atomic_fetch_sub(&v->qsbr_cnt[thread_id].lock_cnt,
				1, __ATOMIC_RELEASE);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, WARNING,
				"Lock counter %u. Nested locks?\n",
				v->qsbr_cnt[thread_id].lock_cnt);
#endif
}

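/* Illustrative sketch (not part of the original header): wrapping a reader
 * critical section with rte_rcu_qsbr_lock()/rte_rcu_qsbr_unlock(). These
 * calls compile to nothing unless RTE_LIBRTE_RCU_DEBUG is defined, in which
 * case they help catch a rte_rcu_qsbr_quiescent() call made from inside a
 * critical section. lookup_shared_entry() is a hypothetical reader routine.
 */
void *lookup_shared_entry(void);	/* hypothetical */

static void *
guarded_lookup(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	void *entry;

	rte_rcu_qsbr_lock(v, thread_id);
	entry = lookup_shared_entry();
	rte_rcu_qsbr_unlock(v, thread_id);

	return entry;
}
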
static __rte_always_inline uint64_t
rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
{
	uint64_t t;

	RTE_ASSERT(v != NULL);

	/* Release the changes to the shared data structure.
	 * This store release will ensure that changes to any data
	 * structure are visible to the workers before the token
	 * update is visible.
	 */
	t = __atomic_add_fetch(&v->token, 1, __ATOMIC_RELEASE);

	return t;
}

static __rte_always_inline void
rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	uint64_t t;

	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u\n",
				v->qsbr_cnt[thread_id].lock_cnt);

	/* Acquire the changes to the shared data structure released
	 * by rte_rcu_qsbr_start.
	 * Later loads of the shared data structure should not move
	 * above this load. Hence, use load-acquire.
	 */
	t = __atomic_load_n(&v->token, __ATOMIC_ACQUIRE);

	/* Check if there are updates available from the writer.
	 * Inform the writer that updates are visible to this reader.
	 * Prior loads of the shared data structure should not move
	 * beyond this store. Hence use store-release.
	 */
	if (t != __atomic_load_n(&v->qsbr_cnt[thread_id].cnt, __ATOMIC_RELAXED))
		__atomic_store_n(&v->qsbr_cnt[thread_id].cnt,
					 t, __ATOMIC_RELEASE);

	__RTE_RCU_DP_LOG(DEBUG, "%s: update: token = %" PRIu64 ", Thread ID = %d",
		__func__, t, thread_id);
}

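/* Illustrative sketch (not part of the original header): the typical
 * reader-side flow. The thread registers once, reports itself online, then
 * periodically reports a quiescent state from its poll loop, i.e. from a
 * point where it holds no references into the shared data structure.
 * process_packets() and app_running() are hypothetical application hooks.
 */
int process_packets(void);	/* hypothetical: reads the shared structure */
int app_running(void);		/* hypothetical: loop condition */

static void
reader_main_loop(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	rte_rcu_qsbr_thread_register(v, thread_id);
	rte_rcu_qsbr_thread_online(v, thread_id);

	while (app_running()) {
		process_packets();

		/* No references held here: report quiescent state. */
		rte_rcu_qsbr_quiescent(v, thread_id);
	}

	rte_rcu_qsbr_thread_offline(v, thread_id);
	rte_rcu_qsbr_thread_unregister(v, thread_id);
}
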
/* Check the quiescent state counter for registered threads only, assuming
 * that not all threads have registered.
 */
static __rte_always_inline int
__rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	uint32_t i, j, id;
	uint64_t bmap;
	uint64_t c;
	uint64_t *reg_thread_id;
	uint64_t acked_token = __RTE_QSBR_CNT_MAX;

	for (i = 0, reg_thread_id = __RTE_QSBR_THRID_ARRAY_ELM(v, 0);
		i < v->num_elems;
		i++, reg_thread_id++) {
		/* Load the current registered thread bit map before
		 * loading the reader thread quiescent state counters.
		 */
		bmap = __atomic_load_n(reg_thread_id, __ATOMIC_ACQUIRE);
		id = i << __RTE_QSBR_THRID_INDEX_SHIFT;

		while (bmap) {
			j = __builtin_ctzl(bmap);
			__RTE_RCU_DP_LOG(DEBUG,
				"%s: check: token = %" PRIu64 ", wait = %d, Bit Map = 0x%" PRIx64 ", Thread ID = %d",
				__func__, t, wait, bmap, id + j);
			c = __atomic_load_n(
					&v->qsbr_cnt[id + j].cnt,
					__ATOMIC_ACQUIRE);
			__RTE_RCU_DP_LOG(DEBUG,
				"%s: status: token = %" PRIu64 ", wait = %d, Thread QS cnt = %" PRIu64 ", Thread ID = %d",
				__func__, t, wait, c, id + j);

			/* Counter is not checked for wrap-around condition
			 * as it is a 64b counter.
			 */
			if (unlikely(c !=
				__RTE_QSBR_CNT_THR_OFFLINE && c < t)) {
				/* This thread is not in quiescent state */
				if (!wait)
					return 0;

				rte_pause();
				/* This thread might have unregistered.
				 * Re-read the bitmap.
				 */
				bmap = __atomic_load_n(reg_thread_id,
						__ATOMIC_ACQUIRE);

				continue;
			}

			/* This thread is in quiescent state. Use the counter
			 * to find the least acknowledged token among all the
			 * readers.
			 */
			if (c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c)
				acked_token = c;

			bmap &= ~(1UL << j);
		}
	}

	/* All readers are checked, update least acknowledged token.
	 * There might be multiple writers trying to update this. There is
	 * no need to update this very accurately using compare-and-swap.
	 */
	if (acked_token != __RTE_QSBR_CNT_MAX)
		__atomic_store_n(&v->acked_token, acked_token,
			__ATOMIC_RELAXED);

	return 1;
}

/* Check the quiescent state counter for all threads, assuming that
 * all the threads have registered.
 */
static __rte_always_inline int
__rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	uint32_t i;
	struct rte_rcu_qsbr_cnt *cnt;
	uint64_t c;
	uint64_t acked_token = __RTE_QSBR_CNT_MAX;

	for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) {
		__RTE_RCU_DP_LOG(DEBUG,
			"%s: check: token = %" PRIu64 ", wait = %d, Thread ID = %d",
			__func__, t, wait, i);
		while (1) {
			c = __atomic_load_n(&cnt->cnt, __ATOMIC_ACQUIRE);
			__RTE_RCU_DP_LOG(DEBUG,
				"%s: status: token = %" PRIu64 ", wait = %d, Thread QS cnt = %" PRIu64 ", Thread ID = %d",
				__func__, t, wait, c, i);

			/* Counter is not checked for wrap-around condition
			 * as it is a 64b counter.
			 */
			if (likely(c == __RTE_QSBR_CNT_THR_OFFLINE || c >= t))
				break;

			/* This thread is not in quiescent state */
			if (!wait)
				return 0;

			rte_pause();
		}

		/* This thread is in quiescent state. Use the counter to find
		 * the least acknowledged token among all the readers.
		 */
		if (likely(c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c))
			acked_token = c;
	}

	/* All readers are checked, update least acknowledged token.
	 * There might be multiple writers trying to update this. There is
	 * no need to update this very accurately using compare-and-swap.
	 */
	if (acked_token != __RTE_QSBR_CNT_MAX)
		__atomic_store_n(&v->acked_token, acked_token,
			__ATOMIC_RELAXED);

	return 1;
}

static __rte_always_inline int
rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	RTE_ASSERT(v != NULL);

	/* Check if all the readers have already acknowledged this token */
	if (likely(t <= v->acked_token)) {
		__RTE_RCU_DP_LOG(DEBUG,
			"%s: check: token = %" PRIu64 ", wait = %d",
			__func__, t, wait);
		__RTE_RCU_DP_LOG(DEBUG,
			"%s: status: least acked token = %" PRIu64,
			__func__, v->acked_token);
		return 1;
	}

	if (likely(v->num_threads == v->max_threads))
		return __rte_rcu_qsbr_check_all(v, t, wait);
	else
		return __rte_rcu_qsbr_check_selective(v, t, wait);
}

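/* Illustrative sketch (not part of the original header): the writer-side
 * flow. After unlinking an entry from the shared structure, the writer
 * starts a grace period with rte_rcu_qsbr_start() and, once
 * rte_rcu_qsbr_check() reports that all online readers have passed through
 * a quiescent state, it can safely free the entry. unlink_entry() is a
 * hypothetical application routine; free() assumes <stdlib.h>.
 */
void *unlink_entry(void);	/* hypothetical: removes entry from structure */

static void
writer_delete_and_free(struct rte_rcu_qsbr *v)
{
	void *entry;
	uint64_t token;

	entry = unlink_entry();		/* readers can no longer find it */
	token = rte_rcu_qsbr_start(v);	/* begin grace period */

	/* Block until every registered, online reader has acknowledged
	 * a token >= 'token' (wait == true busy-waits with rte_pause()).
	 */
	rte_rcu_qsbr_check(v, token, true);

	free(entry);			/* now safe to reclaim */
}
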
void
rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id);

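/* Illustrative sketch (not part of the original header):
 * rte_rcu_qsbr_synchronize() combines the start/check pair above into a
 * single blocking call. A writer that is not itself a registered reader
 * passes RTE_QSBR_THRID_INVALID as the thread ID; a registered caller passes
 * its own thread ID so the call does not block on the caller's own counter.
 */
static void
writer_synchronize_then_free(struct rte_rcu_qsbr *v, void *removed_entry)
{
	rte_rcu_qsbr_synchronize(v, RTE_QSBR_THRID_INVALID);
	free(removed_entry);	/* assumes <stdlib.h> */
}
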
int
rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);

__rte_experimental
struct rte_rcu_qsbr_dq *
rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params);

__rte_experimental
int
rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e);

__rte_experimental
int
rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
	unsigned int *freed, unsigned int *pending, unsigned int *available);

__rte_experimental
int
rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq);

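/* Illustrative sketch (not part of the original header): an end-to-end use
 * of the experimental defer-queue API. The writer enqueues deleted entries
 * instead of blocking on a grace period; the library calls free_fn once the
 * entries are safe to reclaim. The queue geometry (size, esize, reclaim
 * limits) and free_deleted_ptrs() (see the callback sketch earlier in this
 * file) are example choices, not recommendations.
 */
static struct rte_rcu_qsbr_dq *
create_defer_queue(struct rte_rcu_qsbr *v, void *ctx)
{
	struct rte_rcu_qsbr_dq_parameters params = {0};

	params.name = "example_dq";
	params.size = 1024;			/* entries in the queue */
	params.esize = sizeof(void *);		/* each entry is one pointer */
	params.trigger_reclaim_limit = 256;	/* start reclaiming at this depth */
	params.max_reclaim_size = 32;		/* reclaim at most this many at once */
	params.free_fn = free_deleted_ptrs;
	params.p = ctx;
	params.v = v;

	return rte_rcu_qsbr_dq_create(&params);
}

static void
writer_defer_free(struct rte_rcu_qsbr_dq *dq, void *removed_entry)
{
	unsigned int freed, pending, available;

	/* Hand the removed entry to the defer queue instead of freeing it
	 * immediately; esize == sizeof(void *), so the pointer value itself
	 * is the queued element.
	 */
	if (rte_rcu_qsbr_dq_enqueue(dq, &removed_entry) != 0) {
		/* Enqueue failed (e.g. queue full): reclaim and retry once. */
		rte_rcu_qsbr_dq_reclaim(dq, 32, &freed, &pending, &available);
		rte_rcu_qsbr_dq_enqueue(dq, &removed_entry);
	}
}
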
#ifdef __cplusplus
}
#endif

#endif /* _RTE_RCU_QSBR_H_ */