diff options
| author | yangwei <[email protected]> | 2024-10-18 11:16:42 +0800 |
|---|---|---|
| committer | yangwei <[email protected]> | 2024-10-18 11:16:42 +0800 |
| commit | 6e7bb30630bd533873167b337a17365b3840420d (patch) | |
| tree | a918f3ee122b6b1b247b0bd205199f539f2aadfd | |
| parent | 260787167a81a8f7ac311ed394ceb5d990451136 (diff) | |
🦄 refactor(polling manager): merge polling into module manager
| -rw-r--r-- | conf/stellar.toml | 5 | ||||
| -rw-r--r-- | include/stellar/module_manager.h | 4 | ||||
| -rw-r--r-- | include/stellar/polling_manager.h | 21 | ||||
| -rw-r--r-- | infra/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | infra/module_manager/module_manager.c | 47 | ||||
| -rw-r--r-- | infra/module_manager/module_manager_interna.h | 2 | ||||
| -rw-r--r-- | infra/module_manager/test/gtest_module_manager_main.cpp | 57 | ||||
| -rw-r--r-- | infra/polling_manager/CMakeLists.txt | 6 | ||||
| -rw-r--r-- | infra/polling_manager/polling_manager.c | 73 | ||||
| -rw-r--r-- | infra/polling_manager/polling_manager_internal.h | 23 | ||||
| -rw-r--r-- | infra/polling_manager/test/CMakeLists.txt | 18 | ||||
| -rw-r--r-- | infra/polling_manager/test/gtest_polling_manager_main.cpp | 90 | ||||
| -rw-r--r-- | infra/session_manager/session_manager.c | 12 | ||||
| -rw-r--r-- | infra/stellar_core.c | 8 |
14 files changed, 117 insertions, 251 deletions
diff --git a/conf/stellar.toml b/conf/stellar.toml index beb2121..ced18cc 100644 --- a/conf/stellar.toml +++ b/conf/stellar.toml @@ -16,11 +16,6 @@ init="packet_manager_on_init" exit="packet_manager_on_exit" [[module]] -path = "" -init = "polling_manager_on_init" -exit = "polling_manager_on_exit" - -[[module]] path="" init="session_manager_on_init" exit="session_manager_on_exit" diff --git a/include/stellar/module_manager.h b/include/stellar/module_manager.h index 70ee5a5..47e8384 100644 --- a/include/stellar/module_manager.h +++ b/include/stellar/module_manager.h @@ -39,7 +39,9 @@ const char *stellar_module_manager_get_toml_path(struct stellar_module_manager * struct mq_schema *stellar_module_manager_get_mq_schema(struct stellar_module_manager *mod_mgr); struct logger *stellar_module_manager_get_logger(struct stellar_module_manager *mod_mgr); - +typedef void module_on_polling_func(struct stellar_module_manager *mod_mgr, void *polling_arg); +int stellar_module_manager_polling_subscribe(struct stellar_module_manager *mod_mgr, module_on_polling_func on_polling, void *polling_arg); +void stellar_module_manager_polling_active(struct stellar_module_manager *mod_mgr); #ifdef __cplusplus } diff --git a/include/stellar/polling_manager.h b/include/stellar/polling_manager.h deleted file mode 100644 index 641c03e..0000000 --- a/include/stellar/polling_manager.h +++ /dev/null @@ -1,21 +0,0 @@ -#pragma once - -#ifdef __cplusplus -extern "C" -{ -#endif - -#include "stellar/module_manager.h" - -struct stellar_polling_manager; - -struct stellar_polling_manager *stellar_module_get_polling_manager(struct stellar_module_manager *mod_mgr); - -typedef void module_on_polling_func(struct stellar_polling_manager* mod_mgr, void *polling_arg); -//return 0 if success, otherwise return -1. -int stellar_polling_subscribe(struct stellar_polling_manager* mod_mgr, module_on_polling_func on_polling, void *polling_arg); -void stellar_polling_active(struct stellar_polling_manager *mod_mgr); - -#ifdef __cplusplus -} -#endif
\ No newline at end of file diff --git a/infra/CMakeLists.txt b/infra/CMakeLists.txt index 8dfa468..d2bab28 100644 --- a/infra/CMakeLists.txt +++ b/infra/CMakeLists.txt @@ -1,4 +1,4 @@ -set(INFRA exdata mq tuple packet_manager packet_io ip_reassembly tcp_reassembly session_manager module_manager polling_manager) +set(INFRA exdata mq tuple packet_manager packet_io ip_reassembly tcp_reassembly session_manager module_manager) set(DEPS bitmap dablooms interval_tree logger nmx_pool rbtree timeout toml) set(DECODERS appid lpi_plus) set(WHOLE_ARCHIVE ${DEPS} ${INFRA} ${DECODERS}) diff --git a/infra/module_manager/module_manager.c b/infra/module_manager/module_manager.c index 517fdea..f05256e 100644 --- a/infra/module_manager/module_manager.c +++ b/infra/module_manager/module_manager.c @@ -226,3 +226,50 @@ void stellar_module_set_name(struct stellar_module* mod, const char *name) return; } +/******************************************* + * polling API * + *******************************************/ + + #define TOPIC_POLLING "polling" + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wcast-function-type" +static void on_polling_dispatch(int topic_id __unused, + void *msg __unused, + on_msg_cb_func* on_msg_cb, + void *on_msg_cb_arg, + void *dispatch_arg) +{ + struct stellar_module_manager *mod_mgr=(struct stellar_module_manager *)dispatch_arg; + module_on_polling_func *polling = (module_on_polling_func *)on_msg_cb; + polling(mod_mgr, on_msg_cb_arg); +} + +int stellar_module_manager_polling_subscribe(struct stellar_module_manager *mod_mgr, module_on_polling_func on_polling, void *polling_arg) +{ + if(mod_mgr == NULL)return -1; + mod_mgr->topic_polling_id=mq_schema_get_topic_id(stellar_module_manager_get_mq_schema(mod_mgr), TOPIC_POLLING); + if(mod_mgr->topic_polling_id<0) + { + mod_mgr->topic_polling_id=mq_schema_create_topic(mod_mgr->schema.mq_schema, TOPIC_POLLING, on_polling_dispatch, mod_mgr, NULL, NULL); + } + return mq_schema_subscribe(mod_mgr->schema.mq_schema, mod_mgr->topic_polling_id, (on_msg_cb_func *)on_polling, polling_arg); +} + +#pragma GCC diagnostic pop + +void stellar_module_manager_polling_active(struct stellar_module_manager *mod_mgr) +{ + if(mod_mgr == NULL)return; + mq_runtime_publish_message(local_mq_rt, mod_mgr->topic_polling_id, NULL); +} + + +void stellar_polling_dispatch(struct stellar_module_manager *mod_mgr) +{ + if(mod_mgr==NULL)return; + stellar_module_manager_polling_active(mod_mgr); + mq_runtime_dispatch(local_mq_rt); + mq_runtime_clean(local_mq_rt); + return; +}
\ No newline at end of file diff --git a/infra/module_manager/module_manager_interna.h b/infra/module_manager/module_manager_interna.h index 96b46a7..b58a374 100644 --- a/infra/module_manager/module_manager_interna.h +++ b/infra/module_manager/module_manager_interna.h @@ -36,6 +36,7 @@ struct stellar_module_manager char *module_spec_toml_path; struct module_spec_load *module_specs; int load_module_num; + int topic_polling_id; struct { int max_thread_num; @@ -45,6 +46,7 @@ struct stellar_module_manager }__attribute__((aligned(sizeof(void*)))); +void stellar_polling_dispatch(struct stellar_module_manager *mod_mgr); #ifdef __cplusplus } diff --git a/infra/module_manager/test/gtest_module_manager_main.cpp b/infra/module_manager/test/gtest_module_manager_main.cpp index 20d5b6d..5a30553 100644 --- a/infra/module_manager/test/gtest_module_manager_main.cpp +++ b/infra/module_manager/test/gtest_module_manager_main.cpp @@ -182,6 +182,63 @@ TEST(module_manager, basic_module) { unlink(toml_template); } +/*********************************** + * TEST POLLING MANAGER POLLING API * + ***********************************/ + +struct test_module_polling_env +{ + int N_round; + int polling_count; + int polling_active_count; +}; + + void test_module_on_polling(struct stellar_module_manager* mod_mgr, void *polling_arg) + { + struct test_module_polling_env *env = (struct test_module_polling_env*)polling_arg; + env->polling_count++; + if(env->polling_count%2==0) + { + stellar_module_manager_polling_active(mod_mgr); + env->polling_active_count++; + } + } + + +TEST(polling_manager, basic_polling_module) { + + struct mq_schema *mq_schema=mq_schema_new(); + + struct stellar_module_manager *mod_mgr=stellar_module_manager_new(NULL, 10, mq_schema, NULL); + EXPECT_TRUE(mod_mgr!=NULL); + + + EXPECT_EQ(stellar_module_manager_get_max_thread_num(mod_mgr), 10); + EXPECT_EQ(stellar_module_manager_get_mq_schema(mod_mgr), mq_schema); + + struct test_module_polling_env env={}; + env.N_round=10; + + stellar_module_manager_polling_subscribe(mod_mgr, test_module_on_polling, &env); + + struct mq_runtime *mq_rt = mq_runtime_new(mq_schema); + stellar_module_manager_register_thread(mod_mgr, 1, mq_rt); + + EXPECT_EQ((long)stellar_module_manager_get_thread_id(mod_mgr), 1); + EXPECT_EQ(stellar_module_manager_get_mq_runtime(mod_mgr), mq_rt); + + for(int i=0; i<env.N_round; i++) + { + stellar_polling_dispatch(mod_mgr); + } + + stellar_module_manager_free(mod_mgr); + + EXPECT_EQ(env.polling_count, env.N_round+env.polling_active_count); + + +} + /********************************************** * GTEST MAIN * **********************************************/ diff --git a/infra/polling_manager/CMakeLists.txt b/infra/polling_manager/CMakeLists.txt deleted file mode 100644 index a85368b..0000000 --- a/infra/polling_manager/CMakeLists.txt +++ /dev/null @@ -1,6 +0,0 @@ -add_library(polling_manager polling_manager.c) -target_include_directories(polling_manager PUBLIC ${CMAKE_CURRENT_LIST_DIR}) -target_include_directories(polling_manager PUBLIC ${CMAKE_SOURCE_DIR}/include/) -target_link_libraries(polling_manager PUBLIC module_manager ${CMAKE_DL_LIBS}) - -add_subdirectory(test)
\ No newline at end of file diff --git a/infra/polling_manager/polling_manager.c b/infra/polling_manager/polling_manager.c deleted file mode 100644 index 81a585b..0000000 --- a/infra/polling_manager/polling_manager.c +++ /dev/null @@ -1,73 +0,0 @@ -#include "polling_manager_internal.h" - -#include "stellar/utils.h" - - -struct stellar_polling_manager *stellar_module_get_polling_manager(struct stellar_module_manager *mod_mgr) -{ - if(mod_mgr==NULL)return NULL; - struct stellar_module *mod=stellar_module_manager_get_module(mod_mgr, MODULE_POLLING); - if(mod==NULL)return NULL; - return (struct stellar_polling_manager *)stellar_module_get_ctx(mod); -} - - -struct stellar_module *polling_manager_on_init(struct stellar_module_manager *mod_mgr) -{ - if(mod_mgr==NULL)return NULL; - struct stellar_polling_manager *polling_mgr=CALLOC(struct stellar_polling_manager, 1); - polling_mgr->mod_mgr=mod_mgr; - struct stellar_module *mod=stellar_module_new(MODULE_POLLING, polling_mgr); - return mod; -} - -void polling_manager_on_exit(struct stellar_module_manager *mod_mgr, struct stellar_module *mod) -{ - if(mod_mgr==NULL || mod==NULL)return; - struct stellar_polling_manager *polling_mgr=(struct stellar_polling_manager *)stellar_module_get_ctx(mod); - if(polling_mgr==NULL)return; - FREE(polling_mgr); - stellar_module_free(mod); - return; -} - -#pragma GCC diagnostic push -#pragma GCC diagnostic ignored "-Wcast-function-type" -static void on_polling_dispatch(int topic_id __unused, - void *msg __unused, - on_msg_cb_func* on_msg_cb, - void *on_msg_cb_arg, - void *dispatch_arg) -{ - struct stellar_polling_manager *polling_mgr=(struct stellar_polling_manager *)dispatch_arg; - module_on_polling_func *polling = (module_on_polling_func *)on_msg_cb; - polling(polling_mgr, on_msg_cb_arg); -} - -int stellar_polling_subscribe(struct stellar_polling_manager* polling_mgr, module_on_polling_func on_polling, void *polling_arg) -{ - if(polling_mgr==NULL || polling_mgr->mod_mgr == NULL)return -1; - polling_mgr->polling_topic_id=mq_schema_get_topic_id(stellar_module_manager_get_mq_schema(polling_mgr->mod_mgr), TOPIC_POLLING); - if(polling_mgr->polling_topic_id<0) - { - polling_mgr->polling_topic_id=mq_schema_create_topic(stellar_module_manager_get_mq_schema(polling_mgr->mod_mgr), TOPIC_POLLING, on_polling_dispatch, polling_mgr, NULL, NULL); - } - return mq_schema_subscribe(stellar_module_manager_get_mq_schema(polling_mgr->mod_mgr), polling_mgr->polling_topic_id, (on_msg_cb_func *)on_polling, polling_arg); -} - -#pragma GCC diagnostic pop - -void stellar_polling_active(struct stellar_polling_manager *polling_mgr) -{ - if(polling_mgr==NULL || polling_mgr->mod_mgr == NULL)return; - mq_runtime_publish_message(stellar_module_manager_get_mq_runtime(polling_mgr->mod_mgr), polling_mgr->polling_topic_id, NULL); -} - -void stellar_polling_dispatch(struct stellar_polling_manager *polling_mgr) -{ - if(polling_mgr==NULL || polling_mgr->mod_mgr == NULL)return; - stellar_polling_active(polling_mgr); - mq_runtime_dispatch(stellar_module_manager_get_mq_runtime(polling_mgr->mod_mgr)); - mq_runtime_clean(stellar_module_manager_get_mq_runtime(polling_mgr->mod_mgr)); - return; -}
\ No newline at end of file diff --git a/infra/polling_manager/polling_manager_internal.h b/infra/polling_manager/polling_manager_internal.h deleted file mode 100644 index c0d2f1b..0000000 --- a/infra/polling_manager/polling_manager_internal.h +++ /dev/null @@ -1,23 +0,0 @@ -#pragma once - -#ifdef __cplusplus -extern "C" -{ -#endif - -#include "stellar/polling_manager.h" - -#define TOPIC_POLLING "POLLING" -#define MODULE_POLLING "POLLING" -struct stellar_polling_manager -{ - struct stellar_module_manager *mod_mgr; - int polling_topic_id; -}; - -//TODO: expose this function to polling_manager.h -void stellar_polling_dispatch(struct stellar_polling_manager *polling_mgr); - -#ifdef __cplusplus -} -#endif
\ No newline at end of file diff --git a/infra/polling_manager/test/CMakeLists.txt b/infra/polling_manager/test/CMakeLists.txt deleted file mode 100644 index 1a77472..0000000 --- a/infra/polling_manager/test/CMakeLists.txt +++ /dev/null @@ -1,18 +0,0 @@ -add_executable(gtest_polling_manager - gtest_polling_manager_main.cpp -) - - -include_directories(${CMAKE_SOURCE_DIR}/infra/module_manager/) - -target_link_libraries( - gtest_polling_manager - polling_manager - dl - "-rdynamic" - gtest - gmock -) - -include(GoogleTest) -gtest_discover_tests(gtest_polling_manager)
\ No newline at end of file diff --git a/infra/polling_manager/test/gtest_polling_manager_main.cpp b/infra/polling_manager/test/gtest_polling_manager_main.cpp deleted file mode 100644 index 5738e64..0000000 --- a/infra/polling_manager/test/gtest_polling_manager_main.cpp +++ /dev/null @@ -1,90 +0,0 @@ -#pragma GCC diagnostic ignored "-Wunused-parameter" - -#include <gtest/gtest.h> - - -#include "polling_manager/polling_manager_internal.h" -#include "module_manager/module_manager_interna.h" - -/*********************************** - * TEST POLLING MANAGER POLLING API * - ***********************************/ - -struct test_module_polling_env -{ - int N_round; - int polling_count; - int polling_active_count; -}; - - void test_module_on_polling(struct stellar_polling_manager* mod_mgr, void *polling_arg) - { - struct test_module_polling_env *env = (struct test_module_polling_env*)polling_arg; - env->polling_count++; - if(env->polling_count%2==0) - { - stellar_polling_active(mod_mgr); - env->polling_active_count++; - } - } - -const char *gtest_mock_spec_toml = - "[[module]]\n" - "path = \"\"\n" - "init = \"polling_manager_on_init\"\n" - "exit = \"polling_manager_on_exit\"\n"; - -TEST(polling_manager, basic_polling_module) { - - struct mq_schema *mq_schema=mq_schema_new(); - - char toml_template[] = "./stellar.toml.XXXXXX"; - int fd = mkstemp(toml_template); - EXPECT_TRUE(fd>=0); - write(fd, gtest_mock_spec_toml, strlen(gtest_mock_spec_toml)); - close(fd); - - struct stellar_module_manager *mod_mgr=stellar_module_manager_new(toml_template, 10, mq_schema, NULL); - EXPECT_TRUE(mod_mgr!=NULL); - - struct stellar_polling_manager *polling_mgr=stellar_module_get_polling_manager(mod_mgr); - EXPECT_TRUE(polling_mgr!=NULL); - - EXPECT_EQ(stellar_module_manager_get_max_thread_num(mod_mgr), 10); - EXPECT_EQ(stellar_module_manager_get_mq_schema(mod_mgr), mq_schema); - - struct test_module_polling_env env={}; - env.N_round=10; - - stellar_polling_subscribe(polling_mgr, test_module_on_polling, &env); - - struct mq_runtime *mq_rt = mq_runtime_new(mq_schema); - stellar_module_manager_register_thread(mod_mgr, 1, mq_rt); - - EXPECT_EQ((long)stellar_module_manager_get_thread_id(mod_mgr), 1); - EXPECT_EQ(stellar_module_manager_get_mq_runtime(mod_mgr), mq_rt); - - for(int i=0; i<env.N_round; i++) - { - stellar_polling_dispatch(polling_mgr); - } - - stellar_module_manager_free(mod_mgr); - - EXPECT_EQ(env.polling_count, env.N_round+env.polling_active_count); - - unlink(toml_template); - -} - -/********************************************** - * GTEST MAIN * - **********************************************/ - -int main(int argc, char ** argv) -{ - int ret=0; - ::testing::InitGoogleTest(&argc, argv); - ret=RUN_ALL_TESTS(); - return ret; -}
\ No newline at end of file diff --git a/infra/session_manager/session_manager.c b/infra/session_manager/session_manager.c index 5f3820f..ecc55f6 100644 --- a/infra/session_manager/session_manager.c +++ b/infra/session_manager/session_manager.c @@ -5,7 +5,6 @@ #include "stellar/packet_manager.h" #include "stellar/session_manager.h" #include "stellar/module_manager.h" -#include "stellar/polling_manager.h" #include "utils.h" #include "session_internal.h" @@ -174,10 +173,9 @@ static void clean_session(struct session_manager_runtime *sess_mgr_rt, uint64_t } } -static void on_polling(struct stellar_polling_manager *poll_mgr, void *args) +static void on_polling(struct stellar_module_manager *mod_mgr, void *args) { struct session_manager *sess_mgr = (struct session_manager *)args; - struct stellar_module_manager *mod_mgr = sess_mgr->mod_mgr; int thread_id = stellar_module_manager_get_thread_id(mod_mgr); struct session_manager_runtime *sess_mgr_rt = sess_mgr->runtime[thread_id]; uint64_t now_ms = clock_get_real_time_ms(); @@ -316,7 +314,7 @@ void session_manager_free(struct session_manager *sess_mgr) } } -struct session_manager *session_manager_new(struct stellar_polling_manager *poll_mgr, struct packet_manager *pkt_mgr, struct mq_schema *mq_schema, const char *toml_file) +struct session_manager *session_manager_new(struct stellar_module_manager *mod_mgr, struct packet_manager *pkt_mgr, struct mq_schema *mq_schema, const char *toml_file) { assert(pkt_mgr); assert(mq_schema); @@ -368,7 +366,7 @@ struct session_manager *session_manager_new(struct stellar_polling_manager *poll } } - stellar_polling_subscribe(poll_mgr, on_polling, sess_mgr); + stellar_module_manager_polling_subscribe(mod_mgr, on_polling, sess_mgr); return sess_mgr; @@ -437,14 +435,12 @@ struct stellar_module *session_manager_on_init(struct stellar_module_manager *mo assert(mod_mgr); struct packet_manager *pkt_mgr = stellar_module_get_packet_manager(mod_mgr); assert(pkt_mgr); - struct stellar_polling_manager *poll_mgr = stellar_module_get_polling_manager(mod_mgr); - assert(poll_mgr); struct mq_schema *mq_schema = stellar_module_manager_get_mq_schema(mod_mgr); assert(mq_schema); const char *toml_file = stellar_module_manager_get_toml_path(mod_mgr); assert(toml_file); - struct session_manager *sess_mgr = session_manager_new(poll_mgr, pkt_mgr, mq_schema, toml_file); + struct session_manager *sess_mgr = session_manager_new(mod_mgr, pkt_mgr, mq_schema, toml_file); if (sess_mgr == NULL) { return NULL; diff --git a/infra/stellar_core.c b/infra/stellar_core.c index 81c3c2a..bfed599 100644 --- a/infra/stellar_core.c +++ b/infra/stellar_core.c @@ -5,10 +5,9 @@ #include "packet_io.h" #include "packet_internal.h" #include "packet_manager_internal.h" -#include "polling_manager_internal.h" #include "stellar/stellar.h" -#include "stellar/module_manager.h" +#include "module_manager_interna.h" #define CORE_LOG_FATAL(format, ...) STELLAR_LOG_FATAL(__thread_local_logger, "core", format, ##__VA_ARGS__) #define CORE_LOG_ERROR(format, ...) STELLAR_LOG_ERROR(__thread_local_logger, "core", format, ##__VA_ARGS__) @@ -51,7 +50,6 @@ static void *worker_thread(void *arg) struct packet_io *pkt_io = st->pkt_io; struct stellar_module_manager *mod_mgr = st->mod_mgr; struct mq_runtime *mq_rt = mq_runtime_new(st->mq_schema); - struct stellar_polling_manager *polling_mgr = stellar_module_get_polling_manager(mod_mgr); struct packet_manager *pkt_mgr = stellar_module_get_packet_manager(mod_mgr); snprintf(thread_name, sizeof(thread_name), "stellar:%d", thread_id); @@ -106,11 +104,11 @@ static void *worker_thread(void *arg) packet_io_egress(pkt_io, thread_id, pkt, 1); packet_free(pkt); } - stellar_polling_dispatch(polling_mgr); + stellar_polling_dispatch(mod_mgr); } idle_tasks: - stellar_polling_dispatch(polling_mgr); + stellar_polling_dispatch(mod_mgr); if (nr_pkt_rcv == 0) { |
