Diffstat (limited to 'worker/runtime.c')
| -rw-r--r-- | worker/runtime.c | 626 |
1 file changed, 626 insertions, 0 deletions
diff --git a/worker/runtime.c b/worker/runtime.c
new file mode 100644
index 0000000..6c1ee47
--- /dev/null
+++ b/worker/runtime.c
@@ -0,0 +1,626 @@
+/*-
+ *   BSD LICENSE
+ *
+ *   Copyright(c) 2010-2013 Intel Corporation. All rights reserved.
+ *   All rights reserved.
+ *
+ *   Redistribution and use in source and binary forms, with or without
+ *   modification, are permitted provided that the following conditions
+ *   are met:
+ *
+ *     * Redistributions of source code must retain the above copyright
+ *       notice, this list of conditions and the following disclaimer.
+ *     * Redistributions in binary form must reproduce the above copyright
+ *       notice, this list of conditions and the following disclaimer in
+ *       the documentation and/or other materials provided with the
+ *       distribution.
+ *     * Neither the name of Intel Corporation nor the names of its
+ *       contributors may be used to endorse or promote products derived
+ *       from this software without specific prior written permission.
+ *
+ *   THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ *   "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ *   LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ *   A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ *   OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ *   SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ *   LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ *   DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ *   THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ *   (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ *   OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <inttypes.h>
+#include <sys/types.h>
+#include <string.h>
+#include <sys/queue.h>
+#include <stdarg.h>
+#include <errno.h>
+#include <getopt.h>
+#include <unistd.h>
+
+#include <sys/time.h>
+
+#include <rte_common.h>
+#include <rte_byteorder.h>
+#include <rte_log.h>
+#include <rte_memory.h>
+#include <rte_memcpy.h>
+#include <rte_memzone.h>
+#include <rte_tailq.h>
+#include <rte_eal.h>
+#include <rte_per_lcore.h>
+#include <rte_launch.h>
+#include <rte_atomic.h>
+#include <rte_cycles.h>
+#include <rte_prefetch.h>
+#include <rte_lcore.h>
+#include <rte_branch_prediction.h>
+#include <rte_interrupts.h>
+#include <rte_pci.h>
+#include <rte_random.h>
+#include <rte_debug.h>
+#include <rte_ether.h>
+#include <rte_ethdev.h>
+#include <rte_ring.h>
+#include <rte_mempool.h>
+#include <rte_mbuf.h>
+#include <rte_ip.h>
+#include <rte_tcp.h>
+#include <rte_udp.h>
+#include <rte_lpm.h>
+#include <rte_spinlock.h>
+#include <rte_version.h>
+
+#include "main.h"
+#include "watchdog.h"
+#include "nstat.h"
+
+#if APP_PACKET_MODIFY_DMAC || APP_PACKET_MODIFY_SMAC
+#include "vlan.h"
+#endif
+
+#ifndef APP_ENERGY_PAUSE
+#define APP_ENERGY_PAUSE 0
+#endif
+
+/* Implemented by Lu Qiuwen <[email protected]> */
+/* TODO: this path still needs to be profiled. */
+
+//#include <sys/time.h>
+//extern FILE *warn_output_stream;
+//extern FILE *err_output_stream;
+
+/*
+ * Map the number of consecutive empty polls to a sleep time in us:
+ * below gear 1 the count itself is the sleep time, between the two
+ * gears we sleep for the gear-1 time, and at or above gear 2 we sleep
+ * for the gear-2 time.
+ */
+static inline uint32_t
+power_idle_heuristic(uint32_t zero_rx_packet_count)
+{
+    if (zero_rx_packet_count < app.energy.sleep_gear1_threshold)
+        return zero_rx_packet_count;
+    else if (zero_rx_packet_count < app.energy.sleep_gear2_threshold)
+        return app.energy.sleep_gear1_threshold;
+
+    return app.energy.sleep_gear2_threshold;
+}
+
+static inline int
+app_pkt_map_call_extern_func(struct rte_mbuf *pkt, uint32_t rx_port, uint32_t tx_port)
+{
+    uint32_t lcore = rte_lcore_id();
+    uint64_t tsc_start, tsc_end;
+    int ret = 1; /* default if no callback is registered: forward unchanged */
+
+    unsigned char *pkt_data = rte_pktmbuf_mtod(pkt, void *);
+    int pkt_len = rte_pktmbuf_data_len(pkt);
+
+/* The mbuf input-port field moved from pkt.in_port to port in DPDK 1.8 */
+#if RTE_VERSION >= RTE_VERSION_NUM(1, 8, 0, 0)
+    int in_port = pkt->port;
+#else
+    int in_port = pkt->pkt.in_port;
+#endif
+
+#if APP_STATS
+    tsc_start = rte_rdtsc();
+#endif
+
+    if (app.pkt_callback_f.rx_pkt_process != NULL) {
+        ret = (*app.pkt_callback_f.rx_pkt_process)(pkt_data, pkt_len, in_port, lcore);
+    }
+    else if (app.pkt_callback_f.rx_pkt_process_dst != NULL) {
+        int dst = app_port2stream(rx_port, tx_port);
+        ret = (*app.pkt_callback_f.rx_pkt_process_dst)(pkt_data, pkt_len, dst, lcore);
+    }
+
+#if APP_STATS
+    tsc_end = rte_rdtsc();
+    nstat_client_count_wk_runtime(lcore, tsc_end - tsc_start);
+#endif
+
+    return ret;
+}
+
+int app_pkt_process(struct rte_mbuf *pkt, uint8_t *port)
+{
+    if (app.map_type == e_APP_MAP_TYPE_PORTMAP) {
+#if RTE_VERSION >= RTE_VERSION_NUM(1, 8, 0, 0)
+        int in_port = pkt->port;
+#else
+        int in_port = pkt->pkt.in_port;
+#endif
+        uint32_t i;
+        for (i = 0; i < app.n_rxtx_port_map; i++) {
+            if (app.rxtx_port_map[i].rx_port != in_port)
+                continue;
+            uint32_t j;
+            for (j = 0; j < app.rxtx_port_map[i].n_tx_port; j++) {
+                int rx_port = app.rxtx_port_map[i].rx_port;
+                int tx_port = app.rxtx_port_map[i].tx_port[j];
+                *port = tx_port;
+                if (app_pkt_map_call_extern_func(pkt, rx_port, tx_port)) {
+                    /*
+                     * TODO: sending to more than one stream crashes:
+                     * once the packet is enqueued, the mbuf is freed
+                     * downstream, so enqueueing it again would hand out
+                     * a dangling pointer. Only the first tx_port is
+                     * used for now.
+                     */
+#if APP_PACKET_MODIFY_DMAC
+                    vlan_tag_pkt(pkt, 0);
+#endif
+                    return 0;
+                }
+                else {
+                    return -1; /* drop the packet */
+                }
+            }
+        }
+        return -1;
+    }
+
+    return -1;
+}
+
+/* End */
+
+/* Ported from DPDK 1.7.1 examples/load_balancer/runtime.c */
+/* By Lu Qiuwen <[email protected]> at 2014-12-05 */
+
+static inline void
+app_lcore_worker(
+    struct app_lcore_params_worker *lp,
+    uint32_t bsz_rd,
+    uint32_t bsz_wr)
+{
+    uint32_t i;
+    uint64_t lcore_id = rte_lcore_id();
+
+    /* Energy-saving state: consecutive empty polls per input ring */
+    uint32_t lcore_rx_idle_count = 0;
+    uint32_t lcore_idle_hint = 0;
+
+    uint32_t *zero_rx_packet_count = app.energy.wk_zero_rx_packet_count[lcore_id];
+    uint32_t *zero_idle_hint = app.energy.wk_idle_hint[lcore_id];
+
+    for (i = 0; i < lp->n_rings_in; i++) {
+        struct rte_ring *ring_in = lp->rings_in[i];
+        uint32_t j;
+        uint32_t pos;
+        int ret;
+
+        ret = rte_ring_sc_dequeue_bulk(
+            ring_in,
+            (void **) lp->mbuf_in.array,
+            bsz_rd);
+
+        if (unlikely(ret == -ENOENT)) {
+            zero_rx_packet_count[i]++;
+            if (zero_rx_packet_count[i] < app.energy.min_zero_pull_count)
+                continue;
+            zero_idle_hint[i] = power_idle_heuristic(zero_rx_packet_count[i]);
+            lcore_rx_idle_count++;
+            continue;
+        }
+        else {
+            /* Decay the empty-poll count towards zero on a busy poll.
+             * (The original `x = x == 0 ? 0 : x--` never decremented
+             * and was undefined behaviour.) */
+            if (zero_rx_packet_count[i] > 0)
+                zero_rx_packet_count[i]--;
+            nstat_client_count_wk_pkts(lp->mbuf_in.array, bsz_rd, lcore_id);
+        }
+
+#if APP_WORKER_DROP_ALL_PACKETS
+        for (j = 0; j < bsz_rd; j++) {
+            struct rte_mbuf *pkt = lp->mbuf_in.array[j];
+            rte_pktmbuf_free(pkt);
+        }
+
+        continue;
+#endif
+
+        APP_WORKER_PREFETCH1(rte_pktmbuf_mtod(lp->mbuf_in.array[0], unsigned char *));
+        APP_WORKER_PREFETCH0(lp->mbuf_in.array[1]);
+
+        for (j = 0; j < bsz_rd; j++) {
+            struct rte_mbuf *pkt;
+            uint8_t port;
+
+            if (likely(j < bsz_rd - 1)) {
+                APP_WORKER_PREFETCH1(rte_pktmbuf_mtod(lp->mbuf_in.array[j+1], unsigned char *));
+            }
+            if (likely(j < bsz_rd - 2)) {
+                APP_WORKER_PREFETCH0(lp->mbuf_in.array[j+2]);
+            }
+
+            pkt = lp->mbuf_in.array[j];
+
+#if 0
+            if (unlikely(app_portmap_lookup(pkt->pkt.in_port, &port) != 0)) {
+                port = pkt->pkt.in_port;
+            }
+#endif
+
+#if 0
+            if (unlikely(rte_lpm_lookup(lp->lpm_table, ipv4_dst, &port) != 0)) {
+                port = pkt->pkt.in_port;
+            }
+#endif
+
+            if (unlikely(app_pkt_process(pkt, &port))) {
+                /* Drop the packet; `pos` and `port` are not valid here,
+                 * so skip the flush check below. */
+                rte_pktmbuf_free(pkt);
+                continue;
+            }
+
+            /* Forward: buffer the packet on its output port */
+            pos = lp->mbuf_out[port].n_mbufs;
+            lp->mbuf_out[port].array[pos++] = pkt;
+            if (likely(pos < bsz_wr)) {
+                lp->mbuf_out[port].n_mbufs = pos;
+                continue;
+            }
+
+            ret = rte_ring_sp_enqueue_bulk(
+                lp->rings_out[port],
+                (void **) lp->mbuf_out[port].array,
+                bsz_wr);
+
+            if (unlikely(ret == -ENOBUFS)) {
+                uint32_t k;
+                nstat_client_count_wk_drop(lp->mbuf_out[port].array, bsz_wr, lcore_id);
+                for (k = 0; k < bsz_wr; k++) {
+                    struct rte_mbuf *pkt_to_free = lp->mbuf_out[port].array[k];
+                    rte_pktmbuf_free(pkt_to_free);
+                }
+            }
+
+            lp->mbuf_out[port].n_mbufs = 0;
+            lp->mbuf_out_flush[port] = 0;
+        }
+    }
+
+    /* All input rings were empty: take the smallest idle hint */
+    if (unlikely(lcore_rx_idle_count == lp->n_rings_in)) {
+        for (i = 0, lcore_idle_hint = zero_idle_hint[0]; i < lp->n_rings_in; i++) {
+            if (zero_idle_hint[i] < lcore_idle_hint)
+                lcore_idle_hint = zero_idle_hint[i];
+        }
+    }
+
+    if (app.energy.enable) {
+        /* Short hints busy-wait to keep latency low; longer hints
+         * yield the CPU via usleep() to actually save power. */
+        if (lcore_idle_hint < app.energy.sleep_gear1_threshold)
+            rte_delay_us(lcore_idle_hint);
+        else
+            usleep(lcore_idle_hint);
+    }
+
+    return;
+}
+
+static inline void
+app_lcore_worker_flush(struct app_lcore_params_worker *lp)
+{
+    uint32_t port;
+    uint32_t lcore_id = rte_lcore_id();
+
+    for (port = 0; port < APP_MAX_NIC_PORTS; port++) {
+        int ret;
+
+        if (unlikely(lp->rings_out[port] == NULL)) {
+            continue;
+        }
+
+        if (likely((lp->mbuf_out_flush[port] == 0) ||
+                   (lp->mbuf_out[port].n_mbufs == 0))) {
+            lp->mbuf_out_flush[port] = 1;
+            continue;
+        }
+
+        ret = rte_ring_sp_enqueue_bulk(
+            lp->rings_out[port],
+            (void **) lp->mbuf_out[port].array,
+            lp->mbuf_out[port].n_mbufs);
+
+        if (unlikely(ret < 0)) {
+            uint32_t k;
+            nstat_client_count_wk_drop(lp->mbuf_out[port].array, lp->mbuf_out[port].n_mbufs, lcore_id);
+            for (k = 0; k < lp->mbuf_out[port].n_mbufs; k++) {
+                struct rte_mbuf *pkt_to_free = lp->mbuf_out[port].array[k];
+                rte_pktmbuf_free(pkt_to_free);
+            }
+        }
+
+        lp->mbuf_out[port].n_mbufs = 0;
+        lp->mbuf_out_flush[port] = 1;
+    }
+}
+
+#if 0
+int app_lcore_worker_tx_buffer_to_send(struct rte_mbuf *pkt, uint8_t port)
+{
+    uint32_t lcore = rte_lcore_id();
+    uint32_t bsz_wr = app.burst_size_worker_write;
+    uint32_t n_pkts;
+    uint16_t tx_queueid = lcore % app.worker_core_num;
+    struct app_lcore_params_worker *lp = &app.lcore_params[lcore].worker;
+    uint32_t pos;
+#if APP_STATS
+    uint64_t tsc_start, tsc_end;
+#endif
+
+    pos = lp->mbuf_out[port].n_mbufs;
+    lp->mbuf_out[port].array[pos++] = pkt;
+    if (likely(pos < bsz_wr)) {
+        lp->mbuf_out[port].n_mbufs = pos;
+        return 0;
+    }
+
+#if APP_STATS
+    get_stats(lp->mbuf_out[port].array, bsz_wr, lcore, port, tx_queueid, RSYS_TX);
+    tsc_start = rte_rdtsc();
+    get_lcores_stats(lcore, tsc_start, RSYS_LCORE_TX);
+#endif
+
+    n_pkts = rte_eth_tx_burst(
+        port,
+        tx_queueid,
+        lp->mbuf_out[port].array,
+        (uint16_t) bsz_wr);
+
+#ifdef APP_DEBUG
+    int debugi;
+    for (debugi = 0; debugi < (int) n_pkts; debugi++)
+        fprintf(stderr, "ETH_TX_BURST/WK:%d(%d)\n",
+            lp->mbuf_out[port].array[debugi]->pkt.data_len,
+            lcore);
+#endif
+
+#if APP_STATS
+    tsc_end = rte_rdtsc();
+    get_lcores_stats(lcore, tsc_end, RSYS_LCORE_APP);
+    app_stats[lcore][port][tx_queueid][RSYS_TX].core_cycles[current_index] += (tsc_end - tsc_start);
+#endif
+
+    if (unlikely(n_pkts < bsz_wr)) {
+        uint32_t k;
+#if APP_STATS
+        remove_stats(lp->mbuf_out[port].array, bsz_wr - n_pkts, lcore, port, tx_queueid, RSYS_TX);
+#endif
+        for (k = n_pkts; k < bsz_wr; k++) {
+            struct rte_mbuf *pkt_to_free = lp->mbuf_out[port].array[k];
+            rte_pktmbuf_free(pkt_to_free);
+        }
+    }
+
+    lp->mbuf_out[port].n_mbufs = 0;
+    lp->mbuf_out_flush[port] = 0;
+
+    return 0;
+}
+#endif
+
+#if 0
+
+static inline void
+app_lcore_worker(
+    struct app_lcore_params_worker *lp,
+    uint32_t bsz_rd)
+{
+    uint32_t i, n;
+#if APP_STATS
+    unsigned lcore_id = rte_lcore_id();
+    uint64_t tsc_start, tsc_end;
+#endif
+
+    for (i = 0; i < lp->n_rings_in; i++) {
+        struct rte_ring *ring_in = lp->rings_in[i];
+        uint32_t j;
+
+#if APP_STATS
+        tsc_start = rte_rdtsc();
+        get_lcores_stats(lcore_id, tsc_start, RSYS_LCORE_RX);
+#endif
+
+        n = rte_ring_sc_dequeue_burst(
+            ring_in,
+            (void **) lp->mbuf_in.array,
+            bsz_rd);
+
+        if (unlikely(n == 0)) {
+            continue;
+        }
+#if APP_STATS
+        tsc_end = rte_rdtsc();
+        get_lcores_stats(lcore_id, tsc_end, RSYS_LCORE_APP);
+        app_stats[lcore_id][lcore_conf[lcore_id][i].port_id][lcore_conf[lcore_id][i].queue_id][RSYS_RX].core_cycles[current_index] += (tsc_end - tsc_start);
+        get_stats(lp->mbuf_in.array, n, lcore_id, 0, 0, RSYS_RX);
+#endif
+        //usleep(5000);
+
+#if APP_WORKER_DROP_ALL_PACKETS
+        for (j = 0; j < n; j++) {
+            struct rte_mbuf *pkt = lp->mbuf_in.array[j];
+            rte_pktmbuf_free(pkt);
+        }
+
+        continue;
+#endif
+
+        APP_WORKER_PREFETCH1(rte_pktmbuf_mtod(lp->mbuf_in.array[0], unsigned char *));
+        APP_WORKER_PREFETCH0(lp->mbuf_in.array[1]);
+
+        for (j = 0; j < n; j++) {
+            struct rte_mbuf *pkt;
+            uint8_t port;
+
+            if (likely(j < n - 1)) {
+                APP_WORKER_PREFETCH1(rte_pktmbuf_mtod(lp->mbuf_in.array[j+1], unsigned char *));
+            }
+            if (likely(j < n - 2)) {
+                APP_WORKER_PREFETCH0(lp->mbuf_in.array[j+2]);
+            }
+
+            pkt = lp->mbuf_in.array[j];
+
+            if (app_pkt_process(pkt, &port)) {
+                rte_pktmbuf_free(pkt);
+            }
+        }
+    }
+}
+
+static inline void
+app_lcore_worker_flush(struct app_lcore_params_worker *lp)
+{
+    uint32_t port;
+    uint32_t lcore = rte_lcore_id();
+    uint16_t tx_queueid = lcore % app.worker_core_num;
+    uint32_t n_pkts;
+#if APP_STATS
+    uint64_t tsc_start, tsc_end;
+#endif
+
+    for (port = 0; port < APP_MAX_NIC_PORTS; port++) {
+        if (likely((lp->mbuf_out_flush[port] == 0) ||
+                   (lp->mbuf_out[port].n_mbufs == 0))) {
+            lp->mbuf_out_flush[port] = 1;
+            continue;
+        }
+
+#if APP_STATS
+        get_stats(lp->mbuf_out[port].array, lp->mbuf_out[port].n_mbufs, lcore, port, tx_queueid, RSYS_TX);
+        tsc_start = rte_rdtsc();
+        get_lcores_stats(lcore, tsc_start, RSYS_LCORE_TX);
+#endif
+        n_pkts = rte_eth_tx_burst(
+            port,
+            tx_queueid,
+            lp->mbuf_out[port].array,
+            lp->mbuf_out[port].n_mbufs);
+
+#ifdef APP_DEBUG
+        int debugi;
+        fprintf(stderr, "ETH_TX_BURST/WK_FL-n_mbufs:%d\n", n_pkts);
+        for (debugi = 0; debugi < (int) n_pkts; debugi++)
+            fprintf(stderr, "ETH_TX_BURST/WK_FL:%d(%d)\n",
+                lp->mbuf_out[port].array[debugi]->pkt.data_len,
+                lcore);
+#endif
+
+#if APP_STATS
+        tsc_end = rte_rdtsc();
+        get_lcores_stats(lcore, tsc_end, RSYS_LCORE_APP);
+        app_stats[lcore][port][tx_queueid][RSYS_TX].core_cycles[current_index] += (tsc_end - tsc_start);
+#endif
+
+        if (unlikely(n_pkts < lp->mbuf_out[port].n_mbufs)) {
+            uint32_t k;
+#if APP_STATS
+            remove_stats(lp->mbuf_out[port].array, lp->mbuf_out[port].n_mbufs - n_pkts, lcore, port, tx_queueid, RSYS_TX);
+#endif
+            for (k = n_pkts; k < lp->mbuf_out[port].n_mbufs; k++) {
+                struct rte_mbuf *pkt_to_free = lp->mbuf_out[port].array[k];
+                rte_pktmbuf_free(pkt_to_free);
+            }
+        }
+        lp->mbuf_out[port].n_mbufs = 0;
+        lp->mbuf_out_flush[port] = 1;
+    }
+}
+
+#endif
+
+static void
+app_lcore_main_loop_worker(void)
+{
+    uint32_t lcore = rte_lcore_id();
+    struct app_lcore_params_worker *lp = &app.lcore_params[lcore].worker;
+    uint64_t i = 0;
+
+    uint32_t bsz_rd = app.burst_size_worker_read;
+    uint32_t bsz_wr = app.burst_size_worker_write;
+
+    for ( ; ; ) {
+        /* Flush partially filled output buffers every
+         * APP_LCORE_WORKER_FLUSH iterations. */
+        if (APP_LCORE_WORKER_FLUSH && (unlikely(i == APP_LCORE_WORKER_FLUSH))) {
+            app_lcore_worker_flush(lp);
+            i = 0;
+        }
+
+        app_lcore_worker(lp, bsz_rd, bsz_wr);
+        i++;
+    }
+}
+
+int
+app_lcore_main_loop(__attribute__((unused)) void *arg)
+{
+    struct app_lcore_params *lp;
+    unsigned lcore;
+
+    lcore = rte_lcore_id();
+    lp = &app.lcore_params[lcore];
+
+    if (lp->type == e_APP_LCORE_WORKER) {
+        printf("Logical core %u (worker %u) main loop.\n",
+            lcore,
+            (unsigned) lp->worker.worker_id);
+        app_lcore_main_loop_worker();
+    }
+
+    return 0;
+}
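
To make the three-gear sleep mapping in power_idle_heuristic() concrete, here is a self-contained mirror of it with the 100 us / 1000 us gears from the original comments hard-coded. The real thresholds are read from app.energy at runtime, so these constants are only an assumption for illustration:

    #include <assert.h>
    #include <stdint.h>

    /* Stand-alone sketch of power_idle_heuristic() with assumed gears. */
    static uint32_t idle_sleep_us(uint32_t zero_polls)
    {
        if (zero_polls < 100)    /* gear-1 threshold (assumed: 100) */
            return zero_polls;   /* sleep one us per empty poll */
        if (zero_polls < 1000)   /* gear-2 threshold (assumed: 1000) */
            return 100;
        return 1000;
    }

    int main(void)
    {
        assert(idle_sleep_us(42) == 42);      /* light idle: 42 us */
        assert(idle_sleep_us(500) == 100);    /* medium idle: 100 us */
        assert(idle_sleep_us(5000) == 1000);  /* deep idle: 1000 us */
        return 0;
    }

The gradual ramp keeps latency low when traffic is bursty, while capping the sleep time bounds the worst-case wakeup delay.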
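
The forwarding decision is delegated to an externally registered callback. The sketch below shows what such a callback could look like; the signature is inferred from the call site in app_pkt_map_call_extern_func() (the actual typedef lives in main.h, which is not part of this diff), the non-zero-means-forward return convention follows app_pkt_process(), and my_rx_pkt_process is a hypothetical name:

    /* Hypothetical callback; parameter types inferred from the call
     * site. Return non-zero to forward the packet, zero to drop it. */
    static int
    my_rx_pkt_process(unsigned char *pkt_data, int pkt_len,
                      int in_port, uint32_t lcore)
    {
        (void) pkt_data;
        (void) in_port;
        (void) lcore;
        /* Example policy: drop runt frames, forward everything else. */
        return pkt_len >= 64;
    }

    /* During application setup, before the worker loops start:
     *     app.pkt_callback_f.rx_pkt_process = my_rx_pkt_process;
     */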
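
The TODO in app_pkt_process() notes that a packet cannot be sent to several streams because the first enqueue hands ownership of the mbuf away. One conventional DPDK fix is to enqueue reference-counted clones instead, sketched below under the assumption that a clone mempool and the destination rings are provided by the caller (clone_pool, rings_out, and n_dst are hypothetical names, and n_dst is assumed to be at least 1):

    #include <rte_mbuf.h>
    #include <rte_ring.h>
    #include <rte_mempool.h>

    /* Sketch: fan one mbuf out to n_dst output rings. rte_pktmbuf_clone()
     * shares the packet data via refcounting, so each ring consumer can
     * free its mbuf independently. */
    static int
    fanout_pkt(struct rte_mbuf *pkt, struct rte_ring **rings_out,
               unsigned n_dst, struct rte_mempool *clone_pool)
    {
        unsigned d;
        for (d = 0; d + 1 < n_dst; d++) {
            struct rte_mbuf *clone = rte_pktmbuf_clone(pkt, clone_pool);
            if (clone == NULL)
                continue; /* clone pool exhausted: skip this destination */
            if (rte_ring_sp_enqueue(rings_out[d], clone) < 0)
                rte_pktmbuf_free(clone); /* ring full: drop the clone */
        }
        /* The last destination takes the original mbuf. */
        if (rte_ring_sp_enqueue(rings_out[n_dst - 1], pkt) < 0) {
            rte_pktmbuf_free(pkt);
            return -1;
        }
        return 0;
    }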
