diff options
| author | luwenpeng <[email protected]> | 2024-09-19 17:26:34 +0800 |
|---|---|---|
| committer | luwenpeng <[email protected]> | 2024-09-20 16:59:19 +0800 |
| commit | 60caf1bda19f8797b72ef93a9c18517c4ca9495c (patch) | |
| tree | 624c56dd26a76dbc59dba8e567121e7b03e7c756 | |
| parent | aedc675c34c48278fbb8103af3927bb9077fa74e (diff) | |
refactor(packet manager): hide packet manager schema and packet manager runtime
| -rw-r--r-- | include/stellar/packet_manager.h | 11 | ||||
| -rw-r--r-- | infra/packet_manager/packet_manager.c | 256 | ||||
| -rw-r--r-- | infra/packet_manager/packet_manager_internal.h | 26 | ||||
| -rw-r--r-- | infra/packet_manager/test/gtest_packet_manager.cpp | 160 |
4 files changed, 206 insertions, 247 deletions
diff --git a/include/stellar/packet_manager.h b/include/stellar/packet_manager.h index 5d78ec3..ebbff80 100644 --- a/include/stellar/packet_manager.h +++ b/include/stellar/packet_manager.h @@ -18,21 +18,16 @@ enum packet_stage }; struct packet_manager; -struct packet_manager_schema; -struct packet_manager_runtime; - -struct packet_manager_schema *packet_manager_get_schema(struct packet_manager *pkt_mgr); -struct packet_manager_runtime *packet_manager_get_runtime(struct packet_manager *pkt_mgr, uint16_t thr_idx); typedef void on_packet_stage_callback(enum packet_stage stage, struct packet *pkt, void *args); -int packet_manager_schema_add_subscriber(struct packet_manager_schema *pkt_mgr_schema, enum packet_stage stage, on_packet_stage_callback cb, void *args); +int packet_manager_subscribe(struct packet_manager *pkt_mgr, enum packet_stage stage, on_packet_stage_callback cb, void *args); // if two modules claim the same packet at the same stage, the second 'claim' fails. // return 0 on success // return -1 on failure typedef void on_packet_claimed_callback(struct packet *pkt, void *args); -int packet_manager_runtime_claim_packet(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt, on_packet_claimed_callback cb, void *args); -void packet_manager_runtime_schedule_packet(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt, enum packet_stage stage); +int packet_manager_claim_packet(struct packet_manager *pkt_mgr, uint16_t thread_id, struct packet *pkt, on_packet_claimed_callback cb, void *args); +void packet_manager_schedule_packet(struct packet_manager *pkt_mgr, uint16_t thread_id, struct packet *pkt, enum packet_stage stage); #ifdef __cplusplus } diff --git a/infra/packet_manager/packet_manager.c b/infra/packet_manager/packet_manager.c index 9fa12ed..44efca8 100644 --- a/infra/packet_manager/packet_manager.c +++ b/infra/packet_manager/packet_manager.c @@ -24,12 +24,14 @@ struct packet_manager_schema struct packet_manager_runtime { + enum packet_stage stage; + struct packet_queue queue[PACKET_QUEUE_MAX]; + void *cb_args; on_packet_claimed_callback *claimed_cb; - enum packet_stage stage; + struct mq_runtime *mq; - struct packet_queue queue[PACKET_QUEUE_MAX]; - struct packet_manager_runtime_stat stat; + struct packet_manager_stat stat; }; struct packet_manager @@ -169,11 +171,6 @@ error_out: return NULL; } -int packet_manager_schema_add_subscriber(struct packet_manager_schema *pkt_mgr_schema, enum packet_stage stage, on_packet_stage_callback cb, void *args) -{ - return mq_schema_subscribe(pkt_mgr_schema->mq, pkt_mgr_schema->topic_id[stage], (on_msg_cb_func *)cb, args); -} - /****************************************************************************** * packet manager runtime ******************************************************************************/ @@ -215,120 +212,6 @@ static struct packet_manager_runtime *packet_manager_runtime_new() return runtime; } -void packet_manager_runtime_print_stat(struct packet_manager_runtime *runtime) -{ - PACKET_MANAGER_LOG_DEBUG("runtime[%p] current stage: %s, pkts_ingress: %lu, pkts_egress: %lu", - runtime, packet_stage_to_str(runtime->stage), - runtime->stat.total.pkts_ingress, runtime->stat.total.pkts_egress); - for (int i = 0; i < PACKET_QUEUE_MAX; i++) - { - PACKET_MANAGER_LOG_DEBUG("runtime[%p] (%11s) queue stat => pkts_in: %lu, pkts_out: %lu, pkts_claim: %lu, pkts_schedule: %lu", - runtime, - packet_stage_to_str(i), - runtime->stat.queue[i].pkts_in, - runtime->stat.queue[i].pkts_out, - runtime->stat.queue[i].pkts_claim, - runtime->stat.queue[i].pkts_schedule); - } -} - -struct packet_manager_runtime_stat *packet_manager_runtime_get_stat(struct packet_manager_runtime *runtime) -{ - return &runtime->stat; -} - -void packet_manager_runtime_init(struct packet_manager_runtime *pkt_mgr_rt, struct mq_runtime *mq_rt) -{ - pkt_mgr_rt->mq = mq_rt; -} - -void packet_manager_runtime_ingress(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt) -{ - pkt_mgr_rt->stat.total.pkts_ingress++; - pkt_mgr_rt->stat.queue[PACKET_STAGE_PREROUTING].pkts_in++; - TAILQ_INSERT_TAIL(&pkt_mgr_rt->queue[PACKET_STAGE_PREROUTING], pkt, stage_tqe); -} - -struct packet *packet_manager_runtime_egress(struct packet_manager_runtime *pkt_mgr_rt) -{ - struct packet *pkt = TAILQ_FIRST(&pkt_mgr_rt->queue[PACKET_STAGE_MAX]); - if (pkt) - { - pkt_mgr_rt->stat.total.pkts_egress++; - pkt_mgr_rt->stat.queue[PACKET_STAGE_MAX].pkts_out++; - TAILQ_REMOVE(&pkt_mgr_rt->queue[PACKET_STAGE_MAX], pkt, stage_tqe); - } - return pkt; -} - -void packet_manager_runtime_dispatch(struct packet_manager_runtime *pkt_mgr_rt) -{ - for (int i = 0; i < PACKET_STAGE_MAX; i++) - { - pkt_mgr_rt->stage = i; - packet_manager_runtime_print_stat(pkt_mgr_rt); - - struct packet *pkt = NULL; - while ((pkt = TAILQ_FIRST(&pkt_mgr_rt->queue[pkt_mgr_rt->stage]))) - { - packet_set_claim(pkt, false); - pkt_mgr_rt->claimed_cb = NULL; - pkt_mgr_rt->cb_args = NULL; - - TAILQ_REMOVE(&pkt_mgr_rt->queue[pkt_mgr_rt->stage], pkt, stage_tqe); - pkt_mgr_rt->stat.queue[pkt_mgr_rt->stage].pkts_out++; - - mq_runtime_publish_message(pkt_mgr_rt->mq, pkt_mgr_rt->stage, pkt); - mq_runtime_dispatch(pkt_mgr_rt->mq); - - if (packet_is_claim(pkt)) - { - if (pkt_mgr_rt->claimed_cb) - { - pkt_mgr_rt->claimed_cb(pkt, pkt_mgr_rt->cb_args); - } - packet_set_claim(pkt, false); - continue; - } - - TAILQ_INSERT_TAIL(&pkt_mgr_rt->queue[pkt_mgr_rt->stage + 1], pkt, stage_tqe); - pkt_mgr_rt->stat.queue[pkt_mgr_rt->stage + 1].pkts_in++; - } - } - pkt_mgr_rt->stage = -1; -} - -int packet_manager_runtime_claim_packet(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt, on_packet_claimed_callback cb, void *args) -{ - if (packet_is_claim(pkt)) - { - PACKET_MANAGER_LOG_ERROR("packet is already claimed, cannot claim again"); - return -1; - } - else - { - pkt_mgr_rt->claimed_cb = cb; - pkt_mgr_rt->cb_args = args; - packet_set_claim(pkt, true); - pkt_mgr_rt->stat.queue[pkt_mgr_rt->stage].pkts_claim++; - return 0; - } -} - -void packet_manager_runtime_schedule_packet(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt, enum packet_stage stage) -{ - if (stage >= PACKET_STAGE_MAX) - { - PACKET_MANAGER_LOG_ERROR("invalid stage %d", stage); - assert(0); - return; - } - - pkt_mgr_rt->stat.queue[stage].pkts_schedule++; - pkt_mgr_rt->stat.queue[stage].pkts_in++; - TAILQ_INSERT_TAIL(&pkt_mgr_rt->queue[stage], pkt, stage_tqe); -} - /****************************************************************************** * packet manager ******************************************************************************/ @@ -393,26 +276,137 @@ void packet_manager_free(struct packet_manager *pkt_mgr) } } -struct packet_manager_schema *packet_manager_get_schema(struct packet_manager *pkt_mgr) +int packet_manager_subscribe(struct packet_manager *pkt_mgr, enum packet_stage stage, on_packet_stage_callback cb, void *args) { - if (pkt_mgr) + return mq_schema_subscribe(pkt_mgr->schema->mq, pkt_mgr->schema->topic_id[stage], (on_msg_cb_func *)cb, args); +} + +void packet_manager_init(struct packet_manager *pkt_mgr, uint16_t thread_id, struct mq_runtime *mq_rt) +{ + struct packet_manager_runtime *runtime = pkt_mgr->runtime[thread_id]; + + runtime->mq = mq_rt; +} + +void packet_manager_ingress(struct packet_manager *pkt_mgr, uint16_t thread_id, struct packet *pkt) +{ + struct packet_manager_runtime *runtime = pkt_mgr->runtime[thread_id]; + + runtime->stat.total.pkts_ingress++; + runtime->stat.queue[PACKET_STAGE_PREROUTING].pkts_in++; + TAILQ_INSERT_TAIL(&runtime->queue[PACKET_STAGE_PREROUTING], pkt, stage_tqe); +} + +struct packet *packet_manager_egress(struct packet_manager *pkt_mgr, uint16_t thread_id) +{ + struct packet_manager_runtime *runtime = pkt_mgr->runtime[thread_id]; + + struct packet *pkt = TAILQ_FIRST(&runtime->queue[PACKET_STAGE_MAX]); + if (pkt) { - return pkt_mgr->schema; + runtime->stat.total.pkts_egress++; + runtime->stat.queue[PACKET_STAGE_MAX].pkts_out++; + TAILQ_REMOVE(&runtime->queue[PACKET_STAGE_MAX], pkt, stage_tqe); } - else + return pkt; +} + +void packet_manager_dispatch(struct packet_manager *pkt_mgr, uint16_t thread_id) +{ + struct packet_manager_runtime *runtime = pkt_mgr->runtime[thread_id]; + + for (int i = 0; i < PACKET_STAGE_MAX; i++) { - return NULL; + runtime->stage = i; + packet_manager_print_stat(pkt_mgr, thread_id); + + struct packet *pkt = NULL; + while ((pkt = TAILQ_FIRST(&runtime->queue[runtime->stage]))) + { + packet_set_claim(pkt, false); + runtime->claimed_cb = NULL; + runtime->cb_args = NULL; + + TAILQ_REMOVE(&runtime->queue[runtime->stage], pkt, stage_tqe); + runtime->stat.queue[runtime->stage].pkts_out++; + + mq_runtime_publish_message(runtime->mq, runtime->stage, pkt); + mq_runtime_dispatch(runtime->mq); + + if (packet_is_claim(pkt)) + { + if (runtime->claimed_cb) + { + runtime->claimed_cb(pkt, runtime->cb_args); + } + packet_set_claim(pkt, false); + continue; + } + + TAILQ_INSERT_TAIL(&runtime->queue[runtime->stage + 1], pkt, stage_tqe); + runtime->stat.queue[runtime->stage + 1].pkts_in++; + } } + runtime->stage = -1; } -struct packet_manager_runtime *packet_manager_get_runtime(struct packet_manager *pkt_mgr, uint16_t thr_idx) +int packet_manager_claim_packet(struct packet_manager *pkt_mgr, uint16_t thread_id, struct packet *pkt, on_packet_claimed_callback cb, void *args) { - if (pkt_mgr && thr_idx < pkt_mgr->cfg->nr_worker_thread) + struct packet_manager_runtime *runtime = pkt_mgr->runtime[thread_id]; + + if (packet_is_claim(pkt)) { - return pkt_mgr->runtime[thr_idx]; + PACKET_MANAGER_LOG_ERROR("packet is already claimed, cannot claim again"); + return -1; } else { - return NULL; + runtime->claimed_cb = cb; + runtime->cb_args = args; + packet_set_claim(pkt, true); + runtime->stat.queue[runtime->stage].pkts_claim++; + return 0; + } +} + +void packet_manager_schedule_packet(struct packet_manager *pkt_mgr, uint16_t thread_id, struct packet *pkt, enum packet_stage stage) +{ + struct packet_manager_runtime *runtime = pkt_mgr->runtime[thread_id]; + + if (stage >= PACKET_STAGE_MAX) + { + PACKET_MANAGER_LOG_ERROR("invalid stage %d", stage); + assert(0); + return; + } + + runtime->stat.queue[stage].pkts_schedule++; + runtime->stat.queue[stage].pkts_in++; + TAILQ_INSERT_TAIL(&runtime->queue[stage], pkt, stage_tqe); +} + +struct packet_manager_stat *packet_manager_get_stat(struct packet_manager *pkt_mgr, uint16_t thread_id) +{ + struct packet_manager_runtime *runtime = pkt_mgr->runtime[thread_id]; + + return &runtime->stat; +} + +void packet_manager_print_stat(struct packet_manager *pkt_mgr, uint16_t thread_id) +{ + struct packet_manager_runtime *runtime = pkt_mgr->runtime[thread_id]; + + PACKET_MANAGER_LOG_DEBUG("runtime[%p] current stage: %s, pkts_ingress: %lu, pkts_egress: %lu", + runtime, packet_stage_to_str(runtime->stage), + runtime->stat.total.pkts_ingress, runtime->stat.total.pkts_egress); + for (int i = 0; i < PACKET_QUEUE_MAX; i++) + { + PACKET_MANAGER_LOG_DEBUG("runtime[%p] (%11s) queue stat => pkts_in: %lu, pkts_out: %lu, pkts_claim: %lu, pkts_schedule: %lu", + runtime, + packet_stage_to_str(i), + runtime->stat.queue[i].pkts_in, + runtime->stat.queue[i].pkts_out, + runtime->stat.queue[i].pkts_claim, + runtime->stat.queue[i].pkts_schedule); } } diff --git a/infra/packet_manager/packet_manager_internal.h b/infra/packet_manager/packet_manager_internal.h index 8a51f3c..f8ac98e 100644 --- a/infra/packet_manager/packet_manager_internal.h +++ b/infra/packet_manager/packet_manager_internal.h @@ -9,19 +9,7 @@ extern "C" #define PACKET_QUEUE_MAX (PACKET_STAGE_MAX + 1) -struct packet_manager *packet_manager_new(struct mq_schema *mq_schema, const char *toml_file); -void packet_manager_free(struct packet_manager *pkt_mgr); - -void packet_manager_runtime_init(struct packet_manager_runtime *pkt_mgr_rt, struct mq_runtime *mq_rt); -void packet_manager_runtime_ingress(struct packet_manager_runtime *pkt_mgr_rt, struct packet *pkt); -struct packet *packet_manager_runtime_egress(struct packet_manager_runtime *pkt_mgr_rt); -void packet_manager_runtime_dispatch(struct packet_manager_runtime *pkt_mgr_rt); - -/****************************************************************************** - * for gtest - ******************************************************************************/ - -struct packet_manager_runtime_stat +struct packet_manager_stat { struct { @@ -37,9 +25,17 @@ struct packet_manager_runtime_stat } queue[PACKET_QUEUE_MAX]; // the last queue is for sending packets }; +struct packet_manager *packet_manager_new(struct mq_schema *mq_schema, const char *toml_file); +void packet_manager_free(struct packet_manager *pkt_mgr); + +void packet_manager_init(struct packet_manager *pkt_mgr, uint16_t thread_id, struct mq_runtime *mq_rt); +void packet_manager_ingress(struct packet_manager *pkt_mgr, uint16_t thread_id, struct packet *pkt); +struct packet *packet_manager_egress(struct packet_manager *pkt_mgr, uint16_t thread_id); +void packet_manager_dispatch(struct packet_manager *pkt_mgr, uint16_t thread_id); +struct packet_manager_stat *packet_manager_get_stat(struct packet_manager *pkt_mgr, uint16_t thread_id); +void packet_manager_print_stat(struct packet_manager *pkt_mgr, uint16_t thread_id); + const char *packet_stage_to_str(enum packet_stage stage); -void packet_manager_runtime_print_stat(struct packet_manager_runtime *runtime); -struct packet_manager_runtime_stat *packet_manager_runtime_get_stat(struct packet_manager_runtime *runtime); #ifdef __cplusplus } diff --git a/infra/packet_manager/test/gtest_packet_manager.cpp b/infra/packet_manager/test/gtest_packet_manager.cpp index 697ce7f..a3674c6 100644 --- a/infra/packet_manager/test/gtest_packet_manager.cpp +++ b/infra/packet_manager/test/gtest_packet_manager.cpp @@ -67,9 +67,10 @@ unsigned char data[] = { 0x81, 0x80, 0x5c, 0x76, 0x00, 0x00, 0x00, 0x00, 0x80, 0x02, 0x20, 0x00, 0xf7, 0x57, 0x00, 0x00, 0x02, 0x04, 0x04, 0xc4, 0x01, 0x03, 0x03, 0x08, 0x01, 0x01, 0x04, 0x02}; -static struct packet_manager_runtime_stat init_stat = {}; +static uint16_t thread_id = 0; +static struct packet_manager_stat init_stat = {}; -static void check_stat(struct packet_manager_runtime_stat *curr_stat, struct packet_manager_runtime_stat *expect_stat) +static void check_stat(struct packet_manager_stat *curr_stat, struct packet_manager_stat *expect_stat) { EXPECT_TRUE(curr_stat->total.pkts_ingress == expect_stat->total.pkts_ingress); EXPECT_TRUE(curr_stat->total.pkts_egress == expect_stat->total.pkts_egress); @@ -92,10 +93,6 @@ TEST(PACKET_MANAGER, NEW_FREE) struct packet_manager *pkt_mgr = packet_manager_new(mq_schema, "./conf/stellar.toml"); EXPECT_TRUE(pkt_mgr); - EXPECT_TRUE(packet_manager_get_schema(pkt_mgr)); - EXPECT_TRUE(packet_manager_get_runtime(pkt_mgr, 0)); - EXPECT_TRUE(packet_manager_get_runtime(pkt_mgr, 1) == NULL); - packet_manager_free(pkt_mgr); mq_schema_free(mq_schema); @@ -125,18 +122,14 @@ TEST(PACKET_MANAGER, SUBSCRIBER_PACKET_STAGE) // module init struct packet_manager *pkt_mgr = packet_manager_new(mq_schema, "./conf/stellar.toml"); EXPECT_TRUE(pkt_mgr); - struct packet_manager_schema *schema = packet_manager_get_schema(pkt_mgr); - EXPECT_TRUE(schema); - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_PREROUTING, on_packet_stage, NULL) == 0); - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_INPUT, on_packet_stage, NULL) == 0); - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_FORWARD, on_packet_stage, NULL) == 0); - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_OUTPUT, on_packet_stage, NULL) == 0); - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_POSTROUTING, on_packet_stage, NULL) == 0); + EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_PREROUTING, on_packet_stage, NULL) == 0); + EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_INPUT, on_packet_stage, NULL) == 0); + EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_FORWARD, on_packet_stage, NULL) == 0); + EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_OUTPUT, on_packet_stage, NULL) == 0); + EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_POSTROUTING, on_packet_stage, NULL) == 0); // per-thread init - struct packet_manager_runtime *runtime = packet_manager_get_runtime(pkt_mgr, 0); - EXPECT_TRUE(runtime); - packet_manager_runtime_init(runtime, mq_rt); + packet_manager_init(pkt_mgr, thread_id, mq_rt); // per-thread run struct packet pkt; @@ -144,13 +137,13 @@ TEST(PACKET_MANAGER, SUBSCRIBER_PACKET_STAGE) packet_parse(&pkt, (const char *)data, sizeof(data)); packet_set_ctrl(&pkt, true); - struct packet_manager_runtime_stat *curr_stat = packet_manager_runtime_get_stat(runtime); + struct packet_manager_stat *curr_stat = packet_manager_get_stat(pkt_mgr, thread_id); check_stat(curr_stat, &init_stat); - packet_manager_runtime_ingress(runtime, &pkt); - packet_manager_runtime_dispatch(runtime); - EXPECT_TRUE(packet_manager_runtime_egress(runtime) == &pkt); - EXPECT_TRUE(packet_manager_runtime_egress(runtime) == NULL); - struct packet_manager_runtime_stat expect_stat = { + packet_manager_ingress(pkt_mgr, thread_id, &pkt); + packet_manager_dispatch(pkt_mgr, thread_id); + EXPECT_TRUE(packet_manager_egress(pkt_mgr, thread_id) == &pkt); + EXPECT_TRUE(packet_manager_egress(pkt_mgr, thread_id) == NULL); + struct packet_manager_stat expect_stat = { .total = {.pkts_ingress = 1, .pkts_egress = 1}, .queue = { [PACKET_STAGE_PREROUTING] = {.pkts_in = 1, .pkts_out = 1, .pkts_claim = 0, .pkts_schedule = 0}, @@ -188,8 +181,6 @@ static void packet_claimed(struct packet *pkt, void *args) static void on_packet_stage_claim_packet_success(enum packet_stage stage, struct packet *pkt, void *args) { struct packet_manager *pkt_mgr = (struct packet_manager *)args; - struct packet_manager_runtime *pkt_mgr_rt = packet_manager_get_runtime(pkt_mgr, 0); - EXPECT_TRUE(pkt_mgr_rt); printf("on_packet_stage_claim_packet_success: %s\n", packet_stage_to_str(stage)); @@ -198,15 +189,13 @@ static void on_packet_stage_claim_packet_success(enum packet_stage stage, struct EXPECT_TRUE(stage == PACKET_STAGE_PREROUTING); EXPECT_TRUE(packet_is_ctrl(pkt)); EXPECT_TRUE(!packet_is_claim(pkt)); // packet not claim - EXPECT_TRUE(packet_manager_runtime_claim_packet(pkt_mgr_rt, pkt, packet_claimed, strdup("hello")) == 0); // claim packet success + EXPECT_TRUE(packet_manager_claim_packet(pkt_mgr, thread_id, pkt, packet_claimed, strdup("hello")) == 0); // claim packet success count++; } static void on_packet_stage_claim_packet_failed(enum packet_stage stage, struct packet *pkt, void *args) { struct packet_manager *pkt_mgr = (struct packet_manager *)args; - struct packet_manager_runtime *pkt_mgr_rt = packet_manager_get_runtime(pkt_mgr, 0); - EXPECT_TRUE(pkt_mgr_rt); printf("on_packet_stage_claim_packet_failed: %s\n", packet_stage_to_str(stage)); @@ -215,7 +204,7 @@ static void on_packet_stage_claim_packet_failed(enum packet_stage stage, struct EXPECT_TRUE(stage == PACKET_STAGE_PREROUTING); EXPECT_TRUE(packet_is_ctrl(pkt)); EXPECT_TRUE(packet_is_claim(pkt)); // packet already claim - EXPECT_TRUE(packet_manager_runtime_claim_packet(pkt_mgr_rt, pkt, NULL, NULL) == -1); // claim packet failed + EXPECT_TRUE(packet_manager_claim_packet(pkt_mgr, thread_id, pkt, NULL, NULL) == -1); // claim packet failed count++; } @@ -230,24 +219,20 @@ TEST(PACKET_MANAGER, CLAIM_PACKET) // module init struct packet_manager *pkt_mgr = packet_manager_new(mq_schema, "./conf/stellar.toml"); EXPECT_TRUE(pkt_mgr); - struct packet_manager_schema *schema = packet_manager_get_schema(pkt_mgr); - EXPECT_TRUE(schema); - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_PREROUTING, on_packet_stage_claim_packet_success, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_INPUT, on_packet_stage_claim_packet_success, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_FORWARD, on_packet_stage_claim_packet_success, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_OUTPUT, on_packet_stage_claim_packet_success, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_POSTROUTING, on_packet_stage_claim_packet_success, pkt_mgr) == 0); - - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_PREROUTING, on_packet_stage_claim_packet_failed, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_INPUT, on_packet_stage_claim_packet_failed, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_FORWARD, on_packet_stage_claim_packet_failed, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_OUTPUT, on_packet_stage_claim_packet_failed, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_POSTROUTING, on_packet_stage_claim_packet_failed, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_PREROUTING, on_packet_stage_claim_packet_success, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_INPUT, on_packet_stage_claim_packet_success, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_FORWARD, on_packet_stage_claim_packet_success, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_OUTPUT, on_packet_stage_claim_packet_success, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_POSTROUTING, on_packet_stage_claim_packet_success, pkt_mgr) == 0); + + EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_PREROUTING, on_packet_stage_claim_packet_failed, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_INPUT, on_packet_stage_claim_packet_failed, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_FORWARD, on_packet_stage_claim_packet_failed, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_OUTPUT, on_packet_stage_claim_packet_failed, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_POSTROUTING, on_packet_stage_claim_packet_failed, pkt_mgr) == 0); // per-thread init - struct packet_manager_runtime *runtime = packet_manager_get_runtime(pkt_mgr, 0); - EXPECT_TRUE(runtime); - packet_manager_runtime_init(runtime, mq_rt); + packet_manager_init(pkt_mgr, thread_id, mq_rt); // per-thread run struct packet pkt; @@ -255,12 +240,12 @@ TEST(PACKET_MANAGER, CLAIM_PACKET) packet_parse(&pkt, (const char *)data, sizeof(data)); packet_set_ctrl(&pkt, true); - struct packet_manager_runtime_stat *curr_stat = packet_manager_runtime_get_stat(runtime); + struct packet_manager_stat *curr_stat = packet_manager_get_stat(pkt_mgr, thread_id); check_stat(curr_stat, &init_stat); - packet_manager_runtime_ingress(runtime, &pkt); - packet_manager_runtime_dispatch(runtime); - EXPECT_TRUE(packet_manager_runtime_egress(runtime) == NULL); - struct packet_manager_runtime_stat expect_stat = { + packet_manager_ingress(pkt_mgr, thread_id, &pkt); + packet_manager_dispatch(pkt_mgr, thread_id); + EXPECT_TRUE(packet_manager_egress(pkt_mgr, thread_id) == NULL); + struct packet_manager_stat expect_stat = { .total = {.pkts_ingress = 1, .pkts_egress = 0}, .queue = { [PACKET_STAGE_PREROUTING] = {.pkts_in = 1, .pkts_out = 1, .pkts_claim = 1, .pkts_schedule = 0}, @@ -288,8 +273,6 @@ TEST(PACKET_MANAGER, CLAIM_PACKET) static void on_packet_stage_schedule_packet(enum packet_stage stage, struct packet *pkt, void *args) { struct packet_manager *pkt_mgr = (struct packet_manager *)args; - struct packet_manager_runtime *pkt_mgr_rt = packet_manager_get_runtime(pkt_mgr, 0); - EXPECT_TRUE(pkt_mgr_rt); printf("on_packet_stage_schedule_packet: \"%s\" schedule packet %p\n", packet_stage_to_str(stage), pkt); @@ -297,10 +280,10 @@ static void on_packet_stage_schedule_packet(enum packet_stage stage, struct pack if (stage == PACKET_STAGE_PREROUTING) { - packet_manager_runtime_schedule_packet(pkt_mgr_rt, packet_new(10), PACKET_STAGE_INPUT); - packet_manager_runtime_schedule_packet(pkt_mgr_rt, packet_new(10), PACKET_STAGE_FORWARD); - packet_manager_runtime_schedule_packet(pkt_mgr_rt, packet_new(10), PACKET_STAGE_OUTPUT); - packet_manager_runtime_schedule_packet(pkt_mgr_rt, packet_new(10), PACKET_STAGE_POSTROUTING); + packet_manager_schedule_packet(pkt_mgr, thread_id, packet_new(10), PACKET_STAGE_INPUT); + packet_manager_schedule_packet(pkt_mgr, thread_id, packet_new(10), PACKET_STAGE_FORWARD); + packet_manager_schedule_packet(pkt_mgr, thread_id, packet_new(10), PACKET_STAGE_OUTPUT); + packet_manager_schedule_packet(pkt_mgr, thread_id, packet_new(10), PACKET_STAGE_POSTROUTING); } } @@ -315,18 +298,14 @@ TEST(PACKET_MANAGER, SCHEDULE_PACKET) // module init struct packet_manager *pkt_mgr = packet_manager_new(mq_schema, "./conf/stellar.toml"); EXPECT_TRUE(pkt_mgr); - struct packet_manager_schema *schema = packet_manager_get_schema(pkt_mgr); - EXPECT_TRUE(schema); - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_PREROUTING, on_packet_stage_schedule_packet, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_INPUT, on_packet_stage_schedule_packet, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_FORWARD, on_packet_stage_schedule_packet, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_OUTPUT, on_packet_stage_schedule_packet, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_POSTROUTING, on_packet_stage_schedule_packet, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_PREROUTING, on_packet_stage_schedule_packet, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_INPUT, on_packet_stage_schedule_packet, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_FORWARD, on_packet_stage_schedule_packet, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_OUTPUT, on_packet_stage_schedule_packet, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_POSTROUTING, on_packet_stage_schedule_packet, pkt_mgr) == 0); // per-thread init - struct packet_manager_runtime *runtime = packet_manager_get_runtime(pkt_mgr, 0); - EXPECT_TRUE(runtime); - packet_manager_runtime_init(runtime, mq_rt); + packet_manager_init(pkt_mgr, thread_id, mq_rt); // per-thread run struct packet pkt; @@ -334,21 +313,21 @@ TEST(PACKET_MANAGER, SCHEDULE_PACKET) packet_parse(&pkt, (const char *)data, sizeof(data)); packet_set_ctrl(&pkt, true); - struct packet_manager_runtime_stat *curr_stat = packet_manager_runtime_get_stat(runtime); + struct packet_manager_stat *curr_stat = packet_manager_get_stat(pkt_mgr, thread_id); check_stat(curr_stat, &init_stat); - packet_manager_runtime_ingress(runtime, &pkt); - packet_manager_runtime_dispatch(runtime); + packet_manager_ingress(pkt_mgr, thread_id, &pkt); + packet_manager_dispatch(pkt_mgr, thread_id); struct packet *tmp = NULL; for (int i = 0; i < 4; i++) { - tmp = packet_manager_runtime_egress(runtime); + tmp = packet_manager_egress(pkt_mgr, thread_id); EXPECT_TRUE(tmp); packet_free(tmp); } - EXPECT_TRUE(packet_manager_runtime_egress(runtime) == &pkt); - EXPECT_TRUE(packet_manager_runtime_egress(runtime) == NULL); - struct packet_manager_runtime_stat expect_stat = { + EXPECT_TRUE(packet_manager_egress(pkt_mgr, thread_id) == &pkt); + EXPECT_TRUE(packet_manager_egress(pkt_mgr, thread_id) == NULL); + struct packet_manager_stat expect_stat = { .total = {.pkts_ingress = 1, .pkts_egress = 5}, .queue = { [PACKET_STAGE_PREROUTING] = {.pkts_in = 1, .pkts_out = 1, .pkts_claim = 0, .pkts_schedule = 0}, @@ -375,19 +354,18 @@ TEST(PACKET_MANAGER, SCHEDULE_PACKET) #if 1 static void schedule_claimed_packet(struct packet *pkt, void *args) { - struct packet_manager_runtime *pkt_mgr_rt = (struct packet_manager_runtime *)args; + struct packet_manager *pkt_mgr = (struct packet_manager *)args; + printf("schedule_claimed_packet: %p\n", pkt); EXPECT_TRUE(packet_is_ctrl(pkt)); EXPECT_TRUE(packet_is_claim(pkt)); - packet_manager_runtime_schedule_packet(pkt_mgr_rt, pkt, PACKET_STAGE_POSTROUTING); + packet_manager_schedule_packet(pkt_mgr, thread_id, pkt, PACKET_STAGE_POSTROUTING); } static void on_packet_stage_claim_packet_to_schedule(enum packet_stage stage, struct packet *pkt, void *args) { struct packet_manager *pkt_mgr = (struct packet_manager *)args; - struct packet_manager_runtime *pkt_mgr_rt = packet_manager_get_runtime(pkt_mgr, 0); - EXPECT_TRUE(pkt_mgr_rt); printf("on_packet_stage_claim_packet_to_schedule: %s\n", packet_stage_to_str(stage)); @@ -396,8 +374,8 @@ static void on_packet_stage_claim_packet_to_schedule(enum packet_stage stage, st EXPECT_TRUE(!packet_is_claim(pkt)); if (stage == PACKET_STAGE_PREROUTING) { - EXPECT_TRUE(count == 0); // packet not claim - EXPECT_TRUE(packet_manager_runtime_claim_packet(pkt_mgr_rt, pkt, schedule_claimed_packet, pkt_mgr_rt) == 0); // claim packet success + EXPECT_TRUE(count == 0); // packet not claim + EXPECT_TRUE(packet_manager_claim_packet(pkt_mgr, thread_id, pkt, schedule_claimed_packet, pkt_mgr) == 0); // claim packet success } else if (stage == PACKET_STAGE_POSTROUTING) { @@ -422,18 +400,14 @@ TEST(PACKET_MANAGER, SCHEDULE_CLAIMED_PACKET) // module init struct packet_manager *pkt_mgr = packet_manager_new(mq_schema, "./conf/stellar.toml"); EXPECT_TRUE(pkt_mgr); - struct packet_manager_schema *schema = packet_manager_get_schema(pkt_mgr); - EXPECT_TRUE(schema); - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_PREROUTING, on_packet_stage_claim_packet_to_schedule, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_INPUT, on_packet_stage_claim_packet_to_schedule, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_FORWARD, on_packet_stage_claim_packet_to_schedule, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_OUTPUT, on_packet_stage_claim_packet_to_schedule, pkt_mgr) == 0); - EXPECT_TRUE(packet_manager_schema_add_subscriber(schema, PACKET_STAGE_POSTROUTING, on_packet_stage_claim_packet_to_schedule, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_PREROUTING, on_packet_stage_claim_packet_to_schedule, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_INPUT, on_packet_stage_claim_packet_to_schedule, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_FORWARD, on_packet_stage_claim_packet_to_schedule, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_OUTPUT, on_packet_stage_claim_packet_to_schedule, pkt_mgr) == 0); + EXPECT_TRUE(packet_manager_subscribe(pkt_mgr, PACKET_STAGE_POSTROUTING, on_packet_stage_claim_packet_to_schedule, pkt_mgr) == 0); // per-thread init - struct packet_manager_runtime *runtime = packet_manager_get_runtime(pkt_mgr, 0); - EXPECT_TRUE(runtime); - packet_manager_runtime_init(runtime, mq_rt); + packet_manager_init(pkt_mgr, thread_id, mq_rt); // per-thread run struct packet pkt; @@ -441,12 +415,12 @@ TEST(PACKET_MANAGER, SCHEDULE_CLAIMED_PACKET) packet_parse(&pkt, (const char *)data, sizeof(data)); packet_set_ctrl(&pkt, true); - struct packet_manager_runtime_stat *curr_stat = packet_manager_runtime_get_stat(runtime); + struct packet_manager_stat *curr_stat = packet_manager_get_stat(pkt_mgr, thread_id); check_stat(curr_stat, &init_stat); - packet_manager_runtime_ingress(runtime, &pkt); - packet_manager_runtime_dispatch(runtime); - EXPECT_TRUE(packet_manager_runtime_egress(runtime) == &pkt); - struct packet_manager_runtime_stat expect_stat = { + packet_manager_ingress(pkt_mgr, thread_id, &pkt); + packet_manager_dispatch(pkt_mgr, thread_id); + EXPECT_TRUE(packet_manager_egress(pkt_mgr, thread_id) == &pkt); + struct packet_manager_stat expect_stat = { .total = {.pkts_ingress = 1, .pkts_egress = 1}, .queue = { [PACKET_STAGE_PREROUTING] = {.pkts_in = 1, .pkts_out = 1, .pkts_claim = 1, .pkts_schedule = 0}, |
