summaryrefslogtreecommitdiff
path: root/common/src/packet_io.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/src/packet_io.cpp')
-rw-r--r--common/src/packet_io.cpp280
1 files changed, 280 insertions, 0 deletions
diff --git a/common/src/packet_io.cpp b/common/src/packet_io.cpp
new file mode 100644
index 0000000..0209514
--- /dev/null
+++ b/common/src/packet_io.cpp
@@ -0,0 +1,280 @@
+#include <sched.h>
+#include <stdlib.h>
+#include <string.h>
+#include <assert.h>
+#include <netinet/ether.h>
+#include <MESA/MESA_prof_load.h>
+
+#include "log.h"
+#include "packet_io.h"
+
+#define MAX_RX_BURST 128
+
+#define LOG_PKTIO "PACKET_IO"
+#define MIN(a, b) ((a) > (b) ? (b) : (a))
+
+struct config
+{
+ int thread_num;
+ int cpu_mask[MAX_THREAD_NUM];
+
+ int rx_burst_max;
+ int bypass_traffic;
+
+ char app_symbol[256];
+ char app_device[256];
+};
+
+struct packet_io
+{
+ struct config config;
+
+ struct mr_vdev *mr_dev;
+ struct mr_sendpath *mr_path;
+ struct mr_instance *mr_instance;
+
+ void *cb_args;
+ packet_handle_cb *callback;
+};
+
+static int packet_io_config(const char *profile, struct config *config)
+{
+ MESA_load_profile_int_def(profile, "PACKET_IO", "thread_num", (int *)&(config->thread_num), 1);
+ MESA_load_profile_uint_range(profile, "PACKET_IO", "cpu_mask", MAX_THREAD_NUM, (unsigned int *)config->cpu_mask);
+
+ MESA_load_profile_int_def(profile, "PACKET_IO", "rx_burst_max", (int *)&(config->rx_burst_max), 1);
+ MESA_load_profile_int_def(profile, "PACKET_IO", "bypass_traffic", (int *)&(config->bypass_traffic), 0);
+
+ MESA_load_profile_string_nodef(profile, "PACKET_IO", "app_symbol", config->app_symbol, sizeof(config->app_symbol));
+ MESA_load_profile_string_nodef(profile, "PACKET_IO", "app_device", config->app_device, sizeof(config->app_device));
+
+ config->thread_num = MIN(config->thread_num, MAX_THREAD_NUM);
+
+ if (config->rx_burst_max > MAX_RX_BURST)
+ {
+ LOG_ERROR("%s: invalid rx_burst_max, exceeds limit %d", LOG_PKTIO, MAX_RX_BURST);
+ return -1;
+ }
+
+ if (strlen(config->app_symbol) == 0)
+ {
+ LOG_ERROR("%s: invalid app_symbol in %s", LOG_PKTIO, profile);
+ return -1;
+ }
+
+ if (strlen(config->app_device) == 0)
+ {
+ LOG_ERROR("%s: invalid app_device in %s", LOG_PKTIO, profile);
+ return -1;
+ }
+
+ LOG_DEBUG("%s: PACKET_IO->thread_num : %d", LOG_PKTIO, config->thread_num);
+ LOG_DEBUG("%s: PACKET_IO->rx_burst_max : %d", LOG_PKTIO, config->rx_burst_max);
+ LOG_DEBUG("%s: PACKET_IO->bypass_traffic : %d", LOG_PKTIO, config->bypass_traffic);
+ LOG_DEBUG("%s: PACKET_IO->app_symbol : %s", LOG_PKTIO, config->app_symbol);
+ LOG_DEBUG("%s: PACKET_IO->app_device : %s", LOG_PKTIO, config->app_device);
+
+ return 0;
+}
+
+static int marsio_buff_is_keepalive(char *raw_data, int raw_len)
+{
+ if (raw_data == NULL || raw_len < (int)(sizeof(struct ethhdr)))
+ {
+ return 0;
+ }
+
+ struct ethhdr *eth_hdr = (struct ethhdr *)raw_data;
+ if (eth_hdr->h_proto == 0xAAAA)
+ {
+ return 1;
+ }
+ else
+ {
+ return 0;
+ }
+}
+
+struct packet_io *packet_io_create(const char *profile)
+{
+ int opt = 1;
+ cpu_set_t coremask;
+ struct packet_io *handle = (struct packet_io *)calloc(1, sizeof(struct packet_io));
+ assert(handle != NULL);
+
+ if (packet_io_config(profile, &(handle->config)) != 0)
+ {
+ goto error_out;
+ }
+
+ CPU_ZERO(&coremask);
+ for (int i = 0; i < handle->config.thread_num; i++)
+ {
+ CPU_SET(handle->config.cpu_mask[i], &coremask);
+ }
+
+ handle->mr_instance = marsio_create();
+ if (handle->mr_instance == NULL)
+ {
+ LOG_ERROR("%s: unable to create marsio instance", LOG_PKTIO);
+ goto error_out;
+ }
+
+ if (marsio_option_set(handle->mr_instance, MARSIO_OPT_THREAD_MASK_IN_CPUSET, &coremask, sizeof(cpu_set_t)) != 0)
+ {
+ LOG_ERROR("%s: unable to set MARSIO_OPT_EXIT_WHEN_ERR option for marsio instance", LOG_PKTIO);
+ goto error_out;
+ }
+
+ if (marsio_option_set(handle->mr_instance, MARSIO_OPT_EXIT_WHEN_ERR, &opt, sizeof(opt)) != 0)
+ {
+ LOG_ERROR("%s: unable to set MARSIO_OPT_EXIT_WHEN_ERR option for marsio instance", LOG_PKTIO);
+ goto error_out;
+ }
+
+ if (marsio_init(handle->mr_instance, handle->config.app_symbol) != 0)
+ {
+ LOG_ERROR("%s: unable to initialize marsio instance", LOG_PKTIO);
+ goto error_out;
+ }
+
+ handle->mr_dev = marsio_open_device(handle->mr_instance, handle->config.app_device, handle->config.thread_num, handle->config.thread_num);
+ if (handle->mr_dev == NULL)
+ {
+ LOG_ERROR("%s: unable to open device %s", LOG_PKTIO, handle->config.app_device);
+ goto error_out;
+ }
+
+ handle->mr_path = marsio_sendpath_create_by_vdev(handle->mr_dev);
+ if (handle->mr_path == NULL)
+ {
+ LOG_ERROR("%s: unable to create sendpath for device %s", LOG_PKTIO, handle->config.app_device);
+ goto error_out;
+ }
+
+ return handle;
+
+error_out:
+ packet_io_destory(handle);
+ return NULL;
+}
+
+void packet_io_destory(struct packet_io *handle)
+{
+ if (handle)
+ {
+ if (handle->mr_path)
+ {
+ marsio_sendpath_destory(handle->mr_path);
+ handle->mr_path = NULL;
+ }
+
+ if (handle->mr_dev)
+ {
+ marsio_close_device(handle->mr_dev);
+ handle->mr_dev = NULL;
+ }
+
+ if (handle->mr_instance)
+ {
+ marsio_destory(handle->mr_instance);
+ handle->mr_instance = NULL;
+ }
+
+ free(handle);
+ handle = NULL;
+ }
+}
+
+void packet_io_set_callback(struct packet_io *handle, packet_handle_cb *cb, void *args)
+{
+ handle->callback = cb;
+ handle->cb_args = args;
+}
+
+int packet_io_thread_init(struct packet_io *handle, int thread_index)
+{
+ if (marsio_thread_init(handle->mr_instance) != 0)
+ {
+ LOG_ERROR("%s: unable to init marsio thread %d", LOG_PKTIO, thread_index);
+ return -1;
+ }
+
+ return 0;
+}
+
+void packet_io_thread_wait(struct packet_io *handle, int thread_index, int timeout_ms)
+{
+ struct mr_vdev *vdevs[] = {
+ handle->mr_dev,
+ };
+
+ marsio_poll_wait(handle->mr_instance, vdevs, 1, thread_index, timeout_ms);
+}
+
+int packet_io_thread_polling(struct packet_io *handle, int thread_index)
+{
+ int raw_len;
+ char *raw_data;
+ marsio_buff_t *rx_buff;
+ marsio_buff_t *rx_buffs[MAX_RX_BURST];
+ int nr_recv = marsio_recv_burst(handle->mr_dev, thread_index, rx_buffs, handle->config.rx_burst_max);
+ if (nr_recv <= 0)
+ {
+ return 0;
+ }
+
+ if (handle->config.bypass_traffic == 1)
+ {
+ marsio_send_burst(handle->mr_path, thread_index, rx_buffs, nr_recv);
+ return nr_recv;
+ }
+
+ for (int j = 0; j < nr_recv; j++)
+ {
+ rx_buff = rx_buffs[j];
+ raw_len = marsio_buff_datalen(rx_buff);
+ raw_data = marsio_buff_mtod(rx_buff);
+
+ if (marsio_buff_is_keepalive(raw_data, raw_len))
+ {
+ marsio_send_burst(handle->mr_path, thread_index, &rx_buff, 1);
+ }
+ else if (marsio_buff_is_ctrlbuf(rx_buff))
+ {
+ marsio_send_burst(handle->mr_path, thread_index, &rx_buff, 1);
+ }
+ else
+ {
+ if (handle->callback)
+ {
+ if (handle->callback(raw_data, raw_len, handle->cb_args) == ACTION_BYPASS)
+ {
+ marsio_send_burst(handle->mr_path, thread_index, &rx_buff, 1);
+ }
+ else
+ {
+ marsio_buff_free(handle->mr_instance, &rx_buff, 1, 0, thread_index);
+ }
+ }
+ else
+ {
+ marsio_send_burst(handle->mr_path, thread_index, &rx_buff, 1);
+ }
+ }
+ }
+
+ return nr_recv;
+}
+
+int packet_io_thread_number(struct packet_io *handle)
+{
+ if (handle)
+ {
+ return handle->config.thread_num;
+ }
+ else
+ {
+ return 0;
+ }
+} \ No newline at end of file