Diffstat (limited to 'worker/runtime.c')
-rw-r--r--  worker/runtime.c  626
1 files changed, 626 insertions, 0 deletions
diff --git a/worker/runtime.c b/worker/runtime.c
new file mode 100644
index 0000000..6c1ee47
--- /dev/null
+++ b/worker/runtime.c
@@ -0,0 +1,626 @@
+/*-
+ * BSD LICENSE
+ *
+ * Copyright(c) 2010-2013 Intel Corporation. All rights reserved.
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * * Neither the name of Intel Corporation nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <inttypes.h>
+#include <sys/types.h>
+#include <string.h>
+#include <sys/queue.h>
+#include <stdarg.h>
+#include <errno.h>
+#include <getopt.h>
+#include <unistd.h>
+
+#include <sys/time.h>
+
+
+#include <rte_common.h>
+#include <rte_byteorder.h>
+#include <rte_log.h>
+#include <rte_memory.h>
+#include <rte_memcpy.h>
+#include <rte_memzone.h>
+#include <rte_tailq.h>
+#include <rte_eal.h>
+#include <rte_per_lcore.h>
+#include <rte_launch.h>
+#include <rte_atomic.h>
+#include <rte_cycles.h>
+#include <rte_prefetch.h>
+#include <rte_lcore.h>
+#include <rte_version.h>
+#include <rte_branch_prediction.h>
+#include <rte_interrupts.h>
+#include <rte_pci.h>
+#include <rte_random.h>
+#include <rte_debug.h>
+#include <rte_ether.h>
+#include <rte_ethdev.h>
+#include <rte_ring.h>
+#include <rte_mempool.h>
+#include <rte_mbuf.h>
+#include <rte_ip.h>
+#include <rte_tcp.h>
+#include <rte_lpm.h>
+#include <rte_spinlock.h>
+#include <rte_udp.h>
+
+#include "main.h"
+#include "watchdog.h"
+#include "nstat.h"
+
+#if APP_PACKET_MODIFY_DMAC || APP_PACKET_MODIFY_SMAC
+#include "vlan.h"
+#endif
+
+#ifndef APP_ENERGY_PAUSE
+#define APP_ENERGY_PAUSE 0
+#endif
+
+
+/* Implemented by Lu Qiuwen <[email protected]>. */
+/* TODO: needs profiling. */
+
+//extern FILE *warn_output_stream;
+//extern FILE *err_output_stream;
+
+
+static inline uint32_t
+power_idle_heuristic(uint32_t zero_rx_packet_count)
+{
+	/* Below gear 1, use the zero-RX count itself as the sleep time (us) */
+	if (zero_rx_packet_count < app.energy.sleep_gear1_threshold)
+		return zero_rx_packet_count;
+	/* Between gear 1 and gear 2, sleep for the gear-1 threshold (us) */
+	if (zero_rx_packet_count < app.energy.sleep_gear2_threshold)
+		return app.energy.sleep_gear1_threshold;
+	/* At or above gear 2, cap the sleep time at the gear-2 threshold (us) */
+	return app.energy.sleep_gear2_threshold;
+}
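+
+/*
+ * Worked example, assuming (for illustration only) that
+ * sleep_gear1_threshold = 100 us and sleep_gear2_threshold = 1000 us:
+ *
+ *	zero_rx_packet_count =   42  ->  sleep   42 us
+ *	zero_rx_packet_count =  500  ->  sleep  100 us
+ *	zero_rx_packet_count = 5000  ->  sleep 1000 us
+ */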
+
+static inline int
+app_pkt_map_call_extern_func(struct rte_mbuf *pkt, uint32_t rx_port, uint32_t tx_port)
+{
+	uint32_t lcore = rte_lcore_id();
+	uint64_t tsc_start, tsc_end;
+	int ret = 0; /* default when no callback is registered (assumed: drop) */
+
+	unsigned char *pkt_data = rte_pktmbuf_mtod(pkt, unsigned char *);
+	int pkt_len = rte_pktmbuf_data_len(pkt);
+
+#if RTE_VERSION >= RTE_VERSION_NUM(1, 8, 0, 0)
+	int in_port = pkt->port;	/* the mbuf layout was flattened in DPDK 1.8 */
+#else
+	int in_port = pkt->pkt.in_port;
+#endif
+
+#if APP_STATS
+ tsc_start = rte_rdtsc();
+#endif
+
+	if (app.pkt_callback_f.rx_pkt_process != NULL) {
+		ret = (*app.pkt_callback_f.rx_pkt_process)(pkt_data, pkt_len,
+			in_port, lcore);
+	}
+	else if (app.pkt_callback_f.rx_pkt_process_dst != NULL) {
+		int dst = app_port2stream(rx_port, tx_port);
+		ret = (*app.pkt_callback_f.rx_pkt_process_dst)(pkt_data, pkt_len,
+			dst, lcore);
+	}
+
+#if APP_STATS
+ tsc_end = rte_rdtsc();
+ nstat_client_count_wk_runtime(lcore,tsc_end - tsc_start);
+#endif
+
+ return ret;
+}
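+
+/*
+ * For illustration only: a user callback registered as
+ * app.pkt_callback_f.rx_pkt_process could look like the hypothetical sketch
+ * below. It returns non-zero to forward the packet and zero to drop it:
+ *
+ *	static int my_rx_pkt_process(unsigned char *data, int len,
+ *				     int in_port, uint32_t lcore)
+ *	{
+ *		// forward only frames large enough to hold an Ethernet header
+ *		return len >= (int)sizeof(struct ether_hdr);
+ *	}
+ */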
+
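+/*
+ * Map an incoming packet to a TX port via the configured port map and hand it
+ * to the registered callback. On success *port holds the chosen TX port and 0
+ * is returned; -1 tells the caller to drop (and free) the packet.
+ */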
+int app_pkt_process(struct rte_mbuf *pkt, uint8_t *port)
+{
+#if RTE_VERSION >= RTE_VERSION_NUM(1, 8, 0, 0)
+	uint32_t in_port = pkt->port;
+#else
+	uint32_t in_port = pkt->pkt.in_port;
+#endif
+
+	if (app.map_type == e_APP_MAP_TYPE_PORTMAP) {
+		uint32_t i, j;
+
+		for (i = 0; i < app.n_rxtx_port_map; i++) {
+			if (app.rxtx_port_map[i].rx_port != in_port)
+				continue;
+
+			for (j = 0; j < app.rxtx_port_map[i].n_tx_port; j++) {
+				int rx_port = app.rxtx_port_map[i].rx_port;
+				int tx_port = app.rxtx_port_map[i].tx_port[j];
+
+				*port = tx_port;
+				if (app_pkt_map_call_extern_func(pkt, rx_port, tx_port)) {
+					/* TODO: forwarding to more than one TX stream
+					 * crashes: once the mbuf is enqueued it may be
+					 * freed, so enqueueing the same mbuf again would
+					 * use a dangling pointer. Only the first TX port
+					 * is used for now. */
+#if APP_PACKET_MODIFY_DMAC
+					vlan_tag_pkt(pkt, 0);
+#endif
+					return 0;
+				}
+
+				return -1; /* the callback requested a drop */
+			}
+		}
+	}
+
+	return -1;
+}
+
+
+/* End */
+
+/* Ported from DPDK 1.7.1 examples/load_balancer/runtime.c */
+/* by Lu Qiuwen <[email protected]>, 2014-12-05 */
+
+static inline void
+app_lcore_worker(
+ struct app_lcore_params_worker *lp,
+ uint32_t bsz_rd,
+ uint32_t bsz_wr)
+{
+ uint32_t i;
+ uint64_t lcore_id = rte_lcore_id();
+
+	/* Energy-saving state */
+	uint32_t lcore_rx_idle_count = 0;
+	uint32_t lcore_idle_hint = 0;
+
+	uint32_t *zero_rx_packet_count = app.energy.wk_zero_rx_packet_count[lcore_id];
+	uint32_t *zero_idle_hint = app.energy.wk_idle_hint[lcore_id];
+
+ for (i = 0; i < lp->n_rings_in; i ++) {
+ struct rte_ring *ring_in = lp->rings_in[i];
+ uint32_t j;
+ uint32_t pos;
+ int ret;
+
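+		/* Note: bulk dequeue on a DPDK 1.x ring is all-or-nothing; it
+		 * either fills the array with exactly bsz_rd mbufs and returns
+		 * 0, or takes nothing and returns -ENOENT. */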
+ ret = rte_ring_sc_dequeue_bulk(
+ ring_in,
+ (void **) lp->mbuf_in.array,
+ bsz_rd);
+
+		if (unlikely(ret == -ENOENT)) {
+			zero_rx_packet_count[i]++;
+			if (zero_rx_packet_count[i] < app.energy.min_zero_pull_count)
+				continue;
+			zero_idle_hint[i] = power_idle_heuristic(zero_rx_packet_count[i]);
+			lcore_rx_idle_count++;
+			continue;
+		}
+
+		/* Got a full burst: decay this ring's zero-RX counter */
+		if (zero_rx_packet_count[i] > 0)
+			zero_rx_packet_count[i]--;
+		nstat_client_count_wk_pkts(lp->mbuf_in.array, bsz_rd, lcore_id);
+
+
+#if APP_WORKER_DROP_ALL_PACKETS
+ for (j = 0; j < bsz_rd; j ++) {
+ struct rte_mbuf *pkt = lp->mbuf_in.array[j];
+ rte_pktmbuf_free(pkt);
+ }
+
+ continue;
+#endif
+
+ APP_WORKER_PREFETCH1(rte_pktmbuf_mtod(lp->mbuf_in.array[0], unsigned char *));
+ APP_WORKER_PREFETCH0(lp->mbuf_in.array[1]);
+
+ for (j = 0; j < bsz_rd; j ++) {
+ struct rte_mbuf *pkt;
+ uint8_t port;
+
+ if (likely(j < bsz_rd - 1)) {
+ APP_WORKER_PREFETCH1(rte_pktmbuf_mtod(lp->mbuf_in.array[j+1], unsigned char *));
+ }
+ if (likely(j < bsz_rd - 2)) {
+ APP_WORKER_PREFETCH0(lp->mbuf_in.array[j+2]);
+ }
+
+ pkt = lp->mbuf_in.array[j];
+
+#if 0
+ if(unlikely(app_portmap_lookup(pkt->pkt.in_port,&port) != 0)) {
+ port = pkt->pkt.in_port;
+ }
+#endif
+
+#if 0
+ if (unlikely(rte_lpm_lookup(lp->lpm_table, ipv4_dst, &port) != 0)) {
+ port = pkt->pkt.in_port;
+ }
+#endif
+
+			if (unlikely(app_pkt_process(pkt, &port))) {
+				/* Drop the packet */
+				rte_pktmbuf_free(pkt);
+				continue;
+			}
+
+			/* Forward the packet */
+			pos = lp->mbuf_out[port].n_mbufs;
+			lp->mbuf_out[port].array[pos++] = pkt;
+
+			if (likely(pos < bsz_wr)) {
+				lp->mbuf_out[port].n_mbufs = pos;
+				continue;
+			}
+
+ ret = rte_ring_sp_enqueue_bulk(
+ lp->rings_out[port],
+ (void **) lp->mbuf_out[port].array,
+ bsz_wr);
+
+
+ if (unlikely(ret == -ENOBUFS)) {
+ uint32_t k;
+ nstat_client_count_wk_drop(lp->mbuf_out[port].array, bsz_wr,lcore_id);
+ for (k = 0; k < bsz_wr; k ++) {
+ struct rte_mbuf *pkt_to_free = lp->mbuf_out[port].array[k];
+ rte_pktmbuf_free(pkt_to_free);
+ }
+ }
+
+ lp->mbuf_out[port].n_mbufs = 0;
+ lp->mbuf_out_flush[port] = 0;
+ }
+ }
+
+	if (unlikely(lcore_rx_idle_count == lp->n_rings_in)) {
+		for (i = 0, lcore_idle_hint = zero_idle_hint[0]; i < lp->n_rings_in; i++) {
+			if (zero_idle_hint[i] < lcore_idle_hint)
+				lcore_idle_hint = zero_idle_hint[i];
+		}
+	}
+
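+	/* Short hints busy-wait with rte_delay_us() to keep wakeup latency
+	 * low; longer hints yield the core to the OS scheduler via usleep(). */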
+	if (app.energy.enable) {
+		if (lcore_idle_hint < app.energy.sleep_gear1_threshold)
+			rte_delay_us(lcore_idle_hint);
+		else
+			usleep(lcore_idle_hint);
+	}
+
+ return;
+}
+
+static inline void
+app_lcore_worker_flush(struct app_lcore_params_worker *lp)
+{
+	uint32_t port;
+	uint32_t lcore_id = rte_lcore_id();
+
+ for (port = 0; port < APP_MAX_NIC_PORTS; port ++) {
+ int ret;
+
+ if (unlikely(lp->rings_out[port] == NULL)) {
+ continue;
+ }
+
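+		/* Two-pass flush: a port that completed a full-burst send since
+		 * the last pass (mbuf_out_flush == 0) is only re-armed here;
+		 * a stale partial buffer is pushed out on the following pass. */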
+ if (likely((lp->mbuf_out_flush[port] == 0) ||
+ (lp->mbuf_out[port].n_mbufs == 0))) {
+ lp->mbuf_out_flush[port] = 1;
+ continue;
+ }
+
+ ret = rte_ring_sp_enqueue_bulk(
+ lp->rings_out[port],
+ (void **) lp->mbuf_out[port].array,
+ lp->mbuf_out[port].n_mbufs);
+
+		if (unlikely(ret < 0)) {
+			uint32_t k;
+
+			nstat_client_count_wk_drop(lp->mbuf_out[port].array,
+				lp->mbuf_out[port].n_mbufs, lcore_id);
+			for (k = 0; k < lp->mbuf_out[port].n_mbufs; k++) {
+ struct rte_mbuf *pkt_to_free = lp->mbuf_out[port].array[k];
+ rte_pktmbuf_free(pkt_to_free);
+ }
+ }
+
+ lp->mbuf_out[port].n_mbufs = 0;
+ lp->mbuf_out_flush[port] = 1;
+ }
+}
+
+
+#if 0
+int app_lcore_worker_tx_buffer_to_send (struct rte_mbuf *pkt, uint8_t port)
+{
+ uint32_t lcore = rte_lcore_id();
+ uint32_t bsz_wr = app.burst_size_worker_write;
+ uint32_t n_pkts;
+ uint16_t tx_queueid = lcore % app.worker_core_num;
+ struct app_lcore_params_worker *lp = &app.lcore_params[lcore].worker;
+ uint32_t pos;
+#if APP_STATS
+ uint64_t tsc_start, tsc_end;
+#endif
+
+
+ pos = lp->mbuf_out[port].n_mbufs;
+
+ lp->mbuf_out[port].array[pos ++] = pkt;
+ if (likely(pos < bsz_wr)) {
+ lp->mbuf_out[port].n_mbufs = pos;
+ return 0;
+ }
+#if APP_STATS
+ get_stats(lp->mbuf_out[port].array, bsz_wr, lcore, port, tx_queueid, RSYS_TX);
+ tsc_start = rte_rdtsc();
+ get_lcores_stats(lcore, tsc_start, RSYS_LCORE_TX);
+
+#endif
+
+
+ n_pkts = rte_eth_tx_burst(
+ port,
+ tx_queueid,
+ lp->mbuf_out[port].array,
+ (uint16_t) bsz_wr);
+#ifdef APP_DEBUG
+ int debugi;
+ for(debugi = 0; debugi < (int)n_pkts; debugi++)
+ fprintf(stderr,"ETH_TX_BURST/WK:%d(%d)\n",
+ lp->mbuf_out[port].array[debugi]->pkt.data_len,
+ lcore);
+#endif
+
+
+#if APP_STATS
+ tsc_end = rte_rdtsc();
+ get_lcores_stats(lcore, tsc_end, RSYS_LCORE_APP);
+ app_stats[lcore][port][tx_queueid][RSYS_TX].core_cycles[current_index] += (tsc_end - tsc_start);
+
+#endif
+
+ if (unlikely(n_pkts < bsz_wr)) {
+ uint32_t k;
+#if APP_STATS
+ remove_stats(lp->mbuf_out[port].array, bsz_wr - n_pkts, lcore, port, tx_queueid, RSYS_TX);
+#endif
+
+ for (k = n_pkts; k < bsz_wr; k ++) {
+ struct rte_mbuf *pkt_to_free = lp->mbuf_out[port].array[k];
+ rte_pktmbuf_free(pkt_to_free);
+ }
+ }
+
+ lp->mbuf_out[port].n_mbufs = 0;
+ lp->mbuf_out_flush[port] = 0;
+
+ return 0;
+}
+
+
+
+#endif
+
+
+#if 0
+
+static inline void
+app_lcore_worker(
+ struct app_lcore_params_worker *lp,
+ uint32_t bsz_rd)
+{
+ uint32_t i,n;
+#if APP_STATS
+ unsigned lcore_id = rte_lcore_id();
+ uint64_t tsc_start, tsc_end;
+#endif
+
+ for (i = 0; i < lp->n_rings_in; i ++)
+ {
+ struct rte_ring *ring_in = lp->rings_in[i];
+ uint32_t j;
+
+#if APP_STATS
+ tsc_start = rte_rdtsc();
+ get_lcores_stats(lcore_id, tsc_start, RSYS_LCORE_RX);
+
+#endif
+
+ n = rte_ring_sc_dequeue_burst(
+ ring_in,
+ (void **) lp->mbuf_in.array,
+ bsz_rd);
+
+ if (unlikely( n == 0)) {
+ continue;
+ }
+#if APP_STATS
+ tsc_end = rte_rdtsc();
+ get_lcores_stats(lcore_id, tsc_end, RSYS_LCORE_APP);
+ app_stats[lcore_id][lcore_conf[lcore_id][i].port_id][lcore_conf[lcore_id][i].queue_id][RSYS_RX].core_cycles[current_index] += (tsc_end - tsc_start);
+ get_stats(lp->mbuf_in.array, n, lcore_id, 0, 0, RSYS_RX);
+#endif
+ //usleep(5000);
+
+#if APP_WORKER_DROP_ALL_PACKETS
+ for (j = 0; j < n; j ++) {
+ struct rte_mbuf *pkt = lp->mbuf_in.array[j];
+ rte_pktmbuf_free(pkt);
+ }
+
+ continue;
+#endif
+
+ APP_WORKER_PREFETCH1(rte_pktmbuf_mtod(lp->mbuf_in.array[0], unsigned char *));
+ APP_WORKER_PREFETCH0(lp->mbuf_in.array[1]);
+
+ for (j = 0; j < n; j ++) {
+ struct rte_mbuf *pkt;
+ uint8_t port;
+
+ if (likely(j < n - 1)) {
+ APP_WORKER_PREFETCH1(rte_pktmbuf_mtod(lp->mbuf_in.array[j+1], unsigned char *));
+ }
+ if (likely(j < n - 2)) {
+ APP_WORKER_PREFETCH0(lp->mbuf_in.array[j+2]);
+ }
+
+ pkt = lp->mbuf_in.array[j];
+
+			if (app_pkt_process(pkt, &port)) {
+				rte_pktmbuf_free(pkt);
+			}
+
+ }
+ }
+}
+
+static inline void
+app_lcore_worker_flush(struct app_lcore_params_worker *lp)
+{
+ uint32_t port;
+ uint32_t lcore = rte_lcore_id();
+ uint16_t tx_queueid = lcore % app.worker_core_num;
+
+ uint32_t n_pkts;
+#if APP_STATS
+ uint64_t tsc_start, tsc_end;
+#endif
+
+
+ for (port = 0; port < APP_MAX_NIC_PORTS; port ++) {
+ if (likely((lp->mbuf_out_flush[port] == 0) ||
+ (lp->mbuf_out[port].n_mbufs == 0))) {
+ lp->mbuf_out_flush[port] = 1;
+ continue;
+ }
+
+#if APP_STATS
+ get_stats(lp->mbuf_out[port].array, lp->mbuf_out[port].n_mbufs, lcore, port, tx_queueid, RSYS_TX);
+ tsc_start = rte_rdtsc();
+ get_lcores_stats(lcore, tsc_start, RSYS_LCORE_TX);
+
+#endif
+ n_pkts = rte_eth_tx_burst(
+ port,
+ tx_queueid,
+ lp->mbuf_out[port].array,
+ lp->mbuf_out[port].n_mbufs);
+
+#ifdef APP_DEBUG
+ //DEBUG:
+ int debugi;
+ fprintf(stderr,"ETH_TX_BURST/WK_FL-n_mbufs:%d\n",n_pkts);
+ for(debugi = 0; debugi < (int)n_pkts; debugi++)
+ fprintf(stderr,"ETH_TX_BURST/WK_FL:%d(%d)\n",
+ lp->mbuf_out[port].array[debugi]->pkt.data_len,
+ lcore);
+#endif
+
+#if APP_STATS
+ tsc_end = rte_rdtsc();
+ get_lcores_stats(lcore, tsc_end, RSYS_LCORE_APP);
+ app_stats[lcore][port][tx_queueid][RSYS_TX].core_cycles[current_index] += (tsc_end - tsc_start);
+
+#endif
+
+ if (unlikely(n_pkts < lp->mbuf_out[port].n_mbufs)) {
+ uint32_t k;
+#if APP_STATS
+ remove_stats(lp->mbuf_out[port].array, lp->mbuf_out[port].n_mbufs - n_pkts, lcore, port, tx_queueid, RSYS_TX);
+#endif
+
+ for (k = n_pkts; k < lp->mbuf_out[port].n_mbufs; k ++) {
+ struct rte_mbuf *pkt_to_free = lp->mbuf_out[port].array[k];
+ rte_pktmbuf_free(pkt_to_free);
+ }
+ }
+ lp->mbuf_out[port].n_mbufs = 0;
+ lp->mbuf_out_flush[port] = 1;
+ }
+}
+
+
+#endif
+
+static void
+app_lcore_main_loop_worker(void)
+{
+ uint32_t lcore = rte_lcore_id();
+ struct app_lcore_params_worker *lp = &app.lcore_params[lcore].worker;
+ uint64_t i = 0;
+
+ uint32_t bsz_rd = app.burst_size_worker_read;
+ uint32_t bsz_wr = app.burst_size_worker_write;
+
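+	/* Flush partially filled TX buffers every APP_LCORE_WORKER_FLUSH
+	 * iterations so packets on low-rate flows are not held indefinitely. */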
+ for ( ; ; ) {
+ if (APP_LCORE_WORKER_FLUSH && (unlikely(i == APP_LCORE_WORKER_FLUSH))) {
+ app_lcore_worker_flush(lp);
+ i = 0;
+ }
+
+ app_lcore_worker(lp, bsz_rd, bsz_wr);
+ i ++;
+ }
+}
+
+int
+app_lcore_main_loop(__attribute__((unused)) void *arg)
+{
+ struct app_lcore_params *lp;
+ unsigned lcore;
+
+ lcore = rte_lcore_id();
+ lp = &app.lcore_params[lcore];
+
+ if (lp->type == e_APP_LCORE_WORKER) {
+ printf("Logical core %u (worker %u) main loop.\n",
+ lcore,
+ (unsigned) lp->worker.worker_id);
+ app_lcore_main_loop_worker();
+ }
+
+ return 0;
+}
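+
+/*
+ * app_lcore_main_loop() is meant to be launched on every lcore from the
+ * application's main(). A minimal sketch, following the DPDK load_balancer
+ * example this file is ported from (the actual main() lives elsewhere):
+ *
+ *	rte_eal_mp_remote_launch(app_lcore_main_loop, NULL, CALL_MASTER);
+ *	RTE_LCORE_FOREACH_SLAVE(lcore) {
+ *		if (rte_eal_wait_lcore(lcore) < 0)
+ *			return -1;
+ *	}
+ */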