Skip to content

Commit eb5902d

Browse files
author
logwang
committed
Api: add packet dispatch callback function register.
In some cases (for example, when packets are forwarded to your server through an IP tunnel), packets are always received on fixed queues, because RSS doesn't support tunnels. So we need to dispatch them again. With this commit, we can implement a dispatcher callback function and register it; packets retrieved from an rx queue will then be dispatched again according to the dispatcher's result.
1 parent c855fce commit eb5902d

File tree

3 files changed

+97
-36
lines changed

3 files changed

+97
-36
lines changed

lib/ff_api.h

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ int ff_init(int argc, char * const argv[]);
5050

5151
void ff_run(loop_func_t loop, void *arg);
5252

53+
/* POSIX-LIKE api begin */
54+
5355
int ff_fcntl(int fd, int cmd, ...);
5456

5557
int ff_sysctl(const int *name, u_int namelen, void *oldp, size_t *oldlenp,
@@ -109,8 +111,13 @@ int ff_kevent_do_each(int kq, const struct kevent *changelist, int nchanges,
109111

110112
int ff_gettimeofday(struct timeval *tv, struct timezone *tz);
111113

114+
/* POSIX-LIKE api end */
115+
116+
117+
/* Tests if fd is used by F-Stack */
112118
extern int ff_fdisused(int fd);
113119

120+
114121
/* route api begin */
115122
enum FF_ROUTE_CTL {
116123
FF_ROUTE_ADD,
@@ -133,6 +140,34 @@ int ff_route_ctl(enum FF_ROUTE_CTL req, enum FF_ROUTE_FLAG flag,
133140

134141
/* route api end */
135142

143+
144+
/* dispatch api begin */
145+
146+
/*
147+
* Packet dispatch callback function.
148+
* Implemented by user.
149+
*
150+
* @param data
151+
* The data pointer of the packet.
152+
* @param len
153+
* The length of the packet.
154+
* @param nb_queues
155+
* Number of queues the packet can be dispatched to.
156+
*
157+
* @return 0 to (nb_queues - 1)
158+
* The queue id that the packet will be dispatched to.
159+
* @return -1
160+
* An error occurred or the packet was handled by the user; the packet will be freed.
161+
*
162+
*/
163+
typedef int (*dispatch_func_t)(void *data, uint16_t len, uint16_t nb_queues);
164+
165+
/* register a packet dispatch function */
166+
void ff_regist_packet_dispatcher(dispatch_func_t func);
167+
168+
/* dispatch api end */
169+
170+
136171
/* internal api begin */
137172

138173
/*

lib/ff_dpdk_if.c

Lines changed: 59 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@
6161

6262
#define MEMPOOL_CACHE_SIZE 256
6363

64-
#define ARP_RING_SIZE 2048
64+
#define DISPATCH_RING_SIZE 2048
6565

6666
#define MSG_RING_SIZE 32
6767

@@ -162,7 +162,8 @@ static struct lcore_conf lcore_conf;
162162

163163
static struct rte_mempool *pktmbuf_pool[NB_SOCKETS];
164164

165-
static struct rte_ring **arp_ring[RTE_MAX_ETHPORTS];
165+
static struct rte_ring **dispatch_ring[RTE_MAX_ETHPORTS];
166+
static dispatch_func_t packet_dispatcher;
166167

167168
static uint16_t rss_reta_size[RTE_MAX_ETHPORTS];
168169

@@ -337,6 +338,10 @@ init_lcore_conf(void)
337338
lcore_conf.nb_queue_list[port_id] = pconf->nb_lcores;
338339
}
339340

341+
if (lcore_conf.nb_rx_queue == 0) {
342+
rte_exit(EXIT_FAILURE, "lcore %u has nothing to do\n", lcore_id);
343+
}
344+
340345
return 0;
341346
}
342347

@@ -355,7 +360,7 @@ init_mem_pool(void)
355360
nb_lcores*MEMPOOL_CACHE_SIZE +
356361
nb_ports*KNI_MBUF_MAX +
357362
nb_ports*KNI_QUEUE_SIZE +
358-
nb_lcores*nb_ports*ARP_RING_SIZE),
363+
nb_lcores*nb_ports*DISPATCH_RING_SIZE),
359364
(unsigned)8192);
360365

361366
unsigned socketid = 0;
@@ -418,7 +423,7 @@ create_ring(const char *name, unsigned count, int socket_id, unsigned flags)
418423
}
419424

420425
static int
421-
init_arp_ring(void)
426+
init_dispatch_ring(void)
422427
{
423428
int j;
424429
char name_buf[RTE_RING_NAMESIZE];
@@ -432,28 +437,29 @@ init_arp_ring(void)
432437
uint16_t portid = ff_global_cfg.dpdk.portid_list[j];
433438
struct ff_port_cfg *pconf = &ff_global_cfg.dpdk.port_cfgs[portid];
434439
int nb_queues = pconf->nb_lcores;
435-
if (arp_ring[portid] == NULL) {
440+
if (dispatch_ring[portid] == NULL) {
436441
snprintf(name_buf, RTE_RING_NAMESIZE, "ring_ptr_p%d", portid);
437442

438-
arp_ring[portid] = rte_zmalloc(name_buf,
439-
sizeof(struct rte_ring *) * nb_queues,
440-
RTE_CACHE_LINE_SIZE);
441-
if (arp_ring[portid] == NULL) {
443+
dispatch_ring[portid] = rte_zmalloc(name_buf,
444+
sizeof(struct rte_ring *) * nb_queues,
445+
RTE_CACHE_LINE_SIZE);
446+
if (dispatch_ring[portid] == NULL) {
442447
rte_exit(EXIT_FAILURE, "rte_zmalloc(%s (struct rte_ring*)) "
443-
"failed\n", name_buf);
448+
"failed\n", name_buf);
444449
}
445450
}
446451

447452
for(queueid = 0; queueid < nb_queues; ++queueid) {
448-
snprintf(name_buf, RTE_RING_NAMESIZE, "arp_ring_p%d_q%d", portid, queueid);
449-
arp_ring[portid][queueid] = create_ring(name_buf, ARP_RING_SIZE,
450-
socketid, RING_F_SC_DEQ);
453+
snprintf(name_buf, RTE_RING_NAMESIZE, "dispatch_ring_p%d_q%d",
454+
portid, queueid);
455+
dispatch_ring[portid][queueid] = create_ring(name_buf,
456+
DISPATCH_RING_SIZE, socketid, RING_F_SC_DEQ);
451457

452-
if (arp_ring[portid][queueid] == NULL)
458+
if (dispatch_ring[portid][queueid] == NULL)
453459
rte_panic("create ring:%s failed!\n", name_buf);
454460

455461
printf("create ring:%s success, %u ring entries are now free!\n",
456-
name_buf, rte_ring_free_count(arp_ring[portid][queueid]));
462+
name_buf, rte_ring_free_count(dispatch_ring[portid][queueid]));
457463
}
458464
}
459465

@@ -807,7 +813,7 @@ ff_dpdk_init(int argc, char **argv)
807813

808814
init_mem_pool();
809815

810-
init_arp_ring();
816+
init_dispatch_ring();
811817

812818
init_msg_ring();
813819

@@ -897,38 +903,56 @@ process_packets(uint8_t port_id, uint16_t queue_id, struct rte_mbuf **bufs,
897903
uint16_t count, const struct ff_dpdk_if_context *ctx, int pkts_from_ring)
898904
{
899905
struct lcore_conf *qconf = &lcore_conf;
906+
uint16_t nb_queues = qconf->nb_queue_list[port_id];
900907

901908
uint16_t i;
902909
for (i = 0; i < count; i++) {
903910
struct rte_mbuf *rtem = bufs[i];
904911

905912
if (unlikely(qconf->pcap[port_id] != NULL)) {
906-
ff_dump_packets(qconf->pcap[port_id], rtem);
913+
if (!pkts_from_ring) {
914+
ff_dump_packets(qconf->pcap[port_id], rtem);
915+
}
907916
}
908917

909918
void *data = rte_pktmbuf_mtod(rtem, void*);
910919
uint16_t len = rte_pktmbuf_data_len(rtem);
911920

921+
if (!pkts_from_ring && packet_dispatcher) {
922+
int ret = (*packet_dispatcher)(data, len, nb_queues);
923+
if (ret < 0 || ret >= nb_queues) {
924+
rte_pktmbuf_free(rtem);
925+
continue;
926+
}
927+
928+
if (ret != queue_id) {
929+
ret = rte_ring_enqueue(dispatch_ring[port_id][ret], rtem);
930+
if (ret < 0)
931+
rte_pktmbuf_free(rtem);
932+
933+
continue;
934+
}
935+
}
936+
912937
enum FilterReturn filter = protocol_filter(data, len);
913938
if (filter == FILTER_ARP) {
914939
struct rte_mempool *mbuf_pool;
915940
struct rte_mbuf *mbuf_clone;
916-
if (pkts_from_ring == 0) {
917-
uint16_t i;
918-
uint16_t nb_queues = qconf->nb_queue_list[port_id];
919-
for(i = 0; i < nb_queues; ++i) {
920-
if(i == queue_id)
941+
if (!pkts_from_ring) {
942+
uint16_t j;
943+
for(j = 0; j < nb_queues; ++j) {
944+
if(j == queue_id)
921945
continue;
922946

923947
unsigned socket_id = 0;
924948
if (numa_on) {
925-
uint16_t lcore_id = qconf->port_cfgs[port_id].lcore_list[i];
949+
uint16_t lcore_id = qconf->port_cfgs[port_id].lcore_list[j];
926950
socket_id = rte_lcore_to_socket_id(lcore_id);
927951
}
928952
mbuf_pool = pktmbuf_pool[socket_id];
929953
mbuf_clone = rte_pktmbuf_clone(rtem, mbuf_pool);
930954
if(mbuf_clone) {
931-
int ret = rte_ring_enqueue(arp_ring[port_id][i], mbuf_clone);
955+
int ret = rte_ring_enqueue(dispatch_ring[port_id][j], mbuf_clone);
932956
if (ret < 0)
933957
rte_pktmbuf_free(mbuf_clone);
934958
}
@@ -954,12 +978,12 @@ process_packets(uint8_t port_id, uint16_t queue_id, struct rte_mbuf **bufs,
954978
}
955979

956980
static inline int
957-
process_arp_ring(uint8_t port_id, uint16_t queue_id,
981+
process_dispatch_ring(uint8_t port_id, uint16_t queue_id,
958982
struct rte_mbuf **pkts_burst, const struct ff_dpdk_if_context *ctx)
959983
{
960984
/* read packets from the ring buffer and process them */
961985
uint16_t nb_rb;
962-
nb_rb = rte_ring_dequeue_burst(arp_ring[port_id][queue_id],
986+
nb_rb = rte_ring_dequeue_burst(dispatch_ring[port_id][queue_id],
963987
(void **)pkts_burst, MAX_PKT_BURST);
964988

965989
if(nb_rb > 0) {
@@ -1233,7 +1257,6 @@ main_loop(void *arg)
12331257
struct loop_routine *lr = (struct loop_routine *)arg;
12341258

12351259
struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
1236-
unsigned lcore_id;
12371260
uint64_t prev_tsc, diff_tsc, cur_tsc, usch_tsc, div_tsc, usr_tsc, sys_tsc, end_tsc;
12381261
int i, j, nb_rx, idle;
12391262
uint8_t port_id, queue_id;
@@ -1245,14 +1268,8 @@ main_loop(void *arg)
12451268
prev_tsc = 0;
12461269
usch_tsc = 0;
12471270

1248-
lcore_id = rte_lcore_id();
12491271
qconf = &lcore_conf;
12501272

1251-
if (qconf->nb_rx_queue == 0) {
1252-
printf("lcore %u has nothing to do\n", lcore_id);
1253-
return 0;
1254-
}
1255-
12561273
while (1) {
12571274
cur_tsc = rte_rdtsc();
12581275
if (unlikely(freebsd_clock.expire < cur_tsc)) {
@@ -1296,7 +1313,7 @@ main_loop(void *arg)
12961313
ff_kni_process(port_id, queue_id, pkts_burst, MAX_PKT_BURST);
12971314
}
12981315

1299-
process_arp_ring(port_id, queue_id, pkts_burst, ctx);
1316+
process_dispatch_ring(port_id, queue_id, pkts_burst, ctx);
13001317

13011318
nb_rx = rte_eth_rx_burst(port_id, queue_id, pkts_burst,
13021319
MAX_PKT_BURST);
@@ -1350,6 +1367,8 @@ main_loop(void *arg)
13501367

13511368
ff_status.loops++;
13521369
}
1370+
1371+
return 0;
13531372
}
13541373

13551374
int
@@ -1446,3 +1465,9 @@ ff_rss_check(void *softc, uint32_t saddr, uint32_t daddr,
14461465

14471466
return ((hash & (reta_size - 1)) % nb_queues) == queueid;
14481467
}
1468+
1469+
void
1470+
ff_regist_packet_dispatcher(dispatch_func_t func)
1471+
{
1472+
packet_dispatcher = func;
1473+
}

lib/ff_dpdk_kni.c

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -257,8 +257,9 @@ protocol_filter_ip(const void *data, uint16_t len)
257257
const struct ipv4_hdr *hdr;
258258
hdr = (const struct ipv4_hdr *)data;
259259

260-
void *next = (void *)data + sizeof(struct ipv4_hdr);
261-
uint16_t next_len = len - sizeof(struct ipv4_hdr);
260+
int hdr_len = (hdr->version_ihl & 0x0f) << 2;
261+
void *next = (void *)data + hdr_len;
262+
uint16_t next_len = len - hdr_len;
262263

263264
switch (hdr->next_proto_id) {
264265
case IPPROTO_TCP:

0 commit comments

Comments
 (0)