diff options
| author | luwenpeng <[email protected]> | 2024-09-13 18:03:05 +0800 |
|---|---|---|
| committer | luwenpeng <[email protected]> | 2024-09-13 18:07:40 +0800 |
| commit | e53aeff8912fd124995313c90783623394f3b3da (patch) | |
| tree | 970db6781f219827641ca0b31625a6b91eebd40a /infra/packet_manager | |
| parent | 173a6ced619ebd93d7e52e455936b02c91d8e8b6 (diff) | |
feature: add packet_manager API implementation
Diffstat (limited to 'infra/packet_manager')
| -rw-r--r-- | infra/packet_manager/CMakeLists.txt | 3 | ||||
| -rw-r--r-- | infra/packet_manager/packet_manager.c | 382 | ||||
| -rw-r--r-- | infra/packet_manager/packet_manager_private.h | 19 | ||||
| -rw-r--r-- | infra/packet_manager/packet_private.h | 15 | ||||
| -rw-r--r-- | infra/packet_manager/packet_utils.c | 14 |
5 files changed, 427 insertions, 6 deletions
diff --git a/infra/packet_manager/CMakeLists.txt b/infra/packet_manager/CMakeLists.txt index 5a3949f..f764a51 100644 --- a/infra/packet_manager/CMakeLists.txt +++ b/infra/packet_manager/CMakeLists.txt @@ -1,9 +1,10 @@ add_library(packet_manager - packet_utils.c + packet_manager.c packet_parser.c packet_builder.c packet_filter.c packet_dump.c + packet_utils.c checksum.c) target_include_directories(packet_manager PUBLIC ${CMAKE_CURRENT_LIST_DIR}) target_include_directories(packet_manager PUBLIC ${CMAKE_SOURCE_DIR}/deps/uthash) diff --git a/infra/packet_manager/packet_manager.c b/infra/packet_manager/packet_manager.c new file mode 100644 index 0000000..baf165b --- /dev/null +++ b/infra/packet_manager/packet_manager.c @@ -0,0 +1,382 @@ +#include "utils.h" +#include "stellar/mq.h" +#include "packet_private.h" +#include "packet_manager_private.h" + +#define PACKET_MANAGER_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "packet manager", format, ##__VA_ARGS__) +#define PACKET_MANAGER_LOG_INFO(format, ...) STELLAR_LOG_WARN(__thread_local_logger, "packet manager", format, ##__VA_ARGS__) + +TAILQ_HEAD(packet_queue, packet); + +struct packet_manager_config +{ + uint16_t nr_worker_thread; +}; + +struct packet_manager_stat +{ + uint64_t input_pkts; + uint64_t output_pkts; + + uint64_t take_pkts; + uint64_t schedule_pkts; + + uint64_t curr_queue_len[PACKET_STAGE_MAX]; +}; + +struct packet_manager_schema +{ + struct mq_schema *mq; + int topic_id[PACKET_STAGE_MAX]; +}; + +struct packet_manager_runtime +{ + struct mq_runtime *mq; + struct packet_queue queue[PACKET_STAGE_MAX]; + struct packet_manager_stat stat; +}; + +struct packet_manager +{ + struct packet_manager_config *cfg; + struct packet_manager_schema *schema; + struct packet_manager_runtime *runtime[MAX_THREAD_NUM]; +}; + +/****************************************************************************** + * packet manager config + ******************************************************************************/ + +static void packet_manager_config_free(struct packet_manager_config *cfg) +{ + if (cfg) + { + free(cfg); + cfg = NULL; + } +} + +static struct packet_manager_config *packet_manager_config_new(const char *toml_file) +{ + struct packet_manager_config *cfg = calloc(1, sizeof(struct packet_manager_config)); + if (cfg == NULL) + { + PACKET_MANAGER_LOG_ERROR("failed to allocate memory for packet_manager_config"); + return NULL; + } + + if (load_and_validate_toml_integer_config(toml_file, "packet_io.nr_worker_thread", (uint64_t *)&cfg->nr_worker_thread, 1, MAX_THREAD_NUM) != 0) + { + PACKET_MANAGER_LOG_ERROR("failed to load packet_io.nr_worker_thread from %s", toml_file); + free(cfg); + return NULL; + } + + return cfg; +} + +/****************************************************************************** + * packet manager schema + ******************************************************************************/ + +static void on_packet_stage_dispatch(int topic_id, const void *msg, on_msg_cb_func *cb, void *cb_arg, void *dispatch_arg) +{ + struct packet_manager_schema *schema = (struct packet_manager_schema *)dispatch_arg; + struct packet *pkt = (struct packet *)msg; + + enum packet_stage stage = PACKET_STAGE_MAX; + for (int i = 0; i < PACKET_STAGE_MAX; i++) + { + if (schema->topic_id[i] == topic_id) + { + stage = i; + break; + } + } + + ((on_packet_stage_callback *)cb)(stage, pkt, cb_arg); +} + +static void packet_manager_schema_free(struct packet_manager_schema *schema) +{ + if (schema) + { + if (schema->mq) + { + for (int i = 0; i < PACKET_STAGE_MAX; i++) + { + if (schema->topic_id[i] != -1) + { + mq_schema_destroy_topic(schema->mq, schema->topic_id[i]); + } + } + + mq_schema_free(schema->mq); + } + + free(schema); + schema = NULL; + } +} + +static struct packet_manager_schema *packet_manager_schema_new() +{ + struct packet_manager_schema *schema = calloc(1, sizeof(struct packet_manager_schema)); + if (schema == NULL) + { + PACKET_MANAGER_LOG_ERROR("failed to allocate memory for packet_manager_schema"); + return NULL; + } + + schema->mq = mq_schema_new(); + if (schema->mq == NULL) + { + PACKET_MANAGER_LOG_ERROR("failed to create mq_schema"); + goto error_out; + } + + for (int i = 0; i < PACKET_STAGE_MAX; i++) + { + schema->topic_id[i] = -1; + } + schema->topic_id[PACKET_STAGE_PREROUTING] = mq_schema_create_topic(schema->mq, "packet_stage_prerouting", on_packet_stage_dispatch, schema, NULL, NULL); + schema->topic_id[PACKET_STAGE_INPUT] = mq_schema_create_topic(schema->mq, "packet_stage_input", on_packet_stage_dispatch, schema, NULL, NULL); + schema->topic_id[PACKET_STAGE_FORWARD] = mq_schema_create_topic(schema->mq, "packet_stage_forward", on_packet_stage_dispatch, schema, NULL, NULL); + schema->topic_id[PACKET_STAGE_OUTPUT] = mq_schema_create_topic(schema->mq, "packet_stage_output", on_packet_stage_dispatch, schema, NULL, NULL); + schema->topic_id[PACKET_STAGE_POSTROUTING] = mq_schema_create_topic(schema->mq, "packet_stage_postrouting", on_packet_stage_dispatch, schema, NULL, NULL); + for (int i = 0; i < PACKET_STAGE_MAX; i++) + { + if (schema->topic_id[i] < 0) + { + PACKET_MANAGER_LOG_ERROR("failed to create topic"); + goto error_out; + } + } + + return schema; + +error_out: + packet_manager_schema_free(schema); + return NULL; +} + +/****************************************************************************** + * packet manager runtime + ******************************************************************************/ + +static void packet_manager_runtime_free(struct packet_manager_runtime *runtime) +{ + if (runtime) + { + if (runtime->mq) + { + mq_runtime_free(runtime->mq); + } + for (int i = 0; i < PACKET_STAGE_MAX; i++) + { + struct packet *pkt = NULL; + while ((pkt = TAILQ_FIRST(&runtime->queue[i]))) + { + TAILQ_REMOVE(&runtime->queue[i], pkt, stage_tqe); + + // TODO: free packet and free mbuff + packet_free(pkt); + } + } + } + free(runtime); + runtime = NULL; +} + +static struct packet_manager_runtime *packet_manager_runtime_new(struct packet_manager_schema *schema) +{ + struct packet_manager_runtime *runtime = calloc(1, sizeof(struct packet_manager_runtime)); + if (runtime == NULL) + { + PACKET_MANAGER_LOG_ERROR("failed to allocate memory for packet_manager_runtime"); + return NULL; + } + + runtime->mq = mq_runtime_new(schema->mq); + if (runtime->mq == NULL) + { + PACKET_MANAGER_LOG_ERROR("failed to create mq_runtime"); + free(runtime); + return NULL; + } + + for (int i = 0; i < PACKET_STAGE_MAX; i++) + { + TAILQ_INIT(&runtime->queue[i]); + } + + return runtime; +} + +static void packet_manager_runtime_stat(struct packet_manager_runtime *runtime) +{ + PACKET_MANAGER_LOG_INFO("input_pkts: %lu, output_pkts: %lu, take_pkts: %lu, schedule_pkts: %lu", + runtime->stat.input_pkts, runtime->stat.output_pkts, runtime->stat.take_pkts, runtime->stat.schedule_pkts); + for (int i = 0; i < PACKET_STAGE_MAX; i++) + { + PACKET_MANAGER_LOG_INFO("curr_queue_len[%d]: %lu", i, runtime->stat.curr_queue_len[i]); + } +} + +/****************************************************************************** + * Public API + ******************************************************************************/ + +struct packet_manager *packet_manager_new(const char *toml_file) +{ + struct packet_manager *pkt_mgr = calloc(1, sizeof(struct packet_manager)); + if (pkt_mgr == NULL) + { + PACKET_MANAGER_LOG_ERROR("failed to allocate memory for packet_manager"); + return NULL; + } + + pkt_mgr->cfg = packet_manager_config_new(toml_file); + if (pkt_mgr->cfg == NULL) + { + PACKET_MANAGER_LOG_ERROR("failed to create packet_manager_config"); + goto error_out; + } + + pkt_mgr->schema = packet_manager_schema_new(); + if (pkt_mgr->schema == NULL) + { + PACKET_MANAGER_LOG_ERROR("failed to create packet_manager_schema"); + goto error_out; + } + + for (uint16_t i = 0; i < pkt_mgr->cfg->nr_worker_thread; i++) + { + pkt_mgr->runtime[i] = packet_manager_runtime_new(pkt_mgr->schema); + if (pkt_mgr->runtime[i] == NULL) + { + PACKET_MANAGER_LOG_ERROR("failed to create packet_manager_runtime"); + goto error_out; + } + } + + return pkt_mgr; + +error_out: + packet_manager_free(pkt_mgr); + return NULL; +} + +void packet_manager_free(struct packet_manager *pkt_mgr) +{ + if (pkt_mgr) + { + packet_manager_config_free(pkt_mgr->cfg); + packet_manager_schema_free(pkt_mgr->schema); + + for (uint16_t i = 0; i < pkt_mgr->cfg->nr_worker_thread; i++) + { + if (pkt_mgr->runtime[i]) + { + PACKET_MANAGER_LOG_INFO("worker thread %d packet manager runtime stat", i); + packet_manager_runtime_stat(pkt_mgr->runtime[i]); + packet_manager_runtime_free(pkt_mgr->runtime[i]); + } + } + + free(pkt_mgr); + pkt_mgr = NULL; + } +} + +void packet_manager_runtime_input(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt) +{ + pkt_mgr_rt->stat.input_pkts++; + pkt_mgr_rt->stat.curr_queue_len[PACKET_STAGE_PREROUTING]++; + TAILQ_INSERT_TAIL(&pkt_mgr_rt->queue[PACKET_STAGE_PREROUTING], pkt, stage_tqe); +} + +struct packet *packet_manager_runtime_output(struct packet_manager_runtime *pkt_mgr_rt) +{ + struct packet *pkt = TAILQ_FIRST(&pkt_mgr_rt->queue[PACKET_STAGE_POSTROUTING]); + if (pkt) + { + pkt_mgr_rt->stat.output_pkts++; + pkt_mgr_rt->stat.curr_queue_len[PACKET_STAGE_POSTROUTING]--; + TAILQ_REMOVE(&pkt_mgr_rt->queue[PACKET_STAGE_POSTROUTING], pkt, stage_tqe); + } + return pkt; +} + +void packet_manager_runtime_dispatch(struct packet_manager_runtime *pkt_mgr_rt) +{ + for (int i = 0; i < PACKET_STAGE_MAX; i++) + { + struct packet *pkt = NULL; + while ((pkt = TAILQ_FIRST(&pkt_mgr_rt->queue[i]))) + { + pkt_mgr_rt->stat.curr_queue_len[i]--; + TAILQ_REMOVE(&pkt_mgr_rt->queue[i], pkt, stage_tqe); + mq_runtime_publish_message(pkt_mgr_rt->mq, i, pkt); + mq_runtime_dispatch(pkt_mgr_rt->mq); + + if (packet_is_stolen(pkt)) + { + continue; + } + + if (i + 1 == PACKET_STAGE_MAX) + { + break; + } + else + { + pkt_mgr_rt->stat.curr_queue_len[i + 1]++; + TAILQ_INSERT_TAIL(&pkt_mgr_rt->queue[i + 1], pkt, stage_tqe); + } + } + } +} + +struct packet_manager_schema *packet_manager_get_schema(struct packet_manager *pkt_mgr) +{ + if (pkt_mgr) + { + return pkt_mgr->schema; + } + else + { + return NULL; + } +} + +struct packet_manager_runtime *packet_manager_get_runtime(struct packet_manager *pkt_mgr, uint16_t thr_idx) +{ + if (pkt_mgr && thr_idx < pkt_mgr->cfg->nr_worker_thread) + { + return pkt_mgr->runtime[thr_idx]; + } + else + { + return NULL; + } +} + +int packet_manager_schema_add_subscriber(struct packet_manager_schema *schema, enum packet_stage stage, on_packet_stage_callback cb, void *args) +{ + return mq_schema_subscribe(schema->mq, schema->topic_id[stage], (on_msg_cb_func *)cb, args); +} + +void packet_manager_runtime_take_packet(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt) +{ + pkt_mgr_rt->stat.take_pkts++; + packet_set_stolen(pkt, true); +} + +void packet_manager_runtime_schedule_packet(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt, enum packet_stage stage) +{ + pkt_mgr_rt->stat.schedule_pkts++; + packet_set_stolen(pkt, false); + TAILQ_INSERT_TAIL(&pkt_mgr_rt->queue[stage], pkt, stage_tqe); +}
\ No newline at end of file diff --git a/infra/packet_manager/packet_manager_private.h b/infra/packet_manager/packet_manager_private.h new file mode 100644 index 0000000..85c5106 --- /dev/null +++ b/infra/packet_manager/packet_manager_private.h @@ -0,0 +1,19 @@ +#pragma once + +#ifdef __cplusplus +extern "C" +{ +#endif + +#include "stellar/packet_manager.h" + +struct packet_manager *packet_manager_new(const char *toml_file); +void packet_manager_free(struct packet_manager *pkt_mgr); + +void packet_manager_runtime_input(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt); +struct packet *packet_manager_runtime_output(struct packet_manager_runtime *pkt_mgr_rt); +void packet_manager_runtime_dispatch(struct packet_manager_runtime *pkt_mgr_rt); + +#ifdef __cplusplus +} +#endif diff --git a/infra/packet_manager/packet_private.h b/infra/packet_manager/packet_private.h index 69a2fb3..915b763 100644 --- a/infra/packet_manager/packet_private.h +++ b/infra/packet_manager/packet_private.h @@ -5,6 +5,9 @@ extern "C" { #endif +#include <stdbool.h> +#include <sys/queue.h> + #include "tuple.h" #include "stellar/packet.h" @@ -25,7 +28,8 @@ struct metadata uint64_t session_id; uint64_t domain; uint16_t link_id; - int is_ctrl; + bool is_ctrl; + bool is_stolen; enum packet_direction direction; enum packet_action action; @@ -59,6 +63,8 @@ struct packet uint16_t data_len; uint16_t trim_len; // trim eth padding + TAILQ_ENTRY(packet) stage_tqe; + struct metadata meta; }; @@ -93,8 +99,11 @@ uint64_t packet_get_domain(const struct packet *pkt); void packet_set_link_id(struct packet *pkt, uint16_t id); uint16_t packet_get_link_id(const struct packet *pkt); -void packet_set_ctrl(struct packet *pkt, uint8_t ctrl); -uint8_t packet_is_ctrl(const struct packet *pkt); +void packet_set_ctrl(struct packet *pkt, bool ctrl); +bool packet_is_ctrl(const struct packet *pkt); + +void packet_set_stolen(struct packet *pkt, bool stolen); +bool packet_is_stolen(const struct packet *pkt); void packet_set_direction(struct packet *pkt, enum packet_direction dir); diff --git a/infra/packet_manager/packet_utils.c b/infra/packet_manager/packet_utils.c index 79485ec..3ec12f3 100644 --- a/infra/packet_manager/packet_utils.c +++ b/infra/packet_manager/packet_utils.c @@ -91,16 +91,26 @@ uint16_t packet_get_link_id(const struct packet *pkt) return pkt->meta.link_id; } -void packet_set_ctrl(struct packet *pkt, uint8_t ctrl) +void packet_set_ctrl(struct packet *pkt, bool ctrl) { pkt->meta.is_ctrl = ctrl; } -uint8_t packet_is_ctrl(const struct packet *pkt) +bool packet_is_ctrl(const struct packet *pkt) { return pkt->meta.is_ctrl; } +void packet_set_stolen(struct packet *pkt, bool stolen) +{ + pkt->meta.is_stolen = stolen; +} + +bool packet_is_stolen(const struct packet *pkt) +{ + return pkt->meta.is_stolen; +} + void packet_set_direction(struct packet *pkt, enum packet_direction dir) { pkt->meta.direction = dir; |
