/* DPDK 24.11.1 - examples/pipeline/thread.c */
/* SPDX-License-Identifier: BSD-3-Clause
* Copyright(c) 2020 Intel Corporation
*/
#include <stdlib.h>
#include <errno.h>
#include <rte_atomic.h>
#include <rte_common.h>
#include <rte_lcore.h>
#include "obj.h"
#include "thread.h"
/* Capacity of the per-thread pipeline table (struct thread::pipelines).
 * May be overridden at build time.
 */
#ifndef THREAD_PIPELINES_MAX
#define THREAD_PIPELINES_MAX 256
#endif
/* Capacity of the per-thread block table (struct thread::blocks); also the
 * number of block descriptors thread_init() allocates for each thread.
 * May be overridden at build time.
 */
#ifndef THREAD_BLOCKS_MAX
#define THREAD_BLOCKS_MAX 256
#endif
/* Pipeline instruction quanta: the instruction budget passed to
 * rte_swx_pipeline_run() for each pipeline on every dispatch-loop iteration.
 * It needs to be big enough to do some meaningful work, but small enough not
 * to starve the other pipelines mapped to the same thread. For a pipeline
 * that executes 10 instructions per packet, a quanta of 1000 instructions
 * equates to processing 100 packets.
 */
#ifndef PIPELINE_INSTR_QUANTA
#define PIPELINE_INSTR_QUANTA 1000
#endif
/* Block descriptor: a callback paired with the opaque object it is called on. */
struct block {
/* Callback that thread_main() invokes as block_func(block). */
block_run_f block_func;
/* Opaque argument handed to block_func; block_find() and block_disable()
 * identify a descriptor by comparing this pointer.
 */
void *block;
};
/* Per-lcore context: the pipelines and blocks that thread_main() runs when it
 * executes on the lcore with the matching index.
 */
struct __rte_cache_aligned thread {
/* Pipeline table; only the first n_pipelines entries are in use. */
struct rte_swx_pipeline *pipelines[THREAD_PIPELINES_MAX];
/* Block descriptor table; thread_init() allocates a descriptor for every
 * slot, and only the first n_blocks entries are in use.
 */
struct block *blocks[THREAD_BLOCKS_MAX];
/* Number of pipelines currently installed. Presumably volatile because the
 * enable/disable functions update it while thread_main() polls it from
 * another lcore - TODO confirm.
 */
volatile uint64_t n_pipelines;
/* Number of blocks currently installed (same access pattern as n_pipelines). */
volatile uint64_t n_blocks;
/* Non-zero once thread_init() has set up this context; contexts with
 * enabled == 0 are skipped by the find functions and rejected by the enable
 * functions.
 */
int enabled;
};
/* One context per possible lcore, indexed by lcore ID. */
static struct thread threads[RTE_MAX_LCORE];
int
thread_init(void)
{
uint32_t thread_id;
int status = 0;
struct thread *t = &threads[thread_id];
uint32_t i;
t->enabled = 1;
for (i = 0; i < THREAD_BLOCKS_MAX; i++) {
struct block *b;
b = calloc(1, sizeof(struct block));
if (!b) {
status = -ENOMEM;
goto error;
}
t->blocks[i] = b;
}
}
return 0;
error:
struct thread *t = &threads[thread_id];
uint32_t i;
t->enabled = 0;
for (i = 0; i < THREAD_BLOCKS_MAX; i++) {
free(t->blocks[i]);
t->blocks[i] = NULL;
}
}
return status;
}
/*
 * Find the thread that pipeline p is mapped to.
 *
 * Only enabled thread contexts are searched. Returns the ID of the first
 * thread whose pipeline table contains p, or RTE_MAX_LCORE when p is not
 * mapped to any enabled thread.
 */
