summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorluwenpeng <[email protected]>2024-08-30 10:21:44 +0800
committerluwenpeng <[email protected]>2024-08-30 18:56:35 +0800
commitd1d5e6e09e92361f16586f2f611351c45b2b932b (patch)
treea49dffe75f62cb8b1f5fde0fd734e77ce76554fb
parent8935e5408b58885773e7db04403c2ae3fca590c7 (diff)
refactor packet IO, rename dumpfile mode to pcap mode, modify related configuration
-rw-r--r--conf/stellar.toml12
-rw-r--r--infra/core/stellar_config.c163
-rw-r--r--infra/core/stellar_config.h5
-rw-r--r--infra/core/stellar_core.c31
-rw-r--r--infra/packet_io/CMakeLists.txt2
-rw-r--r--infra/packet_io/dumpfile_io.h25
-rw-r--r--infra/packet_io/marsio_io.c156
-rw-r--r--infra/packet_io/marsio_io.h20
-rw-r--r--infra/packet_io/packet_io.c345
-rw-r--r--infra/packet_io/packet_io.h42
-rw-r--r--infra/packet_io/pcap_io.c (renamed from infra/packet_io/dumpfile_io.c)161
-rw-r--r--infra/packet_io/pcap_io.h24
-rw-r--r--test/decoders/http/test_based_on_stellar/env/stellar.toml12
-rw-r--r--test/lpi_plugin/CMakeLists.txt4
-rw-r--r--test/packet_inject/conf/stellar.toml12
-rw-r--r--test/packet_inject/packet_inject_test.h10
16 files changed, 528 insertions, 496 deletions
diff --git a/conf/stellar.toml b/conf/stellar.toml
index 360e6fe..880fad6 100644
--- a/conf/stellar.toml
+++ b/conf/stellar.toml
@@ -3,15 +3,13 @@ snowflake_base = 1 # [0, 31]
snowflake_offset = 2 # [0, 127]
[packet_io]
-mode = "dumpfile" # dumpfile, dumpfilelist, marsio
+mode = "pcapfile" # pcapfile, pcaplist, marsio
app_symbol = "stellar"
dev_symbol = "nf_0_fw"
-
-dumpfile_path = "/tmp/dumpfile/dumpfile.pcap"
-#dumpfile_path = "/tmp/dumpfile/dumpfilelist"
-
-nr_worker_thread = 1 # [1, 256]
+pcap_path = "/tmp/test.pcap"
+nr_worker_thread = 1 # range: [1, 256]
cpu_mask = [5, 6, 7, 8, 9, 10, 11, 12]
+idle_yield_interval_ms = 900 # range: [0, 60000] (ms)
[ip_reassembly]
enable = 1
@@ -67,5 +65,3 @@ tcp_reassembly_max_segments = 256 # range: [2, 4096]
[schedule]
merge_stat_interval = 500 # range: [1, 60000] (ms)
output_stat_interval = 2000 # range: [1, 60000] (ms)
-
-packet_io_yield_interval = 900 # range: [1, 60000] (ms)
diff --git a/infra/core/stellar_config.c b/infra/core/stellar_config.c
index d1511fe..b847d11 100644
--- a/infra/core/stellar_config.c
+++ b/infra/core/stellar_config.c
@@ -44,132 +44,6 @@ static int parse_snowflake_section(toml_table_t *root, struct snowflake_options
// return 0: success
// retuun -1: failed
-static int parse_packet_io_section(toml_table_t *root, struct packet_io_options *opts)
-{
- int ret = -1;
- char *ptr_mode = NULL;
- char *ptr_dumpfile_path = NULL;
- char *ptr_app_symbol = NULL;
- char *ptr_dev_symbol = NULL;
- const char *ptr;
- toml_table_t *table;
- toml_array_t *mask_array;
-
- table = toml_table_in(root, "packet_io");
- if (table == NULL)
- {
- CONFIG_LOG_ERROR("config file missing packet_io section");
- goto error_out;
- }
-
- ptr = toml_raw_in(table, "mode");
- if (ptr == NULL || toml_rtos(ptr, &ptr_mode) != 0)
- {
- CONFIG_LOG_ERROR("config file missing packet_io->mode");
- goto error_out;
- }
- if (strcmp(ptr_mode, "dumpfile") == 0)
- {
- opts->mode = PACKET_IO_DUMPFILE;
- }
- else if (strcmp(ptr_mode, "dumpfilelist") == 0)
- {
- opts->mode = PACKET_IO_DUMPFILELIST;
- }
- else if (strcmp(ptr_mode, "marsio") == 0)
- {
- opts->mode = PACKET_IO_MARSIO;
- }
- else
- {
- CONFIG_LOG_ERROR("config file invalid packet_io->mode %s, only support dumpfile and marsio", ptr);
- goto error_out;
- }
-
- if (opts->mode == PACKET_IO_DUMPFILE || opts->mode == PACKET_IO_DUMPFILELIST)
- {
- ptr = toml_raw_in(table, "dumpfile_path");
- if (ptr == NULL || toml_rtos(ptr, &ptr_dumpfile_path) != 0)
- {
- CONFIG_LOG_ERROR("config file missing packet_io->dumpfile_path");
- goto error_out;
- }
- strcpy(opts->dumpfile_path, ptr_dumpfile_path);
- }
- else
- {
- ptr = toml_raw_in(table, "app_symbol");
- if (ptr == NULL || toml_rtos(ptr, &ptr_app_symbol) != 0)
- {
- CONFIG_LOG_ERROR("config file missing packet_io->app_symbol");
- goto error_out;
- }
- strcpy(opts->app_symbol, ptr_app_symbol);
-
- ptr = toml_raw_in(table, "dev_symbol");
- if (ptr == NULL || toml_rtos(ptr, &ptr_dev_symbol) != 0)
- {
- CONFIG_LOG_ERROR("config file missing packet_io->dev_symbol");
- goto error_out;
- }
- strcpy(opts->dev_symbol, ptr_dev_symbol);
- }
-
- ptr = toml_raw_in(table, "nr_worker_thread");
- if (ptr == NULL)
- {
- CONFIG_LOG_ERROR("config file missing packet_io->nr_worker_thread");
- goto error_out;
- }
- if (atoi(ptr) <= 0 || atoi(ptr) > MAX_THREAD_NUM)
- {
- CONFIG_LOG_ERROR("config file invalid packet_io->nr_worker_thread %d, range [1, %d]", atoi(ptr), MAX_THREAD_NUM);
- goto error_out;
- }
- opts->nr_worker_thread = atoi(ptr);
-
- mask_array = toml_array_in(table, "cpu_mask");
- if (mask_array == NULL)
- {
- CONFIG_LOG_ERROR("config file missing packet_io->cpu_mask");
- goto error_out;
- }
- for (uint16_t i = 0; i < opts->nr_worker_thread; i++)
- {
- ptr = toml_raw_at(mask_array, i);
- if (ptr == NULL)
- {
- CONFIG_LOG_ERROR("config file missing packet_io->cpu_mask[%d]", i);
- goto error_out;
- }
- opts->cpu_mask[i] = atoi(ptr);
- }
-
- ret = 0;
-
-error_out:
- if (ptr_mode)
- {
- free(ptr_mode);
- }
- if (ptr_dumpfile_path)
- {
- free(ptr_dumpfile_path);
- }
- if (ptr_app_symbol)
- {
- free(ptr_app_symbol);
- }
- if (ptr_dev_symbol)
- {
- free(ptr_dev_symbol);
- }
-
- return ret;
-}
-
-// return 0: success
-// retuun -1: failed
static int parse_schedule_options(toml_table_t *root, struct schedule_options *opts)
{
const char *ptr;
@@ -208,19 +82,6 @@ static int parse_schedule_options(toml_table_t *root, struct schedule_options *o
return -1;
}
- ptr = toml_raw_in(table, "packet_io_yield_interval");
- if (ptr == NULL)
- {
- CONFIG_LOG_ERROR("config file missing schedule->packet_io_yield_interval");
- return -1;
- }
- opts->packet_io_yield_interval = atoll(ptr);
- if (opts->packet_io_yield_interval < 1 || opts->packet_io_yield_interval > 60000)
- {
- CONFIG_LOG_ERROR("config file invalid schedule->packet_io_yield_interval %ld, range [1, 60000]", opts->packet_io_yield_interval);
- return -1;
- }
-
return 0;
}
@@ -252,11 +113,6 @@ int stellar_config_load(struct stellar_config *config, const char *file)
goto error_out;
}
- if (parse_packet_io_section(table, &config->pkt_io_opts) != 0)
- {
- goto error_out;
- }
-
if (parse_schedule_options(table, &config->sched_opts) != 0)
{
goto error_out;
@@ -285,32 +141,13 @@ void stellar_config_print(const struct stellar_config *config)
return;
}
- const struct packet_io_options *pkt_io_opts = &config->pkt_io_opts;
const struct snowflake_options *snowflake_opts = &config->snowflake_opts;
// snowflake config
CONFIG_LOG_DEBUG("snowflake->snowflake_base : %d", snowflake_opts->snowflake_base);
CONFIG_LOG_DEBUG("snowflake->snowflake_offset : %d", snowflake_opts->snowflake_offset);
- // packet io config
- CONFIG_LOG_DEBUG("packet_io->mode : %s", pkt_io_opts->mode == PACKET_IO_DUMPFILE ? "dumpfile" : (pkt_io_opts->mode == PACKET_IO_DUMPFILELIST ? "dumpfilelist" : "marsio"));
- if (pkt_io_opts->mode == PACKET_IO_DUMPFILE || pkt_io_opts->mode == PACKET_IO_DUMPFILELIST)
- {
- CONFIG_LOG_DEBUG("packet_io->dumpfile_path : %s", pkt_io_opts->dumpfile_path);
- }
- else
- {
- CONFIG_LOG_DEBUG("packet_io->app_symbol : %s", pkt_io_opts->app_symbol);
- CONFIG_LOG_DEBUG("packet_io->dev_symbol : %s", pkt_io_opts->dev_symbol);
- }
- CONFIG_LOG_DEBUG("packet_io->nr_worker_thread : %d", pkt_io_opts->nr_worker_thread);
- for (uint16_t i = 0; i < pkt_io_opts->nr_worker_thread; i++)
- {
- CONFIG_LOG_DEBUG("packet_io->cpu_mask[%3d] : %d", i, pkt_io_opts->cpu_mask[i]);
- }
-
// schedule config
CONFIG_LOG_DEBUG("schedule->merge_stat_interval : %ld", config->sched_opts.merge_stat_interval);
CONFIG_LOG_DEBUG("schedule->output_stat_interval : %ld", config->sched_opts.output_stat_interval);
- CONFIG_LOG_DEBUG("schedule->packet_io_yield_interval : %ld", config->sched_opts.packet_io_yield_interval);
}
diff --git a/infra/core/stellar_config.h b/infra/core/stellar_config.h
index 1e5f01e..6cd0a08 100644
--- a/infra/core/stellar_config.h
+++ b/infra/core/stellar_config.h
@@ -5,14 +5,10 @@ extern "C"
{
#endif
-#include "packet_io.h"
-
struct schedule_options
{
uint64_t merge_stat_interval; // range: [1, 60000] (ms)
uint64_t output_stat_interval; // range: [1, 60000] (ms)
-
- uint64_t packet_io_yield_interval; // range: [1, 60000] (ms)
};
struct snowflake_options
@@ -23,7 +19,6 @@ struct snowflake_options
struct stellar_config
{
- struct packet_io_options pkt_io_opts;
struct snowflake_options snowflake_opts;
struct schedule_options sched_opts;
};
diff --git a/infra/core/stellar_core.c b/infra/core/stellar_core.c
index 83e9531..8545c7e 100644
--- a/infra/core/stellar_core.c
+++ b/infra/core/stellar_core.c
@@ -60,6 +60,7 @@ struct stellar_runtime
struct stellar_thread threads[MAX_THREAD_NUM];
struct session_manager_config *sess_mgr_cfg;
struct ip_reassembly_config *ip_reass_cfg;
+ struct packet_io_config *pkt_io_cfg;
};
struct stellar
@@ -145,7 +146,6 @@ static void *worker_thread(void *arg)
};
uint64_t merge_stat_interval = config->sched_opts.merge_stat_interval;
- uint64_t packet_io_yield_interval = config->sched_opts.packet_io_yield_interval;
uint16_t thr_idx = thread->idx;
__current_thread_idx = thr_idx;
@@ -300,7 +300,7 @@ static void *worker_thread(void *arg)
if (nr_pkt_received == 0)
{
- packet_io_yield(packet_io, thr_idx, packet_io_yield_interval);
+ packet_io_yield(packet_io, thr_idx);
}
}
@@ -345,7 +345,7 @@ static int stellar_thread_init(struct stellar *st)
struct stellar_config *config = &st->config;
uint64_t now_ms = clock_get_real_time_ms();
- for (uint16_t i = 0; i < config->pkt_io_opts.nr_worker_thread; i++)
+ for (uint16_t i = 0; i < runtime->pkt_io_cfg->nr_worker_thread; i++)
{
struct stellar_thread *thread = &runtime->threads[i];
thread->idx = i;
@@ -383,10 +383,9 @@ static int stellar_thread_init(struct stellar *st)
static void stellar_thread_clean(struct stellar *st)
{
struct stellar_runtime *runtime = &st->runtime;
- struct stellar_config *config = &st->config;
CORE_LOG_FATAL("cleaning worker thread context ...");
- for (uint16_t i = 0; i < config->pkt_io_opts.nr_worker_thread; i++)
+ for (uint16_t i = 0; i < runtime->pkt_io_cfg->nr_worker_thread; i++)
{
struct stellar_thread *thread = &runtime->threads[i];
if (ATOMIC_READ(&thread->is_runing) == 0)
@@ -402,9 +401,8 @@ static void stellar_thread_clean(struct stellar *st)
static int stellar_thread_run(struct stellar *st)
{
struct stellar_runtime *runtime = &st->runtime;
- struct stellar_config *config = &st->config;
- for (uint16_t i = 0; i < config->pkt_io_opts.nr_worker_thread; i++)
+ for (uint16_t i = 0; i < runtime->pkt_io_cfg->nr_worker_thread; i++)
{
struct stellar_thread *thread = &runtime->threads[i];
if (pthread_create(&thread->tid, NULL, worker_thread, (void *)thread) < 0)
@@ -420,10 +418,9 @@ static int stellar_thread_run(struct stellar *st)
static void stellar_thread_join(struct stellar *st)
{
struct stellar_runtime *runtime = &st->runtime;
- struct stellar_config *config = &st->config;
CORE_LOG_FATAL("waiting worker thread stop ...");
- for (uint16_t i = 0; i < config->pkt_io_opts.nr_worker_thread; i++)
+ for (uint16_t i = 0; i < runtime->pkt_io_cfg->nr_worker_thread; i++)
{
struct stellar_thread *thread = &runtime->threads[i];
while (ATOMIC_READ(&thread->is_runing) == 1)
@@ -489,7 +486,14 @@ struct stellar *stellar_new(const char *stellar_cfg_file, const char *plugin_cfg
CORE_LOG_ERROR("unable to create ip reassembly config");
goto error_out;
}
+ runtime->pkt_io_cfg = packet_io_config_new(st->stellar_cfg_file);
+ if (runtime->pkt_io_cfg == NULL)
+ {
+ CORE_LOG_ERROR("unable to create packet io config");
+ goto error_out;
+ }
+ packet_io_config_print(runtime->pkt_io_cfg);
session_manager_config_print(runtime->sess_mgr_cfg);
ip_reassembly_config_print(runtime->ip_reass_cfg);
if (stellar_config_load(config, st->stellar_cfg_file) != 0)
@@ -499,7 +503,7 @@ struct stellar *stellar_new(const char *stellar_cfg_file, const char *plugin_cfg
}
stellar_config_print(config);
- runtime->stat = stellar_stat_new(config->pkt_io_opts.nr_worker_thread);
+ runtime->stat = stellar_stat_new(runtime->pkt_io_cfg->nr_worker_thread);
if (runtime->stat == NULL)
{
CORE_LOG_ERROR("unable to create stellar stat");
@@ -512,7 +516,7 @@ struct stellar *stellar_new(const char *stellar_cfg_file, const char *plugin_cfg
goto error_out;
}
- runtime->packet_io = packet_io_new(&config->pkt_io_opts);
+ runtime->packet_io = packet_io_new(runtime->pkt_io_cfg);
if (runtime->packet_io == NULL)
{
CORE_LOG_ERROR("unable to create packet io");
@@ -563,7 +567,7 @@ void stellar_run(struct stellar *st)
}
usleep(1000); // 1ms
- // only available in dumpfile mode
+ // only available in pcap mode
if (packet_io_isbreak(runtime->packet_io))
{
ATOMIC_SET(&runtime->need_exit, 1);
@@ -587,6 +591,7 @@ void stellar_free(struct stellar *st)
packet_io_free(runtime->packet_io);
plugin_manager_exit(runtime->plug_mgr);
stellar_stat_free(runtime->stat);
+ packet_io_config_free(runtime->pkt_io_cfg);
ip_reassembly_config_free(runtime->ip_reass_cfg);
session_manager_config_free(runtime->sess_mgr_cfg);
CORE_LOG_FATAL("stellar exit\n");
@@ -675,7 +680,7 @@ void stellar_send_build_packet(struct stellar *st, struct packet *pkt)
int stellar_get_worker_thread_num(struct stellar *st)
{
- return st->config.pkt_io_opts.nr_worker_thread;
+ return st->runtime.pkt_io_cfg->nr_worker_thread;
}
struct logger *stellar_get_logger(struct stellar *st)
diff --git a/infra/packet_io/CMakeLists.txt b/infra/packet_io/CMakeLists.txt
index 3d77732..4d0baa9 100644
--- a/infra/packet_io/CMakeLists.txt
+++ b/infra/packet_io/CMakeLists.txt
@@ -1,3 +1,3 @@
-add_library(packet_io dumpfile_io.c marsio_io.c packet_io.c)
+add_library(packet_io pcap_io.c marsio_io.c packet_io.c)
target_include_directories(packet_io PUBLIC ${CMAKE_CURRENT_LIST_DIR})
target_link_libraries(packet_io marsio pcap packet_parser) \ No newline at end of file
diff --git a/infra/packet_io/dumpfile_io.h b/infra/packet_io/dumpfile_io.h
deleted file mode 100644
index 05f4ccc..0000000
--- a/infra/packet_io/dumpfile_io.h
+++ /dev/null
@@ -1,25 +0,0 @@
-#pragma once
-
-#ifdef __cplusplus
-extern "C"
-{
-#endif
-
-#include "packet_io.h"
-
-struct dumpfile_io;
-struct dumpfile_io *dumpfile_io_new(const char *dumpfile_path, enum packet_io_mode mode, uint16_t nr_worker_thread);
-void dumpfile_io_free(struct dumpfile_io *handle);
-int dumpfile_io_isbreak(struct dumpfile_io *handle);
-
-int dumpfile_io_init(struct dumpfile_io *handle, uint16_t thr_idx);
-uint16_t dumpfile_io_input(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
-void dumpfile_io_output(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
-void dumpfile_io_drop(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
-uint16_t dumpfile_io_inject(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
-void dumpfile_io_yield(struct dumpfile_io *handle, uint16_t thr_idx, uint64_t timeout_ms);
-struct packet_io_stat *dumpfile_io_stat(struct dumpfile_io *handle, uint16_t thr_idx);
-
-#ifdef __cplusplus
-}
-#endif
diff --git a/infra/packet_io/marsio_io.c b/infra/packet_io/marsio_io.c
index 96d39fd..c7486d2 100644
--- a/infra/packet_io/marsio_io.c
+++ b/infra/packet_io/marsio_io.c
@@ -10,10 +10,11 @@
#include "packet_parser.h"
#include "packet_private.h"
-#define PACKET_IO_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "marsio", format, ##__VA_ARGS__)
+#define MARSIO_IO_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "marsio", format, ##__VA_ARGS__)
struct marsio_io
{
+ struct packet_io_config cfg;
struct mr_instance *mr_ins;
struct mr_vdev *mr_dev;
struct mr_sendpath *mr_path;
@@ -41,7 +42,7 @@ static void metadata_from_mbuff_to_packet(marsio_buff_t *mbuff, struct packet *p
}
else
{
- PACKET_IO_LOG_ERROR("failed to get route ctx");
+ MARSIO_IO_LOG_ERROR("failed to get route ctx");
}
sids.used = marsio_buff_get_sid_list(mbuff, sids.sid, sizeof(sids.sid) / sizeof(sids.sid[0]));
@@ -51,7 +52,7 @@ static void metadata_from_mbuff_to_packet(marsio_buff_t *mbuff, struct packet *p
}
else
{
- PACKET_IO_LOG_ERROR("failed to get sids");
+ MARSIO_IO_LOG_ERROR("failed to get sids");
}
if (marsio_buff_get_metadata(mbuff, MR_BUFF_SESSION_ID, &session_id, sizeof(session_id)) == sizeof(session_id))
@@ -60,7 +61,7 @@ static void metadata_from_mbuff_to_packet(marsio_buff_t *mbuff, struct packet *p
}
else
{
- PACKET_IO_LOG_ERROR("failed to get session id");
+ MARSIO_IO_LOG_ERROR("failed to get session id");
}
// TODO
@@ -71,7 +72,7 @@ static void metadata_from_mbuff_to_packet(marsio_buff_t *mbuff, struct packet *p
}
else
{
- PACKET_IO_LOG_ERROR("failed to get domain id");
+ MARSIO_IO_LOG_ERROR("failed to get domain id");
}
#endif
@@ -81,7 +82,7 @@ static void metadata_from_mbuff_to_packet(marsio_buff_t *mbuff, struct packet *p
}
else
{
- PACKET_IO_LOG_ERROR("failed to get link id");
+ MARSIO_IO_LOG_ERROR("failed to get link id");
}
is_ctrl = marsio_buff_is_ctrlbuf(mbuff);
@@ -93,7 +94,7 @@ static void metadata_from_mbuff_to_packet(marsio_buff_t *mbuff, struct packet *p
}
else
{
- PACKET_IO_LOG_ERROR("failed to get direction");
+ MARSIO_IO_LOG_ERROR("failed to get direction");
}
packet_set_action(pkt, PACKET_ACTION_FORWARD);
@@ -116,30 +117,30 @@ static void metadata_from_packet_to_mbuff(struct packet *pkt, marsio_buff_t *mbu
if (marsio_buff_set_metadata(mbuff, MR_BUFF_ROUTE_CTX, (void *)route_ctx->data, route_ctx->used) != 0)
{
- PACKET_IO_LOG_ERROR("failed to set route ctx");
+ MARSIO_IO_LOG_ERROR("failed to set route ctx");
}
if (marsio_buff_set_sid_list(mbuff, (sid_t *)sids->sid, sids->used) != 0)
{
- PACKET_IO_LOG_ERROR("failed to set sids");
+ MARSIO_IO_LOG_ERROR("failed to set sids");
}
if (marsio_buff_set_metadata(mbuff, MR_BUFF_SESSION_ID, &session_id, sizeof(session_id)) != 0)
{
- PACKET_IO_LOG_ERROR("failed to set session id");
+ MARSIO_IO_LOG_ERROR("failed to set session id");
}
// TODO
#if 0
if (marsio_buff_set_metadata(mbuff, MR_BUFF_DOMAIN, &domain, sizeof(domain)) != 0)
{
- PACKET_IO_LOG_ERROR("failed to set domain id");
+ MARSIO_IO_LOG_ERROR("failed to set domain id");
}
#endif
if (marsio_buff_set_metadata(mbuff, MR_BUFF_LINK_ID, &link_id, sizeof(link_id)) != 0)
{
- PACKET_IO_LOG_ERROR("failed to set link id");
+ MARSIO_IO_LOG_ERROR("failed to set link id");
}
if (is_ctrl)
@@ -149,7 +150,7 @@ static void metadata_from_packet_to_mbuff(struct packet *pkt, marsio_buff_t *mbu
if (marsio_buff_set_metadata(mbuff, MR_BUFF_DIR, &direction, sizeof(direction)) != 0)
{
- PACKET_IO_LOG_ERROR("failed to set direction");
+ MARSIO_IO_LOG_ERROR("failed to set direction");
}
}
@@ -175,46 +176,48 @@ static inline int is_keepalive_packet(const char *data, int len)
* Public API
******************************************************************************/
-struct marsio_io *marsio_io_new(const char *app_symbol, const char *dev_symbol, uint16_t *cpu_mask, uint16_t nr_worker_thread)
+void *marsio_io_new(const struct packet_io_config *cfg)
{
int opt = 1;
cpu_set_t coremask;
CPU_ZERO(&coremask);
- for (uint16_t i = 0; i < nr_worker_thread; i++)
- {
- CPU_SET(cpu_mask[i], &coremask);
- }
struct marsio_io *handle = (struct marsio_io *)calloc(1, sizeof(struct marsio_io));
if (handle == NULL)
{
- PACKET_IO_LOG_ERROR("unable to allocate memory for marsio_io");
+ MARSIO_IO_LOG_ERROR("unable to allocate memory for marsio_io");
return NULL;
}
+ memcpy(&handle->cfg, cfg, sizeof(struct packet_io_config));
+
+ for (uint16_t i = 0; i < handle->cfg.nr_worker_thread; i++)
+ {
+ CPU_SET(handle->cfg.cpu_mask[i], &coremask);
+ }
handle->mr_ins = marsio_create();
if (handle->mr_ins == NULL)
{
- PACKET_IO_LOG_ERROR("unable to create marsio instance");
+ MARSIO_IO_LOG_ERROR("unable to create marsio instance");
goto error_out;
}
marsio_option_set(handle->mr_ins, MARSIO_OPT_THREAD_MASK_IN_CPUSET, &coremask, sizeof(cpu_set_t));
marsio_option_set(handle->mr_ins, MARSIO_OPT_EXIT_WHEN_ERR, &opt, sizeof(opt));
- if (marsio_init(handle->mr_ins, app_symbol) != 0)
+ if (marsio_init(handle->mr_ins, handle->cfg.app_symbol) != 0)
{
- PACKET_IO_LOG_ERROR("unable to init marsio instance");
+ MARSIO_IO_LOG_ERROR("unable to init marsio instance");
goto error_out;
}
- handle->mr_dev = marsio_open_device(handle->mr_ins, dev_symbol, nr_worker_thread, nr_worker_thread);
+ handle->mr_dev = marsio_open_device(handle->mr_ins, handle->cfg.dev_symbol, handle->cfg.nr_worker_thread, handle->cfg.nr_worker_thread);
if (handle->mr_dev == NULL)
{
- PACKET_IO_LOG_ERROR("unable to open marsio device");
+ MARSIO_IO_LOG_ERROR("unable to open marsio device");
goto error_out;
}
handle->mr_path = marsio_sendpath_create_by_vdev(handle->mr_dev);
if (handle->mr_path == NULL)
{
- PACKET_IO_LOG_ERROR("unable to create marsio sendpath");
+ MARSIO_IO_LOG_ERROR("unable to create marsio sendpath");
goto error_out;
}
@@ -225,58 +228,66 @@ error_out:
return NULL;
}
-void marsio_io_free(struct marsio_io *handle)
+void marsio_io_free(void *handle)
{
- if (handle)
+ struct marsio_io *mr_io = (struct marsio_io *)handle;
+ if (mr_io)
{
- if (handle->mr_path)
+ if (mr_io->mr_path)
{
- marsio_sendpath_destory(handle->mr_path);
- handle->mr_path = NULL;
+ marsio_sendpath_destory(mr_io->mr_path);
+ mr_io->mr_path = NULL;
}
- if (handle->mr_dev)
+ if (mr_io->mr_dev)
{
- marsio_close_device(handle->mr_dev);
- handle->mr_dev = NULL;
+ marsio_close_device(mr_io->mr_dev);
+ mr_io->mr_dev = NULL;
}
- if (handle->mr_ins)
+ if (mr_io->mr_ins)
{
- marsio_destory(handle->mr_ins);
- handle->mr_ins = NULL;
+ marsio_destory(mr_io->mr_ins);
+ mr_io->mr_ins = NULL;
}
- free(handle);
- handle = NULL;
+ free(mr_io);
+ mr_io = NULL;
}
}
-int marsio_io_init(struct marsio_io *handle, uint16_t thr_idx __attribute__((unused)))
+int marsio_io_isbreak(void *handle __attribute__((unused)))
{
- if (marsio_thread_init(handle->mr_ins) != 0)
+ return 0;
+}
+
+int marsio_io_init(void *handle, uint16_t thr_idx __attribute__((unused)))
+{
+ struct marsio_io *mr_io = (struct marsio_io *)handle;
+ if (marsio_thread_init(mr_io->mr_ins) != 0)
{
- PACKET_IO_LOG_ERROR("unable to init marsio thread");
+ MARSIO_IO_LOG_ERROR("unable to init marsio thread");
return -1;
}
return 0;
}
-uint16_t marsio_io_input(struct marsio_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts)
+uint16_t marsio_io_input(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts)
{
+ int len;
+ char *data;
+ uint16_t nr_packet_parsed = 0;
struct packet *pkt;
marsio_buff_t *mbuff;
marsio_buff_t *rx_buffs[RX_BURST_MAX];
- struct packet_io_stat *stat = &handle->stat[thr_idx];
- uint16_t nr_parsed = 0;
- int len;
- char *data;
+ struct marsio_io *mr_io = (struct marsio_io *)handle;
+ struct packet_io_stat *stat = &mr_io->stat[thr_idx];
- int nr_recv = marsio_recv_burst(handle->mr_dev, thr_idx, rx_buffs, MIN(RX_BURST_MAX, nr_pkts));
- if (nr_recv <= 0)
+ int nr_packet_received = marsio_recv_burst(mr_io->mr_dev, thr_idx, rx_buffs, MIN(RX_BURST_MAX, nr_pkts));
+ if (nr_packet_received <= 0)
{
- return nr_parsed;
+ return nr_packet_parsed;
}
- for (int i = 0; i < nr_recv; i++)
+ for (int i = 0; i < nr_packet_received; i++)
{
mbuff = rx_buffs[i];
data = marsio_buff_mtod(mbuff);
@@ -293,11 +304,11 @@ uint16_t marsio_io_input(struct marsio_io *handle, uint16_t thr_idx, struct pack
stat->pkts_tx++;
stat->bytes_tx += len;
- marsio_send_burst(handle->mr_path, thr_idx, &mbuff, 1);
+ marsio_send_burst(mr_io->mr_path, thr_idx, &mbuff, 1);
continue;
}
- pkt = &pkts[nr_parsed++];
+ pkt = &pkts[nr_packet_parsed++];
packet_parse(pkt, data, len);
metadata_from_mbuff_to_packet(mbuff, pkt);
@@ -313,15 +324,16 @@ uint16_t marsio_io_input(struct marsio_io *handle, uint16_t thr_idx, struct pack
}
}
- return nr_parsed;
+ return nr_packet_parsed;
}
-void marsio_io_output(struct marsio_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts)
+void marsio_io_output(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts)
{
+ int len;
struct packet *pkt;
marsio_buff_t *mbuff;
- struct packet_io_stat *stat = &handle->stat[thr_idx];
- int len;
+ struct marsio_io *mr_io = (struct marsio_io *)handle;
+ struct packet_io_stat *stat = &mr_io->stat[thr_idx];
for (uint16_t i = 0; i < nr_pkts; i++)
{
@@ -346,16 +358,17 @@ void marsio_io_output(struct marsio_io *handle, uint16_t thr_idx, struct packet
stat->raw_bytes_tx += len;
}
- marsio_send_burst(handle->mr_path, thr_idx, &mbuff, 1);
+ marsio_send_burst(mr_io->mr_path, thr_idx, &mbuff, 1);
packet_free(pkt);
}
}
-void marsio_io_drop(struct marsio_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts)
+void marsio_io_drop(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts)
{
struct packet *pkt;
marsio_buff_t *mbuff;
- struct packet_io_stat *stat = &handle->stat[thr_idx];
+ struct marsio_io *mr_io = (struct marsio_io *)handle;
+ struct packet_io_stat *stat = &mr_io->stat[thr_idx];
for (uint16_t i = 0; i < nr_pkts; i++)
{
@@ -365,29 +378,30 @@ void marsio_io_drop(struct marsio_io *handle, uint16_t thr_idx, struct packet *p
{
stat->pkts_dropped++;
stat->bytes_dropped += packet_get_raw_len(pkt);
- marsio_buff_free(handle->mr_ins, &mbuff, 1, 0, thr_idx);
+ marsio_buff_free(mr_io->mr_ins, &mbuff, 1, 0, thr_idx);
}
packet_free(pkt);
}
}
-uint16_t marsio_io_inject(struct marsio_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts)
+uint16_t marsio_io_inject(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts)
{
int len;
char *ptr;
uint16_t nr_inject = 0;
struct packet *pkt;
marsio_buff_t *mbuff;
- struct packet_io_stat *stat = &handle->stat[thr_idx];
+ struct marsio_io *mr_io = (struct marsio_io *)handle;
+ struct packet_io_stat *stat = &mr_io->stat[thr_idx];
for (uint16_t i = 0; i < nr_pkts; i++)
{
pkt = &pkts[i];
len = packet_get_raw_len(pkt);
- if (marsio_buff_malloc_global(handle->mr_ins, &mbuff, 1, MARSIO_SOCKET_ID_ANY, MARSIO_LCORE_ID_ANY) < 0)
+ if (marsio_buff_malloc_global(mr_io->mr_ins, &mbuff, 1, MARSIO_SOCKET_ID_ANY, MARSIO_LCORE_ID_ANY) < 0)
{
- PACKET_IO_LOG_ERROR("unable to allocate marsio buffer for inject packet");
+ MARSIO_IO_LOG_ERROR("unable to allocate marsio buffer for inject packet");
continue;
}
@@ -406,23 +420,25 @@ uint16_t marsio_io_inject(struct marsio_io *handle, uint16_t thr_idx, struct pac
memcpy(ptr, packet_get_raw_data(pkt), len);
metadata_from_packet_to_mbuff(pkt, mbuff);
- marsio_send_burst_with_options(handle->mr_path, thr_idx, &mbuff, 1, MARSIO_SEND_OPT_REHASH);
+ marsio_send_burst_with_options(mr_io->mr_path, thr_idx, &mbuff, 1, MARSIO_SEND_OPT_REHASH);
packet_free(pkt);
}
return nr_inject;
}
-void marsio_io_yield(struct marsio_io *handle, uint16_t thr_idx, uint64_t timeout_ms)
+void marsio_io_yield(void *handle, uint16_t thr_idx)
{
+ struct marsio_io *mr_io = (struct marsio_io *)handle;
struct mr_vdev *vdevs[1] = {
- handle->mr_dev,
+ mr_io->mr_dev,
};
- marsio_poll_wait(handle->mr_ins, vdevs, 1, thr_idx, timeout_ms);
+ marsio_poll_wait(mr_io->mr_ins, vdevs, 1, thr_idx, mr_io->cfg.idle_yield_interval_ms);
}
-struct packet_io_stat *marsio_io_stat(struct marsio_io *handle, uint16_t thr_idx)
+struct packet_io_stat *marsio_io_stat(void *handle, uint16_t thr_idx)
{
- return &handle->stat[thr_idx];
+ struct marsio_io *mr_io = (struct marsio_io *)handle;
+ return &mr_io->stat[thr_idx];
}
diff --git a/infra/packet_io/marsio_io.h b/infra/packet_io/marsio_io.h
index daccc4e..876618a 100644
--- a/infra/packet_io/marsio_io.h
+++ b/infra/packet_io/marsio_io.h
@@ -7,17 +7,17 @@ extern "C"
#include "packet_io.h"
-struct marsio_io;
-struct marsio_io *marsio_io_new(const char *app_symbol, const char *dev_symbol, uint16_t *cpu_mask, uint16_t nr_worker_thread);
-void marsio_io_free(struct marsio_io *handle);
+void *marsio_io_new(const struct packet_io_config *cfg);
+void marsio_io_free(void *handle);
+int marsio_io_isbreak(void *handle);
-int marsio_io_init(struct marsio_io *handle, uint16_t thr_idx);
-uint16_t marsio_io_input(struct marsio_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
-void marsio_io_output(struct marsio_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
-void marsio_io_drop(struct marsio_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
-uint16_t marsio_io_inject(struct marsio_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
-void marsio_io_yield(struct marsio_io *handle, uint16_t thr_idx, uint64_t timeout_ms);
-struct packet_io_stat *marsio_io_stat(struct marsio_io *handle, uint16_t thr_idx);
+int marsio_io_init(void *handle, uint16_t thr_idx);
+uint16_t marsio_io_input(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
+void marsio_io_output(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
+void marsio_io_drop(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
+uint16_t marsio_io_inject(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
+void marsio_io_yield(void *handle, uint16_t thr_idx);
+struct packet_io_stat *marsio_io_stat(void *handle, uint16_t thr_idx);
#ifdef __cplusplus
}
diff --git a/infra/packet_io/packet_io.c b/infra/packet_io/packet_io.c
index 3bd00e4..42a9743 100644
--- a/infra/packet_io/packet_io.c
+++ b/infra/packet_io/packet_io.c
@@ -1,154 +1,341 @@
+#include <errno.h>
#include <stdlib.h>
#include <string.h>
+#include "toml.h"
+#include "pcap_io.h"
#include "marsio_io.h"
-#include "dumpfile_io.h"
+#include "log_private.h"
+
+#define PACKET_IO_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "packet_io", format, ##__VA_ARGS__)
+#define PACKET_IO_LOG_INFO(format, ...) STELLAR_LOG_INFO(__thread_local_logger, "packet_io", format, ##__VA_ARGS__)
struct packet_io
{
- enum packet_io_mode mode;
- struct marsio_io *marsio;
- struct dumpfile_io *dumpfile;
+ void *handle;
+
+ void *(*new_func)(const struct packet_io_config *cfg);
+ void (*free_func)(void *handle);
+ int (*isbreak_func)(void *handle);
+
+ int (*init_func)(void *handle, uint16_t thr_idx);
+ uint16_t (*input_func)(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
+ void (*output_func)(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
+ void (*drop_func)(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
+ uint16_t (*inject_func)(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
+ void (*yield_func)(void *handle, uint16_t thr_idx);
+ struct packet_io_stat *(*stat_func)(void *handle, uint16_t thr_idx);
};
-struct packet_io *packet_io_new(struct packet_io_options *opts)
+int packet_io_config_load(struct packet_io_config *cfg, const char *toml_file)
{
- struct packet_io *packet_io = (struct packet_io *)calloc(1, sizeof(struct packet_io));
- if (packet_io == NULL)
+ int ret = -1;
+ const char *ptr;
+ char *ptr_mode = NULL;
+ char *ptr_dumpfile_path = NULL;
+ char *ptr_app_symbol = NULL;
+ char *ptr_dev_symbol = NULL;
+ char errbuf[200];
+ FILE *fp = NULL;
+ toml_table_t *root = NULL;
+ toml_table_t *table = NULL;
+ toml_array_t *mask;
+
+ fp = fopen(toml_file, "r");
+ if (fp == NULL)
{
- return NULL;
+ PACKET_IO_LOG_ERROR("config file %s open failed, %s", toml_file, strerror(errno));
+ goto error_out;
}
- packet_io->mode = opts->mode;
- if (opts->mode == PACKET_IO_MARSIO)
+ root = toml_parse_file(fp, errbuf, sizeof(errbuf));
+ if (root == NULL)
{
- packet_io->marsio = marsio_io_new(opts->app_symbol, opts->dev_symbol, opts->cpu_mask, opts->nr_worker_thread);
+ PACKET_IO_LOG_ERROR("config file %s parse failed, %s", toml_file, errbuf);
+ goto error_out;
}
- else
+
+ table = toml_table_in(root, "packet_io");
+ if (table == NULL)
{
- packet_io->dumpfile = dumpfile_io_new(opts->dumpfile_path, packet_io->mode, opts->nr_worker_thread);
+ PACKET_IO_LOG_ERROR("config file %s missing packet_io", toml_file);
+ goto error_out;
}
- if (packet_io->marsio == NULL && packet_io->dumpfile == NULL)
+
+ ptr = toml_raw_in(table, "mode");
+ if (ptr == NULL || toml_rtos(ptr, &ptr_mode) != 0)
{
+ PACKET_IO_LOG_ERROR("config file missing packet_io.mode");
+ goto error_out;
+ }
+ if (strcmp(ptr_mode, "pcapfile") == 0)
+ {
+ cfg->mode = PACKET_IO_PCAPFILE;
+ }
+ else if (strcmp(ptr_mode, "pcaplist") == 0)
+ {
+ cfg->mode = PACKET_IO_PCAPLIST;
+ }
+ else if (strcmp(ptr_mode, "marsio") == 0)
+ {
+ cfg->mode = PACKET_IO_MARSIO;
+ }
+ else
+ {
+ PACKET_IO_LOG_ERROR("config file invalid packet_io.mode %s", ptr);
goto error_out;
}
- return packet_io;
-
-error_out:
- packet_io_free(packet_io);
- return NULL;
-}
-
-void packet_io_free(struct packet_io *packet_io)
-{
- if (packet_io)
+ if (cfg->mode == PACKET_IO_PCAPFILE || cfg->mode == PACKET_IO_PCAPLIST)
{
- if (likely(packet_io->mode == PACKET_IO_MARSIO))
+ ptr = toml_raw_in(table, "pcap_path");
+ if (ptr == NULL || toml_rtos(ptr, &ptr_dumpfile_path) != 0)
{
- marsio_io_free(packet_io->marsio);
+ PACKET_IO_LOG_ERROR("config file missing packet_io.pcap_path");
+ goto error_out;
}
- else
+ strcpy(cfg->pcap_path, ptr_dumpfile_path);
+ }
+ else
+ {
+ ptr = toml_raw_in(table, "app_symbol");
+ if (ptr == NULL || toml_rtos(ptr, &ptr_app_symbol) != 0)
{
- dumpfile_io_free(packet_io->dumpfile);
+ PACKET_IO_LOG_ERROR("config file missing packet_io.app_symbol");
+ goto error_out;
}
- free(packet_io);
- packet_io = NULL;
+ strcpy(cfg->app_symbol, ptr_app_symbol);
+
+ ptr = toml_raw_in(table, "dev_symbol");
+ if (ptr == NULL || toml_rtos(ptr, &ptr_dev_symbol) != 0)
+ {
+ PACKET_IO_LOG_ERROR("config file missing packet_io.dev_symbol");
+ goto error_out;
+ }
+ strcpy(cfg->dev_symbol, ptr_dev_symbol);
}
-}
-int packet_io_isbreak(struct packet_io *packet_io) // used for dumpfile mode
-{
- if (likely(packet_io->mode == PACKET_IO_MARSIO))
+ ptr = toml_raw_in(table, "nr_worker_thread");
+ if (ptr == NULL)
{
- return 0;
+ PACKET_IO_LOG_ERROR("config file missing packet_io.nr_worker_thread");
+ goto error_out;
}
- else
+ cfg->nr_worker_thread = atoi(ptr);
+ if (cfg->nr_worker_thread == 0 || cfg->nr_worker_thread > MAX_THREAD_NUM)
{
- return dumpfile_io_isbreak(packet_io->dumpfile);
+ PACKET_IO_LOG_ERROR("config file invalid packet_io.nr_worker_thread %d, range [1, %d]", cfg->nr_worker_thread, MAX_THREAD_NUM);
+ goto error_out;
}
-}
-int packet_io_init(struct packet_io *packet_io, uint16_t thr_idx)
-{
- if (likely(packet_io->mode == PACKET_IO_MARSIO))
+ mask = toml_array_in(table, "cpu_mask");
+ if (mask == NULL)
{
- return marsio_io_init(packet_io->marsio, thr_idx);
+ PACKET_IO_LOG_ERROR("config file missing packet_io.cpu_mask");
+ goto error_out;
}
- else
+ for (uint16_t i = 0; i < cfg->nr_worker_thread; i++)
{
- return dumpfile_io_init(packet_io->dumpfile, thr_idx);
+ ptr = toml_raw_at(mask, i);
+ if (ptr == NULL)
+ {
+ PACKET_IO_LOG_ERROR("config file missing packet_io.cpu_mask[%d]", i);
+ goto error_out;
+ }
+ cfg->cpu_mask[i] = atoi(ptr);
}
-}
-uint16_t packet_io_input(struct packet_io *packet_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts)
-{
- if (likely(packet_io->mode == PACKET_IO_MARSIO))
+ ptr = toml_raw_in(table, "idle_yield_interval_ms");
+ if (ptr == NULL)
{
- return marsio_io_input(packet_io->marsio, thr_idx, pkts, nr_pkts);
+ PACKET_IO_LOG_ERROR("config file missing packet_io.idle_yield_interval_ms");
+ goto error_out;
}
- else
+ cfg->idle_yield_interval_ms = atoll(ptr);
+ if (cfg->idle_yield_interval_ms > 60000)
{
- return dumpfile_io_input(packet_io->dumpfile, thr_idx, pkts, nr_pkts);
+ PACKET_IO_LOG_ERROR("config file invalid packet_io.idle_yield_interval_ms %d, range [0, %d]", cfg->idle_yield_interval_ms, 60000);
+ goto error_out;
}
-}
-void packet_io_output(struct packet_io *packet_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts)
-{
- if (likely(packet_io->mode == PACKET_IO_MARSIO))
+ ret = 0;
+error_out:
+ if (ptr_mode)
{
- marsio_io_output(packet_io->marsio, thr_idx, pkts, nr_pkts);
+ free(ptr_mode);
}
- else
+ if (ptr_dumpfile_path)
+ {
+ free(ptr_dumpfile_path);
+ }
+ if (ptr_app_symbol)
+ {
+ free(ptr_app_symbol);
+ }
+ if (ptr_dev_symbol)
+ {
+ free(ptr_dev_symbol);
+ }
+ if (root)
+ {
+ toml_free(root);
+ }
+ if (fp)
{
- dumpfile_io_output(packet_io->dumpfile, thr_idx, pkts, nr_pkts);
+ fclose(fp);
}
+
+ return ret;
}
-void packet_io_drop(struct packet_io *packet_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts)
+struct packet_io_config *packet_io_config_new(const char *toml_file)
{
- if (likely(packet_io->mode == PACKET_IO_MARSIO))
+ if (toml_file == NULL)
{
- marsio_io_drop(packet_io->marsio, thr_idx, pkts, nr_pkts);
+ return NULL;
}
- else
+
+ struct packet_io_config *cfg = (struct packet_io_config *)calloc(1, sizeof(struct packet_io_config));
+ if (cfg == NULL)
{
- dumpfile_io_drop(packet_io->dumpfile, thr_idx, pkts, nr_pkts);
+ return NULL;
}
+
+ if (packet_io_config_load(cfg, toml_file) == -1)
+ {
+ packet_io_config_free(cfg);
+ return NULL;
+ }
+
+ return cfg;
}
-uint16_t packet_io_inject(struct packet_io *packet_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts)
+void packet_io_config_free(struct packet_io_config *cfg)
{
- if (likely(packet_io->mode == PACKET_IO_MARSIO))
+ if (cfg)
{
- return marsio_io_inject(packet_io->marsio, thr_idx, pkts, nr_pkts);
+ free(cfg);
+ cfg = NULL;
}
- else
+}
+
+void packet_io_config_print(const struct packet_io_config *cfg)
+{
+ if (cfg)
{
- return dumpfile_io_inject(packet_io->dumpfile, thr_idx, pkts, nr_pkts);
+ PACKET_IO_LOG_INFO("packet_io.mode : %s", cfg->mode == PACKET_IO_PCAPFILE ? "pcapfile" : (cfg->mode == PACKET_IO_PCAPLIST ? "pcaplist" : "marsio"));
+ if (cfg->mode == PACKET_IO_PCAPFILE || cfg->mode == PACKET_IO_PCAPLIST)
+ {
+ PACKET_IO_LOG_INFO("packet_io.pcap_path : %s", cfg->pcap_path);
+ }
+ else
+ {
+ PACKET_IO_LOG_INFO("packet_io.app_symbol : %s", cfg->app_symbol);
+ PACKET_IO_LOG_INFO("packet_io.dev_symbol : %s", cfg->dev_symbol);
+ }
+ PACKET_IO_LOG_INFO("packet_io.nr_worker_thread : %d", cfg->nr_worker_thread);
+ for (uint16_t i = 0; i < cfg->nr_worker_thread; i++)
+ {
+ PACKET_IO_LOG_INFO("packet_io.cpu_mask[%03d] : %d", i, cfg->cpu_mask[i]);
+ }
+ PACKET_IO_LOG_INFO("packet_io.idle_yield_interval_ms : %lu", cfg->idle_yield_interval_ms);
}
}
-void packet_io_yield(struct packet_io *packet_io, uint16_t thr_idx, uint64_t timeout_ms)
+struct packet_io *packet_io_new(const struct packet_io_config *cfg)
{
- if (likely(packet_io->mode == PACKET_IO_MARSIO))
+ struct packet_io *pkt_io = (struct packet_io *)calloc(1, sizeof(struct packet_io));
+ if (pkt_io == NULL)
{
- marsio_io_yield(packet_io->marsio, thr_idx, timeout_ms);
+ return NULL;
+ }
+
+ if (cfg->mode == PACKET_IO_MARSIO)
+ {
+ pkt_io->new_func = marsio_io_new;
+ pkt_io->free_func = marsio_io_free;
+ pkt_io->isbreak_func = marsio_io_isbreak;
+ pkt_io->init_func = marsio_io_init;
+ pkt_io->input_func = marsio_io_input;
+ pkt_io->output_func = marsio_io_output;
+ pkt_io->drop_func = marsio_io_drop;
+ pkt_io->inject_func = marsio_io_inject;
+ pkt_io->yield_func = marsio_io_yield;
+ pkt_io->stat_func = marsio_io_stat;
}
else
{
- dumpfile_io_yield(packet_io->dumpfile, thr_idx, timeout_ms);
+ pkt_io->new_func = pcap_io_new;
+ pkt_io->free_func = pcap_io_free;
+ pkt_io->isbreak_func = pcap_io_isbreak;
+ pkt_io->init_func = pcap_io_init;
+ pkt_io->input_func = pcap_io_input;
+ pkt_io->output_func = pcap_io_output;
+ pkt_io->drop_func = pcap_io_drop;
+ pkt_io->inject_func = pcap_io_inject;
+ pkt_io->yield_func = pcap_io_yield;
+ pkt_io->stat_func = pcap_io_stat;
}
-}
-struct packet_io_stat *packet_io_stat(struct packet_io *packet_io, uint16_t thr_idx)
-{
- if (likely(packet_io->mode == PACKET_IO_MARSIO))
+ pkt_io->handle = pkt_io->new_func(cfg);
+ if (pkt_io->handle == NULL)
{
- return marsio_io_stat(packet_io->marsio, thr_idx);
+ packet_io_free(pkt_io);
+ return NULL;
}
- else
+
+ return pkt_io;
+}
+
+void packet_io_free(struct packet_io *pkt_io)
+{
+ if (pkt_io)
{
- return dumpfile_io_stat(packet_io->dumpfile, thr_idx);
+ if (pkt_io->handle)
+ {
+ pkt_io->free_func(pkt_io->handle);
+ }
+ free(pkt_io);
+ pkt_io = NULL;
}
}
+
+int packet_io_isbreak(struct packet_io *pkt_io)
+{
+ return pkt_io->isbreak_func(pkt_io->handle);
+}
+
+int packet_io_init(struct packet_io *pkt_io, uint16_t thr_idx)
+{
+ return pkt_io->init_func(pkt_io->handle, thr_idx);
+}
+
+uint16_t packet_io_input(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts)
+{
+ return pkt_io->input_func(pkt_io->handle, thr_idx, pkts, nr_pkts);
+}
+
+void packet_io_output(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts)
+{
+ pkt_io->output_func(pkt_io->handle, thr_idx, pkts, nr_pkts);
+}
+
+void packet_io_drop(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts)
+{
+ pkt_io->drop_func(pkt_io->handle, thr_idx, pkts, nr_pkts);
+}
+
+uint16_t packet_io_inject(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts)
+{
+ return pkt_io->inject_func(pkt_io->handle, thr_idx, pkts, nr_pkts);
+}
+
+void packet_io_yield(struct packet_io *pkt_io, uint16_t thr_idx)
+{
+ pkt_io->yield_func(pkt_io->handle, thr_idx);
+}
+
+struct packet_io_stat *packet_io_stat(struct packet_io *pkt_io, uint16_t thr_idx)
+{
+ return pkt_io->stat_func(pkt_io->handle, thr_idx);
+}
diff --git a/infra/packet_io/packet_io.h b/infra/packet_io/packet_io.h
index 733b6c3..985ab39 100644
--- a/infra/packet_io/packet_io.h
+++ b/infra/packet_io/packet_io.h
@@ -48,39 +48,39 @@ struct __attribute__((aligned(64))) packet_io_stat
enum packet_io_mode
{
- PACKET_IO_DUMPFILE = 0,
- PACKET_IO_DUMPFILELIST = 1,
+ PACKET_IO_PCAPFILE = 0,
+ PACKET_IO_PCAPLIST = 1,
PACKET_IO_MARSIO = 2,
};
-struct packet_io_options
+struct packet_io_config
{
enum packet_io_mode mode;
-
- // for dumpfile
- char dumpfile_path[PATH_MAX];
-
- // for marsio
+ char pcap_path[PATH_MAX];
char app_symbol[64];
char dev_symbol[64];
-
- uint16_t nr_worker_thread;
+ uint16_t nr_worker_thread; // range [1, MAX_THREAD_NUM]
uint16_t cpu_mask[MAX_THREAD_NUM];
+ uint64_t idle_yield_interval_ms; // range: [0, 6000] (ms)
};
+struct packet_io_config *packet_io_config_new(const char *toml_file);
+void packet_io_config_free(struct packet_io_config *cfg);
+void packet_io_config_print(const struct packet_io_config *cfg);
+
struct packet;
struct packet_io;
-struct packet_io *packet_io_new(struct packet_io_options *opts);
-void packet_io_free(struct packet_io *packet_io);
-int packet_io_isbreak(struct packet_io *packet_io); // used for dumpfile mode
-
-int packet_io_init(struct packet_io *packet_io, uint16_t thr_idx);
-uint16_t packet_io_input(struct packet_io *packet_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
-void packet_io_output(struct packet_io *packet_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
-void packet_io_drop(struct packet_io *packet_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
-uint16_t packet_io_inject(struct packet_io *packet_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
-void packet_io_yield(struct packet_io *packet_io, uint16_t thr_idx, uint64_t timeout_ms);
-struct packet_io_stat *packet_io_stat(struct packet_io *packet_io, uint16_t thr_idx);
+struct packet_io *packet_io_new(const struct packet_io_config *cfg);
+void packet_io_free(struct packet_io *pkt_io);
+int packet_io_isbreak(struct packet_io *pkt_io);
+
+int packet_io_init(struct packet_io *pkt_io, uint16_t thr_idx);
+uint16_t packet_io_input(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
+void packet_io_output(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
+void packet_io_drop(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
+uint16_t packet_io_inject(struct packet_io *pkt_io, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
+void packet_io_yield(struct packet_io *pkt_io, uint16_t thr_idx);
+struct packet_io_stat *packet_io_stat(struct packet_io *pkt_io, uint16_t thr_idx);
#ifdef __cplusplus
}
diff --git a/infra/packet_io/dumpfile_io.c b/infra/packet_io/pcap_io.c
index ed8fb58..1bf37a0 100644
--- a/infra/packet_io/dumpfile_io.c
+++ b/infra/packet_io/pcap_io.c
@@ -12,21 +12,19 @@
#include "tuple.h"
#include "utils.h"
#include "log_private.h"
-#include "dumpfile_io.h"
+#include "pcap_io.h"
#include "packet_dump.h"
#include "packet_parser.h"
#include "packet_private.h"
-#define PACKET_IO_LOG_FATAL(format, ...) STELLAR_LOG_FATAL(__thread_local_logger, "dumpfile", format, ##__VA_ARGS__)
-#define PACKET_IO_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "dumpfile", format, ##__VA_ARGS__)
+#define PCAP_IO_LOG_FATAL(format, ...) STELLAR_LOG_FATAL(__thread_local_logger, "pcap io", format, ##__VA_ARGS__)
+#define PCAP_IO_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "pcap io", format, ##__VA_ARGS__)
#define MAX_PACKET_QUEUE_SIZE (4096 * 1000)
-struct dumpfile_io
+struct pcap_io
{
- enum packet_io_mode mode;
- uint16_t nr_worker_thread;
- char dumpfile_path[256];
+ struct packet_io_config cfg;
pcap_t *pcap;
struct logger *logger;
@@ -64,14 +62,14 @@ static struct packet_queue *packet_queue_new(uint32_t size)
struct packet_queue *queue = (struct packet_queue *)calloc(1, sizeof(struct packet_queue));
if (queue == NULL)
{
- PACKET_IO_LOG_ERROR("unable to new packet queue");
+ PCAP_IO_LOG_ERROR("unable to new packet queue");
return NULL;
}
queue->queue = (uint64_t *)calloc(size, sizeof(uint64_t));
if (queue->queue == NULL)
{
- PACKET_IO_LOG_ERROR("unable to new packet queue");
+ PCAP_IO_LOG_ERROR("unable to new packet queue");
free(queue);
return NULL;
}
@@ -103,7 +101,7 @@ static int packet_queue_push(struct packet_queue *queue, void *data)
{
if (__sync_val_compare_and_swap(&queue->queue[queue->tail], 0, data) != 0)
{
- PACKET_IO_LOG_ERROR("packet queue is full, retry later");
+ PCAP_IO_LOG_ERROR("packet queue is full, retry later");
return -1;
}
@@ -130,13 +128,13 @@ static void packet_queue_pop(struct packet_queue *queue, void **data)
static void pcap_pkt_handler(u_char *user, const struct pcap_pkthdr *h, const u_char *bytes)
{
- struct dumpfile_io *handle = (struct dumpfile_io *)user;
+ struct pcap_io *handle = (struct pcap_io *)user;
// copy packet data to new memory
struct pcap_pkt *pcap_pkt = (struct pcap_pkt *)calloc(1, sizeof(struct pcap_pkt) + h->caplen);
if (pcap_pkt == NULL)
{
- PACKET_IO_LOG_ERROR("unable to alloc packet");
+ PCAP_IO_LOG_ERROR("unable to alloc packet");
return;
}
pcap_pkt->data = (char *)pcap_pkt + sizeof(struct pcap_pkt);
@@ -152,13 +150,13 @@ static void pcap_pkt_handler(u_char *user, const struct pcap_pkthdr *h, const u_
uint64_t hash = packet_ldbc_hash(&pkt, PKT_LDBC_METH_OUTERMOST_INT_EXT_IP, PACKET_DIRECTION_OUTGOING);
// push packet to queue
- struct packet_queue *queue = handle->queue[hash % handle->nr_worker_thread];
+ struct packet_queue *queue = handle->queue[hash % handle->cfg.nr_worker_thread];
while (packet_queue_push(queue, pcap_pkt) == -1)
{
if (ATOMIC_READ(&handle->io_thread_need_exit))
{
free(pcap_pkt);
- PACKET_IO_LOG_FATAL("dumpfile io thread need exit");
+ PCAP_IO_LOG_FATAL("pcap io thread need exit");
pcap_breakloop(handle->pcap);
break;
}
@@ -167,39 +165,39 @@ static void pcap_pkt_handler(u_char *user, const struct pcap_pkthdr *h, const u_
if (ATOMIC_READ(&handle->io_thread_need_exit))
{
- PACKET_IO_LOG_FATAL("dumpfile io thread need exit");
+ PCAP_IO_LOG_FATAL("pcap io thread need exit");
pcap_breakloop(handle->pcap);
}
}
-static int dumpfile_handler(struct dumpfile_io *handle, const char *pcap_file)
+static int pcap_io_handler(struct pcap_io *handle, const char *pcap_file)
{
char resolved_path[256];
char pcap_errbuf[PCAP_ERRBUF_SIZE];
realpath(pcap_file, resolved_path);
- PACKET_IO_LOG_FATAL("dumpfile %s in-processing", resolved_path)
+ PCAP_IO_LOG_FATAL("pcap %s in-processing", resolved_path)
handle->pcap = pcap_open_offline(resolved_path, pcap_errbuf);
if (handle->pcap == NULL)
{
- PACKET_IO_LOG_ERROR("unable to open pcap file: %s, %s", resolved_path, pcap_errbuf);
+ PCAP_IO_LOG_ERROR("unable to open pcap file: %s, %s", resolved_path, pcap_errbuf);
return -1;
}
handle->read_pcap_files++;
pcap_loop(handle->pcap, -1, pcap_pkt_handler, (u_char *)handle);
pcap_close(handle->pcap);
- PACKET_IO_LOG_FATAL("dumpfile %s processed", resolved_path)
+ PCAP_IO_LOG_FATAL("pcap %s processed", resolved_path)
return 0;
}
-static int all_packet_consumed(struct dumpfile_io *handle)
+static int all_packet_consumed(struct pcap_io *handle)
{
uint64_t consumed_pkts = 0;
uint64_t read_pcap_pkts = ATOMIC_READ(&handle->read_pcap_pkts);
- for (uint16_t i = 0; i < handle->nr_worker_thread; i++)
+ for (uint16_t i = 0; i < handle->cfg.nr_worker_thread; i++)
{
consumed_pkts += ATOMIC_READ(&handle->stat[i].pkts_rx);
}
@@ -213,32 +211,32 @@ static int all_packet_consumed(struct dumpfile_io *handle)
}
}
-static void *dumpfile_thread(void *arg)
+static void *pcap_io_thread(void *arg)
{
- struct dumpfile_io *handle = (struct dumpfile_io *)arg;
+ struct pcap_io *handle = (struct pcap_io *)arg;
__thread_local_logger = handle->logger;
ATOMIC_SET(&handle->io_thread_is_runing, 1);
- PACKET_IO_LOG_FATAL("dumpfile io thread is running");
+ PCAP_IO_LOG_FATAL("pcap io thread is running");
- if (handle->mode == PACKET_IO_DUMPFILE)
+ if (handle->cfg.mode == PACKET_IO_PCAPFILE)
{
- dumpfile_handler(handle, handle->dumpfile_path);
+ pcap_io_handler(handle, handle->cfg.pcap_path);
}
- else // PACKET_IO_DUMPFILELIST
+ else // PACKET_IO_PCAPLIST
{
FILE *fp = NULL;
- if (strcmp(handle->dumpfile_path, "-") == 0)
+ if (strcmp(handle->cfg.pcap_path, "-") == 0)
{
- PACKET_IO_LOG_ERROR("dumpfile list is empty, read from stdin");
+ PCAP_IO_LOG_ERROR("pcap path is empty, read from stdin");
fp = stdin;
}
else
{
- fp = fopen(handle->dumpfile_path, "r");
+ fp = fopen(handle->cfg.pcap_path, "r");
if (fp == NULL)
{
- PACKET_IO_LOG_ERROR("unable to open dumpfile list: %s", handle->dumpfile_path);
+ PCAP_IO_LOG_ERROR("unable to open pcap path: %s", handle->cfg.pcap_path);
goto erro_out;
}
}
@@ -257,14 +255,14 @@ static void *dumpfile_thread(void *arg)
*pos = '\0';
}
- dumpfile_handler(handle, line);
+ pcap_io_handler(handle, line);
}
if (fp != stdin)
{
fclose(fp);
}
}
- PACKET_IO_LOG_FATAL("dumpfile io thread read all pcap files");
+ PCAP_IO_LOG_FATAL("pcap io thread read all pcap files");
erro_out:
while (ATOMIC_READ(&handle->io_thread_need_exit) == 0)
@@ -277,7 +275,7 @@ erro_out:
usleep(1000); // 1ms
}
- PACKET_IO_LOG_FATAL("dumpfile io thread exit (read_pcap_files: %lu, read_pcap_pkts: %lu)", handle->read_pcap_files, ATOMIC_READ(&handle->read_pcap_pkts));
+ PCAP_IO_LOG_FATAL("pcap io thread exit (read_pcap_files: %lu, read_pcap_pkts: %lu)", handle->read_pcap_files, ATOMIC_READ(&handle->read_pcap_pkts));
ATOMIC_SET(&handle->io_thread_is_runing, 0);
return NULL;
@@ -287,60 +285,59 @@ erro_out:
* Public API
******************************************************************************/
-struct dumpfile_io *dumpfile_io_new(const char *dumpfile_path, enum packet_io_mode mode, uint16_t nr_worker_thread)
+void *pcap_io_new(const struct packet_io_config *cfg)
{
pthread_t tid;
- struct dumpfile_io *handle = (struct dumpfile_io *)calloc(1, sizeof(struct dumpfile_io));
+ struct pcap_io *handle = (struct pcap_io *)calloc(1, sizeof(struct pcap_io));
if (handle == NULL)
{
- PACKET_IO_LOG_ERROR("unable to allocate memory for dumpfile_io");
+ PCAP_IO_LOG_ERROR("unable to allocate memory for pcap_io");
return NULL;
}
- handle->mode = mode;
- handle->nr_worker_thread = nr_worker_thread;
handle->logger = __thread_local_logger;
- strncpy(handle->dumpfile_path, dumpfile_path, MIN(strlen(dumpfile_path), sizeof(handle->dumpfile_path)));
+ memcpy(&handle->cfg, cfg, sizeof(struct packet_io_config));
- for (uint16_t i = 0; i < handle->nr_worker_thread; i++)
+ for (uint16_t i = 0; i < handle->cfg.nr_worker_thread; i++)
{
handle->queue[i] = packet_queue_new(MAX_PACKET_QUEUE_SIZE);
if (handle->queue[i] == NULL)
{
- PACKET_IO_LOG_ERROR("unable to create packet queue");
+ PCAP_IO_LOG_ERROR("unable to create packet queue");
goto error_out;
}
}
- if (pthread_create(&tid, NULL, dumpfile_thread, (void *)handle) != 0)
+ if (pthread_create(&tid, NULL, pcap_io_thread, (void *)handle) != 0)
{
- PACKET_IO_LOG_ERROR("unable to create packet io thread");
+ PCAP_IO_LOG_ERROR("unable to create pcap io thread");
goto error_out;
}
return handle;
error_out:
- dumpfile_io_free(handle);
+ pcap_io_free(handle);
return NULL;
}
-void dumpfile_io_free(struct dumpfile_io *handle)
+void pcap_io_free(void *handle)
{
- if (handle)
+ struct pcap_io *pcap_io = (struct pcap_io *)handle;
+ if (pcap_io)
{
- ATOMIC_SET(&handle->io_thread_need_exit, 1);
+ ATOMIC_SET(&pcap_io->io_thread_need_exit, 1);
- while (ATOMIC_READ(&handle->io_thread_is_runing))
+ while (ATOMIC_READ(&pcap_io->io_thread_is_runing))
{
usleep(1000);
}
struct pcap_pkt *pcap_pkt = NULL;
- for (uint16_t i = 0; i < handle->nr_worker_thread; i++)
+ for (uint16_t i = 0; i < pcap_io->cfg.nr_worker_thread; i++)
{
while (1)
{
- packet_queue_pop(handle->queue[i], (void **)&pcap_pkt);
+ packet_queue_pop(pcap_io->queue[i], (void **)&pcap_pkt);
if (pcap_pkt)
{
free(pcap_pkt);
@@ -351,30 +348,33 @@ void dumpfile_io_free(struct dumpfile_io *handle)
}
}
- packet_queue_free(handle->queue[i]);
+ packet_queue_free(pcap_io->queue[i]);
}
- free(handle);
- handle = NULL;
+ free(pcap_io);
+ pcap_io = NULL;
}
}
-int dumpfile_io_isbreak(struct dumpfile_io *handle)
+int pcap_io_isbreak(void *handle)
{
- return ATOMIC_READ(&handle->io_thread_wait_exit);
+ struct pcap_io *pcap_io = (struct pcap_io *)handle;
+
+ return ATOMIC_READ(&pcap_io->io_thread_wait_exit);
}
-int dumpfile_io_init(struct dumpfile_io *handle __attribute__((unused)), uint16_t thr_idx __attribute__((unused)))
+int pcap_io_init(void *handle __attribute__((unused)), uint16_t thr_idx __attribute__((unused)))
{
return 0;
}
-uint16_t dumpfile_io_input(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts)
+uint16_t pcap_io_input(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts)
{
- struct packet_queue *queue = handle->queue[thr_idx];
- struct packet_io_stat *stat = &handle->stat[thr_idx];
+ uint16_t nr_packet_parsed = 0;
+ struct packet *pkt = NULL;
struct pcap_pkt *pcap_pkt = NULL;
- struct packet *pkt;
- uint16_t nr_parsed = 0;
+ struct pcap_io *pcap_io = (struct pcap_io *)handle;
+ struct packet_queue *queue = pcap_io->queue[thr_idx];
+ struct packet_io_stat *stat = &pcap_io->stat[thr_idx];
for (uint16_t i = 0; i < nr_pkts; i++)
{
@@ -391,24 +391,25 @@ uint16_t dumpfile_io_input(struct dumpfile_io *handle, uint16_t thr_idx, struct
stat->raw_pkts_rx++;
stat->raw_bytes_rx += pcap_pkt->len;
- pkt = &pkts[nr_parsed];
+ pkt = &pkts[nr_packet_parsed];
packet_parse(pkt, pcap_pkt->data, pcap_pkt->len);
memset(&pkt->meta, 0, sizeof(pkt->meta));
packet_set_origin_ctx(pkt, pcap_pkt);
packet_set_action(pkt, PACKET_ACTION_FORWARD);
packet_set_timeval(pkt, &pcap_pkt->ts);
- nr_parsed++;
+ nr_packet_parsed++;
}
}
- return nr_parsed;
+ return nr_packet_parsed;
}
-void dumpfile_io_output(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts)
+void pcap_io_output(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts)
{
int len;
struct packet *pkt = NULL;
- struct packet_io_stat *stat = &handle->stat[thr_idx];
+ struct pcap_io *pcap_io = (struct pcap_io *)handle;
+ struct packet_io_stat *stat = &pcap_io->stat[thr_idx];
for (uint16_t i = 0; i < nr_pkts; i++)
{
@@ -430,10 +431,11 @@ void dumpfile_io_output(struct dumpfile_io *handle, uint16_t thr_idx, struct pac
}
}
-void dumpfile_io_drop(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts)
+void pcap_io_drop(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts)
{
struct packet *pkt = NULL;
- struct packet_io_stat *stat = &handle->stat[thr_idx];
+ struct pcap_io *pcap_io = (struct pcap_io *)handle;
+ struct packet_io_stat *stat = &pcap_io->stat[thr_idx];
for (uint16_t i = 0; i < nr_pkts; i++)
{
@@ -449,14 +451,15 @@ void dumpfile_io_drop(struct dumpfile_io *handle, uint16_t thr_idx, struct packe
}
}
-uint16_t dumpfile_io_inject(struct dumpfile_io *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts)
+uint16_t pcap_io_inject(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts)
{
uint16_t len;
- struct packet *pkt = NULL;
- struct packet_io_stat *stat = &handle->stat[thr_idx];
struct tuple6 tuple;
+ struct packet *pkt = NULL;
+ struct pcap_io *pcap_io = (struct pcap_io *)handle;
+ struct packet_io_stat *stat = &pcap_io->stat[thr_idx];
- char file[1024] = {0};
+ char file[PATH_MAX] = {0};
char src_addr[INET6_ADDRSTRLEN] = {0};
char dst_addr[INET6_ADDRSTRLEN] = {0};
@@ -491,11 +494,11 @@ uint16_t dumpfile_io_inject(struct dumpfile_io *handle, uint16_t thr_idx, struct
if (packet_dump_pcap(pkt, file) == -1)
{
- PACKET_IO_LOG_ERROR("unable to dump pcap file: %s", file);
+ PCAP_IO_LOG_ERROR("unable to dump pcap file: %s", file);
}
else
{
- PACKET_IO_LOG_FATAL("dump inject packet: %s", file);
+ PCAP_IO_LOG_FATAL("dump inject packet: %s", file);
}
packet_free(pkt);
}
@@ -503,12 +506,14 @@ uint16_t dumpfile_io_inject(struct dumpfile_io *handle, uint16_t thr_idx, struct
return nr_pkts;
}
-void dumpfile_io_yield(struct dumpfile_io *handle __attribute__((unused)), uint16_t thr_idx __attribute__((unused)), uint64_t timeout_ms __attribute__((unused)))
+void pcap_io_yield(void *handle __attribute__((unused)), uint16_t thr_idx __attribute__((unused)))
{
return;
}
-struct packet_io_stat *dumpfile_io_stat(struct dumpfile_io *handle, uint16_t thr_idx)
+struct packet_io_stat *pcap_io_stat(void *handle, uint16_t thr_idx)
{
- return &handle->stat[thr_idx];
+ struct pcap_io *pcap_io = (struct pcap_io *)handle;
+
+ return &pcap_io->stat[thr_idx];
} \ No newline at end of file
diff --git a/infra/packet_io/pcap_io.h b/infra/packet_io/pcap_io.h
new file mode 100644
index 0000000..7c690c4
--- /dev/null
+++ b/infra/packet_io/pcap_io.h
@@ -0,0 +1,24 @@
+#pragma once
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+#include "packet_io.h"
+
+void *pcap_io_new(const struct packet_io_config *cfg);
+void pcap_io_free(void *handle);
+int pcap_io_isbreak(void *handle);
+
+int pcap_io_init(void *handle, uint16_t thr_idx);
+uint16_t pcap_io_input(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
+void pcap_io_output(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
+void pcap_io_drop(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
+uint16_t pcap_io_inject(void *handle, uint16_t thr_idx, struct packet *pkts, uint16_t nr_pkts);
+void pcap_io_yield(void *handle, uint16_t thr_idx);
+struct packet_io_stat *pcap_io_stat(void *handle, uint16_t thr_idx);
+
+#ifdef __cplusplus
+}
+#endif
diff --git a/test/decoders/http/test_based_on_stellar/env/stellar.toml b/test/decoders/http/test_based_on_stellar/env/stellar.toml
index 210ce3d..b472a69 100644
--- a/test/decoders/http/test_based_on_stellar/env/stellar.toml
+++ b/test/decoders/http/test_based_on_stellar/env/stellar.toml
@@ -3,15 +3,13 @@ snowflake_base = 1 # [0, 31]
snowflake_offset = 2 # [0, 127]
[packet_io]
-mode = "dumpfile" # dumpfile, dumpfilelist, marsio
+mode = "pcapfile" # pcapfile, pcaplist, marsio
app_symbol = "stellar"
dev_symbol = "nf_0_fw"
-
-dumpfile_path = "./pcap/test.pcap"
-#dumpfile_path = "/tmp/dumpfile/dumpfilelist"
-
-nr_worker_thread = 1 # [1, 256]
+pcap_path = "./pcap/test.pcap"
+nr_worker_thread = 1 # range: [1, 256]
cpu_mask = [5, 6, 7, 8, 9, 10, 11, 12]
+idle_yield_interval_ms = 900 # range: [0, 60000] (ms)
[ip_reassembly]
enable = 1
@@ -67,5 +65,3 @@ tcp_reassembly_max_segments = 256 # range: [2, 4096]
[schedule]
merge_stat_interval = 50 # range: [1, 60000] (ms)
output_stat_interval = 10 # range: [1, 60000] (ms)
-
-packet_io_yield_interval = 90 # range: [1, 60000] (ms)
diff --git a/test/lpi_plugin/CMakeLists.txt b/test/lpi_plugin/CMakeLists.txt
index 116ad8f..813451f 100644
--- a/test/lpi_plugin/CMakeLists.txt
+++ b/test/lpi_plugin/CMakeLists.txt
@@ -23,8 +23,8 @@ add_test(NAME ${TEST_NAME}.SETUP COMMAND sh -c "
cp ${CMAKE_CURRENT_SOURCE_DIR}/test_config/spec.toml ${CMAKE_CURRENT_BINARY_DIR}/plugin/ &&
cp ${CMAKE_SOURCE_DIR}/conf/log.toml ${CMAKE_CURRENT_BINARY_DIR}/conf/ &&
cp ${CMAKE_CURRENT_SOURCE_DIR}/test_config/tsg_l7_protocol.conf ${CMAKE_CURRENT_BINARY_DIR}/tsgconf/ &&
- tomlq -t -i '.packet_io.dumpfile_path=\"-\"' ${CMAKE_CURRENT_BINARY_DIR}/conf/stellar.toml &&
- tomlq -t -i '.packet_io.mode=\"dumpfilelist\"' ${CMAKE_CURRENT_BINARY_DIR}/conf/stellar.toml
+ tomlq -t -i '.packet_io.pcap_path=\"-\"' ${CMAKE_CURRENT_BINARY_DIR}/conf/stellar.toml &&
+ tomlq -t -i '.packet_io.mode=\"pcaplist\"' ${CMAKE_CURRENT_BINARY_DIR}/conf/stellar.toml
")
diff --git a/test/packet_inject/conf/stellar.toml b/test/packet_inject/conf/stellar.toml
index d039d0d..4791358 100644
--- a/test/packet_inject/conf/stellar.toml
+++ b/test/packet_inject/conf/stellar.toml
@@ -3,15 +3,13 @@ snowflake_base = 1 # [0, 31]
snowflake_offset = 2 # [0, 127]
[packet_io]
-mode = "dumpfile" # dumpfile, dumpfilelist, marsio
+mode = "pcapfile" # pcapfile, pcaplist, marsio
app_symbol = "stellar"
dev_symbol = "nf_0_fw"
-
-dumpfile_path = "/tmp/dumpfile/dumpfile.pcap"
-#dumpfile_path = "/tmp/dumpfile/dumpfilelist"
-
-nr_worker_thread = 1 # [1, 256]
+pcap_path = "/tmp/test.pcap"
+nr_worker_thread = 1 # range: [1, 256]
cpu_mask = [5]
+idle_yield_interval_ms = 900 # range: [0, 60000] (ms)
[ip_reassembly]
enable = 1
@@ -67,5 +65,3 @@ tcp_reassembly_max_segments = 128 # range: [2, 4096]
[schedule]
merge_stat_interval = 50 # range: [1, 60000] (ms)
output_stat_interval = 2000 # range: [1, 60000] (ms)
-
-packet_io_yield_interval = 900 # range: [1, 60000] (ms)
diff --git a/test/packet_inject/packet_inject_test.h b/test/packet_inject/packet_inject_test.h
index 3728840..dc8bf79 100644
--- a/test/packet_inject/packet_inject_test.h
+++ b/test/packet_inject/packet_inject_test.h
@@ -145,8 +145,8 @@ static inline void expect_cmp_inject(const char *expect_pcap_file, const char *i
static inline void packet_inject_test(struct packet_inject_case *test)
{
// create directory
- char dumpfile_path[PATH_MAX] = {0};
- snprintf(dumpfile_path, sizeof(dumpfile_path), "%s/input/%s", test->work_dir, test->input_pcap);
+ char pcap_path[PATH_MAX] = {0};
+ snprintf(pcap_path, sizeof(pcap_path), "%s/input/%s", test->work_dir, test->input_pcap);
system_cmd("rm -rf %s", test->work_dir);
system_cmd("mkdir -p %s/input/", test->work_dir);
system_cmd("mkdir -p %s/log/", test->work_dir);
@@ -173,9 +173,9 @@ static inline void packet_inject_test(struct packet_inject_case *test)
char temp[PATH_MAX * 2] = {0};
getcwd(cwd, sizeof(cwd));
chdir(test->work_dir);
- snprintf(temp, sizeof(temp), "dumpfile_path = \"%s\"", dumpfile_path);
- EXPECT_TRUE(replace_file_string("./conf/stellar.toml", "mode = marsio", "mode = dumpfile") == 0);
- EXPECT_TRUE(replace_file_string("./conf/stellar.toml", "dumpfile_path = \"/tmp/dumpfile/dumpfile.pcap\"", temp) == 0);
+ snprintf(temp, sizeof(temp), "pcap_path = \"%s\"", pcap_path);
+ EXPECT_TRUE(replace_file_string("./conf/stellar.toml", "mode = marsio", "mode = pcapfile") == 0);
+ EXPECT_TRUE(replace_file_string("./conf/stellar.toml", "pcap_path = \"/tmp/test.pcap\"", temp) == 0);
const char *stellar_cfg_file = "./conf/stellar.toml";
const char *plugin_cfg_file = "./plugin/spec.toml";