diff options
| author | luwenpeng <[email protected]> | 2024-10-23 10:10:20 +0800 |
|---|---|---|
| committer | luwenpeng <[email protected]> | 2024-10-23 10:10:15 +0800 |
| commit | 3f3059b40ff84dfe5b086948e58c49d0e35c11ba (patch) | |
| tree | 3a9f199c46caf37aef819ab495314b987c4931aa /infra/packet_manager | |
| parent | fd3cc20554cba6fe7ee7c671730079f81a2fbc5d (diff) | |
refactor: packet manager and session manager add on_thread_init/on_thread_exit entry
Diffstat (limited to 'infra/packet_manager')
| -rw-r--r-- | infra/packet_manager/packet_manager.c | 85 | ||||
| -rw-r--r-- | infra/packet_manager/packet_manager_internal.h | 3 | ||||
| -rw-r--r-- | infra/packet_manager/test/gtest_packet_manager.cpp | 4 |
3 files changed, 61 insertions, 31 deletions
diff --git a/infra/packet_manager/packet_manager.c b/infra/packet_manager/packet_manager.c index 1f8c678..e139955 100644 --- a/infra/packet_manager/packet_manager.c +++ b/infra/packet_manager/packet_manager.c @@ -31,7 +31,7 @@ struct packet_manager_schema struct packet_manager { - uint64_t thread_num; + uint16_t thread_num; struct packet_manager_schema *schema; struct packet_manager_runtime *runtime[MAX_THREAD_NUM]; }; @@ -73,7 +73,7 @@ void packet_manager_runtime_free(struct packet_manager_runtime *pkt_mgr_rt) pkt_mgr_rt = NULL; } -struct packet_manager_runtime *packet_manager_runtime_new() +struct packet_manager_runtime *packet_manager_runtime_new(struct mq_runtime *mq_rt) { struct packet_manager_runtime *pkt_mgr_rt = calloc(1, sizeof(struct packet_manager_runtime)); if (pkt_mgr_rt == NULL) @@ -86,6 +86,7 @@ struct packet_manager_runtime *packet_manager_runtime_new() { TAILQ_INIT(&pkt_mgr_rt->queue[i]); } + pkt_mgr_rt->mq = mq_rt; return pkt_mgr_rt; } @@ -178,7 +179,7 @@ error_out: * packet manager ******************************************************************************/ -struct packet_manager *packet_manager_new(struct mq_schema *mq_schema, uint64_t thread_num) +struct packet_manager *packet_manager_new(struct mq_schema *mq_schema, uint16_t thread_num) { struct packet_manager *pkt_mgr = calloc(1, sizeof(struct packet_manager)); if (pkt_mgr == NULL) @@ -195,16 +196,6 @@ struct packet_manager *packet_manager_new(struct mq_schema *mq_schema, uint64_t goto error_out; } - for (uint16_t i = 0; i < pkt_mgr->thread_num; i++) - { - pkt_mgr->runtime[i] = packet_manager_runtime_new(); - if (pkt_mgr->runtime[i] == NULL) - { - PACKET_MANAGER_LOG_ERROR("failed to create packet_manager_runtime"); - goto error_out; - } - } - return pkt_mgr; error_out: @@ -214,21 +205,8 @@ error_out: void packet_manager_free(struct packet_manager *pkt_mgr) { - struct packet_manager_runtime *pkt_mgr_rt = NULL; - if (pkt_mgr) { - - for (uint16_t i = 0; i < pkt_mgr->thread_num; i++) - { - pkt_mgr_rt = pkt_mgr->runtime[i]; - if (pkt_mgr_rt) - { - packet_manager_print_stat(pkt_mgr, i); - packet_manager_runtime_free(pkt_mgr_rt); - } - } - packet_schema_free(pkt_mgr->schema); free(pkt_mgr); @@ -253,11 +231,30 @@ int packet_manager_init(struct packet_manager *pkt_mgr, uint16_t thread_id, stru assert(pkt_mgr); assert(thread_id < pkt_mgr->thread_num); assert(mq_rt); - struct packet_manager_runtime *runtime = pkt_mgr->runtime[thread_id]; - runtime->mq = mq_rt; + struct packet_manager_runtime *pkt_mgr_rt = packet_manager_runtime_new(mq_rt); + if (pkt_mgr_rt == NULL) + { + PACKET_MANAGER_LOG_ERROR("failed to create packet_manager_runtime"); + return -1; + } + else + { + pkt_mgr->runtime[thread_id] = pkt_mgr_rt; + return 0; + } +} - return 0; +void packet_manager_clean(struct packet_manager *pkt_mgr, uint16_t thread_id) +{ + assert(pkt_mgr); + assert(thread_id < pkt_mgr->thread_num); + + struct packet_manager_runtime *pkt_mgr_rt = pkt_mgr->runtime[thread_id]; + PACKET_MANAGER_LOG_INFO("runtime: %p, idx: %d, will be cleaned", pkt_mgr_rt, thread_id); + packet_manager_print_stat(pkt_mgr, thread_id); + packet_manager_runtime_free(pkt_mgr_rt); + pkt_mgr_rt = NULL; } void packet_manager_ingress(struct packet_manager *pkt_mgr, uint16_t thread_id, struct packet *pkt) @@ -412,7 +409,7 @@ struct stellar_module *packet_manager_on_init(struct stellar_module_manager *mod assert(mod_mgr); struct mq_schema *mq_schema = stellar_module_manager_get_mq_schema(mod_mgr); assert(mq_schema); - uint64_t thread_num = stellar_module_manager_get_max_thread_num(mod_mgr); + uint16_t thread_num = stellar_module_manager_get_max_thread_num(mod_mgr); struct packet_manager *pkt_mgr = packet_manager_new(mq_schema, thread_num); if (pkt_mgr == NULL) @@ -443,4 +440,32 @@ void packet_manager_on_exit(struct stellar_module_manager *mod_mgr __attribute__ stellar_module_free(mod); PACKET_MANAGER_LOG_FATAL("packet_manager exited"); } +} + +struct stellar_module *packet_manager_on_thread_init(struct stellar_module_manager *mod_mgr, int thread_id, struct stellar_module *mod) +{ + struct packet_manager *pkt_mgr = stellar_module_get_ctx(mod); + assert(pkt_mgr); + struct mq_runtime *mq_rt = stellar_module_manager_get_mq_runtime(mod_mgr); + assert(mq_rt); + assert(thread_id < pkt_mgr->thread_num); + + if (packet_manager_init(pkt_mgr, thread_id, mq_rt) != 0) + { + PACKET_MANAGER_LOG_ERROR("failed to init packet_manager_init"); + return NULL; + } + else + { + return mod; + } +} + +void packet_manager_on_thread_exit(struct stellar_module_manager *mod_mgr __attribute__((unused)), int thread_id, struct stellar_module *mod) +{ + struct packet_manager *pkt_mgr = stellar_module_get_ctx(mod); + assert(pkt_mgr); + assert(thread_id < pkt_mgr->thread_num); + + packet_manager_clean(pkt_mgr, thread_id); }
\ No newline at end of file diff --git a/infra/packet_manager/packet_manager_internal.h b/infra/packet_manager/packet_manager_internal.h index d001770..dc7a979 100644 --- a/infra/packet_manager/packet_manager_internal.h +++ b/infra/packet_manager/packet_manager_internal.h @@ -26,10 +26,11 @@ struct packet_manager_stat } queue[PACKET_QUEUE_MAX]; // the last queue is for sending packets }; -struct packet_manager *packet_manager_new(struct mq_schema *mq_schema, uint64_t thread_num); +struct packet_manager *packet_manager_new(struct mq_schema *mq_schema, uint16_t thread_num); void packet_manager_free(struct packet_manager *pkt_mgr); int packet_manager_init(struct packet_manager *pkt_mgr, uint16_t thread_id, struct mq_runtime *mq_rt); +void packet_manager_clean(struct packet_manager *pkt_mgr, uint16_t thread_id); void packet_manager_ingress(struct packet_manager *pkt_mgr, uint16_t thread_id, struct packet *pkt); struct packet *packet_manager_egress(struct packet_manager *pkt_mgr, uint16_t thread_id); void packet_manager_dispatch(struct packet_manager *pkt_mgr, uint16_t thread_id); diff --git a/infra/packet_manager/test/gtest_packet_manager.cpp b/infra/packet_manager/test/gtest_packet_manager.cpp index d186e45..e447c32 100644 --- a/infra/packet_manager/test/gtest_packet_manager.cpp +++ b/infra/packet_manager/test/gtest_packet_manager.cpp @@ -156,6 +156,7 @@ TEST(PACKET_MANAGER, SUBSCRIBER_PACKET_STAGE) check_stat(curr_stat, &expect_stat); // per-thread free + packet_manager_clean(pkt_mgr, thread_id); // module free packet_manager_free(pkt_mgr); @@ -258,6 +259,7 @@ TEST(PACKET_MANAGER, CLAIM_PACKET) check_stat(curr_stat, &expect_stat); // per-thread free + packet_manager_clean(pkt_mgr, thread_id); // module free packet_manager_free(pkt_mgr); @@ -340,6 +342,7 @@ TEST(PACKET_MANAGER, SCHEDULE_PACKET) check_stat(curr_stat, &expect_stat); // per-thread free + packet_manager_clean(pkt_mgr, thread_id); // module free packet_manager_free(pkt_mgr); @@ -433,6 +436,7 @@ TEST(PACKET_MANAGER, SCHEDULE_CLAIMED_PACKET) check_stat(curr_stat, &expect_stat); // per-thread free + packet_manager_clean(pkt_mgr, thread_id); // module free packet_manager_free(pkt_mgr); |
