#include "args.h"
#include "main.h"
#include "init.h"
#include "../include/conf.h"
#ifdef QW_SOFTWARE_FC
#define SEND_PAUSE_FRAME(port_id, duration) send_pause_frame(port_id, duration)
#else
#define SEND_PAUSE_FRAME(port_id, duration) do { } while(0)
#endif
#define ETHER_TYPE_FLOW_CONTROL 0x8808
struct ether_fc_frame {
uint16_t opcode;
uint16_t param;
} __attribute__((__packed__));
int *quota;
unsigned int *low_watermark;
uint8_t port_pairs[RTE_MAX_ETHPORTS];
struct rte_ring *rings[RTE_MAX_LCORE][RTE_MAX_ETHPORTS];
static void send_pause_frame(uint8_t port_id, uint16_t duration)
{
struct ether_fc_frame *pause_frame;
RTE_LOG(DEBUG, USER1,
"Sending PAUSE frame (duration=%d) on port %d\n",
duration, port_id);
return;
pause_frame = (struct ether_fc_frame *) &hdr[1];
*((uint64_t *)tmp) = 0x010000C28001ULL;
}
static unsigned int
get_previous_lcore_id(unsigned int lcore_id)
{
int i;
for (i = lcore_id - 1; i >= 0; i--)
return i;
return -1;
}
static unsigned int
get_last_lcore_id(void)
{
int i;
for (i = RTE_MAX_LCORE; i >= 0; i--)
return i;
return 0;
}
static void
receive_stage(__attribute__((unused)) void *args)
{
int i, ret;
uint8_t port_id;
uint16_t nb_rx_pkts;
unsigned int lcore_id;
enum ring_state ring_state[RTE_MAX_ETHPORTS] = { RING_READY };
"%s() started on core %u\n", __func__, lcore_id);
while (1) {
for (port_id = 0; port_id < RTE_MAX_ETHPORTS; port_id++) {
if (!is_bit_set(port_id, portmask))
continue;
ring = rings[lcore_id][port_id];
if (ring_state[port_id] != RING_READY) {
continue;
else
ring_state[port_id] = RING_READY;
}
if (ret == -EDQUOT) {
ring_state[port_id] = RING_OVERLOADED;
send_pause_frame(port_id, 1337);
}
else if (ret == -ENOBUFS) {
for (i = 0; i < nb_rx_pkts; i++)
}
}
}
}
static void
pipeline_stage(__attribute__((unused)) void *args)
{
int i, ret;
int nb_dq_pkts;
uint8_t port_id;
unsigned int lcore_id, previous_lcore_id;
void *pkts[MAX_PKT_QUOTA];
enum ring_state ring_state[RTE_MAX_ETHPORTS] = { RING_READY };
previous_lcore_id = get_previous_lcore_id(lcore_id);
"%s() started on core %u - processing packets from core %u\n",
__func__, lcore_id, previous_lcore_id);
while (1) {
for (port_id = 0; port_id < RTE_MAX_ETHPORTS; port_id++) {
if (!is_bit_set(port_id, portmask))
continue;
tx = rings[lcore_id][port_id];
rx = rings[previous_lcore_id][port_id];
if (ring_state[port_id] != RING_READY) {
continue;
else
ring_state[port_id] = RING_READY;
}
continue;
if (ret == -EDQUOT)
ring_state[port_id] = RING_OVERLOADED;
else if (ret == -ENOBUFS) {
for (i = 0; i < nb_dq_pkts; i++)
}
}
}
}
static void
send_stage(__attribute__((unused)) void *args)
{
uint16_t nb_dq_pkts;
uint8_t port_id;
uint8_t dest_port_id;
unsigned int lcore_id, previous_lcore_id;
struct rte_mbuf *tx_pkts[MAX_PKT_QUOTA];
previous_lcore_id = get_previous_lcore_id(lcore_id);
"%s() started on core %u - processing packets from core %u\n",
__func__, lcore_id, previous_lcore_id);
while (1) {
for (port_id = 0; port_id < RTE_MAX_ETHPORTS; port_id++) {
if (!is_bit_set(port_id, portmask))
continue;
dest_port_id = port_pairs[port_id];
tx = rings[previous_lcore_id][port_id];
continue;
}
}
}
int
main(int argc, char **argv)
{
int ret;
unsigned int lcore_id, master_lcore_id, last_lcore_id;
uint8_t port_id;
if (ret < 0)
rte_exit(EXIT_FAILURE,
"Cannot initialize EAL\n");
argc -= ret;
argv += ret;
init_dpdk();
setup_shared_variables();
*quota = 32;
*low_watermark = 60 * RING_SIZE / 100;
last_lcore_id = get_last_lcore_id();
ret = parse_qw_args(argc, argv);
if (ret < 0)
rte_exit(EXIT_FAILURE,
"Invalid quota/watermark argument(s)\n");
if (mbuf_pool == NULL)
for (port_id = 0; port_id < RTE_MAX_ETHPORTS; port_id++)
if (is_bit_set(port_id, portmask)) {
configure_eth_port(port_id);
init_ring(master_lcore_id, port_id);
}
pair_ports();
for (lcore_id = 0 ; lcore_id < last_lcore_id; lcore_id++) {
for (port_id = 0; port_id < RTE_MAX_ETHPORTS; port_id++)
if (is_bit_set(port_id, portmask))
init_ring(lcore_id, port_id);
}
}
receive_stage(NULL);
return 0;
}