summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryangwei <[email protected]>2024-07-16 18:24:23 +0800
committeryangwei <[email protected]>2024-07-17 11:28:35 +0800
commit161aa7da1e82d939ba09f8dc211ffc5f216f963b (patch)
treeac116d4baff4a6580d0d4c57d07d84eed5199449
parentcc862437769735aa5ea31f5b875ea8e25afed726 (diff)
🦄 refactor(packet mq & exdata): backport packet mq & exdata
-rw-r--r--.gitlab-ci.yml2
-rw-r--r--examples/stellar_plugin/simple_stellar_plugin.c39
-rw-r--r--include/stellar/packet_exdata.h8
-rw-r--r--include/stellar/packet_mq.h22
-rw-r--r--src/plugin_manager/plugin_manager.c590
-rw-r--r--src/plugin_manager/plugin_manager.h2
-rw-r--r--src/plugin_manager/plugin_manager_interna.h40
-rw-r--r--src/stellar_on_sapp/stellar_internal.h4
-rw-r--r--src/stellar_on_sapp/stellar_on_sapp_api.c13
-rw-r--r--test/plugin_manager/plugin_manager_gtest_main.cpp579
-rw-r--r--test/plugin_manager/plugin_manager_gtest_mock.h4
11 files changed, 1056 insertions, 247 deletions
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 57264b9..7cdd2fd 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -122,7 +122,7 @@ test_in_centos8:
script:
- *everything_before_script
- ls -l /opt/MESA/lib && echo "/opt/MESA/lib" >> /etc/ld.so.conf
- - cd build; make test
+ - cd build; ctest -V
dependencies:
- develop_build_for_centos8
- release_build_for_centos8
diff --git a/examples/stellar_plugin/simple_stellar_plugin.c b/examples/stellar_plugin/simple_stellar_plugin.c
index 467a38b..5625cc9 100644
--- a/examples/stellar_plugin/simple_stellar_plugin.c
+++ b/examples/stellar_plugin/simple_stellar_plugin.c
@@ -3,6 +3,8 @@
#include "stellar/utils.h"
#include "stellar/session_exdata.h"
#include "stellar/session_mq.h"
+#include "stellar/packet_exdata.h"
+#include "stellar/packet_mq.h"
#include <stdio.h>
#include <string.h>
@@ -14,6 +16,8 @@ struct simple_stellar_plugin_env
struct stellar *st;
int session_plugin_id;
int session_exdata_idx;
+ int packet_exdata_idx;
+ int packet_topic_id;
int stat_topic_id;
int egress_topic_id;
int tcp_topic_id;
@@ -130,6 +134,8 @@ static void simple_plugin_on_session_msg(struct session *sess, int topic_id, con
void simple_plugin_on_packet(struct packet *pkt, unsigned char ip_protocol, void *plugin_env)
{
struct simple_stellar_plugin_env *env = (struct simple_stellar_plugin_env *)plugin_env;
+ packet_exdata_set(pkt, env->packet_exdata_idx, env);
+ packet_mq_publish_message(pkt, env->packet_topic_id, env);
switch (ip_protocol)
{
case IPPROTO_TCP:
@@ -152,6 +158,15 @@ void simple_plugin_on_packet(struct packet *pkt, unsigned char ip_protocol, voi
return;
}
+void simple_plugin_packet_get_exdata(struct packet *pkt, unsigned char ip_protocol, void *plugin_env)
+{
+ struct simple_stellar_plugin_env *env = (struct simple_stellar_plugin_env *)plugin_env;
+ struct simple_stellar_plugin_env *exdata = (struct simple_stellar_plugin_env *)packet_exdata_get(pkt, env->packet_exdata_idx);
+ assert(memcmp(env, exdata, sizeof(struct simple_stellar_plugin_env)) == 0);
+ return;
+}
+
+
int simple_plugin_on_polling(void *plugin_env)
{
struct simple_stellar_plugin_env *env = (struct simple_stellar_plugin_env *)plugin_env;
@@ -203,6 +218,30 @@ void *simple_session_packet_plugin_init(struct stellar *st)
exit(-1);
}
+ env->packet_topic_id=stellar_packet_mq_get_topic_id(st, "TOPIC_PACKET_ENV");
+ if(env->packet_topic_id < 0)
+ {
+ env->packet_topic_id=stellar_packet_mq_create_topic(st, "TOPIC_PACKET_ENV", simple_plugin_packet_msg_free, env);
+ }
+
+ tcp_plugin_id=stellar_packet_plugin_register(st, IPPROTO_TCP, simple_plugin_packet_get_exdata, env);
+ udp_plugin_id=stellar_packet_plugin_register(st, IPPROTO_UDP, simple_plugin_packet_get_exdata, env);
+ icmp_plugin_id=stellar_packet_plugin_register(st, IPPROTO_ICMP, simple_plugin_packet_get_exdata, env);
+ icmp6_plugin_id=stellar_packet_plugin_register(st, IPPROTO_ICMPV6, simple_plugin_packet_get_exdata, env);
+
+ if(tcp_plugin_id < 0 || udp_plugin_id < 0 || icmp_plugin_id < 0 || icmp6_plugin_id < 0)
+ {
+ perror("register packet plugin get exdata return invalid plugin id\n");
+ exit(-1);
+ }
+
+ stellar_packet_mq_subscribe(st, env->packet_topic_id, simple_plugin_on_packet_msg_cb, tcp_plugin_id);
+ stellar_packet_mq_subscribe(st, env->packet_topic_id, simple_plugin_on_packet_msg_cb, udp_plugin_id);
+ stellar_packet_mq_subscribe(st, env->packet_topic_id, simple_plugin_on_packet_msg_cb, icmp_plugin_id);
+ stellar_packet_mq_subscribe(st, env->packet_topic_id, simple_plugin_on_packet_msg_cb, icmp6_plugin_id);
+
+ env->packet_exdata_idx=stellar_packet_exdata_new_index(st, "EXDATA_PACKET_ENV", simple_plugin_packet_exdata_free, env);
+
int polling_plugin_id=stellar_polling_plugin_register(st, simple_plugin_on_polling, env);
if(polling_plugin_id < 0)
{
diff --git a/include/stellar/packet_exdata.h b/include/stellar/packet_exdata.h
new file mode 100644
index 0000000..7620eae
--- /dev/null
+++ b/include/stellar/packet_exdata.h
@@ -0,0 +1,8 @@
+#pragma once
+
+#include "stellar.h"
+
+typedef void packet_exdata_free(struct packet *pkt, int idx, void *ex_ptr, void *arg);
+int stellar_packet_exdata_new_index(struct stellar *st, const char *name, packet_exdata_free *free_func,void *arg);
+int packet_exdata_set(struct packet *pkt, int idx, void *ex_ptr);
+void *packet_exdata_get(struct packet *pkt, int idx); \ No newline at end of file
diff --git a/include/stellar/packet_mq.h b/include/stellar/packet_mq.h
new file mode 100644
index 0000000..94bb39b
--- /dev/null
+++ b/include/stellar/packet_mq.h
@@ -0,0 +1,22 @@
+#pragma once
+
+#include "stellar.h"
+
+//session mq
+typedef void packet_msg_free_cb_func(struct packet *pkt, void *msg, void *msg_free_arg);
+typedef void on_packet_msg_cb_func(struct packet *pkt, int topic_id, const void *msg, void *plugin_env);
+
+//return topic_id
+int stellar_packet_mq_create_topic(struct stellar *st, const char *topic_name, packet_msg_free_cb_func *msg_free_cb, void *msg_free_arg);
+
+int stellar_packet_mq_get_topic_id(struct stellar *st, const char *topic_name);
+
+int stellar_packet_mq_update_topic(struct stellar *st, int topic_id, packet_msg_free_cb_func *msg_free_cb, void *msg_free_arg);
+
+int stellar_packet_mq_destroy_topic(struct stellar *st, int topic_id);
+
+//return 0 if success, otherwise return -1.
+int stellar_packet_mq_subscribe(struct stellar *st, int topic_id, on_packet_msg_cb_func *plugin_on_msg_cb, int plugin_id); //packet plugin only
+
+int packet_mq_publish_message(struct packet *pkt, int topic_id, void *msg);
+
diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c
index 83b1ad6..218b23e 100644
--- a/src/plugin_manager/plugin_manager.c
+++ b/src/plugin_manager/plugin_manager.c
@@ -1,17 +1,12 @@
#include "plugin_manager_interna.h"
-
#include "stellar_internal.h"
-
#include "stellar/session.h"
-
#include "stellar/utils.h"
#include "toml/toml.h"
#include "uthash/utlist.h"
-
UT_icd plugin_specs_icd = {sizeof(struct plugin_specific), NULL, NULL, NULL};
-
static struct plugin_specific *plugin_specs_load(const char *toml_conf_path, int *spec_num)
{
*spec_num = 0;
@@ -70,6 +65,28 @@ PLUGIN_SPEC_LOAD_ERROR:
return NULL;
}
+static struct plugin_manger_per_thread_data *plugin_manager_per_thread_data_new(struct stellar *st)
+{
+ if(st == NULL)return NULL;
+ int thread_num=stellar_get_worker_thread_num(st);
+ struct plugin_manger_per_thread_data *per_thread_data = CALLOC(struct plugin_manger_per_thread_data, thread_num);
+ return per_thread_data;
+}
+
+static void plugin_manager_per_thread_data_free(struct plugin_manger_per_thread_data *per_thread_data, struct stellar *st)
+{
+ if(per_thread_data == NULL || st == NULL)return;
+ int thread_num=stellar_get_worker_thread_num(st);
+ struct plugin_manger_per_thread_data *p_data;
+ for (int i = 0; i < thread_num; i++)
+ {
+ p_data=per_thread_data+i;
+ if(p_data->per_thread_pkt_exdata_array.exdata_array)FREE(p_data->per_thread_pkt_exdata_array.exdata_array);
+ }
+ FREE(per_thread_data);
+ return;
+}
+
struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char *plugin_spec_file_path)
{
int spec_num;
@@ -105,9 +122,11 @@ struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char
}
}
FREE(specs);
+ plug_mgr->per_thread_data = plugin_manager_per_thread_data_new(st);
return plug_mgr;
}
+static int stellar_mq_destroy_topic(int topic_id, UT_array *mq_schema_array);
void plugin_manager_exit(struct plugin_manager_schema *plug_mgr)
{
@@ -121,14 +140,15 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr)
}
utarray_free(plug_mgr->plugin_load_specs_array);
}
- if(plug_mgr->session_mq_schema_array)
+ if(plug_mgr->stellar_mq_schema_array)
{
- for(unsigned int i = 0; i < utarray_len(plug_mgr->session_mq_schema_array); i++)
+ for(unsigned int i = 0; i < utarray_len(plug_mgr->stellar_mq_schema_array); i++)
{
- stellar_session_mq_destroy_topic(plug_mgr->st, i);
+ stellar_mq_destroy_topic( i, plug_mgr->stellar_mq_schema_array);
}
- utarray_free(plug_mgr->session_mq_schema_array);
+ utarray_free(plug_mgr->stellar_mq_schema_array);
}
+ if(plug_mgr->packet_exdata_schema_array)utarray_free(plug_mgr->packet_exdata_schema_array);
if(plug_mgr->session_exdata_schema_array)utarray_free(plug_mgr->session_exdata_schema_array);
if(plug_mgr->registered_polling_plugin_array)utarray_free(plug_mgr->registered_polling_plugin_array);
if(plug_mgr->registered_packet_plugin_array)
@@ -149,6 +169,7 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr)
}
utarray_free(plug_mgr->registered_session_plugin_array);
}
+ plugin_manager_per_thread_data_free(plug_mgr->per_thread_data, plug_mgr->st);
FREE(plug_mgr);
return;
}
@@ -223,9 +244,65 @@ void *stellar_exdata_get(UT_array *exdata_schema, struct stellar_exdata *exdata_
}
/*******************************
- * SESSION EXDATA *
+ * PACKET EXDATA *
*******************************/
+static struct stellar_exdata *per_thread_packet_exdata_arrary_get(struct plugin_manager_schema *plug_mgr)
+{
+ if(plug_mgr==NULL || plug_mgr->packet_exdata_schema_array == NULL)return NULL;
+ int tid=stellar_get_current_thread_id(plug_mgr->st);
+ if(plug_mgr->per_thread_data[tid].per_thread_pkt_exdata_array.exdata_array == NULL)
+ {
+ unsigned int len = utarray_len(plug_mgr->packet_exdata_schema_array);
+ plug_mgr->per_thread_data[tid].per_thread_pkt_exdata_array.exdata_array = CALLOC(struct stellar_exdata, len);
+ }
+ return plug_mgr->per_thread_data[tid].per_thread_pkt_exdata_array.exdata_array;
+}
+
+static void per_thread_packet_exdata_arrary_clean(struct plugin_manager_schema *plug_mgr, struct packet *pkt)
+{
+ if(plug_mgr==NULL || plug_mgr->packet_exdata_schema_array == NULL)return;
+ unsigned int len=utarray_len(plug_mgr->packet_exdata_schema_array);
+ struct stellar_exdata *per_thread_pkt_exdata_arrary = per_thread_packet_exdata_arrary_get(plug_mgr);
+ if(per_thread_pkt_exdata_arrary == NULL)return;
+ for (unsigned int i = 0; i < len; i++)
+ {
+ void *exdata = (per_thread_pkt_exdata_arrary + i)->exdata;
+ struct stellar_exdata_schema *schema = (struct stellar_exdata_schema *)utarray_eltptr(plug_mgr->packet_exdata_schema_array, i);
+ if (exdata)
+ {
+ if (schema->pkt_free_func)
+ {
+ schema->pkt_free_func(pkt, i, exdata, schema->free_arg);
+ }
+ (per_thread_pkt_exdata_arrary + i)->exdata=NULL;
+ }
+ }
+}
+
+int stellar_packet_exdata_new_index(struct stellar *st, const char *name, packet_exdata_free *free_func,void *free_arg)
+{
+ struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
+ assert(plug_mgr);
+ return stellar_exdata_new_index(st, name, &plug_mgr->packet_exdata_schema_array, (void*)free_func, free_arg);
+}
+int packet_exdata_set(struct packet *pkt, int idx, void *ex_ptr)
+{
+ if(pkt == NULL)return -1;
+ struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(packet_stellar_get(pkt));
+ return stellar_exdata_set(plug_mgr->packet_exdata_schema_array, per_thread_packet_exdata_arrary_get(plug_mgr), idx, ex_ptr);
+}
+
+void *packet_exdata_get(struct packet *pkt, int idx)
+{
+ if(pkt == NULL)return NULL;
+ struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(packet_stellar_get(pkt));
+ return stellar_exdata_get( plug_mgr->packet_exdata_schema_array, per_thread_packet_exdata_arrary_get(plug_mgr), idx);
+}
+
+/*******************************
+ * SESSION EXDATA *
+ *******************************/
int stellar_session_exdata_new_index(struct stellar *st, const char *name, session_exdata_free *free_func,void *free_arg)
{
struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
@@ -252,7 +329,6 @@ void *session_exdata_get(struct session *sess, int idx)
/*******************************
* STELLAR MQ *
*******************************/
-
static void stellar_mq_topic_schema_copy(void *_dst, const void *_src)
{
struct stellar_mq_topic_schema *dst = (struct stellar_mq_topic_schema *)_dst,
@@ -326,7 +402,7 @@ int stellar_mq_create_topic(struct stellar *st, const char *topic_name, void *ms
return t_schema.topic_id;
}
-int stellar_mq_destroy_topic(int topic_id, UT_array *mq_schema_array)
+static int stellar_mq_destroy_topic(int topic_id, UT_array *mq_schema_array)
{
if(mq_schema_array==NULL)return -1;
unsigned int len = utarray_len(mq_schema_array);
@@ -349,27 +425,176 @@ int stellar_mq_destroy_topic(int topic_id, UT_array *mq_schema_array)
return 1; // success
}
-static int stellar_mq_publish_message(int topic_id, void *data, UT_array *mq_schema_array, struct stellar_message *mq[], enum session_mq_priority priority)
+static int stellar_mq_publish_message(enum stellar_topic_type type, int topic_id, void *data, UT_array *stellar_mq_schema, struct stellar_message *priority_mq[], enum session_mq_priority priority)
{
- if(mq_schema_array==NULL || topic_id < 0)return -1;
- unsigned int len = utarray_len(mq_schema_array);
+ if(stellar_mq_schema==NULL || topic_id < 0)return -1;
+ unsigned int len = utarray_len(stellar_mq_schema);
if (len <= (unsigned int)topic_id)return -1;
struct stellar_message *msg= CALLOC(struct stellar_message,1);
msg->header.topic_id = topic_id;
+ msg->header.type=type;
msg->header.priority = priority;
msg->body = data;
- DL_APPEND(mq[priority], msg);
+ DL_APPEND(priority_mq[priority], msg);
return 0;
}
UT_icd stellar_mq_subscriber_info_icd = {sizeof(struct stellar_mq_subscriber_info), NULL, NULL, NULL};
+static int stellar_mq_subscribe(struct plugin_manager_schema *plug_mgr, int topic_id, void *plugin_on_msg_cb, int plugin_idx, UT_array *registed_mq_subscriber_info)
+{
+ if(plug_mgr == NULL || plug_mgr->stellar_mq_schema_array==NULL || registed_mq_subscriber_info == NULL)return -1;
-/*******************************
- * SESSION MQ *
- *******************************/
+ unsigned int len = utarray_len(plug_mgr->stellar_mq_schema_array);
+ if (len <= (unsigned int)topic_id)return -1;
+
+ struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id);
+ if(topic==NULL)return -1;
+
+ // if plugin already subscribe current topic, return 0
+ struct stellar_mq_subscriber_info *p=NULL;
+ while( (p=(struct stellar_mq_subscriber_info *)utarray_next(registed_mq_subscriber_info,p)))
+ {
+ if(p->topic_id==topic_id)
+ {
+ struct stellar_mq_subscriber *tmp_subscriber=topic->subscribers;
+ int cnt=0;
+ while(tmp_subscriber)
+ {
+ if(cnt==p->subscriber_idx)
+ {
+ tmp_subscriber->msg_cb=plugin_on_msg_cb;
+ return 0;
+ }
+ cnt++;
+ tmp_subscriber=tmp_subscriber->next;
+ }
+ }
+ };
+
+ struct stellar_mq_subscriber *new_subscriber = CALLOC(struct stellar_mq_subscriber,1);
+ new_subscriber->topic_subscriber_idx = topic->subscriber_cnt;
+ new_subscriber->plugin_idx = plugin_idx;
+ new_subscriber->msg_cb = plugin_on_msg_cb;
+ DL_APPEND(topic->subscribers, new_subscriber);
+
+ struct stellar_mq_subscriber_info sub_info;
+ memset(&sub_info, 0, sizeof(struct stellar_mq_subscriber_info));
+ sub_info.topic_id=topic_id;
+ sub_info.subscriber_idx=topic->subscriber_cnt;
+ utarray_push_back(registed_mq_subscriber_info, &sub_info);
+ topic->subscriber_cnt+=1;
+ plug_mgr->session_topic_subscriber_num+=1;
+ return 0;
+}
+
+static void stellar_mq_dispatch_one_session_message(struct session *sess, struct stellar_message *mq_elt)
+{
+ struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess);
+ struct stellar_mq_subscriber *sub_elt, *sub_tmp;
+ struct registered_session_plugin_schema *session_plugin_schema;
+ struct session_plugin_ctx_runtime *plugin_ctx_rt;
+ struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->stellar_mq_schema_array,
+ (unsigned int)(mq_elt->header.topic_id));
+
+ if (topic)
+ {
+ int cur_sub_idx = 0;
+ DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
+ {
+ plug_mgr_rt->current_session_plugin_id = sub_elt->plugin_idx;
+ if (bitmap_get(plug_mgr_rt->session_mq_status, cur_sub_idx, mq_elt->header.topic_id) != 0)
+ {
+ plugin_ctx_rt = (plug_mgr_rt->plugin_ctx_array + sub_elt->plugin_idx);
+ session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(
+ plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)sub_elt->plugin_idx);
+ if (session_plugin_schema)
+ {
+ if (plugin_ctx_rt->state == INIT)
+ {
+ if (session_plugin_schema->on_ctx_new)
+ {
+ plugin_ctx_rt->plugin_ctx =
+ session_plugin_schema->on_ctx_new(sess, session_plugin_schema->plugin_env);
+ if (plugin_ctx_rt->state == EXIT && session_plugin_schema->on_ctx_free)
+ {
+ session_plugin_schema->on_ctx_free(sess, plugin_ctx_rt->plugin_ctx,
+ session_plugin_schema->plugin_env);
+ plugin_ctx_rt->plugin_ctx = NULL;
+ }
+ else
+ {
+ plugin_ctx_rt->state = ACTIVE;
+ }
+ }
+ }
+ if (sub_elt->sess_msg_cb &&
+ bitmap_get(plug_mgr_rt->session_mq_status, cur_sub_idx, mq_elt->header.topic_id) !=
+ 0) // ctx_new maybe call detach, need check again
+ {
+ sub_elt->sess_msg_cb(sess, mq_elt->header.topic_id, mq_elt->body, plugin_ctx_rt->plugin_ctx,
+ session_plugin_schema->plugin_env);
+ }
+ }
+ }
+ cur_sub_idx++;
+ }
+ if (cur_sub_idx == 0)
+ bitmap_set(plug_mgr_rt->session_topic_status, 0, mq_elt->header.topic_id, 0);
+ }
+}
-void session_mq_free(struct session *sess, struct stellar_message **head, UT_array *mq_schema_array)
+static void stellar_mq_dispatch_one_packet_message(struct packet *pkt, struct stellar_message *mq_elt)
+{
+ struct stellar *st = packet_stellar_get(pkt);
+ struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
+ struct stellar_mq_subscriber *sub_elt, *sub_tmp;
+ struct registered_packet_plugin_schema *packet_plugin_schema;
+ struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array,
+ (unsigned int)(mq_elt->header.topic_id));
+ if (topic)
+ {
+ DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
+ {
+ if (sub_elt->pkt_msg_cb)
+ {
+ packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr(
+ plug_mgr->registered_packet_plugin_array, (unsigned int)sub_elt->plugin_idx);
+ if (packet_plugin_schema)
+ {
+ sub_elt->pkt_msg_cb(pkt, mq_elt->header.topic_id, mq_elt->body, packet_plugin_schema->plugin_env);
+ }
+ }
+ }
+ }
+}
+
+static void stellar_mq_dispatch(struct stellar_message *priority_mq[], struct stellar_message ** dealth_letter_queue, struct session *sess, struct packet *pkt)
+{
+ struct stellar_message *mq_elt=NULL, *mq_tmp=NULL;
+ int cur_priority = SESSION_MQ_PRIORITY_HIGH;
+ while(cur_priority >= SESSION_MQ_PRIORITY_LOW)
+ {
+ if(priority_mq[cur_priority]==NULL)
+ {
+ cur_priority--;
+ continue;
+ }
+ DL_FOREACH_SAFE(priority_mq[cur_priority], mq_elt, mq_tmp)
+ {
+ if(mq_elt->header.type==ON_SESSION_TOPIC)stellar_mq_dispatch_one_session_message(sess, mq_elt);
+ if(mq_elt->header.type==ON_PACKET_TOPIC)stellar_mq_dispatch_one_packet_message(pkt, mq_elt);
+ DL_DELETE(priority_mq[mq_elt->header.priority], mq_elt);
+ DL_APPEND(*dealth_letter_queue, mq_elt); // move to dlq list
+
+ cur_priority=SESSION_MQ_PRIORITY_HIGH;
+ break;
+ }
+ }
+ return;
+}
+
+static void stellar_mq_free(struct session *sess, struct packet *pkt, struct stellar_message **head, UT_array *mq_schema_array)
{
struct stellar_message *mq_elt, *tmp;
struct stellar_mq_topic_schema *topic;
@@ -379,33 +604,105 @@ void session_mq_free(struct session *sess, struct stellar_message **head, UT_arr
(unsigned int)(mq_elt->header.topic_id));
if (topic && topic->free_cb)
{
- topic->sess_msg_free_cb(sess, mq_elt->body, topic->free_cb_arg);
+ if(mq_elt->header.type==ON_SESSION_TOPIC)topic->sess_msg_free_cb(sess, mq_elt->body, topic->free_cb_arg);
+ if(mq_elt->header.type==ON_PACKET_TOPIC)topic->pkt_msg_free_cb(pkt, mq_elt->body, topic->free_cb_arg);
}
DL_DELETE(*head, mq_elt);
FREE(mq_elt);
}
- FREE(*head);
}
+/*******************************
+ * PACKET MQ *
+ *******************************/
+int stellar_packet_mq_create_topic(struct stellar *st, const char *topic_name, packet_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
+{
+ struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
+ assert(plug_mgr);
+ int topic_id=stellar_mq_create_topic(st, topic_name, (void *)msg_free_cb, msg_free_arg, &plug_mgr->stellar_mq_schema_array);
+ if(topic_id>=0)plug_mgr->packet_mq_topic_num+=1;
+ return topic_id;
+}
+
+int stellar_packet_mq_get_topic_id(struct stellar *st, const char *topic_name)
+{
+ struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
+ assert(plug_mgr);
+ return stellar_mq_get_topic_id(topic_name, plug_mgr->stellar_mq_schema_array);
+}
+
+int stellar_packet_mq_update_topic(struct stellar *st, int topic_id, packet_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
+{
+ struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
+ assert(plug_mgr);
+ return stellar_mq_update_topic(topic_id, (void *)msg_free_cb, msg_free_arg, plug_mgr->stellar_mq_schema_array);
+}
+
+int stellar_packet_mq_destroy_topic(struct stellar *st, int topic_id)
+{
+ struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
+ assert(plug_mgr);
+ int ret = stellar_mq_destroy_topic(topic_id, plug_mgr->stellar_mq_schema_array);
+ if(ret==1)plug_mgr->packet_mq_topic_num-=1;
+ return ret;
+}
+
+//return 0 if success, otherwise return -1.
+int stellar_packet_mq_subscribe(struct stellar *st, int topic_id, on_packet_msg_cb_func *plugin_on_msg_cb, int plugin_id)
+{
+ if(plugin_id < PACKET_PULGIN_ID_BASE || plugin_id >= POLLING_PULGIN_ID_BASE)return -1;// ignore session or polling plugin
+ int plugin_idx=plugin_id-PACKET_PULGIN_ID_BASE;
+
+ struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
+ if(plug_mgr == NULL || plug_mgr->registered_packet_plugin_array == NULL)return -1;
+
+ struct registered_packet_plugin_schema *packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr(plug_mgr->registered_packet_plugin_array, (unsigned)plugin_idx);
+ if(packet_plugin_schema==NULL)return -1;
+
+ if(packet_plugin_schema->registed_packet_mq_subscriber_info==NULL)
+ {
+ utarray_new(packet_plugin_schema->registed_packet_mq_subscriber_info, &stellar_mq_subscriber_info_icd);
+ }
+
+ return stellar_mq_subscribe(plug_mgr,topic_id, (void *)plugin_on_msg_cb, plugin_idx, packet_plugin_schema->registed_packet_mq_subscriber_info);
+}
+
+int packet_mq_publish_message(struct packet *pkt, int topic_id, void *data)
+{
+ struct stellar *st = packet_stellar_get(pkt);
+ struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
+ int tid = stellar_get_current_thread_id(plug_mgr->st);
+ if(plug_mgr->per_thread_data[tid].pub_packet_msg_cnt >= plug_mgr->max_message_dispatch)return -1;
+ if(stellar_mq_publish_message(ON_PACKET_TOPIC ,topic_id, data, plug_mgr->stellar_mq_schema_array, plug_mgr->per_thread_data[tid].priority_mq,SESSION_MQ_PRIORITY_HIGH)==0)
+ {
+ plug_mgr->per_thread_data[tid].pub_packet_msg_cnt+=1;
+ return 0;
+ }
+ return -1;
+}
+
+/*******************************
+ * SESSION MQ *
+ *******************************/
inline int stellar_session_mq_get_topic_id(struct stellar *st, const char *topic_name)
{
struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
assert(plug_mgr);
- return stellar_mq_get_topic_id(topic_name, plug_mgr->session_mq_schema_array);
+ return stellar_mq_get_topic_id(topic_name, plug_mgr->stellar_mq_schema_array);
}
int stellar_session_mq_update_topic(struct stellar *st, int topic_id, session_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
{
struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
assert(plug_mgr);
- return stellar_mq_update_topic(topic_id, (void *)msg_free_cb, msg_free_arg, plug_mgr->session_mq_schema_array);
+ return stellar_mq_update_topic(topic_id, (void *)msg_free_cb, msg_free_arg, plug_mgr->stellar_mq_schema_array);
}
int stellar_session_mq_create_topic(struct stellar *st, const char *topic_name, session_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
{
struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
assert(plug_mgr);
- int topic_id=stellar_mq_create_topic(st, topic_name, (void *)msg_free_cb, msg_free_arg, &plug_mgr->session_mq_schema_array);
+ int topic_id=stellar_mq_create_topic(st, topic_name, (void *)msg_free_cb, msg_free_arg, &plug_mgr->stellar_mq_schema_array);
if(topic_id>=0)plug_mgr->session_mq_topic_num+=1;
return topic_id;
}
@@ -414,26 +711,27 @@ int stellar_session_mq_destroy_topic(struct stellar *st, int topic_id)
{
struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
assert(plug_mgr);
- int ret = stellar_mq_destroy_topic(topic_id, plug_mgr->session_mq_schema_array);
+ int ret = stellar_mq_destroy_topic(topic_id, plug_mgr->stellar_mq_schema_array);
if(ret==1)plug_mgr->session_mq_topic_num-=1;
return ret;
}
-
-
int session_mq_publish_message_with_priority(struct session *sess, int topic_id, void *data, enum session_mq_priority priority)
{
struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess);
assert(plug_mgr_rt);
if(plug_mgr_rt->session_mq_status==NULL)return -1;//runtime free stage , mq_status alaway null, ignore publish message
if(plug_mgr_rt->pub_session_msg_cnt >= plug_mgr_rt->plug_mgr->max_message_dispatch)return -1;
- int ret=stellar_mq_publish_message(topic_id, data, plug_mgr_rt->plug_mgr->session_mq_schema_array, plug_mgr_rt->priority_mq, priority);
- if(ret==0)plug_mgr_rt->pub_session_msg_cnt+=1;
- return ret;
+ int tid = stellar_get_current_thread_id(plug_mgr_rt->plug_mgr->st);
+ if(stellar_mq_publish_message(ON_SESSION_TOPIC ,topic_id, data, plug_mgr_rt->plug_mgr->stellar_mq_schema_array,plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq,priority)==0)
+ {
+ plug_mgr_rt->pub_session_msg_cnt+=1;
+ return 0;
+ }
+ return -1;
}
-
-int session_mq_publish_message(struct session *sess, int topic_id, void *data)
+inline int session_mq_publish_message(struct session *sess, int topic_id, void *data)
{
return session_mq_publish_message_with_priority(sess, topic_id, data, SESSION_MQ_PRIORITY_NORMAL);
}
@@ -463,7 +761,7 @@ static int session_mq_set_message_status(struct session *sess, int topic_id, int
struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess);
if(plug_mgr_rt==NULL)return -1;
if(topic_id >= plug_mgr_rt->plug_mgr->session_mq_topic_num)return -1;// topic_id out of range
- struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->session_mq_schema_array, (unsigned int)topic_id);
+ struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id);
if(topic==NULL)return -1;
struct registered_session_plugin_schema *session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)plugin_id);
@@ -486,170 +784,32 @@ static int session_mq_set_message_status(struct session *sess, int topic_id, int
return -1;
}
-
-int session_mq_ignore_message(struct session *sess, int topic_id, int plugin_id)
+inline int session_mq_ignore_message(struct session *sess, int topic_id, int plugin_id)
{
return session_mq_set_message_status(sess, topic_id, plugin_id, 0);
}
-int session_mq_unignore_message(struct session *sess, int topic_id, int plugin_id)
+inline int session_mq_unignore_message(struct session *sess, int topic_id, int plugin_id)
{
return session_mq_set_message_status(sess, topic_id, plugin_id, 1);
}
-
int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_session_msg_cb_func *plugin_on_msg_cb, int plugin_id)
{
- if(plugin_id >= PACKET_PULGIN_ID_BASE)return -1;// ignore packet plugin
+ if(plugin_id >= PACKET_PULGIN_ID_BASE || plugin_on_msg_cb == NULL)return -1;// ignore packet plugin
struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
- if(plug_mgr == NULL || plug_mgr->session_mq_schema_array==NULL || plug_mgr->registered_session_plugin_array == NULL)return -1;
- unsigned int len = utarray_len(plug_mgr->session_mq_schema_array);
- if (len <= (unsigned int)topic_id)return -1;
+ if(plug_mgr == NULL || plug_mgr->registered_session_plugin_array == NULL)return -1;
struct registered_session_plugin_schema *session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr->registered_session_plugin_array, (unsigned)plugin_id);
- if(session_plugin_schema==NULL)return -1;
-
- struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, (unsigned int)topic_id);
- if(topic==NULL)return -1;
-
+ if(session_plugin_schema==NULL)return -1;
if(session_plugin_schema->registed_session_mq_subscriber_info==NULL)
{
utarray_new(session_plugin_schema->registed_session_mq_subscriber_info, &stellar_mq_subscriber_info_icd);
- }
-
- // if plugin already subscribe current topic, return 0
- struct stellar_mq_subscriber_info *p=NULL;
- while( (p=(struct stellar_mq_subscriber_info *)utarray_next(session_plugin_schema->registed_session_mq_subscriber_info,p)))
- {
- if(p->topic_id==topic_id)
- {
- struct stellar_mq_subscriber *tmp_subscriber=topic->subscribers;
- int cnt=0;
- while(tmp_subscriber)
- {
- if(cnt==p->subscriber_idx)
- {
- tmp_subscriber->sess_msg_cb=plugin_on_msg_cb;
- return 0;
- }
- cnt++;
- tmp_subscriber=tmp_subscriber->next;
- }
- }
- };
-
- struct stellar_mq_subscriber *new_subscriber = CALLOC(struct stellar_mq_subscriber,1);
- new_subscriber->topic_subscriber_idx = topic->subscriber_cnt;
- new_subscriber->plugin_idx = plugin_id;
- new_subscriber->sess_msg_cb = plugin_on_msg_cb;
- DL_APPEND(topic->subscribers, new_subscriber);
-
- struct stellar_mq_subscriber_info sub_info;
- memset(&sub_info, 0, sizeof(struct stellar_mq_subscriber_info));
- sub_info.topic_id=topic_id;
- sub_info.subscriber_idx=topic->subscriber_cnt;
- utarray_push_back(session_plugin_schema->registed_session_mq_subscriber_info, &sub_info);
- topic->subscriber_cnt+=1;
- plug_mgr->session_topic_subscriber_num+=1;
- return 0;
-}
-
-static void session_mq_dispatch_one_message(struct session *sess, struct stellar_message *mq_elt)
-{
- struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess);
- struct stellar_mq_subscriber *sub_elt, *sub_tmp;
- struct registered_session_plugin_schema *session_plugin_schema;
- struct session_plugin_ctx_runtime *plugin_ctx_rt;
- struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(
- plug_mgr_rt->plug_mgr->session_mq_schema_array, (unsigned int)(mq_elt->header.topic_id));
-
- if (topic)
- {
- int cur_sub_idx = 0;
- DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
- {
- plug_mgr_rt->current_session_plugin_id = sub_elt->plugin_idx;
- if (bitmap_get(plug_mgr_rt->session_mq_status, cur_sub_idx, mq_elt->header.topic_id) != 0)
- {
- plugin_ctx_rt = (plug_mgr_rt->plugin_ctx_array + sub_elt->plugin_idx);
- session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(
- plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)sub_elt->plugin_idx);
- if (session_plugin_schema)
- {
- if (plugin_ctx_rt->state == INIT)
- {
- if (session_plugin_schema->on_ctx_new)
- {
- plugin_ctx_rt->plugin_ctx =
- session_plugin_schema->on_ctx_new(sess, session_plugin_schema->plugin_env);
- if (plugin_ctx_rt->state == EXIT && session_plugin_schema->on_ctx_free)
- {
- session_plugin_schema->on_ctx_free(sess, plugin_ctx_rt->plugin_ctx,
- session_plugin_schema->plugin_env);
- plugin_ctx_rt->plugin_ctx = NULL;
- }
- else
- {
- plugin_ctx_rt->state = ACTIVE;
- }
- }
- }
- if (sub_elt->sess_msg_cb &&
- bitmap_get(plug_mgr_rt->session_mq_status, cur_sub_idx, mq_elt->header.topic_id) !=
- 0) // ctx_new maybe call detach, need check again
- {
- sub_elt->sess_msg_cb(sess, mq_elt->header.topic_id, mq_elt->body, plugin_ctx_rt->plugin_ctx,
- session_plugin_schema->plugin_env);
- }
- }
- }
- cur_sub_idx++;
- }
- if (cur_sub_idx == 0)
- bitmap_set(plug_mgr_rt->session_topic_status, 0, mq_elt->header.topic_id, 0);
- }
-}
-
-static void plugin_manager_session_message_dispatch(struct session *sess)
-{
- struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess);
- if(plug_mgr_rt==NULL)return;
-
- struct stellar_message *mq_elt=NULL, *mq_tmp=NULL;
-
- int cur_priority = SESSION_MQ_PRIORITY_HIGH;
- while(cur_priority >= SESSION_MQ_PRIORITY_LOW)
- {
- if(plug_mgr_rt->priority_mq[cur_priority]==NULL)
- {
- cur_priority--;
- continue;
- }
- DL_FOREACH_SAFE(plug_mgr_rt->priority_mq[cur_priority], mq_elt, mq_tmp)
- {
- session_mq_dispatch_one_message(sess, mq_elt);
- DL_DELETE(plug_mgr_rt->priority_mq[mq_elt->header.priority], mq_elt);
- DL_APPEND(plug_mgr_rt->dealth_letter_queue, mq_elt); // move to dlq list
-
- cur_priority=SESSION_MQ_PRIORITY_HIGH;
- break;
- }
- }
-
-#if 0
- while (plug_mgr_rt->pending_mq != NULL)
- {
- DL_FOREACH_SAFE(plug_mgr_rt->pending_mq, mq_elt, mq_tmp)
- {
- session_mq_dispatch_one_message(sess, mq_elt);
- DL_DELETE(plug_mgr_rt->pending_mq[mq_elt->header.priority], mq_elt);
- DL_APPEND(plug_mgr_rt->delivered_mq, mq_elt); // move to delivered message list
- }
- }
-#endif
- return;
+ }
+ //session plugin id equals to plugin idx
+ return stellar_mq_subscribe(plug_mgr,topic_id, (void *)plugin_on_msg_cb, plugin_id, session_plugin_schema->registed_session_mq_subscriber_info);
}
int session_mq_topic_is_active(struct session *sess, int topic_id)
@@ -665,7 +825,6 @@ int session_mq_topic_is_active(struct session *sess, int topic_id)
/*******************************
* PLUGIN MANAGER SESSION RUNTIME *
*******************************/
-
static struct stellar_exdata *session_exdata_runtime_new(struct plugin_manager_schema *plug_mgr)
{
struct stellar_exdata *exdata_rt = NULL;
@@ -702,8 +861,6 @@ struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_
struct plugin_manager_runtime *rt = CALLOC(struct plugin_manager_runtime, 1);
rt->plug_mgr = plug_mgr;
rt->sess = sess;
- memset(rt->priority_mq, 0, sizeof(rt->priority_mq));
- rt->dealth_letter_queue = NULL;
rt->session_mq_status=bitmap_new(plug_mgr->session_topic_subscriber_num, plug_mgr->session_mq_topic_num, 1);
rt->session_topic_status=bitmap_new(1, plug_mgr->session_mq_topic_num, 1);
rt->sess_exdata_array = (struct stellar_exdata *)session_exdata_runtime_new(plug_mgr);
@@ -716,21 +873,7 @@ struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_
void plugin_manager_session_runtime_free(struct plugin_manager_runtime *rt)
{
if(rt==NULL)return;
- assert(rt->priority_mq[SESSION_MQ_PRIORITY_HIGH]==NULL);
- assert(rt->priority_mq[SESSION_MQ_PRIORITY_NORMAL]==NULL);
- assert(rt->priority_mq[SESSION_MQ_PRIORITY_LOW]==NULL);
- for(int i=0; i < SESSION_MQ_PRIORITY_MAX; i++)
- {
- if(rt->priority_mq[i] != NULL)
- {
- session_mq_free(rt->sess, &rt->priority_mq[i], rt->plug_mgr->session_mq_schema_array);
- }
- }
- assert(rt->dealth_letter_queue==NULL);
- if(rt->dealth_letter_queue != NULL)
- {
- session_mq_free(rt->sess, &rt->dealth_letter_queue, rt->plug_mgr->session_mq_schema_array);
- }
+
if(rt->session_mq_status != NULL)
{
bitmap_free(rt->session_mq_status);
@@ -752,7 +895,7 @@ void plugin_manager_session_runtime_free(struct plugin_manager_runtime *rt)
if (session_plugin_schema->on_ctx_free && plugin_ctx_rt->state == ACTIVE)
{
session_plugin_schema->on_ctx_free(rt->sess, plugin_ctx_rt->plugin_ctx,
- session_plugin_schema->plugin_env);
+ session_plugin_schema->plugin_env);
}
}
FREE(rt->plugin_ctx_array);
@@ -766,9 +909,6 @@ void plugin_manager_session_runtime_free(struct plugin_manager_runtime *rt)
/*********************************************
* PLUGIN MANAGER PACKET PLUGIN *
*********************************************/
-
-
-
UT_icd registered_packet_plugin_array_icd = {sizeof(struct registered_packet_plugin_schema), NULL, NULL, NULL};
int stellar_packet_plugin_register(struct stellar *st, unsigned char ip_proto, plugin_on_packet_func on_packet_cb, void *plugin_env)
@@ -792,6 +932,9 @@ void plugin_manager_on_packet_ingress(struct plugin_manager_schema *plug_mgr, st
if(plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return;
struct registered_packet_plugin_schema *p=NULL;
unsigned char ip_proto=packet_get_ip_protocol(pkt);
+ int tid=stellar_get_current_thread_id(plug_mgr->st);
+ //TODO : provide public api to reset pub_msg_cnt
+ plug_mgr->per_thread_data[tid].pub_packet_msg_cnt=0;//reset pub_msg_cnt
while ((p = (struct registered_packet_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, p)))
{
if(p->ip_protocol == ip_proto && p->on_packet)
@@ -799,14 +942,22 @@ void plugin_manager_on_packet_ingress(struct plugin_manager_schema *plug_mgr, st
p->on_packet(pkt, ip_proto, p->plugin_env);
}
}
+ stellar_mq_dispatch(plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr->per_thread_data[tid].dealth_letter_queue, NULL, pkt);
return;
}
+void plugin_manager_on_packet_egress(struct plugin_manager_schema *plug_mgr, struct packet *pkt)
+{
+ if(plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return;
+ per_thread_packet_exdata_arrary_clean(plug_mgr, pkt);
+ stellar_mq_free(NULL, pkt,
+ &plug_mgr->per_thread_data[stellar_get_current_thread_id(plug_mgr->st)].dealth_letter_queue,
+ plug_mgr->stellar_mq_schema_array);
+}
+
/*********************************************
* PLUGIN MANAGER POLLING PLUGIN *
*********************************************/
-
-
UT_icd registered_polling_plugin_array_icd = {sizeof(struct registered_polling_plugin_schema), NULL, NULL, NULL};
int stellar_polling_plugin_register(struct stellar *st, plugin_on_polling_func on_polling, void *plugin_env)
@@ -845,12 +996,8 @@ int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr)
/*********************************************
* PLUGIN MANAGER SESSION PLUGIN *
*********************************************/
-
-
-
UT_icd registered_session_plugin_schema_icd = {sizeof(struct registered_session_plugin_schema), NULL, NULL, NULL};
-
int stellar_session_plugin_register(struct stellar *st,
session_ctx_new_func session_ctx_new,
session_ctx_free_func session_ctx_free,
@@ -894,7 +1041,8 @@ void plugin_manager_on_session_ingress(struct session *sess, struct packet *pkt)
}
plug_mgr_rt->pub_session_msg_cnt=0;
session_mq_publish_message_with_priority(sess, topic_id ,(void *)pkt, SESSION_MQ_PRIORITY_HIGH);
- plugin_manager_session_message_dispatch(sess);
+ int tid=stellar_get_current_thread_id(plug_mgr_rt->plug_mgr->st);
+ stellar_mq_dispatch(plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, sess, pkt);
return;
}
@@ -903,12 +1051,9 @@ void plugin_manager_on_session_egress(struct session *sess, struct packet *pkt)
struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess);
if(plug_mgr_rt==NULL)return;
session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->egress_topic_id ,pkt, SESSION_MQ_PRIORITY_HIGH);
- plugin_manager_session_message_dispatch(sess);
- session_mq_free(plug_mgr_rt->sess,&plug_mgr_rt->dealth_letter_queue, plug_mgr_rt->plug_mgr->session_mq_schema_array);
- assert(plug_mgr_rt->priority_mq[SESSION_MQ_PRIORITY_HIGH]==NULL);
- assert(plug_mgr_rt->priority_mq[SESSION_MQ_PRIORITY_NORMAL]==NULL);
- assert(plug_mgr_rt->priority_mq[SESSION_MQ_PRIORITY_LOW]==NULL);
- assert(plug_mgr_rt->dealth_letter_queue==NULL);
+ int tid=stellar_get_current_thread_id(plug_mgr_rt->plug_mgr->st);
+ stellar_mq_dispatch(plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, sess, pkt);
+ stellar_mq_free(plug_mgr_rt->sess,pkt, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, plug_mgr_rt->plug_mgr->stellar_mq_schema_array);
return;
}
@@ -928,12 +1073,9 @@ void plugin_manager_on_session_closing(struct session *sess)
default:
break;
}
- plugin_manager_session_message_dispatch(sess);
- session_mq_free(plug_mgr_rt->sess,&plug_mgr_rt->dealth_letter_queue, plug_mgr_rt->plug_mgr->session_mq_schema_array);
- assert(plug_mgr_rt->priority_mq[SESSION_MQ_PRIORITY_HIGH]==NULL);
- assert(plug_mgr_rt->priority_mq[SESSION_MQ_PRIORITY_NORMAL]==NULL);
- assert(plug_mgr_rt->priority_mq[SESSION_MQ_PRIORITY_LOW]==NULL);
- assert(plug_mgr_rt->dealth_letter_queue==NULL);
+ int tid=stellar_get_current_thread_id(plug_mgr_rt->plug_mgr->st);
+ stellar_mq_dispatch(plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, sess, NULL);
+ stellar_mq_free(plug_mgr_rt->sess,NULL,&plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, plug_mgr_rt->plug_mgr->stellar_mq_schema_array);
return;
}
@@ -952,11 +1094,10 @@ void stellar_session_plugin_dettach_current_session(struct session *sess)
{
struct stellar_mq_subscriber_info *session_plugin_sub_info = (struct stellar_mq_subscriber_info *)utarray_eltptr(session_plugin_schema->registed_session_mq_subscriber_info, i);
bitmap_set(plug_mgr_rt->session_mq_status, session_plugin_sub_info->subscriber_idx,session_plugin_sub_info->topic_id, 0);
- topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->session_mq_schema_array, (unsigned int)session_plugin_sub_info->topic_id);
+ topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->stellar_mq_schema_array, (unsigned int)session_plugin_sub_info->topic_id);
session_mq_update_topic_status(plug_mgr_rt, topic);
}
}
-
//dettach in ctx INIT, do not call on_ctx_free immidiately
if(plug_mgr_rt->plugin_ctx_array[plug_mgr_rt->current_session_plugin_id].state != INIT && (session_plugin_schema->on_ctx_free))
{
@@ -966,5 +1107,4 @@ void stellar_session_plugin_dettach_current_session(struct session *sess)
plug_mgr_rt->plugin_ctx_array[plug_mgr_rt->current_session_plugin_id].state=EXIT;
return;
-}
-
+} \ No newline at end of file
diff --git a/src/plugin_manager/plugin_manager.h b/src/plugin_manager/plugin_manager.h
index 23c4297..cf07167 100644
--- a/src/plugin_manager/plugin_manager.h
+++ b/src/plugin_manager/plugin_manager.h
@@ -9,7 +9,7 @@ struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char
void plugin_manager_exit(struct plugin_manager_schema *plug_mgr);
void plugin_manager_on_packet_ingress(struct plugin_manager_schema *plug_mgr, struct packet *pkt);
-
+void plugin_manager_on_packet_egress(struct plugin_manager_schema *plug_mgr, struct packet *pkt);
//return polling work state, 0: idle, 1: working
int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr);
diff --git a/src/plugin_manager/plugin_manager_interna.h b/src/plugin_manager/plugin_manager_interna.h
index 82b2e7c..4f97e9a 100644
--- a/src/plugin_manager/plugin_manager_interna.h
+++ b/src/plugin_manager/plugin_manager_interna.h
@@ -5,21 +5,44 @@
#include "stellar/session_exdata.h"
#include "stellar/session_mq.h"
+
+#include "stellar/packet_exdata.h"
+#include "stellar/packet_mq.h"
+
+
#include "bitmap/bitmap.h"
#include "uthash/utarray.h"
+struct per_thread_exdata_array
+{
+ struct stellar_exdata *exdata_array;
+};
+
struct stellar_message;
+struct plugin_manger_per_thread_data
+{
+ struct per_thread_exdata_array per_thread_pkt_exdata_array;
+ struct stellar_message *priority_mq[SESSION_MQ_PRIORITY_MAX];// message list
+ struct stellar_message *dealth_letter_queue;// dlq list
+ long long pub_packet_msg_cnt;
+};
+
+
+
struct plugin_manager_schema
{
struct stellar *st;
UT_array *plugin_load_specs_array;
+ UT_array *packet_exdata_schema_array;
UT_array *session_exdata_schema_array;
- UT_array *session_mq_schema_array;
+ UT_array *stellar_mq_schema_array;
UT_array *registered_session_plugin_array;
UT_array *registered_packet_plugin_array;
UT_array *registered_polling_plugin_array;
+ int packet_mq_topic_num;
int session_mq_topic_num;
+ int packet_topic_subscriber_num;
int session_topic_subscriber_num;
int tcp_topic_id;
int tcp_stream_topic_id;
@@ -27,6 +50,7 @@ struct plugin_manager_schema
int egress_topic_id;
int control_packet_topic_id;
int max_message_dispatch;
+ struct plugin_manger_per_thread_data *per_thread_data;
}__attribute__((aligned(sizeof(void*))));
@@ -44,17 +68,26 @@ struct stellar_exdata_schema
{
void *free_func;
session_exdata_free *sess_free_func;
+ packet_exdata_free *pkt_free_func;
};
void *free_arg;
int idx;
}__attribute__((aligned(sizeof(void*))));
+
+enum stellar_topic_type
+{
+ ON_SESSION_TOPIC,
+ ON_PACKET_TOPIC,
+};
+
struct stellar_message
{
struct
{
int topic_id;
+ enum stellar_topic_type type;
enum session_mq_priority priority;
} header;
void *body;
@@ -68,6 +101,8 @@ typedef struct stellar_mq_subscriber
union
{
on_session_msg_cb_func *sess_msg_cb;
+ on_packet_msg_cb_func *pkt_msg_cb;
+ void *msg_cb;
};
struct stellar_mq_subscriber *next, *prev;
}stellar_mq_subscriber __attribute__((aligned(sizeof(void*))));
@@ -84,6 +119,7 @@ struct stellar_mq_topic_schema
{
void *free_cb;
session_msg_free_cb_func *sess_msg_free_cb;
+ packet_msg_free_cb_func *pkt_msg_free_cb;
};
struct stellar_mq_subscriber *subscribers;
}__attribute__((aligned(sizeof(void*))));
@@ -104,8 +140,6 @@ struct plugin_manager_runtime
{
struct plugin_manager_schema *plug_mgr;
struct session *sess;
- struct stellar_message *priority_mq[SESSION_MQ_PRIORITY_MAX];// message list
- struct stellar_message *dealth_letter_queue;// dlq list
struct bitmap *session_mq_status; //N * M bits, N topic, M subscriber
struct bitmap *session_topic_status; //N bits, N topic
struct stellar_exdata *sess_exdata_array;
diff --git a/src/stellar_on_sapp/stellar_internal.h b/src/stellar_on_sapp/stellar_internal.h
index 3358e7c..e42dfed 100644
--- a/src/stellar_on_sapp/stellar_internal.h
+++ b/src/stellar_on_sapp/stellar_internal.h
@@ -22,4 +22,6 @@ enum packet_type
CONTROL,
};
-enum packet_type packet_get_type(const struct packet *pkt); \ No newline at end of file
+enum packet_type packet_get_type(const struct packet *pkt);
+
+struct stellar * packet_stellar_get(struct packet *pkt); \ No newline at end of file
diff --git a/src/stellar_on_sapp/stellar_on_sapp_api.c b/src/stellar_on_sapp/stellar_on_sapp_api.c
index 72935c6..04a67e8 100644
--- a/src/stellar_on_sapp/stellar_on_sapp_api.c
+++ b/src/stellar_on_sapp/stellar_on_sapp_api.c
@@ -41,6 +41,11 @@ struct session
struct plugin_manager_runtime *plug_mgr_rt;
}__attribute__((aligned(sizeof(void*))));
+inline struct stellar * packet_stellar_get(struct packet *pkt)
+{
+ return pkt->st;
+}
+
inline struct plugin_manager_schema * stellar_plugin_manager_schema_get(struct stellar *st)
{
@@ -140,6 +145,8 @@ unsigned char session_state_update_on_sapp(struct streaminfo *stream, unsigned c
struct packet *pkt = &sess->cur_pkt;
pkt->raw_pkt=raw_pkt;
pkt->type=type;
+ pkt->sess=sess;
+ pkt->st=sess->st;
if(raw_pkt)plugin_manager_on_session_ingress(sess, pkt);
//check TCP topic active subscirber num, if 0, return APP_STATE_DROPME, to reduce tcp reassemble overhead
@@ -162,6 +169,7 @@ void session_defer_on_sapp(struct session *sess)
if(pkt->raw_pkt)
{
plugin_manager_on_session_egress(sess, pkt);
+ plugin_manager_on_packet_egress(sess->st->plug_mgr, pkt);
}
}
sess->cur_pkt.raw_pkt=NULL;//clear raw_pkt
@@ -197,6 +205,11 @@ void packet_update_on_sapp(struct stellar *st, struct streaminfo *pstream, void
}
//FIXME: defer TCP/UDP packet on session update
plugin_manager_on_packet_ingress(st->plug_mgr, &pkt);
+ if(pkt.ip_proto!=IPPROTO_TCP && pkt.ip_proto!=IPPROTO_UDP)
+ {
+ plugin_manager_on_packet_egress(st->plug_mgr, &pkt);
+ }
+
}
inline int polling_on_sapp(struct stellar *st)
diff --git a/test/plugin_manager/plugin_manager_gtest_main.cpp b/test/plugin_manager/plugin_manager_gtest_main.cpp
index eb1cebc..381c6d5 100644
--- a/test/plugin_manager/plugin_manager_gtest_main.cpp
+++ b/test/plugin_manager/plugin_manager_gtest_main.cpp
@@ -3,6 +3,9 @@
#include "stellar/utils.h"
#include "plugin_manager_gtest_mock.h"
+#define STELLAR_INTRINSIC_TOPIC_NUM 5
+#define TOPIC_NAME_MAX 512
+
void whitebox_test_plugin_manager_intrisic_metadata(struct stellar *st, struct plugin_manager_schema *plug_mgr)
{
SCOPED_TRACE("whitebox test intrisic metadata");
@@ -15,34 +18,37 @@ void whitebox_test_plugin_manager_intrisic_metadata(struct stellar *st, struct p
EXPECT_TRUE(plug_mgr->plugin_load_specs_array==NULL);
//packet exdata & mq schema null
+ EXPECT_TRUE(plug_mgr->packet_exdata_schema_array==NULL);
//session exdata schema null
EXPECT_TRUE(plug_mgr->session_exdata_schema_array==NULL);
//session mq schema not null
- EXPECT_TRUE(plug_mgr->session_mq_schema_array!=NULL);
+ EXPECT_TRUE(plug_mgr->stellar_mq_schema_array!=NULL);
//registered plugin array null
EXPECT_TRUE(plug_mgr->registered_polling_plugin_array==NULL);
EXPECT_TRUE(plug_mgr->registered_packet_plugin_array==NULL);
EXPECT_TRUE(plug_mgr->registered_session_plugin_array==NULL);
- int intrinsic_topic_num=utarray_len(plug_mgr->session_mq_schema_array);
+ EXPECT_EQ(plug_mgr->packet_mq_topic_num, 0);
+
+ int intrinsic_topic_num=utarray_len(plug_mgr->stellar_mq_schema_array);
EXPECT_EQ(plug_mgr->session_mq_topic_num, intrinsic_topic_num);//TCP,UDP,TCP_STREAM,EGRESS,CONTROL
- struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, (unsigned int)plug_mgr->tcp_topic_id);
+ struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)plug_mgr->tcp_topic_id);
EXPECT_STREQ(topic->topic_name, TOPIC_TCP);
- topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, (unsigned int)plug_mgr->tcp_stream_topic_id);
+ topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)plug_mgr->tcp_stream_topic_id);
EXPECT_STREQ(topic->topic_name, TOPIC_TCP_STREAM);
- topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, (unsigned int)plug_mgr->udp_topic_id);
+ topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)plug_mgr->udp_topic_id);
EXPECT_STREQ(topic->topic_name, TOPIC_UDP);
- topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, (unsigned int)plug_mgr->egress_topic_id);
+ topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)plug_mgr->egress_topic_id);
EXPECT_STREQ(topic->topic_name, TOPIC_EGRESS);
- topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, (unsigned int)plug_mgr->control_packet_topic_id);
+ topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)plug_mgr->control_packet_topic_id);
EXPECT_STREQ(topic->topic_name, TOPIC_CONTROL_PACKET);
//intrinsic topic
@@ -51,6 +57,16 @@ void whitebox_test_plugin_manager_intrisic_metadata(struct stellar *st, struct p
EXPECT_GE(stellar_session_mq_get_topic_id(st, TOPIC_UDP), 0);
EXPECT_GE(stellar_session_mq_get_topic_id(st, TOPIC_EGRESS), 0);
EXPECT_GE(stellar_session_mq_get_topic_id(st, TOPIC_CONTROL_PACKET), 0);
+
+ EXPECT_TRUE(plug_mgr->per_thread_data!=NULL);
+ int thread_num=stellar_get_worker_thread_num(st);
+ for(int i=0; i<thread_num; i++)
+ {
+ EXPECT_TRUE(plug_mgr->per_thread_data[i].per_thread_pkt_exdata_array.exdata_array==NULL);
+ EXPECT_TRUE(plug_mgr->per_thread_data[i].dealth_letter_queue==NULL);
+ for(int j=0; j<SESSION_MQ_PRIORITY_MAX; j++)
+ EXPECT_TRUE(plug_mgr->per_thread_data[i].priority_mq[j]==NULL);
+ }
}
/***********************************
@@ -67,25 +83,220 @@ TEST(plugin_manager_init, init_with_null_toml) {
plugin_manager_exit(plug_mgr);
}
+/******************************************
+ * TEST PLUGIN MANAGER PACKET PLUGIN INIT *
+ ******************************************/
+
+static void test_mock_packet_exdata_free(struct packet *pkt, int idx, void *ex_ptr, void *arg){}
+
+static void test_mock_overwrite_packet_exdata_free(struct packet *pkt, int idx, void *ex_ptr, void *arg){}
+
+TEST(plugin_manager_init, packet_exdata_new_index_overwrite) {
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+ const char *exdata_name="PACKET_EXDATA";
+ int exdata_idx=stellar_packet_exdata_new_index(&st,exdata_name, test_mock_packet_exdata_free, &st);
+ EXPECT_GE(exdata_idx, 0);
+ int overwrite_idx=stellar_packet_exdata_new_index(&st,exdata_name, test_mock_overwrite_packet_exdata_free, plug_mgr);
+ EXPECT_GE(overwrite_idx, 0);
+ EXPECT_EQ(overwrite_idx, exdata_idx);
+
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ struct stellar_exdata_schema *exdata_schema = (struct stellar_exdata_schema *)utarray_eltptr(
+ plug_mgr->packet_exdata_schema_array, (unsigned int)exdata_idx);
+ EXPECT_EQ(exdata_schema->free_func, (void *)test_mock_overwrite_packet_exdata_free);
+ EXPECT_EQ(exdata_schema->free_arg, plug_mgr);
+ EXPECT_EQ(exdata_schema->idx, exdata_idx);
+ EXPECT_STREQ(exdata_schema->name, exdata_name);
+
+ int exdata_num = utarray_len(plug_mgr->packet_exdata_schema_array);
+ EXPECT_EQ(exdata_num, 1);
+ }
+
+ plugin_manager_exit(plug_mgr);
+}
+
+void test_mock_packet_msg_free(struct packet *pkt, void *msg, void *msg_free_arg){}
+void test_mock_overwrite_packet_msg_free(struct packet *pkt, void *msg, void *msg_free_arg){}
+
+TEST(plugin_manager_init, packet_mq_topic_create_and_update) {
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+ const char *topic_name="PACKET_TOPIC";
+
+ EXPECT_EQ(stellar_packet_mq_get_topic_id(&st, topic_name), -1); // illegal topic_name
+
+ int topic_id = stellar_packet_mq_create_topic(&st, topic_name, test_mock_packet_msg_free, &st);
+ EXPECT_GE(topic_id, 0);
+ struct stellar_mq_topic_schema *topic_schema = NULL;
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ topic_schema =
+ (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id);
+ EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_packet_msg_free);
+ EXPECT_EQ(topic_schema->free_cb_arg, &st);
+ EXPECT_EQ(topic_schema->topic_id, topic_id);
+ EXPECT_STREQ(topic_schema->topic_name, topic_name);
+ }
+
+ EXPECT_EQ(stellar_packet_mq_get_topic_id(&st, topic_name), topic_id);
+ EXPECT_EQ(stellar_packet_mq_create_topic(&st, topic_name, test_mock_overwrite_packet_msg_free, plug_mgr),
+ -1); // duplicate create, return error
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ topic_schema =
+ (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id);
+ EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_packet_msg_free);
+ EXPECT_EQ(topic_schema->free_cb_arg, &st);
+ EXPECT_EQ(topic_schema->topic_id, topic_id);
+ EXPECT_STREQ(topic_schema->topic_name, topic_name);
+ }
+
+ EXPECT_EQ(stellar_packet_mq_update_topic(&st, topic_id, test_mock_overwrite_packet_msg_free, plug_mgr), 0);
+
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ topic_schema =
+ (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id);
+ EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_overwrite_packet_msg_free);
+ EXPECT_EQ(topic_schema->free_cb_arg, plug_mgr);
+ EXPECT_EQ(topic_schema->topic_id, topic_id);
+ EXPECT_STREQ(topic_schema->topic_name, topic_name);
+ EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), 1+STELLAR_INTRINSIC_TOPIC_NUM);
+ }
+
+ EXPECT_EQ(stellar_packet_mq_destroy_topic(&st, 10), -1); // illgeal topic_id
+
+ EXPECT_EQ(stellar_packet_mq_destroy_topic(&st, topic_id), 1);
+ EXPECT_EQ(stellar_packet_mq_destroy_topic(&st, topic_id), 0); // duplicate destroy, return 0;
+
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), 1+STELLAR_INTRINSIC_TOPIC_NUM); // destory won't delete the topic schema
+ EXPECT_EQ(plug_mgr->packet_mq_topic_num, 0);
+ }
+ plugin_manager_exit(plug_mgr);
+}
+
+void test_mock_on_packet_msg(struct packet *pkt, int topic_id, const void *msg, void *plugin_env){}
+
+void test_mock_overwrite_on_packet_msg(struct packet *pkt, int topic_id, const void *msg, void *plugin_env){}
+
+TEST(plugin_manager_init, packet_mq_subscribe) {
+
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+
+ const char *topic_name="PACKET_TOPIC";
+
+ int topic_id=stellar_packet_mq_create_topic(&st, topic_name, test_mock_packet_msg_free, &st);
+ EXPECT_GE(topic_id, 0);
+
+ EXPECT_EQ(stellar_packet_mq_subscribe(&st, topic_id, test_mock_on_packet_msg, 10+PACKET_PULGIN_ID_BASE),-1);//illgeal plugin_id
+ EXPECT_EQ(stellar_packet_mq_subscribe(&st, 10, test_mock_on_packet_msg, 10+PACKET_PULGIN_ID_BASE),-1);//illgeal topic_id & plugin_id
+
+ int plugin_id=stellar_packet_plugin_register(&st, 6, NULL, &st);
+ EXPECT_GE(plugin_id, PACKET_PULGIN_ID_BASE);
+
+ EXPECT_EQ(stellar_packet_mq_subscribe(&st, topic_id, test_mock_on_packet_msg, plugin_id),0);
+ EXPECT_EQ(stellar_packet_mq_subscribe(&st, topic_id, test_mock_overwrite_on_packet_msg, plugin_id),0);//duplicate subscribe, return 0, won't overwrite
+ struct stellar_mq_topic_schema *topic_schema;
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ topic_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id);
+ EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_packet_msg_free);
+ EXPECT_EQ(topic_schema->free_cb_arg, &st);
+ EXPECT_EQ(topic_schema->topic_id, topic_id);
+ EXPECT_STREQ(topic_schema->topic_name, topic_name);
+ }
+
+ EXPECT_EQ(topic_schema->subscriber_cnt, 1);
+ EXPECT_EQ(topic_schema->subscribers->pkt_msg_cb, (void *)test_mock_overwrite_on_packet_msg);
+
+ plugin_manager_exit(plug_mgr);
+}
+
+
/*******************************************
* TEST PLUGIN MANAGER PACKET PLUGIN RUNTIME*
*******************************************/
#define PACKET_PROTO_PLUGIN_NUM 128
+#define PACKET_EXDATA_NUM 2
+#define PACKET_TOPIC_NUM 2
+#define PACKET_MQ_SUB_NUM 2
struct packet_plugin_env
{
struct plugin_manager_schema *plug_mgr;
int basic_on_packet_called;
int proto_filter_plugin_id[PACKET_PROTO_PLUGIN_NUM];
int proto_filter_plugin_called[PACKET_PROTO_PLUGIN_NUM];
+ int exdata_set_on_packet_called;
+ int exdata_get_on_packet_called;
+ unsigned int packet_exdata_idx[PACKET_EXDATA_NUM];
+ int exdata_free_called[PACKET_EXDATA_NUM];
+ unsigned int packet_topic_id[PACKET_TOPIC_NUM];
+ unsigned int packet_mq_sub_plugin_id[PACKET_MQ_SUB_NUM];
+ int msg_pub_cnt;
+ int msg_sub_cnt;
+ int msg_free_cnt;
};
+static void test_basic_on_packet(struct packet *pkt, unsigned char ip_protocol, void *plugin_env)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
+ EXPECT_TRUE(env!=NULL);
+ EXPECT_EQ(pkt->ip_proto, ip_protocol);
+ EXPECT_EQ(pkt->st, env->plug_mgr->st);
+ EXPECT_EQ(packet_exdata_set(pkt, 2, pkt), -1);// illegal set
+ EXPECT_EQ(packet_exdata_get(pkt, 2), nullptr);// illegal get
+ env->basic_on_packet_called+=1;
+ return;
+}
+
+TEST(plugin_manager, packet_plugin_illegal_exdata) {
+
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+ unsigned char ip_proto=6;
+ struct packet_plugin_env env;
+ memset(&env, 0, sizeof(struct packet_plugin_env));
+ env.plug_mgr=plug_mgr;
+ int plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_basic_on_packet, &env);
+ EXPECT_GE(plugin_id, PACKET_PULGIN_ID_BASE);
+
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ int packet_plugin_num = utarray_len(plug_mgr->registered_packet_plugin_array);
+ EXPECT_EQ(packet_plugin_num, 1);
+ }
+
+ struct packet pkt={&st, IPv4, ip_proto};
+ plugin_manager_on_packet_ingress(plug_mgr, &pkt);
+ plugin_manager_on_packet_egress(plug_mgr, &pkt);
+
+ plugin_manager_exit(plug_mgr);
+
+ EXPECT_EQ(env.basic_on_packet_called, 1);
+}
+
static void test_proto_filter_on_packet(struct packet *pkt, unsigned char ip_protocol, void *plugin_env)
{
struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
EXPECT_TRUE(env!=NULL);
EXPECT_EQ(pkt->ip_proto, ip_protocol);
EXPECT_EQ(pkt->st, env->plug_mgr->st);
+ EXPECT_EQ(packet_exdata_set(pkt, 2, pkt), -1);// illegal set
+ EXPECT_EQ(packet_exdata_get(pkt, 2), nullptr);// illegal get
env->proto_filter_plugin_called[ip_protocol]+=1;
return;
}
@@ -123,6 +334,7 @@ TEST(plugin_manager, packet_plugins_with_proto_filter) {
{
pkt.ip_proto = i;
plugin_manager_on_packet_ingress(plug_mgr, &pkt);
+ plugin_manager_on_packet_egress(plug_mgr, &pkt);
}
}
plugin_manager_exit(plug_mgr);
@@ -133,6 +345,341 @@ TEST(plugin_manager, packet_plugins_with_proto_filter) {
}
}
+static void test_exdata_set_on_packet(struct packet *pkt, unsigned char ip_protocol, void *plugin_env)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
+ EXPECT_TRUE(env!=NULL);
+ EXPECT_EQ(pkt->ip_proto, ip_protocol);
+ EXPECT_EQ(pkt->st, env->plug_mgr->st);
+ env->exdata_set_on_packet_called+=1;
+
+ int exdata_idx_len=(int)(sizeof(env->packet_exdata_idx) / sizeof(env->packet_exdata_idx[0]));
+
+ for(int i=0; i<exdata_idx_len; i++)
+ {
+ long long *exdata_val=CALLOC(long long , 1);
+ *exdata_val=i;
+ EXPECT_EQ(packet_exdata_set(pkt, env->packet_exdata_idx[i], exdata_val), 0);
+ }
+ return;
+}
+
+static void test_exdata_get_on_packet(struct packet *pkt, unsigned char ip_protocol, void *plugin_env)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
+ EXPECT_TRUE(env!=NULL);
+ EXPECT_EQ(pkt->ip_proto, ip_protocol);
+ EXPECT_EQ(pkt->st, env->plug_mgr->st);
+
+ int exdata_idx_len=(int)(sizeof(env->packet_exdata_idx) / sizeof(env->packet_exdata_idx[0]));
+
+ for(int i=0; i<exdata_idx_len; i++)
+ {
+ long long *exdata_val=(long long *)packet_exdata_get(pkt, env->packet_exdata_idx[i]);
+ EXPECT_EQ(*exdata_val, i);
+ }
+ env->exdata_get_on_packet_called+=1;
+ return;
+}
+
+static void test_packet_exdata_free(struct packet *pkt, int idx, void *ex_ptr, void *arg)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)arg;
+ EXPECT_EQ(env->packet_exdata_idx[idx], idx);
+ EXPECT_EQ(*(long long *)ex_ptr, idx);
+ FREE(ex_ptr);
+ env->exdata_free_called[idx]+=1;
+ return;
+}
+
+
+TEST(plugin_manager, packet_plugins_share_exdata) {
+
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+ unsigned char ip_proto=6;
+ struct packet_plugin_env env;
+ memset(&env, 0, sizeof(struct packet_plugin_env));
+ env.plug_mgr=plug_mgr;
+
+ char exdata_name[PACKET_EXDATA_NUM][TOPIC_NAME_MAX];
+ int exdata_idx_len=(int)(sizeof(env.packet_exdata_idx) / sizeof(env.packet_exdata_idx[0]));
+ for(int i=0; i<exdata_idx_len; i++)
+ {
+ sprintf(exdata_name[i], "PACKET_EXDATA_%d", i);
+ env.packet_exdata_idx[i]=stellar_packet_exdata_new_index(&st, exdata_name[i], test_packet_exdata_free, &env);
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ struct stellar_exdata_schema *exdata_schema = (struct stellar_exdata_schema *)utarray_eltptr(
+ plug_mgr->packet_exdata_schema_array, env.packet_exdata_idx[i]);
+
+ EXPECT_EQ(exdata_schema->free_func, (void *)test_packet_exdata_free);
+ EXPECT_EQ(exdata_schema->free_arg, &env);
+ EXPECT_EQ(exdata_schema->idx, env.packet_exdata_idx[i]);
+ EXPECT_STREQ(exdata_schema->name, exdata_name[i]);
+ }
+ }
+
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ EXPECT_EQ(utarray_len(plug_mgr->packet_exdata_schema_array), exdata_idx_len);
+ }
+
+ int exdata_set_plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_exdata_set_on_packet, &env);
+ EXPECT_GE(exdata_set_plugin_id, PACKET_PULGIN_ID_BASE);
+
+ int exdata_get_plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_exdata_get_on_packet, &env);
+ EXPECT_GE(exdata_get_plugin_id, PACKET_PULGIN_ID_BASE);
+
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), 2); // Fix plugin number
+ }
+
+ struct packet pkt={&st, IPv4, ip_proto};
+
+ int N_packet=10;
+
+ for(int i=0; i < N_packet; i++)
+ {
+ plugin_manager_on_packet_ingress(plug_mgr, &pkt);
+ plugin_manager_on_packet_egress(plug_mgr, &pkt);
+ }
+ plugin_manager_exit(plug_mgr);
+
+ EXPECT_EQ(env.exdata_set_on_packet_called, N_packet);
+ EXPECT_EQ(env.exdata_get_on_packet_called, N_packet);
+
+ for(int i=0; i < exdata_idx_len; i++)
+ {
+ EXPECT_EQ(env.exdata_free_called[i], N_packet);
+ }
+}
+
+static void test_packet_msg_free_cb_func(struct packet *pkt, void *msg, void *msg_free_arg)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)msg_free_arg;
+ EXPECT_EQ(pkt, msg);
+ env->msg_free_cnt+=1;
+ return;
+}
+
+static void test_mq_on_packet_msg(struct packet *pkt, int topic_id, const void *msg, void *plugin_env)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
+ EXPECT_TRUE(env!=NULL);
+ EXPECT_EQ(pkt->st, env->plug_mgr->st);
+ env->msg_sub_cnt+=1;
+ return;
+}
+
+static void test_mq_pub_on_packet(struct packet *pkt, unsigned char ip_protocol, void *plugin_env)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
+ EXPECT_TRUE(env!=NULL);
+ EXPECT_EQ(pkt->ip_proto, ip_protocol);
+ EXPECT_EQ(pkt->st, env->plug_mgr->st);
+ int topic_id_num=(int)(sizeof(env->packet_topic_id) / sizeof(env->packet_topic_id[0]));
+ for(int i=0; i<topic_id_num; i++)
+ {
+ EXPECT_EQ(packet_mq_publish_message(pkt, env->packet_topic_id[i], pkt), 0);
+ env->msg_pub_cnt+=1;
+ }
+ return;
+}
+
+TEST(plugin_manager, packet_plugins_mq_pub_sub) {
+
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+ unsigned char ip_proto=6;
+ struct packet_plugin_env env;
+ memset(&env, 0, sizeof(struct packet_plugin_env));
+ env.plug_mgr=plug_mgr;
+ char topic_name[PACKET_TOPIC_NUM][TOPIC_NAME_MAX];
+
+ int topic_id_num=(int)(sizeof(env.packet_topic_id) / sizeof(env.packet_topic_id[0]));
+
+ for(int i=0; i<topic_id_num; i++)
+ {
+ sprintf(topic_name[i], "PACKET_TOPIC_%d", i);
+ env.packet_topic_id[i]=stellar_packet_mq_create_topic(&st, topic_name[i], test_packet_msg_free_cb_func, &env);
+ EXPECT_GE(env.packet_topic_id[i], 0);
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(
+ plug_mgr->stellar_mq_schema_array, env.packet_topic_id[i]);
+ EXPECT_EQ(topic->free_cb, test_packet_msg_free_cb_func);
+ EXPECT_EQ(topic->free_cb_arg, &env);
+ EXPECT_EQ(topic->topic_id, env.packet_topic_id[i]);
+ EXPECT_STREQ(topic->topic_name, topic_name[i]);
+ }
+ }
+
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), topic_id_num+STELLAR_INTRINSIC_TOPIC_NUM);
+ }
+
+ int pub_plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_mq_pub_on_packet, &env);
+ EXPECT_GE(pub_plugin_id, PACKET_PULGIN_ID_BASE);
+
+ int topic_sub_num=(int)(sizeof(env.packet_mq_sub_plugin_id) / sizeof(env.packet_mq_sub_plugin_id[0]));
+
+ for (int i = 0; i < topic_sub_num; i++)
+ {
+ env.packet_mq_sub_plugin_id[i] = stellar_packet_plugin_register(&st, ip_proto, NULL, &env);// empty on_packet is ok
+ EXPECT_GE(env.packet_mq_sub_plugin_id[i], PACKET_PULGIN_ID_BASE);
+ for(int j = 0; j < topic_id_num; j++)
+ {
+ EXPECT_EQ(stellar_packet_mq_subscribe(&st, env.packet_topic_id[j], test_mq_on_packet_msg, env.packet_mq_sub_plugin_id[i]), 0);
+ }
+ }
+
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), topic_sub_num+1);
+ }
+
+ struct packet pkt={&st, IPv4, ip_proto};
+
+ int N_packet=10;
+ for (int i = 0; i < N_packet; i++)
+ {
+ plugin_manager_on_packet_ingress(plug_mgr, &pkt);
+ plugin_manager_on_packet_egress(plug_mgr, &pkt);
+ }
+
+ plugin_manager_exit(plug_mgr);
+ EXPECT_EQ(N_packet*topic_id_num, env.msg_pub_cnt);
+ EXPECT_EQ(env.msg_free_cnt, env.msg_pub_cnt);
+ EXPECT_EQ(env.msg_sub_cnt, env.msg_pub_cnt*topic_sub_num);
+}
+
+static void overlimit_packet_msg_free_cb_func(struct packet *pkt, void *msg, void *msg_free_arg)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)msg_free_arg;
+ env->msg_free_cnt+=1;
+ FREE(msg);
+ return;
+}
+
+static void overlimit_sub_on_packet_msg(struct packet *pkt, int topic_id, const void *msg, void *plugin_env)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
+ EXPECT_TRUE(env!=NULL);
+ EXPECT_EQ(pkt->st, env->plug_mgr->st);
+ env->msg_sub_cnt+=1;
+ return;
+}
+
+static void overlimit_pub_on_packet(struct packet *pkt, unsigned char ip_protocol, void *plugin_env)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
+ EXPECT_TRUE(env!=NULL);
+ EXPECT_EQ(pkt->ip_proto, ip_protocol);
+ EXPECT_EQ(pkt->st, env->plug_mgr->st);
+ int topic_id_num=(int)(sizeof(env->packet_topic_id) / sizeof(env->packet_topic_id[0]));
+ int cnt=0;
+ int *msg;
+ for(int i=0; i<topic_id_num; i++)
+ {
+ for(int j=0; j < MAX_MSG_PER_DISPATCH; j++)
+ {
+ msg=CALLOC(int, 1);
+ *msg=cnt;
+ int pub_ret=packet_mq_publish_message(pkt, env->packet_topic_id[i], msg);
+ if(cnt < MAX_MSG_PER_DISPATCH)
+ {
+ ASSERT_EQ(pub_ret, 0);
+ env->msg_pub_cnt+=1;
+ }
+ else
+ {
+ ASSERT_EQ(pub_ret, -1);
+ }
+ if(pub_ret!=0)FREE(msg);
+ cnt+=1;
+ }
+ }
+ return;
+}
+
+TEST(plugin_manager, packet_plugins_pub_overlimit) {
+
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+ unsigned char ip_proto=6;
+ struct packet_plugin_env env;
+ memset(&env, 0, sizeof(struct packet_plugin_env));
+ env.plug_mgr=plug_mgr;
+ char topic_name[PACKET_TOPIC_NUM][TOPIC_NAME_MAX];
+
+ int topic_id_num=(int)(sizeof(env.packet_topic_id) / sizeof(env.packet_topic_id[0]));
+
+ for(int i=0; i<topic_id_num; i++)
+ {
+ sprintf(topic_name[i], "PACKET_TOPIC_%d", i);
+ env.packet_topic_id[i]=stellar_packet_mq_create_topic(&st, topic_name[i], overlimit_packet_msg_free_cb_func, &env);
+ EXPECT_GE(env.packet_topic_id[i], 0);
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(
+ plug_mgr->stellar_mq_schema_array, env.packet_topic_id[i]);
+ EXPECT_EQ(topic->free_cb, overlimit_packet_msg_free_cb_func);
+ EXPECT_EQ(topic->free_cb_arg, &env);
+ EXPECT_EQ(topic->topic_id, env.packet_topic_id[i]);
+ EXPECT_STREQ(topic->topic_name, topic_name[i]);
+ }
+ }
+
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), topic_id_num+STELLAR_INTRINSIC_TOPIC_NUM);
+ }
+
+ int pub_plugin_id=stellar_packet_plugin_register(&st, ip_proto, overlimit_pub_on_packet, &env);
+ EXPECT_GE(pub_plugin_id, PACKET_PULGIN_ID_BASE);
+
+ int topic_sub_num=(int)(sizeof(env.packet_mq_sub_plugin_id) / sizeof(env.packet_mq_sub_plugin_id[0]));
+
+ for (int i = 0; i < topic_sub_num; i++)
+ {
+ env.packet_mq_sub_plugin_id[i] = stellar_packet_plugin_register(&st, ip_proto, NULL, &env);// empty on_packet is ok
+ EXPECT_GE(env.packet_mq_sub_plugin_id[i], PACKET_PULGIN_ID_BASE);
+ for(int j = 0; j < topic_id_num; j++)
+ {
+ EXPECT_EQ(stellar_packet_mq_subscribe(&st, env.packet_topic_id[j], overlimit_sub_on_packet_msg, env.packet_mq_sub_plugin_id[i]), 0);
+ }
+ }
+
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), topic_sub_num+1);
+ }
+
+ struct packet pkt={&st, IPv4, ip_proto};
+
+ int N_packet=10;
+ for (int i = 0; i < N_packet; i++)
+ {
+ plugin_manager_on_packet_ingress(plug_mgr, &pkt);
+ plugin_manager_on_packet_egress(plug_mgr, &pkt);
+ }
+
+ plugin_manager_exit(plug_mgr);
+ EXPECT_EQ(N_packet*MAX_MSG_PER_DISPATCH, env.msg_pub_cnt);
+ EXPECT_EQ(env.msg_free_cnt, env.msg_pub_cnt);
+ EXPECT_EQ(env.msg_sub_cnt, env.msg_pub_cnt*topic_sub_num);
+}
+
+
/**********************************************
* TEST PLUGIN MANAGER ON SESSION PLUGIN INIT *
**********************************************/
@@ -185,7 +732,7 @@ TEST(plugin_manager_init, session_mq_topic_create_and_update) {
{
SCOPED_TRACE("White-box test, check stellar internal schema");
topic_schema =
- (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, (unsigned int)topic_id);
+ (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id);
EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_session_msg_free);
EXPECT_EQ(topic_schema->free_cb_arg, &st);
EXPECT_EQ(topic_schema->topic_id, topic_id);
@@ -198,7 +745,7 @@ TEST(plugin_manager_init, session_mq_topic_create_and_update) {
{
SCOPED_TRACE("White-box test, check stellar internal schema");
topic_schema =
- (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, (unsigned int)topic_id);
+ (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id);
EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_session_msg_free);
EXPECT_EQ(topic_schema->free_cb_arg, &st);
EXPECT_EQ(topic_schema->topic_id, topic_id);
@@ -210,13 +757,13 @@ TEST(plugin_manager_init, session_mq_topic_create_and_update) {
{
SCOPED_TRACE("White-box test, check stellar internal schema");
topic_schema =
- (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, (unsigned int)topic_id);
+ (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id);
EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_overwrite_session_msg_free);
EXPECT_EQ(topic_schema->free_cb_arg, plug_mgr);
EXPECT_EQ(topic_schema->topic_id, topic_id);
EXPECT_STREQ(topic_schema->topic_name, topic_name);
- EXPECT_EQ(utarray_len(plug_mgr->session_mq_schema_array), 6); // 5 intrinsic topic + 1 created topic
+ EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), STELLAR_INTRINSIC_TOPIC_NUM+1); // 5 intrinsic topic + 1 created topic
}
EXPECT_EQ(stellar_session_mq_destroy_topic(&st, 10), -1);// illgeal session topic_id
@@ -226,9 +773,9 @@ TEST(plugin_manager_init, session_mq_topic_create_and_update) {
{
SCOPED_TRACE("White-box test, check stellar internal schema");
- EXPECT_EQ(utarray_len(plug_mgr->session_mq_schema_array), 6);//destory won't delete the topic schema
+ EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), STELLAR_INTRINSIC_TOPIC_NUM+1);//destory won't delete the topic schema
}
- EXPECT_EQ(plug_mgr->session_mq_topic_num, 5);//intrinsic topic number
+ EXPECT_EQ(plug_mgr->session_mq_topic_num, STELLAR_INTRINSIC_TOPIC_NUM);//intrinsic topic number
plugin_manager_exit(plug_mgr);
}
@@ -260,7 +807,7 @@ TEST(plugin_manager_init, session_mq_subscribe_overwrite) {
struct stellar_mq_topic_schema *topic_schema;
{
SCOPED_TRACE("White-box test, check stellar internal schema");
- topic_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array,(unsigned int)topic_id);
+ topic_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array,(unsigned int)topic_id);
EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_session_msg_free);
EXPECT_EQ(topic_schema->free_cb_arg, &st);
EXPECT_EQ(topic_schema->topic_id, topic_id);
@@ -615,8 +1162,8 @@ TEST(plugin_manager, session_plugin_ignore_on_ctx_new_sub_other_msg) {
plugin_manager_exit(plug_mgr);
- EXPECT_TRUE(env.test_mq_free_called == env.test_mq_pub_called && env.test_mq_sub_called == env.N_session*env.N_per_session_pkt_cnt);
-
+ EXPECT_EQ(env.test_mq_free_called, env.test_mq_pub_called);
+ EXPECT_EQ(env.test_mq_sub_called, env.N_session*env.N_per_session_pkt_cnt);
}
struct test_overlimit_session_mq_ctx
diff --git a/test/plugin_manager/plugin_manager_gtest_mock.h b/test/plugin_manager/plugin_manager_gtest_mock.h
index 9962872..7d6ffc3 100644
--- a/test/plugin_manager/plugin_manager_gtest_mock.h
+++ b/test/plugin_manager/plugin_manager_gtest_mock.h
@@ -61,6 +61,10 @@ int stellar_get_current_thread_id(struct stellar *st)
return 0;
}
+struct stellar * packet_stellar_get(struct packet *pkt)
+{
+ return pkt->st;
+}
struct plugin_manager_runtime * session_plugin_manager_runtime_get(struct session *sess)
{