summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/stellar/stellar_mq.h11
-rw-r--r--infra/plugin_manager/plugin_manager.c160
-rw-r--r--infra/plugin_manager/plugin_manager_interna.h7
-rw-r--r--infra/plugin_manager/test/plugin_manager_gtest_main.cpp66
4 files changed, 119 insertions, 125 deletions
diff --git a/include/stellar/stellar_mq.h b/include/stellar/stellar_mq.h
index 4780990..b2a5f44 100644
--- a/include/stellar/stellar_mq.h
+++ b/include/stellar/stellar_mq.h
@@ -16,10 +16,13 @@ inline static void stellar_msg_free_default(void *msg, void *msg_free_arg __unus
if(msg)FREE(msg);
}
+typedef void on_msg_cb_func(int topic_id, const void *msg, void *plugin_env);
+typedef void on_msg_dispatch_cb_func(int topic_id, const void *msg, on_msg_cb_func* on_msg_cb, void *sub_plugin_env);
+
//return topic_id
-int stellar_mq_create_topic(struct stellar *st, const char *topic_name, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg);
+int stellar_mq_create_topic(struct stellar *st, const char *topic_name, on_msg_dispatch_cb_func *on_dispatch_cb, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg);
int stellar_mq_get_topic_id(struct stellar *st, const char *topic_name);
-int stellar_mq_update_topic(struct stellar *st, int topic_id, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg);
+int stellar_mq_update_topic(struct stellar *st, int topic_id, on_msg_dispatch_cb_func *on_dispatch_cb, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg);
int stellar_mq_destroy_topic(struct stellar *st, int topic_id);
@@ -31,9 +34,9 @@ enum stellar_mq_priority
STELLAR_MQ_PRIORITY_MAX,
};
-typedef void on_msg_cb_func(int topic_id, const void *msg, void *plugin_env);
+
//return 0 if success, otherwise return -1.
-int stellar_mq_subscribe(struct stellar *st, int topic_id, on_msg_cb_func *plugin_on_msg_cb, int plugin_id);
+int stellar_mq_subscribe(struct stellar *st, int topic_id, on_msg_cb_func *on_msg_cb, int plugin_id);
int stellar_mq_publish_message(struct stellar *st, int topic_id, void *msg);
int stellar_mq_publish_message_with_priority(struct stellar *st, int topic_id, void *msg, enum stellar_mq_priority priority);
diff --git a/infra/plugin_manager/plugin_manager.c b/infra/plugin_manager/plugin_manager.c
index 21af4a3..05e72f1 100644
--- a/infra/plugin_manager/plugin_manager.c
+++ b/infra/plugin_manager/plugin_manager.c
@@ -149,14 +149,14 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr)
}
if(plug_mgr->stellar_exdata_schema_array)utarray_free(plug_mgr->stellar_exdata_schema_array);
if(plug_mgr->registered_polling_plugin_array)utarray_free(plug_mgr->registered_polling_plugin_array);
- if(plug_mgr->registered_packet_plugin_array)
+ if(plug_mgr->registered_plugin_array)
{
- struct registered_packet_plugin_schema *s = NULL;
- while ((s = (struct registered_packet_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, s)))
+ struct registered_plugin_schema *s = NULL;
+ while ((s = (struct registered_plugin_schema *)utarray_next(plug_mgr->registered_plugin_array, s)))
{
- if(s->registed_packet_mq_subscriber_info)utarray_free(s->registed_packet_mq_subscriber_info);
+ if(s->registed_mq_subscriber_info)utarray_free(s->registed_mq_subscriber_info);
}
- utarray_free(plug_mgr->registered_packet_plugin_array);
+ utarray_free(plug_mgr->registered_plugin_array);
}
plugin_manager_per_thread_data_free(plug_mgr->per_thread_data, plug_mgr->st);
FREE(plug_mgr);
@@ -314,11 +314,7 @@ static void stellar_mq_topic_schema_copy(void *_dst, const void *_src)
{
struct stellar_mq_topic_schema *dst = (struct stellar_mq_topic_schema *)_dst,
*src = (struct stellar_mq_topic_schema *)_src;
- dst->subscribers = src->subscribers;
- dst->free_cb = src->free_cb;
- dst->free_cb_arg = src->free_cb_arg;
- dst->topic_id = src->topic_id;
- dst->subscriber_cnt = src->subscriber_cnt;
+ memcpy(_dst, _src, sizeof(struct stellar_mq_topic_schema));
dst->topic_name = src->topic_name ? strdup(src->topic_name) : NULL;
}
@@ -351,7 +347,7 @@ int stellar_mq_get_topic_id(struct stellar *st, const char *topic_name)
return -1;
}
-int stellar_mq_update_topic(struct stellar *st, int topic_id, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
+int stellar_mq_update_topic(struct stellar *st, int topic_id, on_msg_dispatch_cb_func *on_dispatch_cb, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
{
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
UT_array *mq_schema_array=plug_mgr->stellar_mq_schema_array;
@@ -360,12 +356,13 @@ int stellar_mq_update_topic(struct stellar *st, int topic_id, stellar_msg_free_c
if(len < (unsigned int)topic_id)return -1;
struct stellar_mq_topic_schema *t_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array, (unsigned int)topic_id);
if(t_schema == NULL)return -1;
+ t_schema->dispatch_cb=on_dispatch_cb;
t_schema->free_cb=msg_free_cb;
t_schema->free_cb_arg=msg_free_arg;
return 0;
}
-int stellar_mq_create_topic(struct stellar *st, const char *topic_name, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
+int stellar_mq_create_topic(struct stellar *st, const char *topic_name, on_msg_dispatch_cb_func *on_dispatch_cb, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
{
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
if(plug_mgr->stellar_mq_schema_array == NULL)
@@ -379,6 +376,7 @@ int stellar_mq_create_topic(struct stellar *st, const char *topic_name, stellar_
}
struct stellar_mq_topic_schema t_schema;
memset(&t_schema, 0, sizeof(struct stellar_mq_topic_schema));
+ t_schema.dispatch_cb=on_dispatch_cb;
t_schema.free_cb=msg_free_cb;
t_schema.topic_name=(char *)topic_name;
t_schema.topic_id=len;//topid_id equals arrary index
@@ -415,60 +413,12 @@ int stellar_mq_destroy_topic(struct stellar *st, int topic_id)
return 1; // success
}
-UT_icd stellar_mq_subscriber_info_icd = {sizeof(struct stellar_mq_subscriber_info), NULL, NULL, NULL};
-
-static int __stellar_mq_subscribe(struct plugin_manager_schema *plug_mgr, int topic_id, void *plugin_on_msg_cb, int plugin_idx, UT_array *registed_mq_subscriber_info)
-{
- if(plug_mgr == NULL || plug_mgr->stellar_mq_schema_array==NULL || registed_mq_subscriber_info == NULL)return -1;
-
- unsigned int len = utarray_len(plug_mgr->stellar_mq_schema_array);
- if (len <= (unsigned int)topic_id)return -1;
-
- struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id);
- if(topic==NULL)return -1;
-
- // if plugin already subscribe current topic, return 0
- struct stellar_mq_subscriber_info *p=NULL;
- while( (p=(struct stellar_mq_subscriber_info *)utarray_next(registed_mq_subscriber_info,p)))
- {
- if(p->topic_id==topic_id)
- {
- struct stellar_mq_subscriber *tmp_subscriber=topic->subscribers;
- int cnt=0;
- while(tmp_subscriber)
- {
- if(cnt==p->subscriber_idx)
- {
- tmp_subscriber->plugin_msg_cb=plugin_on_msg_cb;
- return 0;
- }
- cnt++;
- tmp_subscriber=tmp_subscriber->next;
- }
- }
- };
-
- struct stellar_mq_subscriber *new_subscriber = CALLOC(struct stellar_mq_subscriber,1);
- new_subscriber->topic_subscriber_idx = topic->subscriber_cnt;
- new_subscriber->plugin_idx = plugin_idx;
- new_subscriber->plugin_msg_cb = plugin_on_msg_cb;
- DL_APPEND(topic->subscribers, new_subscriber);
-
- struct stellar_mq_subscriber_info sub_info;
- memset(&sub_info, 0, sizeof(struct stellar_mq_subscriber_info));
- sub_info.topic_id=topic_id;
- sub_info.subscriber_idx=topic->subscriber_cnt;
- utarray_push_back(registed_mq_subscriber_info, &sub_info);
- topic->subscriber_cnt+=1;
- plug_mgr->mq_topic_subscriber_num+=1;
- return 0;
-}
static void stellar_mq_dispatch_one_message(struct stellar_message *mq_elt)
{
struct plugin_manager_schema *plug_mgr = (struct plugin_manager_schema *)stellar_get_plugin_manager(mq_elt->st);
struct stellar_mq_subscriber *sub_elt, *sub_tmp;
- struct registered_packet_plugin_schema *packet_plugin_schema;
+ struct registered_plugin_schema *plugin_schema;
struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array,
(unsigned int)(mq_elt->header.topic_id));
if (topic)
@@ -477,11 +427,13 @@ static void stellar_mq_dispatch_one_message(struct stellar_message *mq_elt)
{
if (sub_elt->plugin_msg_cb)
{
- packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr(
- plug_mgr->registered_packet_plugin_array, (unsigned int)sub_elt->plugin_idx);
- if (packet_plugin_schema)
+ plugin_schema = (struct registered_plugin_schema *)utarray_eltptr(
+ plug_mgr->registered_plugin_array, (unsigned int)sub_elt->plugin_idx);
+ if (plugin_schema)
{
- sub_elt->plugin_msg_cb(mq_elt->header.topic_id, mq_elt->body, packet_plugin_schema->plugin_env);
+ //TODO: maybe need pub_plugin_env as dispatch_cb parameter
+ if(topic->dispatch_cb)topic->dispatch_cb(mq_elt->header.topic_id,mq_elt->body, sub_elt->plugin_msg_cb, plugin_schema->plugin_env);
+ else sub_elt->plugin_msg_cb(mq_elt->header.topic_id, mq_elt->body, plugin_schema->plugin_env);
}
}
}
@@ -529,9 +481,7 @@ static void stellar_mq_free(struct stellar_message **head, UT_array *mq_schema_a
}
}
-/*******************************
- * PACKET MQ *
- *******************************/
+UT_icd stellar_mq_subscriber_info_icd = {sizeof(struct stellar_mq_subscriber_info), NULL, NULL, NULL};
//return 0 if success, otherwise return -1.
int stellar_mq_subscribe(struct stellar *st, int topic_id, on_msg_cb_func *plugin_on_msg_cb, int plugin_id)
@@ -539,17 +489,57 @@ int stellar_mq_subscribe(struct stellar *st, int topic_id, on_msg_cb_func *plugi
int plugin_idx=plugin_id;
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
- if(plug_mgr == NULL || plug_mgr->registered_packet_plugin_array == NULL)return -1;
+ if(plug_mgr == NULL || plug_mgr->registered_plugin_array == NULL)return -1;
- struct registered_packet_plugin_schema *packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr(plug_mgr->registered_packet_plugin_array, (unsigned)plugin_idx);
- if(packet_plugin_schema==NULL)return -1;
+ struct registered_plugin_schema *plugin_schema = (struct registered_plugin_schema *)utarray_eltptr(plug_mgr->registered_plugin_array, (unsigned)plugin_idx);
+ if(plugin_schema==NULL)return -1;
- if(packet_plugin_schema->registed_packet_mq_subscriber_info==NULL)
+ unsigned int len = utarray_len(plug_mgr->stellar_mq_schema_array);
+ if (len <= (unsigned int)topic_id)return -1;
+
+ struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id);
+ if(topic==NULL)return -1;
+
+ if(plugin_schema->registed_mq_subscriber_info==NULL)
{
- utarray_new(packet_plugin_schema->registed_packet_mq_subscriber_info, &stellar_mq_subscriber_info_icd);
+ utarray_new(plugin_schema->registed_mq_subscriber_info, &stellar_mq_subscriber_info_icd);
}
- return __stellar_mq_subscribe(plug_mgr,topic_id, (void *)plugin_on_msg_cb, plugin_idx, packet_plugin_schema->registed_packet_mq_subscriber_info);
+ // if plugin already subscribe current topic, return 0
+ struct stellar_mq_subscriber_info *p=NULL;
+ while( (p=(struct stellar_mq_subscriber_info *)utarray_next(plugin_schema->registed_mq_subscriber_info,p)))
+ {
+ if(p->topic_id==topic_id)
+ {
+ struct stellar_mq_subscriber *tmp_subscriber=topic->subscribers;
+ int cnt=0;
+ while(tmp_subscriber)
+ {
+ if(cnt==p->subscriber_idx)
+ {
+ tmp_subscriber->plugin_msg_cb=plugin_on_msg_cb;
+ return 0;
+ }
+ cnt++;
+ tmp_subscriber=tmp_subscriber->next;
+ }
+ }
+ };
+
+ struct stellar_mq_subscriber *new_subscriber = CALLOC(struct stellar_mq_subscriber,1);
+ new_subscriber->topic_subscriber_idx = topic->subscriber_cnt;
+ new_subscriber->plugin_idx = plugin_idx;
+ new_subscriber->plugin_msg_cb = plugin_on_msg_cb;
+ DL_APPEND(topic->subscribers, new_subscriber);
+
+ struct stellar_mq_subscriber_info sub_info;
+ memset(&sub_info, 0, sizeof(struct stellar_mq_subscriber_info));
+ sub_info.topic_id=topic_id;
+ sub_info.subscriber_idx=topic->subscriber_cnt;
+ utarray_push_back(plugin_schema->registed_mq_subscriber_info, &sub_info);
+ topic->subscriber_cnt+=1;
+ plug_mgr->mq_topic_subscriber_num+=1;
+ return 0;
}
int stellar_mq_publish_message_with_priority(struct stellar *st, int topic_id, void *data, enum stellar_mq_priority priority)
@@ -618,31 +608,31 @@ void session_exdata_runtime_free(struct stellar_exdata *exdata_rt)
/*********************************************
- * PLUGIN MANAGER PACKET PLUGIN *
+ * PLUGIN MANAGER PLUGIN *
*********************************************/
-UT_icd registered_packet_plugin_array_icd = {sizeof(struct registered_packet_plugin_schema), NULL, NULL, NULL};
+UT_icd registered_plugin_array_icd = {sizeof(struct registered_plugin_schema), NULL, NULL, NULL};
int stellar_plugin_register(struct stellar *st, unsigned char ip_proto, plugin_on_packet_func on_packet_input, plugin_on_packet_func on_packet_output, void *plugin_env)
{
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
- if(plug_mgr->registered_packet_plugin_array == NULL)
+ if(plug_mgr->registered_plugin_array == NULL)
{
- utarray_new(plug_mgr->registered_packet_plugin_array, &registered_packet_plugin_array_icd);
+ utarray_new(plug_mgr->registered_plugin_array, &registered_plugin_array_icd);
}
- struct registered_packet_plugin_schema packet_plugin_schema;
+ struct registered_plugin_schema packet_plugin_schema;
memset(&packet_plugin_schema, 0, sizeof(packet_plugin_schema));
packet_plugin_schema.ip_protocol = ip_proto;
packet_plugin_schema.on_packet[PACKET_STAGE_INPUT] = on_packet_input;
packet_plugin_schema.on_packet[PACKET_STAGE_OUTPUT] = on_packet_output;
packet_plugin_schema.plugin_env = plugin_env;
- utarray_push_back(plug_mgr->registered_packet_plugin_array, &packet_plugin_schema);
- return (utarray_len(plug_mgr->registered_packet_plugin_array)-1);// return packet plugin_id, equals to packet plugin arrary index
+ utarray_push_back(plug_mgr->registered_plugin_array, &packet_plugin_schema);
+ return (utarray_len(plug_mgr->registered_plugin_array)-1);// return packet plugin_id, equals to packet plugin arrary index
}
static void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, struct packet *pkt, enum packet_stage in_out)
{
- if(plug_mgr==NULL || plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return;
- struct registered_packet_plugin_schema *p=NULL;
+ if(plug_mgr==NULL || plug_mgr->registered_plugin_array == NULL || pkt == NULL)return;
+ struct registered_plugin_schema *p=NULL;
//TODO: get innermost layer ip protocol by packet api
struct tuple6 t6;
@@ -652,7 +642,7 @@ static void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, str
int tid=stellar_get_current_thread_index();
//TODO : provide public api to reset pub_msg_cnt
plug_mgr->per_thread_data[tid].pub_packet_msg_cnt=0;//reset pub_msg_cnt
- while ((p = (struct registered_packet_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, p)))
+ while ((p = (struct registered_plugin_schema *)utarray_next(plug_mgr->registered_plugin_array, p)))
{
if(p->ip_protocol == ip_proto && p->on_packet[in_out])
{
@@ -670,7 +660,7 @@ void plugin_manager_on_packet_input(struct plugin_manager_schema *plug_mgr, stru
void plugin_manager_on_packet_output(struct plugin_manager_schema *plug_mgr, struct packet *pkt)
{
- if(plug_mgr == NULL || plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return;
+ if(plug_mgr == NULL || plug_mgr->registered_plugin_array == NULL || pkt == NULL)return;
plugin_manager_on_packet(plug_mgr, pkt, PACKET_STAGE_OUTPUT);
int tid=stellar_get_current_thread_index();
plug_mgr->per_thread_data[tid].pub_packet_msg_cnt=-1;//disable packet message publish
diff --git a/infra/plugin_manager/plugin_manager_interna.h b/infra/plugin_manager/plugin_manager_interna.h
index 0929956..7dc224e 100644
--- a/infra/plugin_manager/plugin_manager_interna.h
+++ b/infra/plugin_manager/plugin_manager_interna.h
@@ -37,7 +37,7 @@ struct plugin_manager_schema
UT_array *plugin_load_specs_array;
UT_array *stellar_exdata_schema_array;
UT_array *stellar_mq_schema_array;
- UT_array *registered_packet_plugin_array;
+ UT_array *registered_plugin_array;
UT_array *registered_polling_plugin_array;
int stellar_mq_topic_num;
int mq_topic_subscriber_num;
@@ -93,6 +93,7 @@ struct stellar_mq_topic_schema
int topic_id;
int subscriber_cnt;
int is_destroyed;
+ on_msg_dispatch_cb_func *dispatch_cb;
stellar_msg_free_cb_func *free_cb;
struct stellar_mq_subscriber *subscribers;
}__attribute__((aligned(sizeof(void*))));
@@ -105,12 +106,12 @@ enum packet_stage
PACKET_STAGE_MAX
};
-struct registered_packet_plugin_schema
+struct registered_plugin_schema
{
char ip_protocol;
plugin_on_packet_func *on_packet[PACKET_STAGE_MAX];
void *plugin_env;
- UT_array *registed_packet_mq_subscriber_info;
+ UT_array *registed_mq_subscriber_info;
}__attribute__((aligned(sizeof(void*))));
struct registered_polling_plugin_schema
diff --git a/infra/plugin_manager/test/plugin_manager_gtest_main.cpp b/infra/plugin_manager/test/plugin_manager_gtest_main.cpp
index 89b7927..2b8436c 100644
--- a/infra/plugin_manager/test/plugin_manager_gtest_main.cpp
+++ b/infra/plugin_manager/test/plugin_manager_gtest_main.cpp
@@ -27,7 +27,7 @@ void whitebox_test_plugin_manager_intrisic_metadata(struct stellar *st, struct p
//registered plugin array null
EXPECT_TRUE(plug_mgr->registered_polling_plugin_array==NULL);
- EXPECT_TRUE(plug_mgr->registered_packet_plugin_array==NULL);
+ EXPECT_TRUE(plug_mgr->registered_plugin_array==NULL);
EXPECT_TRUE(plug_mgr->per_thread_data!=NULL);
int thread_num=stellar_get_worker_thread_num(st);
@@ -102,7 +102,7 @@ TEST(plugin_manager_init, packet_mq_topic_create_and_update) {
EXPECT_EQ(stellar_mq_get_topic_id(&st, topic_name), -1); // illegal topic_name
- int topic_id = stellar_mq_create_topic(&st, topic_name, test_mock_packet_msg_free, &st);
+ int topic_id = stellar_mq_create_topic(&st, topic_name, NULL, test_mock_packet_msg_free, &st);
EXPECT_GE(topic_id, 0);
struct stellar_mq_topic_schema *topic_schema = NULL;
{
@@ -116,7 +116,7 @@ TEST(plugin_manager_init, packet_mq_topic_create_and_update) {
}
EXPECT_EQ(stellar_mq_get_topic_id(&st, topic_name), topic_id);
- EXPECT_EQ(stellar_mq_create_topic(&st, topic_name, test_mock_overwrite_packet_msg_free, plug_mgr),
+ EXPECT_EQ(stellar_mq_create_topic(&st, topic_name, NULL, test_mock_overwrite_packet_msg_free, plug_mgr),
-1); // duplicate create, return error
{
SCOPED_TRACE("White-box test, check stellar internal schema");
@@ -128,7 +128,7 @@ TEST(plugin_manager_init, packet_mq_topic_create_and_update) {
EXPECT_STREQ(topic_schema->topic_name, topic_name);
}
- EXPECT_EQ(stellar_mq_update_topic(&st, topic_id, test_mock_overwrite_packet_msg_free, plug_mgr), 0);
+ EXPECT_EQ(stellar_mq_update_topic(&st, topic_id, NULL, test_mock_overwrite_packet_msg_free, plug_mgr), 0);
{
SCOPED_TRACE("White-box test, check stellar internal schema");
@@ -166,7 +166,7 @@ TEST(plugin_manager_init, packet_mq_subscribe) {
const char *topic_name="PACKET_TOPIC";
- int topic_id=stellar_mq_create_topic(&st, topic_name, test_mock_packet_msg_free, &st);
+ int topic_id=stellar_mq_create_topic(&st, topic_name, NULL, test_mock_packet_msg_free, &st);
EXPECT_GE(topic_id, 0);
EXPECT_EQ(stellar_mq_subscribe(&st, topic_id, test_mock_on_packet_msg, 10),-1);//illgeal plugin_id
@@ -246,7 +246,7 @@ TEST(plugin_manager, packet_plugin_illegal_exdata) {
{
SCOPED_TRACE("White-box test, check stellar internal schema");
- int packet_plugin_num = utarray_len(plug_mgr->registered_packet_plugin_array);
+ int packet_plugin_num = utarray_len(plug_mgr->registered_plugin_array);
EXPECT_EQ(packet_plugin_num, 1);
}
@@ -292,7 +292,7 @@ TEST(plugin_manager, packet_plugins_with_proto_filter) {
{
SCOPED_TRACE("White-box test, check stellar internal schema");
- EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), proto_filter_plugin_num);
+ EXPECT_EQ(utarray_len(plug_mgr->registered_plugin_array), proto_filter_plugin_num);
}
struct packet pkt={&st, IPv4, 0};
@@ -420,7 +420,7 @@ TEST(plugin_manager, packet_plugins_share_exdata) {
{
SCOPED_TRACE("White-box test, check stellar internal schema");
- EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), 2); // Fix plugin number
+ EXPECT_EQ(utarray_len(plug_mgr->registered_plugin_array), 2); // Fix plugin number
}
struct packet pkt={&st, IPv4, ip_proto};
@@ -490,7 +490,7 @@ TEST(plugin_manager, packet_plugins_mq_pub_sub) {
for(int i=0; i<topic_id_num; i++)
{
sprintf(topic_name[i], "PACKET_TOPIC_%d", i);
- env.packet_topic_id[i]=stellar_mq_create_topic(&st, topic_name[i], test_packet_msg_free_cb_func, &env);
+ env.packet_topic_id[i]=stellar_mq_create_topic(&st, topic_name[i], NULL, test_packet_msg_free_cb_func, &env);
EXPECT_GE(env.packet_topic_id[i], 0);
{
SCOPED_TRACE("White-box test, check stellar internal schema");
@@ -525,7 +525,7 @@ TEST(plugin_manager, packet_plugins_mq_pub_sub) {
{
SCOPED_TRACE("White-box test, check stellar internal schema");
- EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), topic_sub_num+1);
+ EXPECT_EQ(utarray_len(plug_mgr->registered_plugin_array), topic_sub_num+1);
}
struct packet pkt={&st, IPv4, ip_proto};
@@ -607,7 +607,7 @@ TEST(plugin_manager, packet_plugins_pub_overlimit) {
for(int i=0; i<topic_id_num; i++)
{
sprintf(topic_name[i], "PACKET_TOPIC_%d", i);
- env.packet_topic_id[i]=stellar_mq_create_topic(&st, topic_name[i], overlimit_packet_msg_free_cb_func, &env);
+ env.packet_topic_id[i]=stellar_mq_create_topic(&st, topic_name[i], NULL, overlimit_packet_msg_free_cb_func, &env);
EXPECT_GE(env.packet_topic_id[i], 0);
{
SCOPED_TRACE("White-box test, check stellar internal schema");
@@ -642,7 +642,7 @@ TEST(plugin_manager, packet_plugins_pub_overlimit) {
{
SCOPED_TRACE("White-box test, check stellar internal schema");
- EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), topic_sub_num+1);
+ EXPECT_EQ(utarray_len(plug_mgr->registered_plugin_array), topic_sub_num+1);
}
struct packet pkt={&st, IPv4, ip_proto};
@@ -712,7 +712,7 @@ TEST(plugin_manager, packet_plugin_exdata_free_pub_msg) {
EXPECT_GE(plugin_id, 0);
env.packet_exdata_idx[0]=stellar_exdata_new_index(&st, "PACKET_EXDATA", test_exdata_free_pub_msg_exdata_free, &env);
- env.packet_topic_id[0]=stellar_mq_create_topic(&st, "PACKET_TOPIC", test_exdata_free_pub_msg_free, &env);
+ env.packet_topic_id[0]=stellar_mq_create_topic(&st, "PACKET_TOPIC", NULL, test_exdata_free_pub_msg_free, &env);
EXPECT_EQ(stellar_mq_subscribe(&st, env.packet_topic_id[0], test_exdata_free_pub_msg_on_packet_msg, plugin_id),0);
@@ -776,7 +776,7 @@ TEST(plugin_manager_init, session_mq_topic_create_and_update) {
EXPECT_EQ(stellar_mq_get_topic_id(&st, topic_name), -1);// illegal topic_name
- int topic_id=stellar_mq_create_topic(&st, topic_name, test_mock_session_msg_free, &st);
+ int topic_id=stellar_mq_create_topic(&st, topic_name, NULL, test_mock_session_msg_free, &st);
EXPECT_GE(topic_id, 0);
struct stellar_mq_topic_schema *topic_schema;
{
@@ -790,7 +790,7 @@ TEST(plugin_manager_init, session_mq_topic_create_and_update) {
}
EXPECT_EQ(stellar_mq_get_topic_id(&st, topic_name), topic_id);
- EXPECT_EQ(stellar_mq_create_topic(&st, topic_name, test_mock_overwrite_session_msg_free, plug_mgr), -1); // duplicate create, return error
+ EXPECT_EQ(stellar_mq_create_topic(&st, topic_name, NULL, test_mock_overwrite_session_msg_free, plug_mgr), -1); // duplicate create, return error
{
SCOPED_TRACE("White-box test, check stellar internal schema");
@@ -802,7 +802,7 @@ TEST(plugin_manager_init, session_mq_topic_create_and_update) {
EXPECT_STREQ(topic_schema->topic_name, topic_name);
}
- EXPECT_EQ(stellar_mq_update_topic(&st, topic_id, test_mock_overwrite_session_msg_free, plug_mgr), 0);
+ EXPECT_EQ(stellar_mq_update_topic(&st, topic_id, NULL, test_mock_overwrite_session_msg_free, plug_mgr), 0);
{
SCOPED_TRACE("White-box test, check stellar internal schema");
@@ -842,7 +842,7 @@ TEST(plugin_manager_init, session_mq_subscribe_overwrite) {
const char *topic_name="SESSION_TOPIC";
- int topic_id=stellar_mq_create_topic(&st, topic_name, test_mock_session_msg_free, &st);
+ int topic_id=stellar_mq_create_topic(&st, topic_name, NULL, test_mock_session_msg_free, &st);
EXPECT_GE(topic_id, 0);
EXPECT_EQ(stellar_mq_subscribe(&st, topic_id, test_mock_on_session_msg, 10),-1);//illgeal plugin_id
@@ -1014,11 +1014,11 @@ TEST(plugin_manager, session_plugin_on_intrinsic_ingress_egress) {
int plugin_id=stellar_plugin_register(&st, 0, NULL, NULL, &env);
EXPECT_GE(plugin_id, 0);
- env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL);
+ env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL, NULL);
EXPECT_GE(env.intrinsc_tcp_topic_id, 0);
EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_basic_on_session_ingress, plugin_id), 0);
- env.intrinsc_egress_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_OUTPUT, NULL, NULL);
+ env.intrinsc_egress_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_OUTPUT, NULL, NULL, NULL);
EXPECT_GE(env.intrinsc_egress_topic_id, 0);
EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_egress_topic_id, test_basic_on_session_ingress, plugin_id), 0);// Intentional error
@@ -1122,11 +1122,11 @@ TEST(plugin_manager, DISABLED_session_plugin_ignore_on_ctx_new_sub_other_msg) {
env.test_mq_pub_plugin_id=stellar_plugin_register(&st, 0, NULL, NULL,&env);
EXPECT_GE(env.test_mq_pub_plugin_id, 0);
- env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL);
+ env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL, NULL);
EXPECT_GE(env.intrinsc_tcp_topic_id, 0);
EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_mq_pub_on_session, env.test_mq_pub_plugin_id), 0);
- env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_MQ_TOPIC", test_session_msg_free, &env);
+ env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_MQ_TOPIC", NULL, test_session_msg_free, &env);
EXPECT_GE(env.test_mq_topic_id, 0);
env.test_mq_sub_plugin_id=stellar_plugin_register(&st, 0, NULL, NULL, &env);
@@ -1288,11 +1288,11 @@ TEST(plugin_manager,DISABLED_session_plugin_pub_msg_overlimt) {
env.test_mq_pub_plugin_id=stellar_plugin_register(&st, 0, NULL, NULL, &env);
EXPECT_GE(env.test_mq_pub_plugin_id, 0);
- env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL);
+ env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL, NULL);
EXPECT_GE(env.intrinsc_tcp_topic_id, 0);
EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_overlimit_pub_on_session, env.test_mq_pub_plugin_id), 0);
- env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_MQ_TOPIC", test_overlimit_session_msg_free, &env);
+ env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_MQ_TOPIC", NULL, test_overlimit_session_msg_free, &env);
EXPECT_GE(env.test_mq_topic_id, 0);
env.test_mq_sub_plugin_id=stellar_plugin_register(&st, 0, NULL, NULL, &env);
@@ -1399,11 +1399,11 @@ TEST(plugin_manager, DISABLED_session_plugin_on_ctx_new_then_dettach) {
int plugin_id=stellar_plugin_register(&st, 0, NULL, NULL, &env);
EXPECT_GE(plugin_id,0);
- env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL);
+ env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL, NULL);
EXPECT_GE(env.intrinsc_tcp_topic_id, 0);
EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_dettach_on_session, plugin_id), 0);
- env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_MQ_TOPIC", test_dettach_msg_free, &env);
+ env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_MQ_TOPIC", NULL, test_dettach_msg_free, &env);
EXPECT_GE(env.test_mq_topic_id, 0);
EXPECT_EQ(stellar_mq_subscribe(&st, env.test_mq_topic_id, test_dettach_on_session, plugin_id), 0);
@@ -1486,11 +1486,11 @@ TEST(plugin_manager, DISABLED_session_plugin_pub_on_ctx_free) {
int plugin_id=stellar_plugin_register(&st, 0, NULL, NULL, &env);
EXPECT_GE(plugin_id,0);
- env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL);
+ env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL, NULL);
EXPECT_GE(env.intrinsc_tcp_topic_id, 0);
EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_invalid_pub_msg_on_session, plugin_id), 0);
- env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_MQ_TOPIC", NULL, &env);
+ env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_MQ_TOPIC", NULL, NULL, &env);
EXPECT_GE(env.test_mq_topic_id, 0);
// pesudo packet and session
@@ -1608,11 +1608,11 @@ TEST(plugin_manager, DISABLED_session_plugin_pub_msg_on_closing) {
int plugin_id=stellar_plugin_register(&st, 0, NULL, NULL, &env);
EXPECT_GE(plugin_id,0);
- env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL);
+ env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL, NULL);
EXPECT_GE(env.intrinsc_tcp_topic_id, 0);
EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_closing_on_intrisic_msg, plugin_id), 0);
- env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_CLOSING_TOPIC", NULL, &env);
+ env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_CLOSING_TOPIC", NULL, NULL, &env);
EXPECT_GE(env.test_mq_topic_id, 0);
EXPECT_EQ(stellar_mq_subscribe(&st, env.test_mq_topic_id, test_session_closing_on_userdefine_msg, plugin_id), 0);
@@ -1729,7 +1729,7 @@ TEST(plugin_manager, DISABLED_test_session_mq_topic_is_active) {
env.plugin_id_1=plugin_id_1;
env.plugin_id_2=plugin_id_2;
- env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL);
+ env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL, NULL);
EXPECT_GE(env.intrinsc_tcp_topic_id, 0);
EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_topic_is_active_plugin_1_on_msg, plugin_id_1), 0);
EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_topic_is_active_plugin_2_on_msg, plugin_id_2), 0);
@@ -1828,7 +1828,7 @@ TEST(plugin_manager, DISABLED_test_session_dettach) {
env.plugin_id_1=plugin_id_1;
env.plugin_id_2=plugin_id_2;
- env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL);
+ env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL, NULL);
EXPECT_GE(env.intrinsc_tcp_topic_id, 0);
EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_dettach_plugin_1_on_msg, plugin_id_1), 0);
EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_dettach_plugin_2_on_msg, plugin_id_2), 0);
@@ -1974,7 +1974,7 @@ TEST(plugin_manager, test_session_mq_priority) {
env.exdata_ctx_1_id=stellar_exdata_new_index(&st, "SESSION_CTX_1", stellar_exdata_free_default, &env) ;
env.exdata_ctx_2_id=stellar_exdata_new_index(&st, "SESSION_CTX_2", stellar_exdata_free_default, &env) ;
- env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL);
+ env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL, NULL);
EXPECT_GE(env.intrinsc_tcp_topic_id, 0);
EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_priority_plugin_1_on_msg, plugin_id_1), 0);
EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_priority_plugin_2_on_msg, plugin_id_2), 0);
@@ -2063,7 +2063,7 @@ TEST(plugin_manager, session_exdata_free_pub_msg) {
env.plugin_id_1=stellar_plugin_register(&st, 0, NULL, NULL, &env);
EXPECT_GE(env.plugin_id_1,0);
- env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL);
+ env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL, NULL);
EXPECT_GE(env.intrinsc_tcp_topic_id, 0);
EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_exdata_free_pub_msg_on_session, env.plugin_id_1), 0);