DPDK 25.03.0-rc0
rte_rcu_qsbr.h
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright (c) 2018-2020 Arm Limited
 */

#ifndef _RTE_RCU_QSBR_H_
#define _RTE_RCU_QSBR_H_

#include <inttypes.h>
#include <stdalign.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdint.h>

#include <rte_common.h>
#include <rte_debug.h>
#include <rte_atomic.h>
#include <rte_ring.h>

#ifdef __cplusplus
extern "C" {
#endif

extern int rte_rcu_log_type;
#define RTE_LOGTYPE_RCU rte_rcu_log_type

#if RTE_LOG_DP_LEVEL >= RTE_LOG_DEBUG
#define __RTE_RCU_DP_LOG(level, ...) \
	RTE_LOG_DP_LINE_PREFIX(level, RCU, "%s(): ", __func__, __VA_ARGS__)
#else
#define __RTE_RCU_DP_LOG(level, ...)
#endif

#if defined(RTE_LIBRTE_RCU_DEBUG)
#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, ...) do { \
	if (v->qsbr_cnt[thread_id].lock_cnt) \
		RTE_LOG_LINE_PREFIX(level, RCU, "%s(): ", __func__, __VA_ARGS__); \
} while (0)
#else
#define __RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, level, ...)
#endif

/* Registered thread IDs are stored as a bitmap in an array of 64b elements.
 * A given thread id is converted to an index into this array and a bit
 * position within that array element.
 */
#define __RTE_QSBR_THRID_ARRAY_ELM_SIZE (sizeof(RTE_ATOMIC(uint64_t)) * 8)
#define __RTE_QSBR_THRID_ARRAY_SIZE(max_threads) \
	RTE_ALIGN(RTE_ALIGN_MUL_CEIL(max_threads, \
		__RTE_QSBR_THRID_ARRAY_ELM_SIZE) >> 3, RTE_CACHE_LINE_SIZE)
#define __RTE_QSBR_THRID_ARRAY_ELM(v, i) ((uint64_t __rte_atomic *) \
	((struct rte_rcu_qsbr_cnt *)(v + 1) + v->max_threads) + i)
#define __RTE_QSBR_THRID_INDEX_SHIFT 6
#define __RTE_QSBR_THRID_MASK 0x3f
#define RTE_QSBR_THRID_INVALID 0xffffffff
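
/* Worked example (illustrative, not part of the original header): for
 * thread_id = 70, the array element index is
 * 70 >> __RTE_QSBR_THRID_INDEX_SHIFT = 1 and the bit position is
 * 70 & __RTE_QSBR_THRID_MASK = 6, i.e. the thread is tracked by bit 6 of the
 * second 64b bitmap word.
 */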

/* Worker thread counter */
struct __rte_cache_aligned rte_rcu_qsbr_cnt {
	RTE_ATOMIC(uint64_t) cnt;
	/**< Quiescent state counter. Value 0 indicates the thread is offline. */
	RTE_ATOMIC(uint32_t) lock_cnt;
	/**< Lock counter. Used when RTE_LIBRTE_RCU_DEBUG is enabled. */
};

#define __RTE_QSBR_CNT_THR_OFFLINE 0
#define __RTE_QSBR_CNT_INIT 1
#define __RTE_QSBR_CNT_MAX ((uint64_t)~0)
#define __RTE_QSBR_TOKEN_SIZE sizeof(uint64_t)

/* RTE Quiescent State variable structure.
 * This structure has two elements that vary in size based on the
 * 'max_threads' parameter.
 * 1) Quiescent state counter array
 * 2) Registered thread ID array
 */
struct __rte_cache_aligned rte_rcu_qsbr {
	alignas(RTE_CACHE_LINE_SIZE) RTE_ATOMIC(uint64_t) token;
	/**< Counter to allow for multiple concurrent quiescent state queries */
	RTE_ATOMIC(uint64_t) acked_token;
	/**< Least token acknowledged by all readers in the last call to
	 *   rte_rcu_qsbr_check.
	 */

	alignas(RTE_CACHE_LINE_SIZE) uint32_t num_elems;
	/**< Number of elements in the thread ID array */
	RTE_ATOMIC(uint32_t) num_threads;
	/**< Number of threads currently using this QS variable */
	uint32_t max_threads;
	/**< Maximum number of threads using this QS variable */

	alignas(RTE_CACHE_LINE_SIZE) struct rte_rcu_qsbr_cnt qsbr_cnt[];
	/**< Quiescent state counter array of 'max_threads' elements */
};

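/* Memory layout sketch (illustrative, not part of the original header): the
 * variable-size object sized by rte_rcu_qsbr_get_memsize() is laid out as
 *
 *   [struct rte_rcu_qsbr][qsbr_cnt[max_threads]][registered thread ID bitmap]
 *
 * which is why __RTE_QSBR_THRID_ARRAY_ELM() locates the bitmap by stepping
 * past the header and the per-thread counter array.
 */
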
typedef void (*rte_rcu_qsbr_free_resource_t)(void *p, void *e, unsigned int n);

#define RTE_RCU_QSBR_DQ_NAMESIZE RTE_RING_NAMESIZE

#define RTE_RCU_QSBR_DQ_MT_UNSAFE 1

/* Parameters used when creating the defer queue. */
struct rte_rcu_qsbr_dq_parameters {
	const char *name;
	/**< Name of the defer queue */
	uint32_t flags;
	/**< Flags to control API behaviors */
	uint32_t size;
	/**< Number of entries in the queue. Typically, this should match the
	 *   maximum number of entries supported in the lock-free data structure.
	 */
	uint32_t esize;
	/**< Size (in bytes) of each element in the defer queue.
	 *   This has to be a multiple of 4 bytes.
	 */
	uint32_t trigger_reclaim_limit;
	/**< Trigger automatic reclamation from rte_rcu_qsbr_dq_enqueue once
	 *   at least this many resources are waiting on the defer queue.
	 */
	uint32_t max_reclaim_size;
	/**< Maximum number of resources to reclaim when automatic
	 *   reclamation is triggered.
	 */
	rte_rcu_qsbr_free_resource_t free_fn;
	/**< Function to call to free the resource. */
	void *p;
	/**< Pointer passed to the free function. Typically, this is the
	 *   pointer to the data structure to which the resource to free belongs.
	 */
	struct rte_rcu_qsbr *v;
	/**< RCU QSBR variable to use for this defer queue */
};

/* RTE defer queue structure.
 * This structure holds the defer queue. The defer queue is used to
 * hold the deleted entries from the data structure that are not
 * yet freed.
 */
struct rte_rcu_qsbr_dq;

size_t
rte_rcu_qsbr_get_memsize(uint32_t max_threads);

int
rte_rcu_qsbr_init(struct rte_rcu_qsbr *v, uint32_t max_threads);
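
/* Illustrative allocation sketch (not part of this header): size the QS
 * variable for the expected number of reader threads, allocate it from
 * cache-aligned memory (rte_zmalloc(), requires <rte_malloc.h>) and
 * initialize it. 'example_qsbr_alloc' is a hypothetical helper.
 */
static inline struct rte_rcu_qsbr *
example_qsbr_alloc(uint32_t max_reader_threads)
{
	struct rte_rcu_qsbr *v;
	size_t sz = rte_rcu_qsbr_get_memsize(max_reader_threads);

	v = rte_zmalloc(NULL, sz, RTE_CACHE_LINE_SIZE);
	if (v == NULL)
		return NULL;

	if (rte_rcu_qsbr_init(v, max_reader_threads) != 0) {
		rte_free(v);
		return NULL;
	}

	return v;
}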

int
rte_rcu_qsbr_thread_register(struct rte_rcu_qsbr *v, unsigned int thread_id);

int
rte_rcu_qsbr_thread_unregister(struct rte_rcu_qsbr *v, unsigned int thread_id);

static __rte_always_inline void
rte_rcu_qsbr_thread_online(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	uint64_t t;

	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u",
				v->qsbr_cnt[thread_id].lock_cnt);

	/* Copy the current value of token.
	 * The fence at the end of the function will ensure that
	 * the following will not move down after the load of any shared
	 * data structure.
	 */
	t = rte_atomic_load_explicit(&v->token, rte_memory_order_relaxed);

	/* rte_atomic_store_explicit(cnt, rte_memory_order_relaxed) is used to ensure
	 * 'cnt' (64b) is accessed atomically.
	 */
	rte_atomic_store_explicit(&v->qsbr_cnt[thread_id].cnt,
		t, rte_memory_order_relaxed);

	/* The subsequent load of the data structure should not
	 * move above the store. Hence a store-load barrier
	 * is required.
	 * If the load of the data structure moves above the store,
	 * writer might not see that the reader is online, even though
	 * the reader is referencing the shared data structure.
	 */
	rte_atomic_thread_fence(rte_memory_order_seq_cst);
}

static __rte_always_inline void
rte_rcu_qsbr_thread_offline(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u",
				v->qsbr_cnt[thread_id].lock_cnt);

	/* The reader can go offline only after the load of the
	 * data structure is completed, i.e. no load of the
	 * data structure can move after this store.
	 */

	rte_atomic_store_explicit(&v->qsbr_cnt[thread_id].cnt,
		__RTE_QSBR_CNT_THR_OFFLINE, rte_memory_order_release);
}
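
/* Illustrative sketch (not part of this header): a registered reader can take
 * itself out of the writer's wait set around a call that blocks for a long
 * time. 'wait_for_control_message' is a hypothetical application function.
 */
extern void wait_for_control_message(void);	/* hypothetical */

static inline void
example_reader_block(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	/* Stop referencing the shared data structure, then go offline. */
	rte_rcu_qsbr_thread_offline(v, thread_id);

	wait_for_control_message();	/* may block indefinitely */

	/* Come back online before touching the shared data structure again. */
	rte_rcu_qsbr_thread_online(v, thread_id);
}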

static __rte_always_inline void
rte_rcu_qsbr_lock(__rte_unused struct rte_rcu_qsbr *v,
			__rte_unused unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

#if defined(RTE_LIBRTE_RCU_DEBUG)
	/* Increment the lock counter */
	rte_atomic_fetch_add_explicit(&v->qsbr_cnt[thread_id].lock_cnt,
				1, rte_memory_order_acquire);
#endif
}

static __rte_always_inline void
rte_rcu_qsbr_unlock(__rte_unused struct rte_rcu_qsbr *v,
			__rte_unused unsigned int thread_id)
{
	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

#if defined(RTE_LIBRTE_RCU_DEBUG)
	/* Decrement the lock counter */
	rte_atomic_fetch_sub_explicit(&v->qsbr_cnt[thread_id].lock_cnt,
				1, rte_memory_order_release);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, WARNING,
				"Lock counter %u. Nested locks?",
				v->qsbr_cnt[thread_id].lock_cnt);
#endif
}
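
/* Illustrative sketch (not part of this header): in debug builds
 * (RTE_LIBRTE_RCU_DEBUG) the lock/unlock pair helps catch readers that report
 * a quiescent state from inside a reader-side critical section.
 * 'lookup_entry' is a hypothetical accessor of the shared data structure.
 */
extern void lookup_entry(void);	/* hypothetical */

static inline void
example_reader_critical_section(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	rte_rcu_qsbr_lock(v, thread_id);
	lookup_entry();		/* access the shared data structure */
	rte_rcu_qsbr_unlock(v, thread_id);
}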

static __rte_always_inline uint64_t
rte_rcu_qsbr_start(struct rte_rcu_qsbr *v)
{
	uint64_t t;

	RTE_ASSERT(v != NULL);

	/* Release the changes to the shared data structure.
	 * This store release will ensure that changes to any data
	 * structure are visible to the workers before the token
	 * update is visible.
	 */
	t = rte_atomic_fetch_add_explicit(&v->token, 1, rte_memory_order_release) + 1;

	return t;
}

static __rte_always_inline void
rte_rcu_qsbr_quiescent(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	uint64_t t;

	RTE_ASSERT(v != NULL && thread_id < v->max_threads);

	__RTE_RCU_IS_LOCK_CNT_ZERO(v, thread_id, ERR, "Lock counter %u",
				v->qsbr_cnt[thread_id].lock_cnt);

	/* Acquire the changes to the shared data structure released
	 * by rte_rcu_qsbr_start.
	 * Later loads of the shared data structure should not move
	 * above this load. Hence, use load-acquire.
	 */
	t = rte_atomic_load_explicit(&v->token, rte_memory_order_acquire);

	/* Check if there are updates available from the writer.
	 * Inform the writer that updates are visible to this reader.
	 * Prior loads of the shared data structure should not move
	 * beyond this store. Hence use store-release.
	 */
	if (t != rte_atomic_load_explicit(&v->qsbr_cnt[thread_id].cnt, rte_memory_order_relaxed))
		rte_atomic_store_explicit(&v->qsbr_cnt[thread_id].cnt,
					t, rte_memory_order_release);

	__RTE_RCU_DP_LOG(DEBUG, "%s: update: token = %" PRIu64 ", Thread ID = %d",
		__func__, t, thread_id);
}
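
/* Illustrative reader-side sketch (not part of this header): register the
 * thread, go online and report a quiescent state once per iteration of the
 * packet processing loop. 'app_running' and 'process_packets' are
 * hypothetical application symbols.
 */
extern volatile bool app_running;	/* hypothetical */
extern void process_packets(void);	/* hypothetical */

static inline void
example_reader_loop(struct rte_rcu_qsbr *v, unsigned int thread_id)
{
	rte_rcu_qsbr_thread_register(v, thread_id);
	rte_rcu_qsbr_thread_online(v, thread_id);

	while (app_running) {
		process_packets();	/* reads the shared data structure */

		/* Outside the critical section: report quiescent state. */
		rte_rcu_qsbr_quiescent(v, thread_id);
	}

	rte_rcu_qsbr_thread_offline(v, thread_id);
	rte_rcu_qsbr_thread_unregister(v, thread_id);
}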

/* Check the quiescent state counter for registered threads only, assuming
 * that not all threads have registered.
 */
static __rte_always_inline int
__rte_rcu_qsbr_check_selective(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	uint32_t i, j, id;
	uint64_t bmap;
	uint64_t c;
	RTE_ATOMIC(uint64_t) *reg_thread_id;
	uint64_t acked_token = __RTE_QSBR_CNT_MAX;

	for (i = 0, reg_thread_id = __RTE_QSBR_THRID_ARRAY_ELM(v, 0);
		i < v->num_elems;
		i++, reg_thread_id++) {
		/* Load the current registered thread bit map before
		 * loading the reader thread quiescent state counters.
		 */
		bmap = rte_atomic_load_explicit(reg_thread_id, rte_memory_order_acquire);
		id = i << __RTE_QSBR_THRID_INDEX_SHIFT;

		while (bmap) {
			j = rte_ctz64(bmap);
			__RTE_RCU_DP_LOG(DEBUG,
				"%s: check: token = %" PRIu64 ", wait = %d, Bit Map = 0x%" PRIx64 ", Thread ID = %d",
				__func__, t, wait, bmap, id + j);
			c = rte_atomic_load_explicit(
					&v->qsbr_cnt[id + j].cnt,
					rte_memory_order_acquire);
			__RTE_RCU_DP_LOG(DEBUG,
				"%s: status: token = %" PRIu64 ", wait = %d, Thread QS cnt = %" PRIu64 ", Thread ID = %d",
				__func__, t, wait, c, id + j);

			/* Counter is not checked for wrap-around condition
			 * as it is a 64b counter.
			 */
			if (unlikely(c !=
				__RTE_QSBR_CNT_THR_OFFLINE && c < t)) {
				/* This thread is not in quiescent state */
				if (!wait)
					return 0;

				rte_pause();
				/* This thread might have unregistered.
				 * Re-read the bitmap.
				 */
				bmap = rte_atomic_load_explicit(reg_thread_id,
						rte_memory_order_acquire);

				continue;
			}

			/* This thread is in quiescent state. Use the counter
			 * to find the least acknowledged token among all the
			 * readers.
			 */
			if (c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c)
				acked_token = c;

			bmap &= ~(1UL << j);
		}
	}

	/* All readers are checked, update least acknowledged token.
	 * There might be multiple writers trying to update this. There is
	 * no need to update this very accurately using compare-and-swap.
	 */
	if (acked_token != __RTE_QSBR_CNT_MAX)
		rte_atomic_store_explicit(&v->acked_token, acked_token,
			rte_memory_order_relaxed);

	return 1;
}

/* Check the quiescent state counter for all threads, assuming that
 * all the threads have registered.
 */
static __rte_always_inline int
__rte_rcu_qsbr_check_all(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	uint32_t i;
	struct rte_rcu_qsbr_cnt *cnt;
	uint64_t c;
	uint64_t acked_token = __RTE_QSBR_CNT_MAX;

	for (i = 0, cnt = v->qsbr_cnt; i < v->max_threads; i++, cnt++) {
		__RTE_RCU_DP_LOG(DEBUG,
			"%s: check: token = %" PRIu64 ", wait = %d, Thread ID = %d",
			__func__, t, wait, i);
		while (1) {
			c = rte_atomic_load_explicit(&cnt->cnt, rte_memory_order_acquire);
			__RTE_RCU_DP_LOG(DEBUG,
				"%s: status: token = %" PRIu64 ", wait = %d, Thread QS cnt = %" PRIu64 ", Thread ID = %d",
				__func__, t, wait, c, i);

			/* Counter is not checked for wrap-around condition
			 * as it is a 64b counter.
			 */
			if (likely(c == __RTE_QSBR_CNT_THR_OFFLINE || c >= t))
				break;

			/* This thread is not in quiescent state */
			if (!wait)
				return 0;

			rte_pause();
		}

		/* This thread is in quiescent state. Use the counter to find
		 * the least acknowledged token among all the readers.
		 */
		if (likely(c != __RTE_QSBR_CNT_THR_OFFLINE && acked_token > c))
			acked_token = c;
	}

	/* All readers are checked, update least acknowledged token.
	 * There might be multiple writers trying to update this. There is
	 * no need to update this very accurately using compare-and-swap.
	 */
	if (acked_token != __RTE_QSBR_CNT_MAX)
		rte_atomic_store_explicit(&v->acked_token, acked_token,
			rte_memory_order_relaxed);

	return 1;
}

static __rte_always_inline int
rte_rcu_qsbr_check(struct rte_rcu_qsbr *v, uint64_t t, bool wait)
{
	uint64_t acked_token;

	RTE_ASSERT(v != NULL);

	/* Check if all the readers have already acknowledged this token */
	acked_token = rte_atomic_load_explicit(&v->acked_token,
		rte_memory_order_relaxed);
	if (likely(t <= acked_token)) {
		__RTE_RCU_DP_LOG(DEBUG,
			"%s: check: token = %" PRIu64 ", wait = %d",
			__func__, t, wait);
		__RTE_RCU_DP_LOG(DEBUG,
			"%s: status: least acked token = %" PRIu64,
			__func__, acked_token);
		return 1;
	}

	if (likely(v->num_threads == v->max_threads))
		return __rte_rcu_qsbr_check_all(v, t, wait);
	else
		return __rte_rcu_qsbr_check_selective(v, t, wait);
}
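
/* Illustrative writer-side sketch (not part of this header): unlink an entry
 * from the shared data structure, start a new grace period and block until
 * every registered reader has passed through a quiescent state before freeing
 * the memory. 'remove_entry' is a hypothetical application function and the
 * entry is assumed to come from rte_malloc() (requires <rte_malloc.h>).
 */
extern void *remove_entry(void);	/* hypothetical */

static inline void
example_writer_delete(struct rte_rcu_qsbr *v)
{
	uint64_t token;
	void *e = remove_entry();	/* no new reader can find it now */

	/* Announce the update; readers acknowledge it in their QS counters. */
	token = rte_rcu_qsbr_start(v);

	/* Block until all registered readers have acknowledged the token. */
	rte_rcu_qsbr_check(v, token, true);

	rte_free(e);	/* safe: no pre-existing reader can still reference it */
}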

void
rte_rcu_qsbr_synchronize(struct rte_rcu_qsbr *v, unsigned int thread_id);
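
/* Illustrative sketch (not part of this header): a writer that is not itself
 * a registered reader can use the blocking helper instead of the start/check
 * pair shown above, passing RTE_QSBR_THRID_INVALID as the thread ID.
 */
static inline void
example_writer_synchronize(struct rte_rcu_qsbr *v, void *removed)
{
	rte_rcu_qsbr_synchronize(v, RTE_QSBR_THRID_INVALID);
	rte_free(removed);	/* all pre-existing readers have moved on */
}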

int
rte_rcu_qsbr_dump(FILE *f, struct rte_rcu_qsbr *v);

struct rte_rcu_qsbr_dq *
rte_rcu_qsbr_dq_create(const struct rte_rcu_qsbr_dq_parameters *params);

int
rte_rcu_qsbr_dq_enqueue(struct rte_rcu_qsbr_dq *dq, void *e);

int
rte_rcu_qsbr_dq_reclaim(struct rte_rcu_qsbr_dq *dq, unsigned int n,
		unsigned int *freed, unsigned int *pending, unsigned int *available);

int
rte_rcu_qsbr_dq_delete(struct rte_rcu_qsbr_dq *dq);

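/* Illustrative defer-queue sketch (not part of this header): removed elements
 * are pushed onto the defer queue together with the current token; the library
 * invokes the free callback only after all registered readers have
 * acknowledged that token. 'struct app_entry' is a hypothetical application
 * type whose instances are assumed to come from rte_malloc()
 * (requires <rte_malloc.h>); error handling is trimmed for brevity.
 */
struct app_entry {
	uint32_t key;
	uint32_t value;
};

/* Free callback: 'e' points to 'n' stored 'struct app_entry *' values. */
static inline void
example_free_entry(void *p, void *e, unsigned int n)
{
	unsigned int i;
	struct app_entry **entries = e;

	RTE_SET_USED(p);
	for (i = 0; i < n; i++)
		rte_free(entries[i]);
}

static inline struct rte_rcu_qsbr_dq *
example_dq_setup(struct rte_rcu_qsbr *v)
{
	struct rte_rcu_qsbr_dq_parameters params = {
		.name = "app_dq",
		.size = 1024,				/* queue entries */
		.esize = sizeof(struct app_entry *),	/* we queue pointers */
		.trigger_reclaim_limit = 256,
		.max_reclaim_size = 32,
		.free_fn = example_free_entry,
		.p = NULL,
		.v = v,
	};

	return rte_rcu_qsbr_dq_create(&params);
}

static inline int
example_dq_delete_entry(struct rte_rcu_qsbr_dq *dq, struct app_entry *e)
{
	/* 'e' must already be unlinked from the shared data structure. */
	return rte_rcu_qsbr_dq_enqueue(dq, &e);
}
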
#ifdef __cplusplus
}
#endif

#endif /* _RTE_RCU_QSBR_H_ */