summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryangwei <[email protected]>2024-07-24 10:38:06 +0800
committeryangwei <[email protected]>2024-07-24 11:46:42 +0800
commitf1b6662e4d561bb924efe4dc4256d4f34a5173a5 (patch)
treea4092ccd6c54dcb48d69309a9bb388894537e4c3
parentbba15d7ffdd3e9add76a2d1e287a0899369ac96b (diff)
🦄 refactor(stellar mq api): integration mq in one .h
-rw-r--r--examples/stellar_plugin/simple_stellar_plugin.c8
-rw-r--r--include/stellar/packet_mq.h22
-rw-r--r--include/stellar/session_mq.h36
-rw-r--r--include/stellar/stellar_mq.h42
-rw-r--r--src/plugin_manager/plugin_manager.c144
-rw-r--r--src/plugin_manager/plugin_manager_interna.h20
-rw-r--r--src/stellar_on_sapp/stellar_on_sapp_api.c4
-rw-r--r--test/plugin_manager/plugin_manager_gtest_main.cpp148
8 files changed, 173 insertions, 251 deletions
diff --git a/examples/stellar_plugin/simple_stellar_plugin.c b/examples/stellar_plugin/simple_stellar_plugin.c
index 5625cc9..f612b9a 100644
--- a/examples/stellar_plugin/simple_stellar_plugin.c
+++ b/examples/stellar_plugin/simple_stellar_plugin.c
@@ -4,7 +4,7 @@
#include "stellar/session_exdata.h"
#include "stellar/session_mq.h"
#include "stellar/packet_exdata.h"
-#include "stellar/packet_mq.h"
+#include "stellar/stellar_mq.h"
#include <stdio.h>
#include <string.h>
@@ -182,7 +182,7 @@ static void simple_plugin_packet_exdata_free(struct packet *pkt, int idx, void *
assert(memcmp(env, exdata, sizeof(struct simple_stellar_plugin_env)) == 0);
}
-static void simple_plugin_packet_msg_free(struct packet *pkt, void *msg, void *msg_free_arg)
+static void simple_plugin_packet_msg_free(void *msg, void *msg_free_arg)
{
struct simple_stellar_plugin_env *env = (struct simple_stellar_plugin_env *)msg_free_arg;
assert(env);
@@ -218,10 +218,10 @@ void *simple_session_packet_plugin_init(struct stellar *st)
exit(-1);
}
- env->packet_topic_id=stellar_packet_mq_get_topic_id(st, "TOPIC_PACKET_ENV");
+ env->packet_topic_id=stellar_mq_get_topic_id(st, "TOPIC_PACKET_ENV");
if(env->packet_topic_id < 0)
{
- env->packet_topic_id=stellar_packet_mq_create_topic(st, "TOPIC_PACKET_ENV", simple_plugin_packet_msg_free, env);
+ env->packet_topic_id=stellar_mq_create_topic(st, "TOPIC_PACKET_ENV", simple_plugin_packet_msg_free, env);
}
tcp_plugin_id=stellar_packet_plugin_register(st, IPPROTO_TCP, simple_plugin_packet_get_exdata, env);
diff --git a/include/stellar/packet_mq.h b/include/stellar/packet_mq.h
deleted file mode 100644
index 94bb39b..0000000
--- a/include/stellar/packet_mq.h
+++ /dev/null
@@ -1,22 +0,0 @@
-#pragma once
-
-#include "stellar.h"
-
-//session mq
-typedef void packet_msg_free_cb_func(struct packet *pkt, void *msg, void *msg_free_arg);
-typedef void on_packet_msg_cb_func(struct packet *pkt, int topic_id, const void *msg, void *plugin_env);
-
-//return topic_id
-int stellar_packet_mq_create_topic(struct stellar *st, const char *topic_name, packet_msg_free_cb_func *msg_free_cb, void *msg_free_arg);
-
-int stellar_packet_mq_get_topic_id(struct stellar *st, const char *topic_name);
-
-int stellar_packet_mq_update_topic(struct stellar *st, int topic_id, packet_msg_free_cb_func *msg_free_cb, void *msg_free_arg);
-
-int stellar_packet_mq_destroy_topic(struct stellar *st, int topic_id);
-
-//return 0 if success, otherwise return -1.
-int stellar_packet_mq_subscribe(struct stellar *st, int topic_id, on_packet_msg_cb_func *plugin_on_msg_cb, int plugin_id); //packet plugin only
-
-int packet_mq_publish_message(struct packet *pkt, int topic_id, void *msg);
-
diff --git a/include/stellar/session_mq.h b/include/stellar/session_mq.h
deleted file mode 100644
index 3c62f69..0000000
--- a/include/stellar/session_mq.h
+++ /dev/null
@@ -1,36 +0,0 @@
-#pragma once
-
-#include "stellar.h"
-
-//session mq
-typedef void session_msg_free_cb_func(struct session *sess, void *msg, void *msg_free_arg);
-typedef void on_session_msg_cb_func(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env);
-
-//return topic_id
-int stellar_session_mq_create_topic(struct stellar *st, const char *topic_name, session_msg_free_cb_func *msg_free_cb, void *msg_free_arg);
-
-int stellar_session_mq_get_topic_id(struct stellar *st, const char *topic_name);
-
-int stellar_session_mq_update_topic(struct stellar *st, int topic_id, session_msg_free_cb_func *msg_free_cb, void *msg_free_arg);
-
-int stellar_session_mq_destroy_topic(struct stellar *st, int topic_id);
-
-//return 0 if success, otherwise return -1.
-int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_session_msg_cb_func *plugin_on_msg_cb, int plugin_id);
-
-int session_mq_publish_message(struct session *sess, int topic_id, void *msg);
-
-int session_mq_ignore_message(struct session *sess, int topic_id, int plugin_id);
-int session_mq_unignore_message(struct session *sess, int topic_id, int plugin_id);
-
-int session_mq_topic_is_active(struct session *sess, int topic_id);
-
-enum session_mq_priority
-{
- SESSION_MQ_PRIORITY_LOW,
- SESSION_MQ_PRIORITY_NORMAL,
- SESSION_MQ_PRIORITY_HIGH,
- SESSION_MQ_PRIORITY_MAX,
-};
-
-int session_mq_publish_message_with_priority(struct session *sess, int topic_id, void *msg, enum session_mq_priority priority); \ No newline at end of file
diff --git a/include/stellar/stellar_mq.h b/include/stellar/stellar_mq.h
new file mode 100644
index 0000000..4b0fb00
--- /dev/null
+++ b/include/stellar/stellar_mq.h
@@ -0,0 +1,42 @@
+#pragma once
+#include "stellar.h"
+
+//topic api
+typedef void stellar_msg_free_cb_func(void *msg, void *msg_free_arg);
+
+//return topic_id
+int stellar_mq_create_topic(struct stellar *st, const char *topic_name, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg);
+int stellar_mq_get_topic_id(struct stellar *st, const char *topic_name);
+int stellar_mq_update_topic(struct stellar *st, int topic_id, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg);
+int stellar_mq_destroy_topic(struct stellar *st, int topic_id);
+
+
+enum stellar_mq_priority
+{
+ STELLAR_MQ_PRIORITY_LOW,
+ STELLAR_MQ_PRIORITY_NORMAL,
+ STELLAR_MQ_PRIORITY_HIGH,
+ STELLAR_MQ_PRIORITY_MAX,
+};
+
+//session mq api
+typedef void on_session_msg_cb_func(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env);
+
+//return 0 if success, otherwise return -1.
+int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_session_msg_cb_func *plugin_on_msg_cb, int plugin_id);
+int session_mq_publish_message(struct session *sess, int topic_id, void *msg);
+int session_mq_publish_message_with_priority(struct session *sess, int topic_id, void *msg, enum stellar_mq_priority priority);
+
+int session_mq_ignore_message(struct session *sess, int topic_id, int plugin_id);
+int session_mq_unignore_message(struct session *sess, int topic_id, int plugin_id);
+
+int session_mq_topic_is_active(struct session *sess, int topic_id);
+
+
+//packet mq api
+
+typedef void on_packet_msg_cb_func(struct packet *pkt, int topic_id, const void *msg, void *plugin_env);
+//return 0 if success, otherwise return -1.
+int stellar_packet_mq_subscribe(struct stellar *st, int topic_id, on_packet_msg_cb_func *plugin_on_msg_cb, int plugin_id); //packet plugin only
+int packet_mq_publish_message(struct packet *pkt, int topic_id, void *msg);
+int packet_mq_publish_message_with_priority(struct packet *pkt, int topic_id, void *msg, enum stellar_mq_priority priority); \ No newline at end of file
diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c
index a1c1d84..18b849d 100644
--- a/src/plugin_manager/plugin_manager.c
+++ b/src/plugin_manager/plugin_manager.c
@@ -107,11 +107,11 @@ struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char
stellar_plugin_manager_schema_set(st, plug_mgr);
- plug_mgr->tcp_topic_id=stellar_session_mq_create_topic(st, TOPIC_TCP, NULL, NULL);
- plug_mgr->tcp_stream_topic_id=stellar_session_mq_create_topic(st, TOPIC_TCP_STREAM, NULL, NULL);
- plug_mgr->udp_topic_id=stellar_session_mq_create_topic(st, TOPIC_UDP, NULL, NULL);
- plug_mgr->egress_topic_id=stellar_session_mq_create_topic(st, TOPIC_EGRESS, NULL, NULL);
- plug_mgr->control_packet_topic_id=stellar_session_mq_create_topic(st, TOPIC_CONTROL_PACKET, NULL, NULL);
+ plug_mgr->tcp_topic_id=stellar_mq_create_topic(st, TOPIC_TCP, NULL, NULL);
+ plug_mgr->tcp_stream_topic_id=stellar_mq_create_topic(st, TOPIC_TCP_STREAM, NULL, NULL);
+ plug_mgr->udp_topic_id=stellar_mq_create_topic(st, TOPIC_UDP, NULL, NULL);
+ plug_mgr->egress_topic_id=stellar_mq_create_topic(st, TOPIC_EGRESS, NULL, NULL);
+ plug_mgr->control_packet_topic_id=stellar_mq_create_topic(st, TOPIC_CONTROL_PACKET, NULL, NULL);
for(int i = 0; i < spec_num; i++)
{
@@ -126,8 +126,6 @@ struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char
return plug_mgr;
}
-static int stellar_mq_destroy_topic(int topic_id, UT_array *mq_schema_array);
-
void plugin_manager_exit(struct plugin_manager_schema *plug_mgr)
{
struct plugin_specific *p=NULL;
@@ -144,7 +142,7 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr)
{
for(unsigned int i = 0; i < utarray_len(plug_mgr->stellar_mq_schema_array); i++)
{
- stellar_mq_destroy_topic( i, plug_mgr->stellar_mq_schema_array);
+ stellar_mq_destroy_topic( plug_mgr->st, i);
}
utarray_free(plug_mgr->stellar_mq_schema_array);
}
@@ -355,8 +353,11 @@ static void stellar_mq_topic_schema_dtor(void *_elt)
UT_icd stellar_mq_topic_schema_icd = {sizeof(struct stellar_mq_topic_schema), NULL, stellar_mq_topic_schema_copy, stellar_mq_topic_schema_dtor};
-int stellar_mq_get_topic_id(const char *topic_name, UT_array *mq_schema_array)
+int stellar_mq_get_topic_id(struct stellar *st, const char *topic_name)
{
+ struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
+ UT_array *mq_schema_array=plug_mgr->stellar_mq_schema_array;
+
if(topic_name == NULL || mq_schema_array == NULL )return -1;
unsigned int len = utarray_len(mq_schema_array);
struct stellar_mq_topic_schema *t_schema;
@@ -371,8 +372,10 @@ int stellar_mq_get_topic_id(const char *topic_name, UT_array *mq_schema_array)
return -1;
}
-int stellar_mq_update_topic(int topic_id, void *msg_free_cb, void *msg_free_arg, UT_array *mq_schema_array)
+int stellar_mq_update_topic(struct stellar *st, int topic_id, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
{
+ struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
+ UT_array *mq_schema_array=plug_mgr->stellar_mq_schema_array;
if(mq_schema_array == NULL)return -1;
unsigned int len = utarray_len(mq_schema_array);
if(len < (unsigned int)topic_id)return -1;
@@ -383,14 +386,15 @@ int stellar_mq_update_topic(int topic_id, void *msg_free_cb, void *msg_free_arg,
return 0;
}
-int stellar_mq_create_topic(struct stellar *st, const char *topic_name, void *msg_free_cb, void *msg_free_arg, UT_array **mq_schema_array)
+int stellar_mq_create_topic(struct stellar *st, const char *topic_name, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
{
- if(*mq_schema_array == NULL)
+ struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
+ if(plug_mgr->stellar_mq_schema_array == NULL)
{
- utarray_new(*mq_schema_array, &stellar_mq_topic_schema_icd);
+ utarray_new(plug_mgr->stellar_mq_schema_array, &stellar_mq_topic_schema_icd);
}
- unsigned int len = utarray_len(*mq_schema_array);
- if(stellar_mq_get_topic_id(topic_name, *mq_schema_array) >= 0)
+ unsigned int len = utarray_len(plug_mgr->stellar_mq_schema_array);
+ if(stellar_mq_get_topic_id(st, topic_name) >= 0)
{
return -1;
}
@@ -402,18 +406,20 @@ int stellar_mq_create_topic(struct stellar *st, const char *topic_name, void *ms
t_schema.free_cb_arg=msg_free_arg;
t_schema.subscribers=NULL;
t_schema.subscriber_cnt=0;
- utarray_push_back(*mq_schema_array, &t_schema);
+ utarray_push_back(plug_mgr->stellar_mq_schema_array, &t_schema);
+ plug_mgr->stellar_mq_topic_num+=1;
return t_schema.topic_id;
}
-static int stellar_mq_destroy_topic(int topic_id, UT_array *mq_schema_array)
+int stellar_mq_destroy_topic(struct stellar *st, int topic_id)
{
- if(mq_schema_array==NULL)return -1;
- unsigned int len = utarray_len(mq_schema_array);
+ struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
+ if(plug_mgr->stellar_mq_schema_array==NULL)return -1;
+ unsigned int len = utarray_len(plug_mgr->stellar_mq_schema_array);
if (len <= (unsigned int)topic_id)
return -1;
struct stellar_mq_topic_schema *topic =
- (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array, (unsigned int)topic_id);
+ (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id);
struct stellar_mq_subscriber *sub_elt, *sub_tmp;
if(topic == NULL)return -1;
@@ -426,10 +432,11 @@ static int stellar_mq_destroy_topic(int topic_id, UT_array *mq_schema_array)
FREE(sub_elt);
}
topic->is_destroyed = 1;
+ plug_mgr->stellar_mq_topic_num-=1;
return 1; // success
}
-static int stellar_mq_publish_message(enum stellar_topic_type type, int topic_id, void *data, UT_array *stellar_mq_schema, struct stellar_message *priority_mq[], enum session_mq_priority priority)
+static int stellar_mq_publish_message(enum stellar_topic_type type, int topic_id, void *data, UT_array *stellar_mq_schema, struct stellar_message *priority_mq[], enum stellar_mq_priority priority)
{
if(stellar_mq_schema==NULL || topic_id < 0)return -1;
unsigned int len = utarray_len(stellar_mq_schema);
@@ -576,8 +583,8 @@ static void stellar_mq_dispatch_one_packet_message(struct packet *pkt, struct st
static void stellar_mq_dispatch(struct stellar_message *priority_mq[], struct stellar_message ** dealth_letter_queue, struct session *sess, struct packet *pkt)
{
struct stellar_message *mq_elt=NULL, *mq_tmp=NULL;
- int cur_priority = SESSION_MQ_PRIORITY_HIGH;
- while(cur_priority >= SESSION_MQ_PRIORITY_LOW)
+ int cur_priority = STELLAR_MQ_PRIORITY_HIGH;
+ while(cur_priority >= STELLAR_MQ_PRIORITY_LOW)
{
if(priority_mq[cur_priority]==NULL)
{
@@ -591,7 +598,7 @@ static void stellar_mq_dispatch(struct stellar_message *priority_mq[], struct st
DL_DELETE(priority_mq[mq_elt->header.priority], mq_elt);
DL_APPEND(*dealth_letter_queue, mq_elt); // move to dlq list
- cur_priority=SESSION_MQ_PRIORITY_HIGH;
+ cur_priority=STELLAR_MQ_PRIORITY_HIGH;
break;
}
}
@@ -608,8 +615,7 @@ static void stellar_mq_free(struct session *sess, struct packet *pkt, struct ste
(unsigned int)(mq_elt->header.topic_id));
if (topic && topic->free_cb)
{
- if(mq_elt->header.type==ON_SESSION_TOPIC)topic->sess_msg_free_cb(sess, mq_elt->body, topic->free_cb_arg);
- if(mq_elt->header.type==ON_PACKET_TOPIC)topic->pkt_msg_free_cb(pkt, mq_elt->body, topic->free_cb_arg);
+ topic->free_cb(mq_elt->body, topic->free_cb_arg);
}
DL_DELETE(*head, mq_elt);
FREE(mq_elt);
@@ -619,37 +625,6 @@ static void stellar_mq_free(struct session *sess, struct packet *pkt, struct ste
/*******************************
* PACKET MQ *
*******************************/
-int stellar_packet_mq_create_topic(struct stellar *st, const char *topic_name, packet_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
-{
- struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
- assert(plug_mgr);
- int topic_id=stellar_mq_create_topic(st, topic_name, (void *)msg_free_cb, msg_free_arg, &plug_mgr->stellar_mq_schema_array);
- if(topic_id>=0)plug_mgr->packet_mq_topic_num+=1;
- return topic_id;
-}
-
-int stellar_packet_mq_get_topic_id(struct stellar *st, const char *topic_name)
-{
- struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
- assert(plug_mgr);
- return stellar_mq_get_topic_id(topic_name, plug_mgr->stellar_mq_schema_array);
-}
-
-int stellar_packet_mq_update_topic(struct stellar *st, int topic_id, packet_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
-{
- struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
- assert(plug_mgr);
- return stellar_mq_update_topic(topic_id, (void *)msg_free_cb, msg_free_arg, plug_mgr->stellar_mq_schema_array);
-}
-
-int stellar_packet_mq_destroy_topic(struct stellar *st, int topic_id)
-{
- struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
- assert(plug_mgr);
- int ret = stellar_mq_destroy_topic(topic_id, plug_mgr->stellar_mq_schema_array);
- if(ret==1)plug_mgr->packet_mq_topic_num-=1;
- return ret;
-}
//return 0 if success, otherwise return -1.
int stellar_packet_mq_subscribe(struct stellar *st, int topic_id, on_packet_msg_cb_func *plugin_on_msg_cb, int plugin_id)
@@ -678,7 +653,7 @@ int packet_mq_publish_message(struct packet *pkt, int topic_id, void *data)
int tid = stellar_get_current_thread_id(plug_mgr->st);
if(plug_mgr->per_thread_data[tid].pub_packet_msg_cnt == -1)return -1;
if(plug_mgr->per_thread_data[tid].pub_packet_msg_cnt >= plug_mgr->max_message_dispatch)return -1;
- if(stellar_mq_publish_message(ON_PACKET_TOPIC ,topic_id, data, plug_mgr->stellar_mq_schema_array, plug_mgr->per_thread_data[tid].priority_mq,SESSION_MQ_PRIORITY_HIGH)==0)
+ if(stellar_mq_publish_message(ON_PACKET_TOPIC ,topic_id, data, plug_mgr->stellar_mq_schema_array, plug_mgr->per_thread_data[tid].priority_mq,STELLAR_MQ_PRIORITY_HIGH)==0)
{
plug_mgr->per_thread_data[tid].pub_packet_msg_cnt+=1;
return 0;
@@ -689,39 +664,8 @@ int packet_mq_publish_message(struct packet *pkt, int topic_id, void *data)
/*******************************
* SESSION MQ *
*******************************/
-inline int stellar_session_mq_get_topic_id(struct stellar *st, const char *topic_name)
-{
- struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
- assert(plug_mgr);
- return stellar_mq_get_topic_id(topic_name, plug_mgr->stellar_mq_schema_array);
-}
-
-int stellar_session_mq_update_topic(struct stellar *st, int topic_id, session_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
-{
- struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
- assert(plug_mgr);
- return stellar_mq_update_topic(topic_id, (void *)msg_free_cb, msg_free_arg, plug_mgr->stellar_mq_schema_array);
-}
-
-int stellar_session_mq_create_topic(struct stellar *st, const char *topic_name, session_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
-{
- struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
- assert(plug_mgr);
- int topic_id=stellar_mq_create_topic(st, topic_name, (void *)msg_free_cb, msg_free_arg, &plug_mgr->stellar_mq_schema_array);
- if(topic_id>=0)plug_mgr->session_mq_topic_num+=1;
- return topic_id;
-}
-
-int stellar_session_mq_destroy_topic(struct stellar *st, int topic_id)
-{
- struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
- assert(plug_mgr);
- int ret = stellar_mq_destroy_topic(topic_id, plug_mgr->stellar_mq_schema_array);
- if(ret==1)plug_mgr->session_mq_topic_num-=1;
- return ret;
-}
-int session_mq_publish_message_with_priority(struct session *sess, int topic_id, void *data, enum session_mq_priority priority)
+int session_mq_publish_message_with_priority(struct session *sess, int topic_id, void *data, enum stellar_mq_priority priority)
{
struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess);
assert(plug_mgr_rt);
@@ -739,7 +683,7 @@ int session_mq_publish_message_with_priority(struct session *sess, int topic_id,
inline int session_mq_publish_message(struct session *sess, int topic_id, void *data)
{
- return session_mq_publish_message_with_priority(sess, topic_id, data, SESSION_MQ_PRIORITY_NORMAL);
+ return session_mq_publish_message_with_priority(sess, topic_id, data, STELLAR_MQ_PRIORITY_NORMAL);
}
static void session_mq_update_topic_status(struct plugin_manager_runtime *plug_mgr_rt, struct stellar_mq_topic_schema *topic)
@@ -766,7 +710,7 @@ static int session_mq_set_message_status(struct session *sess, int topic_id, int
if(topic_id < 0 || plugin_id < 0)return -1;
struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess);
if(plug_mgr_rt==NULL)return -1;
- if(topic_id >= plug_mgr_rt->plug_mgr->session_mq_topic_num)return -1;// topic_id out of range
+ if(topic_id >= plug_mgr_rt->plug_mgr->stellar_mq_topic_num)return -1;// topic_id out of range
struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id);
if(topic==NULL)return -1;
@@ -823,7 +767,7 @@ int session_mq_topic_is_active(struct session *sess, int topic_id)
struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess);
assert(plug_mgr_rt);
if(plug_mgr_rt->session_topic_status==NULL)return -1;//runtime free stage , mq_status alaway null, ignore publish message
- if(topic_id >= plug_mgr_rt->plug_mgr->session_mq_topic_num)return -1;// topic_id out of range
+ if(topic_id >= plug_mgr_rt->plug_mgr->stellar_mq_topic_num)return -1;// topic_id out of range
if(bitmap_get(plug_mgr_rt->session_topic_status, 0, topic_id) == 0)return 0;
return 1;
}
@@ -868,8 +812,8 @@ struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_
struct plugin_manager_runtime *rt = CALLOC(struct plugin_manager_runtime, 1);
rt->plug_mgr = plug_mgr;
rt->sess = sess;
- rt->session_mq_status=bitmap_new(plug_mgr->session_topic_subscriber_num, plug_mgr->session_mq_topic_num, 1);
- rt->session_topic_status=bitmap_new(1, plug_mgr->session_mq_topic_num, 1);
+ rt->session_mq_status=bitmap_new(plug_mgr->session_topic_subscriber_num, plug_mgr->stellar_mq_topic_num, 1);
+ rt->session_topic_status=bitmap_new(1, plug_mgr->stellar_mq_topic_num, 1);
rt->sess_exdata_array = (struct stellar_exdata *)session_exdata_runtime_new(plug_mgr);
if(plug_mgr->registered_session_plugin_array)
rt->plugin_ctx_array = CALLOC(struct session_plugin_ctx_runtime, utarray_len(plug_mgr->registered_session_plugin_array));
@@ -1050,7 +994,7 @@ void plugin_manager_on_session_ingress(struct session *sess, struct packet *pkt)
break;
}
plug_mgr_rt->pub_session_msg_cnt=0;
- session_mq_publish_message_with_priority(sess, topic_id ,(void *)pkt, SESSION_MQ_PRIORITY_HIGH);
+ session_mq_publish_message_with_priority(sess, topic_id ,(void *)pkt, STELLAR_MQ_PRIORITY_HIGH);
int tid=stellar_get_current_thread_id(plug_mgr_rt->plug_mgr->st);
stellar_mq_dispatch(plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, sess, pkt);
return;
@@ -1060,7 +1004,7 @@ void plugin_manager_on_session_egress(struct session *sess, struct packet *pkt)
{
struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess);
if(plug_mgr_rt==NULL)return;
- session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->egress_topic_id ,pkt, SESSION_MQ_PRIORITY_HIGH);
+ session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->egress_topic_id ,pkt, STELLAR_MQ_PRIORITY_HIGH);
int tid=stellar_get_current_thread_id(plug_mgr_rt->plug_mgr->st);
stellar_mq_dispatch(plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, sess, pkt);
plug_mgr_rt->pub_session_msg_cnt=-1;//disable session message publish
@@ -1076,11 +1020,11 @@ void plugin_manager_on_session_closing(struct session *sess)
switch (session_get_type(sess))
{
case SESSION_TYPE_TCP:
- session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_topic_id ,NULL, SESSION_MQ_PRIORITY_HIGH);
- session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_stream_topic_id , NULL, SESSION_MQ_PRIORITY_HIGH);
+ session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_topic_id ,NULL, STELLAR_MQ_PRIORITY_HIGH);
+ session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_stream_topic_id , NULL, STELLAR_MQ_PRIORITY_HIGH);
break;
case SESSION_TYPE_UDP:
- session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->udp_topic_id ,NULL, SESSION_MQ_PRIORITY_HIGH);
+ session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->udp_topic_id ,NULL, STELLAR_MQ_PRIORITY_HIGH);
break;
default:
break;
diff --git a/src/plugin_manager/plugin_manager_interna.h b/src/plugin_manager/plugin_manager_interna.h
index 6ea3804..c85415d 100644
--- a/src/plugin_manager/plugin_manager_interna.h
+++ b/src/plugin_manager/plugin_manager_interna.h
@@ -2,12 +2,10 @@
#include "stellar/stellar.h"
-#include "stellar/session_exdata.h"
-#include "stellar/session_mq.h"
-
+#include "stellar/stellar_mq.h"
+#include "stellar/session_exdata.h"
#include "stellar/packet_exdata.h"
-#include "stellar/packet_mq.h"
#include "bitmap/bitmap.h"
@@ -23,7 +21,7 @@ struct stellar_message;
struct plugin_manger_per_thread_data
{
struct per_thread_exdata_array per_thread_pkt_exdata_array;
- struct stellar_message *priority_mq[SESSION_MQ_PRIORITY_MAX];// message list
+ struct stellar_message *priority_mq[STELLAR_MQ_PRIORITY_MAX];// message list
struct stellar_message *dealth_letter_queue;// dlq list
long long pub_packet_msg_cnt;
};
@@ -40,8 +38,7 @@ struct plugin_manager_schema
UT_array *registered_session_plugin_array;
UT_array *registered_packet_plugin_array;
UT_array *registered_polling_plugin_array;
- int packet_mq_topic_num;
- int session_mq_topic_num;
+ int stellar_mq_topic_num;
int packet_topic_subscriber_num;
int session_topic_subscriber_num;
int tcp_topic_id;
@@ -91,7 +88,7 @@ struct stellar_message
{
int topic_id;
enum stellar_topic_type type;
- enum session_mq_priority priority;
+ enum stellar_mq_priority priority;
} header;
void *body;
struct stellar_message *next, *prev;
@@ -118,12 +115,7 @@ struct stellar_mq_topic_schema
int topic_id;
int subscriber_cnt;
int is_destroyed;
- union
- {
- void *free_cb;
- session_msg_free_cb_func *sess_msg_free_cb;
- packet_msg_free_cb_func *pkt_msg_free_cb;
- };
+ stellar_msg_free_cb_func *free_cb;
struct stellar_mq_subscriber *subscribers;
}__attribute__((aligned(sizeof(void*))));
diff --git a/src/stellar_on_sapp/stellar_on_sapp_api.c b/src/stellar_on_sapp/stellar_on_sapp_api.c
index 05b6e0a..01b62bb 100644
--- a/src/stellar_on_sapp/stellar_on_sapp_api.c
+++ b/src/stellar_on_sapp/stellar_on_sapp_api.c
@@ -6,7 +6,7 @@
#include "stellar/utils.h"
#include "stellar/stellar.h"
-#include "stellar/session_mq.h"
+#include "stellar/stellar_mq.h"
#include "plugin_manager/plugin_manager.h"
@@ -76,7 +76,7 @@ struct stellar *stellar_init_on_sapp(const char *toml_conf_path)
return NULL;
}
st->plug_mgr=pm;
- st->tcp_stream_topic_id=stellar_session_mq_get_topic_id(st, TOPIC_TCP_STREAM);
+ st->tcp_stream_topic_id=stellar_mq_get_topic_id(st, TOPIC_TCP_STREAM);
return st;
}
diff --git a/test/plugin_manager/plugin_manager_gtest_main.cpp b/test/plugin_manager/plugin_manager_gtest_main.cpp
index 3282625..ac134d4 100644
--- a/test/plugin_manager/plugin_manager_gtest_main.cpp
+++ b/test/plugin_manager/plugin_manager_gtest_main.cpp
@@ -31,10 +31,8 @@ void whitebox_test_plugin_manager_intrisic_metadata(struct stellar *st, struct p
EXPECT_TRUE(plug_mgr->registered_packet_plugin_array==NULL);
EXPECT_TRUE(plug_mgr->registered_session_plugin_array==NULL);
- EXPECT_EQ(plug_mgr->packet_mq_topic_num, 0);
-
int intrinsic_topic_num=utarray_len(plug_mgr->stellar_mq_schema_array);
- EXPECT_EQ(plug_mgr->session_mq_topic_num, intrinsic_topic_num);//TCP,UDP,TCP_STREAM,EGRESS,CONTROL
+ EXPECT_EQ(plug_mgr->stellar_mq_topic_num, intrinsic_topic_num);//TCP,UDP,TCP_STREAM,EGRESS,CONTROL
struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)plug_mgr->tcp_topic_id);
EXPECT_STREQ(topic->topic_name, TOPIC_TCP);
@@ -52,11 +50,11 @@ void whitebox_test_plugin_manager_intrisic_metadata(struct stellar *st, struct p
EXPECT_STREQ(topic->topic_name, TOPIC_CONTROL_PACKET);
//intrinsic topic
- EXPECT_GE(stellar_session_mq_get_topic_id(st, TOPIC_TCP), 0);
- EXPECT_GE(stellar_session_mq_get_topic_id(st, TOPIC_TCP_STREAM), 0);
- EXPECT_GE(stellar_session_mq_get_topic_id(st, TOPIC_UDP), 0);
- EXPECT_GE(stellar_session_mq_get_topic_id(st, TOPIC_EGRESS), 0);
- EXPECT_GE(stellar_session_mq_get_topic_id(st, TOPIC_CONTROL_PACKET), 0);
+ EXPECT_GE(stellar_mq_get_topic_id(st, TOPIC_TCP), 0);
+ EXPECT_GE(stellar_mq_get_topic_id(st, TOPIC_TCP_STREAM), 0);
+ EXPECT_GE(stellar_mq_get_topic_id(st, TOPIC_UDP), 0);
+ EXPECT_GE(stellar_mq_get_topic_id(st, TOPIC_EGRESS), 0);
+ EXPECT_GE(stellar_mq_get_topic_id(st, TOPIC_CONTROL_PACKET), 0);
EXPECT_TRUE(plug_mgr->per_thread_data!=NULL);
int thread_num=stellar_get_worker_thread_num(st);
@@ -64,7 +62,7 @@ void whitebox_test_plugin_manager_intrisic_metadata(struct stellar *st, struct p
{
EXPECT_TRUE(plug_mgr->per_thread_data[i].per_thread_pkt_exdata_array.exdata_array==NULL);
EXPECT_TRUE(plug_mgr->per_thread_data[i].dealth_letter_queue==NULL);
- for(int j=0; j<SESSION_MQ_PRIORITY_MAX; j++)
+ for(int j=0; j<STELLAR_MQ_PRIORITY_MAX; j++)
EXPECT_TRUE(plug_mgr->per_thread_data[i].priority_mq[j]==NULL);
}
}
@@ -119,8 +117,8 @@ TEST(plugin_manager_init, packet_exdata_new_index_overwrite) {
plugin_manager_exit(plug_mgr);
}
-void test_mock_packet_msg_free(struct packet *pkt, void *msg, void *msg_free_arg){}
-void test_mock_overwrite_packet_msg_free(struct packet *pkt, void *msg, void *msg_free_arg){}
+void test_mock_packet_msg_free(void *msg, void *msg_free_arg){}
+void test_mock_overwrite_packet_msg_free(void *msg, void *msg_free_arg){}
TEST(plugin_manager_init, packet_mq_topic_create_and_update) {
struct stellar st={0};
@@ -129,9 +127,9 @@ TEST(plugin_manager_init, packet_mq_topic_create_and_update) {
const char *topic_name="PACKET_TOPIC";
- EXPECT_EQ(stellar_packet_mq_get_topic_id(&st, topic_name), -1); // illegal topic_name
+ EXPECT_EQ(stellar_mq_get_topic_id(&st, topic_name), -1); // illegal topic_name
- int topic_id = stellar_packet_mq_create_topic(&st, topic_name, test_mock_packet_msg_free, &st);
+ int topic_id = stellar_mq_create_topic(&st, topic_name, test_mock_packet_msg_free, &st);
EXPECT_GE(topic_id, 0);
struct stellar_mq_topic_schema *topic_schema = NULL;
{
@@ -144,8 +142,8 @@ TEST(plugin_manager_init, packet_mq_topic_create_and_update) {
EXPECT_STREQ(topic_schema->topic_name, topic_name);
}
- EXPECT_EQ(stellar_packet_mq_get_topic_id(&st, topic_name), topic_id);
- EXPECT_EQ(stellar_packet_mq_create_topic(&st, topic_name, test_mock_overwrite_packet_msg_free, plug_mgr),
+ EXPECT_EQ(stellar_mq_get_topic_id(&st, topic_name), topic_id);
+ EXPECT_EQ(stellar_mq_create_topic(&st, topic_name, test_mock_overwrite_packet_msg_free, plug_mgr),
-1); // duplicate create, return error
{
SCOPED_TRACE("White-box test, check stellar internal schema");
@@ -157,7 +155,7 @@ TEST(plugin_manager_init, packet_mq_topic_create_and_update) {
EXPECT_STREQ(topic_schema->topic_name, topic_name);
}
- EXPECT_EQ(stellar_packet_mq_update_topic(&st, topic_id, test_mock_overwrite_packet_msg_free, plug_mgr), 0);
+ EXPECT_EQ(stellar_mq_update_topic(&st, topic_id, test_mock_overwrite_packet_msg_free, plug_mgr), 0);
{
SCOPED_TRACE("White-box test, check stellar internal schema");
@@ -170,15 +168,14 @@ TEST(plugin_manager_init, packet_mq_topic_create_and_update) {
EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), 1+STELLAR_INTRINSIC_TOPIC_NUM);
}
- EXPECT_EQ(stellar_packet_mq_destroy_topic(&st, 10), -1); // illgeal topic_id
+ EXPECT_EQ(stellar_mq_destroy_topic(&st, 10), -1); // illgeal topic_id
- EXPECT_EQ(stellar_packet_mq_destroy_topic(&st, topic_id), 1);
- EXPECT_EQ(stellar_packet_mq_destroy_topic(&st, topic_id), 0); // duplicate destroy, return 0;
+ EXPECT_EQ(stellar_mq_destroy_topic(&st, topic_id), 1);
+ EXPECT_EQ(stellar_mq_destroy_topic(&st, topic_id), 0); // duplicate destroy, return 0;
{
SCOPED_TRACE("White-box test, check stellar internal schema");
EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), 1+STELLAR_INTRINSIC_TOPIC_NUM); // destory won't delete the topic schema
- EXPECT_EQ(plug_mgr->packet_mq_topic_num, 0);
}
plugin_manager_exit(plug_mgr);
}
@@ -196,7 +193,7 @@ TEST(plugin_manager_init, packet_mq_subscribe) {
const char *topic_name="PACKET_TOPIC";
- int topic_id=stellar_packet_mq_create_topic(&st, topic_name, test_mock_packet_msg_free, &st);
+ int topic_id=stellar_mq_create_topic(&st, topic_name, test_mock_packet_msg_free, &st);
EXPECT_GE(topic_id, 0);
EXPECT_EQ(stellar_packet_mq_subscribe(&st, topic_id, test_mock_on_packet_msg, 10+PACKET_PULGIN_ID_BASE),-1);//illgeal plugin_id
@@ -461,10 +458,9 @@ TEST(plugin_manager, packet_plugins_share_exdata) {
}
}
-static void test_packet_msg_free_cb_func(struct packet *pkt, void *msg, void *msg_free_arg)
+static void test_packet_msg_free_cb_func(void *msg, void *msg_free_arg)
{
struct packet_plugin_env *env = (struct packet_plugin_env *)msg_free_arg;
- EXPECT_EQ(pkt, msg);
env->msg_free_cnt+=1;
return;
}
@@ -510,7 +506,7 @@ TEST(plugin_manager, packet_plugins_mq_pub_sub) {
for(int i=0; i<topic_id_num; i++)
{
sprintf(topic_name[i], "PACKET_TOPIC_%d", i);
- env.packet_topic_id[i]=stellar_packet_mq_create_topic(&st, topic_name[i], test_packet_msg_free_cb_func, &env);
+ env.packet_topic_id[i]=stellar_mq_create_topic(&st, topic_name[i], test_packet_msg_free_cb_func, &env);
EXPECT_GE(env.packet_topic_id[i], 0);
{
SCOPED_TRACE("White-box test, check stellar internal schema");
@@ -563,7 +559,7 @@ TEST(plugin_manager, packet_plugins_mq_pub_sub) {
EXPECT_EQ(env.msg_sub_cnt, env.msg_pub_cnt*topic_sub_num);
}
-static void overlimit_packet_msg_free_cb_func(struct packet *pkt, void *msg, void *msg_free_arg)
+static void overlimit_packet_msg_free_cb_func(void *msg, void *msg_free_arg)
{
struct packet_plugin_env *env = (struct packet_plugin_env *)msg_free_arg;
env->msg_free_cnt+=1;
@@ -629,7 +625,7 @@ TEST(plugin_manager, packet_plugins_pub_overlimit) {
for(int i=0; i<topic_id_num; i++)
{
sprintf(topic_name[i], "PACKET_TOPIC_%d", i);
- env.packet_topic_id[i]=stellar_packet_mq_create_topic(&st, topic_name[i], overlimit_packet_msg_free_cb_func, &env);
+ env.packet_topic_id[i]=stellar_mq_create_topic(&st, topic_name[i], overlimit_packet_msg_free_cb_func, &env);
EXPECT_GE(env.packet_topic_id[i], 0);
{
SCOPED_TRACE("White-box test, check stellar internal schema");
@@ -695,12 +691,11 @@ static void test_exdata_free_pub_msg_exdata_free(struct packet *pkt, int idx, vo
return;
}
-static void test_exdata_free_pub_msg_free(struct packet *pkt, void *msg, void *msg_free_arg)
+static void test_exdata_free_pub_msg_free( void *msg, void *msg_free_arg)
{
struct packet_plugin_env *env = (struct packet_plugin_env *)msg_free_arg;
- EXPECT_EQ(pkt, msg);
env->msg_free_cnt+=1;
- EXPECT_EQ(packet_mq_publish_message(pkt, env->packet_topic_id[0], pkt), -1 );// publish message in packet msg_free is illegal
+ EXPECT_EQ(packet_mq_publish_message((struct packet *)msg, env->packet_topic_id[0], msg), -1 );// publish message in packet msg_free is illegal
return;
}
@@ -736,7 +731,7 @@ TEST(plugin_manager, packet_plugin_exdata_free_pub_msg) {
EXPECT_GE(plugin_id, PACKET_PULGIN_ID_BASE);
env.packet_exdata_idx[0]=stellar_packet_exdata_new_index(&st, "PACKET_EXDATA", test_exdata_free_pub_msg_exdata_free, &env);
- env.packet_topic_id[0]=stellar_packet_mq_create_topic(&st, "PACKET_TOPIC", test_exdata_free_pub_msg_free, &env);
+ env.packet_topic_id[0]=stellar_mq_create_topic(&st, "PACKET_TOPIC", test_exdata_free_pub_msg_free, &env);
EXPECT_EQ(stellar_packet_mq_subscribe(&st, env.packet_topic_id[0], test_exdata_free_pub_msg_on_packet_msg, plugin_id),0);
@@ -788,8 +783,8 @@ TEST(plugin_manager_init, session_exdata_new_index_overwrite) {
plugin_manager_exit(plug_mgr);
}
-void test_mock_session_msg_free(struct session *sess, void *msg, void *msg_free_arg){}
-void test_mock_overwrite_session_msg_free(struct session *sess, void *msg, void *msg_free_arg){}
+void test_mock_session_msg_free(void *msg, void *msg_free_arg){}
+void test_mock_overwrite_session_msg_free(void *msg, void *msg_free_arg){}
TEST(plugin_manager_init, session_mq_topic_create_and_update) {
struct stellar st={0};
@@ -798,9 +793,9 @@ TEST(plugin_manager_init, session_mq_topic_create_and_update) {
const char *topic_name="SESSION_TOPIC";
- EXPECT_EQ(stellar_session_mq_get_topic_id(&st, topic_name), -1);// illegal topic_name
+ EXPECT_EQ(stellar_mq_get_topic_id(&st, topic_name), -1);// illegal topic_name
- int topic_id=stellar_session_mq_create_topic(&st, topic_name, test_mock_session_msg_free, &st);
+ int topic_id=stellar_mq_create_topic(&st, topic_name, test_mock_session_msg_free, &st);
EXPECT_GE(topic_id, 0);
struct stellar_mq_topic_schema *topic_schema;
{
@@ -813,8 +808,8 @@ TEST(plugin_manager_init, session_mq_topic_create_and_update) {
EXPECT_STREQ(topic_schema->topic_name, topic_name);
}
- EXPECT_EQ(stellar_session_mq_get_topic_id(&st, topic_name), topic_id);
- EXPECT_EQ(stellar_session_mq_create_topic(&st, topic_name, test_mock_overwrite_session_msg_free, plug_mgr), -1); // duplicate create, return error
+ EXPECT_EQ(stellar_mq_get_topic_id(&st, topic_name), topic_id);
+ EXPECT_EQ(stellar_mq_create_topic(&st, topic_name, test_mock_overwrite_session_msg_free, plug_mgr), -1); // duplicate create, return error
{
SCOPED_TRACE("White-box test, check stellar internal schema");
@@ -826,7 +821,7 @@ TEST(plugin_manager_init, session_mq_topic_create_and_update) {
EXPECT_STREQ(topic_schema->topic_name, topic_name);
}
- EXPECT_EQ(stellar_session_mq_update_topic(&st, topic_id, test_mock_overwrite_session_msg_free, plug_mgr), 0);
+ EXPECT_EQ(stellar_mq_update_topic(&st, topic_id, test_mock_overwrite_session_msg_free, plug_mgr), 0);
{
SCOPED_TRACE("White-box test, check stellar internal schema");
@@ -840,16 +835,16 @@ TEST(plugin_manager_init, session_mq_topic_create_and_update) {
EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), STELLAR_INTRINSIC_TOPIC_NUM+1); // 5 intrinsic topic + 1 created topic
}
- EXPECT_EQ(stellar_session_mq_destroy_topic(&st, 10), -1);// illgeal session topic_id
+ EXPECT_EQ(stellar_mq_destroy_topic(&st, 10), -1);// illgeal session topic_id
- EXPECT_EQ(stellar_session_mq_destroy_topic(&st, topic_id), 1);
- EXPECT_EQ(stellar_session_mq_destroy_topic(&st, topic_id), 0);//duplicate destroy, return 0;
+ EXPECT_EQ(stellar_mq_destroy_topic(&st, topic_id), 1);
+ EXPECT_EQ(stellar_mq_destroy_topic(&st, topic_id), 0);//duplicate destroy, return 0;
{
SCOPED_TRACE("White-box test, check stellar internal schema");
EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), STELLAR_INTRINSIC_TOPIC_NUM+1);//destory won't delete the topic schema
}
- EXPECT_EQ(plug_mgr->session_mq_topic_num, STELLAR_INTRINSIC_TOPIC_NUM);//intrinsic topic number
+ EXPECT_EQ(plug_mgr->stellar_mq_topic_num, STELLAR_INTRINSIC_TOPIC_NUM);//intrinsic topic number
plugin_manager_exit(plug_mgr);
}
@@ -866,7 +861,7 @@ TEST(plugin_manager_init, session_mq_subscribe_overwrite) {
const char *topic_name="SESSION_TOPIC";
- int topic_id=stellar_session_mq_create_topic(&st, topic_name, test_mock_session_msg_free, &st);
+ int topic_id=stellar_mq_create_topic(&st, topic_name, test_mock_session_msg_free, &st);
EXPECT_GE(topic_id, 0);
EXPECT_EQ(stellar_session_mq_subscribe(&st, topic_id, test_mock_on_session_msg, 10),-1);//illgeal plugin_id
@@ -1058,11 +1053,11 @@ TEST(plugin_manager, session_plugin_on_intrinsic_ingress_egress) {
int plugin_id=stellar_session_plugin_register(&st, test_basic_session_ctx_new, test_basic_session_ctx_free, &env);
EXPECT_GE(plugin_id, 0);
- env.intrinsc_tcp_topic_id=stellar_session_mq_get_topic_id(&st, TOPIC_TCP);
+ env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP);
EXPECT_GE(env.intrinsc_tcp_topic_id, 0);
EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_basic_on_session_ingress, plugin_id), 0);
- env.intrinsc_egress_topic_id=stellar_session_mq_get_topic_id(&st, TOPIC_EGRESS);
+ env.intrinsc_egress_topic_id=stellar_mq_get_topic_id(&st, TOPIC_EGRESS);
EXPECT_GE(env.intrinsc_egress_topic_id, 0);
EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_egress_topic_id, test_basic_on_session_ingress, plugin_id), 0);// Intentional error
EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_egress_topic_id, test_basic_on_session_egress, plugin_id), 0);
@@ -1173,10 +1168,9 @@ static void test_mq_on_sub_msg(struct session *sess, int topic_id, const void *m
return;
}
-static void test_session_msg_free(struct session *sess, void *msg, void *msg_free_arg)
+static void test_session_msg_free(void *msg, void *msg_free_arg)
{
struct session_plugin_env *env = (struct session_plugin_env *)msg_free_arg;
- EXPECT_EQ(sess->plug_mgr_rt->plug_mgr, env->plug_mgr);
if(msg)
{
EXPECT_EQ(env->test_mq_pub_called, *(int *)msg);
@@ -1202,11 +1196,11 @@ TEST(plugin_manager, session_plugin_ignore_on_ctx_new_sub_other_msg) {
env.test_mq_pub_plugin_id=stellar_session_plugin_register(&st, test_mq_pub_session_ctx_new, test_mq_pub_session_ctx_free, &env);
EXPECT_GE(env.test_mq_pub_plugin_id, 0);
- env.intrinsc_tcp_topic_id=stellar_session_mq_get_topic_id(&st, TOPIC_TCP);
+ env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP);
EXPECT_GE(env.intrinsc_tcp_topic_id, 0);
EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_mq_pub_on_session, env.test_mq_pub_plugin_id), 0);
- env.test_mq_topic_id=stellar_session_mq_create_topic(&st, "SESSION_MQ_TOPIC", test_session_msg_free, &env);
+ env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_MQ_TOPIC", test_session_msg_free, &env);
EXPECT_GE(env.test_mq_topic_id, 0);
env.test_mq_sub_plugin_id=stellar_session_plugin_register(&st, test_mq_sub_session_ctx_new, test_mq_sub_session_ctx_free, &env);
@@ -1284,6 +1278,12 @@ static void test_overlimit_sub_session_ctx_free(struct session *sess, void *sess
return;
}
+struct test_overlimit_msg
+{
+ struct session *sess;
+ int called;
+};
+
static void test_overlimit_pub_on_session(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env)
{
struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
@@ -1291,15 +1291,16 @@ static void test_overlimit_pub_on_session(struct session *sess, int topic_id, co
EXPECT_TRUE(env!=NULL);
EXPECT_TRUE(ctx!=NULL);
EXPECT_EQ(sess->plug_mgr_rt->plug_mgr, env->plug_mgr);
- int *pub_msg;
+ struct test_overlimit_msg *pub_msg;
if (msg)
{
env->test_mq_pub_called += 1;
ctx->pkt_cnt += 1;
for(int i=0; i < MAX_MSG_PER_DISPATCH*2; i++)
{
- pub_msg = CALLOC(int, 1);
- *pub_msg = env->test_mq_pub_called;
+ pub_msg = CALLOC(struct test_overlimit_msg, 1);
+ pub_msg->called = env->test_mq_pub_called;
+ pub_msg->sess=sess;
if(i<(MAX_MSG_PER_DISPATCH-1))// minus intrinsic msg
{
EXPECT_EQ(session_mq_publish_message(sess, env->test_mq_topic_id, pub_msg), 0);
@@ -1318,24 +1319,26 @@ static void test_overlimit_pub_on_session(struct session *sess, int topic_id, co
static void test_overlimit_on_sub_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env)
{
struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ struct test_overlimit_msg *recv_msg=(struct test_overlimit_msg *)msg;
struct test_overlimit_session_mq_ctx *ctx=(struct test_overlimit_session_mq_ctx *)per_session_ctx;
EXPECT_TRUE(env!=NULL);
EXPECT_TRUE(ctx!=NULL);
EXPECT_EQ(sess->plug_mgr_rt->plug_mgr, env->plug_mgr);
- EXPECT_EQ(*(int *)msg, env->test_mq_pub_called);
+ EXPECT_EQ(recv_msg->called, env->test_mq_pub_called);
env->test_mq_sub_called+=1;
ctx->sub_cnt+=1;
return;
}
-static void test_overlimit_session_msg_free(struct session *sess, void *msg, void *msg_free_arg)
+static void test_overlimit_session_msg_free(void *msg, void *msg_free_arg)
{
struct session_plugin_env *env = (struct session_plugin_env *)msg_free_arg;
- EXPECT_EQ(sess->plug_mgr_rt->plug_mgr, env->plug_mgr);
- EXPECT_EQ(session_mq_publish_message(sess, env->test_mq_topic_id, msg), -1);// illegal publish when msg_free
- if(msg)
+ struct test_overlimit_msg *recv_msg=(struct test_overlimit_msg *)msg;
+ if(recv_msg)
{
- EXPECT_EQ(env->test_mq_pub_called, *(int *)msg);
+ EXPECT_EQ(recv_msg->sess->plug_mgr_rt->plug_mgr, env->plug_mgr);
+ EXPECT_EQ(session_mq_publish_message(recv_msg->sess, env->test_mq_topic_id, msg), -1);// illegal publish when msg_free
+ EXPECT_EQ(env->test_mq_pub_called, recv_msg->called);
env->test_mq_free_called+=1;
FREE(msg);
}
@@ -1358,11 +1361,11 @@ TEST(plugin_manager, session_plugin_pub_msg_overlimt) {
env.test_mq_pub_plugin_id=stellar_session_plugin_register(&st, test_overlimit_pub_session_ctx_new, test_overlimit_pub_session_ctx_free, &env);
EXPECT_GE(env.test_mq_pub_plugin_id, 0);
- env.intrinsc_tcp_topic_id=stellar_session_mq_get_topic_id(&st, TOPIC_TCP);
+ env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP);
EXPECT_GE(env.intrinsc_tcp_topic_id, 0);
EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_overlimit_pub_on_session, env.test_mq_pub_plugin_id), 0);
- env.test_mq_topic_id=stellar_session_mq_create_topic(&st, "SESSION_MQ_TOPIC", test_overlimit_session_msg_free, &env);
+ env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_MQ_TOPIC", test_overlimit_session_msg_free, &env);
EXPECT_GE(env.test_mq_topic_id, 0);
env.test_mq_sub_plugin_id=stellar_session_plugin_register(&st, test_overlimit_sub_session_ctx_new, test_overlimit_sub_session_ctx_free, &env);
@@ -1405,10 +1408,9 @@ TEST(plugin_manager, session_plugin_pub_msg_overlimt) {
}
-static void test_dettach_msg_free(struct session *sess, void *msg, void *msg_free_arg)
+static void test_dettach_msg_free(void *msg, void *msg_free_arg)
{
struct session_plugin_env *env = (struct session_plugin_env *)msg_free_arg;
- EXPECT_EQ(sess->plug_mgr_rt->plug_mgr, env->plug_mgr);
env->test_mq_free_called+=1;
return;
}
@@ -1468,11 +1470,11 @@ TEST(plugin_manager, session_plugin_on_ctx_new_then_dettach) {
int plugin_id=stellar_session_plugin_register(&st, test_dettach_session_ctx_new, test_dettach_session_ctx_free, &env);
EXPECT_GE(plugin_id,0);
- env.intrinsc_tcp_topic_id=stellar_session_mq_get_topic_id(&st, TOPIC_TCP);
+ env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP);
EXPECT_GE(env.intrinsc_tcp_topic_id, 0);
EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_dettach_on_session, plugin_id), 0);
- env.test_mq_topic_id=stellar_session_mq_create_topic(&st, "SESSION_MQ_TOPIC", test_dettach_msg_free, &env);
+ env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_MQ_TOPIC", test_dettach_msg_free, &env);
EXPECT_GE(env.test_mq_topic_id, 0);
EXPECT_EQ(stellar_session_mq_subscribe(&st, env.test_mq_topic_id, test_dettach_on_session, plugin_id), 0);
@@ -1552,11 +1554,11 @@ TEST(plugin_manager, session_plugin_pub_on_ctx_free) {
int plugin_id=stellar_session_plugin_register(&st, test_invalid_pub_msg_session_ctx_new, test_invalid_pub_msg_session_ctx_free, &env);
EXPECT_GE(plugin_id,0);
- env.intrinsc_tcp_topic_id=stellar_session_mq_get_topic_id(&st, TOPIC_TCP);
+ env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP);
EXPECT_GE(env.intrinsc_tcp_topic_id, 0);
EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_invalid_pub_msg_on_session, plugin_id), 0);
- env.test_mq_topic_id=stellar_session_mq_create_topic(&st, "SESSION_MQ_TOPIC", NULL, &env);
+ env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_MQ_TOPIC", NULL, &env);
EXPECT_GE(env.test_mq_topic_id, 0);
// pesudo packet and session
@@ -1667,11 +1669,11 @@ TEST(plugin_manager, session_plugin_pub_msg_on_closing) {
int plugin_id=stellar_session_plugin_register(&st, test_session_closing_ctx_new, test_session_closing_ctx_free, &env);
EXPECT_GE(plugin_id,0);
- env.intrinsc_tcp_topic_id=stellar_session_mq_get_topic_id(&st, TOPIC_TCP);
+ env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP);
EXPECT_GE(env.intrinsc_tcp_topic_id, 0);
EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_closing_on_intrisic_msg, plugin_id), 0);
- env.test_mq_topic_id=stellar_session_mq_create_topic(&st, "SESSION_CLOSING_TOPIC", NULL, &env);
+ env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_CLOSING_TOPIC", NULL, &env);
EXPECT_GE(env.test_mq_topic_id, 0);
EXPECT_EQ(stellar_session_mq_subscribe(&st, env.test_mq_topic_id, test_session_closing_on_userdefine_msg, plugin_id), 0);
@@ -1795,7 +1797,7 @@ TEST(plugin_manager, test_session_mq_topic_is_active) {
env.plugin_id_1=plugin_id_1;
env.plugin_id_2=plugin_id_2;
- env.intrinsc_tcp_topic_id=stellar_session_mq_get_topic_id(&st, TOPIC_TCP);
+ env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP);
EXPECT_GE(env.intrinsc_tcp_topic_id, 0);
EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_topic_is_active_plugin_1_on_msg, plugin_id_1), 0);
EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_topic_is_active_plugin_2_on_msg, plugin_id_2), 0);
@@ -1902,7 +1904,7 @@ TEST(plugin_manager, test_session_dettach) {
env.plugin_id_1=plugin_id_1;
env.plugin_id_2=plugin_id_2;
- env.intrinsc_tcp_topic_id=stellar_session_mq_get_topic_id(&st, TOPIC_TCP);
+ env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP);
EXPECT_GE(env.intrinsc_tcp_topic_id, 0);
EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_dettach_plugin_1_on_msg, plugin_id_1), 0);
EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_dettach_plugin_2_on_msg, plugin_id_2), 0);
@@ -1968,7 +1970,7 @@ static void test_session_mq_priority_plugin_1_on_msg(struct session *sess, int t
if(topic_id == env->intrinsc_tcp_topic_id)
{
EXPECT_EQ(ctx->called%3, 1);// intrinsc msg has high priority
- EXPECT_EQ(session_mq_publish_message_with_priority(sess, env->test_mq_topic_id, (void *)(long)env->plugin_id_1, SESSION_MQ_PRIORITY_LOW), 0);
+ EXPECT_EQ(session_mq_publish_message_with_priority(sess, env->test_mq_topic_id, (void *)(long)env->plugin_id_1, STELLAR_MQ_PRIORITY_LOW), 0);
}
if(topic_id == env->test_mq_topic_id)
{
@@ -2022,12 +2024,12 @@ TEST(plugin_manager, test_session_mq_priority) {
env.plugin_id_1=plugin_id_1;
env.plugin_id_2=plugin_id_2;
- env.intrinsc_tcp_topic_id=stellar_session_mq_get_topic_id(&st, TOPIC_TCP);
+ env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP);
EXPECT_GE(env.intrinsc_tcp_topic_id, 0);
EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_priority_plugin_1_on_msg, plugin_id_1), 0);
EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_priority_plugin_2_on_msg, plugin_id_2), 0);
- env.test_mq_topic_id=stellar_session_mq_create_topic(&st, "SESSION_PRIORITY_TOPIC", NULL, &env);
+ env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_PRIORITY_TOPIC", NULL, &env);
EXPECT_GE(env.test_mq_topic_id, 0);
EXPECT_EQ(stellar_session_mq_subscribe(&st, env.test_mq_topic_id, test_session_mq_priority_plugin_1_on_msg, plugin_id_1), 0);
EXPECT_EQ(stellar_session_mq_subscribe(&st, env.test_mq_topic_id, test_session_mq_priority_plugin_2_on_msg, plugin_id_2), 0);
@@ -2110,7 +2112,7 @@ TEST(plugin_manager, session_exdata_free_pub_msg) {
env.plugin_id_1=stellar_session_plugin_register(&st, NULL, NULL, &env);
EXPECT_GE(env.plugin_id_1,0);
- env.intrinsc_tcp_topic_id=stellar_session_mq_get_topic_id(&st, TOPIC_TCP);
+ env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP);
EXPECT_GE(env.intrinsc_tcp_topic_id, 0);
EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_exdata_free_pub_msg_on_session, env.plugin_id_1), 0);