static uint32_t
pipeline_find(struct rte_swx_pipeline *p)
{
	uint32_t thread_id;

	for (thread_id = 0; thread_id < RTE_MAX_LCORE; thread_id++) {
		struct thread *t = &threads[thread_id];
		uint32_t i;

		if (!t->enabled)
			continue;

		for (i = 0; i < t->n_pipelines; i++) {
			/*
			 * Return right away: a break here would only leave the
			 * inner loop and the search result would be lost.
			 */
			if (t->pipelines[i] == p)
				return thread_id;
		}
	}

	return RTE_MAX_LCORE;
}
/*
 * Find the thread that block object b is mapped to.
 *
 * Only enabled thread contexts are searched, and within each of them only the
 * first n_blocks descriptors. Returns the ID of the first thread that has a
 * descriptor whose block pointer equals b, or RTE_MAX_LCORE when there is no
 * such thread.
 */
static uint32_t
block_find(void *b)
{
	uint32_t thread_id;

	for (thread_id = 0; thread_id < RTE_MAX_LCORE; thread_id++) {
		struct thread *t = &threads[thread_id];
		uint32_t i;

		if (!t->enabled)
			continue;

		for (i = 0; i < t->n_blocks; i++) {
			/*
			 * Return right away: a break here would only leave the
			 * inner loop and the search result would be lost.
			 */
			if (t->blocks[i]->block == b)
				return thread_id;
		}
	}

	return RTE_MAX_LCORE;
}
/*
 * Map pipeline p to thread thread_id by appending it to that thread's
 * pipeline table.
 *
 * Returns:
 *   0        on success;
 *   -EINVAL  when p is NULL, thread_id is out of range, or the target thread
 *            context is not enabled;
 *   -EEXIST  when pipeline_find() reports that p is already mapped;
 *   -ENOSPC  when the target thread's pipeline table is full.
 */
int
pipeline_enable(struct rte_swx_pipeline *p, uint32_t thread_id)
{
	struct thread *t;
	uint64_t n_pipelines;

	/* Check input params */
	if (!p || thread_id >= RTE_MAX_LCORE)
		return -EINVAL;

	if (pipeline_find(p) < RTE_MAX_LCORE)
		return -EEXIST;

	t = &threads[thread_id];
	if (!t->enabled)
		return -EINVAL;

	n_pipelines = t->n_pipelines;

	/* Check there is room for at least one more pipeline. */
	if (n_pipelines >= THREAD_PIPELINES_MAX)
		return -ENOSPC;

	/* Install the new pipeline. */
	t->pipelines[n_pipelines] = p;

	/*
	 * Publish the table entry before the new count: thread_main() polls
	 * n_pipelines and must not observe the incremented count ahead of the
	 * entry it covers.
	 */
	rte_wmb();
	t->n_pipelines = n_pipelines + 1;

	return 0;
}
/*
 * Unmap pipeline p from the thread it is mapped to.
 *
 * Does nothing when p is NULL or when pipeline_find() does not locate it.
 * The vacated table entry is overwritten with the last entry, so the relative
 * order of the remaining pipelines is not preserved.
 */
void
pipeline_disable(struct rte_swx_pipeline *p)
{
	struct thread *t;
	uint64_t n_pipelines;
	uint32_t thread_id, i;

	/* Check input params */
	if (!p)
		return;

	/* Find the thread that runs this pipeline. */
	thread_id = pipeline_find(p);
	if (thread_id == RTE_MAX_LCORE)
		return;

	t = &threads[thread_id];
	n_pipelines = t->n_pipelines;

	for (i = 0; i < n_pipelines; i++) {
		if (t->pipelines[i] != p)
			continue;

		/* Fill the hole with the last entry, unless p is the last entry. */
		if (i < n_pipelines - 1)
			t->pipelines[i] = t->pipelines[n_pipelines - 1];

		/*
		 * Make the table update visible before the decremented count,
		 * which thread_main() polls.
		 */
		rte_wmb();
		t->n_pipelines = n_pipelines - 1;

		return;
	}
}
/*
 * Map a block (callback block_func plus its opaque object) to thread
 * thread_id by filling in the next free block descriptor of that thread.
 *
 * Returns:
 *   0        on success;
 *   -EINVAL  when block_func or block is NULL, thread_id is out of range, or
 *            the target thread context is not enabled;
 *   -EEXIST  when block_find() reports that block is already mapped;
 *   -ENOSPC  when the target thread's block table is full.
 */
