61
61
62
62
#define MEMPOOL_CACHE_SIZE 256
63
63
64
- #define ARP_RING_SIZE 2048
64
+ #define DISPATCH_RING_SIZE 2048
65
65
66
66
#define MSG_RING_SIZE 32
67
67
@@ -162,7 +162,8 @@ static struct lcore_conf lcore_conf;
162
162
163
163
static struct rte_mempool * pktmbuf_pool [NB_SOCKETS ];
164
164
165
- static struct rte_ring * * arp_ring [RTE_MAX_ETHPORTS ];
165
+ static struct rte_ring * * dispatch_ring [RTE_MAX_ETHPORTS ];
166
+ static dispatch_func_t packet_dispatcher ;
166
167
167
168
static uint16_t rss_reta_size [RTE_MAX_ETHPORTS ];
168
169
@@ -337,6 +338,10 @@ init_lcore_conf(void)
337
338
lcore_conf .nb_queue_list [port_id ] = pconf -> nb_lcores ;
338
339
}
339
340
341
+ if (lcore_conf .nb_rx_queue == 0 ) {
342
+ rte_exit (EXIT_FAILURE , "lcore %u has nothing to do\n" , lcore_id );
343
+ }
344
+
340
345
return 0 ;
341
346
}
342
347
@@ -355,7 +360,7 @@ init_mem_pool(void)
355
360
nb_lcores * MEMPOOL_CACHE_SIZE +
356
361
nb_ports * KNI_MBUF_MAX +
357
362
nb_ports * KNI_QUEUE_SIZE +
358
- nb_lcores * nb_ports * ARP_RING_SIZE ),
363
+ nb_lcores * nb_ports * DISPATCH_RING_SIZE ),
359
364
(unsigned )8192 );
360
365
361
366
unsigned socketid = 0 ;
@@ -418,7 +423,7 @@ create_ring(const char *name, unsigned count, int socket_id, unsigned flags)
418
423
}
419
424
420
425
static int
421
- init_arp_ring (void )
426
+ init_dispatch_ring (void )
422
427
{
423
428
int j ;
424
429
char name_buf [RTE_RING_NAMESIZE ];
@@ -432,28 +437,29 @@ init_arp_ring(void)
432
437
uint16_t portid = ff_global_cfg .dpdk .portid_list [j ];
433
438
struct ff_port_cfg * pconf = & ff_global_cfg .dpdk .port_cfgs [portid ];
434
439
int nb_queues = pconf -> nb_lcores ;
435
- if (arp_ring [portid ] == NULL ) {
440
+ if (dispatch_ring [portid ] == NULL ) {
436
441
snprintf (name_buf , RTE_RING_NAMESIZE , "ring_ptr_p%d" , portid );
437
442
438
- arp_ring [portid ] = rte_zmalloc (name_buf ,
439
- sizeof (struct rte_ring * ) * nb_queues ,
440
- RTE_CACHE_LINE_SIZE );
441
- if (arp_ring [portid ] == NULL ) {
443
+ dispatch_ring [portid ] = rte_zmalloc (name_buf ,
444
+ sizeof (struct rte_ring * ) * nb_queues ,
445
+ RTE_CACHE_LINE_SIZE );
446
+ if (dispatch_ring [portid ] == NULL ) {
442
447
rte_exit (EXIT_FAILURE , "rte_zmalloc(%s (struct rte_ring*)) "
443
- "failed\n" , name_buf );
448
+ "failed\n" , name_buf );
444
449
}
445
450
}
446
451
447
452
for (queueid = 0 ; queueid < nb_queues ; ++ queueid ) {
448
- snprintf (name_buf , RTE_RING_NAMESIZE , "arp_ring_p%d_q%d" , portid , queueid );
449
- arp_ring [portid ][queueid ] = create_ring (name_buf , ARP_RING_SIZE ,
450
- socketid , RING_F_SC_DEQ );
453
+ snprintf (name_buf , RTE_RING_NAMESIZE , "dispatch_ring_p%d_q%d" ,
454
+ portid , queueid );
455
+ dispatch_ring [portid ][queueid ] = create_ring (name_buf ,
456
+ DISPATCH_RING_SIZE , socketid , RING_F_SC_DEQ );
451
457
452
- if (arp_ring [portid ][queueid ] == NULL )
458
+ if (dispatch_ring [portid ][queueid ] == NULL )
453
459
rte_panic ("create ring:%s failed!\n" , name_buf );
454
460
455
461
printf ("create ring:%s success, %u ring entries are now free!\n" ,
456
- name_buf , rte_ring_free_count (arp_ring [portid ][queueid ]));
462
+ name_buf , rte_ring_free_count (dispatch_ring [portid ][queueid ]));
457
463
}
458
464
}
459
465
@@ -807,7 +813,7 @@ ff_dpdk_init(int argc, char **argv)
807
813
808
814
init_mem_pool ();
809
815
810
- init_arp_ring ();
816
+ init_dispatch_ring ();
811
817
812
818
init_msg_ring ();
813
819
@@ -897,38 +903,56 @@ process_packets(uint8_t port_id, uint16_t queue_id, struct rte_mbuf **bufs,
897
903
uint16_t count , const struct ff_dpdk_if_context * ctx , int pkts_from_ring )
898
904
{
899
905
struct lcore_conf * qconf = & lcore_conf ;
906
+ uint16_t nb_queues = qconf -> nb_queue_list [port_id ];
900
907
901
908
uint16_t i ;
902
909
for (i = 0 ; i < count ; i ++ ) {
903
910
struct rte_mbuf * rtem = bufs [i ];
904
911
905
912
if (unlikely (qconf -> pcap [port_id ] != NULL )) {
906
- ff_dump_packets (qconf -> pcap [port_id ], rtem );
913
+ if (!pkts_from_ring ) {
914
+ ff_dump_packets (qconf -> pcap [port_id ], rtem );
915
+ }
907
916
}
908
917
909
918
void * data = rte_pktmbuf_mtod (rtem , void * );
910
919
uint16_t len = rte_pktmbuf_data_len (rtem );
911
920
921
+ if (!pkts_from_ring && packet_dispatcher ) {
922
+ int ret = (* packet_dispatcher )(data , len , nb_queues );
923
+ if (ret < 0 || ret >= nb_queues ) {
924
+ rte_pktmbuf_free (rtem );
925
+ continue ;
926
+ }
927
+
928
+ if (ret != queue_id ) {
929
+ ret = rte_ring_enqueue (dispatch_ring [port_id ][ret ], rtem );
930
+ if (ret < 0 )
931
+ rte_pktmbuf_free (rtem );
932
+
933
+ continue ;
934
+ }
935
+ }
936
+
912
937
enum FilterReturn filter = protocol_filter (data , len );
913
938
if (filter == FILTER_ARP ) {
914
939
struct rte_mempool * mbuf_pool ;
915
940
struct rte_mbuf * mbuf_clone ;
916
- if (pkts_from_ring == 0 ) {
917
- uint16_t i ;
918
- uint16_t nb_queues = qconf -> nb_queue_list [port_id ];
919
- for (i = 0 ; i < nb_queues ; ++ i ) {
920
- if (i == queue_id )
941
+ if (!pkts_from_ring ) {
942
+ uint16_t j ;
943
+ for (j = 0 ; j < nb_queues ; ++ j ) {
944
+ if (j == queue_id )
921
945
continue ;
922
946
923
947
unsigned socket_id = 0 ;
924
948
if (numa_on ) {
925
- uint16_t lcore_id = qconf -> port_cfgs [port_id ].lcore_list [i ];
949
+ uint16_t lcore_id = qconf -> port_cfgs [port_id ].lcore_list [j ];
926
950
socket_id = rte_lcore_to_socket_id (lcore_id );
927
951
}
928
952
mbuf_pool = pktmbuf_pool [socket_id ];
929
953
mbuf_clone = rte_pktmbuf_clone (rtem , mbuf_pool );
930
954
if (mbuf_clone ) {
931
- int ret = rte_ring_enqueue (arp_ring [port_id ][i ], mbuf_clone );
955
+ int ret = rte_ring_enqueue (dispatch_ring [port_id ][j ], mbuf_clone );
932
956
if (ret < 0 )
933
957
rte_pktmbuf_free (mbuf_clone );
934
958
}
@@ -954,12 +978,12 @@ process_packets(uint8_t port_id, uint16_t queue_id, struct rte_mbuf **bufs,
954
978
}
955
979
956
980
static inline int
957
- process_arp_ring (uint8_t port_id , uint16_t queue_id ,
981
+ process_dispatch_ring (uint8_t port_id , uint16_t queue_id ,
958
982
struct rte_mbuf * * pkts_burst , const struct ff_dpdk_if_context * ctx )
959
983
{
960
984
/* read packet from ring buf and to process */
961
985
uint16_t nb_rb ;
962
- nb_rb = rte_ring_dequeue_burst (arp_ring [port_id ][queue_id ],
986
+ nb_rb = rte_ring_dequeue_burst (dispatch_ring [port_id ][queue_id ],
963
987
(void * * )pkts_burst , MAX_PKT_BURST );
964
988
965
989
if (nb_rb > 0 ) {
@@ -1233,7 +1257,6 @@ main_loop(void *arg)
1233
1257
struct loop_routine * lr = (struct loop_routine * )arg ;
1234
1258
1235
1259
struct rte_mbuf * pkts_burst [MAX_PKT_BURST ];
1236
- unsigned lcore_id ;
1237
1260
uint64_t prev_tsc , diff_tsc , cur_tsc , usch_tsc , div_tsc , usr_tsc , sys_tsc , end_tsc ;
1238
1261
int i , j , nb_rx , idle ;
1239
1262
uint8_t port_id , queue_id ;
@@ -1245,14 +1268,8 @@ main_loop(void *arg)
1245
1268
prev_tsc = 0 ;
1246
1269
usch_tsc = 0 ;
1247
1270
1248
- lcore_id = rte_lcore_id ();
1249
1271
qconf = & lcore_conf ;
1250
1272
1251
- if (qconf -> nb_rx_queue == 0 ) {
1252
- printf ("lcore %u has nothing to do\n" , lcore_id );
1253
- return 0 ;
1254
- }
1255
-
1256
1273
while (1 ) {
1257
1274
cur_tsc = rte_rdtsc ();
1258
1275
if (unlikely (freebsd_clock .expire < cur_tsc )) {
@@ -1296,7 +1313,7 @@ main_loop(void *arg)
1296
1313
ff_kni_process (port_id , queue_id , pkts_burst , MAX_PKT_BURST );
1297
1314
}
1298
1315
1299
- process_arp_ring (port_id , queue_id , pkts_burst , ctx );
1316
+ process_dispatch_ring (port_id , queue_id , pkts_burst , ctx );
1300
1317
1301
1318
nb_rx = rte_eth_rx_burst (port_id , queue_id , pkts_burst ,
1302
1319
MAX_PKT_BURST );
@@ -1350,6 +1367,8 @@ main_loop(void *arg)
1350
1367
1351
1368
ff_status .loops ++ ;
1352
1369
}
1370
+
1371
+ return 0 ;
1353
1372
}
1354
1373
1355
1374
int
@@ -1446,3 +1465,9 @@ ff_rss_check(void *softc, uint32_t saddr, uint32_t daddr,
1446
1465
1447
1466
return ((hash & (reta_size - 1 )) % nb_queues ) == queueid ;
1448
1467
}
1468
+
1469
+ void
1470
+ ff_regist_packet_dispatcher (dispatch_func_t func )
1471
+ {
1472
+ packet_dispatcher = func ;
1473
+ }
0 commit comments