diff options
| author | luwenpeng <[email protected]> | 2024-11-22 15:30:53 +0800 |
|---|---|---|
| committer | luwenpeng <[email protected]> | 2024-11-22 15:30:53 +0800 |
| commit | efc6f46ca0c6a6b4164407d1c346971499fa0005 (patch) | |
| tree | 1aa5190bbb217e23a4f00aefec93075f07a4abcb | |
| parent | 6c9e6e3fbe0f31ad79d8f47c4f983e639dfef30d (diff) | |
support packet_manager_register_node(), remove packet_manager_subscribe()
| -rw-r--r-- | include/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | include/stellar/packet.h | 37 | ||||
| -rw-r--r-- | infra/packet_manager/packet_internal.h | 6 | ||||
| -rw-r--r-- | infra/packet_manager/packet_manager.c | 283 | ||||
| -rw-r--r-- | infra/packet_manager/packet_manager.h | 11 | ||||
| -rw-r--r-- | infra/packet_manager/packet_parser.c | 2 | ||||
| -rw-r--r-- | infra/packet_manager/packet_utils.c | 41 | ||||
| -rw-r--r-- | infra/packet_manager/test/gtest_packet_manager.cpp | 199 | ||||
| -rw-r--r-- | infra/session_manager/session_manager.c | 8 | ||||
| -rw-r--r-- | infra/version.map | 4 | ||||
| -rw-r--r-- | test/CMakeLists.txt | 2 |
11 files changed, 285 insertions, 309 deletions
diff --git a/include/CMakeLists.txt b/include/CMakeLists.txt index 4968b32..1c084d4 100644 --- a/include/CMakeLists.txt +++ b/include/CMakeLists.txt @@ -2,6 +2,5 @@ install(FILES stellar/utils.h DESTINATION include/stellar/ COMPONENT LIBRARIES) install(FILES stellar/packet.h DESTINATION include/stellar/ COMPONENT LIBRARIES) install(FILES stellar/session.h DESTINATION include/stellar/ COMPONENT LIBRARIES) install(FILES stellar/stellar.h DESTINATION include/stellar/ COMPONENT LIBRARIES) -install(FILES stellar/mq.h DESTINATION include/stellar/ COMPONENT LIBRARIES) install(FILES stellar/exdata.h DESTINATION include/stellar/ COMPONENT LIBRARIES) install(FILES stellar/log.h DESTINATION include/stellar/ COMPONENT LIBRARIES)
\ No newline at end of file diff --git a/include/stellar/packet.h b/include/stellar/packet.h index b2d3f22..8eb7b74 100644 --- a/include/stellar/packet.h +++ b/include/stellar/packet.h @@ -16,6 +16,7 @@ extern "C" #include <linux/if_ether.h> #include <linux/mpls.h> +#include "stellar/packet_tag.h" #include "stellar/exdata.h" #include "stellar/module.h" @@ -167,8 +168,18 @@ enum packet_type PACKET_TYPE_PSEUDO = 1, }; -enum packet_type packet_get_type(const struct packet *pkt); +enum packet_stage +{ + PACKET_STAGE_PREROUTING, + PACKET_STAGE_INPUT, + PACKET_STAGE_FORWARD, + PACKET_STAGE_OUTPUT, + PACKET_STAGE_POSTROUTING, + PACKET_STAGE_MAX, +}; + void packet_set_type(struct packet *pkt, enum packet_type type); +enum packet_type packet_get_type(const struct packet *pkt); void packet_set_action(struct packet *pkt, enum packet_action action); enum packet_action packet_get_action(const struct packet *pkt); @@ -188,33 +199,27 @@ void *packet_get_exdata(const struct packet *pkt, int idx); void packet_tag_set(struct packet *pkt, uint64_t key_bits, uint64_t val_bits); void packet_tag_get(const struct packet *pkt, uint64_t *key_bits, uint64_t *val_bits); +int packet_get_ip_proto(const struct packet *pkt); +enum packet_stage packet_get_stage(const struct packet *pkt); + /****************************************************************************** * packet manager ******************************************************************************/ -enum packet_stage -{ - PACKET_STAGE_PREROUTING, - PACKET_STAGE_INPUT, - PACKET_STAGE_FORWARD, - PACKET_STAGE_OUTPUT, - PACKET_STAGE_POSTROUTING, - PACKET_STAGE_MAX, -}; - #define PACKET_MANAGER_MODULE_NAME "packet_manager_module" struct packet_manager; struct packet_manager *module_to_packet_manager(struct module *mod); int packet_manager_new_packet_exdata_index(struct packet_manager *pkt_mgr, const char *name, exdata_free *func, void *arg); -typedef void on_packet_stage_callback(struct packet *pkt, enum packet_stage stage, void *arg); -int packet_manager_subscribe(struct packet_manager *pkt_mgr, enum packet_stage stage, on_packet_stage_callback *cb, void *arg); - +typedef void on_packet_callback(struct packet *pkt, void *arg); +int packet_manager_register_node(struct packet_manager *pkt_mgr, const char *name, enum packet_stage stage, + uint64_t interested_tag_key_bits, + uint64_t interested_tag_val_bits, + on_packet_callback *cb, void *arg); // if two modules claim the same packet at the same stage, the second 'claim' fails. // return 0 on success // return -1 on failure -typedef void on_packet_claimed_callback(struct packet *pkt, void *arg); -int packet_manager_claim_packet(struct packet_manager *pkt_mgr, uint16_t thread_id, struct packet *pkt, on_packet_claimed_callback cb, void *arg); +int packet_manager_claim_packet(struct packet_manager *pkt_mgr, uint16_t thread_id, struct packet *pkt, on_packet_callback *cb, void *arg); void packet_manager_schedule_packet(struct packet_manager *pkt_mgr, uint16_t thread_id, struct packet *pkt, enum packet_stage stage); /* diff --git a/infra/packet_manager/packet_internal.h b/infra/packet_manager/packet_internal.h index e15f25a..d5f8643 100644 --- a/infra/packet_manager/packet_internal.h +++ b/infra/packet_manager/packet_internal.h @@ -75,6 +75,7 @@ struct packet int8_t need_free; int8_t is_defraged; int8_t is_claim; + int8_t ip_proto; // innermost ip proto const char *data_ptr; uint16_t data_len; @@ -89,6 +90,7 @@ struct packet struct packet_queue frag_list; // for defraged packet struct metadata meta; + enum packet_stage stage; enum packet_type type; enum packet_action action; struct packet_origin origin; @@ -181,6 +183,10 @@ void packet_set_user_data(struct packet *pkt, void *data); void packet_set_origin(struct packet *pkt, struct packet_origin *origin); struct packet_origin *packet_get_origin(struct packet *pkt); +void packet_set_stage(struct packet *pkt, enum packet_stage stage); + +const char *packet_stage_to_str(enum packet_stage stage); + #ifdef __cplusplus } #endif diff --git a/infra/packet_manager/packet_manager.c b/infra/packet_manager/packet_manager.c index d71c986..cb2a5d8 100644 --- a/infra/packet_manager/packet_manager.c +++ b/infra/packet_manager/packet_manager.c @@ -2,66 +2,56 @@ #include "utils_internal.h" #include "packet_internal.h" -#include "packet_manager.h" #include "packet_builder.h" +#include "packet_manager.h" #include "fieldstat/fieldstat_easy.h" #define PACKET_MANAGER_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "packet manager", format, ##__VA_ARGS__) #define PACKET_MANAGER_LOG_FATAL(format, ...) STELLAR_LOG_FATAL(__thread_local_logger, "packet manager", format, ##__VA_ARGS__) #define PACKET_MANAGER_LOG_INFO(format, ...) STELLAR_LOG_INFO(__thread_local_logger, "packet manager", format, ##__VA_ARGS__) +struct node +{ + char name[64]; + uint64_t interested_tag_key_bits; + uint64_t interested_tag_val_bits; + on_packet_callback *node_entry; + void *arg; +}; + +#define MAX_NODE_PER_STAGE 128 +struct node_array +{ + struct node array[MAX_NODE_PER_STAGE]; + uint16_t used; +}; + struct packet_manager_rte { enum packet_stage curr_stage; struct packet_queue queue[PACKET_QUEUE_MAX]; void *claim_arg; - on_packet_claimed_callback *claim_cb; + on_packet_callback *claim_cb; - struct mq_runtime *mq_rte; struct packet_manager_stat stat; }; -struct packet_manager_sche -{ - struct exdata_schema *ex_sche; - struct mq_schema *mq_sche; - int pkt_msg_id[PACKET_STAGE_MAX]; -}; - struct packet_manager { uint16_t thread_num; - struct packet_manager_sche *sche; + struct exdata_schema *ex_sche; + struct node_array nodes[PACKET_STAGE_MAX]; struct packet_manager_rte *rte[MAX_THREAD_NUM]; struct fieldstat_easy *fs; - int pkt_mgr_fs_idx[PKT_MGR_STAT_MAX]; + int fs_idx[PKT_MGR_STAT_MAX]; }; /****************************************************************************** * utils ******************************************************************************/ -const char *packet_stage_to_str(enum packet_stage stage) -{ - switch (stage) - { - case PACKET_STAGE_PREROUTING: - return "PACKET_STAGE_PREROUTING"; - case PACKET_STAGE_INPUT: - return "PACKET_STAGE_INPUT"; - case PACKET_STAGE_FORWARD: - return "PACKET_STAGE_FORWARD"; - case PACKET_STAGE_OUTPUT: - return "PACKET_STAGE_OUTPUT"; - case PACKET_STAGE_POSTROUTING: - return "PACKET_STAGE_POSTROUTING"; - default: - return "PACKET_STAGE_UNKNOWN"; - } -} - uint64_t packet_manager_stat_get(struct packet_manager_stat *stat, enum pkt_mgr_stat_type type) { switch (type) @@ -78,27 +68,7 @@ uint64_t packet_manager_stat_get(struct packet_manager_stat *stat, enum pkt_mgr_ * packet manager rte ******************************************************************************/ -static void packet_manager_rte_free(struct packet_manager_rte *pkt_mgr_rte) -{ - struct packet *pkt = NULL; - - if (pkt_mgr_rte) - { - for (int i = 0; i < PACKET_QUEUE_MAX; i++) - { - while ((pkt = TAILQ_FIRST(&pkt_mgr_rte->queue[i]))) - { - TAILQ_REMOVE(&pkt_mgr_rte->queue[i], pkt, stage_tqe); - packet_free(pkt); - } - } - - free(pkt_mgr_rte); - pkt_mgr_rte = NULL; - } -} - -static struct packet_manager_rte *packet_manager_rte_new(struct mq_runtime *mq_rte) +static struct packet_manager_rte *packet_manager_rte_new() { struct packet_manager_rte *pkt_mgr_rte = calloc(1, sizeof(struct packet_manager_rte)); if (pkt_mgr_rte == NULL) @@ -111,100 +81,35 @@ static struct packet_manager_rte *packet_manager_rte_new(struct mq_runtime *mq_r { TAILQ_INIT(&pkt_mgr_rte->queue[i]); } - pkt_mgr_rte->mq_rte = mq_rte; return pkt_mgr_rte; } -/****************************************************************************** - * packet manager sche - ******************************************************************************/ - -static void on_packet_stage_dispatch(int pkt_msg_id, void *msg, on_msg_cb_func *cb, void *cb_arg, void *dispatch_arg) +static void packet_manager_rte_free(struct packet_manager_rte *pkt_mgr_rte) { - assert(msg); - assert(dispatch_arg); - - enum packet_stage stage = PACKET_STAGE_MAX; - struct packet *pkt = (struct packet *)msg; - struct packet_manager_sche *pkt_mgr_sche = (struct packet_manager_sche *)dispatch_arg; - - for (int i = 0; i < PACKET_STAGE_MAX; i++) - { - if (pkt_mgr_sche->pkt_msg_id[i] == pkt_msg_id) - { - stage = i; - break; - } - } - - ((on_packet_stage_callback *)(void *)cb)(pkt, stage, cb_arg); -} + struct packet *pkt = NULL; -static void packet_manager_sche_free(struct packet_manager_sche *pkt_mgr_sche) -{ - if (pkt_mgr_sche) + if (pkt_mgr_rte) { - if (pkt_mgr_sche->mq_sche) + for (int i = 0; i < PACKET_QUEUE_MAX; i++) { - for (int i = 0; i < PACKET_STAGE_MAX; i++) + while ((pkt = TAILQ_FIRST(&pkt_mgr_rte->queue[i]))) { - if (pkt_mgr_sche->pkt_msg_id[i] >= 0) - { - mq_schema_destroy_topic(pkt_mgr_sche->mq_sche, pkt_mgr_sche->pkt_msg_id[i]); - } + TAILQ_REMOVE(&pkt_mgr_rte->queue[i], pkt, stage_tqe); + packet_free(pkt); } } - if (pkt_mgr_sche->ex_sche) - { - exdata_schema_free(pkt_mgr_sche->ex_sche); - } - - free(pkt_mgr_sche); - pkt_mgr_sche = NULL; - } -} - -static struct packet_manager_sche *packet_manager_sche_new(struct mq_schema *mq_sche) -{ - struct packet_manager_sche *pkt_mgr_sche = calloc(1, sizeof(struct packet_manager_sche)); - if (pkt_mgr_sche == NULL) - { - PACKET_MANAGER_LOG_ERROR("failed to allocate memory for packet_schema"); - return NULL; - } - - pkt_mgr_sche->mq_sche = mq_sche; - pkt_mgr_sche->ex_sche = exdata_schema_new(); - if (pkt_mgr_sche->ex_sche == NULL) - { - PACKET_MANAGER_LOG_ERROR("failed to create exdata_schema"); - goto error_out; - } - - for (int i = 0; i < PACKET_STAGE_MAX; i++) - { - pkt_mgr_sche->pkt_msg_id[i] = mq_schema_create_topic(pkt_mgr_sche->mq_sche, packet_stage_to_str(i), &on_packet_stage_dispatch, pkt_mgr_sche, NULL, NULL); - if (pkt_mgr_sche->pkt_msg_id[i] < 0) - { - PACKET_MANAGER_LOG_ERROR("failed to create topic %s", packet_stage_to_str(i)); - goto error_out; - } + free(pkt_mgr_rte); + pkt_mgr_rte = NULL; } - - return pkt_mgr_sche; - -error_out: - packet_manager_sche_free(pkt_mgr_sche); - return NULL; } /****************************************************************************** * packet manager ******************************************************************************/ -struct packet_manager *packet_manager_new(struct mq_schema *mq_sche, uint16_t thread_num) +struct packet_manager *packet_manager_new(uint16_t thread_num) { struct packet_manager *pkt_mgr = calloc(1, sizeof(struct packet_manager)); if (pkt_mgr == NULL) @@ -214,10 +119,10 @@ struct packet_manager *packet_manager_new(struct mq_schema *mq_sche, uint16_t th } pkt_mgr->thread_num = thread_num; - pkt_mgr->sche = packet_manager_sche_new(mq_sche); - if (pkt_mgr->sche == NULL) + pkt_mgr->ex_sche = exdata_schema_new(); + if (pkt_mgr->ex_sche == NULL) { - PACKET_MANAGER_LOG_ERROR("failed to create packet_schema"); + PACKET_MANAGER_LOG_ERROR("failed to create exdata_schema"); goto error_out; } @@ -234,7 +139,7 @@ struct packet_manager *packet_manager_new(struct mq_schema *mq_sche, uint16_t th } for (int i = 0; i < PKT_MGR_STAT_MAX; i++) { - pkt_mgr->pkt_mgr_fs_idx[i] = fieldstat_easy_register_counter(pkt_mgr->fs, pkt_mgr_stat_str[i]); + pkt_mgr->fs_idx[i] = fieldstat_easy_register_counter(pkt_mgr->fs, pkt_mgr_stat_str[i]); } return pkt_mgr; @@ -253,7 +158,7 @@ void packet_manager_free(struct packet_manager *pkt_mgr) fieldstat_easy_free(pkt_mgr->fs); } - packet_manager_sche_free(pkt_mgr->sche); + exdata_schema_free(pkt_mgr->ex_sche); free(pkt_mgr); pkt_mgr = NULL; @@ -263,22 +168,42 @@ void packet_manager_free(struct packet_manager *pkt_mgr) int packet_manager_new_packet_exdata_index(struct packet_manager *pkt_mgr, const char *name, exdata_free *func, void *arg) { assert(pkt_mgr); - return exdata_schema_new_index(pkt_mgr->sche->ex_sche, name, func, arg); + return exdata_schema_new_index(pkt_mgr->ex_sche, name, func, arg); } -int packet_manager_subscribe(struct packet_manager *pkt_mgr, enum packet_stage stage, on_packet_stage_callback *cb, void *arg) +int packet_manager_register_node(struct packet_manager *pkt_mgr, const char *name, enum packet_stage stage, + uint64_t interested_tag_key_bits, + uint64_t interested_tag_val_bits, + on_packet_callback *node_entry, void *arg) { assert(pkt_mgr); - return mq_schema_subscribe(pkt_mgr->sche->mq_sche, pkt_mgr->sche->pkt_msg_id[stage], (on_msg_cb_func *)(void *)cb, arg); + assert(stage < PACKET_STAGE_MAX); + assert(node_entry); + + struct node_array *nodes = &pkt_mgr->nodes[stage]; + if (nodes->used >= MAX_NODE_PER_STAGE) + { + PACKET_MANAGER_LOG_ERROR("exceed max node per stage %d", MAX_NODE_PER_STAGE); + return -1; + } + + struct node *node = &nodes->array[nodes->used]; + strncpy(node->name, name, sizeof(node->name)); + node->interested_tag_key_bits = interested_tag_key_bits; + node->interested_tag_val_bits = interested_tag_val_bits; + node->node_entry = node_entry; + node->arg = arg; + + nodes->used++; + return 0; } -int packet_manager_init(struct packet_manager *pkt_mgr, uint16_t thread_id, struct mq_runtime *mq_rte) +int packet_manager_init(struct packet_manager *pkt_mgr, uint16_t thread_id) { assert(pkt_mgr); assert(thread_id < pkt_mgr->thread_num); - assert(mq_rte); - pkt_mgr->rte[thread_id] = packet_manager_rte_new(mq_rte); + pkt_mgr->rte[thread_id] = packet_manager_rte_new(); if (pkt_mgr->rte[thread_id] == NULL) { PACKET_MANAGER_LOG_ERROR("failed to create packet_manager_rte"); @@ -303,8 +228,7 @@ void packet_manager_clean(struct packet_manager *pkt_mgr, uint16_t thread_id) void packet_manager_ingress(struct packet_manager *pkt_mgr, uint16_t thread_id, struct packet *pkt) { struct packet_manager_rte *pkt_mgr_rte = pkt_mgr->rte[thread_id]; - struct exdata_runtime *ex_rte = exdata_runtime_new(pkt_mgr->sche->ex_sche); - packet_set_user_data(pkt, ex_rte); + packet_set_user_data(pkt, exdata_runtime_new(pkt_mgr->ex_sche)); pkt_mgr_rte->stat.pkts_ingress++; pkt_mgr_rte->stat.queue[PACKET_STAGE_PREROUTING].pkts_in++; @@ -321,9 +245,8 @@ struct packet *packet_manager_egress(struct packet_manager *pkt_mgr, uint16_t th pkt_mgr_rte->stat.pkts_egress++; pkt_mgr_rte->stat.queue[PACKET_STAGE_MAX].pkts_out++; TAILQ_REMOVE(&pkt_mgr_rte->queue[PACKET_STAGE_MAX], pkt, stage_tqe); + exdata_runtime_free((struct exdata_runtime *)packet_get_user_data(pkt)); - struct exdata_runtime *ex_rte = packet_get_user_data(pkt); - exdata_runtime_free(ex_rte); return pkt; } else @@ -334,15 +257,37 @@ struct packet *packet_manager_egress(struct packet_manager *pkt_mgr, uint16_t th void packet_manager_dispatch(struct packet_manager *pkt_mgr, uint16_t thread_id) { + uint64_t pkt_tag_key_bits = 0; + uint64_t pkt_tag_val_bits = 0; + struct packet *pkt = NULL; + struct node *node = NULL; + struct node_array *nodes = NULL; struct packet_manager_rte *pkt_mgr_rte = pkt_mgr->rte[thread_id]; for (int i = 0; i < PACKET_STAGE_MAX; i++) { pkt_mgr_rte->curr_stage = i; + nodes = &pkt_mgr->nodes[pkt_mgr_rte->curr_stage]; - struct packet *pkt = NULL; while ((pkt = TAILQ_FIRST(&pkt_mgr_rte->queue[pkt_mgr_rte->curr_stage]))) { + packet_set_stage(pkt, pkt_mgr_rte->curr_stage); + switch (packet_get_ip_proto(pkt)) + { + case IPPROTO_TCP: + packet_tag_set(pkt, PKT_TAG_KEY_IPPROTO, PKT_TAG_VAL_IPPROTO_TCP); + break; + case IPPROTO_UDP: + packet_tag_set(pkt, PKT_TAG_KEY_IPPROTO, PKT_TAG_VAL_IPPROTO_UDP); + break; + case IPPROTO_ICMP: /* fall through */ + case IPPROTO_ICMPV6: + packet_tag_set(pkt, PKT_TAG_KEY_IPPROTO, PKT_TAG_VAL_IPPROTO_ICMP); + break; + default: + break; + } + packet_set_claim(pkt, false); pkt_mgr_rte->claim_cb = NULL; pkt_mgr_rte->claim_arg = NULL; @@ -350,8 +295,16 @@ void packet_manager_dispatch(struct packet_manager *pkt_mgr, uint16_t thread_id) TAILQ_REMOVE(&pkt_mgr_rte->queue[pkt_mgr_rte->curr_stage], pkt, stage_tqe); pkt_mgr_rte->stat.queue[pkt_mgr_rte->curr_stage].pkts_out++; - mq_runtime_publish_message(pkt_mgr_rte->mq_rte, pkt_mgr_rte->curr_stage, pkt); - mq_runtime_dispatch(pkt_mgr_rte->mq_rte); + for (uint16_t j = 0; j < nodes->used; j++) + { + node = &nodes->array[j]; + packet_tag_get(pkt, &pkt_tag_key_bits, &pkt_tag_val_bits); // pkt_tag may be changed by previous node, so we need to get it again + if ((pkt_tag_key_bits & node->interested_tag_key_bits) && + (pkt_tag_val_bits & node->interested_tag_val_bits)) + { + node->node_entry(pkt, node->arg); + } + } // packet has been claimed and cannot be released if (packet_is_claim(pkt)) @@ -379,9 +332,12 @@ void packet_manager_dispatch(struct packet_manager *pkt_mgr, uint16_t thread_id) pkt_mgr_rte->curr_stage = -1; } -int packet_manager_claim_packet(struct packet_manager *pkt_mgr, uint16_t thread_id, struct packet *pkt, on_packet_claimed_callback cb, void *arg) +int packet_manager_claim_packet(struct packet_manager *pkt_mgr, uint16_t thread_id, struct packet *pkt, on_packet_callback *cb, void *arg) { assert(pkt_mgr); + assert(thread_id < pkt_mgr->thread_num); + assert(pkt); + assert(cb); struct packet_manager_rte *pkt_mgr_rte = pkt_mgr->rte[thread_id]; if (packet_is_claim(pkt)) @@ -402,6 +358,8 @@ int packet_manager_claim_packet(struct packet_manager *pkt_mgr, uint16_t thread_ void packet_manager_schedule_packet(struct packet_manager *pkt_mgr, uint16_t thread_id, struct packet *pkt, enum packet_stage stage) { assert(pkt_mgr); + assert(thread_id < pkt_mgr->thread_num); + assert(pkt); struct packet_manager_rte *pkt_mgr_rte = pkt_mgr->rte[thread_id]; if (stage >= PACKET_STAGE_MAX) @@ -468,9 +426,7 @@ struct packet *packet_manager_build_tcp_packet(struct packet_manager *pkt_mgr, u return NULL; } pkt_mgr_rte->stat.queue[pkt_mgr_rte->curr_stage].pkts_build_tcp_succ++; - - struct exdata_runtime *ex_rte = exdata_runtime_new(pkt_mgr->sche->ex_sche); - packet_set_user_data(pkt, ex_rte); + packet_set_user_data(pkt, exdata_runtime_new(pkt_mgr->ex_sche)); return pkt; } @@ -486,9 +442,7 @@ struct packet *packet_manager_build_udp_packet(struct packet_manager *pkt_mgr, u return NULL; } pkt_mgr_rte->stat.queue[pkt_mgr_rte->curr_stage].pkts_build_udp_succ++; - - struct exdata_runtime *ex_rte = exdata_runtime_new(pkt_mgr->sche->ex_sche); - packet_set_user_data(pkt, ex_rte); + packet_set_user_data(pkt, exdata_runtime_new(pkt_mgr->ex_sche)); return pkt; } @@ -504,9 +458,7 @@ struct packet *packet_manager_build_l3_packet(struct packet_manager *pkt_mgr, ui return NULL; } pkt_mgr_rte->stat.queue[pkt_mgr_rte->curr_stage].pkts_build_l3_succ++; - - struct exdata_runtime *ex_rte = exdata_runtime_new(pkt_mgr->sche->ex_sche); - packet_set_user_data(pkt, ex_rte); + packet_set_user_data(pkt, exdata_runtime_new(pkt_mgr->ex_sche)); return pkt; } @@ -521,9 +473,7 @@ struct packet *packet_manager_dup_packet(struct packet_manager *pkt_mgr, uint16_ return NULL; } pkt_mgr_rte->stat.queue[pkt_mgr_rte->curr_stage].pkts_dup_succ++; - - struct exdata_runtime *ex_rte = exdata_runtime_new(pkt_mgr->sche->ex_sche); - packet_set_user_data(pkt, ex_rte); + packet_set_user_data(pkt, exdata_runtime_new(pkt_mgr->ex_sche)); return pkt; } @@ -534,10 +484,7 @@ void packet_manager_free_packet(struct packet_manager *pkt_mgr, uint16_t thread_ { struct packet_manager_rte *pkt_mgr_rte = pkt_mgr->rte[thread_id]; pkt_mgr_rte->stat.queue[pkt_mgr_rte->curr_stage].pkts_drop++; - - struct exdata_runtime *ex_rte = packet_get_user_data(pkt); - exdata_runtime_free(ex_rte); - + exdata_runtime_free((struct exdata_runtime *)packet_get_user_data(pkt)); packet_free(pkt); } } @@ -560,7 +507,7 @@ static void on_polling(struct module_manager *mod_mgr, void *args) for (int i = 0; i < PKT_MGR_STAT_MAX; i++) { uint64_t val = packet_manager_stat_get(pkt_mgr_curr_stat, i) - packet_manager_stat_get(&pkt_mgr_last_stat, i); - fieldstat_easy_counter_incrby(pkt_mgr->fs, thread_id, pkt_mgr->pkt_mgr_fs_idx[i], NULL, 0, val); + fieldstat_easy_counter_incrby(pkt_mgr->fs, thread_id, pkt_mgr->fs_idx[i], NULL, 0, val); } pkt_mgr_last_stat = *pkt_mgr_curr_stat; last_sync_stat_ms = now_ms; @@ -577,11 +524,9 @@ struct packet_manager *module_to_packet_manager(struct module *mod) struct module *packet_manager_on_init(struct module_manager *mod_mgr) { assert(mod_mgr); - struct mq_schema *mq_sche = module_manager_get_mq_schema(mod_mgr); - assert(mq_sche); uint16_t thread_num = module_manager_get_max_thread_num(mod_mgr); - struct packet_manager *pkt_mgr = packet_manager_new(mq_sche, thread_num); + struct packet_manager *pkt_mgr = packet_manager_new(thread_num); if (pkt_mgr == NULL) { return NULL; @@ -613,15 +558,13 @@ void packet_manager_on_exit(struct module_manager *mod_mgr __attribute__((unused } } -struct module *packet_manager_on_thread_init(struct module_manager *mod_mgr, int thread_id, struct module *mod) +struct module *packet_manager_on_thread_init(struct module_manager *mod_mgr __attribute__((unused)), int thread_id, struct module *mod) { struct packet_manager *pkt_mgr = module_get_ctx(mod); assert(pkt_mgr); - struct mq_runtime *mq_rte = module_manager_get_mq_runtime(mod_mgr); - assert(mq_rte); assert(thread_id < pkt_mgr->thread_num); - if (packet_manager_init(pkt_mgr, thread_id, mq_rte) != 0) + if (packet_manager_init(pkt_mgr, thread_id) != 0) { PACKET_MANAGER_LOG_ERROR("failed to init packet_manager_init"); return NULL; diff --git a/infra/packet_manager/packet_manager.h b/infra/packet_manager/packet_manager.h index 46e0fa7..f3e4654 100644 --- a/infra/packet_manager/packet_manager.h +++ b/infra/packet_manager/packet_manager.h @@ -5,7 +5,6 @@ extern "C" { #endif -#include "stellar/mq.h" #include "stellar/packet.h" #define PACKET_QUEUE_MAX (PACKET_STAGE_MAX + 1) @@ -122,19 +121,19 @@ __attribute__((unused)) static const char pkt_mgr_stat_str[PKT_MGR_STAT_MAX][64] #undef XX }; -struct packet_manager *packet_manager_new(struct mq_schema *mq_schema, uint16_t thread_num); +struct packet_manager *packet_manager_new(uint16_t thread_num); void packet_manager_free(struct packet_manager *pkt_mgr); -int packet_manager_init(struct packet_manager *pkt_mgr, uint16_t thread_id, struct mq_runtime *mq_rte); +int packet_manager_init(struct packet_manager *pkt_mgr, uint16_t thread_id); void packet_manager_clean(struct packet_manager *pkt_mgr, uint16_t thread_id); + void packet_manager_ingress(struct packet_manager *pkt_mgr, uint16_t thread_id, struct packet *pkt); struct packet *packet_manager_egress(struct packet_manager *pkt_mgr, uint16_t thread_id); + void packet_manager_dispatch(struct packet_manager *pkt_mgr, uint16_t thread_id); + struct packet_manager_stat *packet_manager_get_stat(struct packet_manager *pkt_mgr, uint16_t thread_id); void packet_manager_print_stat(struct packet_manager *pkt_mgr, uint16_t thread_id); - -const char *packet_stage_to_str(enum packet_stage stage); - uint64_t packet_manager_stat_get(struct packet_manager_stat *stat, enum pkt_mgr_stat_type type); #ifdef __cplusplus diff --git a/infra/packet_manager/packet_parser.c b/infra/packet_manager/packet_parser.c index 65d564f..a4c03d0 100644 --- a/infra/packet_manager/packet_parser.c +++ b/infra/packet_manager/packet_parser.c @@ -904,6 +904,7 @@ static inline const char *parse_l3(struct packet *pkt, uint16_t next_proto, cons static inline const char *parse_l4(struct packet *pkt, uint8_t next_proto, const char *data, uint16_t len) { + pkt->ip_proto = next_proto; switch (next_proto) { case IPPROTO_AH: @@ -944,6 +945,7 @@ const char *packet_parse(struct packet *pkt, const char *data, uint16_t len) pkt->data_ptr = data; pkt->data_len = len; pkt->trim_len = 0; + pkt->ip_proto = 0; return parse_ether(pkt, data, len); } diff --git a/infra/packet_manager/packet_utils.c b/infra/packet_manager/packet_utils.c index 042c94d..ce9dc51 100644 --- a/infra/packet_manager/packet_utils.c +++ b/infra/packet_manager/packet_utils.c @@ -976,6 +976,9 @@ void packet_tag_set(struct packet *pkt, uint64_t key_bits, uint64_t val_bits) void packet_tag_get(const struct packet *pkt, uint64_t *key_bits, uint64_t *val_bits) { + *key_bits = 0; + *val_bits = 0; + *key_bits = pkt->tag_key_bits; *val_bits = pkt->tag_val_bits; } @@ -1010,14 +1013,24 @@ bool packet_is_claim(const struct packet *pkt) return pkt->is_claim; } +void packet_set_type(struct packet *pkt, enum packet_type type) +{ + pkt->type = type; +} + enum packet_type packet_get_type(const struct packet *pkt) { return pkt->type; } -void packet_set_type(struct packet *pkt, enum packet_type type) +void packet_set_stage(struct packet *pkt, enum packet_stage stage) { - pkt->type = type; + pkt->stage = stage; +} + +enum packet_stage packet_get_stage(const struct packet *pkt) +{ + return pkt->stage; } void packet_set_action(struct packet *pkt, enum packet_action action) @@ -1029,3 +1042,27 @@ enum packet_action packet_get_action(const struct packet *pkt) { return pkt->action; } + +int packet_get_ip_proto(const struct packet *pkt) +{ + return pkt->ip_proto; +} + +const char *packet_stage_to_str(enum packet_stage stage) +{ + switch (stage) + { + case PACKET_STAGE_PREROUTING: + return "PACKET_STAGE_PREROUTING"; + case PACKET_STAGE_INPUT: + return "PACKET_STAGE_INPUT"; + case PACKET_STAGE_FORWARD: + return "PACKET_STAGE_FORWARD"; + case PACKET_STAGE_OUTPUT: + return "PACKET_STAGE_OUTPUT"; + case PACKET_STAGE_POSTROUTING: + return "PACKET_STAGE_POSTROUTING"; + default: + return "PACKET_STAGE_UNKNOWN"; + } +}
\ No newline at end of file diff --git a/infra/packet_manager/test/gtest_packet_manager.cpp b/infra/packet_manager/test/gtest_packet_manager.cpp index 1be35d4..cef52e4 100644 --- a/infra/packet_manager/test/gtest_packet_manager.cpp +++ b/infra/packet_manager/test/gtest_packet_manager.cpp @@ -101,23 +101,25 @@ static void check_stat(struct packet_manager_stat *curr_stat, struct packet_mana #if 1 TEST(PACKET_MANAGER, NEW_FREE) { - struct mq_schema *mq_schema = mq_schema_new(); - EXPECT_TRUE(mq_schema); - - struct packet_manager *pkt_mgr = packet_manager_new(mq_schema, 1); + struct packet_manager *pkt_mgr = packet_manager_new(1); EXPECT_TRUE(pkt_mgr); packet_manager_free(pkt_mgr); - - mq_schema_free(mq_schema); } #endif #if 1 -static void on_packet_stage(struct packet *pkt, enum packet_stage stage, void *args) +static void on_packet(struct packet *pkt, void *args) { + enum packet_stage stage = packet_get_stage(pkt); printf("on_packet_stage: %s\n", packet_stage_to_str(stage)); + uint64_t tag_key_bits; + uint64_t tag_val_bits; + packet_tag_get(pkt, &tag_key_bits, &tag_val_bits); + EXPECT_TRUE(tag_key_bits == PKT_TAG_KEY_IPPROTO); + EXPECT_TRUE(tag_val_bits == PKT_TAG_VAL_IPPROTO_TCP); + static int count = 0; EXPECT_TRUE(count == stage); EXPECT_TRUE(packet_get_type(pkt) == PACKET_TYPE_PSEUDO); @@ -125,25 +127,19 @@ static void on_packet_stage(struct packet *pkt, enum packet_stage stage, void *a count++; } -TEST(PACKET_MANAGER, SUBSCRIBER_PACKET_STAGE) +TEST(PACKET_MANAGER, REGISTER) { - // global init - struct mq_schema *mq_schema = mq_schema_new(); - EXPECT_TRUE(mq_schema); - struct mq_runtime *mq_rt = mq_runtime_new(mq_schema); - EXPECT_TRUE(mq_rt); - // module init - struct packet_manager *pkt_mgr = packet_manager_new(mq_schema, 1); + struct packet_manager *pkt_mgr = packet_manager_new(1); EXPECT_TRUE(pkt_mgr); - EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_PREROUTING, on_packet_stage, NULL) == 0); - EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_INPUT, on_packet_stage, NULL) == 0); - EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_FORWARD, on_packet_stage, NULL) == 0); - EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_OUTPUT, on_packet_stage, NULL) == 0); - EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_POSTROUTING, on_packet_stage, NULL) == 0); + EXPECT_TRUE(packet_manager_register_node(pkt_mgr, "name", PACKET_STAGE_PREROUTING, PKT_TAG_KEY_IPPROTO, PKT_TAG_VAL_IPPROTO_TCP, on_packet, NULL) == 0); + EXPECT_TRUE(packet_manager_register_node(pkt_mgr, "name", PACKET_STAGE_INPUT, PKT_TAG_KEY_IPPROTO, PKT_TAG_VAL_IPPROTO_TCP, on_packet, NULL) == 0); + EXPECT_TRUE(packet_manager_register_node(pkt_mgr, "name", PACKET_STAGE_FORWARD, PKT_TAG_KEY_IPPROTO, PKT_TAG_VAL_IPPROTO_TCP, on_packet, NULL) == 0); + EXPECT_TRUE(packet_manager_register_node(pkt_mgr, "name", PACKET_STAGE_OUTPUT, PKT_TAG_KEY_IPPROTO, PKT_TAG_VAL_IPPROTO_TCP, on_packet, NULL) == 0); + EXPECT_TRUE(packet_manager_register_node(pkt_mgr, "name", PACKET_STAGE_POSTROUTING, PKT_TAG_KEY_IPPROTO, PKT_TAG_VAL_IPPROTO_TCP, on_packet, NULL) == 0); // per-thread init - packet_manager_init(pkt_mgr, thread_id, mq_rt); + packet_manager_init(pkt_mgr, thread_id); // per-thread run struct packet pkt; @@ -176,18 +172,21 @@ TEST(PACKET_MANAGER, SUBSCRIBER_PACKET_STAGE) // module free packet_manager_free(pkt_mgr); - - // global free - mq_runtime_free(mq_rt); - mq_schema_free(mq_schema); } #endif #if 1 -static void on_forward_stage_drop_packet(struct packet *pkt, enum packet_stage stage, void *args) +static void drop_packet(struct packet *pkt, void *args) { + enum packet_stage stage = packet_get_stage(pkt); printf("on_packet_stage: %s\n", packet_stage_to_str(stage)); + uint64_t tag_key_bits; + uint64_t tag_val_bits; + packet_tag_get(pkt, &tag_key_bits, &tag_val_bits); + EXPECT_TRUE(tag_key_bits == PKT_TAG_KEY_IPPROTO); + EXPECT_TRUE(tag_val_bits == PKT_TAG_VAL_IPPROTO_TCP); + static int count = 0; EXPECT_TRUE(count == stage); EXPECT_TRUE(packet_get_type(pkt) == PACKET_TYPE_PSEUDO); @@ -202,23 +201,17 @@ static void on_forward_stage_drop_packet(struct packet *pkt, enum packet_stage s TEST(PACKET_MANAGER, DROP_PACKET) { - // global init - struct mq_schema *mq_schema = mq_schema_new(); - EXPECT_TRUE(mq_schema); - struct mq_runtime *mq_rt = mq_runtime_new(mq_schema); - EXPECT_TRUE(mq_rt); - // module init - struct packet_manager *pkt_mgr = packet_manager_new(mq_schema, 1); + struct packet_manager *pkt_mgr = packet_manager_new(1); EXPECT_TRUE(pkt_mgr); - EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_PREROUTING, on_forward_stage_drop_packet, NULL) == 0); - EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_INPUT, on_forward_stage_drop_packet, NULL) == 0); - EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_FORWARD, on_forward_stage_drop_packet, NULL) == 0); - EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_OUTPUT, on_forward_stage_drop_packet, NULL) == 0); - EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_POSTROUTING, on_forward_stage_drop_packet, NULL) == 0); + EXPECT_TRUE(packet_manager_register_node(pkt_mgr, "name", PACKET_STAGE_PREROUTING, PKT_TAG_KEY_IPPROTO, PKT_TAG_VAL_IPPROTO_TCP, drop_packet, NULL) == 0); + EXPECT_TRUE(packet_manager_register_node(pkt_mgr, "name", PACKET_STAGE_INPUT, PKT_TAG_KEY_IPPROTO, PKT_TAG_VAL_IPPROTO_TCP, drop_packet, NULL) == 0); + EXPECT_TRUE(packet_manager_register_node(pkt_mgr, "name", PACKET_STAGE_FORWARD, PKT_TAG_KEY_IPPROTO, PKT_TAG_VAL_IPPROTO_TCP, drop_packet, NULL) == 0); + EXPECT_TRUE(packet_manager_register_node(pkt_mgr, "name", PACKET_STAGE_OUTPUT, PKT_TAG_KEY_IPPROTO, PKT_TAG_VAL_IPPROTO_TCP, drop_packet, NULL) == 0); + EXPECT_TRUE(packet_manager_register_node(pkt_mgr, "name", PACKET_STAGE_POSTROUTING, PKT_TAG_KEY_IPPROTO, PKT_TAG_VAL_IPPROTO_TCP, drop_packet, NULL) == 0); // per-thread init - packet_manager_init(pkt_mgr, thread_id, mq_rt); + packet_manager_init(pkt_mgr, thread_id); // per-thread run struct packet pkt; @@ -250,10 +243,6 @@ TEST(PACKET_MANAGER, DROP_PACKET) // module free packet_manager_free(pkt_mgr); - - // global free - mq_runtime_free(mq_rt); - mq_schema_free(mq_schema); } #endif @@ -268,11 +257,17 @@ static void packet_claimed(struct packet *pkt, void *args) free(str); } -static void claim_packet_success(struct packet *pkt, enum packet_stage stage, void *args) +static void claim_packet_success(struct packet *pkt, void *args) { struct packet_manager *pkt_mgr = (struct packet_manager *)args; + enum packet_stage stage = packet_get_stage(pkt); + printf("on_packet_stage: %s\n", packet_stage_to_str(stage)); - printf("claim_packet_success: %s\n", packet_stage_to_str(stage)); + uint64_t tag_key_bits; + uint64_t tag_val_bits; + packet_tag_get(pkt, &tag_key_bits, &tag_val_bits); + EXPECT_TRUE(tag_key_bits == PKT_TAG_KEY_IPPROTO); + EXPECT_TRUE(tag_val_bits == PKT_TAG_VAL_IPPROTO_TCP); static int count = 0; EXPECT_TRUE(count == 0); @@ -283,11 +278,17 @@ static void claim_packet_success(struct packet *pkt, enum packet_stage stage, vo count++; } -static void claim_packet_failed(struct packet *pkt, enum packet_stage stage, void *args) +static void claim_packet_failed(struct packet *pkt, void *args) { struct packet_manager *pkt_mgr = (struct packet_manager *)args; + enum packet_stage stage = packet_get_stage(pkt); + printf("on_packet_stage: %s\n", packet_stage_to_str(stage)); - printf("claim_packet_failed: %s\n", packet_stage_to_str(stage)); + uint64_t tag_key_bits; + uint64_t tag_val_bits; + packet_tag_get(pkt, &tag_key_bits, &tag_val_bits); + EXPECT_TRUE(tag_key_bits == PKT_TAG_KEY_IPPROTO); + EXPECT_TRUE(tag_val_bits == PKT_TAG_VAL_IPPROTO_TCP); static int count = 0; EXPECT_TRUE(count == 0); @@ -300,29 +301,23 @@ static void claim_packet_failed(struct packet *pkt, enum packet_stage stage, voi TEST(PACKET_MANAGER, CLAIM_PACKET) { - // global init - struct mq_schema *mq_schema = mq_schema_new(); - EXPECT_TRUE(mq_schema); - struct mq_runtime *mq_rt = mq_runtime_new(mq_schema); - EXPECT_TRUE(mq_rt); - // module init - struct packet_manager *pkt_mgr = packet_manager_new(mq_schema, 1); + struct packet_manager *pkt_mgr = packet_manager_new(1); EXPECT_TRUE(pkt_mgr); - EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_PREROUTING, claim_packet_success, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_INPUT, claim_packet_success, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_FORWARD, claim_packet_success, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_OUTPUT, claim_packet_success, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_POSTROUTING, claim_packet_success, pkt_mgr) == 0); - - EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_PREROUTING, claim_packet_failed, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_INPUT, claim_packet_failed, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_FORWARD, claim_packet_failed, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_OUTPUT, claim_packet_failed, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_POSTROUTING, claim_packet_failed, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_register_node(pkt_mgr, "name", PACKET_STAGE_PREROUTING, PKT_TAG_KEY_IPPROTO, PKT_TAG_VAL_IPPROTO_TCP, claim_packet_success, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_register_node(pkt_mgr, "name", PACKET_STAGE_INPUT, PKT_TAG_KEY_IPPROTO, PKT_TAG_VAL_IPPROTO_TCP, claim_packet_success, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_register_node(pkt_mgr, "name", PACKET_STAGE_FORWARD, PKT_TAG_KEY_IPPROTO, PKT_TAG_VAL_IPPROTO_TCP, claim_packet_success, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_register_node(pkt_mgr, "name", PACKET_STAGE_OUTPUT, PKT_TAG_KEY_IPPROTO, PKT_TAG_VAL_IPPROTO_TCP, claim_packet_success, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_register_node(pkt_mgr, "name", PACKET_STAGE_POSTROUTING, PKT_TAG_KEY_IPPROTO, PKT_TAG_VAL_IPPROTO_TCP, claim_packet_success, pkt_mgr) == 0); + + EXPECT_TRUE(packet_manager_register_node(pkt_mgr, "name", PACKET_STAGE_PREROUTING, PKT_TAG_KEY_IPPROTO, PKT_TAG_VAL_IPPROTO_TCP, claim_packet_failed, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_register_node(pkt_mgr, "name", PACKET_STAGE_INPUT, PKT_TAG_KEY_IPPROTO, PKT_TAG_VAL_IPPROTO_TCP, claim_packet_failed, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_register_node(pkt_mgr, "name", PACKET_STAGE_FORWARD, PKT_TAG_KEY_IPPROTO, PKT_TAG_VAL_IPPROTO_TCP, claim_packet_failed, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_register_node(pkt_mgr, "name", PACKET_STAGE_OUTPUT, PKT_TAG_KEY_IPPROTO, PKT_TAG_VAL_IPPROTO_TCP, claim_packet_failed, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_register_node(pkt_mgr, "name", PACKET_STAGE_POSTROUTING, PKT_TAG_KEY_IPPROTO, PKT_TAG_VAL_IPPROTO_TCP, claim_packet_failed, pkt_mgr) == 0); // per-thread init - packet_manager_init(pkt_mgr, thread_id, mq_rt); + packet_manager_init(pkt_mgr, thread_id); // per-thread run struct packet pkt; @@ -354,19 +349,21 @@ TEST(PACKET_MANAGER, CLAIM_PACKET) // module free packet_manager_free(pkt_mgr); - - // global free - mq_runtime_free(mq_rt); - mq_schema_free(mq_schema); } #endif #if 1 -static void on_packet_stage_schedule_packet(struct packet *pkt, enum packet_stage stage, void *args) +static void schedule_packet(struct packet *pkt, void *args) { struct packet_manager *pkt_mgr = (struct packet_manager *)args; + enum packet_stage stage = packet_get_stage(pkt); + printf("on_packet_stage: %s\n", packet_stage_to_str(stage)); - printf("on_packet_stage_schedule_packet: \"%s\" schedule packet %p\n", packet_stage_to_str(stage), pkt); + uint64_t tag_key_bits; + uint64_t tag_val_bits; + packet_tag_get(pkt, &tag_key_bits, &tag_val_bits); + EXPECT_TRUE(tag_key_bits == PKT_TAG_KEY_IPPROTO); + EXPECT_TRUE(tag_val_bits == PKT_TAG_VAL_IPPROTO_TCP); EXPECT_TRUE(!packet_is_claim(pkt)); @@ -381,23 +378,17 @@ static void on_packet_stage_schedule_packet(struct packet *pkt, enum packet_stag TEST(PACKET_MANAGER, SCHEDULE_PACKET) { - // global init - struct mq_schema *mq_schema = mq_schema_new(); - EXPECT_TRUE(mq_schema); - struct mq_runtime *mq_rt = mq_runtime_new(mq_schema); - EXPECT_TRUE(mq_rt); - // module init - struct packet_manager *pkt_mgr = packet_manager_new(mq_schema, 1); + struct packet_manager *pkt_mgr = packet_manager_new(1); EXPECT_TRUE(pkt_mgr); - EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_PREROUTING, on_packet_stage_schedule_packet, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_INPUT, on_packet_stage_schedule_packet, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_FORWARD, on_packet_stage_schedule_packet, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_OUTPUT, on_packet_stage_schedule_packet, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_POSTROUTING, on_packet_stage_schedule_packet, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_register_node(pkt_mgr, "name", PACKET_STAGE_PREROUTING, PKT_TAG_KEY_IPPROTO, PKT_TAG_VAL_IPPROTO_TCP, schedule_packet, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_register_node(pkt_mgr, "name", PACKET_STAGE_INPUT, PKT_TAG_KEY_IPPROTO, PKT_TAG_VAL_IPPROTO_TCP, schedule_packet, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_register_node(pkt_mgr, "name", PACKET_STAGE_FORWARD, PKT_TAG_KEY_IPPROTO, PKT_TAG_VAL_IPPROTO_TCP, schedule_packet, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_register_node(pkt_mgr, "name", PACKET_STAGE_OUTPUT, PKT_TAG_KEY_IPPROTO, PKT_TAG_VAL_IPPROTO_TCP, schedule_packet, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_register_node(pkt_mgr, "name", PACKET_STAGE_POSTROUTING, PKT_TAG_KEY_IPPROTO, PKT_TAG_VAL_IPPROTO_TCP, schedule_packet, pkt_mgr) == 0); // per-thread init - packet_manager_init(pkt_mgr, thread_id, mq_rt); + packet_manager_init(pkt_mgr, thread_id); // per-thread run struct packet pkt; @@ -438,10 +429,6 @@ TEST(PACKET_MANAGER, SCHEDULE_PACKET) // module free packet_manager_free(pkt_mgr); - - // global free - mq_runtime_free(mq_rt); - mq_schema_free(mq_schema); } #endif @@ -457,11 +444,17 @@ static void schedule_claimed_packet(struct packet *pkt, void *args) packet_manager_schedule_packet(pkt_mgr, thread_id, pkt, PACKET_STAGE_POSTROUTING); } -static void on_packet_stage_claim_packet_to_schedule(struct packet *pkt, enum packet_stage stage, void *args) +static void claim_packet_to_schedule(struct packet *pkt, void *args) { struct packet_manager *pkt_mgr = (struct packet_manager *)args; + enum packet_stage stage = packet_get_stage(pkt); + printf("on_packet_stage: %s\n", packet_stage_to_str(stage)); - printf("on_packet_stage_claim_packet_to_schedule: %s\n", packet_stage_to_str(stage)); + uint64_t tag_key_bits; + uint64_t tag_val_bits; + packet_tag_get(pkt, &tag_key_bits, &tag_val_bits); + EXPECT_TRUE(tag_key_bits == PKT_TAG_KEY_IPPROTO); + EXPECT_TRUE(tag_val_bits == PKT_TAG_VAL_IPPROTO_TCP); static int count = 0; EXPECT_TRUE(packet_get_type(pkt) == PACKET_TYPE_PSEUDO); @@ -485,23 +478,17 @@ static void on_packet_stage_claim_packet_to_schedule(struct packet *pkt, enum pa TEST(PACKET_MANAGER, SCHEDULE_CLAIMED_PACKET) { - // global init - struct mq_schema *mq_schema = mq_schema_new(); - EXPECT_TRUE(mq_schema); - struct mq_runtime *mq_rt = mq_runtime_new(mq_schema); - EXPECT_TRUE(mq_rt); - // module init - struct packet_manager *pkt_mgr = packet_manager_new(mq_schema, 1); + struct packet_manager *pkt_mgr = packet_manager_new(1); EXPECT_TRUE(pkt_mgr); - EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_PREROUTING, on_packet_stage_claim_packet_to_schedule, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_INPUT, on_packet_stage_claim_packet_to_schedule, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_FORWARD, on_packet_stage_claim_packet_to_schedule, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_OUTPUT, on_packet_stage_claim_packet_to_schedule, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_POSTROUTING, on_packet_stage_claim_packet_to_schedule, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_register_node(pkt_mgr, "name", PACKET_STAGE_PREROUTING, PKT_TAG_KEY_IPPROTO, PKT_TAG_VAL_IPPROTO_TCP, claim_packet_to_schedule, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_register_node(pkt_mgr, "name", PACKET_STAGE_INPUT, PKT_TAG_KEY_IPPROTO, PKT_TAG_VAL_IPPROTO_TCP, claim_packet_to_schedule, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_register_node(pkt_mgr, "name", PACKET_STAGE_FORWARD, PKT_TAG_KEY_IPPROTO, PKT_TAG_VAL_IPPROTO_TCP, claim_packet_to_schedule, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_register_node(pkt_mgr, "name", PACKET_STAGE_OUTPUT, PKT_TAG_KEY_IPPROTO, PKT_TAG_VAL_IPPROTO_TCP, claim_packet_to_schedule, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_register_node(pkt_mgr, "name", PACKET_STAGE_POSTROUTING, PKT_TAG_KEY_IPPROTO, PKT_TAG_VAL_IPPROTO_TCP, claim_packet_to_schedule, pkt_mgr) == 0); // per-thread init - packet_manager_init(pkt_mgr, thread_id, mq_rt); + packet_manager_init(pkt_mgr, thread_id); // per-thread run struct packet pkt; @@ -533,10 +520,6 @@ TEST(PACKET_MANAGER, SCHEDULE_CLAIMED_PACKET) // module free packet_manager_free(pkt_mgr); - - // global free - mq_runtime_free(mq_rt); - mq_schema_free(mq_schema); } #endif diff --git a/infra/session_manager/session_manager.c b/infra/session_manager/session_manager.c index e48efee..66bed74 100644 --- a/infra/session_manager/session_manager.c +++ b/infra/session_manager/session_manager.c @@ -77,7 +77,7 @@ static void notify_sess_closed_by_pseudo_pkt(struct session_manager *sess_mgr, i SESSION_MANAGER_LOG_INFO("notify session %lu %s closed by pseudo packet: %p", session_get_id(sess), session_get_readable_addr(sess), pseudo); } -static void on_packet_forward(struct packet *pkt, enum packet_stage stage, void *args) +static void on_packet_forward(struct packet *pkt, void *args) { struct session_manager *sess_mgr = (struct session_manager *)args; int thread_id = module_manager_get_thread_id(sess_mgr->mod_mgr); @@ -143,7 +143,7 @@ static void on_packet_forward(struct packet *pkt, enum packet_stage stage, void } } -static void on_packet_output(struct packet *pkt, enum packet_stage stage, void *args) +static void on_packet_output(struct packet *pkt, void *args) { struct session_manager *sess_mgr = (struct session_manager *)args; int thread_id = module_manager_get_thread_id(sess_mgr->mod_mgr); @@ -289,12 +289,12 @@ static struct session_manager *session_manager_new(struct packet_manager *pkt_mg goto error_out; } - if (packet_manager_subscribe(pkt_mgr, PACKET_STAGE_FORWARD, on_packet_forward, sess_mgr)) + if (packet_manager_register_node(pkt_mgr, "session_manager", PACKET_STAGE_FORWARD, PKT_TAG_KEY_IPPROTO, PKT_TAG_VAL_IPPROTO_TCP | PKT_TAG_VAL_IPPROTO_UDP, on_packet_forward, sess_mgr)) { SESSION_MANAGER_LOG_ERROR("failed to subscribe PACKET_STAGE_FORWARD"); goto error_out; } - if (packet_manager_subscribe(pkt_mgr, PACKET_STAGE_OUTPUT, on_packet_output, sess_mgr)) + if (packet_manager_register_node(pkt_mgr, "session_manager", PACKET_STAGE_OUTPUT, PKT_TAG_KEY_IPPROTO, PKT_TAG_VAL_IPPROTO_TCP | PKT_TAG_VAL_IPPROTO_UDP, on_packet_output, sess_mgr)) { SESSION_MANAGER_LOG_ERROR("failed to subscribe PACKET_STAGE_OUTPUT"); goto error_out; diff --git a/infra/version.map b/infra/version.map index e714fa4..a7bc7a3 100644 --- a/infra/version.map +++ b/infra/version.map @@ -18,13 +18,15 @@ global: packet_get_exdata; packet_tag_set; packet_tag_get; + packet_get_ip_proto; + packet_get_stage; packet_manager_on_init; packet_manager_on_exit; packet_manager_on_thread_init; packet_manager_on_thread_exit; packet_manager_new_packet_exdata_index; - packet_manager_subscribe; + packet_manager_register_node; packet_manager_claim_packet; packet_manager_schedule_packet; packet_manager_build_tcp_packet; diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 5f93a2e..8b080ac 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -6,4 +6,4 @@ add_subdirectory(packet_tool) #add_subdirectory(decoders/socks) #add_subdirectory(decoders/stratum) #add_subdirectory(decoders/session_flags) -add_subdirectory(monitor)
\ No newline at end of file +#add_subdirectory(monitor)
\ No newline at end of file |
