13. Packet copying using DMAdev library

13.1. Overview

This sample is intended as a demonstration of the basic components of a DPDK forwarding application and an example of how to use the DMAdev API to make a packet copy application.

Also, while forwarding, the MAC addresses are affected as follows:

  • The source MAC address is replaced by the TX port MAC address

  • The destination MAC address is replaced by 02:00:00:00:00:TX_PORT_ID

This application can be used to compare the performance of copying packets in software with copying done by a DMA device, for different packet sizes. The example prints out statistics every second by default. The statistics show received/sent packets and packets dropped or failed to copy.

13.2. Compiling the Application

To compile the sample application, see Compiling the Sample Applications.

The application is located in the dma sub-directory.

13.3. Running the Application

In order to run the hardware copy application, the copying device needs to be bound to a user-space I/O driver.

Refer to the Direct Memory Access (DMA) Device Library for information on using the library.

The application requires a number of command line options:

./<build_dir>/examples/dpdk-dma [EAL options] -- [-p MASK] [-q NQ] [-s RS] [-c <sw|hw>]
    [--[no-]mac-updating] [-b BS] [-f FS] [-i SI]

where,

  • p MASK: A hexadecimal bitmask of the ports to configure (default is all)

  • q NQ: Number of Rx queues used per port, which is also the number of DMA channels used per port (default is 1)

  • c CT: Type of packet copy to perform: software (sw) or hardware using DMA (hw) (default is hw)

  • s RS: Size of dmadev descriptor ring for hardware copy mode or rte_ring for software copy mode (default is 2048)

  • --[no-]mac-updating: Whether the MAC addresses of packets should be updated or not (default is mac-updating)

  • b BS: Set the DMA batch size

  • f FS: Set the maximum frame size

  • i SI: Set the interval, in seconds, between statistics prints (default is 1)

The application can be launched in various configurations depending on the provided parameters. The application can use up to two worker lcores: one receives incoming traffic and makes a copy of each packet, while the second updates the MAC addresses and sends the copies. If only one worker lcore is used, both operations are done sequentially. In every configuration an additional lcore is needed, since the main lcore does not handle traffic but is responsible for configuration, statistics printing and safe shutdown of all ports and devices.

The application can use a maximum of 8 ports.

To run the application in a Linux environment with 3 lcores (the main lcore, plus two forwarding cores), a single port (port 0), software copying and MAC updating, issue the command:

$ ./<build_dir>/examples/dpdk-dma -l 0-2 -n 2 -- -p 0x1 --mac-updating -c sw

To run the application in a Linux environment with 2 lcores (the main lcore, plus one forwarding core), 2 ports (ports 0 and 1), hardware copying and no MAC updating, issue the command:

$ ./<build_dir>/examples/dpdk-dma -l 0-1 -n 1 -- -p 0x3 --no-mac-updating -c hw

Refer to the DPDK Getting Started Guide for general information on running applications and the Environment Abstraction Layer (EAL) options.

13.4. Explanation

The following sections provide an explanation of the main components of the code.

All DPDK library functions used in the sample code are prefixed with rte_ and are explained in detail in the DPDK API Documentation.

13.4.1. The Main Function

The main() function performs the initialization and calls the execution threads for each lcore.

The first task is to initialize the Environment Abstraction Layer (EAL). The argc and argv arguments are provided to the rte_eal_init() function. The value returned is the number of parsed arguments:

ret = rte_eal_init(argc, argv);
if (ret < 0)
	rte_exit(EXIT_FAILURE, "Invalid EAL arguments\n");

The main() function also allocates a mempool to hold the mbufs (Message Buffers) used by the application:

nb_mbufs = RTE_MAX(nb_ports * (nb_queues * (nb_rxd + nb_txd +
	4 * MAX_PKT_BURST + ring_size) + ring_size +
	rte_lcore_count() * MEMPOOL_CACHE_SIZE),
	MIN_POOL_SIZE);

/* Create the mbuf pool */
sz = max_frame_size + RTE_PKTMBUF_HEADROOM;
sz = RTE_MAX(sz, (size_t)RTE_MBUF_DEFAULT_BUF_SIZE);
dma_pktmbuf_pool = rte_pktmbuf_pool_create("mbuf_pool", nb_mbufs,
	MEMPOOL_CACHE_SIZE, 0, sz, rte_socket_id());
if (dma_pktmbuf_pool == NULL)
	rte_exit(EXIT_FAILURE, "Cannot init mbuf pool\n");

Mbufs are the packet buffer structure used by DPDK. They are explained in detail in the “Mbuf Library” section of the DPDK Programmer’s Guide.

The main() function also initializes the ports:

cfg.nb_ports = 0;
RTE_ETH_FOREACH_DEV(portid)
	port_init(portid, dma_pktmbuf_pool, nb_queues);

Each port is configured using the port_init() function. The Ethernet ports are configured with local settings using the rte_eth_dev_configure() function and the port_conf struct. RSS is enabled so that multiple Rx queues can be used for packet receiving, and hence multiple DMA channels can copy packets per port:

static const struct rte_eth_conf port_conf = {
	.rxmode = {
		.mq_mode = RTE_ETH_MQ_RX_RSS,
	},
	.rx_adv_conf = {
		.rss_conf = {
			.rss_key = NULL,
			.rss_hf = RTE_ETH_RSS_PROTO_MASK,
		}
	}
};

For this example, each port is set up with the number of Rx queues given by the -q option and one Tx queue, using the rte_eth_rx_queue_setup() and rte_eth_tx_queue_setup() functions.
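
The queue setup calls themselves are not listed in this guide; the following is a minimal sketch of how they can be issued inside port_init(). Variable names such as nb_rxd, nb_txd, nb_queues and dma_pktmbuf_pool mirror those used elsewhere in this example, and the actual code may differ in details:

struct rte_eth_rxconf rxq_conf;
struct rte_eth_txconf txq_conf;
struct rte_eth_dev_info dev_info;
uint16_t i;
int ret;

/* Start from the device's default queue configuration. */
rte_eth_dev_info_get(portid, &dev_info);
rxq_conf = dev_info.default_rxconf;
txq_conf = dev_info.default_txconf;

/* One Rx queue per DMA channel requested with the -q option. */
for (i = 0; i < nb_queues; i++) {
	ret = rte_eth_rx_queue_setup(portid, i, nb_rxd,
		rte_eth_dev_socket_id(portid), &rxq_conf,
		dma_pktmbuf_pool);
	if (ret < 0)
		rte_exit(EXIT_FAILURE, "rte_eth_rx_queue_setup failed\n");
}

/* A single Tx queue is sufficient, as only one lcore transmits per port. */
ret = rte_eth_tx_queue_setup(portid, 0, nb_txd,
	rte_eth_dev_socket_id(portid), &txq_conf);
if (ret < 0)
	rte_exit(EXIT_FAILURE, "rte_eth_tx_queue_setup failed\n");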

The Ethernet port is then started:

ret = rte_eth_dev_start(portid);
if (ret < 0)
	rte_exit(EXIT_FAILURE,
		"rte_eth_dev_start:err=%d, port=%u\n",
		ret, portid);

Finally, the Rx port is set in promiscuous mode:

rte_eth_promiscuous_enable(portid);

After that, the application waits for the links of the enabled ports to come up and assigns the resources it needs:

while (!check_link_status(dma_enabled_port_mask) && !force_quit)
	sleep(1);

/* Check if there is enough lcores for all ports. */
cfg.nb_lcores = rte_lcore_count() - 1;
if (cfg.nb_lcores < 1)
	rte_exit(EXIT_FAILURE,
		"There should be at least one worker lcore.\n");

if (copy_mode == COPY_MODE_DMA_NUM)
	assign_dmadevs();

assign_rings();

Ring structures are assigned for exchanging packets between lcores for both SW and HW copy modes.

static void
assign_rings(void)
{
	uint32_t i;

	for (i = 0; i < cfg.nb_ports; i++) {
		char ring_name[RTE_RING_NAMESIZE];

		snprintf(ring_name, sizeof(ring_name), "rx_to_tx_ring_%u", i);
		/* Create ring for inter core communication */
		cfg.ports[i].rx_to_tx_ring = rte_ring_create(
			ring_name, ring_size,
			rte_socket_id(), RING_F_SP_ENQ | RING_F_SC_DEQ);

		if (cfg.ports[i].rx_to_tx_ring == NULL)
			rte_exit(EXIT_FAILURE, "Ring create failed: %s\n",
				rte_strerror(rte_errno));
	}
}

When using hardware copy, each Rx queue of each port is assigned a DMA device (assign_dmadevs()) using DMAdev library API functions:

static void
assign_dmadevs(void)
{
	uint16_t nb_dmadev = 0;
	int16_t dev_id = rte_dma_next_dev(0);
	uint32_t i, j;

	for (i = 0; i < cfg.nb_ports; i++) {
		for (j = 0; j < cfg.ports[i].nb_queues; j++) {
			if (dev_id == -1)
				goto end;

			cfg.ports[i].dmadev_ids[j] = dev_id;
			configure_dmadev_queue(cfg.ports[i].dmadev_ids[j]);
			dev_id = rte_dma_next_dev(dev_id + 1);
			++nb_dmadev;
		}
	}
end:
	if (nb_dmadev < cfg.nb_ports * cfg.ports[0].nb_queues)
		rte_exit(EXIT_FAILURE,
			"Not enough dmadevs (%u) for all queues (%u).\n",
			nb_dmadev, cfg.nb_ports * cfg.ports[0].nb_queues);
	RTE_LOG(INFO, DMA, "Number of used dmadevs: %u.\n", nb_dmadev);
}

The initialization of a hardware device is done by the rte_dma_configure() and rte_dma_vchan_setup() functions, using the rte_dma_conf and rte_dma_vchan_conf structs. After configuration, the device is started using the rte_dma_start() function. Each of these operations is done in configure_dmadev_queue():

static void
configure_dmadev_queue(uint32_t dev_id)
{
	struct rte_dma_info info;
	struct rte_dma_conf dev_config = { .nb_vchans = 1 };
	struct rte_dma_vchan_conf qconf = {
		.direction = RTE_DMA_DIR_MEM_TO_MEM,
		.nb_desc = ring_size
	};
	uint16_t vchan = 0;

	if (rte_dma_configure(dev_id, &dev_config) != 0)
		rte_exit(EXIT_FAILURE, "Error with rte_dma_configure()\n");

	if (rte_dma_vchan_setup(dev_id, vchan, &qconf) != 0) {
		printf("Error with queue configuration\n");
		rte_panic();
	}
	rte_dma_info_get(dev_id, &info);
	if (info.nb_vchans != 1) {
		printf("Error, no configured queues reported on device id %u\n", dev_id);
		rte_panic();
	}
	if (rte_dma_start(dev_id) != 0)
		rte_exit(EXIT_FAILURE, "Error with rte_dma_start()\n");
}

If initialization is successful, memory for hardware device statistics is allocated.
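
A hypothetical sketch of such an allocation is shown below; the hw_stats name and sizing are illustrative only and are not taken from the example source:

/* Illustrative only: zero-initialized buffer for per-dmadev statistics. */
struct rte_dma_stats *hw_stats;

hw_stats = rte_zmalloc(NULL,
	sizeof(*hw_stats) * cfg.nb_ports * nb_queues, 0);
if (hw_stats == NULL)
	rte_exit(EXIT_FAILURE,
		"Cannot allocate memory for dmadev statistics\n");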

Finally, the main() function starts all packet handling lcores and then prints statistics in a loop on the main lcore. The application can be interrupted and closed using Ctrl-C. On exit, the main lcore waits for all worker lcores to finish, deallocates resources and exits.
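
A condensed sketch of this final part of main() is shown below; print_stats() stands for the example's statistics printing loop, and the exact shutdown sequence in the example source may differ:

start_forwarding_cores();
/* The main lcore loops here printing statistics until Ctrl-C
 * (SIGINT/SIGTERM) sets the force_quit flag. */
print_stats(argv[0]);

/* Wait for every worker lcore to return from its processing loop. */
rte_eal_mp_wait_lcore();

/* Stop and close the ports, then release EAL resources. */
RTE_ETH_FOREACH_DEV(portid) {
	rte_eth_dev_stop(portid);
	rte_eth_dev_close(portid);
}
rte_eal_cleanup();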

The function that launches the processing lcores is described below.

13.4.2. The Lcores Launching Functions

As described above, the main() function invokes the start_forwarding_cores() function in order to start processing on each lcore:

static void start_forwarding_cores(void)
{
	uint32_t lcore_id = rte_lcore_id();

	RTE_LOG(INFO, DMA, "Entering %s on lcore %u\n",
		__func__, rte_lcore_id());

	if (cfg.nb_lcores == 1) {
		lcore_id = rte_get_next_lcore(lcore_id, true, true);
		rte_eal_remote_launch((lcore_function_t *)rxtx_main_loop,
			NULL, lcore_id);
	} else if (cfg.nb_lcores > 1) {
		lcore_id = rte_get_next_lcore(lcore_id, true, true);
		rte_eal_remote_launch((lcore_function_t *)rx_main_loop,
			NULL, lcore_id);

		lcore_id = rte_get_next_lcore(lcore_id, true, true);
		rte_eal_remote_launch((lcore_function_t *)tx_main_loop, NULL,
			lcore_id);
	}
}

The function launches the Rx/Tx processing functions on the configured lcores using rte_eal_remote_launch(). The configured ports, their number and the number of assigned lcores are stored in the user-defined rxtx_transmission_config struct:

struct rxtx_transmission_config {
	struct rxtx_port_config ports[RTE_MAX_ETHPORTS];
	uint16_t nb_ports;
	uint16_t nb_lcores;
};

The structure is initialized in the main() function with values corresponding to the ports and lcores configuration provided by the user.

13.4.3. The Lcores Processing Functions

For receiving packets on each port, the dma_rx_port() function is used. The function receives packets on each configured Rx queue. Depending on the mode the user chose, it will either enqueue the packets to DMA channels and then start the copy process (hardware copy), or perform a software copy of each packet using the pktmbuf_sw_copy() function and enqueue the copies to an rte_ring:

static void
dma_rx_port(struct rxtx_port_config *rx_config)
{
	int32_t ret;
	uint32_t nb_rx, nb_enq, i, j;
	struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
	struct rte_mbuf *pkts_burst_copy[MAX_PKT_BURST];

	for (i = 0; i < rx_config->nb_queues; i++) {

		nb_rx = rte_eth_rx_burst(rx_config->rxtx_port, i,
			pkts_burst, MAX_PKT_BURST);

		if (nb_rx == 0) {
			if (copy_mode == COPY_MODE_DMA_NUM &&
				(nb_rx = dma_dequeue(pkts_burst, pkts_burst_copy,
					MAX_PKT_BURST, rx_config->dmadev_ids[i])) > 0)
				goto handle_tx;
			continue;
		}

		port_statistics.rx[rx_config->rxtx_port] += nb_rx;

		ret = rte_mempool_get_bulk(dma_pktmbuf_pool,
			(void *)pkts_burst_copy, nb_rx);

		if (unlikely(ret < 0))
			rte_exit(EXIT_FAILURE,
				"Unable to allocate memory.\n");

		for (j = 0; j < nb_rx; j++)
			pktmbuf_metadata_copy(pkts_burst[j],
				pkts_burst_copy[j]);

		if (copy_mode == COPY_MODE_DMA_NUM) {
			/* enqueue packets for  hardware copy */
			nb_enq = dma_enqueue(pkts_burst, pkts_burst_copy,
				nb_rx, dma_batch_sz, rx_config->dmadev_ids[i]);

			/* free any not enqueued packets. */
			rte_mempool_put_bulk(dma_pktmbuf_pool,
				(void *)&pkts_burst[nb_enq],
				nb_rx - nb_enq);
			rte_mempool_put_bulk(dma_pktmbuf_pool,
				(void *)&pkts_burst_copy[nb_enq],
				nb_rx - nb_enq);

			port_statistics.copy_dropped[rx_config->rxtx_port] +=
				(nb_rx - nb_enq);

			/* get completed copies */
			nb_rx = dma_dequeue(pkts_burst, pkts_burst_copy,
				MAX_PKT_BURST, rx_config->dmadev_ids[i]);
		} else {
			/* Perform packet software copy, free source packets */
			for (j = 0; j < nb_rx; j++)
				pktmbuf_sw_copy(pkts_burst[j],
					pkts_burst_copy[j]);
		}

handle_tx:
		rte_mempool_put_bulk(dma_pktmbuf_pool,
			(void *)pkts_burst, nb_rx);

		nb_enq = rte_ring_enqueue_burst(rx_config->rx_to_tx_ring,
			(void *)pkts_burst_copy, nb_rx, NULL);

		/* Free any not enqueued packets. */
		rte_mempool_put_bulk(dma_pktmbuf_pool,
			(void *)&pkts_burst_copy[nb_enq],
			nb_rx - nb_enq);

		port_statistics.copy_dropped[rx_config->rxtx_port] +=
			(nb_rx - nb_enq);
	}
}

The packets are received in burst mode using the rte_eth_rx_burst() function. When using hardware copy mode, the packets are enqueued in the copying device's buffer using dma_enqueue_packets(), which calls rte_dma_copy(). When all received packets are in the buffer, the copy operations are started by calling rte_dma_submit(). The rte_dma_copy() function operates on the physical addresses of the packets. The rte_mbuf structure only contains the physical address of the start of the data buffer (buf_iova), so the rte_pktmbuf_iova() API is used to get the physical address of the start of the packet data within the mbuf.
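
The dma_enqueue_packets() helper itself is not listed in this guide; the following is a minimal sketch of the enqueue path it implements (the example source additionally handles batching via the -b option, which is omitted here):

/* Minimal sketch: enqueue copy operations for a burst of packets on one
 * dmadev virtual channel, then kick off the transfers. */
static uint32_t
dma_enqueue_packets(struct rte_mbuf *pkts[], struct rte_mbuf *pkts_copy[],
	uint32_t nb_rx, uint16_t dev_id)
{
	uint32_t i;
	int ret;

	for (i = 0; i < nb_rx; i++) {
		/* rte_dma_copy() needs IOVAs, so translate the mbuf data
		 * pointers with rte_pktmbuf_iova(). */
		ret = rte_dma_copy(dev_id, 0,
			rte_pktmbuf_iova(pkts[i]),
			rte_pktmbuf_iova(pkts_copy[i]),
			rte_pktmbuf_data_len(pkts[i]), 0);
		if (ret < 0)
			break;
	}

	/* Start all copies enqueued so far on virtual channel 0. */
	rte_dma_submit(dev_id, 0);

	return i;
}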

Once the copies have been completed (this includes gathering the completions in HW copy mode), the copied packets are enqueued to the rx_to_tx_ring, which is used to pass the packets to the Tx function.
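
In hardware copy mode, completed copies are gathered from the device using the rte_dma_completed() API. A minimal sketch of that call is shown below; dev_id stands for the dmadev id of the queue (rx_config->dmadev_ids[i] above), and a single virtual channel is assumed, as configured earlier:

/* Minimal sketch: poll the dmadev for completed copy operations. */
uint16_t last_idx = 0;
uint16_t nb_done;
bool has_error = false;

/* Returns how many operations completed since the last call;
 * last_idx is the ring index of the most recently completed one. */
nb_done = rte_dma_completed(dev_id, 0, MAX_PKT_BURST,
	&last_idx, &has_error);
if (has_error) {
	/* Some copies failed; the affected packets should be dropped. */
}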

All completed copies are processed by the dma_tx_port() function. This function dequeues copied packets from the rx_to_tx_ring. Then, the MAC addresses of the packets are updated, if MAC updating is enabled. After that, the copies are sent in burst mode using rte_eth_tx_burst().

static void
dma_tx_port(struct rxtx_port_config *tx_config)
{
	uint32_t i, j, nb_dq, nb_tx;
	struct rte_mbuf *mbufs[MAX_PKT_BURST];

	for (i = 0; i < tx_config->nb_queues; i++) {

		/* Dequeue the mbufs from rx_to_tx_ring. */
		nb_dq = rte_ring_dequeue_burst(tx_config->rx_to_tx_ring,
				(void *)mbufs, MAX_PKT_BURST, NULL);
		if (nb_dq == 0)
			continue;

		/* Update macs if enabled */
		if (mac_updating) {
			for (j = 0; j < nb_dq; j++)
				update_mac_addrs(mbufs[j],
					tx_config->rxtx_port);
		}

		nb_tx = rte_eth_tx_burst(tx_config->rxtx_port, 0,
				(void *)mbufs, nb_dq);

		port_statistics.tx[tx_config->rxtx_port] += nb_tx;

		if (unlikely(nb_tx < nb_dq)) {
			port_statistics.tx_dropped[tx_config->rxtx_port] +=
				(nb_dq - nb_tx);
			/* Free any unsent packets. */
			rte_mempool_put_bulk(dma_pktmbuf_pool,
			(void *)&mbufs[nb_tx], nb_dq - nb_tx);
		}
	}
}
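
The update_mac_addrs() helper called above is not listed in this guide; the following is a minimal sketch of what it does, based on the address rules given in the Overview (the exact implementation in the example source may differ):

static void
update_mac_addrs(struct rte_mbuf *m, uint32_t dest_portid)
{
	struct rte_ether_hdr *eth;

	eth = rte_pktmbuf_mtod(m, struct rte_ether_hdr *);

	/* Destination MAC: 02:00:00:00:00:TX_PORT_ID. The 8-byte write
	 * (little-endian) also touches the first two bytes of the source
	 * address, which are overwritten right below. */
	*((uint64_t *)&eth->dst_addr) =
		0x000000000002 + ((uint64_t)dest_portid << 40);

	/* Source MAC: the MAC address of the TX port. */
	rte_eth_macaddr_get(dest_portid, &eth->src_addr);
}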

13.4.4. The Packet Copying Functions

In order to perform a software packet copy, there are user-defined functions to first copy the packet metadata (pktmbuf_metadata_copy()) and then the packet data (pktmbuf_sw_copy()):

static inline void
pktmbuf_metadata_copy(const struct rte_mbuf *src, struct rte_mbuf *dst)
{
	dst->data_off = src->data_off;
	memcpy(&dst->rx_descriptor_fields1, &src->rx_descriptor_fields1,
		offsetof(struct rte_mbuf, buf_len) -
		offsetof(struct rte_mbuf, rx_descriptor_fields1));
}

/* Copy packet data */
static inline void
pktmbuf_sw_copy(struct rte_mbuf *src, struct rte_mbuf *dst)
{
	rte_memcpy(rte_pktmbuf_mtod(dst, char *),
		rte_pktmbuf_mtod(src, char *),
		RTE_MAX(src->data_len, force_min_copy_size));
}

The metadata in this example is copied from the rx_descriptor_fields1 marker of the rte_mbuf struct up to the buf_len member.

In order to understand why software packet copying is done as shown above, please refer to the Packet (Mbuf) Library.