summaryrefslogtreecommitdiff
path: root/infra/packet_manager
diff options
context:
space:
mode:
authorluwenpeng <[email protected]>2024-10-23 10:10:20 +0800
committerluwenpeng <[email protected]>2024-10-23 10:10:15 +0800
commit3f3059b40ff84dfe5b086948e58c49d0e35c11ba (patch)
tree3a9f199c46caf37aef819ab495314b987c4931aa /infra/packet_manager
parentfd3cc20554cba6fe7ee7c671730079f81a2fbc5d (diff)
refactor: packet manager and session manager add on_thread_init/on_thread_exit entry
Diffstat (limited to 'infra/packet_manager')
-rw-r--r--infra/packet_manager/packet_manager.c85
-rw-r--r--infra/packet_manager/packet_manager_internal.h3
-rw-r--r--infra/packet_manager/test/gtest_packet_manager.cpp4
3 files changed, 61 insertions, 31 deletions
diff --git a/infra/packet_manager/packet_manager.c b/infra/packet_manager/packet_manager.c
index 1f8c678..e139955 100644
--- a/infra/packet_manager/packet_manager.c
+++ b/infra/packet_manager/packet_manager.c
@@ -31,7 +31,7 @@ struct packet_manager_schema
struct packet_manager
{
- uint64_t thread_num;
+ uint16_t thread_num;
struct packet_manager_schema *schema;
struct packet_manager_runtime *runtime[MAX_THREAD_NUM];
};
@@ -73,7 +73,7 @@ void packet_manager_runtime_free(struct packet_manager_runtime *pkt_mgr_rt)
pkt_mgr_rt = NULL;
}
-struct packet_manager_runtime *packet_manager_runtime_new()
+struct packet_manager_runtime *packet_manager_runtime_new(struct mq_runtime *mq_rt)
{
struct packet_manager_runtime *pkt_mgr_rt = calloc(1, sizeof(struct packet_manager_runtime));
if (pkt_mgr_rt == NULL)
@@ -86,6 +86,7 @@ struct packet_manager_runtime *packet_manager_runtime_new()
{
TAILQ_INIT(&pkt_mgr_rt->queue[i]);
}
+ pkt_mgr_rt->mq = mq_rt;
return pkt_mgr_rt;
}
@@ -178,7 +179,7 @@ error_out:
* packet manager
******************************************************************************/
-struct packet_manager *packet_manager_new(struct mq_schema *mq_schema, uint64_t thread_num)
+struct packet_manager *packet_manager_new(struct mq_schema *mq_schema, uint16_t thread_num)
{
struct packet_manager *pkt_mgr = calloc(1, sizeof(struct packet_manager));
if (pkt_mgr == NULL)
@@ -195,16 +196,6 @@ struct packet_manager *packet_manager_new(struct mq_schema *mq_schema, uint64_t
goto error_out;
}
- for (uint16_t i = 0; i < pkt_mgr->thread_num; i++)
- {
- pkt_mgr->runtime[i] = packet_manager_runtime_new();
- if (pkt_mgr->runtime[i] == NULL)
- {
- PACKET_MANAGER_LOG_ERROR("failed to create packet_manager_runtime");
- goto error_out;
- }
- }
-
return pkt_mgr;
error_out:
@@ -214,21 +205,8 @@ error_out:
void packet_manager_free(struct packet_manager *pkt_mgr)
{
- struct packet_manager_runtime *pkt_mgr_rt = NULL;
-
if (pkt_mgr)
{
-
- for (uint16_t i = 0; i < pkt_mgr->thread_num; i++)
- {
- pkt_mgr_rt = pkt_mgr->runtime[i];
- if (pkt_mgr_rt)
- {
- packet_manager_print_stat(pkt_mgr, i);
- packet_manager_runtime_free(pkt_mgr_rt);
- }
- }
-
packet_schema_free(pkt_mgr->schema);
free(pkt_mgr);
@@ -253,11 +231,30 @@ int packet_manager_init(struct packet_manager *pkt_mgr, uint16_t thread_id, stru
assert(pkt_mgr);
assert(thread_id < pkt_mgr->thread_num);
assert(mq_rt);
- struct packet_manager_runtime *runtime = pkt_mgr->runtime[thread_id];
- runtime->mq = mq_rt;
+ struct packet_manager_runtime *pkt_mgr_rt = packet_manager_runtime_new(mq_rt);
+ if (pkt_mgr_rt == NULL)
+ {
+ PACKET_MANAGER_LOG_ERROR("failed to create packet_manager_runtime");
+ return -1;
+ }
+ else
+ {
+ pkt_mgr->runtime[thread_id] = pkt_mgr_rt;
+ return 0;
+ }
+}
- return 0;
+void packet_manager_clean(struct packet_manager *pkt_mgr, uint16_t thread_id)
+{
+ assert(pkt_mgr);
+ assert(thread_id < pkt_mgr->thread_num);
+
+ struct packet_manager_runtime *pkt_mgr_rt = pkt_mgr->runtime[thread_id];
+ PACKET_MANAGER_LOG_INFO("runtime: %p, idx: %d, will be cleaned", pkt_mgr_rt, thread_id);
+ packet_manager_print_stat(pkt_mgr, thread_id);
+ packet_manager_runtime_free(pkt_mgr_rt);
+ pkt_mgr_rt = NULL;
}
void packet_manager_ingress(struct packet_manager *pkt_mgr, uint16_t thread_id, struct packet *pkt)
@@ -412,7 +409,7 @@ struct stellar_module *packet_manager_on_init(struct stellar_module_manager *mod
assert(mod_mgr);
struct mq_schema *mq_schema = stellar_module_manager_get_mq_schema(mod_mgr);
assert(mq_schema);
- uint64_t thread_num = stellar_module_manager_get_max_thread_num(mod_mgr);
+ uint16_t thread_num = stellar_module_manager_get_max_thread_num(mod_mgr);
struct packet_manager *pkt_mgr = packet_manager_new(mq_schema, thread_num);
if (pkt_mgr == NULL)
@@ -443,4 +440,32 @@ void packet_manager_on_exit(struct stellar_module_manager *mod_mgr __attribute__
stellar_module_free(mod);
PACKET_MANAGER_LOG_FATAL("packet_manager exited");
}
+}
+
+struct stellar_module *packet_manager_on_thread_init(struct stellar_module_manager *mod_mgr, int thread_id, struct stellar_module *mod)
+{
+ struct packet_manager *pkt_mgr = stellar_module_get_ctx(mod);
+ assert(pkt_mgr);
+ struct mq_runtime *mq_rt = stellar_module_manager_get_mq_runtime(mod_mgr);
+ assert(mq_rt);
+ assert(thread_id < pkt_mgr->thread_num);
+
+ if (packet_manager_init(pkt_mgr, thread_id, mq_rt) != 0)
+ {
+ PACKET_MANAGER_LOG_ERROR("failed to init packet_manager_init");
+ return NULL;
+ }
+ else
+ {
+ return mod;
+ }
+}
+
+void packet_manager_on_thread_exit(struct stellar_module_manager *mod_mgr __attribute__((unused)), int thread_id, struct stellar_module *mod)
+{
+ struct packet_manager *pkt_mgr = stellar_module_get_ctx(mod);
+ assert(pkt_mgr);
+ assert(thread_id < pkt_mgr->thread_num);
+
+ packet_manager_clean(pkt_mgr, thread_id);
} \ No newline at end of file
diff --git a/infra/packet_manager/packet_manager_internal.h b/infra/packet_manager/packet_manager_internal.h
index d001770..dc7a979 100644
--- a/infra/packet_manager/packet_manager_internal.h
+++ b/infra/packet_manager/packet_manager_internal.h
@@ -26,10 +26,11 @@ struct packet_manager_stat
} queue[PACKET_QUEUE_MAX]; // the last queue is for sending packets
};
-struct packet_manager *packet_manager_new(struct mq_schema *mq_schema, uint64_t thread_num);
+struct packet_manager *packet_manager_new(struct mq_schema *mq_schema, uint16_t thread_num);
void packet_manager_free(struct packet_manager *pkt_mgr);
int packet_manager_init(struct packet_manager *pkt_mgr, uint16_t thread_id, struct mq_runtime *mq_rt);
+void packet_manager_clean(struct packet_manager *pkt_mgr, uint16_t thread_id);
void packet_manager_ingress(struct packet_manager *pkt_mgr, uint16_t thread_id, struct packet *pkt);
struct packet *packet_manager_egress(struct packet_manager *pkt_mgr, uint16_t thread_id);
void packet_manager_dispatch(struct packet_manager *pkt_mgr, uint16_t thread_id);
diff --git a/infra/packet_manager/test/gtest_packet_manager.cpp b/infra/packet_manager/test/gtest_packet_manager.cpp
index d186e45..e447c32 100644
--- a/infra/packet_manager/test/gtest_packet_manager.cpp
+++ b/infra/packet_manager/test/gtest_packet_manager.cpp
@@ -156,6 +156,7 @@ TEST(PACKET_MANAGER, SUBSCRIBER_PACKET_STAGE)
check_stat(curr_stat, &expect_stat);
// per-thread free
+ packet_manager_clean(pkt_mgr, thread_id);
// module free
packet_manager_free(pkt_mgr);
@@ -258,6 +259,7 @@ TEST(PACKET_MANAGER, CLAIM_PACKET)
check_stat(curr_stat, &expect_stat);
// per-thread free
+ packet_manager_clean(pkt_mgr, thread_id);
// module free
packet_manager_free(pkt_mgr);
@@ -340,6 +342,7 @@ TEST(PACKET_MANAGER, SCHEDULE_PACKET)
check_stat(curr_stat, &expect_stat);
// per-thread free
+ packet_manager_clean(pkt_mgr, thread_id);
// module free
packet_manager_free(pkt_mgr);
@@ -433,6 +436,7 @@ TEST(PACKET_MANAGER, SCHEDULE_CLAIMED_PACKET)
check_stat(curr_stat, &expect_stat);
// per-thread free
+ packet_manager_clean(pkt_mgr, thread_id);
// module free
packet_manager_free(pkt_mgr);