diff options
| author | luwenpeng <[email protected]> | 2024-09-14 18:38:37 +0800 |
|---|---|---|
| committer | luwenpeng <[email protected]> | 2024-09-18 14:36:31 +0800 |
| commit | 721d5d1466541cc54a991fc5359cd7013e10f936 (patch) | |
| tree | 0c0ea2064c7540ad9c4b8d096b88751ed3861e60 /infra/packet_manager | |
| parent | f559d67b93df78e9f6d5c3fe301b688b5c857d98 (diff) | |
feature(packet manager): support claim packt and add test case
Diffstat (limited to 'infra/packet_manager')
| -rw-r--r-- | infra/packet_manager/packet_manager.c | 105 | ||||
| -rw-r--r-- | infra/packet_manager/packet_manager_private.h | 20 | ||||
| -rw-r--r-- | infra/packet_manager/packet_private.h | 6 | ||||
| -rw-r--r-- | infra/packet_manager/packet_utils.c | 24 | ||||
| -rw-r--r-- | infra/packet_manager/test/gtest_packet_manager.cpp | 199 |
5 files changed, 288 insertions, 66 deletions
diff --git a/infra/packet_manager/packet_manager.c b/infra/packet_manager/packet_manager.c index b13802f..4cb92ca 100644 --- a/infra/packet_manager/packet_manager.c +++ b/infra/packet_manager/packet_manager.c @@ -16,17 +16,6 @@ struct packet_manager_config uint16_t nr_worker_thread; }; -struct packet_manager_stat -{ - uint64_t input_pkts; - uint64_t output_pkts; - - uint64_t take_pkts; - uint64_t schedule_pkts; - - uint64_t curr_queue_len[PACKET_STAGE_MAX]; -}; - struct packet_manager_schema { struct mq_schema *mq; @@ -36,9 +25,11 @@ struct packet_manager_schema struct packet_manager_runtime { uint16_t idx; + void *cb_args; + on_packet_claimed_callback *claimed_cb; struct mq_runtime *mq; - struct packet_queue queue[PACKET_STAGE_MAX]; - struct packet_manager_stat stat; + struct packet_queue queue[QUEUE_NUM_MAX]; + struct packet_manager_runtime_stat stat; }; struct packet_manager @@ -192,7 +183,7 @@ static void packet_manager_runtime_free(struct packet_manager_runtime *runtime) { if (runtime) { - for (int i = 0; i < PACKET_STAGE_MAX; i++) + for (int i = 0; i < QUEUE_NUM_MAX; i++) { struct packet *pkt = NULL; while ((pkt = TAILQ_FIRST(&runtime->queue[i]))) @@ -219,7 +210,7 @@ static struct packet_manager_runtime *packet_manager_runtime_new(uint16_t idx) runtime->idx = idx; - for (int i = 0; i < PACKET_STAGE_MAX; i++) + for (int i = 0; i < QUEUE_NUM_MAX; i++) { TAILQ_INIT(&runtime->queue[i]); } @@ -227,17 +218,22 @@ static struct packet_manager_runtime *packet_manager_runtime_new(uint16_t idx) return runtime; } -static void packet_manager_runtime_stat(struct packet_manager_runtime *runtime) +void packet_manager_runtime_print_stat(struct packet_manager_runtime *runtime) { - PACKET_MANAGER_LOG_DEBUG("runtime[%d] => input_pkts: %lu, output_pkts: %lu, take_pkts: %lu, schedule_pkts: %lu, queue_len: {pre_routing: %lu, input: %lu, forward: %lu, output: %lu, post_routing: %lu}", + PACKET_MANAGER_LOG_DEBUG("runtime[%d] => input_pkts: %lu, output_pkts: %lu, claim_pkts: %lu, schedule_pkts: %lu, queue_len: {pre_routing: %lu, input: %lu, forward: %lu, output: %lu, post_routing: %lu}", runtime->idx, runtime->stat.input_pkts, runtime->stat.output_pkts, - runtime->stat.take_pkts, runtime->stat.schedule_pkts, - runtime->stat.curr_queue_len[PACKET_STAGE_PREROUTING], - runtime->stat.curr_queue_len[PACKET_STAGE_INPUT], - runtime->stat.curr_queue_len[PACKET_STAGE_FORWARD], - runtime->stat.curr_queue_len[PACKET_STAGE_OUTPUT], - runtime->stat.curr_queue_len[PACKET_STAGE_POSTROUTING]); + runtime->stat.claim_pkts, runtime->stat.schedule_pkts, + runtime->stat.queue_len[PACKET_STAGE_PREROUTING], + runtime->stat.queue_len[PACKET_STAGE_INPUT], + runtime->stat.queue_len[PACKET_STAGE_FORWARD], + runtime->stat.queue_len[PACKET_STAGE_OUTPUT], + runtime->stat.queue_len[PACKET_STAGE_POSTROUTING]); +} + +struct packet_manager_runtime_stat *packet_manager_runtime_get_stat(struct packet_manager_runtime *runtime) +{ + return &runtime->stat; } void packet_manager_runtime_init(struct packet_manager_runtime *pkt_mgr_rt, struct mq_runtime *mq_rt) @@ -248,18 +244,18 @@ void packet_manager_runtime_init(struct packet_manager_runtime *pkt_mgr_rt, stru void packet_manager_runtime_input(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt) { pkt_mgr_rt->stat.input_pkts++; - pkt_mgr_rt->stat.curr_queue_len[PACKET_STAGE_PREROUTING]++; + pkt_mgr_rt->stat.queue_len[PACKET_STAGE_PREROUTING]++; TAILQ_INSERT_TAIL(&pkt_mgr_rt->queue[PACKET_STAGE_PREROUTING], pkt, stage_tqe); } struct packet *packet_manager_runtime_output(struct packet_manager_runtime *pkt_mgr_rt) { - struct packet *pkt = TAILQ_FIRST(&pkt_mgr_rt->queue[PACKET_STAGE_POSTROUTING]); + struct packet *pkt = TAILQ_FIRST(&pkt_mgr_rt->queue[QUEUE_NUM_MAX - 1]); if (pkt) { pkt_mgr_rt->stat.output_pkts++; - pkt_mgr_rt->stat.curr_queue_len[PACKET_STAGE_POSTROUTING]--; - TAILQ_REMOVE(&pkt_mgr_rt->queue[PACKET_STAGE_POSTROUTING], pkt, stage_tqe); + pkt_mgr_rt->stat.queue_len[QUEUE_NUM_MAX - 1]--; + TAILQ_REMOVE(&pkt_mgr_rt->queue[QUEUE_NUM_MAX - 1], pkt, stage_tqe); } return pkt; } @@ -268,47 +264,64 @@ void packet_manager_runtime_dispatch(struct packet_manager_runtime *pkt_mgr_rt) { for (int i = 0; i < PACKET_STAGE_MAX; i++) { - // packet_manager_runtime_stat(pkt_mgr_rt); + packet_manager_runtime_print_stat(pkt_mgr_rt); struct packet *pkt = NULL; while ((pkt = TAILQ_FIRST(&pkt_mgr_rt->queue[i]))) { + packet_set_claim(pkt, false); + pkt_mgr_rt->claimed_cb = NULL; + pkt_mgr_rt->cb_args = NULL; + TAILQ_REMOVE(&pkt_mgr_rt->queue[i], pkt, stage_tqe); - pkt_mgr_rt->stat.curr_queue_len[i]--; + pkt_mgr_rt->stat.queue_len[i]--; mq_runtime_publish_message(pkt_mgr_rt->mq, i, pkt); mq_runtime_dispatch(pkt_mgr_rt->mq); - if (packet_is_stolen(pkt)) + if (packet_is_claim(pkt)) { + if (pkt_mgr_rt->claimed_cb) + { + pkt_mgr_rt->claimed_cb(pkt, pkt_mgr_rt->cb_args); + } continue; } - if (i + 1 == PACKET_STAGE_MAX) - { - TAILQ_INSERT_TAIL(&pkt_mgr_rt->queue[i], pkt, stage_tqe); - pkt_mgr_rt->stat.curr_queue_len[i]++; - break; - } - else - { - TAILQ_INSERT_TAIL(&pkt_mgr_rt->queue[i + 1], pkt, stage_tqe); - pkt_mgr_rt->stat.curr_queue_len[i + 1]++; - } + TAILQ_INSERT_TAIL(&pkt_mgr_rt->queue[i + 1], pkt, stage_tqe); + pkt_mgr_rt->stat.queue_len[i + 1]++; } } } -void packet_manager_runtime_take_packet(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt) +int packet_manager_runtime_claim_packet(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt, on_packet_claimed_callback cb, void *args) { - pkt_mgr_rt->stat.take_pkts++; - packet_set_stolen(pkt, true); + if (packet_is_claim(pkt)) + { + PACKET_MANAGER_LOG_ERROR("packet is already claimed, cannot claim again"); + return -1; + } + else + { + pkt_mgr_rt->claimed_cb = cb; + pkt_mgr_rt->cb_args = args; + packet_set_claim(pkt, true); + pkt_mgr_rt->stat.claim_pkts++; + return 0; + } } void packet_manager_runtime_schedule_packet(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt, enum packet_stage stage) { + if (stage >= PACKET_STAGE_MAX) + { + PACKET_MANAGER_LOG_ERROR("invalid stage %d", stage); + assert(0); + return; + } + pkt_mgr_rt->stat.schedule_pkts++; - packet_set_stolen(pkt, false); + pkt_mgr_rt->stat.queue_len[stage]++; TAILQ_INSERT_TAIL(&pkt_mgr_rt->queue[stage], pkt, stage_tqe); } @@ -364,7 +377,7 @@ void packet_manager_free(struct packet_manager *pkt_mgr) { if (pkt_mgr->runtime[i]) { - packet_manager_runtime_stat(pkt_mgr->runtime[i]); + packet_manager_runtime_print_stat(pkt_mgr->runtime[i]); packet_manager_runtime_free(pkt_mgr->runtime[i]); } } diff --git a/infra/packet_manager/packet_manager_private.h b/infra/packet_manager/packet_manager_private.h index 3f0dcf3..a24fa7b 100644 --- a/infra/packet_manager/packet_manager_private.h +++ b/infra/packet_manager/packet_manager_private.h @@ -14,8 +14,26 @@ void packet_manager_runtime_init(struct packet_manager_runtime *pkt_mgr_rt, stru void packet_manager_runtime_input(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt); struct packet *packet_manager_runtime_output(struct packet_manager_runtime *pkt_mgr_rt); void packet_manager_runtime_dispatch(struct packet_manager_runtime *pkt_mgr_rt); -// for debug + +/****************************************************************************** + * for gtest + ******************************************************************************/ + +#define QUEUE_NUM_MAX (PACKET_STAGE_MAX + 1) // the last queue is for egress +struct packet_manager_runtime_stat +{ + uint64_t input_pkts; + uint64_t output_pkts; + + uint64_t claim_pkts; + uint64_t schedule_pkts; + + uint64_t queue_len[QUEUE_NUM_MAX]; +}; + const char *packet_stage_to_str(enum packet_stage stage); +void packet_manager_runtime_print_stat(struct packet_manager_runtime *runtime); +struct packet_manager_runtime_stat *packet_manager_runtime_get_stat(struct packet_manager_runtime *runtime); #ifdef __cplusplus } diff --git a/infra/packet_manager/packet_private.h b/infra/packet_manager/packet_private.h index 915b763..e312e1b 100644 --- a/infra/packet_manager/packet_private.h +++ b/infra/packet_manager/packet_private.h @@ -29,7 +29,7 @@ struct metadata uint64_t domain; uint16_t link_id; bool is_ctrl; - bool is_stolen; + bool is_claim; enum packet_direction direction; enum packet_action action; @@ -102,8 +102,8 @@ uint16_t packet_get_link_id(const struct packet *pkt); void packet_set_ctrl(struct packet *pkt, bool ctrl); bool packet_is_ctrl(const struct packet *pkt); -void packet_set_stolen(struct packet *pkt, bool stolen); -bool packet_is_stolen(const struct packet *pkt); +void packet_set_claim(struct packet *pkt, bool claim); +bool packet_is_claim(const struct packet *pkt); void packet_set_direction(struct packet *pkt, enum packet_direction dir); diff --git a/infra/packet_manager/packet_utils.c b/infra/packet_manager/packet_utils.c index 3ec12f3..86cc00b 100644 --- a/infra/packet_manager/packet_utils.c +++ b/infra/packet_manager/packet_utils.c @@ -1,3 +1,5 @@ +#include <assert.h> + #include "tuple.h" #include "uthash.h" #include "log_private.h" @@ -101,14 +103,14 @@ bool packet_is_ctrl(const struct packet *pkt) return pkt->meta.is_ctrl; } -void packet_set_stolen(struct packet *pkt, bool stolen) +void packet_set_claim(struct packet *pkt, bool claim) { - pkt->meta.is_stolen = stolen; + pkt->meta.is_claim = claim; } -bool packet_is_stolen(const struct packet *pkt) +bool packet_is_claim(const struct packet *pkt) { - return pkt->meta.is_stolen; + return pkt->meta.is_claim; } void packet_set_direction(struct packet *pkt, enum packet_direction dir) @@ -929,9 +931,19 @@ struct packet *packet_dup(const struct packet *pkt) void packet_free(struct packet *pkt) { - if (pkt && pkt->need_free) + if (pkt) { - free((void *)pkt); + if (packet_is_claim(pkt)) + { + PACKET_LOG_ERROR("packet has been claimed and cannot be released, please check the module arrangement order"); + assert(0); + return; + } + + if (pkt->need_free) + { + free((void *)pkt); + } } } diff --git a/infra/packet_manager/test/gtest_packet_manager.cpp b/infra/packet_manager/test/gtest_packet_manager.cpp index 2fee4a4..e53b69d 100644 --- a/infra/packet_manager/test/gtest_packet_manager.cpp +++ b/infra/packet_manager/test/gtest_packet_manager.cpp @@ -67,6 +67,19 @@ unsigned char data[] = { 0x81, 0x80, 0x5c, 0x76, 0x00, 0x00, 0x00, 0x00, 0x80, 0x02, 0x20, 0x00, 0xf7, 0x57, 0x00, 0x00, 0x02, 0x04, 0x04, 0xc4, 0x01, 0x03, 0x03, 0x08, 0x01, 0x01, 0x04, 0x02}; +static void check_stat(struct packet_manager_runtime_stat *stat, uint64_t input_pkts, uint64_t output_pkts, uint64_t claim_pkts, uint64_t schedule_pkts) +{ + EXPECT_TRUE(stat->input_pkts == input_pkts); + EXPECT_TRUE(stat->output_pkts == output_pkts); + EXPECT_TRUE(stat->claim_pkts == claim_pkts); + EXPECT_TRUE(stat->schedule_pkts == schedule_pkts); + + for (int i = 0; i < PACKET_STAGE_MAX; i++) + { + EXPECT_TRUE(stat->queue_len[i] == 0); + } +} + #if 1 TEST(PACKET_MANAGER, NEW_FREE) { @@ -86,20 +99,19 @@ TEST(PACKET_MANAGER, NEW_FREE) } #endif +#if 1 static void on_packet_stage(enum packet_stage stage, struct packet *pkt, void *args) { - printf("stage: %s\n", packet_stage_to_str(stage)); + printf("on_packet_stage: %s\n", packet_stage_to_str(stage)); static int count = 0; EXPECT_TRUE(count == stage); - count++; - EXPECT_TRUE(packet_is_ctrl(pkt)); EXPECT_TRUE(args == NULL); + count++; } -#if 1 -TEST(PACKET_MANAGER, NORNMAL) +TEST(PACKET_MANAGER, SUBSCRIBER) { // global init struct mq_schema *mq_schema = mq_schema_new(); @@ -129,9 +141,12 @@ TEST(PACKET_MANAGER, NORNMAL) packet_parse(&pkt, (const char *)data, sizeof(data)); packet_set_ctrl(&pkt, true); + check_stat(packet_manager_runtime_get_stat(runtime), 0, 0, 0, 0); packet_manager_runtime_input(runtime, &pkt); packet_manager_runtime_dispatch(runtime); EXPECT_TRUE(packet_manager_runtime_output(runtime) == &pkt); + EXPECT_TRUE(packet_manager_runtime_output(runtime) == NULL); + check_stat(packet_manager_runtime_get_stat(runtime), 1, 1, 0, 0); // per-thread free @@ -145,18 +160,182 @@ TEST(PACKET_MANAGER, NORNMAL) #endif #if 1 -TEST(PACKET_MANAGER, TAKE) +static void packet_claimed(struct packet *pkt, void *args) { - // TODO - // packet_manager_runtime_take_packet(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt); + char *str = (char *)args; + EXPECT_STREQ(str, "hello"); + printf("packet_claimed: with ctx %s\n", str); + EXPECT_TRUE(packet_is_ctrl(pkt)); + EXPECT_TRUE(packet_is_claim(pkt)); + free(str); +} + +static void module_A_on_packet_stage(enum packet_stage stage, struct packet *pkt, void *args) +{ + struct packet_manager *pkt_mgr = (struct packet_manager *)args; + struct packet_manager_runtime *pkt_mgr_rt = packet_manager_get_runtime(pkt_mgr, 0); + EXPECT_TRUE(pkt_mgr_rt); + + printf("module_A_on_packet_stage: %s claim packet success\n", packet_stage_to_str(stage)); + + static int count = 0; + EXPECT_TRUE(count == 0); + EXPECT_TRUE(stage == PACKET_STAGE_PREROUTING); + EXPECT_TRUE(packet_is_ctrl(pkt)); + EXPECT_TRUE(!packet_is_claim(pkt)); + packet_manager_runtime_claim_packet(pkt_mgr_rt, pkt, packet_claimed, strdup("hello")); + count++; +} + +static void module_B_on_packet_stage(enum packet_stage stage, struct packet *pkt, void *args) +{ + struct packet_manager *pkt_mgr = (struct packet_manager *)args; + struct packet_manager_runtime *pkt_mgr_rt = packet_manager_get_runtime(pkt_mgr, 0); + EXPECT_TRUE(pkt_mgr_rt); + + printf("module_B_on_packet_stage: %s claim packet failed\n", packet_stage_to_str(stage)); + + static int count = 0; + EXPECT_TRUE(count == 0); + EXPECT_TRUE(stage == PACKET_STAGE_PREROUTING); + EXPECT_TRUE(packet_is_ctrl(pkt)); + EXPECT_TRUE(packet_is_claim(pkt)); + count++; +} + +TEST(PACKET_MANAGER, CLAIM_PACKET) +{ + // global init + struct mq_schema *mq_schema = mq_schema_new(); + EXPECT_TRUE(mq_schema); + struct mq_runtime *mq_rt = mq_runtime_new(mq_schema); + EXPECT_TRUE(mq_rt); + + // module init + struct packet_manager *pkt_mgr = packet_manager_new(mq_schema, "./conf/stellar.toml"); + EXPECT_TRUE(pkt_mgr); + struct packet_manager_schema *schema = packet_manager_get_schema(pkt_mgr); + EXPECT_TRUE(schema); + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_PREROUTING, module_A_on_packet_stage, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_INPUT, module_A_on_packet_stage, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_FORWARD, module_A_on_packet_stage, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_OUTPUT, module_A_on_packet_stage, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_POSTROUTING, module_A_on_packet_stage, pkt_mgr) == 0); + + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_PREROUTING, module_B_on_packet_stage, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_INPUT, module_B_on_packet_stage, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_FORWARD, module_B_on_packet_stage, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_OUTPUT, module_B_on_packet_stage, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_POSTROUTING, module_B_on_packet_stage, pkt_mgr) == 0); + + // per-thread init + struct packet_manager_runtime *runtime = packet_manager_get_runtime(pkt_mgr, 0); + EXPECT_TRUE(runtime); + packet_manager_runtime_init(runtime, mq_rt); + + // per-thread run + struct packet pkt; + memset(&pkt, 0, sizeof(pkt)); + packet_parse(&pkt, (const char *)data, sizeof(data)); + packet_set_ctrl(&pkt, true); + + check_stat(packet_manager_runtime_get_stat(runtime), 0, 0, 0, 0); + packet_manager_runtime_input(runtime, &pkt); + packet_manager_runtime_dispatch(runtime); + EXPECT_TRUE(packet_manager_runtime_output(runtime) == NULL); + check_stat(packet_manager_runtime_get_stat(runtime), 1, 0, 1, 0); + + // per-thread free + + // module free + packet_manager_free(pkt_mgr); + + // global free + mq_runtime_free(mq_rt); + mq_schema_free(mq_schema); +} +#endif + +#if 1 +static void module_C_on_packet_stage(enum packet_stage stage, struct packet *pkt, void *args) +{ + struct packet_manager *pkt_mgr = (struct packet_manager *)args; + struct packet_manager_runtime *pkt_mgr_rt = packet_manager_get_runtime(pkt_mgr, 0); + EXPECT_TRUE(pkt_mgr_rt); + + printf("module_C_on_packet_stage: \"%s\" schedule packet %p\n", packet_stage_to_str(stage), pkt); + + EXPECT_TRUE(!packet_is_claim(pkt)); + + if (stage == PACKET_STAGE_PREROUTING) + { + packet_manager_runtime_schedule_packet(pkt_mgr_rt, packet_new(10), PACKET_STAGE_INPUT); + packet_manager_runtime_schedule_packet(pkt_mgr_rt, packet_new(10), PACKET_STAGE_FORWARD); + packet_manager_runtime_schedule_packet(pkt_mgr_rt, packet_new(10), PACKET_STAGE_OUTPUT); + packet_manager_runtime_schedule_packet(pkt_mgr_rt, packet_new(10), PACKET_STAGE_POSTROUTING); + } +} + +TEST(PACKET_MANAGER, SCHEDULE_PACKET) +{ + // global init + struct mq_schema *mq_schema = mq_schema_new(); + EXPECT_TRUE(mq_schema); + struct mq_runtime *mq_rt = mq_runtime_new(mq_schema); + EXPECT_TRUE(mq_rt); + + // module init + struct packet_manager *pkt_mgr = packet_manager_new(mq_schema, "./conf/stellar.toml"); + EXPECT_TRUE(pkt_mgr); + struct packet_manager_schema *schema = packet_manager_get_schema(pkt_mgr); + EXPECT_TRUE(schema); + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_PREROUTING, module_C_on_packet_stage, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_INPUT, module_C_on_packet_stage, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_FORWARD, module_C_on_packet_stage, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_OUTPUT, module_C_on_packet_stage, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_POSTROUTING, module_C_on_packet_stage, pkt_mgr) == 0); + + // per-thread init + struct packet_manager_runtime *runtime = packet_manager_get_runtime(pkt_mgr, 0); + EXPECT_TRUE(runtime); + packet_manager_runtime_init(runtime, mq_rt); + + // per-thread run + struct packet pkt; + memset(&pkt, 0, sizeof(pkt)); + packet_parse(&pkt, (const char *)data, sizeof(data)); + packet_set_ctrl(&pkt, true); + + check_stat(packet_manager_runtime_get_stat(runtime), 0, 0, 0, 0); + packet_manager_runtime_input(runtime, &pkt); + packet_manager_runtime_dispatch(runtime); + + struct packet *tmp = NULL; + for (int i = 0; i < 4; i++) + { + tmp = packet_manager_runtime_output(runtime); + EXPECT_TRUE(tmp); + packet_free(tmp); + } + EXPECT_TRUE(packet_manager_runtime_output(runtime) == &pkt); + EXPECT_TRUE(packet_manager_runtime_output(runtime) == NULL); + check_stat(packet_manager_runtime_get_stat(runtime), 1, 5, 0, 4); + + // per-thread free + + // module free + packet_manager_free(pkt_mgr); + + // global free + mq_runtime_free(mq_rt); + mq_schema_free(mq_schema); } #endif #if 1 -TEST(PACKET_MANAGER, SCHEDULE) +TEST(PACKET_MANAGER, CLAIM_AND_SCHEDULE_PACKET) { // TODO - // packet_manager_runtime_schedule_packet(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt, enum packet_stage stage); } #endif |
