summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryangwei <[email protected]>2024-09-14 12:18:26 +0800
committeryangwei <[email protected]>2024-09-14 12:18:26 +0800
commit1f55a6f2405ce209d04d7b71df593aef4474d1ed (patch)
tree17e5eb5d0840adc60bf58651f9c736070b74c247
parent0b142cd0bb38169936c516772d84c9b4e4201cae (diff)
✨ feat(module manager): from plugin manager to module manager
-rw-r--r--include/stellar/module_manager.h40
-rw-r--r--include/stellar/stellar.h10
-rw-r--r--infra/CMakeLists.txt2
-rw-r--r--infra/exdata/CMakeLists.txt1
-rw-r--r--infra/exdata/test/CMakeLists.txt (renamed from infra/plugin_manager/test/CMakeLists.txt)11
-rw-r--r--infra/exdata/test/gtest_module_manager_main.cpp (renamed from infra/plugin_manager/test/plugin_manager_gtest_main.cpp)40
-rw-r--r--infra/module_manager/CMakeLists.txt8
-rw-r--r--infra/module_manager/module_manager.c234
-rw-r--r--infra/module_manager/module_manager_interna.h41
-rw-r--r--infra/module_manager/test/CMakeLists.txt19
-rw-r--r--infra/module_manager/test/gtest_module_manager_main.cpp1617
-rw-r--r--infra/plugin_manager/CMakeLists.txt11
-rw-r--r--infra/plugin_manager/plugin_manager.c361
-rw-r--r--infra/plugin_manager/plugin_manager.h26
-rw-r--r--infra/plugin_manager/plugin_manager_interna.h33
-rw-r--r--infra/plugin_manager/test/plugin_manager_gtest_mock.h106
-rw-r--r--infra/stellar_core.c35
17 files changed, 1987 insertions, 608 deletions
diff --git a/include/stellar/module_manager.h b/include/stellar/module_manager.h
new file mode 100644
index 0000000..a626575
--- /dev/null
+++ b/include/stellar/module_manager.h
@@ -0,0 +1,40 @@
+#pragma once
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+#include "stellar/mq.h"
+
+struct stellar_module;
+struct stellar_module *stellar_module_new(const char *name);
+void stellar_module_free(struct stellar_module *mod);
+
+void * stellar_module_get_ctx(struct stellar_module *mod);
+void stellar_module_set_ctx(struct stellar_module *mod, void *ctx);
+
+const char *stellar_module_get_name(struct stellar_module* mod);
+
+struct stellar_module_manager;
+
+typedef struct stellar_module *module_on_init_func(struct stellar_module_manager *mod_mgr);
+typedef void module_on_exit_func(struct stellar_module_manager *mod_mgr, struct stellar_module *mod);
+
+struct stellar_module_manager *stellar_module_manager_new(const char *module_spec_toml_path, int max_thread_num, struct mq_schema *mq_schema);
+void stellar_module_manager_free(struct stellar_module_manager *mod_mgr);
+
+struct stellar_module *stellar_module_manager_get_module(struct stellar_module_manager *mod_mgr, const char *module_name);
+
+void stellar_module_manager_register_thread(struct stellar_module_manager* mod_mgr, int thread_id, struct mq_runtime *mq_rt);
+
+// return -1 on error
+int stellar_module_manager_get_thread_id(struct stellar_module_manager* mod_mgr);
+int stellar_module_manager_get_max_thread_num(struct stellar_module_manager* mod_mgr);
+
+struct mq_schema *stellar_module_get_mq_schema(struct stellar_module_manager *mod_mgr);
+struct mq_runtime *stellar_module_get_mq_runtime(struct stellar_module_manager *mod_mgr);
+
+#ifdef __cplusplus
+}
+#endif \ No newline at end of file
diff --git a/include/stellar/stellar.h b/include/stellar/stellar.h
index cc66cfe..397e1a4 100644
--- a/include/stellar/stellar.h
+++ b/include/stellar/stellar.h
@@ -7,15 +7,12 @@ extern "C"
#include <stdint.h>
-#include "stellar/mq.h"
#include "stellar/log.h"
#include "stellar/packet.h"
struct stellar;
-//return plugin_env
-typedef void *plugin_on_load_func(struct stellar *st);
-typedef void plugin_on_unload_func(void *plugin_env);
+
typedef void plugin_on_packet_func(struct packet *pkt, void *on_packet_cb_arg);
//return 0 if success, otherwise return -1.
@@ -28,8 +25,6 @@ int stellar_polling_subscribe(struct stellar *st, plugin_on_polling_func on_pol
void stellar_emit_datapath_telemetry(struct packet *pkt, const char * module, const char *str);
-int stellar_get_worker_thread_num(struct stellar *st);
-uint16_t stellar_get_current_thread_index();
// only send user build packet, can't send packet which come from network
void stellar_send_build_packet(struct stellar *st, struct packet *pkt);
@@ -40,9 +35,6 @@ void stellar_loopbreak(struct stellar *st);
void stellar_reload_log_level(struct stellar *st);
struct logger *stellar_get_logger(struct stellar *st);
-struct mq_schema *stellar_get_mq_schema(struct stellar *st);
-struct mq_runtime *stellar_get_mq_runtime(struct stellar *st);
-
#ifdef __cplusplus
}
#endif
diff --git a/infra/CMakeLists.txt b/infra/CMakeLists.txt
index 5d73a33..0ad1bb2 100644
--- a/infra/CMakeLists.txt
+++ b/infra/CMakeLists.txt
@@ -1,4 +1,4 @@
-set(INFRA exdata mq tuple packet_manager packet_io ip_reassembly tcp_reassembly session_manager plugin_manager)
+set(INFRA exdata mq tuple packet_manager packet_io ip_reassembly tcp_reassembly session_manager module_manager)
set(DEPS bitmap dablooms interval_tree logger nmx_pool rbtree timeout toml)
#set(DECODERS http lpi)
set(WHOLE_ARCHIVE ${DEPS} ${INFRA} ${DECODERS})
diff --git a/infra/exdata/CMakeLists.txt b/infra/exdata/CMakeLists.txt
index 2630f57..93e8c8f 100644
--- a/infra/exdata/CMakeLists.txt
+++ b/infra/exdata/CMakeLists.txt
@@ -1,3 +1,4 @@
add_library(exdata exdata.c)
+# //TODO: Add test
#add_subdirectory(test) \ No newline at end of file
diff --git a/infra/plugin_manager/test/CMakeLists.txt b/infra/exdata/test/CMakeLists.txt
index d16c69a..323e29c 100644
--- a/infra/plugin_manager/test/CMakeLists.txt
+++ b/infra/exdata/test/CMakeLists.txt
@@ -1,13 +1,14 @@
-add_executable(gtest_plugin_manager
-plugin_manager_gtest_main.cpp
+add_executable(gtest_module_manager
+ module_manager_gtest_main.cpp
)
+
include_directories(${CMAKE_SOURCE_DIR}/infra/plugin_manager/)
include_directories(${CMAKE_SOURCE_DIR}/infra/tuple/)
target_link_libraries(
- gtest_plugin_manager
- plugin_manager
+ gtest_module_manager
+ module_manager
dl
"-rdynamic"
gtest
@@ -15,4 +16,4 @@ target_link_libraries(
)
include(GoogleTest)
-gtest_discover_tests(gtest_plugin_manager) \ No newline at end of file
+gtest_discover_tests(gtest_module_manager) \ No newline at end of file
diff --git a/infra/plugin_manager/test/plugin_manager_gtest_main.cpp b/infra/exdata/test/gtest_module_manager_main.cpp
index ec74872..079f9e7 100644
--- a/infra/plugin_manager/test/plugin_manager_gtest_main.cpp
+++ b/infra/exdata/test/gtest_module_manager_main.cpp
@@ -2,46 +2,8 @@
#include <gtest/gtest.h>
-#include "plugin_manager.h"
-#include "plugin_manager_gtest_mock.h"
-#define STELLAR_INTRINSIC_TOPIC_NUM 0
-#define TOPIC_NAME_MAX 512
-
-#if 0
-void whitebox_test_plugin_manager_intrisic_metadata(struct stellar *st, struct plugin_manager_schema *plug_mgr)
-{
- SCOPED_TRACE("whitebox test intrisic metadata");
-
- EXPECT_TRUE(plug_mgr!=NULL);
-
- EXPECT_EQ(plug_mgr->st, st);
-
- //load spec null
- EXPECT_TRUE(plug_mgr->plugin_load_specs_array==NULL);
-
- //session exdata schema null
- EXPECT_TRUE(plug_mgr->exdata_schema!=NULL);
-
- //stellar mq schema null
- EXPECT_TRUE(plug_mgr->stellar_mq_schema_array==NULL);
-
- //registered plugin array null
- EXPECT_TRUE(plug_mgr->registered_polling_plugin_array==NULL);
- EXPECT_TRUE(plug_mgr->registered_packet_plugin_array==NULL);
-
- EXPECT_TRUE(plug_mgr->per_thread_data!=NULL);
- int thread_num=stellar_get_worker_thread_num(st);
- for(int i=0; i<thread_num; i++)
- {
- EXPECT_TRUE(plug_mgr->per_thread_data[i].exdata_array==NULL);
- EXPECT_TRUE(plug_mgr->per_thread_data[i].dealth_letter_queue==NULL);
- for(int j=0; j<STELLAR_MQ_PRIORITY_MAX; j++)
- EXPECT_TRUE(plug_mgr->per_thread_data[i].priority_mq[j]==NULL);
- }
-}
-
-#endif
+#include "module_manager/module_manager_interna.h"
/***********************************
* TEST PLUGIN MANAGER INIT & EXIT *
diff --git a/infra/module_manager/CMakeLists.txt b/infra/module_manager/CMakeLists.txt
new file mode 100644
index 0000000..cc5b0f6
--- /dev/null
+++ b/infra/module_manager/CMakeLists.txt
@@ -0,0 +1,8 @@
+add_library(module_manager module_manager.c)
+target_include_directories(module_manager PUBLIC ${CMAKE_CURRENT_LIST_DIR})
+target_include_directories(module_manager PUBLIC ${CMAKE_SOURCE_DIR}/include/)
+target_include_directories(module_manager PUBLIC ${CMAKE_SOURCE_DIR}/infra/)
+target_include_directories(module_manager PUBLIC ${CMAKE_SOURCE_DIR}/deps/)
+target_link_libraries(module_manager PUBLIC toml ${CMAKE_DL_LIBS})
+
+#add_subdirectory(test) \ No newline at end of file
diff --git a/infra/module_manager/module_manager.c b/infra/module_manager/module_manager.c
new file mode 100644
index 0000000..53f906c
--- /dev/null
+++ b/infra/module_manager/module_manager.c
@@ -0,0 +1,234 @@
+#include "module_manager_interna.h"
+
+#include "stellar/module_manager.h"
+#include "stellar/utils.h"
+#include "toml/toml.h"
+#include <dlfcn.h>
+#include <stdbool.h>
+
+
+UT_icd module_specs_icd = {sizeof(struct module_specific), NULL, NULL, NULL};
+
+static struct module_specific *module_specs_load(const char *toml_conf_path, int *mod_num)
+{
+ *mod_num = 0;
+ FILE* fp = fopen(toml_conf_path, "r");
+ if(fp==NULL)return NULL;
+ char errbuf[256];
+ toml_table_t* conf = toml_parse_file(fp, errbuf, sizeof(errbuf));
+ fclose(fp);
+ if (!conf) {
+ fprintf(stderr, "Error parsing toml: %s\n", errbuf);
+ return NULL;
+ }
+ struct module_specific* mod_spec=NULL;
+ toml_array_t* mod_array = toml_array_in(conf, "module");
+ if(mod_array==NULL)goto MODULE_SPEC_LOAD_ERROR;
+ *mod_num = toml_array_nelem(mod_array);
+ mod_spec = CALLOC(struct module_specific, *mod_num);
+
+ for (int i = 0; i < *mod_num; i++) {
+ toml_table_t* toml_mod = toml_table_at(mod_array, i);
+
+ const char *path_raw = toml_raw_in(toml_mod, "path");
+ const char *init_func_name_raw = toml_raw_in(toml_mod, "init");
+ const char *exit_func_name_raw = toml_raw_in(toml_mod, "exit");
+ char *path = NULL;
+ char *init_func_name = NULL;
+ char *exit_func_name = NULL;
+ if (toml_rtos(path_raw, &path) || toml_rtos(init_func_name_raw, &init_func_name) ||
+ toml_rtos(exit_func_name_raw, &exit_func_name))
+ {
+ goto MODULE_SPEC_LOAD_ERROR;
+ }
+ void* handle = dlopen(path, RTLD_NOW|RTLD_LAZY|RTLD_GLOBAL);
+ if (!handle) {
+ fprintf(stderr, "Error loading plugin %s: %s\n", path, dlerror());
+ goto MODULE_SPEC_LOAD_ERROR;
+ }
+
+ mod_spec[i].load_cb = (module_on_init_func *) dlsym(handle, init_func_name);
+ if (!mod_spec[i].load_cb) {
+ fprintf(stderr, "Could not load init function %s: %s\n", init_func_name, dlerror());
+ }
+
+ mod_spec[i].unload_cb = (module_on_exit_func *) dlsym(handle, exit_func_name);
+ if (!mod_spec[i].unload_cb) {
+ fprintf(stderr, "Could not load exit function %s: %s\n", exit_func_name, dlerror());
+ }
+ FREE(path);
+ FREE(init_func_name);
+ FREE(exit_func_name);
+ }
+ toml_free(conf);
+ return mod_spec;
+MODULE_SPEC_LOAD_ERROR:
+ toml_free(conf);
+ if(mod_spec)FREE(mod_spec);
+ *mod_num=0;
+ return NULL;
+}
+
+/*******************************************
+ * stellar module manager API *
+ *******************************************/
+
+struct stellar_module_manager *stellar_module_manager_new(const char *module_spec_toml_path, int max_thread_num, struct mq_schema *mq_schema)
+{
+ int spec_num;
+ struct module_specific *specs = module_specs_load(module_spec_toml_path, &spec_num);
+ if(spec_num < 0)
+ {
+ return NULL;
+ }
+ struct stellar_module_manager *mod_mgr = CALLOC(struct stellar_module_manager, 1);
+ if(spec_num > 0)
+ {
+ utarray_new(mod_mgr->schema.module_specs_array,&module_specs_icd);
+ utarray_reserve(mod_mgr->schema.module_specs_array, spec_num);
+ }
+
+ mod_mgr->schema.max_thread_num=max_thread_num;
+ mod_mgr->schema.mq_schema=mq_schema;
+
+ // TODO: store module specific data in hash
+
+ for(int i = 0; i < spec_num; i++)
+ {
+ if (specs[i].load_cb != NULL)
+ {
+ //TODO: duplicate check mod_name
+ specs[i].mod=specs[i].load_cb(mod_mgr);
+ utarray_push_back(mod_mgr->schema.module_specs_array, &specs[i]);
+ }
+ }
+ FREE(specs);
+ return mod_mgr;
+}
+
+void stellar_module_manager_free(struct stellar_module_manager *mod_mgr)
+{
+ if(mod_mgr==NULL)return;
+ struct module_specific *p=NULL;
+ if (mod_mgr->schema.module_specs_array)
+ {
+ while ((p = (struct module_specific *)utarray_next(mod_mgr->schema.module_specs_array, p)))
+ {
+ if (p->unload_cb)
+ p->unload_cb(mod_mgr, p->mod);
+ }
+ utarray_free(mod_mgr->schema.module_specs_array);
+ }
+#if 0
+ if(plug_mgr->stellar_mq_schema_array)
+ {
+ for(unsigned int i = 0; i < utarray_len(plug_mgr->stellar_mq_schema_array); i++)
+ {
+ stellar_mq_destroy_topic( plug_mgr->st, i);
+ }
+ utarray_free(plug_mgr->stellar_mq_schema_array);
+ }
+
+ //if(plug_mgr->stellar_exdata_schema_array)utarray_free(plug_mgr->stellar_exdata_schema_array);
+ if(plug_mgr->registered_polling_plugin_array)utarray_free(plug_mgr->registered_polling_plugin_array);
+ if(plug_mgr->registered_packet_plugin_array)
+ {
+ struct registered_plugin_schema *s = NULL;
+ while ((s = (struct registered_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, s)))
+ {
+ if(s->registed_mq_subscriber_info)utarray_free(s->registed_mq_subscriber_info);
+ }
+ utarray_free(plug_mgr->registered_packet_plugin_array);
+ }
+#endif
+
+ FREE(mod_mgr);
+ return;
+}
+
+int stellar_module_manager_get_max_thread_num(struct stellar_module_manager*mod_mgr)
+{
+ if(mod_mgr==NULL)return -1;
+ return mod_mgr->schema.max_thread_num;
+}
+
+struct mq_schema *stellar_module_get_mq_schema(struct stellar_module_manager *mod_mgr)
+{
+ if(mod_mgr==NULL)return NULL;
+ return mod_mgr->schema.mq_schema;
+}
+
+__thread int local_thread_id=-1;
+__thread struct mq_runtime *local_mq_rt=NULL;
+
+int stellar_module_manager_get_thread_id(struct stellar_module_manager* mod_mgr __unused)
+{
+ return local_thread_id;
+}
+
+struct mq_runtime *stellar_module_get_mq_runtime(struct stellar_module_manager *mod_mgr __unused)
+{
+ return local_mq_rt;
+}
+
+void stellar_module_manager_register_thread(struct stellar_module_manager* mod_mgr __unused, int thread_id, struct mq_runtime *mq_rt)
+{
+ local_thread_id=thread_id;
+ local_mq_rt=mq_rt;
+ return;
+}
+
+struct stellar_module *stellar_module_manager_get_module(struct stellar_module_manager *mod_mgr, const char *module_name)
+{
+ if(mod_mgr==NULL)return NULL;
+ struct module_specific *p=NULL;
+ if (mod_mgr->schema.module_specs_array)
+ {
+ while ((p = (struct module_specific *)utarray_next(mod_mgr->schema.module_specs_array, p)))
+ {
+ if(strcmp(p->mod->name, module_name)==0)
+ {
+ return p->mod;
+ }
+ }
+ }
+ return NULL;
+}
+
+/*******************************************
+ * stellar module API *
+ *******************************************/
+
+
+struct stellar_module *stellar_module_new(const char *name)
+{
+ struct stellar_module *mod = CALLOC(struct stellar_module, 1);
+ strncpy(mod->name, name, NAME_MAX);
+ return mod;
+}
+
+void stellar_module_free(struct stellar_module *mod)
+{
+ if(mod==NULL)return;
+ FREE(mod);
+ return;
+}
+
+void * stellar_module_get_ctx(struct stellar_module *mod)
+{
+ if(mod==NULL)return NULL;
+ return mod->module_ctx;
+}
+
+void stellar_module_set_ctx(struct stellar_module *mod, void *ctx)
+{
+ if(mod==NULL)return;
+ mod->module_ctx=ctx;
+ return;
+}
+
+const char *stellar_module_get_name(struct stellar_module* mod)
+{
+ if(mod==NULL)return NULL;
+ return mod->name;
+}
diff --git a/infra/module_manager/module_manager_interna.h b/infra/module_manager/module_manager_interna.h
new file mode 100644
index 0000000..af045e4
--- /dev/null
+++ b/infra/module_manager/module_manager_interna.h
@@ -0,0 +1,41 @@
+#pragma once
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+#include "stellar/module_manager.h"
+
+#include "uthash/utarray.h"
+#include "stellar/mq.h"
+
+#include <limits.h>
+
+struct stellar_module
+{
+ char name[NAME_MAX];
+ void *module_ctx;
+};
+
+struct stellar_module_manager
+{
+ struct
+ {
+ UT_array *module_specs_array;
+ int max_thread_num;
+ struct mq_schema *mq_schema;
+ }schema;
+
+}__attribute__((aligned(sizeof(void*))));
+
+struct module_specific
+{
+ struct stellar_module *mod;
+ module_on_init_func *load_cb;
+ module_on_exit_func *unload_cb;
+}__attribute__((aligned(sizeof(void*))));
+
+#ifdef __cplusplus
+}
+#endif \ No newline at end of file
diff --git a/infra/module_manager/test/CMakeLists.txt b/infra/module_manager/test/CMakeLists.txt
new file mode 100644
index 0000000..323e29c
--- /dev/null
+++ b/infra/module_manager/test/CMakeLists.txt
@@ -0,0 +1,19 @@
+add_executable(gtest_module_manager
+ module_manager_gtest_main.cpp
+)
+
+
+include_directories(${CMAKE_SOURCE_DIR}/infra/plugin_manager/)
+include_directories(${CMAKE_SOURCE_DIR}/infra/tuple/)
+
+target_link_libraries(
+ gtest_module_manager
+ module_manager
+ dl
+ "-rdynamic"
+ gtest
+ gmock
+)
+
+include(GoogleTest)
+gtest_discover_tests(gtest_module_manager) \ No newline at end of file
diff --git a/infra/module_manager/test/gtest_module_manager_main.cpp b/infra/module_manager/test/gtest_module_manager_main.cpp
new file mode 100644
index 0000000..079f9e7
--- /dev/null
+++ b/infra/module_manager/test/gtest_module_manager_main.cpp
@@ -0,0 +1,1617 @@
+#pragma GCC diagnostic ignored "-Wunused-parameter"
+
+#include <gtest/gtest.h>
+
+
+#include "module_manager/module_manager_interna.h"
+
+/***********************************
+ * TEST PLUGIN MANAGER INIT & EXIT *
+ ***********************************/
+
+//TODO: test case, plugin_specs_load
+
+TEST(plugin_manager_init, init_with_null_toml) {
+
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ //whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+ plugin_manager_exit(plug_mgr);
+}
+
+TEST(plugin_manager_init, init_with_empty_toml) {
+
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, "/dev/null");
+ //whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+ plugin_manager_exit(plug_mgr);
+}
+
+#if 0
+
+/******************************************
+ * TEST PLUGIN MANAGER PACKET PLUGIN INIT *
+ ******************************************/
+
+static void test_mock_packet_exdata_free(int idx, void *ex_ptr, void *arg){}
+
+static void test_mock_overwrite_packet_exdata_free(int idx, void *ex_ptr, void *arg){}
+
+TEST(plugin_manager_init, packet_exdata_new_index_overwrite) {
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+ const char *exdata_name="PACKET_EXDATA";
+ int exdata_idx=stellar_exdata_new_index(&st,exdata_name, test_mock_packet_exdata_free, &st);
+ EXPECT_GE(exdata_idx, 0);
+ int overwrite_idx=stellar_exdata_new_index(&st,exdata_name, test_mock_overwrite_packet_exdata_free, plug_mgr);
+ EXPECT_GE(overwrite_idx, 0);
+ EXPECT_EQ(overwrite_idx, exdata_idx);
+
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ struct exdata_meta *exdata_schema = (struct exdata_meta *)utarray_eltptr(
+ plug_mgr->exdata_schema->exdata_meta_array, (unsigned int)exdata_idx);
+ EXPECT_EQ(exdata_schema->free_func, (void *)test_mock_overwrite_packet_exdata_free);
+ EXPECT_EQ(exdata_schema->free_arg, plug_mgr);
+ EXPECT_EQ(exdata_schema->idx, exdata_idx);
+ EXPECT_STREQ(exdata_schema->name, exdata_name);
+
+ int exdata_num = utarray_len(plug_mgr->exdata_schema->exdata_meta_array);
+ EXPECT_EQ(exdata_num, 1);
+ }
+
+ plugin_manager_exit(plug_mgr);
+}
+
+void test_mock_packet_msg_free(void *msg, void *msg_free_arg){}
+void test_mock_overwrite_packet_msg_free(void *msg, void *msg_free_arg){}
+
+TEST(plugin_manager_init, stellar_mq_topic_create_and_update) {
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+ const char *topic_name="PACKET_TOPIC";
+
+ EXPECT_EQ(stellar_mq_get_topic_id(&st, topic_name), -1); // illegal topic_name
+
+ int topic_id = stellar_mq_create_topic(&st, topic_name, NULL, NULL, test_mock_packet_msg_free, &st);
+ EXPECT_GE(topic_id, 0);
+ struct stellar_mq_topic_schema *topic_schema = NULL;
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ topic_schema =
+ (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id);
+ EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_packet_msg_free);
+ EXPECT_EQ(topic_schema->free_cb_arg, &st);
+ EXPECT_EQ(topic_schema->topic_id, topic_id);
+ EXPECT_STREQ(topic_schema->topic_name, topic_name);
+ }
+
+ EXPECT_EQ(stellar_mq_get_topic_id(&st, topic_name), topic_id);
+ EXPECT_EQ(stellar_mq_create_topic(&st, topic_name, NULL, NULL, test_mock_overwrite_packet_msg_free, plug_mgr),
+ -1); // duplicate create, return error
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ topic_schema =
+ (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id);
+ EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_packet_msg_free);
+ EXPECT_EQ(topic_schema->free_cb_arg, &st);
+ EXPECT_EQ(topic_schema->topic_id, topic_id);
+ EXPECT_STREQ(topic_schema->topic_name, topic_name);
+ }
+
+ EXPECT_EQ(stellar_mq_update_topic(&st, topic_id, NULL, NULL, test_mock_overwrite_packet_msg_free, plug_mgr), 0);
+
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ topic_schema =
+ (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id);
+ EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_overwrite_packet_msg_free);
+ EXPECT_EQ(topic_schema->free_cb_arg, plug_mgr);
+ EXPECT_EQ(topic_schema->topic_id, topic_id);
+ EXPECT_STREQ(topic_schema->topic_name, topic_name);
+ EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), 1+STELLAR_INTRINSIC_TOPIC_NUM);
+ }
+
+ EXPECT_EQ(stellar_mq_destroy_topic(&st, 10), -1); // illgeal topic_id
+
+ EXPECT_EQ(stellar_mq_destroy_topic(&st, topic_id), 1);
+ EXPECT_EQ(stellar_mq_destroy_topic(&st, topic_id), 0); // duplicate destroy, return 0;
+
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), 1+STELLAR_INTRINSIC_TOPIC_NUM); // destory won't delete the topic schema
+ }
+ plugin_manager_exit(plug_mgr);
+}
+
+void test_mock_on_packet_msg(int topic_id, const void *msg, void *plugin_env){}
+void test_mock_overwrite_on_packet_msg(int topic_id, const void *msg, void *plugin_env){}
+
+TEST(plugin_manager_init, stellar_mq_subscribe) {
+
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+
+ const char *topic_name="PACKET_TOPIC";
+
+ int topic_id=stellar_mq_create_topic(&st, topic_name, NULL, NULL, test_mock_packet_msg_free, &st);
+ EXPECT_GE(topic_id, 0);
+
+ EXPECT_EQ(stellar_mq_subscribe(&st, topic_id, test_mock_on_packet_msg, 10),-1);//illgeal plugin_id
+ EXPECT_EQ(stellar_mq_subscribe(&st, 10, test_mock_on_packet_msg, 10),-1);//illgeal topic_id & plugin_id
+
+ int plugin_id=stellar_plugin_register(&st, NULL, NULL,&st);
+ EXPECT_GE(plugin_id, 0);
+
+ EXPECT_EQ(stellar_mq_subscribe(&st, topic_id, test_mock_on_packet_msg, plugin_id),0);
+ EXPECT_EQ(stellar_mq_subscribe(&st, topic_id, test_mock_overwrite_on_packet_msg, plugin_id),0);//duplicate subscribe, return 0, won't overwrite
+ struct stellar_mq_topic_schema *topic_schema;
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ topic_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id);
+ EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_packet_msg_free);
+ EXPECT_EQ(topic_schema->free_cb_arg, &st);
+ EXPECT_EQ(topic_schema->topic_id, topic_id);
+ EXPECT_STREQ(topic_schema->topic_name, topic_name);
+ }
+
+ EXPECT_EQ(topic_schema->subscriber_cnt, 1);
+ EXPECT_EQ(topic_schema->subscribers->plugin_msg_cb, (void *)test_mock_overwrite_on_packet_msg);
+
+ plugin_manager_exit(plug_mgr);
+}
+
+
+static void test_mock_session_exdata_free(int idx, void *ex_ptr, void *arg){}
+static void test_mock_overwrite_session_exdata_free(int idx, void *ex_ptr, void *arg){}
+
+TEST(plugin_manager_init, session_exdata_new_index_overwrite) {
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+ const char *exdata_name="SESSION_EXDATA";
+ int exdata_idx=stellar_exdata_new_index(&st,exdata_name, test_mock_session_exdata_free, &st);
+ EXPECT_GE(exdata_idx, 0);
+ int overwrite_idx=stellar_exdata_new_index(&st,exdata_name, test_mock_overwrite_session_exdata_free, plug_mgr);
+ EXPECT_GE(overwrite_idx, 0);
+ EXPECT_EQ(overwrite_idx, exdata_idx);
+
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ struct exdata_meta *exdata_schema = (struct exdata_meta *)utarray_eltptr(
+ plug_mgr->exdata_schema->exdata_meta_array, (unsigned int)exdata_idx);
+ EXPECT_EQ(exdata_schema->free_func, (void *)test_mock_overwrite_session_exdata_free);
+ EXPECT_EQ(exdata_schema->free_arg, plug_mgr);
+ EXPECT_EQ(exdata_schema->idx, exdata_idx);
+ EXPECT_STREQ(exdata_schema->name, exdata_name);
+
+ int exdata_num = utarray_len(plug_mgr->exdata_schema->exdata_meta_array);
+ EXPECT_EQ(exdata_num, 1);
+ }
+ plugin_manager_exit(plug_mgr);
+}
+
+void test_mock_session_msg_free(void *msg, void *msg_free_arg){}
+
+void test_mock_on_session_msg(int topic_id, const void *msg,void *plugin_env){}
+void test_mock_overwrite_on_session_msg(int topic_id, const void *msg, void *plugin_env){}
+
+
+TEST(plugin_manager_init, stellar_mq_subscribe_overwrite) {
+
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+ const char *topic_name="SESSION_TOPIC";
+
+ int topic_id=stellar_mq_create_topic(&st, topic_name, NULL, NULL, test_mock_session_msg_free, &st);
+ EXPECT_GE(topic_id, 0);
+
+ EXPECT_EQ(stellar_mq_subscribe(&st, topic_id, test_mock_on_session_msg, 10),-1);//illgeal plugin_id
+ EXPECT_EQ(stellar_mq_subscribe(&st, 10, test_mock_on_session_msg, 10),-1);//illgeal topic_id & plugin_id
+
+ int plugin_id=stellar_plugin_register(&st, NULL, NULL, NULL);
+ EXPECT_GE(plugin_id, 0);
+
+ EXPECT_EQ(stellar_mq_subscribe(&st, topic_id, test_mock_on_session_msg, plugin_id),0);
+ EXPECT_EQ(stellar_mq_subscribe(&st, topic_id, test_mock_overwrite_on_session_msg, plugin_id),0);//duplicate subscribe, return 0, won't overwrite
+
+ struct stellar_mq_topic_schema *topic_schema;
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ topic_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array,(unsigned int)topic_id);
+ EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_session_msg_free);
+ EXPECT_EQ(topic_schema->free_cb_arg, &st);
+ EXPECT_EQ(topic_schema->topic_id, topic_id);
+ EXPECT_STREQ(topic_schema->topic_name, topic_name);
+ }
+
+ EXPECT_EQ(topic_schema->subscriber_cnt, 1);
+ EXPECT_EQ(topic_schema->subscribers->plugin_msg_cb, (void *)test_mock_overwrite_on_session_msg);
+
+ plugin_manager_exit(plug_mgr);
+}
+
+/**********************************************
+ * TEST PLUGIN MANAGER ON POLLING PLUGIN INIT *
+ **********************************************/
+
+int test_plugin_on_polling_func(void *plugin_env)
+{
+ return 1;
+}
+
+TEST(plugin_manager_init, polling_plugin_register) {
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE);
+ st.plug_mgr=plug_mgr;
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+ int plugin_id = stellar_polling_plugin_register(&st, test_plugin_on_polling_func, &st);
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ EXPECT_TRUE(plugin_id>=0);
+ struct registered_polling_plugin_schema *schema = (struct registered_polling_plugin_schema *)utarray_eltptr(
+ plug_mgr->registered_polling_plugin_array, (unsigned int)(plugin_id));
+ EXPECT_EQ(schema->on_polling, (void *)test_plugin_on_polling_func);
+ EXPECT_EQ(schema->plugin_env, &st);
+ EXPECT_EQ(utarray_len(plug_mgr->registered_polling_plugin_array), 1);
+ }
+
+ plugin_manager_exit(plug_mgr);
+}
+
+/*******************************************
+ * TEST PLUGIN MANAGER PACKET PLUGIN RUNTIME*
+ *******************************************/
+
+#define PACKET_PROTO_PLUGIN_NUM 128
+#define PACKET_EXDATA_NUM 2
+#define PACKET_TOPIC_NUM 2
+#define PACKET_MQ_SUB_NUM 2
+struct packet_plugin_env
+{
+ struct plugin_manager_schema *plug_mgr;
+ int basic_on_packet_called;
+ int proto_filter_plugin_id[PACKET_PROTO_PLUGIN_NUM];
+ int proto_filter_plugin_called[PACKET_PROTO_PLUGIN_NUM];
+ int exdata_set_on_packet_called;
+ int exdata_get_on_packet_called;
+ unsigned int packet_exdata_idx[PACKET_EXDATA_NUM];
+ int exdata_free_called[PACKET_EXDATA_NUM];
+ unsigned int packet_topic_id[PACKET_TOPIC_NUM];
+ unsigned int packet_mq_sub_plugin_id[PACKET_MQ_SUB_NUM];
+ int msg_pub_cnt;
+ int msg_sub_cnt;
+ int msg_free_cnt;
+};
+
+static void test_basic_on_packet(struct packet *pkt, void *plugin_env)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
+ EXPECT_TRUE(env!=NULL);
+ //EXPECT_EQ(pkt->ip_proto, ip_protocol);
+ EXPECT_EQ(pkt->st, env->plug_mgr->st);
+ EXPECT_EQ(packet_exdata_set(pkt, 2, pkt), -1);// illegal set
+ EXPECT_EQ(packet_exdata_get(pkt, 2), nullptr);// illegal get
+ env->basic_on_packet_called+=1;
+ return;
+}
+
+TEST(plugin_manager, packet_plugin_illegal_exdata) {
+
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+ unsigned char ip_proto=6;
+ struct packet_plugin_env env;
+ memset(&env, 0, sizeof(struct packet_plugin_env));
+ env.plug_mgr=plug_mgr;
+ int plugin_id=stellar_plugin_register(&st, test_basic_on_packet, NULL,&env);
+ EXPECT_GE(plugin_id, 0);
+
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ int packet_plugin_num = utarray_len(plug_mgr->registered_packet_plugin_array);
+ EXPECT_EQ(packet_plugin_num, 1);
+ }
+
+ struct packet pkt={&st, IPv4, ip_proto};
+ plugin_manager_on_packet_input(plug_mgr, &pkt);
+ plugin_manager_on_packet_output(plug_mgr, &pkt);
+
+ plugin_manager_exit(plug_mgr);
+
+ EXPECT_EQ(env.basic_on_packet_called, 1);
+}
+
+static void test_proto_filter_on_packet(struct packet *pkt, void *plugin_env)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
+ EXPECT_TRUE(env!=NULL);
+ //EXPECT_EQ(pkt->ip_proto, ip_protocol);
+ EXPECT_EQ(pkt->st, env->plug_mgr->st);
+ EXPECT_EQ(packet_exdata_set(pkt, 2, pkt), -1);// illegal set
+ EXPECT_EQ(packet_exdata_get(pkt, 2), nullptr);// illegal get
+ //env->proto_filter_plugin_called[ip_protocol]+=1;
+ return;
+}
+
+TEST(plugin_manager, DISABLED_packet_plugins_with_proto_filter) {
+
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+ struct packet_plugin_env env;
+ memset(&env, 0, sizeof(struct packet_plugin_env));
+ env.plug_mgr=plug_mgr;
+
+ int proto_filter_plugin_num=(int)(sizeof(env.proto_filter_plugin_id) / sizeof(env.proto_filter_plugin_id[0]));
+ for (int i = 0; i < proto_filter_plugin_num; i++)
+ {
+ env.proto_filter_plugin_id[i] = stellar_plugin_register(&st, test_proto_filter_on_packet, NULL,&env);
+ EXPECT_GE(env.proto_filter_plugin_id[i], 0);
+
+
+ }
+
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), proto_filter_plugin_num);
+ }
+
+ struct packet pkt={&st, IPv4, 0};
+
+ int N_packet=10;
+ for (int j = 0; j < N_packet; j++)
+ {
+ for (int i = 0; i < proto_filter_plugin_num; i++)
+ {
+ pkt.ip_proto = i;
+ plugin_manager_on_packet_input(plug_mgr, &pkt);
+ plugin_manager_on_packet_output(plug_mgr, &pkt);
+ }
+ }
+ plugin_manager_exit(plug_mgr);
+
+ for (int i = 0; i < proto_filter_plugin_num; i++)
+ {
+ EXPECT_EQ(env.proto_filter_plugin_called[i], N_packet);
+ }
+}
+
+
+struct test_exdata
+{
+ struct packet *pkt;
+ struct session *sess;
+ long long value;
+};
+
+static void test_exdata_set_on_packet(struct packet *pkt, void *plugin_env)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
+ EXPECT_TRUE(env!=NULL);
+ //EXPECT_EQ(pkt->ip_proto, ip_protocol);
+ EXPECT_EQ(pkt->st, env->plug_mgr->st);
+ env->exdata_set_on_packet_called+=1;
+
+ int exdata_idx_len=(int)(sizeof(env->packet_exdata_idx) / sizeof(env->packet_exdata_idx[0]));
+
+ for(int i=0; i<exdata_idx_len; i++)
+ {
+ struct test_exdata *exdata_val=CALLOC(struct test_exdata , 1);
+ exdata_val->value=i;
+ exdata_val->pkt=pkt;
+ EXPECT_EQ(packet_exdata_set(pkt, env->packet_exdata_idx[i], exdata_val), 0);
+ }
+ return;
+}
+
+static void test_exdata_get_on_packet(struct packet *pkt, void *plugin_env)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
+ EXPECT_TRUE(env!=NULL);
+ //EXPECT_EQ(pkt->ip_proto, ip_protocol);
+ EXPECT_EQ(pkt->st, env->plug_mgr->st);
+
+ int exdata_idx_len=(int)(sizeof(env->packet_exdata_idx) / sizeof(env->packet_exdata_idx[0]));
+
+ for(int i=0; i<exdata_idx_len; i++)
+ {
+ struct test_exdata *exdata_val=(struct test_exdata *)packet_exdata_get(pkt, env->packet_exdata_idx[i]);
+ EXPECT_EQ(exdata_val->value, i);
+ }
+ env->exdata_get_on_packet_called+=1;
+ return;
+}
+
+static void test_packet_exdata_free(int idx, void *ex_ptr, void *arg)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)arg;
+ struct test_exdata *exdata_val=(struct test_exdata *)ex_ptr;
+ EXPECT_EQ(env->packet_exdata_idx[idx], idx);
+ EXPECT_EQ(exdata_val->value, idx);
+
+ EXPECT_EQ(packet_exdata_get(exdata_val->pkt, idx), nullptr);// illegal get in exdata_free callback
+ EXPECT_EQ(packet_exdata_set(exdata_val->pkt, idx, exdata_val->pkt), -1);// illegal set in exdata_free callback
+
+ FREE(ex_ptr);
+ env->exdata_free_called[idx]+=1;
+
+ return;
+}
+
+
+TEST(plugin_manager, packet_plugins_share_exdata) {
+
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+ unsigned char ip_proto=6;
+ struct packet_plugin_env env;
+ memset(&env, 0, sizeof(struct packet_plugin_env));
+ env.plug_mgr=plug_mgr;
+
+ char exdata_name[PACKET_EXDATA_NUM][TOPIC_NAME_MAX];
+ int exdata_idx_len=(int)(sizeof(env.packet_exdata_idx) / sizeof(env.packet_exdata_idx[0]));
+ for(int i=0; i<exdata_idx_len; i++)
+ {
+ sprintf(exdata_name[i], "PACKET_EXDATA_%d", i);
+ env.packet_exdata_idx[i]=stellar_exdata_new_index(&st, exdata_name[i], test_packet_exdata_free, &env);
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ struct exdata_meta *exdata_schema = (struct exdata_meta *)utarray_eltptr(
+ plug_mgr->exdata_schema->exdata_meta_array, env.packet_exdata_idx[i]);
+
+ EXPECT_EQ(exdata_schema->free_func, (void *)test_packet_exdata_free);
+ EXPECT_EQ(exdata_schema->free_arg, &env);
+ EXPECT_EQ(exdata_schema->idx, env.packet_exdata_idx[i]);
+ EXPECT_STREQ(exdata_schema->name, exdata_name[i]);
+ }
+ }
+
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ EXPECT_EQ(utarray_len(plug_mgr->exdata_schema->exdata_meta_array), exdata_idx_len);
+ }
+
+ int exdata_set_plugin_id=stellar_plugin_register(&st, test_exdata_set_on_packet, NULL,&env);
+ EXPECT_GE(exdata_set_plugin_id, 0);
+
+ int exdata_get_plugin_id=stellar_plugin_register(&st, test_exdata_get_on_packet, NULL,&env);
+ EXPECT_GE(exdata_get_plugin_id, 0);
+
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), 2); // Fix plugin number
+ }
+
+ struct packet pkt={&st, IPv4, ip_proto};
+
+ int N_packet=10;
+
+ for(int i=0; i < N_packet; i++)
+ {
+ plugin_manager_on_packet_input(plug_mgr, &pkt);
+ plugin_manager_on_packet_output(plug_mgr, &pkt);
+ }
+ plugin_manager_exit(plug_mgr);
+
+ EXPECT_EQ(env.exdata_set_on_packet_called, N_packet);
+ EXPECT_EQ(env.exdata_get_on_packet_called, N_packet);
+
+ for(int i=0; i < exdata_idx_len; i++)
+ {
+ EXPECT_EQ(env.exdata_free_called[i], N_packet);
+ }
+}
+
+static void test_packet_msg_free_cb_func(void *msg, void *msg_free_arg)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)msg_free_arg;
+ env->msg_free_cnt+=1;
+ return;
+}
+
+static void test_mq_on_packet_msg(int topic_id, const void *msg, void *plugin_env)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
+ EXPECT_TRUE(env!=NULL);
+ env->msg_sub_cnt+=1;
+ return;
+}
+
+static void test_mq_pub_on_packet(struct packet *pkt, void *plugin_env)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
+ EXPECT_TRUE(env!=NULL);
+ //EXPECT_EQ(pkt->ip_proto, ip_protocol);
+ EXPECT_EQ(pkt->st, env->plug_mgr->st);
+ int topic_id_num=(int)(sizeof(env->packet_topic_id) / sizeof(env->packet_topic_id[0]));
+ for(int i=0; i<topic_id_num; i++)
+ {
+ EXPECT_EQ(stellar_mq_publish_message(env->plug_mgr->st, env->packet_topic_id[i], pkt), 0);
+ env->msg_pub_cnt+=1;
+ }
+ return;
+}
+
+TEST(plugin_manager, packet_plugins_mq_pub_sub) {
+
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+ unsigned char ip_proto=6;
+ struct packet_plugin_env env;
+ memset(&env, 0, sizeof(struct packet_plugin_env));
+ env.plug_mgr=plug_mgr;
+ char topic_name[PACKET_TOPIC_NUM][TOPIC_NAME_MAX];
+
+ int topic_id_num=(int)(sizeof(env.packet_topic_id) / sizeof(env.packet_topic_id[0]));
+
+ for(int i=0; i<topic_id_num; i++)
+ {
+ sprintf(topic_name[i], "PACKET_TOPIC_%d", i);
+ env.packet_topic_id[i]=stellar_mq_create_topic(&st, topic_name[i], NULL, NULL, test_packet_msg_free_cb_func, &env);
+ EXPECT_GE(env.packet_topic_id[i], 0);
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(
+ plug_mgr->stellar_mq_schema_array, env.packet_topic_id[i]);
+ EXPECT_EQ(topic->free_cb, test_packet_msg_free_cb_func);
+ EXPECT_EQ(topic->free_cb_arg, &env);
+ EXPECT_EQ(topic->topic_id, env.packet_topic_id[i]);
+ EXPECT_STREQ(topic->topic_name, topic_name[i]);
+ }
+ }
+
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), topic_id_num+STELLAR_INTRINSIC_TOPIC_NUM);
+ }
+
+ int pub_plugin_id=stellar_plugin_register(&st, test_mq_pub_on_packet, NULL,&env);
+ EXPECT_GE(pub_plugin_id, 0);
+
+ int topic_sub_num=(int)(sizeof(env.packet_mq_sub_plugin_id) / sizeof(env.packet_mq_sub_plugin_id[0]));
+
+ for (int i = 0; i < topic_sub_num; i++)
+ {
+ env.packet_mq_sub_plugin_id[i] = stellar_plugin_register(&st, NULL, NULL,&env);// empty on_packet is ok
+ EXPECT_GE(env.packet_mq_sub_plugin_id[i], 0);
+ for(int j = 0; j < topic_id_num; j++)
+ {
+ EXPECT_EQ(stellar_mq_subscribe(&st, env.packet_topic_id[j], test_mq_on_packet_msg, env.packet_mq_sub_plugin_id[i]), 0);
+ }
+ }
+
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), topic_sub_num+1);
+ }
+
+ struct packet pkt={&st, IPv4, ip_proto};
+
+ int N_packet=10;
+ for (int i = 0; i < N_packet; i++)
+ {
+ plugin_manager_on_packet_input(plug_mgr, &pkt);
+ plugin_manager_on_packet_output(plug_mgr, &pkt);
+ }
+
+ plugin_manager_exit(plug_mgr);
+ EXPECT_EQ(N_packet*topic_id_num, env.msg_pub_cnt);
+ EXPECT_EQ(env.msg_free_cnt, env.msg_pub_cnt);
+ EXPECT_EQ(env.msg_sub_cnt, env.msg_pub_cnt*topic_sub_num);
+}
+
+static void overlimit_packet_msg_free_cb_func(void *msg, void *msg_free_arg)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)msg_free_arg;
+ env->msg_free_cnt+=1;
+ FREE(msg);
+ return;
+}
+
+static void overlimit_sub_on_packet_msg(int topic_id, const void *msg, void *plugin_env)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
+ EXPECT_TRUE(env!=NULL);
+ env->msg_sub_cnt+=1;
+ return;
+}
+
+static void overlimit_pub_on_packet(struct packet *pkt, void *plugin_env)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
+ EXPECT_TRUE(env!=NULL);
+ //EXPECT_EQ(pkt->ip_proto, ip_protocol);
+ int topic_id_num=(int)(sizeof(env->packet_topic_id) / sizeof(env->packet_topic_id[0]));
+ unsigned int cnt=0;
+ int *msg;
+ for(int i=0; i<topic_id_num; i++)
+ {
+ for(unsigned int j=0; j < env->plug_mgr->max_message_dispatch; j++)
+ {
+ msg=CALLOC(int, 1);
+ *msg=cnt;
+ int pub_ret=stellar_mq_publish_message(env->plug_mgr->st, env->packet_topic_id[i], msg);
+ if(cnt < env->plug_mgr->max_message_dispatch)
+ {
+ ASSERT_EQ(pub_ret, 0);
+ env->msg_pub_cnt+=1;
+ }
+ else
+ {
+ ASSERT_EQ(pub_ret, -1);
+ }
+ if(pub_ret!=0)FREE(msg);
+ cnt+=1;
+ }
+ }
+ return;
+}
+
+TEST(plugin_manager, packet_plugins_pub_overlimit) {
+
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+ unsigned char ip_proto=6;
+ struct packet_plugin_env env;
+ memset(&env, 0, sizeof(struct packet_plugin_env));
+ env.plug_mgr=plug_mgr;
+ char topic_name[PACKET_TOPIC_NUM][TOPIC_NAME_MAX];
+
+ int topic_id_num=(int)(sizeof(env.packet_topic_id) / sizeof(env.packet_topic_id[0]));
+
+ for(int i=0; i<topic_id_num; i++)
+ {
+ sprintf(topic_name[i], "PACKET_TOPIC_%d", i);
+ env.packet_topic_id[i]=stellar_mq_create_topic(&st, topic_name[i], NULL,NULL,overlimit_packet_msg_free_cb_func, &env);
+ EXPECT_GE(env.packet_topic_id[i], 0);
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(
+ plug_mgr->stellar_mq_schema_array, env.packet_topic_id[i]);
+ EXPECT_EQ(topic->free_cb, overlimit_packet_msg_free_cb_func);
+ EXPECT_EQ(topic->free_cb_arg, &env);
+ EXPECT_EQ(topic->topic_id, env.packet_topic_id[i]);
+ EXPECT_STREQ(topic->topic_name, topic_name[i]);
+ }
+ }
+
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), topic_id_num+STELLAR_INTRINSIC_TOPIC_NUM);
+ }
+
+ int pub_plugin_id=stellar_plugin_register(&st, overlimit_pub_on_packet, NULL,&env);
+ EXPECT_GE(pub_plugin_id, 0);
+
+ int topic_sub_num=(int)(sizeof(env.packet_mq_sub_plugin_id) / sizeof(env.packet_mq_sub_plugin_id[0]));
+
+ for (int i = 0; i < topic_sub_num; i++)
+ {
+ env.packet_mq_sub_plugin_id[i] = stellar_plugin_register(&st, NULL, NULL, &env);// empty on_packet is ok
+ EXPECT_GE(env.packet_mq_sub_plugin_id[i], 0);
+ for(int j = 0; j < topic_id_num; j++)
+ {
+ EXPECT_EQ(stellar_mq_subscribe(&st, env.packet_topic_id[j], overlimit_sub_on_packet_msg, env.packet_mq_sub_plugin_id[i]), 0);
+ }
+ }
+
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), topic_sub_num+1);
+ }
+
+ struct packet pkt={&st, IPv4, ip_proto};
+
+ int N_packet=10;
+ for (int i = 0; i < N_packet; i++)
+ {
+ plugin_manager_on_packet_input(plug_mgr, &pkt);
+ plugin_manager_on_packet_output(plug_mgr, &pkt);
+ }
+
+ plugin_manager_exit(plug_mgr);
+ EXPECT_EQ(N_packet*MAX_MSG_PER_STAGE, env.msg_pub_cnt);
+ EXPECT_EQ(env.msg_free_cnt, env.msg_pub_cnt);
+ EXPECT_EQ(env.msg_sub_cnt, env.msg_pub_cnt*topic_sub_num);
+}
+
+
+
+static void test_exdata_free_pub_msg_exdata_free(int idx, void *ex_ptr, void *arg)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)arg;
+ EXPECT_EQ(env->packet_exdata_idx[idx], idx);
+ env->exdata_free_called[idx]+=1;
+ EXPECT_EQ(stellar_mq_publish_message(env->plug_mgr->st, env->packet_topic_id[0], (struct packet *)ex_ptr), -1);// publish message in packet exdata_free is illegal
+ env->msg_pub_cnt+=1;
+ return;
+}
+
+static void test_exdata_free_pub_msg_free( void *msg, void *msg_free_arg)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)msg_free_arg;
+ env->msg_free_cnt+=1;
+ EXPECT_EQ(stellar_mq_publish_message(env->plug_mgr->st, env->packet_topic_id[0], msg), -1 );// publish message in packet msg_free is illegal
+ return;
+}
+
+static void test_exdata_free_pub_msg_on_packet(struct packet *pkt, void *plugin_env)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
+ EXPECT_TRUE(env!=NULL);
+ //EXPECT_EQ(pkt->ip_proto, ip_protocol);
+ EXPECT_EQ(pkt->st, env->plug_mgr->st);
+ EXPECT_EQ(packet_exdata_set(pkt, env->packet_exdata_idx[0], pkt), 0);
+ env->basic_on_packet_called+=1;
+ return;
+}
+
+static void test_exdata_free_pub_msg_on_packet_msg(int topic_id, const void *msg, void *plugin_env)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
+ EXPECT_EQ(topic_id, env->packet_topic_id[0]);
+ env->msg_sub_cnt+=1;
+}
+
+TEST(plugin_manager, packet_plugin_exdata_free_pub_msg) {
+
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+ unsigned char ip_proto=6;
+ struct packet_plugin_env env;
+ memset(&env, 0, sizeof(struct packet_plugin_env));
+ env.plug_mgr=plug_mgr;
+ int plugin_id=stellar_plugin_register(&st, test_exdata_free_pub_msg_on_packet, NULL,&env);
+ EXPECT_GE(plugin_id, 0);
+
+ env.packet_exdata_idx[0]=stellar_exdata_new_index(&st, "PACKET_EXDATA", test_exdata_free_pub_msg_exdata_free, &env);
+ env.packet_topic_id[0]=stellar_mq_create_topic(&st, "PACKET_TOPIC", NULL, NULL, test_exdata_free_pub_msg_free, &env);
+
+ EXPECT_EQ(stellar_mq_subscribe(&st, env.packet_topic_id[0], test_exdata_free_pub_msg_on_packet_msg, plugin_id),0);
+
+
+ struct packet pkt={&st, IPv4, ip_proto};
+ plugin_manager_on_packet_input(plug_mgr, &pkt);
+ plugin_manager_on_packet_output(plug_mgr, &pkt);
+
+ plugin_manager_exit(plug_mgr);
+
+ EXPECT_EQ(env.basic_on_packet_called, 1);
+ EXPECT_EQ(env.msg_pub_cnt, 1);
+ EXPECT_EQ(env.msg_sub_cnt, 0);
+ EXPECT_EQ(env.msg_free_cnt, 0);
+ EXPECT_EQ(env.exdata_free_called[0], 1);
+}
+
+
+/**********************************************
+ * PESUDO SESSION MANAGER PLUGIN API *
+ **********************************************/
+
+struct session_manager_plugin_env
+{
+ struct stellar *st;
+ int N_session;
+ int N_per_session_pkt_cnt;
+ struct session sess[1024];
+
+ int session_manager_plugin_id;
+
+ int intrinsc_tcp_input_topic_id;
+ int intrinsc_tcp_output_topic_id;
+ int intrinsc_tcp_stream_topic_id;
+
+ int intrinsc_udp_input_topic_id;
+ int intrinsc_udp_output_topic_id;
+
+ int basic_exdata_idx;
+ int basic_exdata_free_called;
+ int basic_on_tcp_called;
+ int basic_ctx_new_called;
+ int basic_ctx_free_called;
+ int test_mq_pub_plugin_id;
+ int test_mq_sub_plugin_id;
+ int test_mq_pub_called;
+ int test_mq_sub_called;
+ int test_mq_free_called;
+ int test_mq_topic_id;
+ int plugin_id_1;
+ int plugin_id_2;
+ int plugin_id_1_called;
+ int plugin_id_2_called;
+ int exdata_ctx_1_id;
+ int exdata_ctx_2_id;
+};
+
+typedef void on_session_msg_cb_func(int topic_id, struct session *sess, void *plugin_env);
+
+static void pesudo_on_msg_dispatch(int topic_id, const void *msg, on_msg_cb_func* on_msg_cb, void *dispatch_arg, void *sub_plugin_env)
+{
+ on_session_msg_cb_func *session_cb = (on_session_msg_cb_func *)on_msg_cb;
+ struct session *sess=(struct session *)msg;
+ session_cb(topic_id, sess, sub_plugin_env);
+}
+static void pesudo_on_packet_input(struct packet *pkt, void *plugin_env)
+{
+ struct session_manager_plugin_env *env=(struct session_manager_plugin_env *)plugin_env;
+ for (int i = 0; i < env->N_session; i++)
+ {
+ stellar_mq_publish_message(env->st, env->intrinsc_tcp_input_topic_id, &env->sess[i]);
+ }
+}
+
+static void pesudo_on_packet_output(struct packet *pkt, void *plugin_env)
+{
+ struct session_manager_plugin_env *env=(struct session_manager_plugin_env *)plugin_env;
+ for (int i = 0; i < env->N_session; i++)
+ {
+ stellar_mq_publish_message(env->st, env->intrinsc_tcp_output_topic_id, &env->sess[i]);
+ }
+}
+
+static void pesudo_session_load(struct stellar *st, struct session_manager_plugin_env *env)
+{
+ env->st=st;
+ for(int i=0; i <env->N_session; i++)
+ {
+ env->sess[i].session_exdat_rt=session_exdata_runtime_new(st);
+ env->sess[i].type=SESSION_TYPE_TCP;
+ }
+ env->intrinsc_tcp_input_topic_id=stellar_mq_create_topic(st, TOPIC_TCP, pesudo_on_msg_dispatch, NULL, NULL, env);
+
+ env->session_manager_plugin_id=stellar_plugin_register(st, pesudo_on_packet_input, pesudo_on_packet_output, env);
+}
+
+void pesudo_session_unload(struct stellar *st, struct session_manager_plugin_env *env)
+{
+ for(int i=0; i <env->N_session; i++)
+ {
+ session_exdata_runtime_free(env->sess[i].session_exdat_rt);
+ }
+}
+
+static int pesudo_tcp_session_subscribe(struct stellar *st, on_session_msg_cb_func *on_session_cb, int plugin_id)
+{
+ int topic_id=stellar_mq_get_topic_id(st, TOPIC_TCP);
+ if(topic_id<0)
+ {
+ topic_id=stellar_mq_create_topic(st, TOPIC_TCP, pesudo_on_msg_dispatch, NULL, NULL, NULL);
+ }
+ return stellar_mq_subscribe(st, topic_id, (on_msg_cb_func *)on_session_cb, plugin_id);
+}
+
+/**********************************************
+ * SESSION MANAGER PLUGIN RUNTIME *
+ **********************************************/
+
+
+TEST(plugin_manager, no_plugin_register_runtime) {
+
+ struct stellar st={0};
+
+// init stage
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+
+// prepare packet and session
+
+ struct session_manager_plugin_env env;
+ memset(&env, 0, sizeof(struct session_manager_plugin_env));
+ env.N_per_session_pkt_cnt=10;
+ env.N_session=10;
+
+ struct packet pkt={&st, TCP, 6};
+
+// load session manager plugin
+ pesudo_session_load(&st, &env);
+
+ for (int j = 0; j < env.N_per_session_pkt_cnt; j++)
+ {
+ plugin_manager_on_packet_input(plug_mgr, &pkt);
+
+ plugin_manager_on_packet_output(plug_mgr, &pkt);
+
+ }
+
+// unload session manager plugin
+ pesudo_session_unload(&st, &env);
+
+//exit stage
+ plugin_manager_exit(plug_mgr);
+}
+
+
+static void test_basic_on_tcp_session(int topic_id, struct session *sess, void *plugin_env)
+{
+ struct session_manager_plugin_env *env = (struct session_manager_plugin_env *)plugin_env;
+ EXPECT_TRUE(env!=NULL);
+ if(sess)
+ {
+ EXPECT_EQ(session_exdata_set(sess, 2, sess), -1);// illegal set
+ EXPECT_EQ(session_exdata_get(sess, 2), nullptr);// illegal get
+ long long called = (long long )session_exdata_get(sess, env->basic_exdata_idx);
+ EXPECT_EQ(session_exdata_set(sess, env->basic_exdata_idx, (void *)(called+1)), 0);
+ env->basic_on_tcp_called+=1;
+ }
+ return;
+}
+
+static void test_basic_session_exdata_free(int idx, void *ex_ptr, void *arg)
+{
+ struct session_manager_plugin_env *env = (struct session_manager_plugin_env *)arg;
+ EXPECT_EQ(env->basic_exdata_idx, idx);
+ EXPECT_EQ((long long )ex_ptr, env->N_per_session_pkt_cnt);
+
+ env->basic_exdata_free_called+=1;
+}
+
+TEST(plugin_manager, session_plugin_on_tcp) {
+
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL,MAX_MSG_PER_STAGE);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+ unsigned char ip_proto=6;
+ struct session_manager_plugin_env env;
+ memset(&env, 0, sizeof(env));
+ env.N_per_session_pkt_cnt=10;
+ env.N_session=1;
+
+ int plugin_id=stellar_plugin_register(&st, NULL, NULL, &env);
+ EXPECT_GE(plugin_id, 0);
+
+ EXPECT_EQ(pesudo_tcp_session_subscribe(&st, test_basic_on_tcp_session, plugin_id), 0);
+
+ env.basic_exdata_idx=stellar_exdata_new_index(&st, "SESSION_EXDATA", test_basic_session_exdata_free,&env);
+ EXPECT_GE(env.basic_exdata_idx, 0);
+
+ struct packet pkt={&st, TCP, ip_proto};
+
+ pesudo_session_load(&st, &env);
+
+ for (int j = 0; j < env.N_per_session_pkt_cnt; j++)
+ {
+ plugin_manager_on_packet_input(plug_mgr, &pkt);
+ plugin_manager_on_packet_output(plug_mgr, &pkt);
+ plugin_manager_on_polling(plug_mgr);
+ }
+
+ pesudo_session_unload(&st, &env);
+
+ plugin_manager_exit(plug_mgr);
+
+ EXPECT_EQ(env.basic_on_tcp_called, env.N_session*env.N_per_session_pkt_cnt);
+ EXPECT_EQ(env.basic_exdata_free_called, env.N_session);
+
+}
+
+//TODO: test case, message pub overlimit
+#if 0
+static void *test_overlimit_pub_session_ctx_new(struct session *sess, void *plugin_env)
+{
+ struct test_overlimit_session_mq_ctx *ctx=CALLOC(struct test_overlimit_session_mq_ctx, 1);
+ return ctx;
+}
+
+static void test_overlimit_pub_session_ctx_free(struct session *sess, void *session_ctx, void *plugin_env)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ struct test_overlimit_session_mq_ctx *ctx=(struct test_overlimit_session_mq_ctx *)session_ctx;
+ EXPECT_EQ(ctx->pkt_cnt, env->N_per_session_pkt_cnt);
+ FREE(ctx);
+ return;
+}
+
+static void *test_overlimit_sub_session_ctx_new(struct session *sess, void *plugin_env)
+{
+ struct test_overlimit_session_mq_ctx *ctx=CALLOC(struct test_overlimit_session_mq_ctx, 1);
+ return ctx;
+}
+
+static void test_overlimit_sub_session_ctx_free(struct session *sess, void *session_ctx, void *plugin_env)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ struct test_overlimit_session_mq_ctx *ctx=(struct test_overlimit_session_mq_ctx *)session_ctx;
+ EXPECT_EQ(ctx->sub_cnt, (env->N_per_session_pkt_cnt*(MAX_MSG_PER_DISPATCH-1))); //minus intrinsic msg
+ FREE(ctx);
+ return;
+}
+
+struct test_overlimit_msg
+{
+ struct session *sess;
+ int called;
+};
+
+static void test_overlimit_pub_on_session(int topic_id, const void *msg, void *plugin_env)
+{
+ struct session *sess=(struct session *)msg;
+ struct session_manager_plugin_env *env = (struct session_manager_plugin_env *)plugin_env;
+ struct test_overlimit_session_mq_ctx *ctx=(struct test_overlimit_session_mq_ctx *)msg;
+ EXPECT_TRUE(env!=NULL);
+ EXPECT_TRUE(ctx!=NULL);
+ struct test_overlimit_msg *pub_msg;
+ if (msg)
+ {
+ env->test_mq_pub_called += 1;
+ ctx->pkt_cnt += 1;
+ for(int i=0; i < MAX_MSG_PER_DISPATCH*2; i++)
+ {
+ pub_msg = CALLOC(struct test_overlimit_msg, 1);
+ pub_msg->called = env->test_mq_pub_called;
+ pub_msg->sess=sess;
+ if(i<(MAX_MSG_PER_DISPATCH-1))// minus intrinsic msg
+ {
+ EXPECT_EQ(stellar_mq_publish_message(env->plug_mgr->st, env->test_mq_topic_id, pub_msg), 0);
+ ctx->pub_cnt+=1;
+ }
+ else
+ {
+ EXPECT_EQ(stellar_mq_publish_message(env->plug_mgr->st, env->test_mq_topic_id, pub_msg), -1);
+ FREE(pub_msg);
+ }
+ }
+ }
+ return;
+}
+
+static void test_overlimit_on_sub_msg(int topic_id, const void *msg, void *plugin_env)
+{
+ struct session_manager_plugin_env *env = (struct session_manager_plugin_env *)plugin_env;
+ struct test_overlimit_msg *recv_msg=(struct test_overlimit_msg *)msg;
+ struct test_overlimit_session_mq_ctx *ctx=(struct test_overlimit_session_mq_ctx *)msg;
+ EXPECT_TRUE(env!=NULL);
+ EXPECT_TRUE(ctx!=NULL);
+ EXPECT_EQ(recv_msg->called, env->test_mq_pub_called);
+ env->test_mq_sub_called+=1;
+ ctx->sub_cnt+=1;
+ return;
+}
+
+static void test_overlimit_session_msg_free(void *msg, void *msg_free_arg)
+{
+ struct session_manager_plugin_env *env = (struct session_manager_plugin_env *)msg_free_arg;
+ struct test_overlimit_msg *recv_msg=(struct test_overlimit_msg *)msg;
+ if(recv_msg)
+ {
+ EXPECT_EQ(stellar_mq_publish_message(env->plug_mgr->st, env->test_mq_topic_id, msg), -1);// illegal publish when msg_free
+ EXPECT_EQ(env->test_mq_pub_called, recv_msg->called);
+ env->test_mq_free_called+=1;
+ FREE(msg);
+ }
+ return;
+}
+
+TEST(plugin_manager,DISABLED_session_plugin_pub_msg_overlimt) {
+
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+ unsigned char ip_proto=6;
+ struct session_manager_plugin_env env;
+ memset(&env, 0, sizeof(struct session_manager_plugin_env));
+ env.plug_mgr=plug_mgr;
+ env.N_per_session_pkt_cnt=10;
+ env.N_session=10;
+
+ env.test_mq_pub_plugin_id=stellar_plugin_register(&st, 0, NULL, NULL, &env);
+ EXPECT_GE(env.test_mq_pub_plugin_id, 0);
+
+ env.intrinsc_tcp_input_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL, NULL);
+ EXPECT_GE(env.intrinsc_tcp_input_topic_id, 0);
+ EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_input_topic_id, test_overlimit_pub_on_session, env.test_mq_pub_plugin_id), 0);
+
+ env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_MQ_TOPIC", NULL, test_overlimit_session_msg_free, &env);
+ EXPECT_GE(env.test_mq_topic_id, 0);
+
+ env.test_mq_sub_plugin_id=stellar_plugin_register(&st, 0, NULL, NULL, &env);
+ EXPECT_GE(env.test_mq_sub_plugin_id, 0);
+ EXPECT_EQ(stellar_mq_subscribe(&st, env.test_mq_topic_id, test_overlimit_on_sub_msg, env.test_mq_sub_plugin_id), 0);
+
+ struct packet pkt={&st, TCP, ip_proto};
+
+
+ struct session sess[env.N_session];
+
+ for(int i=0; i < env.N_session; i++)
+ {
+ sess[i].session_exdat_rt=session_exdata_runtime_new(plug_mgr);
+ sess[i].type=SESSION_TYPE_TCP;
+ }
+
+ for (int j = 0; j < env.N_per_session_pkt_cnt; j++)
+ {
+ plugin_manager_on_packet_input(plug_mgr, &pkt);
+
+ for (int i = 0; i < env.N_session; i++)
+ {
+ stellar_mq_publish_message(&st, env.intrinsc_tcp_input_topic_id, &sess[i]);
+ }
+
+ plugin_manager_on_packet_output(plug_mgr, &pkt);
+ }
+
+ for(int i=0; i < env.N_session; i++)
+ {
+ session_exdata_runtime_free(sess[i].session_exdat_rt);
+ }
+
+ plugin_manager_exit(plug_mgr);
+
+ EXPECT_EQ(env.test_mq_pub_called,env.N_per_session_pkt_cnt*env.N_session);
+ EXPECT_EQ(env.test_mq_free_called, env.N_session*env.N_per_session_pkt_cnt*(MAX_MSG_PER_DISPATCH-1));
+ EXPECT_EQ(env.test_mq_sub_called, env.N_session*env.N_per_session_pkt_cnt*(MAX_MSG_PER_DISPATCH-1));
+
+}
+#endif
+
+
+//TODO: test case, publish msg on session closed
+#if 0
+struct test_session_closing_ctx
+{
+ int pkt_called;
+ int session_free_called;
+ int userdefine_on_msg_called;
+};
+
+static void *test_session_closing_ctx_new(struct session *sess, void *plugin_env)
+{
+ struct test_session_closing_ctx *ctx=CALLOC(struct test_session_closing_ctx, 1);
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ env->basic_ctx_new_called+=1;
+ return ctx;
+}
+
+static void test_session_closing_ctx_free(struct session *sess, void *session_ctx, void *plugin_env)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ env->basic_ctx_free_called+=1;
+ struct test_session_closing_ctx *ctx=(struct test_session_closing_ctx *)session_ctx;
+ EXPECT_EQ(ctx->pkt_called,env->N_per_session_pkt_cnt);
+ EXPECT_EQ(ctx->session_free_called,1);
+ FREE(ctx);
+}
+
+
+static void test_session_closing_on_session_free(struct session *sess, void *per_session_ctx, void *plugin_env)
+{
+ struct test_session_closing_ctx *ctx=(struct test_session_closing_ctx *)per_session_ctx;
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ if(sess->state==SESSION_STATE_CLOSING)
+ {
+ ctx->session_free_called+=1;
+ stellar_mq_publish_message(env->plug_mgr->st, env->test_mq_topic_id, env);
+ env->test_mq_pub_called+=1;
+ }
+}
+
+
+static void test_session_closing_on_intrisic_msg( int topic_id, const void *msg, void *plugin_env)
+{
+ struct test_session_closing_ctx *ctx=(struct test_session_closing_ctx *)msg;
+ if(msg)ctx->pkt_called+=1;
+}
+
+static void test_session_closing_on_userdefine_msg(int topic_id, const void *msg, void *plugin_env)
+{
+ struct test_session_closing_ctx *ctx=(struct test_session_closing_ctx *)msg;
+ struct session_manager_plugin_env *env = (struct session_manager_plugin_env *)plugin_env;
+ ctx->userdefine_on_msg_called+=1;
+ EXPECT_EQ(msg, plugin_env);
+ env->test_mq_sub_called+=1;
+}
+
+TEST(plugin_manager, DISABLED_session_plugin_pub_msg_on_closing) {
+
+ struct stellar st={0};
+ struct session_manager_plugin_env env;
+ memset(&env, 0, sizeof(struct session_manager_plugin_env));
+
+// pesudo init stage
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+// plugin manager register plugin
+
+ int plugin_id=stellar_plugin_register(&st, 0, NULL, NULL, &env);
+ EXPECT_GE(plugin_id,0);
+
+ env.intrinsc_tcp_input_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL, NULL);
+ EXPECT_GE(env.intrinsc_tcp_input_topic_id, 0);
+ EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_input_topic_id, test_session_closing_on_intrisic_msg, plugin_id), 0);
+
+ env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_CLOSING_TOPIC", NULL, NULL, &env);
+ EXPECT_GE(env.test_mq_topic_id, 0);
+ EXPECT_EQ(stellar_mq_subscribe(&st, env.test_mq_topic_id, test_session_closing_on_userdefine_msg, plugin_id), 0);
+
+// pesudo packet and session
+
+ env.plug_mgr=plug_mgr;
+ env.N_per_session_pkt_cnt=10;
+ env.N_session=10;
+
+ struct packet pkt={&st, TCP, 6};
+
+ struct session sess[env.N_session];
+ memset(&sess, 0, sizeof(sess));
+
+// pesudo running stage
+ for(int i=0; i < env.N_session; i++)
+ {
+ sess[i].state=SESSION_STATE_OPENING;
+ sess[i].session_exdat_rt=session_exdata_runtime_new(plug_mgr);
+ sess[i].type=SESSION_TYPE_TCP;
+ }
+
+ for (int j = 0; j < env.N_per_session_pkt_cnt; j++)
+ {
+ plugin_manager_on_packet_input(plug_mgr, &pkt);
+
+ for (int i = 0; i < env.N_session; i++)
+ {
+ sess[i].sess_pkt_cnt+=1;
+ sess[i].state=SESSION_STATE_ACTIVE;
+ stellar_mq_publish_message(&st, env.intrinsc_tcp_input_topic_id, &sess[i]);
+ }
+
+ plugin_manager_on_packet_output(plug_mgr, &pkt);
+ }
+
+ for(int i=0; i < env.N_session; i++)
+ {
+ sess[i].state=SESSION_STATE_CLOSING;
+ session_exdata_runtime_free(sess[i].session_exdat_rt);
+ }
+
+// pesudo exit stage
+ plugin_manager_exit(plug_mgr);
+
+ //EXPECT_EQ(env.basic_ctx_new_called,env.N_session);
+ //EXPECT_EQ(env.basic_ctx_free_called,env.N_session);
+ //EXPECT_EQ(env.test_mq_pub_called,env.N_session);
+ //EXPECT_EQ(env.test_mq_sub_called,env.N_session);
+}
+
+#endif
+
+
+//TODO: test case, mq priority
+#if 0
+
+//test dettach session
+static void test_session_mq_priority_plugin_1_on_msg(int topic_id, const void *msg, void *plugin_env)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ env->plugin_id_1_called+=1;
+ if(topic_id == env->intrinsc_tcp_topic_id)
+ {
+ struct session *sess = (struct session *)msg;
+ struct test_session_called_ctx *ctx =
+ (struct test_session_called_ctx *)session_exdata_get(sess, env->exdata_ctx_1_id);
+ if (ctx == NULL)
+ {
+ ctx = CALLOC(struct test_session_called_ctx, 1);
+ session_exdata_set(sess, env->exdata_ctx_1_id, ctx);
+ }
+ ctx->called+=1;
+ EXPECT_EQ(ctx->called%3, 1);// intrinsc msg has high priority
+ EXPECT_EQ(stellar_mq_publish_message_with_priority(env->plug_mgr->st, env->test_mq_topic_id, (void *)(long)env->plugin_id_1, STELLAR_MQ_PRIORITY_LOW), 0);
+ }
+ if(topic_id == env->test_mq_topic_id)
+ {
+ if(ctx->called%3 == 2)
+ {
+ EXPECT_EQ((int)(long)msg, env->plugin_id_2);
+ }
+ if(ctx->called%3 == 0)
+ {
+ EXPECT_EQ((int )(long)msg, env->plugin_id_1);
+ }
+ }
+ return;
+}
+
+static void test_session_mq_priority_plugin_2_on_msg(int topic_id, const void *msg, void *plugin_env)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+
+ env->plugin_id_2_called+=1;
+
+ if(topic_id == env->intrinsc_tcp_topic_id)
+ {
+ struct session *sess = (struct session *)msg;
+ struct test_session_called_ctx *ctx =
+ (struct test_session_called_ctx *)session_exdata_get(sess, env->exdata_ctx_2_id);
+ if (ctx == NULL)
+ {
+ ctx = CALLOC(struct test_session_called_ctx, 1);
+ session_exdata_set(sess, env->exdata_ctx_2_id, ctx);
+ }
+ ctx->called+=1;
+ EXPECT_EQ(ctx->called % 3, 1);
+ // publish msg has normal priority
+ EXPECT_EQ(stellar_mq_publish_message(env->plug_mgr->st, env->test_mq_topic_id, (void *)(long)env->plugin_id_2), 0);
+ }
+ if(topic_id == env->test_mq_topic_id)
+ {
+ if(ctx->called%3 == 2)
+ {
+ EXPECT_EQ((int)(long)msg, env->plugin_id_2);
+ }
+ if(ctx->called%3 == 0)
+ {
+ EXPECT_EQ((int)(long)msg, env->plugin_id_1);
+ }
+ }
+ return;
+}
+
+TEST(plugin_manager, test_session_mq_priority) {
+
+ struct stellar st={0};
+ struct session_plugin_env env;
+ memset(&env, 0, sizeof(struct session_plugin_env));
+
+// pesudo init stage
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+// plugin manager register plugin
+
+ int plugin_id_1=stellar_plugin_register(&st, 0,NULL, NULL, &env);
+ EXPECT_GE(plugin_id_1,0);
+
+ int plugin_id_2=stellar_plugin_register(&st, 0, NULL, NULL, &env);
+ EXPECT_GE(plugin_id_2,0);
+
+ env.plugin_id_1=plugin_id_1;
+ env.plugin_id_2=plugin_id_2;
+
+ env.exdata_ctx_1_id=stellar_exdata_new_index(&st, "SESSION_CTX_1", stellar_exdata_free_default, &env) ;
+ env.exdata_ctx_2_id=stellar_exdata_new_index(&st, "SESSION_CTX_2", stellar_exdata_free_default, &env) ;
+
+ env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL, NULL);
+ EXPECT_GE(env.intrinsc_tcp_topic_id, 0);
+ EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_priority_plugin_1_on_msg, plugin_id_1), 0);
+ EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_priority_plugin_2_on_msg, plugin_id_2), 0);
+
+ env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_PRIORITY_TOPIC", NULL, &env);
+ EXPECT_GE(env.test_mq_topic_id, 0);
+ EXPECT_EQ(stellar_mq_subscribe(&st, env.test_mq_topic_id, test_session_mq_priority_plugin_1_on_msg, plugin_id_1), 0);
+ EXPECT_EQ(stellar_mq_subscribe(&st, env.test_mq_topic_id, test_session_mq_priority_plugin_2_on_msg, plugin_id_2), 0);
+
+// pesudo packet and session
+
+ env.plug_mgr=plug_mgr;
+ env.N_per_session_pkt_cnt=10;
+ env.N_session=10;
+
+ struct packet pkt={&st, TCP, 6};
+
+ struct session sess[env.N_session];
+ memset(&sess, 0, sizeof(sess));
+
+// pesudo running stage
+ for(int i=0; i < env.N_session; i++)
+ {
+ sess[i].state=SESSION_STATE_OPENING;
+ sess[i].session_exdat_rt=session_exdata_runtime_new(plug_mgr);
+ sess[i].type=SESSION_TYPE_TCP;
+ }
+
+ for (int j = 0; j < env.N_per_session_pkt_cnt; j++)
+ {
+ plugin_manager_on_packet_input(plug_mgr, &pkt);
+
+ for (int i = 0; i < env.N_session; i++)
+ {
+ sess[i].sess_pkt_cnt+=1;
+ sess[i].state=SESSION_STATE_ACTIVE;
+ stellar_mq_publish_message(&st, env.intrinsc_tcp_topic_id, &sess[i]);
+ }
+
+ plugin_manager_on_packet_output(plug_mgr, &pkt);
+ }
+
+ for(int i=0; i < env.N_session; i++)
+ {
+ sess[i].state=SESSION_STATE_CLOSING;
+ session_exdata_runtime_free(sess[i].session_exdat_rt);
+ }
+
+// pesudo exit stage
+ plugin_manager_exit(plug_mgr);
+
+ // each session publish TCP TOPIC per_session_pkt_cnt+1, and SESSION_PRIORITY_TOPIC 2*(msg per_session_pkt_cnt+1)
+ EXPECT_EQ(env.plugin_id_1_called,env.N_session*((env.N_per_session_pkt_cnt)*3));
+ EXPECT_EQ(env.plugin_id_2_called,env.N_session*((env.N_per_session_pkt_cnt)*3));
+
+}
+
+#endif
+
+// TODO: test case, session_exdata_free_pub_msg
+#if 0
+
+void test_session_exdata_free_pub_msg_exdata_free(int idx, void *ex_ptr, void *arg)
+{
+ struct session_manager_plugin_env *env = (struct session_manager_plugin_env *)arg;
+ EXPECT_EQ(stellar_mq_publish_message(env->plug_mgr->st, env->intrinsc_tcp_input_topic_id, arg), -1);
+ env->basic_exdata_free_called+=1;
+}
+
+static void test_session_exdata_free_pub_msg_on_session(int topic_id, const void *msg, void *plugin_env)
+{
+ struct session *sess=(struct session *)msg;
+ struct session_manager_plugin_env *env = (struct session_manager_plugin_env *)plugin_env;
+ EXPECT_EQ(session_exdata_set(sess, env->basic_exdata_idx, sess), 0);
+ if(msg)env->plugin_id_1_called+=1;
+}
+
+TEST(plugin_manager, session_exdata_free_pub_msg) {
+
+ struct stellar st={0};
+ struct session_manager_plugin_env env;
+
+// pesudo init stage
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+// plugin manager register plugin
+
+ env.plugin_id_1=stellar_plugin_register(&st, 0, NULL, NULL, &env);
+ EXPECT_GE(env.plugin_id_1,0);
+
+ env.intrinsc_tcp_input_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL, NULL);
+ EXPECT_GE(env.intrinsc_tcp_input_topic_id, 0);
+ EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_input_topic_id, test_session_exdata_free_pub_msg_on_session, env.plugin_id_1), 0);
+
+ env.basic_exdata_idx=stellar_exdata_new_index(&st, "BASIC_EXDATA", test_session_exdata_free_pub_msg_exdata_free, &env) ;
+ EXPECT_GE(env.basic_exdata_idx, 0);
+
+// pesudo packet and session
+
+ memset(&env, 0, sizeof(struct session_manager_plugin_env));
+ env.plug_mgr=plug_mgr;
+ env.N_per_session_pkt_cnt=10;
+ env.N_session=10;
+
+ struct packet pkt={&st, TCP, 6};
+
+ struct session sess[env.N_session];
+ memset(&sess, 0, sizeof(sess));
+
+// pesudo running stage
+ for(int i=0; i < env.N_session; i++)
+ {
+ sess[i].session_exdat_rt=session_exdata_runtime_new(plug_mgr);
+ sess[i].type=SESSION_TYPE_TCP;
+ }
+
+ for (int j = 0; j < env.N_per_session_pkt_cnt; j++)
+ {
+ plugin_manager_on_packet_input(plug_mgr, &pkt);
+
+ for (int i = 0; i < env.N_session; i++)
+ {
+ sess[i].sess_pkt_cnt+=1;
+ stellar_mq_publish_message(&st, env.intrinsc_tcp_input_topic_id, &sess[i]);
+ }
+
+ plugin_manager_on_packet_output(plug_mgr, &pkt);
+ }
+
+ for(int i=0; i < env.N_session; i++)
+ {
+ session_exdata_runtime_free(sess[i].session_exdat_rt);
+ }
+
+// pesudo exit stage
+ plugin_manager_exit(plug_mgr);
+
+ EXPECT_EQ(env.basic_exdata_free_called,env.N_session);
+ EXPECT_EQ(env.plugin_id_1_called,env.N_session*env.N_per_session_pkt_cnt);
+}
+
+
+#endif
+
+/**********************************************
+ * TEST PLUGIN MANAGER ON POLLING PLUGIN RUNTIME *
+ **********************************************/
+
+struct polling_plugin_env
+{
+ struct plugin_manager_schema *plug_mgr;
+ int N_polling;
+ int return0_polling_called;
+ int return1_polling_called;
+};
+
+int return1_plugin_on_polling(void *plugin_env)
+{
+ struct polling_plugin_env *env = (struct polling_plugin_env *)plugin_env;
+ env->return1_polling_called+=1;
+ return 1;
+}
+
+int return0_plugin_on_polling(void *plugin_env)
+{
+ struct polling_plugin_env *env = (struct polling_plugin_env *)plugin_env;
+ env->return0_polling_called+=1;
+ return 0;
+}
+
+TEST(plugin_manager, basic_polling_plugins) {
+
+// pesudo init stage
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE);
+ st.plug_mgr=plug_mgr;
+ struct polling_plugin_env env;
+ memset(&env, 0, sizeof(struct polling_plugin_env));
+ env.plug_mgr=plug_mgr;
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+// plugin manager register plugin
+ int plugin_id = stellar_polling_plugin_register(&st, return0_plugin_on_polling, &env);
+ EXPECT_TRUE(plugin_id>=0);
+ plugin_id = stellar_polling_plugin_register(&st, return1_plugin_on_polling, &env);
+ EXPECT_TRUE(plugin_id>=0);
+
+// pesudo runtime stage
+
+ env.plug_mgr=plug_mgr;
+ env.N_polling=10;
+
+ for(int i=0; i<env.N_polling; i++)
+ {
+ EXPECT_EQ(plugin_manager_on_polling(plug_mgr), 1);
+ }
+
+// pesudo exit stage
+ plugin_manager_exit(plug_mgr);
+
+ EXPECT_EQ(env.return0_polling_called,env.N_polling);
+ EXPECT_EQ(env.return1_polling_called,env.N_polling);
+
+}
+
+#endif
+
+/**********************************************
+ * GTEST MAIN *
+ **********************************************/
+
+int main(int argc, char ** argv)
+{
+ int ret=0;
+ ::testing::InitGoogleTest(&argc, argv);
+ ret=RUN_ALL_TESTS();
+ return ret;
+} \ No newline at end of file
diff --git a/infra/plugin_manager/CMakeLists.txt b/infra/plugin_manager/CMakeLists.txt
deleted file mode 100644
index fed928c..0000000
--- a/infra/plugin_manager/CMakeLists.txt
+++ /dev/null
@@ -1,11 +0,0 @@
-add_library(plugin_manager plugin_manager.c)
-target_include_directories(plugin_manager PUBLIC ${CMAKE_CURRENT_LIST_DIR})
-target_include_directories(plugin_manager PUBLIC ${CMAKE_SOURCE_DIR}/include/)
-target_include_directories(plugin_manager PUBLIC ${CMAKE_SOURCE_DIR}/infra/)
-target_include_directories(plugin_manager PUBLIC ${CMAKE_SOURCE_DIR}/infra/packet_manager)
-target_include_directories(plugin_manager PUBLIC ${CMAKE_SOURCE_DIR}/infra/session_manager)
-target_include_directories(plugin_manager PUBLIC ${CMAKE_SOURCE_DIR}/infra/tuple)
-target_include_directories(plugin_manager PUBLIC ${CMAKE_SOURCE_DIR}/deps/)
-target_link_libraries(plugin_manager PUBLIC session_manager bitmap toml exdata ${CMAKE_DL_LIBS})
-
-add_subdirectory(test) \ No newline at end of file
diff --git a/infra/plugin_manager/plugin_manager.c b/infra/plugin_manager/plugin_manager.c
deleted file mode 100644
index 5026ccb..0000000
--- a/infra/plugin_manager/plugin_manager.c
+++ /dev/null
@@ -1,361 +0,0 @@
-#include "plugin_manager_interna.h"
-#include "stellar/utils.h"
-#include "toml/toml.h"
-#include <dlfcn.h>
-#include <stdbool.h>
-
-#include "stellar_core.h"
-
-#if 0
-void stellar_per_stage_message_counter_incby(struct plugin_manager_schema *plug_mgr, int tid, long long increment)
-{
- plug_mgr->per_thread_data[tid].pub_packet_msg_cnt+=increment;
-}
-
-void stellar_per_stage_message_counter_set(struct plugin_manager_schema *plug_mgr, int tid, long long increment)
-{
- plug_mgr->per_thread_data[tid].pub_packet_msg_cnt=increment;
-}
-
-bool stellar_per_stage_message_counter_overlimt(struct plugin_manager_schema *plug_mgr, int tid)
-{
- if(plug_mgr->per_thread_data[tid].pub_packet_msg_cnt >= plug_mgr->max_message_dispatch)return true;
- return false;
-}
-#endif
-
-UT_icd plugin_specs_icd = {sizeof(struct plugin_specific), NULL, NULL, NULL};
-
-static struct plugin_specific *plugin_specs_load(const char *toml_conf_path, int *spec_num)
-{
- *spec_num = 0;
- FILE* fp = fopen(toml_conf_path, "r");
- if(fp==NULL)return NULL;
- char errbuf[256];
- toml_table_t* conf = toml_parse_file(fp, errbuf, sizeof(errbuf));
- fclose(fp);
- if (!conf) {
- fprintf(stderr, "Error parsing toml: %s\n", errbuf);
- return NULL;
- }
- struct plugin_specific* plugins=NULL;
- toml_array_t* plugin_array = toml_array_in(conf, "plugin");
- if(plugin_array==NULL)goto PLUGIN_SPEC_LOAD_ERROR;
- *spec_num = toml_array_nelem(plugin_array);
- plugins = CALLOC(struct plugin_specific, *spec_num);
-
- for (int i = 0; i < *spec_num; i++) {
- toml_table_t* plugin = toml_table_at(plugin_array, i);
-
- const char *path_raw = toml_raw_in(plugin, "path");
- const char *init_func_name_raw = toml_raw_in(plugin, "init");
- const char *exit_func_name_raw = toml_raw_in(plugin, "exit");
- char *path = NULL;
- char *init_func_name = NULL;
- char *exit_func_name = NULL;
- if (toml_rtos(path_raw, &path) || toml_rtos(init_func_name_raw, &init_func_name) ||
- toml_rtos(exit_func_name_raw, &exit_func_name))
- {
- goto PLUGIN_SPEC_LOAD_ERROR;
- }
- void* handle = dlopen(path, RTLD_NOW|RTLD_LAZY|RTLD_GLOBAL);
- if (!handle) {
- fprintf(stderr, "Error loading plugin %s: %s\n", path, dlerror());
- goto PLUGIN_SPEC_LOAD_ERROR;
- }
-
- plugins[i].load_cb = (plugin_on_load_func *) dlsym(handle, init_func_name);
- if (!plugins[i].load_cb) {
- fprintf(stderr, "Could not load init function %s: %s\n", init_func_name, dlerror());
- }
-
- plugins[i].unload_cb = (plugin_on_unload_func *) dlsym(handle, exit_func_name);
- if (!plugins[i].unload_cb) {
- fprintf(stderr, "Could not load exit function %s: %s\n", exit_func_name, dlerror());
- }
- FREE(path);
- FREE(init_func_name);
- FREE(exit_func_name);
- }
- toml_free(conf);
- return plugins;
-PLUGIN_SPEC_LOAD_ERROR:
- toml_free(conf);
- if(plugins)FREE(plugins);
- return NULL;
-}
-
-#if 0
-static struct plugin_manager_per_thread_data *plugin_manager_per_thread_data_new(struct stellar *st)
-{
- if(st == NULL)return NULL;
- int thread_num=stellar_get_worker_thread_num(st);
- struct plugin_manager_per_thread_data *per_thread_data = CALLOC(struct plugin_manager_per_thread_data, thread_num);
- return per_thread_data;
-}
-static void plugin_manager_per_thread_data_free(struct plugin_manager_per_thread_data *per_thread_data, struct stellar *st)
-{
- if(per_thread_data == NULL || st == NULL)return;
- int thread_num=stellar_get_worker_thread_num(st);
- struct plugin_manager_per_thread_data *p_data;
- for (int i = 0; i < thread_num; i++)
- {
- p_data=per_thread_data+i;
- exdata_handle_free(p_data->exdata_array);
- }
- FREE(per_thread_data);
- return;
-}
-#endif
-
-struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char *plugin_spec_file_path)
-{
- int spec_num;
- struct plugin_specific *specs = plugin_specs_load(plugin_spec_file_path, &spec_num);
- if(spec_num < 0)
- {
- return NULL;
- }
- struct plugin_manager_schema *plug_mgr = CALLOC(struct plugin_manager_schema, 1);
- //plug_mgr->max_message_dispatch=max_msg_per_stage;
- if(spec_num > 0)
- {
- utarray_new(plug_mgr->plugin_load_specs_array,&plugin_specs_icd);
- utarray_reserve(plug_mgr->plugin_load_specs_array, spec_num);
- }
-
- plug_mgr->st = st;
- stellar_set_plugin_manger(st, plug_mgr);
-
- //plug_mgr->exdata_schema=exdata_schema_new();
-
- for(int i = 0; i < spec_num; i++)
- {
- if (specs[i].load_cb != NULL)
- {
- specs[i].plugin_ctx=specs[i].load_cb(st);
- utarray_push_back(plug_mgr->plugin_load_specs_array, &specs[i]);
- }
- }
- FREE(specs);
- //plug_mgr->per_thread_data = plugin_manager_per_thread_data_new(st);
- return plug_mgr;
-}
-
-void plugin_manager_exit(struct plugin_manager_schema *plug_mgr)
-{
- if(plug_mgr==NULL)return;
- struct plugin_specific *p=NULL;
- if (plug_mgr->plugin_load_specs_array)
- {
- while ((p = (struct plugin_specific *)utarray_next(plug_mgr->plugin_load_specs_array, p)))
- {
- if (p->unload_cb)
- p->unload_cb(p->plugin_ctx);
- }
- utarray_free(plug_mgr->plugin_load_specs_array);
- }
-#if 0
- if(plug_mgr->stellar_mq_schema_array)
- {
- for(unsigned int i = 0; i < utarray_len(plug_mgr->stellar_mq_schema_array); i++)
- {
- stellar_mq_destroy_topic( plug_mgr->st, i);
- }
- utarray_free(plug_mgr->stellar_mq_schema_array);
- }
-
- //if(plug_mgr->stellar_exdata_schema_array)utarray_free(plug_mgr->stellar_exdata_schema_array);
- if(plug_mgr->registered_polling_plugin_array)utarray_free(plug_mgr->registered_polling_plugin_array);
- if(plug_mgr->registered_packet_plugin_array)
- {
- struct registered_plugin_schema *s = NULL;
- while ((s = (struct registered_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, s)))
- {
- if(s->registed_mq_subscriber_info)utarray_free(s->registed_mq_subscriber_info);
- }
- utarray_free(plug_mgr->registered_packet_plugin_array);
- }
-#endif
- //plugin_manager_per_thread_data_free(plug_mgr->per_thread_data, plug_mgr->st);
- //exdata_schema_free(plug_mgr->exdata_schema);
- FREE(plug_mgr);
- return;
-}
-
-/*******************************
- * STELLAR EXDATA *
- *******************************/
-#if 0
-int stellar_exdata_new_index(struct stellar *st, const char *name, stellar_exdata_free *free_func,void *free_arg)
-{
- if(st==NULL || name==NULL)return -1;
- struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
- if(plug_mgr->exdata_schema==NULL)return -1;
- return exdata_new_index(plug_mgr->exdata_schema, name, free_func, free_arg);
-}
-
-/*******************************
- * PACKET EXDATA *
- *******************************/
-static struct exdata_runtime *per_thread_packet_exdata_arrary_get(struct plugin_manager_schema *plug_mgr)
-{
- if(plug_mgr==NULL || plug_mgr->exdata_schema == NULL)return NULL;
- int tid=stellar_get_current_thread_index();
- if(plug_mgr->per_thread_data[tid].exdata_array == NULL)
- {
- plug_mgr->per_thread_data[tid].exdata_array = exdata_handle_new(plug_mgr->exdata_schema);
- }
- return plug_mgr->per_thread_data[tid].exdata_array;
-}
-
-static void per_thread_packet_exdata_arrary_clean(struct plugin_manager_schema *plug_mgr)
-{
- if(plug_mgr==NULL || plug_mgr->exdata_schema == NULL)return;
- struct exdata_runtime *per_thread_exdata_handle = per_thread_packet_exdata_arrary_get(plug_mgr);
- return exdata_handle_reset(per_thread_exdata_handle);
-}
-
-int packet_exdata_set(struct packet *pkt, int idx, void *ex_ptr)
-{
- if(pkt == NULL)return -1;
- struct plugin_manager_schema *plug_mgr = (struct plugin_manager_schema *)packet_get_user_data(pkt);
- return exdata_set(per_thread_packet_exdata_arrary_get(plug_mgr), idx, ex_ptr);
-}
-
-void *packet_exdata_get(struct packet *pkt, int idx)
-{
- if(pkt == NULL)return NULL;
- struct plugin_manager_schema *plug_mgr = (struct plugin_manager_schema *)packet_get_user_data(pkt);
- return exdata_get( per_thread_packet_exdata_arrary_get(plug_mgr), idx);
-}
-
-/*******************************
- * SESSION EXDATA *
- *******************************/
-
-int session_exdata_set(struct session *sess, int idx, void *ex_ptr)
-{
- struct exdata_runtime *sess_exdata = (struct exdata_runtime *)session_get_user_data(sess);
- if(sess_exdata == NULL)return -1;
- return exdata_set(sess_exdata,idx, ex_ptr);
-}
-
-void *session_exdata_get(struct session *sess, int idx)
-{
- struct exdata_runtime *sess_exdata = (struct exdata_runtime *)session_get_user_data(sess);
- if(sess_exdata == NULL)return NULL;
- return exdata_get(sess_exdata, idx);
-}
-#endif
-
-
-#if 0
-/*******************************
- * PLUGIN MANAGER SESSION RUNTIME *
- *******************************/
-struct exdata_runtime *session_exdata_runtime_new(struct stellar *st)
-{
- struct plugin_manager_schema *plug_mgr=stellar_get_plugin_manager(st);
- return exdata_handle_new(plug_mgr->exdata_schema);
-}
-
-void session_exdata_runtime_free(struct exdata_runtime *exdata_h)
-{
- return exdata_handle_free(exdata_h);
-}
-
-/*********************************************
- * PLUGIN MANAGER PLUGIN *
- *********************************************/
-UT_icd registered_plugin_array_icd = {sizeof(struct registered_plugin_schema), NULL, NULL, NULL};
-
-int stellar_plugin_register(struct stellar *st, plugin_on_packet_func on_packet_input, plugin_on_packet_func on_packet_output, void *plugin_env)
-{
- struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
- if(plug_mgr->registered_packet_plugin_array == NULL)
- {
- utarray_new(plug_mgr->registered_packet_plugin_array, &registered_plugin_array_icd);
- }
- struct registered_plugin_schema packet_plugin_schema;
- memset(&packet_plugin_schema, 0, sizeof(packet_plugin_schema));
- packet_plugin_schema.on_packet[PACKET_STAGE_INPUT] = on_packet_input;
- packet_plugin_schema.on_packet[PACKET_STAGE_OUTPUT] = on_packet_output;
- packet_plugin_schema.plugin_env = plugin_env;
- utarray_push_back(plug_mgr->registered_packet_plugin_array, &packet_plugin_schema);
- return (utarray_len(plug_mgr->registered_packet_plugin_array)-1);// return packet plugin_id, equals to packet plugin arrary index
-}
-
-static void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, struct packet *pkt, enum packet_stage in_out)
-{
- if(plug_mgr==NULL || plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return;
- struct registered_plugin_schema *p=NULL;
-
- //int tid=stellar_get_current_thread_index();
- //stellar_per_stage_message_counter_set(plug_mgr, tid, 0);
- while ((p = (struct registered_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, p)))
- {
- if(p->on_packet[in_out])
- {
- p->on_packet[in_out](pkt, p->plugin_env);
- }
- }
- //stellar_mq_dispatch(plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr->per_thread_data[tid].dealth_letter_queue);
- return;
-}
-
-void plugin_manager_on_packet_input(struct plugin_manager_schema *plug_mgr, struct packet *pkt)
-{
- plugin_manager_on_packet(plug_mgr, pkt, PACKET_STAGE_INPUT);
-}
-
-void plugin_manager_on_packet_output(struct plugin_manager_schema *plug_mgr, struct packet *pkt)
-{
- if(plug_mgr == NULL || plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return;
- plugin_manager_on_packet(plug_mgr, pkt, PACKET_STAGE_OUTPUT);
- //int tid=stellar_get_current_thread_index();
- //stellar_per_stage_message_counter_set(plug_mgr, tid, -1);
- //stellar_mq_free(&plug_mgr->per_thread_data[tid].dealth_letter_queue,
- // plug_mgr->stellar_mq_schema_array);
- //per_thread_packet_exdata_arrary_clean(plug_mgr);
-}
-
-/*********************************************
- * PLUGIN MANAGER POLLING PLUGIN *
- *********************************************/
-UT_icd registered_polling_plugin_array_icd = {sizeof(struct registered_polling_plugin_schema), NULL, NULL, NULL};
-
-int stellar_on_polling_register(struct stellar *st, plugin_on_polling_func on_polling, void *plugin_env)
-{
- struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
- if(plug_mgr->registered_polling_plugin_array == NULL)
- {
- utarray_new(plug_mgr->registered_polling_plugin_array, &registered_polling_plugin_array_icd);
- }
- struct registered_polling_plugin_schema polling_plugin_schema;
- memset(&polling_plugin_schema, 0, sizeof(polling_plugin_schema));
- polling_plugin_schema.on_polling = on_polling;
- polling_plugin_schema.plugin_env = plugin_env;
- utarray_push_back(plug_mgr->registered_polling_plugin_array, &polling_plugin_schema);
- return (utarray_len(plug_mgr->registered_polling_plugin_array)-1);// return polling plugin_id, equals to polling plugin arrary index + POLLING_PULGIN_ID_BASE
-}
-
-int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr)
-{
- if(plug_mgr==NULL || plug_mgr->registered_polling_plugin_array == NULL)return 0;
- struct registered_polling_plugin_schema *p=NULL;
- int polling_state=0;
- while ((p = (struct registered_polling_plugin_schema *)utarray_next(plug_mgr->registered_polling_plugin_array, p)))
- {
- if(p->on_polling)
- {
- if(p->on_polling(p->plugin_env)==1)
- {
- polling_state=1;
- }
- }
- }
- return polling_state;
-}
-
-#endif \ No newline at end of file
diff --git a/infra/plugin_manager/plugin_manager.h b/infra/plugin_manager/plugin_manager.h
deleted file mode 100644
index f2e9147..0000000
--- a/infra/plugin_manager/plugin_manager.h
+++ /dev/null
@@ -1,26 +0,0 @@
-#pragma once
-
-#include "stellar/stellar.h"
-
-#ifdef __cplusplus
-extern "C"
-{
-#endif
-
-struct plugin_manager_schema;
-struct plugin_manager_runtime;
-
-struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char *plugin_spec_file_path);
-void plugin_manager_exit(struct plugin_manager_schema *plug_mgr);
-
-//TODO
-void *plugin_manager_get_plugin_env(const char *plugin_name);
-
-//void plugin_manager_on_packet_input(struct plugin_manager_schema *plug_mgr, struct packet *pkt);
-//void plugin_manager_on_packet_output(struct plugin_manager_schema *plug_mgr, struct packet *pkt);
-//return polling work state, 0: idle, 1: working
-//int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr);
-
-#ifdef __cplusplus
-}
-#endif \ No newline at end of file
diff --git a/infra/plugin_manager/plugin_manager_interna.h b/infra/plugin_manager/plugin_manager_interna.h
deleted file mode 100644
index b918e0e..0000000
--- a/infra/plugin_manager/plugin_manager_interna.h
+++ /dev/null
@@ -1,33 +0,0 @@
-#pragma once
-
-#ifdef __cplusplus
-extern "C"
-{
-#endif
-
-#include "stellar/stellar.h"
-
-#include "uthash/utarray.h"
-
-
-/*******************************
- * PLUGIN MANAGER INIT & EXIT *
- *******************************/
-
-struct plugin_manager_schema
-{
- struct stellar *st;
- UT_array *plugin_load_specs_array;
-}__attribute__((aligned(sizeof(void*))));
-
-struct plugin_specific
-{
- char plugin_name[256];
- plugin_on_load_func *load_cb;
- plugin_on_unload_func *unload_cb;
- void *plugin_ctx;
-}__attribute__((aligned(sizeof(void*))));
-
-#ifdef __cplusplus
-}
-#endif \ No newline at end of file
diff --git a/infra/plugin_manager/test/plugin_manager_gtest_mock.h b/infra/plugin_manager/test/plugin_manager_gtest_mock.h
deleted file mode 100644
index 778a0b8..0000000
--- a/infra/plugin_manager/test/plugin_manager_gtest_mock.h
+++ /dev/null
@@ -1,106 +0,0 @@
-#pragma once
-
-#ifdef __cplusplus
-extern "C"
-{
-#endif
-
-#include "plugin_manager/plugin_manager_interna.h"
-
-#include "stellar/session.h"
-#include "tuple.h"
-
-//mock stellar
-struct stellar
-{
- struct plugin_manager_schema *plug_mgr;
-};
-
-enum packet_type
-{
- UNKNOWN,
- IPv4,
- IPv6,
- UDP,
- TCP,
- TCP_STREAM,
- CONTROL,
-};
-
-struct packet
-{
- struct stellar *st;
- enum packet_type type;
- unsigned char ip_proto;
-};
-
-
-struct session
-{
- struct exdata_handle *session_exdat_rt;
- enum session_type type;
- enum session_state state;
- int sess_pkt_cnt;
-};
-
-struct plugin_manager_schema * stellar_get_plugin_manager(struct stellar *st)
-{
- return st->plug_mgr;
-}
-
-int stellar_set_plugin_manger(struct stellar *st, struct plugin_manager_schema *pm)
-{
- st->plug_mgr=pm;
- return 0;
-}
-
-int stellar_get_worker_thread_num(struct stellar *st __attribute__((unused)))
-{
- return 16;
-}
-
-uint16_t stellar_get_current_thread_index()
-{
- return 0;
-}
-
-unsigned char packet_get_ip_protocol(struct packet *pkt)
-{
- return pkt->ip_proto;
-}
-
-enum session_type session_get_type(const struct session *sess)
-{
- return sess->type;
-
-}
-
-void session_set_user_data(struct session *sess, void *user_data)
-{
- sess->session_exdat_rt = (struct exdata_handle *)user_data;
-}
-
-void *session_get_user_data(const struct session *sess)
-{
- return sess->session_exdat_rt;
-}
-
-void *packet_get_user_data(const struct packet *pkt)
-{
- return pkt->st->plug_mgr;
-}
-
-int packet_get_innermost_tuple6(const struct packet *pkt, struct tuple6 *tuple)
-{
- tuple->ip_proto = pkt->ip_proto;
- return 0;
-}
-
-uint8_t packet_is_ctrl(const struct packet *pkt __attribute__((unused)))
-{
- return 0;
-}
-
-#ifdef __cplusplus
-}
-#endif \ No newline at end of file
diff --git a/infra/stellar_core.c b/infra/stellar_core.c
index f72574e..0c2a1f4 100644
--- a/infra/stellar_core.c
+++ b/infra/stellar_core.c
@@ -8,13 +8,14 @@
#include <pthread.h>
#include <sys/prctl.h>
+#include "stellar/module_manager.h"
+
#include "utils.h"
#include "packet_io.h"
#include "log_private.h"
#include "stellar_stat.h"
#include "stellar_core.h"
#include "packet_private.h"
-#include "plugin_manager.h"
#include "session_private.h"
#include "session_manager.h"
@@ -51,7 +52,8 @@ struct stellar_runtime
struct logger *logger;
struct stellar_stat *stat;
struct packet_io *packet_io;
- struct plugin_manager_schema *plug_mgr;
+ struct mq_schema *mq_schema;
+ struct stellar_module_manager *mod_mgr;
struct stellar_thread threads[MAX_THREAD_NUM];
};
@@ -130,7 +132,11 @@ static void *worker_thread(void *arg)
struct stellar *st = thread->st;
struct stellar_runtime *runtime = &st->runtime;
struct packet_io *packet_io = runtime->packet_io;
- struct plugin_manager_schema *plug_mgr = runtime->plug_mgr;
+ struct stellar_module_manager *mod_mgr = runtime->mod_mgr;
+ struct mq_runtime *mq_rt = mq_runtime_new(runtime->mq_schema);
+
+ stellar_module_manager_register_thread(mod_mgr, thread->tid, mq_rt);
+
struct thread_stat thr_stat = {
.pkt_io = packet_io_stat(packet_io, thread->idx),
.ip_reass = ip_reassembly_stat(ip_reass),
@@ -145,7 +151,7 @@ static void *worker_thread(void *arg)
for (int i = 0; i < RX_BURST_MAX; i++)
{
- packet_set_user_data(&packets[i], (void *)plug_mgr);
+ packet_set_user_data(&packets[i], (void *)mod_mgr);
}
snprintf(thd_name, sizeof(thd_name), "stellar:%d", thr_idx);
@@ -292,6 +298,8 @@ static void *worker_thread(void *arg)
stellar_stat_merge(runtime->stat, &thr_stat, thread->idx, UINT64_MAX);
stellar_stat_print(runtime->stat, &thr_stat, thread->idx);
+ mq_runtime_free(mq_rt);
+
ATOMIC_SET(&thread->is_runing, 0);
CORE_LOG_FATAL("worker thread %d exit", thr_idx);
@@ -448,8 +456,9 @@ struct stellar *stellar_new(const char *stellar_cfg_file, const char *plugin_cfg
CORE_LOG_ERROR("unable to create stellar stat");
goto error_out;
}
- runtime->plug_mgr = plugin_manager_init(st, plugin_cfg_file);
- if (runtime->plug_mgr == NULL)
+ runtime->mq_schema=mq_schema_new();
+ runtime->mod_mgr = stellar_module_manager_new(plugin_cfg_file, config->pkt_io_cfg->nr_worker_thread, runtime->mq_schema);
+ if (runtime->mod_mgr == NULL)
{
CORE_LOG_ERROR("unable to create plugin manager");
goto error_out;
@@ -527,7 +536,8 @@ void stellar_free(struct stellar *st)
struct stellar_config *config = &st->config;
packet_io_free(runtime->packet_io);
- plugin_manager_exit(runtime->plug_mgr);
+ stellar_module_manager_free(runtime->mod_mgr);
+ mq_schema_free(runtime->mq_schema);
stellar_stat_free(runtime->stat);
session_manager_config_free(config->sess_mgr_cfg);
@@ -564,20 +574,11 @@ void stellar_reload_log_level(struct stellar *st)
* Stellar Utility Function
******************************************************************************/
-struct plugin_manager_schema *stellar_get_plugin_manager(const struct stellar *st)
-{
- return st->runtime.plug_mgr;
-}
-
-void stellar_set_plugin_manger(struct stellar *st, struct plugin_manager_schema *plug_mgr)
-{
- st->runtime.plug_mgr = plug_mgr;
-}
// only send user build packet, can't send packet which come from network
void stellar_send_build_packet(struct stellar *st, struct packet *pkt)
{
- uint16_t thr_idx = stellar_get_current_thread_index();
+ uint16_t thr_idx = stellar_module_manager_get_thread_id(st->runtime.mod_mgr);
struct packet_io *packet_io = st->runtime.packet_io;
struct session_manager *sess_mgr = st->runtime.threads[thr_idx].sess_mgr;
session_manager_record_duplicated_packet(sess_mgr, pkt);