From e34aa3f5e23d7fa0b95944269c499d5c1e7c23aa Mon Sep 17 00:00:00 2001 From: luwenpeng Date: Wed, 9 Aug 2023 18:47:16 +0800 Subject: TSG-16531 PacketAdapter适配容器环境,使用mrzcpd收包,通过RAW Socket注RST包 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- common/src/packet_io.cpp | 280 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 280 insertions(+) create mode 100644 common/src/packet_io.cpp (limited to 'common/src/packet_io.cpp') diff --git a/common/src/packet_io.cpp b/common/src/packet_io.cpp new file mode 100644 index 0000000..0209514 --- /dev/null +++ b/common/src/packet_io.cpp @@ -0,0 +1,280 @@ +#include +#include +#include +#include +#include +#include + +#include "log.h" +#include "packet_io.h" + +#define MAX_RX_BURST 128 + +#define LOG_PKTIO "PACKET_IO" +#define MIN(a, b) ((a) > (b) ? (b) : (a)) + +struct config +{ + int thread_num; + int cpu_mask[MAX_THREAD_NUM]; + + int rx_burst_max; + int bypass_traffic; + + char app_symbol[256]; + char app_device[256]; +}; + +struct packet_io +{ + struct config config; + + struct mr_vdev *mr_dev; + struct mr_sendpath *mr_path; + struct mr_instance *mr_instance; + + void *cb_args; + packet_handle_cb *callback; +}; + +static int packet_io_config(const char *profile, struct config *config) +{ + MESA_load_profile_int_def(profile, "PACKET_IO", "thread_num", (int *)&(config->thread_num), 1); + MESA_load_profile_uint_range(profile, "PACKET_IO", "cpu_mask", MAX_THREAD_NUM, (unsigned int *)config->cpu_mask); + + MESA_load_profile_int_def(profile, "PACKET_IO", "rx_burst_max", (int *)&(config->rx_burst_max), 1); + MESA_load_profile_int_def(profile, "PACKET_IO", "bypass_traffic", (int *)&(config->bypass_traffic), 0); + + MESA_load_profile_string_nodef(profile, "PACKET_IO", "app_symbol", config->app_symbol, sizeof(config->app_symbol)); + MESA_load_profile_string_nodef(profile, "PACKET_IO", "app_device", config->app_device, sizeof(config->app_device)); + + config->thread_num = MIN(config->thread_num, MAX_THREAD_NUM); + + if (config->rx_burst_max > MAX_RX_BURST) + { + LOG_ERROR("%s: invalid rx_burst_max, exceeds limit %d", LOG_PKTIO, MAX_RX_BURST); + return -1; + } + + if (strlen(config->app_symbol) == 0) + { + LOG_ERROR("%s: invalid app_symbol in %s", LOG_PKTIO, profile); + return -1; + } + + if (strlen(config->app_device) == 0) + { + LOG_ERROR("%s: invalid app_device in %s", LOG_PKTIO, profile); + return -1; + } + + LOG_DEBUG("%s: PACKET_IO->thread_num : %d", LOG_PKTIO, config->thread_num); + LOG_DEBUG("%s: PACKET_IO->rx_burst_max : %d", LOG_PKTIO, config->rx_burst_max); + LOG_DEBUG("%s: PACKET_IO->bypass_traffic : %d", LOG_PKTIO, config->bypass_traffic); + LOG_DEBUG("%s: PACKET_IO->app_symbol : %s", LOG_PKTIO, config->app_symbol); + LOG_DEBUG("%s: PACKET_IO->app_device : %s", LOG_PKTIO, config->app_device); + + return 0; +} + +static int marsio_buff_is_keepalive(char *raw_data, int raw_len) +{ + if (raw_data == NULL || raw_len < (int)(sizeof(struct ethhdr))) + { + return 0; + } + + struct ethhdr *eth_hdr = (struct ethhdr *)raw_data; + if (eth_hdr->h_proto == 0xAAAA) + { + return 1; + } + else + { + return 0; + } +} + +struct packet_io *packet_io_create(const char *profile) +{ + int opt = 1; + cpu_set_t coremask; + struct packet_io *handle = (struct packet_io *)calloc(1, sizeof(struct packet_io)); + assert(handle != NULL); + + if (packet_io_config(profile, &(handle->config)) != 0) + { + goto error_out; + } + + CPU_ZERO(&coremask); + for (int i = 0; i < handle->config.thread_num; i++) + { + CPU_SET(handle->config.cpu_mask[i], &coremask); + } + + handle->mr_instance = marsio_create(); + if (handle->mr_instance == NULL) + { + LOG_ERROR("%s: unable to create marsio instance", LOG_PKTIO); + goto error_out; + } + + if (marsio_option_set(handle->mr_instance, MARSIO_OPT_THREAD_MASK_IN_CPUSET, &coremask, sizeof(cpu_set_t)) != 0) + { + LOG_ERROR("%s: unable to set MARSIO_OPT_EXIT_WHEN_ERR option for marsio instance", LOG_PKTIO); + goto error_out; + } + + if (marsio_option_set(handle->mr_instance, MARSIO_OPT_EXIT_WHEN_ERR, &opt, sizeof(opt)) != 0) + { + LOG_ERROR("%s: unable to set MARSIO_OPT_EXIT_WHEN_ERR option for marsio instance", LOG_PKTIO); + goto error_out; + } + + if (marsio_init(handle->mr_instance, handle->config.app_symbol) != 0) + { + LOG_ERROR("%s: unable to initialize marsio instance", LOG_PKTIO); + goto error_out; + } + + handle->mr_dev = marsio_open_device(handle->mr_instance, handle->config.app_device, handle->config.thread_num, handle->config.thread_num); + if (handle->mr_dev == NULL) + { + LOG_ERROR("%s: unable to open device %s", LOG_PKTIO, handle->config.app_device); + goto error_out; + } + + handle->mr_path = marsio_sendpath_create_by_vdev(handle->mr_dev); + if (handle->mr_path == NULL) + { + LOG_ERROR("%s: unable to create sendpath for device %s", LOG_PKTIO, handle->config.app_device); + goto error_out; + } + + return handle; + +error_out: + packet_io_destory(handle); + return NULL; +} + +void packet_io_destory(struct packet_io *handle) +{ + if (handle) + { + if (handle->mr_path) + { + marsio_sendpath_destory(handle->mr_path); + handle->mr_path = NULL; + } + + if (handle->mr_dev) + { + marsio_close_device(handle->mr_dev); + handle->mr_dev = NULL; + } + + if (handle->mr_instance) + { + marsio_destory(handle->mr_instance); + handle->mr_instance = NULL; + } + + free(handle); + handle = NULL; + } +} + +void packet_io_set_callback(struct packet_io *handle, packet_handle_cb *cb, void *args) +{ + handle->callback = cb; + handle->cb_args = args; +} + +int packet_io_thread_init(struct packet_io *handle, int thread_index) +{ + if (marsio_thread_init(handle->mr_instance) != 0) + { + LOG_ERROR("%s: unable to init marsio thread %d", LOG_PKTIO, thread_index); + return -1; + } + + return 0; +} + +void packet_io_thread_wait(struct packet_io *handle, int thread_index, int timeout_ms) +{ + struct mr_vdev *vdevs[] = { + handle->mr_dev, + }; + + marsio_poll_wait(handle->mr_instance, vdevs, 1, thread_index, timeout_ms); +} + +int packet_io_thread_polling(struct packet_io *handle, int thread_index) +{ + int raw_len; + char *raw_data; + marsio_buff_t *rx_buff; + marsio_buff_t *rx_buffs[MAX_RX_BURST]; + int nr_recv = marsio_recv_burst(handle->mr_dev, thread_index, rx_buffs, handle->config.rx_burst_max); + if (nr_recv <= 0) + { + return 0; + } + + if (handle->config.bypass_traffic == 1) + { + marsio_send_burst(handle->mr_path, thread_index, rx_buffs, nr_recv); + return nr_recv; + } + + for (int j = 0; j < nr_recv; j++) + { + rx_buff = rx_buffs[j]; + raw_len = marsio_buff_datalen(rx_buff); + raw_data = marsio_buff_mtod(rx_buff); + + if (marsio_buff_is_keepalive(raw_data, raw_len)) + { + marsio_send_burst(handle->mr_path, thread_index, &rx_buff, 1); + } + else if (marsio_buff_is_ctrlbuf(rx_buff)) + { + marsio_send_burst(handle->mr_path, thread_index, &rx_buff, 1); + } + else + { + if (handle->callback) + { + if (handle->callback(raw_data, raw_len, handle->cb_args) == ACTION_BYPASS) + { + marsio_send_burst(handle->mr_path, thread_index, &rx_buff, 1); + } + else + { + marsio_buff_free(handle->mr_instance, &rx_buff, 1, 0, thread_index); + } + } + else + { + marsio_send_burst(handle->mr_path, thread_index, &rx_buff, 1); + } + } + } + + return nr_recv; +} + +int packet_io_thread_number(struct packet_io *handle) +{ + if (handle) + { + return handle->config.thread_num; + } + else + { + return 0; + } +} \ No newline at end of file -- cgit v1.2.3