summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryangwei <[email protected]>2024-11-26 14:44:44 +0800
committeryangwei <[email protected]>2024-11-26 14:44:44 +0800
commit9895e932143ef65313bd1b49726e60fd63c497ed (patch)
tree49af0f3d0324c842da7cada43776d546291a253c
parent78562a8dd879a3753debae14198f00fe2dc0112a (diff)
🦄 refactor(remove mq): remove mq in stellar
-rw-r--r--include/stellar/module.h9
-rw-r--r--include/stellar/mq.h55
-rw-r--r--infra/CMakeLists.txt2
-rw-r--r--infra/module_manager/CMakeLists.txt2
-rw-r--r--infra/module_manager/module_manager.c22
-rw-r--r--infra/module_manager/module_manager_interna.h4
-rw-r--r--infra/module_manager/test/gtest_module_manager_main.cpp46
-rw-r--r--infra/mq/CMakeLists.txt3
-rw-r--r--infra/mq/mq.c279
-rw-r--r--infra/mq/mq_internal.h75
-rw-r--r--infra/mq/test/CMakeLists.txt14
-rw-r--r--infra/mq/test/gtest_mq_main.cpp917
-rw-r--r--infra/packet_manager/CMakeLists.txt2
-rw-r--r--infra/session_manager/CMakeLists.txt2
-rw-r--r--infra/stellar_core.c15
15 files changed, 25 insertions, 1422 deletions
diff --git a/include/stellar/module.h b/include/stellar/module.h
index db15bf0..f00bc65 100644
--- a/include/stellar/module.h
+++ b/include/stellar/module.h
@@ -6,7 +6,6 @@ extern "C"
{
#endif
-#include "stellar/mq.h"
#include "stellar/log.h"
/*******************************************
@@ -43,23 +42,21 @@ struct module_hooks
module_on_thread_exit_func *on_thread_exit_cb;
};
-struct module_manager *module_manager_new(struct module_hooks mod_specs[], size_t n_mod, int max_thread_num, const char *toml_path, struct mq_schema *mq_schema, struct logger *logger);
-struct module_manager *module_manager_new_with_toml(const char *toml_path, int max_thread_num, struct mq_schema *mq_schema, struct logger *logger);
+struct module_manager *module_manager_new(struct module_hooks mod_specs[], size_t n_mod, int max_thread_num, const char *toml_path, struct logger *logger);
+struct module_manager *module_manager_new_with_toml(const char *toml_path, int max_thread_num, struct logger *logger);
void module_manager_free(struct module_manager *mod_mgr);
-void module_manager_register_thread(struct module_manager *mod_mgr, int thread_id, struct mq_runtime *mq_rt);
+void module_manager_register_thread(struct module_manager *mod_mgr, int thread_id);
void module_manager_unregister_thread(struct module_manager *mod_mgr, int thread_id);
// return -1 on error
int module_manager_get_thread_id(struct module_manager *mod_mgr);
-struct mq_runtime *module_manager_get_mq_runtime(struct module_manager *mod_mgr);
struct module *module_manager_get_module(struct module_manager *mod_mgr, const char *module_name);
int module_manager_get_max_thread_num(struct module_manager *mod_mgr);
const char *module_manager_get_toml_path(struct module_manager *mod_mgr);
-struct mq_schema *module_manager_get_mq_schema(struct module_manager *mod_mgr);
struct logger *module_manager_get_logger(struct module_manager *mod_mgr);
/*******************************************
diff --git a/include/stellar/mq.h b/include/stellar/mq.h
deleted file mode 100644
index c273658..0000000
--- a/include/stellar/mq.h
+++ /dev/null
@@ -1,55 +0,0 @@
-#pragma once
-
-#ifdef __cplusplus
-extern "C"
-{
-#endif
-
-struct mq_schema;
-struct mq_schema *mq_schema_new();
-void mq_schema_free(struct mq_schema *s);
-
-typedef void mq_msg_free_cb_func(void *msg, void *msg_free_arg);
-typedef void on_msg_cb_func(int topic_id, void *msg, void *on_msg_arg);
-typedef void on_msg_dispatch_cb_func(int topic_id,
- void *msg,
- on_msg_cb_func *on_msg_cb,
- void *on_msg_cb_arg,
- void *dispatch_arg);
-
-//return topic_id
-int mq_schema_create_topic(struct mq_schema *s,
- const char *topic_name,
- on_msg_dispatch_cb_func *on_dispatch_cb,
- void *on_dispatch_arg,
- mq_msg_free_cb_func *msg_free_cb,
- void *msg_free_arg);
-
-int mq_schema_get_topic_id(struct mq_schema *s, const char *topic_name);
-
-int mq_schema_update_topic(struct mq_schema *s,
- int topic_id,
- on_msg_dispatch_cb_func *on_dispatch_cb,
- void *on_dispatch_arg,
- mq_msg_free_cb_func *msg_free_cb,
- void *msg_free_arg);
-
-int mq_schema_destroy_topic(struct mq_schema *s, int topic_id);
-
-//return 0 if success, otherwise return -1.
-int mq_schema_subscribe(struct mq_schema *s, int topic_id, on_msg_cb_func *on_msg_cb, void * on_msg_cb_arg);
-
-
-struct mq_runtime;
-struct mq_runtime *mq_runtime_new(struct mq_schema *s);
-void mq_runtime_defer(struct mq_runtime *rt);
-void mq_runtime_free(struct mq_runtime *s);
-
-// return 0 if success, otherwise return -1
-int mq_runtime_publish_message(struct mq_runtime *rt, int topic_id, void *msg);// append message to pending queue
-void mq_runtime_dispatch(struct mq_runtime *rt);// dispatch all message in pending queue, dispatched message will be append to dlq
-void mq_runtime_clean(struct mq_runtime *rt); // free all message in dlq and pending queue, during this period, publish will be disabled
-
-#ifdef __cplusplus
-}
-#endif \ No newline at end of file
diff --git a/infra/CMakeLists.txt b/infra/CMakeLists.txt
index e76a214..179ca25 100644
--- a/infra/CMakeLists.txt
+++ b/infra/CMakeLists.txt
@@ -1,4 +1,4 @@
-set(INFRA exdata mq tuple packet_manager packet_io ip_reassembly tcp_reassembly session_manager module_manager monitor)
+set(INFRA exdata tuple packet_manager packet_io ip_reassembly tcp_reassembly session_manager module_manager monitor)
set(DEPS bitmap dablooms interval_tree logger nmx_pool rbtree timeout toml ringbuf)
#set(DECODERS lpi_plus)
set(WHOLE_ARCHIVE ${DEPS} ${INFRA} ${DECODERS})
diff --git a/infra/module_manager/CMakeLists.txt b/infra/module_manager/CMakeLists.txt
index 131cf93..dddde31 100644
--- a/infra/module_manager/CMakeLists.txt
+++ b/infra/module_manager/CMakeLists.txt
@@ -3,6 +3,6 @@ target_include_directories(module_manager PUBLIC ${CMAKE_CURRENT_LIST_DIR})
target_include_directories(module_manager PUBLIC ${CMAKE_SOURCE_DIR}/include/)
target_include_directories(module_manager PUBLIC ${CMAKE_SOURCE_DIR}/infra/)
target_include_directories(module_manager PUBLIC ${CMAKE_SOURCE_DIR}/deps/)
-target_link_libraries(module_manager PUBLIC toml mq ${CMAKE_DL_LIBS})
+target_link_libraries(module_manager PUBLIC toml ${CMAKE_DL_LIBS})
add_subdirectory(test) \ No newline at end of file
diff --git a/infra/module_manager/module_manager.c b/infra/module_manager/module_manager.c
index 600327d..a8a3233 100644
--- a/infra/module_manager/module_manager.c
+++ b/infra/module_manager/module_manager.c
@@ -13,11 +13,10 @@
* module manager API *
*******************************************/
-struct module_manager *module_manager_new(struct module_hooks mod_hooks[], size_t n_mod, int max_thread_num, const char *toml_path, struct mq_schema *mq_schema, struct logger *logger)
+struct module_manager *module_manager_new(struct module_hooks mod_hooks[], size_t n_mod, int max_thread_num, const char *toml_path, struct logger *logger)
{
struct module_manager *mod_mgr = CALLOC(struct module_manager, 1);
mod_mgr->config.max_thread_num=max_thread_num;
- mod_mgr->config.mq_schema=mq_schema;
mod_mgr->config.logger=logger;
if(toml_path)mod_mgr->config.toml_path=strdup(toml_path);
@@ -47,22 +46,22 @@ struct module_manager *module_manager_new(struct module_hooks mod_hooks[], size_
return mod_mgr;
}
-struct module_manager *module_manager_new_with_toml(const char *toml_path, int max_thread_num, struct mq_schema *mq_schema, struct logger *logger)
+struct module_manager *module_manager_new_with_toml(const char *toml_path, int max_thread_num, struct logger *logger)
{
FILE *fp=fopen(toml_path, "r");
- if(!fp)return module_manager_new(NULL, 0, max_thread_num, toml_path, mq_schema, logger);
+ if(!fp)return module_manager_new(NULL, 0, max_thread_num, toml_path,logger);
toml_table_t *conf = toml_parse_file(fp, NULL, 0);
fclose(fp);
- if(conf==NULL)return module_manager_new(NULL, 0, max_thread_num, toml_path, mq_schema, logger);
+ if(conf==NULL)return module_manager_new(NULL, 0, max_thread_num, toml_path, logger);
toml_array_t* mod_array = toml_array_in(conf, "module");
if(mod_array==NULL)
{
toml_free(conf);
- return module_manager_new(NULL, 0, max_thread_num, toml_path, mq_schema, logger);
+ return module_manager_new(NULL, 0, max_thread_num, toml_path, logger);
}
int mod_num = toml_array_nelem(mod_array);
@@ -128,7 +127,7 @@ struct module_manager *module_manager_new_with_toml(const char *toml_path, int m
}
toml_free(conf);
- return module_manager_new(mod_hooks, mod_num, max_thread_num, toml_path, mq_schema, logger);
+ return module_manager_new(mod_hooks, mod_num, max_thread_num, toml_path, logger);
}
@@ -164,11 +163,6 @@ int module_manager_get_max_thread_num(struct module_manager*mod_mgr)
return mod_mgr->config.max_thread_num;
}
-struct mq_schema *module_manager_get_mq_schema(struct module_manager *mod_mgr)
-{
- if(mod_mgr==NULL)return NULL;
- return mod_mgr->config.mq_schema;
-}
struct logger *module_manager_get_logger(struct module_manager *mod_mgr)
{
@@ -195,11 +189,9 @@ struct mq_runtime *module_manager_get_mq_runtime(struct module_manager *mod_mgr
return local_mq_rt;
}
-void module_manager_register_thread(struct module_manager* mod_mgr, int thread_id, struct mq_runtime *mq_rt)
+void module_manager_register_thread(struct module_manager* mod_mgr, int thread_id)
{
local_thread_id=thread_id;
- local_mq_rt=mq_rt;
-
for(int i=0; i<mod_mgr->n_descriptor; i++)
{
if(mod_mgr->descriptors[i].mod == NULL)break;
diff --git a/infra/module_manager/module_manager_interna.h b/infra/module_manager/module_manager_interna.h
index 1b27bd7..1022da5 100644
--- a/infra/module_manager/module_manager_interna.h
+++ b/infra/module_manager/module_manager_interna.h
@@ -7,10 +7,7 @@ extern "C"
#include "stellar/module.h"
-#include "stellar/mq.h"
-
#include <limits.h>
-
#include <stdbool.h>
struct module
@@ -43,7 +40,6 @@ struct module_manager
{
char *toml_path;
int max_thread_num;
- struct mq_schema *mq_schema;
struct logger *logger;
}config;
diff --git a/infra/module_manager/test/gtest_module_manager_main.cpp b/infra/module_manager/test/gtest_module_manager_main.cpp
index 32d5f0c..7d9760b 100644
--- a/infra/module_manager/test/gtest_module_manager_main.cpp
+++ b/infra/module_manager/test/gtest_module_manager_main.cpp
@@ -23,7 +23,6 @@ const char *gtest_mock_spec_toml =
TEST(module_manager_internal, stellar_module_manager_new_with_toml) {
- struct mq_schema *mq_schema=NULL;
char toml_template[] = "./stellar.toml.XXXXXX";
int fd = mkstemp(toml_template);
@@ -31,16 +30,14 @@ TEST(module_manager_internal, stellar_module_manager_new_with_toml) {
write(fd, gtest_mock_spec_toml, strlen(gtest_mock_spec_toml));
close(fd);
- struct module_manager *mod_mgr=module_manager_new_with_toml(toml_template, 10, mq_schema, NULL);
+ struct module_manager *mod_mgr=module_manager_new_with_toml(toml_template, 10, NULL);
EXPECT_TRUE(mod_mgr!=NULL);
EXPECT_TRUE(module_manager_get_module(mod_mgr, "test")==NULL);
EXPECT_EQ(module_manager_get_max_thread_num(mod_mgr), 10);
- EXPECT_EQ(module_manager_get_mq_schema(mod_mgr), mq_schema);
EXPECT_STREQ(module_manager_get_toml_path(mod_mgr), toml_template);
EXPECT_EQ(module_manager_get_thread_id(mod_mgr), -1);// no thread registered
- EXPECT_TRUE(module_manager_get_mq_runtime(mod_mgr)==NULL);
module_manager_free(mod_mgr);
@@ -71,14 +68,11 @@ TEST(stellar_module, basic_new_and_free) {
TEST(stellar_module_manager, new_with_null_toml) {
- struct mq_schema *mq_schema=NULL;
- struct module_manager *mod_mgr = module_manager_new_with_toml(NULL, 10, mq_schema, NULL);
+ struct module_manager *mod_mgr = module_manager_new_with_toml(NULL, 10, NULL);
EXPECT_TRUE(mod_mgr!=NULL);
EXPECT_TRUE(module_manager_get_module(mod_mgr, "test")==NULL);
EXPECT_EQ(module_manager_get_max_thread_num(mod_mgr), 10);
- EXPECT_EQ(module_manager_get_mq_schema(mod_mgr), mq_schema);
- EXPECT_TRUE(module_manager_get_mq_runtime(mod_mgr)==NULL);
EXPECT_EQ(module_manager_get_thread_id(mod_mgr), -1);// no thread registered
module_manager_free(mod_mgr);
@@ -86,41 +80,32 @@ TEST(stellar_module_manager, new_with_null_toml) {
TEST(stellar_module_manager, new_with_empty_toml) {
- struct mq_schema *mq_schema=NULL;
- struct module_manager *mod_mgr = module_manager_new_with_toml("/dev/null", 10, mq_schema, NULL);
+ struct module_manager *mod_mgr = module_manager_new_with_toml("/dev/null", 10, NULL);
EXPECT_TRUE(mod_mgr!=NULL);
EXPECT_TRUE(module_manager_get_module(mod_mgr, "test")==NULL);
EXPECT_EQ(module_manager_get_max_thread_num(mod_mgr), 10);
- EXPECT_EQ(module_manager_get_mq_schema(mod_mgr), mq_schema);
EXPECT_EQ(module_manager_get_thread_id(mod_mgr), -1);// no thread registered
- EXPECT_TRUE(module_manager_get_mq_runtime(mod_mgr)==NULL);
module_manager_free(mod_mgr);
}
TEST(stellar_module_manager, register_thread) {
- struct mq_schema *mq_schema=(struct mq_schema*)1;
- struct module_manager *mod_mgr=module_manager_new_with_toml(NULL, 10, mq_schema, NULL);
+ struct module_manager *mod_mgr=module_manager_new_with_toml(NULL, 10, NULL);
EXPECT_TRUE(mod_mgr!=NULL);
- EXPECT_EQ((long)module_manager_get_mq_schema(mod_mgr), 1);
EXPECT_EQ(module_manager_get_thread_id(mod_mgr), -1);// no thread registered
- EXPECT_TRUE(module_manager_get_mq_runtime(mod_mgr)==NULL);
- struct mq_runtime *mq_rt = (struct mq_runtime*)2;
- module_manager_register_thread(mod_mgr, 1, mq_rt);
+ module_manager_register_thread(mod_mgr, 1);
EXPECT_EQ(module_manager_get_thread_id(mod_mgr), 1);
- EXPECT_EQ((long)module_manager_get_mq_runtime(mod_mgr), 2);
module_manager_unregister_thread(mod_mgr, 1);
EXPECT_EQ(module_manager_get_thread_id(mod_mgr), -1);
- EXPECT_EQ((long)module_manager_get_mq_runtime(mod_mgr), 0);
module_manager_free(mod_mgr);
@@ -147,7 +132,6 @@ extern "C" void gtest_module_exit(struct module_manager *mod_mgr, struct module
EXPECT_EQ(module_manager_get_module(mod_mgr, "gtest"), mod);
EXPECT_EQ(module_manager_get_thread_id(mod_mgr), -1);
- EXPECT_EQ((long)module_manager_get_mq_runtime(mod_mgr), 0);
module_free(mod);
}
@@ -174,7 +158,6 @@ const char *gtest_module_spec_toml =
TEST(module_manager, basic_module) {
- struct mq_schema *mq_schema=(struct mq_schema *)1;
char toml_template[] = "./stellar.toml.XXXXXX";
int fd = mkstemp(toml_template);
@@ -182,24 +165,20 @@ TEST(module_manager, basic_module) {
write(fd, gtest_module_spec_toml, strlen(gtest_module_spec_toml));
close(fd);
- struct module_manager *mod_mgr=module_manager_new_with_toml(toml_template, 10, mq_schema, NULL);
+ struct module_manager *mod_mgr=module_manager_new_with_toml(toml_template, 10, NULL);
EXPECT_TRUE(mod_mgr!=NULL);
EXPECT_TRUE(module_manager_get_module(mod_mgr, "gtest")!=NULL);
EXPECT_EQ(module_manager_get_max_thread_num(mod_mgr), 10);
- EXPECT_EQ((long)module_manager_get_mq_schema(mod_mgr), 1);
EXPECT_STREQ(module_manager_get_toml_path(mod_mgr), toml_template);
- struct mq_runtime *mq_rt = (struct mq_runtime*)2;
- module_manager_register_thread(mod_mgr, 1, mq_rt);
+ module_manager_register_thread(mod_mgr, 1);
EXPECT_EQ((long)module_manager_get_thread_id(mod_mgr), 1);
- EXPECT_EQ((long)module_manager_get_mq_runtime(mod_mgr), 2);
module_manager_unregister_thread(mod_mgr, 1);
EXPECT_EQ((long)module_manager_get_thread_id(mod_mgr), -1);
- EXPECT_EQ((long)module_manager_get_mq_runtime(mod_mgr), 0);
module_manager_free(mod_mgr);
unlink(toml_template);
@@ -230,25 +209,20 @@ struct test_module_polling_env
TEST(module_manager, basic_polling_module) {
- struct mq_schema *mq_schema=mq_schema_new();
-
- struct module_manager *mod_mgr=module_manager_new_with_toml(NULL, 10, mq_schema, NULL);
+ struct module_manager *mod_mgr=module_manager_new_with_toml(NULL, 10, NULL);
EXPECT_TRUE(mod_mgr!=NULL);
EXPECT_EQ(module_manager_get_max_thread_num(mod_mgr), 10);
- EXPECT_EQ(module_manager_get_mq_schema(mod_mgr), mq_schema);
struct test_module_polling_env env={};
env.N_round=10;
module_manager_register_polling_node(mod_mgr, test_module_on_polling, &env);
- struct mq_runtime *mq_rt = mq_runtime_new(mq_schema);
- module_manager_register_thread(mod_mgr, 1, mq_rt);
+ module_manager_register_thread(mod_mgr, 1);
EXPECT_EQ((long)module_manager_get_thread_id(mod_mgr), 1);
- EXPECT_EQ(module_manager_get_mq_runtime(mod_mgr), mq_rt);
for(int i=0; i<env.N_round; i++)
{
@@ -257,8 +231,6 @@ TEST(module_manager, basic_polling_module) {
module_manager_unregister_thread(mod_mgr, 1);
- mq_runtime_free(mq_rt);
- mq_schema_free(mq_schema);
module_manager_free(mod_mgr);
EXPECT_EQ(env.polling_count, env.N_round+env.polling_active_count);
diff --git a/infra/mq/CMakeLists.txt b/infra/mq/CMakeLists.txt
deleted file mode 100644
index 655fe76..0000000
--- a/infra/mq/CMakeLists.txt
+++ /dev/null
@@ -1,3 +0,0 @@
-add_library(mq mq.c)
-
-add_subdirectory(test) \ No newline at end of file
diff --git a/infra/mq/mq.c b/infra/mq/mq.c
deleted file mode 100644
index 33d64df..0000000
--- a/infra/mq/mq.c
+++ /dev/null
@@ -1,279 +0,0 @@
-#include "mq_internal.h"
-#include "stellar/utils.h"
-#include "uthash/utlist.h"
-
-/*******************************
- * STELLAR MQ *
- *******************************/
-static void mq_topic_schema_copy(void *_dst, const void *_src)
-{
- struct mq_topic *dst = (struct mq_topic *)_dst,
- *src = (struct mq_topic *)_src;
- memcpy(_dst, _src, sizeof(struct mq_topic));
- dst->topic_name = src->topic_name ? strdup(src->topic_name) : NULL;
-}
-
-static void mq_topic_schema_dtor(void *_elt)
-{
- struct mq_topic *elt = (struct mq_topic *)_elt;
- if (elt->topic_name)
- FREE(elt->topic_name);
- // FREE(elt); // free the item
-}
-
-UT_icd mq_topic_schema_icd = {sizeof(struct mq_topic), NULL, mq_topic_schema_copy, mq_topic_schema_dtor};
-
-int mq_schema_get_topic_id(struct mq_schema *s, const char *topic_name)
-{
- if(topic_name == NULL || s == NULL || s->topic_array == NULL )return -1;
- unsigned int len = utarray_len(s->topic_array);
- struct mq_topic *topic;
- for(unsigned int i = 0; i < len; i++)
- {
- topic = (struct mq_topic *)utarray_eltptr(s->topic_array, i);
- if(strcmp(topic->topic_name, topic_name) == 0)
- {
- return i;
- }
- }
- return -1;
-}
-
-static struct mq_topic *mq_schema_get_topic(struct mq_schema *s, int topic_id)
-{
- if(s==NULL || s->topic_array == NULL || topic_id < 0)return NULL;
- unsigned int len = utarray_len(s->topic_array);
- if (len <= (unsigned int)topic_id)return NULL;
- return (struct mq_topic *)utarray_eltptr(s->topic_array, (unsigned int)topic_id);
-}
-
-int mq_schema_update_topic(struct mq_schema *s, int topic_id, on_msg_dispatch_cb_func *on_dispatch_cb, void *on_dispatch_arg, mq_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
-{
- struct mq_topic *topic = mq_schema_get_topic(s, topic_id);
- if(topic == NULL)return -1;
- topic->dispatch_cb=on_dispatch_cb;
- topic->dispatch_cb_arg=on_dispatch_arg;
- topic->free_cb=msg_free_cb;
- topic->free_cb_arg=msg_free_arg;
- return 0;
-}
-
-int mq_schema_create_topic(struct mq_schema *s, const char *topic_name, on_msg_dispatch_cb_func *on_dispatch_cb, void *on_dispatch_arg, mq_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
-{
- if(s==NULL)return -1;
- if(s->topic_array == NULL)
- {
- utarray_new(s->topic_array, &mq_topic_schema_icd);
- }
- unsigned int len = utarray_len(s->topic_array);
- if(mq_schema_get_topic_id(s, topic_name) >= 0)
- {
- return -1;
- }
- struct mq_topic topic={};
- topic.dispatch_cb=on_dispatch_cb;
- topic.free_cb=msg_free_cb;
- topic.topic_name=(char *)topic_name;
- topic.topic_id=len;//topid_id equals arrary index
- topic.dispatch_cb_arg=on_dispatch_arg;
- topic.free_cb_arg=msg_free_arg;
- topic.subscribers=NULL;
- topic.subscriber_cnt=0;
- utarray_push_back(s->topic_array, &topic);
- s->mq_topic_num+=1;
- return topic.topic_id;
-}
-
-int mq_schema_destroy_topic(struct mq_schema *s, int topic_id)
-{
- struct mq_topic *topic = mq_schema_get_topic(s, topic_id);
- if(topic == NULL)return -1;
- if (topic->is_destroyed == 1)return 0;
-
- struct mq_subscriber *sub_elt, *sub_tmp;
- DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
- {
- DL_DELETE(topic->subscribers, sub_elt);
- FREE(sub_elt);
- }
- topic->is_destroyed = 1;
- s->mq_topic_num-=1;
- return 1; // success
-}
-
-
-static int mq_dispatch_one_message(struct mq_topic *topic, struct mq_message *mq_elt)
-{
- struct mq_subscriber *sub_elt, *sub_tmp;
- if(topic==NULL || mq_elt==NULL)return -1;
- DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
- {
- if (sub_elt->msg_cb)
- {
- if (topic->dispatch_cb)
- topic->dispatch_cb(mq_elt->header.topic_id, mq_elt->body, sub_elt->msg_cb, sub_elt->msg_cb_arg,
- topic->dispatch_cb_arg);
- else
- sub_elt->msg_cb(mq_elt->header.topic_id, mq_elt->body, sub_elt->msg_cb_arg);
- }
- }
- return 0;
-}
-
-
-
-int mq_runtime_publish_message_immediate(struct mq_runtime *rt, int topic_id, void *msg)
-{
- if(rt==NULL || rt->schema == NULL)return -1;
-
- struct mq_topic *topic = mq_schema_get_topic(rt->schema, topic_id);
- if(topic==NULL)return -1;
-
- struct mq_message mq_elt;
- mq_elt.rt=rt;
- mq_elt.header.topic_id = topic_id;
- mq_elt.body = msg;
- mq_dispatch_one_message(topic, &mq_elt);
- if (topic->free_cb)
- {
- topic->free_cb(mq_elt.body, topic->free_cb_arg);
- }
- return 0;
-}
-
-void mq_runtime_clean(struct mq_runtime *rt)
-{
- if(rt==NULL)return;
-
- struct mq_message *mq_elt, *tmp;
- struct mq_topic *topic;
- rt->is_cleaning=true;
-
- for (int i = 0; i < MQ_MAX; i++)
- {
- DL_FOREACH_SAFE(rt->mq[i], mq_elt, tmp)
- {
- topic = (struct mq_topic *)utarray_eltptr(rt->schema->topic_array, (unsigned int)(mq_elt->header.topic_id));
- if (topic && topic->free_cb)
- {
- topic->free_cb(mq_elt->body, topic->free_cb_arg);
- }
- DL_DELETE(rt->mq[i], mq_elt);
- rt->mq_len[i] -= 1;
- FREE(mq_elt);
- }
- }
- rt->is_cleaning=false;
-}
-
-void mq_runtime_dispatch(struct mq_runtime *rt)
-{
- struct mq_topic *topic=NULL;
- struct mq_message *mq_elt=NULL, *mq_tmp=NULL;
- while (rt->mq_len[MQ_MAILBOX])
- {
- DL_FOREACH_SAFE(rt->mq[MQ_MAILBOX], mq_elt, mq_tmp)
- {
- DL_DELETE(rt->mq[MQ_MAILBOX], mq_elt);
- rt->mq_len[MQ_MAILBOX] -= 1;
- topic = (struct mq_topic *)utarray_eltptr(rt->schema->topic_array, (unsigned int)(mq_elt->header.topic_id));
- mq_dispatch_one_message(topic, mq_elt);
- if (rt->defer_enabled==true)
- {
- DL_APPEND(rt->mq[MQ_DEATH_LETTER], mq_elt); // move to dlq list
- rt->mq_len[MQ_DEATH_LETTER] += 1;
- }
- else
- {
- if(topic->free_cb)topic->free_cb(mq_elt->body, topic->free_cb_arg);
- FREE(mq_elt);
- }
- }
- }
- mq_runtime_clean(rt);
- return;
-}
-
-//return 0 if success, otherwise return -1.
-int mq_schema_subscribe(struct mq_schema *s, int topic_id, on_msg_cb_func *on_msg_cb, void *on_msg_cb_arg)
-{
- struct mq_topic *topic = mq_schema_get_topic(s, topic_id);
- if(topic==NULL)return -1;
-
- struct mq_subscriber *new_subscriber = CALLOC(struct mq_subscriber,1);
- new_subscriber->topic_subscriber_idx = topic->subscriber_cnt;
- new_subscriber->msg_cb = on_msg_cb;
- new_subscriber->msg_cb_arg = on_msg_cb_arg;
- DL_APPEND(topic->subscribers, new_subscriber);
-
- topic->subscriber_cnt+=1;
- s->mq_topic_subscriber_num+=1;
- return 0;
-}
-
-int mq_runtime_publish_message(struct mq_runtime *rt, int topic_id, void *data)
-{
- if(rt==NULL || rt->schema == NULL)return -1;
- if(rt->is_cleaning==true)return -1;
-
- struct mq_topic *topic = mq_schema_get_topic(rt->schema, topic_id);
- if(topic==NULL)return -1;
-
- struct mq_message *msg= CALLOC(struct mq_message,1);
- msg->rt=rt;
- msg->header.topic_id = topic_id;
- msg->body = data;
- DL_APPEND(rt->mq[MQ_MAILBOX], msg);
- rt->mq_len[MQ_MAILBOX]+=1;
-
- if(rt->defer_enabled==false)
- {
- mq_runtime_dispatch(rt);
- }
-
- return 0;
-}
-
-struct mq_schema *mq_schema_new()
-{
- struct mq_schema *s = CALLOC(struct mq_schema,1);
- return s;
-}
-
-void mq_schema_free(struct mq_schema *s)
-{
- if(s==NULL)return;
- if(s->topic_array)
- {
- for (unsigned int i = 0; i < utarray_len(s->topic_array); i++)
- {
- mq_schema_destroy_topic(s, i);
- }
- utarray_free(s->topic_array);
- }
- FREE(s);
- return;
-}
-
-struct mq_runtime *mq_runtime_new(struct mq_schema *s)
-{
- if(s==NULL)return NULL;
- struct mq_runtime *rt = CALLOC(struct mq_runtime,1);
- rt->schema=s;
- rt->is_cleaning=false;
- return rt;
-}
-
-void mq_runtime_defer(struct mq_runtime *rt)
-{
- if(rt==NULL)return;
- rt->defer_enabled=true;
-}
-
-void mq_runtime_free(struct mq_runtime *rt)
-{
- if(rt==NULL)return;
- mq_runtime_clean(rt);
- FREE(rt);
-}
-
diff --git a/infra/mq/mq_internal.h b/infra/mq/mq_internal.h
deleted file mode 100644
index d0cb695..0000000
--- a/infra/mq/mq_internal.h
+++ /dev/null
@@ -1,75 +0,0 @@
-#pragma once
-
-#ifdef __cplusplus
-extern "C"
-{
-#endif
-
-#include "stellar/mq.h"
-#include "uthash/utarray.h"
-
-#include <stdbool.h>
-
-struct mq_message
-{
- struct mq_runtime *rt;
- struct
- {
- int topic_id;
- } header;
- void *body;
- struct mq_message *next, *prev;
-} __attribute__((aligned(sizeof(void *))));
-
-typedef struct mq_subscriber
-{
- int topic_subscriber_idx;
- int plugin_idx;
- on_msg_cb_func *msg_cb;
- void *msg_cb_arg;
- struct mq_subscriber *next, *prev;
-}stellar_mq_subscriber __attribute__((aligned(sizeof(void*))));
-
-
-struct mq_topic
-{
- char *topic_name;
- int topic_id;
- int subscriber_cnt;
- int is_destroyed;
- on_msg_dispatch_cb_func *dispatch_cb;
- void *dispatch_cb_arg;
- mq_msg_free_cb_func *free_cb;
- void *free_cb_arg;
- struct mq_subscriber *subscribers;
-}__attribute__((aligned(sizeof(void*))));
-
-struct mq_schema
-{
- UT_array *topic_array;
- int mq_topic_num;
- int mq_topic_subscriber_num;
-};
-
-
-enum mq_property
-{
- MQ_MAILBOX = 0,
- MQ_DEATH_LETTER = 1,
- MQ_MAX,
-};
-
-struct mq_runtime
-{
- struct mq_schema *schema;
- struct mq_message *mq[MQ_MAX];// message queue
- size_t mq_len[MQ_MAX];
- bool is_cleaning;
- bool defer_enabled;
-};
-
-int mq_runtime_publish_message_immediate(struct mq_runtime *rt, int topic_id, void *msg);
-
-#ifdef __cplusplus
-}
-#endif \ No newline at end of file
diff --git a/infra/mq/test/CMakeLists.txt b/infra/mq/test/CMakeLists.txt
deleted file mode 100644
index 6b72d4e..0000000
--- a/infra/mq/test/CMakeLists.txt
+++ /dev/null
@@ -1,14 +0,0 @@
-add_executable(gtest_mq gtest_mq_main.cpp)
-
-target_include_directories(gtest_mq PRIVATE ${CMAKE_SOURCE_DIR}/infra/)
-
-target_link_libraries(
- gtest_mq
- mq
- "-rdynamic"
- gtest
- gmock
-)
-
-include(GoogleTest)
-gtest_discover_tests(gtest_mq) \ No newline at end of file
diff --git a/infra/mq/test/gtest_mq_main.cpp b/infra/mq/test/gtest_mq_main.cpp
deleted file mode 100644
index 6c8d5c8..0000000
--- a/infra/mq/test/gtest_mq_main.cpp
+++ /dev/null
@@ -1,917 +0,0 @@
-#pragma GCC diagnostic ignored "-Wunused-parameter"
-
-#include <gtest/gtest.h>
-
-#include "mq/mq_internal.h"
-
-#include "stellar/utils.h"
-
-#define TOPIC_NAME_MAX 512
-
-/*******************************************
- * TEST MQ SCHEMA *
- *******************************************/
-
-TEST(mq_schema, new_and_free) {
-
- struct mq_schema *s = mq_schema_new();
- EXPECT_TRUE(s!=NULL);
- mq_schema_free(s);
-}
-
-void mock_msg_free(void *msg, void *msg_free_arg){}
-void mock_overwrite_msg_free(void *msg, void *msg_free_arg){}
-
-TEST(mq_schema, mq_topic_create_and_update) {
-
- struct mq_schema *s = mq_schema_new();
- EXPECT_TRUE(s!=NULL);
-
- const char *topic_name="PACKET_TOPIC";
- EXPECT_EQ(mq_schema_get_topic_id(s, topic_name), -1); // illegal topic_name
-
- int topic_id = mq_schema_create_topic(s, topic_name, NULL, NULL, mock_msg_free, s);
- EXPECT_GE(topic_id, 0);
-
- struct mq_topic *topic = NULL;
- {
- SCOPED_TRACE("White-box test, check mq_schema internal ");
- topic =
- (struct mq_topic *)utarray_eltptr(s->topic_array, (unsigned int)topic_id);
- EXPECT_EQ(topic->free_cb, (void *)mock_msg_free);
- EXPECT_EQ(topic->free_cb_arg, s);
- EXPECT_EQ(topic->topic_id, topic_id);
- EXPECT_STREQ(topic->topic_name, topic_name);
- }
-
- EXPECT_EQ(mq_schema_get_topic_id(s, topic_name), topic_id);
- EXPECT_EQ(mq_schema_create_topic(s, topic_name, NULL, NULL, mock_overwrite_msg_free, s), -1); // duplicate create, return error
-
- {
- SCOPED_TRACE("White-box test, check stellar internal schema");
- topic =
- (struct mq_topic *)utarray_eltptr(s->topic_array, (unsigned int)topic_id);
- EXPECT_EQ(topic->free_cb, (void *)mock_msg_free);
- EXPECT_EQ(topic->free_cb_arg, s);
- EXPECT_EQ(topic->topic_id, topic_id);
- EXPECT_STREQ(topic->topic_name, topic_name);
- }
-
- EXPECT_EQ(mq_schema_update_topic(s, topic_id, NULL, NULL, mock_overwrite_msg_free, s), 0);
-
- {
- SCOPED_TRACE("White-box test, check stellar internal schema");
- topic =
- (struct mq_topic *)utarray_eltptr(s->topic_array, (unsigned int)topic_id);
- EXPECT_EQ(topic->free_cb, (void *)mock_overwrite_msg_free);
- EXPECT_EQ(topic->free_cb_arg, s);
- EXPECT_EQ(topic->topic_id, topic_id);
- EXPECT_STREQ(topic->topic_name, topic_name);
- EXPECT_EQ(utarray_len(s->topic_array), 1);
- }
-
- EXPECT_EQ(mq_schema_destroy_topic(s, 10), -1); // illgeal topic_id
-
- EXPECT_EQ(mq_schema_destroy_topic(s, topic_id), 1);
- EXPECT_EQ(mq_schema_destroy_topic(s, topic_id), 0); // duplicate destroy, return 0;
-
- {
- SCOPED_TRACE("White-box test, check stellar internal schema");
- EXPECT_EQ(utarray_len(s->topic_array), 1); // destory won't delete the topic schema
- }
-
- mq_schema_free(s);
-}
-
-void test_mock_on_packet_msg(int topic_id, void *msg, void *sub_arg){}
-void test_mock_overwrite_on_packet_msg(int topic_id, void *msg, void *sub_arg){}
-
-TEST(mq_schema, subscribe) {
-
- struct mq_schema *s = mq_schema_new();
- EXPECT_TRUE(s!=NULL);
-
- const char *topic_name="PACKET_TOPIC";
-
- int topic_id=mq_schema_create_topic(s, topic_name, NULL, NULL, mock_msg_free, s);
- EXPECT_GE(topic_id, 0);
-
- EXPECT_EQ(mq_schema_subscribe(s, 10, test_mock_on_packet_msg, s),-1);//illgeal topic_id
-
- EXPECT_EQ(mq_schema_subscribe(s, topic_id, test_mock_on_packet_msg, s),0);
- EXPECT_EQ(mq_schema_subscribe(s, topic_id, test_mock_overwrite_on_packet_msg, s),0);//duplicate subscribe
-
- struct mq_topic *topic;
- {
- SCOPED_TRACE("White-box test, check stellar internal schema");
- topic = (struct mq_topic *)utarray_eltptr(s->topic_array, (unsigned int)topic_id);
- EXPECT_EQ(topic->free_cb, (void *)mock_msg_free);
- EXPECT_EQ(topic->free_cb_arg, s);
- EXPECT_EQ(topic->topic_id, topic_id);
- EXPECT_STREQ(topic->topic_name, topic_name);
- }
-
- EXPECT_EQ(topic->subscriber_cnt, 2);
- EXPECT_EQ(topic->subscribers->msg_cb, (void *)test_mock_on_packet_msg);
- EXPECT_EQ(topic->subscribers->next->msg_cb, (void *)test_mock_overwrite_on_packet_msg);
-
- mq_schema_free(s);
-}
-
-/*******************************************
- * TEST MQ RUNTIME *
- *******************************************/
-
-TEST(mq_runtime, new_and_free) {
-
- struct mq_schema *s = mq_schema_new();
- EXPECT_TRUE(s!=NULL);
-
- struct mq_runtime *rt = mq_runtime_new(s);
- EXPECT_TRUE(rt!=NULL);
-
- mq_runtime_free(rt);
- mq_schema_free(s);
-
-}
-
-struct test_pub_and_clean_env
-{
- struct mq_schema *s;
- struct mq_runtime *rt;
- int N_round;
- int current_round;
- int topic_id;
- int on_msg_free_called;
- int on_msg_called;
-};
-
-void test_pub_and_clean_free(void *msg, void *msg_free_arg)
-{
- struct test_pub_and_clean_env *env = (struct test_pub_and_clean_env *)msg_free_arg;
- env->on_msg_free_called+=1;
- FREE(msg);
- return;
-}
-
-void test_pub_and_clean_on_msg(int topic_id, void *msg, void *sub_arg)
-{
- struct test_pub_and_clean_env *env = (struct test_pub_and_clean_env *)sub_arg;
- env->on_msg_called+=1;
- return;
-}
-
-TEST(mq_runtime, defer_pub_then_clean) {
-
- struct test_pub_and_clean_env env={};
- env.s = mq_schema_new();
- EXPECT_TRUE(env.s!=NULL);
-
- env.topic_id=mq_schema_create_topic(env.s,"TEST", NULL, NULL, test_pub_and_clean_free , &env);
- EXPECT_EQ(mq_schema_subscribe(env.s, env.topic_id, test_pub_and_clean_on_msg, &env), 0);
-
- env.N_round=10;
- env.rt=mq_runtime_new(env.s);
- EXPECT_TRUE(env.rt!=NULL);
-
- mq_runtime_defer(env.rt);
-
- for(int i=0; i<env.N_round;i++)
- {
- env.current_round=i;
- void *msg = CALLOC(int, 1);
- EXPECT_TRUE(msg!=NULL);
- *(int *)msg=i;
- EXPECT_EQ(mq_runtime_publish_message(env.rt, env.topic_id, msg), 0);
- mq_runtime_clean(env.rt);
- }
-
-
- mq_runtime_free(env.rt);
- mq_schema_free(env.s);
-
- EXPECT_EQ(env.on_msg_free_called, env.N_round);
- EXPECT_EQ(env.on_msg_called, 0);
-
-}
-
-#define PACKET_EXDATA_NUM 2
-#define PACKET_TOPIC_NUM 2
-#define PACKET_MQ_SUB_NUM 2
-struct mock_packet_mq_env
-{
- struct mq_schema *s;
- struct mq_runtime *rt;
- int basic_on_packet_called;
- int exdata_set_on_packet_called;
- int exdata_get_on_packet_called;
- unsigned int packet_exdata_idx[PACKET_EXDATA_NUM];
- int exdata_free_called[PACKET_EXDATA_NUM];
- unsigned int packet_topic_id[PACKET_TOPIC_NUM];
- unsigned int packet_mq_sub_module_id[PACKET_MQ_SUB_NUM];
- int msg_pub_cnt;
- int msg_sub_cnt;
- int msg_free_cnt;
-};
-
-struct mock_packet_message
-{
- unsigned char ip_proto;
- int topic_in;
- int topic_out;
-};
-
-static void test_packet_msg_free_cb_func(void *msg, void *msg_free_arg)
-{
- struct mock_packet_mq_env *env = (struct mock_packet_mq_env *)msg_free_arg;
- env->msg_free_cnt+=1;
- return;
-}
-
-static void test_mq_on_packet_topic_msg(int topic_id, void *msg, void *sub_arg)
-{
- struct mock_packet_mq_env *env = (struct mock_packet_mq_env *)sub_arg;
- EXPECT_TRUE(env!=NULL);
- env->msg_sub_cnt+=1;
- return;
-}
-
-static void test_mq_on_packet_in_out(int topic_id, void *msg, void *sub_arg)
-{
- struct mock_packet_mq_env *env = (struct mock_packet_mq_env *)sub_arg;
- EXPECT_TRUE(env!=NULL);
- int topic_id_num=(int)(sizeof(env->packet_topic_id) / sizeof(env->packet_topic_id[0]));
- for(int i=0; i<topic_id_num; i++)
- {
- EXPECT_EQ(mq_runtime_publish_message(env->rt, env->packet_topic_id[i], msg), 0);
- env->msg_pub_cnt+=1;
- }
- return;
-}
-
-TEST(mq_runtime, basic_pub_sub) {
-
- struct mq_schema *s = mq_schema_new();
- EXPECT_TRUE(s!=NULL);
-
- struct mock_packet_mq_env env;
- memset(&env, 0, sizeof(struct mock_packet_mq_env));
- env.s=s;
- char topic_name[PACKET_TOPIC_NUM][TOPIC_NAME_MAX];
-
- int topic_id_num=(int)(sizeof(env.packet_topic_id) / sizeof(env.packet_topic_id[0]));
-
-
-
- for(int i=0; i<topic_id_num; i++)
- {
- sprintf(topic_name[i], "PACKET_TOPIC_%d", i);
- env.packet_topic_id[i]=mq_schema_create_topic(s, topic_name[i], NULL, NULL, test_packet_msg_free_cb_func, &env);
- EXPECT_GE(env.packet_topic_id[i], 0);
- {
- SCOPED_TRACE("White-box test, check stellar internal schema");
- struct mq_topic *topic = (struct mq_topic *)utarray_eltptr(
- s->topic_array, env.packet_topic_id[i]);
- EXPECT_EQ(topic->free_cb, test_packet_msg_free_cb_func);
- EXPECT_EQ(topic->free_cb_arg, &env);
- EXPECT_EQ(topic->topic_id, env.packet_topic_id[i]);
- EXPECT_STREQ(topic->topic_name, topic_name[i]);
- }
- }
-
- {
- SCOPED_TRACE("White-box test, check stellar internal schema");
- EXPECT_EQ(utarray_len(s->topic_array), topic_id_num);
- }
-
- int topic_sub_num=(int)(sizeof(env.packet_mq_sub_module_id) / sizeof(env.packet_mq_sub_module_id[0]));
-
- for (int i = 0; i < topic_sub_num; i++)
- {
- for(int j = 0; j < topic_id_num; j++)
- {
- EXPECT_EQ(mq_schema_subscribe(s, env.packet_topic_id[j], test_mq_on_packet_topic_msg, &env), 0);
- }
- }
-
-
- int packet_in_topic_id=mq_schema_create_topic(s, "PACKET_IN", NULL, NULL, NULL, &env);
- int packet_out_topic_id=mq_schema_create_topic(s, "PACKET_OUT", NULL, NULL, NULL, &env);
-
- mq_schema_subscribe(s, packet_in_topic_id, test_mq_on_packet_in_out, &env);
- mq_schema_subscribe(s, packet_out_topic_id, test_mq_on_packet_in_out, &env);
- struct mock_packet_message pkt={6, packet_in_topic_id, packet_out_topic_id};
-
- struct mq_runtime *rt = mq_runtime_new(s);
- EXPECT_TRUE(rt!=NULL);
- env.rt=rt;
-
- int N_packet=1;
- for (int i = 0; i < N_packet; i++)
- {
- mq_runtime_publish_message(rt, packet_in_topic_id, &pkt);
- mq_runtime_dispatch(rt);
- mq_runtime_publish_message(rt, packet_out_topic_id, &pkt);
- mq_runtime_dispatch(rt);
- }
-
- mq_runtime_free(rt);
- mq_schema_free(s);
-
- EXPECT_EQ(N_packet*2*topic_id_num, env.msg_pub_cnt);
- EXPECT_EQ(env.msg_free_cnt, env.msg_pub_cnt);
- EXPECT_EQ(env.msg_sub_cnt, env.msg_pub_cnt*topic_sub_num);
-
-
-}
-
-struct test_pub_on_free_env
-{
- mq_schema *s;
- mq_runtime *rt;
- int N_round;
- int current_round;
- int topic_id;
- int on_msg_free_called;
- int on_msg_called;
-};
-
-
-static void test_pub_on_msg_free(void *msg, void *msg_free_arg)
-{
- struct test_pub_on_free_env *env = (struct test_pub_on_free_env *)msg_free_arg;
- env->on_msg_free_called+=1;
- if((long)msg!=env->N_round && (int)(long)msg==env->N_round-1)
- {
- EXPECT_EQ(mq_runtime_publish_message(env->rt, env->topic_id, (void *)(long)(env->N_round)), -1);//on message free, publish always failed
- EXPECT_EQ(mq_runtime_publish_message_immediate(env->rt, env->topic_id, (void *)(long)(env->N_round)), 0);//on message free, publish at once success
- }
- return;
-}
-
-static void test_pub_on_free_on_msg(int topic_id, void *msg, void *sub_arg)
-{
- struct test_pub_on_free_env *env = (struct test_pub_on_free_env *)sub_arg;
- env->on_msg_called+=1;
- if((int)(long)msg==env->N_round)
- {
- EXPECT_EQ(env->on_msg_called, env->N_round+1);
- EXPECT_EQ(env->current_round, env->N_round-1);
- }
- return;
-}
-
-TEST(mq_runtime, pub_on_msg_free)
-{
- struct test_pub_on_free_env env={};
- env.s=mq_schema_new();
- EXPECT_TRUE(env.s!=NULL);
- env.topic_id=mq_schema_create_topic(env.s,"TEST", NULL, NULL, test_pub_on_msg_free , &env);
- EXPECT_EQ(mq_schema_subscribe(env.s, env.topic_id, test_pub_on_free_on_msg, &env), 0);
-
- env.N_round=10;
- env.rt=mq_runtime_new(env.s);
- EXPECT_TRUE(env.rt!=NULL);
- mq_runtime_defer(env.rt);
- for(int i=0; i<env.N_round;i++)
- {
- env.current_round=i;
- EXPECT_EQ(mq_runtime_publish_message(env.rt, env.topic_id, (void *)(long)i), 0);
- mq_runtime_dispatch(env.rt);
- }
-
- mq_runtime_free(env.rt);
- mq_schema_free(env.s);
-
- EXPECT_EQ(env.on_msg_free_called, env.N_round+1);
- EXPECT_EQ(env.on_msg_called, env.N_round+1);
-}
-
-
-struct test_dispatch_on_msg_env
-{
- mq_schema *s;
- mq_runtime *rt;
- int N_round;
- int current_round;
- int topic_id;
- int on_msg_free_called;
- int on_msg_called;
-};
-
-
-static void test_dispatch_on_msg_free(void *msg, void *msg_free_arg)
-{
- struct test_dispatch_on_msg_env *env = (struct test_dispatch_on_msg_env *)msg_free_arg;
- env->on_msg_free_called+=1;
- return;
-}
-
-static void test_dispatch_on_msg(int topic_id, void *msg, void *sub_arg)
-{
- struct test_dispatch_on_msg_env *env = (struct test_dispatch_on_msg_env *)sub_arg;
- env->on_msg_called+=1;
- if(env->current_round==(long)msg)
- {
- EXPECT_EQ(mq_runtime_publish_message(env->rt, env->topic_id, (void*)(long)env->N_round), 0);
- }
- else
- {
- EXPECT_EQ((long)msg, env->N_round);
- }
- mq_runtime_dispatch(env->rt);
- return;
-}
-
-TEST(mq_runtime, call_dispatch_when_dispatch)
-{
- struct test_dispatch_on_msg_env env={};
- env.s=mq_schema_new();
- EXPECT_TRUE(env.s!=NULL);
- env.topic_id=mq_schema_create_topic(env.s,"TEST", NULL, NULL, test_dispatch_on_msg_free , &env);
- EXPECT_EQ(mq_schema_subscribe(env.s, env.topic_id, test_dispatch_on_msg, &env), 0);
-
- env.N_round=10;
- env.rt=mq_runtime_new(env.s);
- EXPECT_TRUE(env.rt!=NULL);
- for(int i=0; i<env.N_round;i++)
- {
- env.current_round=i;
- EXPECT_EQ(mq_runtime_publish_message(env.rt, env.topic_id, (void *)(long)i), 0);
- mq_runtime_dispatch(env.rt);
- }
-
- mq_runtime_free(env.rt);
- mq_schema_free(env.s);
-
- EXPECT_EQ(env.on_msg_free_called, env.N_round*2);
- EXPECT_EQ(env.on_msg_called, env.N_round*2);
-}
-
-struct mock_session_message
-{
- int id;
- int cnt;
-};
-
-struct mock_session_mq_env
-{
- struct mq_schema *s;
- struct mq_runtime *rt;
- int N_session;
- struct mock_session_message sess[1024];
- int intrinsc_tcp_input_topic_id;
- int basic_on_tcp_called;
- int sess_dispatch_called;
- int test_mq_sub_called;
- int sess_msg_free_called;
-};
-
-#define TOPIC_TCP "TCP"
-typedef void mock_on_session_msg_cb_func(int topic_id, struct mock_session_message *sess, void *module_ctx);
-
-static void mock_on_msg_dispatch(int topic_id,
- void *msg,
- on_msg_cb_func* on_msg_cb,
- void *on_msg_cb_arg,
- void *dispatch_arg)
-{
- mock_on_session_msg_cb_func *session_cb = (mock_on_session_msg_cb_func *)on_msg_cb;
- struct mock_session_message *sess=(struct mock_session_message *)msg;
- EXPECT_TRUE(dispatch_arg==NULL);
- session_cb(topic_id, sess, on_msg_cb_arg);
- struct mock_session_mq_env *env=(struct mock_session_mq_env *)on_msg_cb_arg;
- env->sess_dispatch_called+=1;
-}
-
-static void mock_tcp_session_msg_free(void *msg, void *msg_free_arg)
-{
- struct mock_session_mq_env *env=(struct mock_session_mq_env *)msg_free_arg;
- env->sess_msg_free_called+=1;
-
-}
-static int mock_tcp_session_subscribe(struct mock_session_mq_env *env, mock_on_session_msg_cb_func *on_session_cb)
-{
- int topic_id=mq_schema_get_topic_id(env->s, TOPIC_TCP);
- if(topic_id<0)
- {
- topic_id=mq_schema_create_topic(env->s, TOPIC_TCP, mock_on_msg_dispatch, NULL, mock_tcp_session_msg_free, env);
- }
- return mq_schema_subscribe(env->s, topic_id, (on_msg_cb_func *)on_session_cb, env);
-}
-
-static void test_basic_on_tcp_session(int topic_id, struct mock_session_message *sess, void *sub_arg)
-{
- struct mock_session_mq_env *env = (struct mock_session_mq_env *)sub_arg;
- EXPECT_TRUE(env!=NULL);
- if(sess)
- {
- env->basic_on_tcp_called+=1;
- }
- return;
-}
-
-TEST(mq_runtime, sub_with_dispatch_cb) {
-
- struct mock_session_mq_env env;
- memset(&env, 0, sizeof(env));
- env.N_session=10;
-
- env.s=mq_schema_new();
-
- EXPECT_EQ(mock_tcp_session_subscribe(&env, test_basic_on_tcp_session), 0);
- env.intrinsc_tcp_input_topic_id=mq_schema_get_topic_id(env.s, TOPIC_TCP);
-
- env.rt=mq_runtime_new(env.s);
-
- for(int i=0; i <env.N_session; i++)
- {
- env.sess[i].id=i;
- mq_runtime_publish_message(env.rt, env.intrinsc_tcp_input_topic_id, &env.sess[i]);
- env.sess[i].cnt+=1;
- mq_runtime_dispatch(env.rt);
- }
-
- mq_runtime_free(env.rt);
- mq_schema_free(env.s);
-
- EXPECT_EQ(env.basic_on_tcp_called, env.N_session);
- EXPECT_EQ(env.sess_dispatch_called, env.N_session);
- EXPECT_EQ(env.sess_msg_free_called, env.N_session);
-}
-
-#if 0
-struct test_priority_mq_env
-{
- struct mq_schema *s;
- struct mq_runtime *rt;
- int N_round;
- int current_round;
- int tcp_topic_id;
- int test_mq_topic_id;
- int plugin_id_1_called;
- int plugin_id_2_called;
-
-};
-
-//test dettach session
-static void test_session_mq_priority_plugin_1_on_msg(int topic_id, void *msg, void *plugin_env)
-{
- struct test_priority_mq_env *env = (struct test_priority_mq_env *)plugin_env;
- env->plugin_id_1_called+=1;
- if(topic_id == env->tcp_topic_id)
- {
- EXPECT_EQ(env->plugin_id_1_called%3, 1);// tcp msg has high priority
- if((long)msg%2==0)
- {
- EXPECT_EQ(mq_runtime_publish_message_with_priority(env->rt, env->test_mq_topic_id, (void *)(long)1, STELLAR_MQ_PRIORITY_LOW), 0);
- }
- else
- {
- EXPECT_EQ(mq_runtime_publish_message_with_priority(env->rt, env->test_mq_topic_id, (void *)(long)1, STELLAR_MQ_PRIORITY_HIGH), 0);
- }
- }
- if(topic_id == env->test_mq_topic_id)
- {
- if (env->current_round % 2 == 0)
- {
- if (env->plugin_id_1_called % 3 == 2)
- {
- EXPECT_EQ((int)(long)msg, 2); // msg 2 has normal priority
- }
- if (env->plugin_id_1_called % 3 == 0)
- {
- EXPECT_EQ((int)(long)msg, 1); // msg 1 has low priority
- }
- }
- else
- {
- if (env->plugin_id_1_called % 3 == 2)
- {
- EXPECT_EQ((int)(long)msg, 1); // msg 2 has normal priority
- }
- if (env->plugin_id_1_called % 3 == 0)
- {
- EXPECT_EQ((int)(long)msg, 2); // msg 1 has low priority
- }
- }
- }
- return;
-}
-
-static void test_session_mq_priority_plugin_2_on_msg(int topic_id, void *msg, void *plugin_env)
-{
- struct test_priority_mq_env *env = (struct test_priority_mq_env *)plugin_env;
-
- env->plugin_id_2_called+=1;
- if(topic_id == env->tcp_topic_id)
- {
- EXPECT_EQ(env->plugin_id_2_called % 3, 1);
- // tcp msg has normal priority
- EXPECT_EQ(mq_runtime_publish_message(env->rt, env->test_mq_topic_id, (void *)(long)2), 0);
- }
- if(topic_id == env->test_mq_topic_id)
- {
- if (env->current_round % 2 == 0)
- {
- if (env->plugin_id_2_called % 3 == 2)
- {
- EXPECT_EQ((int)(long)msg, 2); // msg 2 has normal priority
- }
- if (env->plugin_id_2_called % 3 == 0)
- {
- EXPECT_EQ((int)(long)msg, 1); // msg 1 has low priority
- }
- }
- else
- {
- if (env->plugin_id_2_called % 3 == 2)
- {
- EXPECT_EQ((int)(long)msg, 1); // msg 2 has normal priority
- }
- if (env->plugin_id_2_called % 3 == 0)
- {
- EXPECT_EQ((int)(long)msg, 2); // msg 1 has low priority
- }
- }
- }
- return;
-}
-
-TEST(mq_runtime, basic_mq_priority) {
-
- struct test_priority_mq_env env={};
-
- env.s=mq_schema_new();
-
- env.tcp_topic_id=mq_schema_create_topic(env.s, TOPIC_TCP, NULL, NULL, NULL, &env);
- EXPECT_GE(env.tcp_topic_id, 0);
-
- EXPECT_EQ(mq_schema_subscribe(env.s, env.tcp_topic_id, test_session_mq_priority_plugin_1_on_msg, &env), 0);
- EXPECT_EQ(mq_schema_subscribe(env.s, env.tcp_topic_id, test_session_mq_priority_plugin_2_on_msg, &env), 0);
-
- env.test_mq_topic_id=mq_schema_create_topic(env.s, "SESSION_PRIORITY_TOPIC", NULL, NULL, NULL, &env);
- EXPECT_GE(env.test_mq_topic_id, 0);
- EXPECT_EQ(mq_schema_subscribe(env.s, env.test_mq_topic_id, test_session_mq_priority_plugin_1_on_msg, &env), 0);
- EXPECT_EQ(mq_schema_subscribe(env.s, env.test_mq_topic_id, test_session_mq_priority_plugin_2_on_msg, &env), 0);
-
-// mock packet and session
-
- env.rt=mq_runtime_new(env.s);
- env.N_round=10;
-
- for (int i = 0; i < env.N_round; i++)
- {
- env.current_round=i;
- mq_runtime_publish_message(env.rt, env.tcp_topic_id, (void *)(long)i);
- mq_runtime_dispatch(env.rt);
-
- }
-
- mq_runtime_free(env.rt);
- mq_schema_free(env.s);
-
- // publish TCP TOPIC N_round, and SESSION_PRIORITY_TOPIC*2
- EXPECT_EQ(env.plugin_id_1_called,env.N_round*3);
- EXPECT_EQ(env.plugin_id_2_called,env.N_round*3);
-}
-#endif
-
-struct test_polling_module
-{
- int mod_id;
- int called;
-};
-
-struct test_mock_polling_env
-{
- struct mq_schema *s;
- struct mq_runtime *rt;
- int N_round;
- int current_round;
- int polling_topic_id;
- int polling_dispatch_called;
- int mod_num;
- struct test_polling_module mod[1024];
-};
-
-#define TOPIC_POLLING "POLLING"
-
-typedef void mock_on_polling_cb_func(void *polling_arg);
-
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Wcast-function-type"
-
-static void mock_on_polling_dispatch(int topic_id,
- void *msg,
- on_msg_cb_func* on_msg_cb,
- void *on_msg_cb_arg,
- void *dispatch_arg)
-{
- mock_on_polling_cb_func *polling = (mock_on_polling_cb_func *)on_msg_cb;
- polling(on_msg_cb_arg);
- struct test_mock_polling_env *env=(struct test_mock_polling_env *)dispatch_arg;
- env->polling_dispatch_called+=1;
-}
-
-static int mock_polling_subscribe(struct test_mock_polling_env *env, mock_on_polling_cb_func *on_polling, void *polling_arg)
-{
- int topic_id=mq_schema_get_topic_id(env->s, TOPIC_POLLING);
- if(topic_id<0)
- {
- topic_id=mq_schema_create_topic(env->s, TOPIC_POLLING, mock_on_polling_dispatch, env, NULL, NULL);
- }
- return mq_schema_subscribe(env->s, topic_id, (on_msg_cb_func *)on_polling, polling_arg);
-}
-
-#pragma GCC diagnostic pop
-
-static int mock_polling_work(struct test_mock_polling_env *env)
-{
- mq_runtime_publish_message(env->rt, env->polling_topic_id, NULL);
- return 0;
-}
-
-
-
-static void mock_on_polling(void *polling_arg)
-{
- struct test_polling_module *mod = (struct test_polling_module *)polling_arg;
- mod->called+=1;
- if(mod->mod_id==0 && mod->called==2)
- {
- struct test_mock_polling_env *env=container_of((const test_polling_module (*)[1024])polling_arg, struct test_mock_polling_env, mod);
- mock_polling_work(env);
- }
- return;
-}
-
-TEST(mq_runtime, polling)
-{
- struct test_mock_polling_env env;
- memset(&env, 0, sizeof(env));
- env.s=mq_schema_new();
- env.mod_num=10;
-
- for(int i=0; i < env.mod_num;i++)
- {
- env.mod[i].mod_id=i;
- EXPECT_EQ(mock_polling_subscribe(&env, mock_on_polling, &env.mod[i]), 0);
- }
-
- env.polling_topic_id=mq_schema_get_topic_id(env.s, TOPIC_POLLING);
- env.rt=mq_runtime_new(env.s);
-
- env.N_round=10;
- for(int i=0; i <env.N_round; i++)
- {
- env.current_round=i;
- mq_runtime_publish_message(env.rt, env.polling_topic_id, NULL);
- mq_runtime_dispatch(env.rt);
- }
-
- mq_runtime_free(env.rt);
- mq_schema_free(env.s);
-
- EXPECT_EQ(env.polling_dispatch_called, (env.N_round+1)*env.mod_num);
- for(int i = 0; i < env.mod_num; i++)
- {
- EXPECT_EQ(env.mod[i].called, env.N_round+1);
- }
-}
-
-
-#if 0
-//TODO: test case mq for overlimit
-static void overlimit_packet_msg_free_cb_func(void *msg, void *msg_free_arg)
-{
- struct packet_plugin_env *env = (struct packet_plugin_env *)msg_free_arg;
- env->msg_free_cnt+=1;
- FREE(msg);
- return;
-}
-
-static void overlimit_sub_on_packet_msg(int topic_id, const void *msg, void *plugin_env)
-{
- struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
- EXPECT_TRUE(env!=NULL);
- env->msg_sub_cnt+=1;
- return;
-}
-
-static void overlimit_pub_on_packet(struct packet *pkt, void *plugin_env)
-{
- struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
- EXPECT_TRUE(env!=NULL);
- //EXPECT_EQ(pkt->ip_proto, ip_protocol);
- int topic_id_num=(int)(sizeof(env->packet_topic_id) / sizeof(env->packet_topic_id[0]));
- unsigned int cnt=0;
- int *msg;
- for(int i=0; i<topic_id_num; i++)
- {
- for(unsigned int j=0; j < env->plug_mgr->max_message_dispatch; j++)
- {
- msg=CALLOC(int, 1);
- *msg=cnt;
- int pub_ret=stellar_mq_publish_message(env->plug_mgr->st, env->packet_topic_id[i], msg);
- if(cnt < env->plug_mgr->max_message_dispatch)
- {
- ASSERT_EQ(pub_ret, 0);
- env->msg_pub_cnt+=1;
- }
- else
- {
- ASSERT_EQ(pub_ret, -1);
- }
- if(pub_ret!=0)FREE(msg);
- cnt+=1;
- }
- }
- return;
-}
-
-TEST(mq_runtime, pub_overlimit) {
-
- struct stellar st={0};
- struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE);
- whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
-
- unsigned char ip_proto=6;
- struct packet_plugin_env env;
- memset(&env, 0, sizeof(struct packet_plugin_env));
- env.plug_mgr=plug_mgr;
- char topic_name[PACKET_TOPIC_NUM][TOPIC_NAME_MAX];
-
- int topic_id_num=(int)(sizeof(env.packet_topic_id) / sizeof(env.packet_topic_id[0]));
-
- for(int i=0; i<topic_id_num; i++)
- {
- sprintf(topic_name[i], "PACKET_TOPIC_%d", i);
- env.packet_topic_id[i]=stellar_mq_create_topic(&st, topic_name[i], NULL,NULL,overlimit_packet_msg_free_cb_func, &env);
- EXPECT_GE(env.packet_topic_id[i], 0);
- {
- SCOPED_TRACE("White-box test, check stellar internal schema");
- struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(
- plug_mgr->stellar_mq_schema_array, env.packet_topic_id[i]);
- EXPECT_EQ(topic->free_cb, overlimit_packet_msg_free_cb_func);
- EXPECT_EQ(topic->free_cb_arg, &env);
- EXPECT_EQ(topic->topic_id, env.packet_topic_id[i]);
- EXPECT_STREQ(topic->topic_name, topic_name[i]);
- }
- }
-
- {
- SCOPED_TRACE("White-box test, check stellar internal schema");
- EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), topic_id_num+STELLAR_INTRINSIC_TOPIC_NUM);
- }
-
- int pub_plugin_id=stellar_plugin_register(&st, overlimit_pub_on_packet, NULL,&env);
- EXPECT_GE(pub_plugin_id, 0);
-
- int topic_sub_num=(int)(sizeof(env.packet_mq_sub_plugin_id) / sizeof(env.packet_mq_sub_plugin_id[0]));
-
- for (int i = 0; i < topic_sub_num; i++)
- {
- env.packet_mq_sub_plugin_id[i] = stellar_plugin_register(&st, NULL, NULL, &env);// empty on_packet is ok
- EXPECT_GE(env.packet_mq_sub_plugin_id[i], 0);
- for(int j = 0; j < topic_id_num; j++)
- {
- EXPECT_EQ(stellar_mq_subscribe(&st, env.packet_topic_id[j], overlimit_sub_on_packet_msg, env.packet_mq_sub_plugin_id[i]), 0);
- }
- }
-
- {
- SCOPED_TRACE("White-box test, check stellar internal schema");
- EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), topic_sub_num+1);
- }
-
- struct packet pkt={&st, IPv4, ip_proto};
-
- int N_packet=10;
- for (int i = 0; i < N_packet; i++)
- {
- plugin_manager_on_packet_input(plug_mgr, &pkt);
- plugin_manager_on_packet_output(plug_mgr, &pkt);
- }
-
- plugin_manager_exit(plug_mgr);
- EXPECT_EQ(N_packet*MAX_MSG_PER_STAGE, env.msg_pub_cnt);
- EXPECT_EQ(env.msg_free_cnt, env.msg_pub_cnt);
- EXPECT_EQ(env.msg_sub_cnt, env.msg_pub_cnt*topic_sub_num);
-}
-
-
-#endif
-
-
-/**********************************************
- * GTEST MAIN *
- **********************************************/
-
-int main(int argc, char ** argv)
-{
- int ret=0;
- ::testing::InitGoogleTest(&argc, argv);
- ret=RUN_ALL_TESTS();
- return ret;
-} \ No newline at end of file
diff --git a/infra/packet_manager/CMakeLists.txt b/infra/packet_manager/CMakeLists.txt
index 05dc5ef..592a31d 100644
--- a/infra/packet_manager/CMakeLists.txt
+++ b/infra/packet_manager/CMakeLists.txt
@@ -12,6 +12,6 @@ target_include_directories(packet_manager PUBLIC ${CMAKE_SOURCE_DIR}/deps/uthash
target_include_directories(packet_manager PUBLIC ${CMAKE_SOURCE_DIR}/deps/logger)
target_include_directories(packet_manager PUBLIC ${CMAKE_SOURCE_DIR}/include)
target_include_directories(packet_manager PUBLIC ${CMAKE_SOURCE_DIR}/infra)
-target_link_libraries(packet_manager tuple logger dablooms mq exdata module_manager fieldstat4)
+target_link_libraries(packet_manager tuple logger dablooms exdata module_manager fieldstat4)
add_subdirectory(test) \ No newline at end of file
diff --git a/infra/session_manager/CMakeLists.txt b/infra/session_manager/CMakeLists.txt
index ae68fdc..044ca6d 100644
--- a/infra/session_manager/CMakeLists.txt
+++ b/infra/session_manager/CMakeLists.txt
@@ -14,6 +14,6 @@ add_library(session_manager
target_include_directories(session_manager PUBLIC ${CMAKE_CURRENT_LIST_DIR})
target_include_directories(session_manager PUBLIC ${CMAKE_SOURCE_DIR}/infra/)
target_include_directories(session_manager PUBLIC ${CMAKE_SOURCE_DIR}/include)
-target_link_libraries(session_manager timeout packet_manager tcp_reassembly mq exdata fieldstat4 monitor)
+target_link_libraries(session_manager timeout packet_manager tcp_reassembly exdata fieldstat4 monitor)
add_subdirectory(test) \ No newline at end of file
diff --git a/infra/stellar_core.c b/infra/stellar_core.c
index 52d9501..3a18cf2 100644
--- a/infra/stellar_core.c
+++ b/infra/stellar_core.c
@@ -35,7 +35,6 @@ struct stellar
uint64_t need_exit;
struct logger *logger;
struct packet_io *pkt_io;
- struct mq_schema *mq_schema;
struct module_manager *mod_mgr;
struct thread threads[MAX_THREAD_NUM];
};
@@ -51,7 +50,6 @@ static void *worker_thread(void *arg)
struct stellar *st = thread->st;
struct packet_io *pkt_io = st->pkt_io;
struct module_manager *mod_mgr = st->mod_mgr;
- struct mq_runtime *mq_rt = mq_runtime_new(st->mq_schema);
struct module *pkt_mgr_mod = module_manager_get_module(mod_mgr, PACKET_MANAGER_MODULE_NAME);
struct packet_manager *pkt_mgr = module_to_packet_manager(pkt_mgr_mod);
@@ -59,7 +57,7 @@ static void *worker_thread(void *arg)
prctl(PR_SET_NAME, (unsigned long long)thread_name, NULL, NULL, NULL);
__thread_local_logger = st->logger;
- module_manager_register_thread(mod_mgr, thread_id, mq_rt);
+ module_manager_register_thread(mod_mgr, thread_id);
ATOMIC_SET(&thread->is_runing, 1);
CORE_LOG_FATAL("worker thread %d runing", thread_id);
@@ -90,7 +88,6 @@ static void *worker_thread(void *arg)
CORE_LOG_FATAL("worker thread %d cleaning", thread_id);
module_manager_unregister_thread(mod_mgr, thread_id);
- mq_runtime_free(mq_rt);
ATOMIC_SET(&thread->is_runing, 0);
CORE_LOG_FATAL("worker thread %d exit", thread_id);
@@ -218,14 +215,7 @@ struct stellar *stellar_new(const char *toml_file)
goto error_out;
}
- st->mq_schema = mq_schema_new();
- if (st->mq_schema == NULL)
- {
- CORE_LOG_ERROR("unable to create mq schema");
- goto error_out;
- }
-
- st->mod_mgr = module_manager_new(mod_hooks, count_of(mod_hooks),st->thread_num, toml_file, st->mq_schema, st->logger);
+ st->mod_mgr = module_manager_new(mod_hooks, count_of(mod_hooks),st->thread_num, toml_file, st->logger);
if (st->mod_mgr == NULL)
{
CORE_LOG_ERROR("unable to create packet manager");
@@ -287,7 +277,6 @@ void stellar_free(struct stellar *st)
{
packet_io_free(st->pkt_io);
module_manager_free(st->mod_mgr);
- mq_schema_free(st->mq_schema);
CORE_LOG_FATAL("stellar exit\n");
log_free(st->logger);