int
block_enable(block_run_f block_func, void *block, uint32_t thread_id)
{
	struct thread *t;
	struct block *b;
	uint64_t n_blocks;

	/* Check input params */
	if (!block_func || !block || thread_id >= RTE_MAX_LCORE)
		return -EINVAL;

	if (block_find(block) < RTE_MAX_LCORE)
		return -EEXIST;

	t = &threads[thread_id];
	if (!t->enabled)
		return -EINVAL;

	n_blocks = t->n_blocks;

	/* Check there is room for at least one more block. */
	if (n_blocks >= THREAD_BLOCKS_MAX)
		return -ENOSPC;

	/* Install the new block into the descriptor pre-allocated for this slot. */
	b = t->blocks[n_blocks];
	b->block_func = block_func;
	b->block = block;

	/*
	 * Publish the descriptor contents before the new count: thread_main()
	 * polls n_blocks and must not observe the incremented count ahead of
	 * the descriptor it covers.
	 */
	rte_wmb();
	t->n_blocks = n_blocks + 1;

	return 0;
}
/*
 * Unmap a block object from the thread it is mapped to.
 *
 * Does nothing when block is NULL or when block_find() does not locate it.
 * The vacated slot takes over the last in-use descriptor, and the removed
 * descriptor is parked in the slot just past the new end of the table, so
 * every slot keeps pointing at an allocated descriptor that block_enable()
 * can reuse.
 */
void
block_disable(void *block)
{
	struct thread *t;
	uint64_t n_blocks;
	uint32_t thread_id, i;

	/* Check input params */
	if (!block)
		return;

	/* Find the thread that runs this block. */
	thread_id = block_find(block);
	if (thread_id == RTE_MAX_LCORE)
		return;

	t = &threads[thread_id];
	n_blocks = t->n_blocks;

	for (i = 0; i < n_blocks; i++) {
		struct block *b = t->blocks[i];

		if (block != b->block)
			continue;

		/* Fill the hole with the last descriptor, unless b is the last one. */
		if (i < n_blocks - 1)
			t->blocks[i] = t->blocks[n_blocks - 1];

		/*
		 * Make the table update visible before the decremented count,
		 * which thread_main() polls.
		 */
		rte_wmb();
		t->n_blocks = n_blocks - 1;

		/*
		 * Only recycle the removed descriptor into the last slot after
		 * the count no longer covers that slot.
		 */
		rte_wmb();
		t->blocks[n_blocks - 1] = b;

		return;
	}
}
/*
 * Dispatch loop for the calling lcore.
 *
 * Picks the thread context indexed by rte_lcore_id() and then, forever,
 * alternates between running every installed pipeline for
 * PIPELINE_INSTR_QUANTA instructions and invoking every installed block
 * once. The counters are re-read on each iteration, so changes made by the
 * enable/disable functions are picked up while the loop is running.
 *
 * The argument is unused. The loop has no exit, so the final return is never
 * reached.
 */
int
thread_main(void *arg __rte_unused)
{
	uint32_t lcore = rte_lcore_id();
	struct thread *self = &threads[lcore];

	while (1) {
		uint32_t idx;

		/* One instruction quanta for each pipeline, in table order. */
		idx = 0;
		while (idx < self->n_pipelines) {
			rte_swx_pipeline_run(self->pipelines[idx], PIPELINE_INSTR_QUANTA);
			idx++;
		}

		/* One callback invocation for each block, in table order. */
		idx = 0;
		while (idx < self->n_blocks) {
			struct block *blk = self->blocks[idx];

			blk->block_func(blk->block);
			idx++;
		}
	}

	return 0;
}
/*
 * DPDK symbols referenced by this file (documentation cross-references, not
 * part of this translation unit):
 *
 *   static void rte_wmb(void)
 *   #define __rte_cache_aligned           - Definition: rte_common.h:627
 *   #define __rte_unused                  - Definition: rte_common.h:171
 *   #define RTE_LCORE_FOREACH_WORKER(i)   - Definition: rte_lcore.h:225
 *   static unsigned rte_lcore_id(void)    - Definition: rte_lcore.h:78
 *   __rte_experimental void rte_swx_pipeline_run(struct rte_swx_pipeline *p, uint32_t n_instructions)
 */