diff options
| author | yangwei <[email protected]> | 2024-07-24 10:38:06 +0800 |
|---|---|---|
| committer | yangwei <[email protected]> | 2024-07-24 11:46:42 +0800 |
| commit | f1b6662e4d561bb924efe4dc4256d4f34a5173a5 (patch) | |
| tree | a4092ccd6c54dcb48d69309a9bb388894537e4c3 | |
| parent | bba15d7ffdd3e9add76a2d1e287a0899369ac96b (diff) | |
🦄 refactor(stellar mq api): integration mq in one .h
| -rw-r--r-- | examples/stellar_plugin/simple_stellar_plugin.c | 8 | ||||
| -rw-r--r-- | include/stellar/packet_mq.h | 22 | ||||
| -rw-r--r-- | include/stellar/session_mq.h | 36 | ||||
| -rw-r--r-- | include/stellar/stellar_mq.h | 42 | ||||
| -rw-r--r-- | src/plugin_manager/plugin_manager.c | 144 | ||||
| -rw-r--r-- | src/plugin_manager/plugin_manager_interna.h | 20 | ||||
| -rw-r--r-- | src/stellar_on_sapp/stellar_on_sapp_api.c | 4 | ||||
| -rw-r--r-- | test/plugin_manager/plugin_manager_gtest_main.cpp | 148 |
8 files changed, 173 insertions, 251 deletions
diff --git a/examples/stellar_plugin/simple_stellar_plugin.c b/examples/stellar_plugin/simple_stellar_plugin.c index 5625cc9..f612b9a 100644 --- a/examples/stellar_plugin/simple_stellar_plugin.c +++ b/examples/stellar_plugin/simple_stellar_plugin.c @@ -4,7 +4,7 @@ #include "stellar/session_exdata.h" #include "stellar/session_mq.h" #include "stellar/packet_exdata.h" -#include "stellar/packet_mq.h" +#include "stellar/stellar_mq.h" #include <stdio.h> #include <string.h> @@ -182,7 +182,7 @@ static void simple_plugin_packet_exdata_free(struct packet *pkt, int idx, void * assert(memcmp(env, exdata, sizeof(struct simple_stellar_plugin_env)) == 0); } -static void simple_plugin_packet_msg_free(struct packet *pkt, void *msg, void *msg_free_arg) +static void simple_plugin_packet_msg_free(void *msg, void *msg_free_arg) { struct simple_stellar_plugin_env *env = (struct simple_stellar_plugin_env *)msg_free_arg; assert(env); @@ -218,10 +218,10 @@ void *simple_session_packet_plugin_init(struct stellar *st) exit(-1); } - env->packet_topic_id=stellar_packet_mq_get_topic_id(st, "TOPIC_PACKET_ENV"); + env->packet_topic_id=stellar_mq_get_topic_id(st, "TOPIC_PACKET_ENV"); if(env->packet_topic_id < 0) { - env->packet_topic_id=stellar_packet_mq_create_topic(st, "TOPIC_PACKET_ENV", simple_plugin_packet_msg_free, env); + env->packet_topic_id=stellar_mq_create_topic(st, "TOPIC_PACKET_ENV", simple_plugin_packet_msg_free, env); } tcp_plugin_id=stellar_packet_plugin_register(st, IPPROTO_TCP, simple_plugin_packet_get_exdata, env); diff --git a/include/stellar/packet_mq.h b/include/stellar/packet_mq.h deleted file mode 100644 index 94bb39b..0000000 --- a/include/stellar/packet_mq.h +++ /dev/null @@ -1,22 +0,0 @@ -#pragma once - -#include "stellar.h" - -//session mq -typedef void packet_msg_free_cb_func(struct packet *pkt, void *msg, void *msg_free_arg); -typedef void on_packet_msg_cb_func(struct packet *pkt, int topic_id, const void *msg, void *plugin_env); - -//return topic_id -int stellar_packet_mq_create_topic(struct stellar *st, const char *topic_name, packet_msg_free_cb_func *msg_free_cb, void *msg_free_arg); - -int stellar_packet_mq_get_topic_id(struct stellar *st, const char *topic_name); - -int stellar_packet_mq_update_topic(struct stellar *st, int topic_id, packet_msg_free_cb_func *msg_free_cb, void *msg_free_arg); - -int stellar_packet_mq_destroy_topic(struct stellar *st, int topic_id); - -//return 0 if success, otherwise return -1. -int stellar_packet_mq_subscribe(struct stellar *st, int topic_id, on_packet_msg_cb_func *plugin_on_msg_cb, int plugin_id); //packet plugin only - -int packet_mq_publish_message(struct packet *pkt, int topic_id, void *msg); - diff --git a/include/stellar/session_mq.h b/include/stellar/session_mq.h deleted file mode 100644 index 3c62f69..0000000 --- a/include/stellar/session_mq.h +++ /dev/null @@ -1,36 +0,0 @@ -#pragma once - -#include "stellar.h" - -//session mq -typedef void session_msg_free_cb_func(struct session *sess, void *msg, void *msg_free_arg); -typedef void on_session_msg_cb_func(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env); - -//return topic_id -int stellar_session_mq_create_topic(struct stellar *st, const char *topic_name, session_msg_free_cb_func *msg_free_cb, void *msg_free_arg); - -int stellar_session_mq_get_topic_id(struct stellar *st, const char *topic_name); - -int stellar_session_mq_update_topic(struct stellar *st, int topic_id, session_msg_free_cb_func *msg_free_cb, void *msg_free_arg); - -int stellar_session_mq_destroy_topic(struct stellar *st, int topic_id); - -//return 0 if success, otherwise return -1. -int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_session_msg_cb_func *plugin_on_msg_cb, int plugin_id); - -int session_mq_publish_message(struct session *sess, int topic_id, void *msg); - -int session_mq_ignore_message(struct session *sess, int topic_id, int plugin_id); -int session_mq_unignore_message(struct session *sess, int topic_id, int plugin_id); - -int session_mq_topic_is_active(struct session *sess, int topic_id); - -enum session_mq_priority -{ - SESSION_MQ_PRIORITY_LOW, - SESSION_MQ_PRIORITY_NORMAL, - SESSION_MQ_PRIORITY_HIGH, - SESSION_MQ_PRIORITY_MAX, -}; - -int session_mq_publish_message_with_priority(struct session *sess, int topic_id, void *msg, enum session_mq_priority priority);
\ No newline at end of file diff --git a/include/stellar/stellar_mq.h b/include/stellar/stellar_mq.h new file mode 100644 index 0000000..4b0fb00 --- /dev/null +++ b/include/stellar/stellar_mq.h @@ -0,0 +1,42 @@ +#pragma once +#include "stellar.h" + +//topic api +typedef void stellar_msg_free_cb_func(void *msg, void *msg_free_arg); + +//return topic_id +int stellar_mq_create_topic(struct stellar *st, const char *topic_name, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg); +int stellar_mq_get_topic_id(struct stellar *st, const char *topic_name); +int stellar_mq_update_topic(struct stellar *st, int topic_id, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg); +int stellar_mq_destroy_topic(struct stellar *st, int topic_id); + + +enum stellar_mq_priority +{ + STELLAR_MQ_PRIORITY_LOW, + STELLAR_MQ_PRIORITY_NORMAL, + STELLAR_MQ_PRIORITY_HIGH, + STELLAR_MQ_PRIORITY_MAX, +}; + +//session mq api +typedef void on_session_msg_cb_func(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env); + +//return 0 if success, otherwise return -1. +int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_session_msg_cb_func *plugin_on_msg_cb, int plugin_id); +int session_mq_publish_message(struct session *sess, int topic_id, void *msg); +int session_mq_publish_message_with_priority(struct session *sess, int topic_id, void *msg, enum stellar_mq_priority priority); + +int session_mq_ignore_message(struct session *sess, int topic_id, int plugin_id); +int session_mq_unignore_message(struct session *sess, int topic_id, int plugin_id); + +int session_mq_topic_is_active(struct session *sess, int topic_id); + + +//packet mq api + +typedef void on_packet_msg_cb_func(struct packet *pkt, int topic_id, const void *msg, void *plugin_env); +//return 0 if success, otherwise return -1. +int stellar_packet_mq_subscribe(struct stellar *st, int topic_id, on_packet_msg_cb_func *plugin_on_msg_cb, int plugin_id); //packet plugin only +int packet_mq_publish_message(struct packet *pkt, int topic_id, void *msg); +int packet_mq_publish_message_with_priority(struct packet *pkt, int topic_id, void *msg, enum stellar_mq_priority priority);
\ No newline at end of file diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c index a1c1d84..18b849d 100644 --- a/src/plugin_manager/plugin_manager.c +++ b/src/plugin_manager/plugin_manager.c @@ -107,11 +107,11 @@ struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char stellar_plugin_manager_schema_set(st, plug_mgr); - plug_mgr->tcp_topic_id=stellar_session_mq_create_topic(st, TOPIC_TCP, NULL, NULL); - plug_mgr->tcp_stream_topic_id=stellar_session_mq_create_topic(st, TOPIC_TCP_STREAM, NULL, NULL); - plug_mgr->udp_topic_id=stellar_session_mq_create_topic(st, TOPIC_UDP, NULL, NULL); - plug_mgr->egress_topic_id=stellar_session_mq_create_topic(st, TOPIC_EGRESS, NULL, NULL); - plug_mgr->control_packet_topic_id=stellar_session_mq_create_topic(st, TOPIC_CONTROL_PACKET, NULL, NULL); + plug_mgr->tcp_topic_id=stellar_mq_create_topic(st, TOPIC_TCP, NULL, NULL); + plug_mgr->tcp_stream_topic_id=stellar_mq_create_topic(st, TOPIC_TCP_STREAM, NULL, NULL); + plug_mgr->udp_topic_id=stellar_mq_create_topic(st, TOPIC_UDP, NULL, NULL); + plug_mgr->egress_topic_id=stellar_mq_create_topic(st, TOPIC_EGRESS, NULL, NULL); + plug_mgr->control_packet_topic_id=stellar_mq_create_topic(st, TOPIC_CONTROL_PACKET, NULL, NULL); for(int i = 0; i < spec_num; i++) { @@ -126,8 +126,6 @@ struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char return plug_mgr; } -static int stellar_mq_destroy_topic(int topic_id, UT_array *mq_schema_array); - void plugin_manager_exit(struct plugin_manager_schema *plug_mgr) { struct plugin_specific *p=NULL; @@ -144,7 +142,7 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr) { for(unsigned int i = 0; i < utarray_len(plug_mgr->stellar_mq_schema_array); i++) { - stellar_mq_destroy_topic( i, plug_mgr->stellar_mq_schema_array); + stellar_mq_destroy_topic( plug_mgr->st, i); } utarray_free(plug_mgr->stellar_mq_schema_array); } @@ -355,8 +353,11 @@ static void stellar_mq_topic_schema_dtor(void *_elt) UT_icd stellar_mq_topic_schema_icd = {sizeof(struct stellar_mq_topic_schema), NULL, stellar_mq_topic_schema_copy, stellar_mq_topic_schema_dtor}; -int stellar_mq_get_topic_id(const char *topic_name, UT_array *mq_schema_array) +int stellar_mq_get_topic_id(struct stellar *st, const char *topic_name) { + struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); + UT_array *mq_schema_array=plug_mgr->stellar_mq_schema_array; + if(topic_name == NULL || mq_schema_array == NULL )return -1; unsigned int len = utarray_len(mq_schema_array); struct stellar_mq_topic_schema *t_schema; @@ -371,8 +372,10 @@ int stellar_mq_get_topic_id(const char *topic_name, UT_array *mq_schema_array) return -1; } -int stellar_mq_update_topic(int topic_id, void *msg_free_cb, void *msg_free_arg, UT_array *mq_schema_array) +int stellar_mq_update_topic(struct stellar *st, int topic_id, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg) { + struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); + UT_array *mq_schema_array=plug_mgr->stellar_mq_schema_array; if(mq_schema_array == NULL)return -1; unsigned int len = utarray_len(mq_schema_array); if(len < (unsigned int)topic_id)return -1; @@ -383,14 +386,15 @@ int stellar_mq_update_topic(int topic_id, void *msg_free_cb, void *msg_free_arg, return 0; } -int stellar_mq_create_topic(struct stellar *st, const char *topic_name, void *msg_free_cb, void *msg_free_arg, UT_array **mq_schema_array) +int stellar_mq_create_topic(struct stellar *st, const char *topic_name, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg) { - if(*mq_schema_array == NULL) + struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); + if(plug_mgr->stellar_mq_schema_array == NULL) { - utarray_new(*mq_schema_array, &stellar_mq_topic_schema_icd); + utarray_new(plug_mgr->stellar_mq_schema_array, &stellar_mq_topic_schema_icd); } - unsigned int len = utarray_len(*mq_schema_array); - if(stellar_mq_get_topic_id(topic_name, *mq_schema_array) >= 0) + unsigned int len = utarray_len(plug_mgr->stellar_mq_schema_array); + if(stellar_mq_get_topic_id(st, topic_name) >= 0) { return -1; } @@ -402,18 +406,20 @@ int stellar_mq_create_topic(struct stellar *st, const char *topic_name, void *ms t_schema.free_cb_arg=msg_free_arg; t_schema.subscribers=NULL; t_schema.subscriber_cnt=0; - utarray_push_back(*mq_schema_array, &t_schema); + utarray_push_back(plug_mgr->stellar_mq_schema_array, &t_schema); + plug_mgr->stellar_mq_topic_num+=1; return t_schema.topic_id; } -static int stellar_mq_destroy_topic(int topic_id, UT_array *mq_schema_array) +int stellar_mq_destroy_topic(struct stellar *st, int topic_id) { - if(mq_schema_array==NULL)return -1; - unsigned int len = utarray_len(mq_schema_array); + struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); + if(plug_mgr->stellar_mq_schema_array==NULL)return -1; + unsigned int len = utarray_len(plug_mgr->stellar_mq_schema_array); if (len <= (unsigned int)topic_id) return -1; struct stellar_mq_topic_schema *topic = - (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array, (unsigned int)topic_id); + (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id); struct stellar_mq_subscriber *sub_elt, *sub_tmp; if(topic == NULL)return -1; @@ -426,10 +432,11 @@ static int stellar_mq_destroy_topic(int topic_id, UT_array *mq_schema_array) FREE(sub_elt); } topic->is_destroyed = 1; + plug_mgr->stellar_mq_topic_num-=1; return 1; // success } -static int stellar_mq_publish_message(enum stellar_topic_type type, int topic_id, void *data, UT_array *stellar_mq_schema, struct stellar_message *priority_mq[], enum session_mq_priority priority) +static int stellar_mq_publish_message(enum stellar_topic_type type, int topic_id, void *data, UT_array *stellar_mq_schema, struct stellar_message *priority_mq[], enum stellar_mq_priority priority) { if(stellar_mq_schema==NULL || topic_id < 0)return -1; unsigned int len = utarray_len(stellar_mq_schema); @@ -576,8 +583,8 @@ static void stellar_mq_dispatch_one_packet_message(struct packet *pkt, struct st static void stellar_mq_dispatch(struct stellar_message *priority_mq[], struct stellar_message ** dealth_letter_queue, struct session *sess, struct packet *pkt) { struct stellar_message *mq_elt=NULL, *mq_tmp=NULL; - int cur_priority = SESSION_MQ_PRIORITY_HIGH; - while(cur_priority >= SESSION_MQ_PRIORITY_LOW) + int cur_priority = STELLAR_MQ_PRIORITY_HIGH; + while(cur_priority >= STELLAR_MQ_PRIORITY_LOW) { if(priority_mq[cur_priority]==NULL) { @@ -591,7 +598,7 @@ static void stellar_mq_dispatch(struct stellar_message *priority_mq[], struct st DL_DELETE(priority_mq[mq_elt->header.priority], mq_elt); DL_APPEND(*dealth_letter_queue, mq_elt); // move to dlq list - cur_priority=SESSION_MQ_PRIORITY_HIGH; + cur_priority=STELLAR_MQ_PRIORITY_HIGH; break; } } @@ -608,8 +615,7 @@ static void stellar_mq_free(struct session *sess, struct packet *pkt, struct ste (unsigned int)(mq_elt->header.topic_id)); if (topic && topic->free_cb) { - if(mq_elt->header.type==ON_SESSION_TOPIC)topic->sess_msg_free_cb(sess, mq_elt->body, topic->free_cb_arg); - if(mq_elt->header.type==ON_PACKET_TOPIC)topic->pkt_msg_free_cb(pkt, mq_elt->body, topic->free_cb_arg); + topic->free_cb(mq_elt->body, topic->free_cb_arg); } DL_DELETE(*head, mq_elt); FREE(mq_elt); @@ -619,37 +625,6 @@ static void stellar_mq_free(struct session *sess, struct packet *pkt, struct ste /******************************* * PACKET MQ * *******************************/ -int stellar_packet_mq_create_topic(struct stellar *st, const char *topic_name, packet_msg_free_cb_func *msg_free_cb, void *msg_free_arg) -{ - struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); - assert(plug_mgr); - int topic_id=stellar_mq_create_topic(st, topic_name, (void *)msg_free_cb, msg_free_arg, &plug_mgr->stellar_mq_schema_array); - if(topic_id>=0)plug_mgr->packet_mq_topic_num+=1; - return topic_id; -} - -int stellar_packet_mq_get_topic_id(struct stellar *st, const char *topic_name) -{ - struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); - assert(plug_mgr); - return stellar_mq_get_topic_id(topic_name, plug_mgr->stellar_mq_schema_array); -} - -int stellar_packet_mq_update_topic(struct stellar *st, int topic_id, packet_msg_free_cb_func *msg_free_cb, void *msg_free_arg) -{ - struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); - assert(plug_mgr); - return stellar_mq_update_topic(topic_id, (void *)msg_free_cb, msg_free_arg, plug_mgr->stellar_mq_schema_array); -} - -int stellar_packet_mq_destroy_topic(struct stellar *st, int topic_id) -{ - struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); - assert(plug_mgr); - int ret = stellar_mq_destroy_topic(topic_id, plug_mgr->stellar_mq_schema_array); - if(ret==1)plug_mgr->packet_mq_topic_num-=1; - return ret; -} //return 0 if success, otherwise return -1. int stellar_packet_mq_subscribe(struct stellar *st, int topic_id, on_packet_msg_cb_func *plugin_on_msg_cb, int plugin_id) @@ -678,7 +653,7 @@ int packet_mq_publish_message(struct packet *pkt, int topic_id, void *data) int tid = stellar_get_current_thread_id(plug_mgr->st); if(plug_mgr->per_thread_data[tid].pub_packet_msg_cnt == -1)return -1; if(plug_mgr->per_thread_data[tid].pub_packet_msg_cnt >= plug_mgr->max_message_dispatch)return -1; - if(stellar_mq_publish_message(ON_PACKET_TOPIC ,topic_id, data, plug_mgr->stellar_mq_schema_array, plug_mgr->per_thread_data[tid].priority_mq,SESSION_MQ_PRIORITY_HIGH)==0) + if(stellar_mq_publish_message(ON_PACKET_TOPIC ,topic_id, data, plug_mgr->stellar_mq_schema_array, plug_mgr->per_thread_data[tid].priority_mq,STELLAR_MQ_PRIORITY_HIGH)==0) { plug_mgr->per_thread_data[tid].pub_packet_msg_cnt+=1; return 0; @@ -689,39 +664,8 @@ int packet_mq_publish_message(struct packet *pkt, int topic_id, void *data) /******************************* * SESSION MQ * *******************************/ -inline int stellar_session_mq_get_topic_id(struct stellar *st, const char *topic_name) -{ - struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); - assert(plug_mgr); - return stellar_mq_get_topic_id(topic_name, plug_mgr->stellar_mq_schema_array); -} - -int stellar_session_mq_update_topic(struct stellar *st, int topic_id, session_msg_free_cb_func *msg_free_cb, void *msg_free_arg) -{ - struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); - assert(plug_mgr); - return stellar_mq_update_topic(topic_id, (void *)msg_free_cb, msg_free_arg, plug_mgr->stellar_mq_schema_array); -} - -int stellar_session_mq_create_topic(struct stellar *st, const char *topic_name, session_msg_free_cb_func *msg_free_cb, void *msg_free_arg) -{ - struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); - assert(plug_mgr); - int topic_id=stellar_mq_create_topic(st, topic_name, (void *)msg_free_cb, msg_free_arg, &plug_mgr->stellar_mq_schema_array); - if(topic_id>=0)plug_mgr->session_mq_topic_num+=1; - return topic_id; -} - -int stellar_session_mq_destroy_topic(struct stellar *st, int topic_id) -{ - struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); - assert(plug_mgr); - int ret = stellar_mq_destroy_topic(topic_id, plug_mgr->stellar_mq_schema_array); - if(ret==1)plug_mgr->session_mq_topic_num-=1; - return ret; -} -int session_mq_publish_message_with_priority(struct session *sess, int topic_id, void *data, enum session_mq_priority priority) +int session_mq_publish_message_with_priority(struct session *sess, int topic_id, void *data, enum stellar_mq_priority priority) { struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess); assert(plug_mgr_rt); @@ -739,7 +683,7 @@ int session_mq_publish_message_with_priority(struct session *sess, int topic_id, inline int session_mq_publish_message(struct session *sess, int topic_id, void *data) { - return session_mq_publish_message_with_priority(sess, topic_id, data, SESSION_MQ_PRIORITY_NORMAL); + return session_mq_publish_message_with_priority(sess, topic_id, data, STELLAR_MQ_PRIORITY_NORMAL); } static void session_mq_update_topic_status(struct plugin_manager_runtime *plug_mgr_rt, struct stellar_mq_topic_schema *topic) @@ -766,7 +710,7 @@ static int session_mq_set_message_status(struct session *sess, int topic_id, int if(topic_id < 0 || plugin_id < 0)return -1; struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess); if(plug_mgr_rt==NULL)return -1; - if(topic_id >= plug_mgr_rt->plug_mgr->session_mq_topic_num)return -1;// topic_id out of range + if(topic_id >= plug_mgr_rt->plug_mgr->stellar_mq_topic_num)return -1;// topic_id out of range struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id); if(topic==NULL)return -1; @@ -823,7 +767,7 @@ int session_mq_topic_is_active(struct session *sess, int topic_id) struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess); assert(plug_mgr_rt); if(plug_mgr_rt->session_topic_status==NULL)return -1;//runtime free stage , mq_status alaway null, ignore publish message - if(topic_id >= plug_mgr_rt->plug_mgr->session_mq_topic_num)return -1;// topic_id out of range + if(topic_id >= plug_mgr_rt->plug_mgr->stellar_mq_topic_num)return -1;// topic_id out of range if(bitmap_get(plug_mgr_rt->session_topic_status, 0, topic_id) == 0)return 0; return 1; } @@ -868,8 +812,8 @@ struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_ struct plugin_manager_runtime *rt = CALLOC(struct plugin_manager_runtime, 1); rt->plug_mgr = plug_mgr; rt->sess = sess; - rt->session_mq_status=bitmap_new(plug_mgr->session_topic_subscriber_num, plug_mgr->session_mq_topic_num, 1); - rt->session_topic_status=bitmap_new(1, plug_mgr->session_mq_topic_num, 1); + rt->session_mq_status=bitmap_new(plug_mgr->session_topic_subscriber_num, plug_mgr->stellar_mq_topic_num, 1); + rt->session_topic_status=bitmap_new(1, plug_mgr->stellar_mq_topic_num, 1); rt->sess_exdata_array = (struct stellar_exdata *)session_exdata_runtime_new(plug_mgr); if(plug_mgr->registered_session_plugin_array) rt->plugin_ctx_array = CALLOC(struct session_plugin_ctx_runtime, utarray_len(plug_mgr->registered_session_plugin_array)); @@ -1050,7 +994,7 @@ void plugin_manager_on_session_ingress(struct session *sess, struct packet *pkt) break; } plug_mgr_rt->pub_session_msg_cnt=0; - session_mq_publish_message_with_priority(sess, topic_id ,(void *)pkt, SESSION_MQ_PRIORITY_HIGH); + session_mq_publish_message_with_priority(sess, topic_id ,(void *)pkt, STELLAR_MQ_PRIORITY_HIGH); int tid=stellar_get_current_thread_id(plug_mgr_rt->plug_mgr->st); stellar_mq_dispatch(plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, sess, pkt); return; @@ -1060,7 +1004,7 @@ void plugin_manager_on_session_egress(struct session *sess, struct packet *pkt) { struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess); if(plug_mgr_rt==NULL)return; - session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->egress_topic_id ,pkt, SESSION_MQ_PRIORITY_HIGH); + session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->egress_topic_id ,pkt, STELLAR_MQ_PRIORITY_HIGH); int tid=stellar_get_current_thread_id(plug_mgr_rt->plug_mgr->st); stellar_mq_dispatch(plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, sess, pkt); plug_mgr_rt->pub_session_msg_cnt=-1;//disable session message publish @@ -1076,11 +1020,11 @@ void plugin_manager_on_session_closing(struct session *sess) switch (session_get_type(sess)) { case SESSION_TYPE_TCP: - session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_topic_id ,NULL, SESSION_MQ_PRIORITY_HIGH); - session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_stream_topic_id , NULL, SESSION_MQ_PRIORITY_HIGH); + session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_topic_id ,NULL, STELLAR_MQ_PRIORITY_HIGH); + session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_stream_topic_id , NULL, STELLAR_MQ_PRIORITY_HIGH); break; case SESSION_TYPE_UDP: - session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->udp_topic_id ,NULL, SESSION_MQ_PRIORITY_HIGH); + session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->udp_topic_id ,NULL, STELLAR_MQ_PRIORITY_HIGH); break; default: break; diff --git a/src/plugin_manager/plugin_manager_interna.h b/src/plugin_manager/plugin_manager_interna.h index 6ea3804..c85415d 100644 --- a/src/plugin_manager/plugin_manager_interna.h +++ b/src/plugin_manager/plugin_manager_interna.h @@ -2,12 +2,10 @@ #include "stellar/stellar.h" -#include "stellar/session_exdata.h" -#include "stellar/session_mq.h" - +#include "stellar/stellar_mq.h" +#include "stellar/session_exdata.h" #include "stellar/packet_exdata.h" -#include "stellar/packet_mq.h" #include "bitmap/bitmap.h" @@ -23,7 +21,7 @@ struct stellar_message; struct plugin_manger_per_thread_data { struct per_thread_exdata_array per_thread_pkt_exdata_array; - struct stellar_message *priority_mq[SESSION_MQ_PRIORITY_MAX];// message list + struct stellar_message *priority_mq[STELLAR_MQ_PRIORITY_MAX];// message list struct stellar_message *dealth_letter_queue;// dlq list long long pub_packet_msg_cnt; }; @@ -40,8 +38,7 @@ struct plugin_manager_schema UT_array *registered_session_plugin_array; UT_array *registered_packet_plugin_array; UT_array *registered_polling_plugin_array; - int packet_mq_topic_num; - int session_mq_topic_num; + int stellar_mq_topic_num; int packet_topic_subscriber_num; int session_topic_subscriber_num; int tcp_topic_id; @@ -91,7 +88,7 @@ struct stellar_message { int topic_id; enum stellar_topic_type type; - enum session_mq_priority priority; + enum stellar_mq_priority priority; } header; void *body; struct stellar_message *next, *prev; @@ -118,12 +115,7 @@ struct stellar_mq_topic_schema int topic_id; int subscriber_cnt; int is_destroyed; - union - { - void *free_cb; - session_msg_free_cb_func *sess_msg_free_cb; - packet_msg_free_cb_func *pkt_msg_free_cb; - }; + stellar_msg_free_cb_func *free_cb; struct stellar_mq_subscriber *subscribers; }__attribute__((aligned(sizeof(void*)))); diff --git a/src/stellar_on_sapp/stellar_on_sapp_api.c b/src/stellar_on_sapp/stellar_on_sapp_api.c index 05b6e0a..01b62bb 100644 --- a/src/stellar_on_sapp/stellar_on_sapp_api.c +++ b/src/stellar_on_sapp/stellar_on_sapp_api.c @@ -6,7 +6,7 @@ #include "stellar/utils.h" #include "stellar/stellar.h" -#include "stellar/session_mq.h" +#include "stellar/stellar_mq.h" #include "plugin_manager/plugin_manager.h" @@ -76,7 +76,7 @@ struct stellar *stellar_init_on_sapp(const char *toml_conf_path) return NULL; } st->plug_mgr=pm; - st->tcp_stream_topic_id=stellar_session_mq_get_topic_id(st, TOPIC_TCP_STREAM); + st->tcp_stream_topic_id=stellar_mq_get_topic_id(st, TOPIC_TCP_STREAM); return st; } diff --git a/test/plugin_manager/plugin_manager_gtest_main.cpp b/test/plugin_manager/plugin_manager_gtest_main.cpp index 3282625..ac134d4 100644 --- a/test/plugin_manager/plugin_manager_gtest_main.cpp +++ b/test/plugin_manager/plugin_manager_gtest_main.cpp @@ -31,10 +31,8 @@ void whitebox_test_plugin_manager_intrisic_metadata(struct stellar *st, struct p EXPECT_TRUE(plug_mgr->registered_packet_plugin_array==NULL); EXPECT_TRUE(plug_mgr->registered_session_plugin_array==NULL); - EXPECT_EQ(plug_mgr->packet_mq_topic_num, 0); - int intrinsic_topic_num=utarray_len(plug_mgr->stellar_mq_schema_array); - EXPECT_EQ(plug_mgr->session_mq_topic_num, intrinsic_topic_num);//TCP,UDP,TCP_STREAM,EGRESS,CONTROL + EXPECT_EQ(plug_mgr->stellar_mq_topic_num, intrinsic_topic_num);//TCP,UDP,TCP_STREAM,EGRESS,CONTROL struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)plug_mgr->tcp_topic_id); EXPECT_STREQ(topic->topic_name, TOPIC_TCP); @@ -52,11 +50,11 @@ void whitebox_test_plugin_manager_intrisic_metadata(struct stellar *st, struct p EXPECT_STREQ(topic->topic_name, TOPIC_CONTROL_PACKET); //intrinsic topic - EXPECT_GE(stellar_session_mq_get_topic_id(st, TOPIC_TCP), 0); - EXPECT_GE(stellar_session_mq_get_topic_id(st, TOPIC_TCP_STREAM), 0); - EXPECT_GE(stellar_session_mq_get_topic_id(st, TOPIC_UDP), 0); - EXPECT_GE(stellar_session_mq_get_topic_id(st, TOPIC_EGRESS), 0); - EXPECT_GE(stellar_session_mq_get_topic_id(st, TOPIC_CONTROL_PACKET), 0); + EXPECT_GE(stellar_mq_get_topic_id(st, TOPIC_TCP), 0); + EXPECT_GE(stellar_mq_get_topic_id(st, TOPIC_TCP_STREAM), 0); + EXPECT_GE(stellar_mq_get_topic_id(st, TOPIC_UDP), 0); + EXPECT_GE(stellar_mq_get_topic_id(st, TOPIC_EGRESS), 0); + EXPECT_GE(stellar_mq_get_topic_id(st, TOPIC_CONTROL_PACKET), 0); EXPECT_TRUE(plug_mgr->per_thread_data!=NULL); int thread_num=stellar_get_worker_thread_num(st); @@ -64,7 +62,7 @@ void whitebox_test_plugin_manager_intrisic_metadata(struct stellar *st, struct p { EXPECT_TRUE(plug_mgr->per_thread_data[i].per_thread_pkt_exdata_array.exdata_array==NULL); EXPECT_TRUE(plug_mgr->per_thread_data[i].dealth_letter_queue==NULL); - for(int j=0; j<SESSION_MQ_PRIORITY_MAX; j++) + for(int j=0; j<STELLAR_MQ_PRIORITY_MAX; j++) EXPECT_TRUE(plug_mgr->per_thread_data[i].priority_mq[j]==NULL); } } @@ -119,8 +117,8 @@ TEST(plugin_manager_init, packet_exdata_new_index_overwrite) { plugin_manager_exit(plug_mgr); } -void test_mock_packet_msg_free(struct packet *pkt, void *msg, void *msg_free_arg){} -void test_mock_overwrite_packet_msg_free(struct packet *pkt, void *msg, void *msg_free_arg){} +void test_mock_packet_msg_free(void *msg, void *msg_free_arg){} +void test_mock_overwrite_packet_msg_free(void *msg, void *msg_free_arg){} TEST(plugin_manager_init, packet_mq_topic_create_and_update) { struct stellar st={0}; @@ -129,9 +127,9 @@ TEST(plugin_manager_init, packet_mq_topic_create_and_update) { const char *topic_name="PACKET_TOPIC"; - EXPECT_EQ(stellar_packet_mq_get_topic_id(&st, topic_name), -1); // illegal topic_name + EXPECT_EQ(stellar_mq_get_topic_id(&st, topic_name), -1); // illegal topic_name - int topic_id = stellar_packet_mq_create_topic(&st, topic_name, test_mock_packet_msg_free, &st); + int topic_id = stellar_mq_create_topic(&st, topic_name, test_mock_packet_msg_free, &st); EXPECT_GE(topic_id, 0); struct stellar_mq_topic_schema *topic_schema = NULL; { @@ -144,8 +142,8 @@ TEST(plugin_manager_init, packet_mq_topic_create_and_update) { EXPECT_STREQ(topic_schema->topic_name, topic_name); } - EXPECT_EQ(stellar_packet_mq_get_topic_id(&st, topic_name), topic_id); - EXPECT_EQ(stellar_packet_mq_create_topic(&st, topic_name, test_mock_overwrite_packet_msg_free, plug_mgr), + EXPECT_EQ(stellar_mq_get_topic_id(&st, topic_name), topic_id); + EXPECT_EQ(stellar_mq_create_topic(&st, topic_name, test_mock_overwrite_packet_msg_free, plug_mgr), -1); // duplicate create, return error { SCOPED_TRACE("White-box test, check stellar internal schema"); @@ -157,7 +155,7 @@ TEST(plugin_manager_init, packet_mq_topic_create_and_update) { EXPECT_STREQ(topic_schema->topic_name, topic_name); } - EXPECT_EQ(stellar_packet_mq_update_topic(&st, topic_id, test_mock_overwrite_packet_msg_free, plug_mgr), 0); + EXPECT_EQ(stellar_mq_update_topic(&st, topic_id, test_mock_overwrite_packet_msg_free, plug_mgr), 0); { SCOPED_TRACE("White-box test, check stellar internal schema"); @@ -170,15 +168,14 @@ TEST(plugin_manager_init, packet_mq_topic_create_and_update) { EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), 1+STELLAR_INTRINSIC_TOPIC_NUM); } - EXPECT_EQ(stellar_packet_mq_destroy_topic(&st, 10), -1); // illgeal topic_id + EXPECT_EQ(stellar_mq_destroy_topic(&st, 10), -1); // illgeal topic_id - EXPECT_EQ(stellar_packet_mq_destroy_topic(&st, topic_id), 1); - EXPECT_EQ(stellar_packet_mq_destroy_topic(&st, topic_id), 0); // duplicate destroy, return 0; + EXPECT_EQ(stellar_mq_destroy_topic(&st, topic_id), 1); + EXPECT_EQ(stellar_mq_destroy_topic(&st, topic_id), 0); // duplicate destroy, return 0; { SCOPED_TRACE("White-box test, check stellar internal schema"); EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), 1+STELLAR_INTRINSIC_TOPIC_NUM); // destory won't delete the topic schema - EXPECT_EQ(plug_mgr->packet_mq_topic_num, 0); } plugin_manager_exit(plug_mgr); } @@ -196,7 +193,7 @@ TEST(plugin_manager_init, packet_mq_subscribe) { const char *topic_name="PACKET_TOPIC"; - int topic_id=stellar_packet_mq_create_topic(&st, topic_name, test_mock_packet_msg_free, &st); + int topic_id=stellar_mq_create_topic(&st, topic_name, test_mock_packet_msg_free, &st); EXPECT_GE(topic_id, 0); EXPECT_EQ(stellar_packet_mq_subscribe(&st, topic_id, test_mock_on_packet_msg, 10+PACKET_PULGIN_ID_BASE),-1);//illgeal plugin_id @@ -461,10 +458,9 @@ TEST(plugin_manager, packet_plugins_share_exdata) { } } -static void test_packet_msg_free_cb_func(struct packet *pkt, void *msg, void *msg_free_arg) +static void test_packet_msg_free_cb_func(void *msg, void *msg_free_arg) { struct packet_plugin_env *env = (struct packet_plugin_env *)msg_free_arg; - EXPECT_EQ(pkt, msg); env->msg_free_cnt+=1; return; } @@ -510,7 +506,7 @@ TEST(plugin_manager, packet_plugins_mq_pub_sub) { for(int i=0; i<topic_id_num; i++) { sprintf(topic_name[i], "PACKET_TOPIC_%d", i); - env.packet_topic_id[i]=stellar_packet_mq_create_topic(&st, topic_name[i], test_packet_msg_free_cb_func, &env); + env.packet_topic_id[i]=stellar_mq_create_topic(&st, topic_name[i], test_packet_msg_free_cb_func, &env); EXPECT_GE(env.packet_topic_id[i], 0); { SCOPED_TRACE("White-box test, check stellar internal schema"); @@ -563,7 +559,7 @@ TEST(plugin_manager, packet_plugins_mq_pub_sub) { EXPECT_EQ(env.msg_sub_cnt, env.msg_pub_cnt*topic_sub_num); } -static void overlimit_packet_msg_free_cb_func(struct packet *pkt, void *msg, void *msg_free_arg) +static void overlimit_packet_msg_free_cb_func(void *msg, void *msg_free_arg) { struct packet_plugin_env *env = (struct packet_plugin_env *)msg_free_arg; env->msg_free_cnt+=1; @@ -629,7 +625,7 @@ TEST(plugin_manager, packet_plugins_pub_overlimit) { for(int i=0; i<topic_id_num; i++) { sprintf(topic_name[i], "PACKET_TOPIC_%d", i); - env.packet_topic_id[i]=stellar_packet_mq_create_topic(&st, topic_name[i], overlimit_packet_msg_free_cb_func, &env); + env.packet_topic_id[i]=stellar_mq_create_topic(&st, topic_name[i], overlimit_packet_msg_free_cb_func, &env); EXPECT_GE(env.packet_topic_id[i], 0); { SCOPED_TRACE("White-box test, check stellar internal schema"); @@ -695,12 +691,11 @@ static void test_exdata_free_pub_msg_exdata_free(struct packet *pkt, int idx, vo return; } -static void test_exdata_free_pub_msg_free(struct packet *pkt, void *msg, void *msg_free_arg) +static void test_exdata_free_pub_msg_free( void *msg, void *msg_free_arg) { struct packet_plugin_env *env = (struct packet_plugin_env *)msg_free_arg; - EXPECT_EQ(pkt, msg); env->msg_free_cnt+=1; - EXPECT_EQ(packet_mq_publish_message(pkt, env->packet_topic_id[0], pkt), -1 );// publish message in packet msg_free is illegal + EXPECT_EQ(packet_mq_publish_message((struct packet *)msg, env->packet_topic_id[0], msg), -1 );// publish message in packet msg_free is illegal return; } @@ -736,7 +731,7 @@ TEST(plugin_manager, packet_plugin_exdata_free_pub_msg) { EXPECT_GE(plugin_id, PACKET_PULGIN_ID_BASE); env.packet_exdata_idx[0]=stellar_packet_exdata_new_index(&st, "PACKET_EXDATA", test_exdata_free_pub_msg_exdata_free, &env); - env.packet_topic_id[0]=stellar_packet_mq_create_topic(&st, "PACKET_TOPIC", test_exdata_free_pub_msg_free, &env); + env.packet_topic_id[0]=stellar_mq_create_topic(&st, "PACKET_TOPIC", test_exdata_free_pub_msg_free, &env); EXPECT_EQ(stellar_packet_mq_subscribe(&st, env.packet_topic_id[0], test_exdata_free_pub_msg_on_packet_msg, plugin_id),0); @@ -788,8 +783,8 @@ TEST(plugin_manager_init, session_exdata_new_index_overwrite) { plugin_manager_exit(plug_mgr); } -void test_mock_session_msg_free(struct session *sess, void *msg, void *msg_free_arg){} -void test_mock_overwrite_session_msg_free(struct session *sess, void *msg, void *msg_free_arg){} +void test_mock_session_msg_free(void *msg, void *msg_free_arg){} +void test_mock_overwrite_session_msg_free(void *msg, void *msg_free_arg){} TEST(plugin_manager_init, session_mq_topic_create_and_update) { struct stellar st={0}; @@ -798,9 +793,9 @@ TEST(plugin_manager_init, session_mq_topic_create_and_update) { const char *topic_name="SESSION_TOPIC"; - EXPECT_EQ(stellar_session_mq_get_topic_id(&st, topic_name), -1);// illegal topic_name + EXPECT_EQ(stellar_mq_get_topic_id(&st, topic_name), -1);// illegal topic_name - int topic_id=stellar_session_mq_create_topic(&st, topic_name, test_mock_session_msg_free, &st); + int topic_id=stellar_mq_create_topic(&st, topic_name, test_mock_session_msg_free, &st); EXPECT_GE(topic_id, 0); struct stellar_mq_topic_schema *topic_schema; { @@ -813,8 +808,8 @@ TEST(plugin_manager_init, session_mq_topic_create_and_update) { EXPECT_STREQ(topic_schema->topic_name, topic_name); } - EXPECT_EQ(stellar_session_mq_get_topic_id(&st, topic_name), topic_id); - EXPECT_EQ(stellar_session_mq_create_topic(&st, topic_name, test_mock_overwrite_session_msg_free, plug_mgr), -1); // duplicate create, return error + EXPECT_EQ(stellar_mq_get_topic_id(&st, topic_name), topic_id); + EXPECT_EQ(stellar_mq_create_topic(&st, topic_name, test_mock_overwrite_session_msg_free, plug_mgr), -1); // duplicate create, return error { SCOPED_TRACE("White-box test, check stellar internal schema"); @@ -826,7 +821,7 @@ TEST(plugin_manager_init, session_mq_topic_create_and_update) { EXPECT_STREQ(topic_schema->topic_name, topic_name); } - EXPECT_EQ(stellar_session_mq_update_topic(&st, topic_id, test_mock_overwrite_session_msg_free, plug_mgr), 0); + EXPECT_EQ(stellar_mq_update_topic(&st, topic_id, test_mock_overwrite_session_msg_free, plug_mgr), 0); { SCOPED_TRACE("White-box test, check stellar internal schema"); @@ -840,16 +835,16 @@ TEST(plugin_manager_init, session_mq_topic_create_and_update) { EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), STELLAR_INTRINSIC_TOPIC_NUM+1); // 5 intrinsic topic + 1 created topic } - EXPECT_EQ(stellar_session_mq_destroy_topic(&st, 10), -1);// illgeal session topic_id + EXPECT_EQ(stellar_mq_destroy_topic(&st, 10), -1);// illgeal session topic_id - EXPECT_EQ(stellar_session_mq_destroy_topic(&st, topic_id), 1); - EXPECT_EQ(stellar_session_mq_destroy_topic(&st, topic_id), 0);//duplicate destroy, return 0; + EXPECT_EQ(stellar_mq_destroy_topic(&st, topic_id), 1); + EXPECT_EQ(stellar_mq_destroy_topic(&st, topic_id), 0);//duplicate destroy, return 0; { SCOPED_TRACE("White-box test, check stellar internal schema"); EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), STELLAR_INTRINSIC_TOPIC_NUM+1);//destory won't delete the topic schema } - EXPECT_EQ(plug_mgr->session_mq_topic_num, STELLAR_INTRINSIC_TOPIC_NUM);//intrinsic topic number + EXPECT_EQ(plug_mgr->stellar_mq_topic_num, STELLAR_INTRINSIC_TOPIC_NUM);//intrinsic topic number plugin_manager_exit(plug_mgr); } @@ -866,7 +861,7 @@ TEST(plugin_manager_init, session_mq_subscribe_overwrite) { const char *topic_name="SESSION_TOPIC"; - int topic_id=stellar_session_mq_create_topic(&st, topic_name, test_mock_session_msg_free, &st); + int topic_id=stellar_mq_create_topic(&st, topic_name, test_mock_session_msg_free, &st); EXPECT_GE(topic_id, 0); EXPECT_EQ(stellar_session_mq_subscribe(&st, topic_id, test_mock_on_session_msg, 10),-1);//illgeal plugin_id @@ -1058,11 +1053,11 @@ TEST(plugin_manager, session_plugin_on_intrinsic_ingress_egress) { int plugin_id=stellar_session_plugin_register(&st, test_basic_session_ctx_new, test_basic_session_ctx_free, &env); EXPECT_GE(plugin_id, 0); - env.intrinsc_tcp_topic_id=stellar_session_mq_get_topic_id(&st, TOPIC_TCP); + env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP); EXPECT_GE(env.intrinsc_tcp_topic_id, 0); EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_basic_on_session_ingress, plugin_id), 0); - env.intrinsc_egress_topic_id=stellar_session_mq_get_topic_id(&st, TOPIC_EGRESS); + env.intrinsc_egress_topic_id=stellar_mq_get_topic_id(&st, TOPIC_EGRESS); EXPECT_GE(env.intrinsc_egress_topic_id, 0); EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_egress_topic_id, test_basic_on_session_ingress, plugin_id), 0);// Intentional error EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_egress_topic_id, test_basic_on_session_egress, plugin_id), 0); @@ -1173,10 +1168,9 @@ static void test_mq_on_sub_msg(struct session *sess, int topic_id, const void *m return; } -static void test_session_msg_free(struct session *sess, void *msg, void *msg_free_arg) +static void test_session_msg_free(void *msg, void *msg_free_arg) { struct session_plugin_env *env = (struct session_plugin_env *)msg_free_arg; - EXPECT_EQ(sess->plug_mgr_rt->plug_mgr, env->plug_mgr); if(msg) { EXPECT_EQ(env->test_mq_pub_called, *(int *)msg); @@ -1202,11 +1196,11 @@ TEST(plugin_manager, session_plugin_ignore_on_ctx_new_sub_other_msg) { env.test_mq_pub_plugin_id=stellar_session_plugin_register(&st, test_mq_pub_session_ctx_new, test_mq_pub_session_ctx_free, &env); EXPECT_GE(env.test_mq_pub_plugin_id, 0); - env.intrinsc_tcp_topic_id=stellar_session_mq_get_topic_id(&st, TOPIC_TCP); + env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP); EXPECT_GE(env.intrinsc_tcp_topic_id, 0); EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_mq_pub_on_session, env.test_mq_pub_plugin_id), 0); - env.test_mq_topic_id=stellar_session_mq_create_topic(&st, "SESSION_MQ_TOPIC", test_session_msg_free, &env); + env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_MQ_TOPIC", test_session_msg_free, &env); EXPECT_GE(env.test_mq_topic_id, 0); env.test_mq_sub_plugin_id=stellar_session_plugin_register(&st, test_mq_sub_session_ctx_new, test_mq_sub_session_ctx_free, &env); @@ -1284,6 +1278,12 @@ static void test_overlimit_sub_session_ctx_free(struct session *sess, void *sess return; } +struct test_overlimit_msg +{ + struct session *sess; + int called; +}; + static void test_overlimit_pub_on_session(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env) { struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; @@ -1291,15 +1291,16 @@ static void test_overlimit_pub_on_session(struct session *sess, int topic_id, co EXPECT_TRUE(env!=NULL); EXPECT_TRUE(ctx!=NULL); EXPECT_EQ(sess->plug_mgr_rt->plug_mgr, env->plug_mgr); - int *pub_msg; + struct test_overlimit_msg *pub_msg; if (msg) { env->test_mq_pub_called += 1; ctx->pkt_cnt += 1; for(int i=0; i < MAX_MSG_PER_DISPATCH*2; i++) { - pub_msg = CALLOC(int, 1); - *pub_msg = env->test_mq_pub_called; + pub_msg = CALLOC(struct test_overlimit_msg, 1); + pub_msg->called = env->test_mq_pub_called; + pub_msg->sess=sess; if(i<(MAX_MSG_PER_DISPATCH-1))// minus intrinsic msg { EXPECT_EQ(session_mq_publish_message(sess, env->test_mq_topic_id, pub_msg), 0); @@ -1318,24 +1319,26 @@ static void test_overlimit_pub_on_session(struct session *sess, int topic_id, co static void test_overlimit_on_sub_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env) { struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + struct test_overlimit_msg *recv_msg=(struct test_overlimit_msg *)msg; struct test_overlimit_session_mq_ctx *ctx=(struct test_overlimit_session_mq_ctx *)per_session_ctx; EXPECT_TRUE(env!=NULL); EXPECT_TRUE(ctx!=NULL); EXPECT_EQ(sess->plug_mgr_rt->plug_mgr, env->plug_mgr); - EXPECT_EQ(*(int *)msg, env->test_mq_pub_called); + EXPECT_EQ(recv_msg->called, env->test_mq_pub_called); env->test_mq_sub_called+=1; ctx->sub_cnt+=1; return; } -static void test_overlimit_session_msg_free(struct session *sess, void *msg, void *msg_free_arg) +static void test_overlimit_session_msg_free(void *msg, void *msg_free_arg) { struct session_plugin_env *env = (struct session_plugin_env *)msg_free_arg; - EXPECT_EQ(sess->plug_mgr_rt->plug_mgr, env->plug_mgr); - EXPECT_EQ(session_mq_publish_message(sess, env->test_mq_topic_id, msg), -1);// illegal publish when msg_free - if(msg) + struct test_overlimit_msg *recv_msg=(struct test_overlimit_msg *)msg; + if(recv_msg) { - EXPECT_EQ(env->test_mq_pub_called, *(int *)msg); + EXPECT_EQ(recv_msg->sess->plug_mgr_rt->plug_mgr, env->plug_mgr); + EXPECT_EQ(session_mq_publish_message(recv_msg->sess, env->test_mq_topic_id, msg), -1);// illegal publish when msg_free + EXPECT_EQ(env->test_mq_pub_called, recv_msg->called); env->test_mq_free_called+=1; FREE(msg); } @@ -1358,11 +1361,11 @@ TEST(plugin_manager, session_plugin_pub_msg_overlimt) { env.test_mq_pub_plugin_id=stellar_session_plugin_register(&st, test_overlimit_pub_session_ctx_new, test_overlimit_pub_session_ctx_free, &env); EXPECT_GE(env.test_mq_pub_plugin_id, 0); - env.intrinsc_tcp_topic_id=stellar_session_mq_get_topic_id(&st, TOPIC_TCP); + env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP); EXPECT_GE(env.intrinsc_tcp_topic_id, 0); EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_overlimit_pub_on_session, env.test_mq_pub_plugin_id), 0); - env.test_mq_topic_id=stellar_session_mq_create_topic(&st, "SESSION_MQ_TOPIC", test_overlimit_session_msg_free, &env); + env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_MQ_TOPIC", test_overlimit_session_msg_free, &env); EXPECT_GE(env.test_mq_topic_id, 0); env.test_mq_sub_plugin_id=stellar_session_plugin_register(&st, test_overlimit_sub_session_ctx_new, test_overlimit_sub_session_ctx_free, &env); @@ -1405,10 +1408,9 @@ TEST(plugin_manager, session_plugin_pub_msg_overlimt) { } -static void test_dettach_msg_free(struct session *sess, void *msg, void *msg_free_arg) +static void test_dettach_msg_free(void *msg, void *msg_free_arg) { struct session_plugin_env *env = (struct session_plugin_env *)msg_free_arg; - EXPECT_EQ(sess->plug_mgr_rt->plug_mgr, env->plug_mgr); env->test_mq_free_called+=1; return; } @@ -1468,11 +1470,11 @@ TEST(plugin_manager, session_plugin_on_ctx_new_then_dettach) { int plugin_id=stellar_session_plugin_register(&st, test_dettach_session_ctx_new, test_dettach_session_ctx_free, &env); EXPECT_GE(plugin_id,0); - env.intrinsc_tcp_topic_id=stellar_session_mq_get_topic_id(&st, TOPIC_TCP); + env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP); EXPECT_GE(env.intrinsc_tcp_topic_id, 0); EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_dettach_on_session, plugin_id), 0); - env.test_mq_topic_id=stellar_session_mq_create_topic(&st, "SESSION_MQ_TOPIC", test_dettach_msg_free, &env); + env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_MQ_TOPIC", test_dettach_msg_free, &env); EXPECT_GE(env.test_mq_topic_id, 0); EXPECT_EQ(stellar_session_mq_subscribe(&st, env.test_mq_topic_id, test_dettach_on_session, plugin_id), 0); @@ -1552,11 +1554,11 @@ TEST(plugin_manager, session_plugin_pub_on_ctx_free) { int plugin_id=stellar_session_plugin_register(&st, test_invalid_pub_msg_session_ctx_new, test_invalid_pub_msg_session_ctx_free, &env); EXPECT_GE(plugin_id,0); - env.intrinsc_tcp_topic_id=stellar_session_mq_get_topic_id(&st, TOPIC_TCP); + env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP); EXPECT_GE(env.intrinsc_tcp_topic_id, 0); EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_invalid_pub_msg_on_session, plugin_id), 0); - env.test_mq_topic_id=stellar_session_mq_create_topic(&st, "SESSION_MQ_TOPIC", NULL, &env); + env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_MQ_TOPIC", NULL, &env); EXPECT_GE(env.test_mq_topic_id, 0); // pesudo packet and session @@ -1667,11 +1669,11 @@ TEST(plugin_manager, session_plugin_pub_msg_on_closing) { int plugin_id=stellar_session_plugin_register(&st, test_session_closing_ctx_new, test_session_closing_ctx_free, &env); EXPECT_GE(plugin_id,0); - env.intrinsc_tcp_topic_id=stellar_session_mq_get_topic_id(&st, TOPIC_TCP); + env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP); EXPECT_GE(env.intrinsc_tcp_topic_id, 0); EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_closing_on_intrisic_msg, plugin_id), 0); - env.test_mq_topic_id=stellar_session_mq_create_topic(&st, "SESSION_CLOSING_TOPIC", NULL, &env); + env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_CLOSING_TOPIC", NULL, &env); EXPECT_GE(env.test_mq_topic_id, 0); EXPECT_EQ(stellar_session_mq_subscribe(&st, env.test_mq_topic_id, test_session_closing_on_userdefine_msg, plugin_id), 0); @@ -1795,7 +1797,7 @@ TEST(plugin_manager, test_session_mq_topic_is_active) { env.plugin_id_1=plugin_id_1; env.plugin_id_2=plugin_id_2; - env.intrinsc_tcp_topic_id=stellar_session_mq_get_topic_id(&st, TOPIC_TCP); + env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP); EXPECT_GE(env.intrinsc_tcp_topic_id, 0); EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_topic_is_active_plugin_1_on_msg, plugin_id_1), 0); EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_topic_is_active_plugin_2_on_msg, plugin_id_2), 0); @@ -1902,7 +1904,7 @@ TEST(plugin_manager, test_session_dettach) { env.plugin_id_1=plugin_id_1; env.plugin_id_2=plugin_id_2; - env.intrinsc_tcp_topic_id=stellar_session_mq_get_topic_id(&st, TOPIC_TCP); + env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP); EXPECT_GE(env.intrinsc_tcp_topic_id, 0); EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_dettach_plugin_1_on_msg, plugin_id_1), 0); EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_dettach_plugin_2_on_msg, plugin_id_2), 0); @@ -1968,7 +1970,7 @@ static void test_session_mq_priority_plugin_1_on_msg(struct session *sess, int t if(topic_id == env->intrinsc_tcp_topic_id) { EXPECT_EQ(ctx->called%3, 1);// intrinsc msg has high priority - EXPECT_EQ(session_mq_publish_message_with_priority(sess, env->test_mq_topic_id, (void *)(long)env->plugin_id_1, SESSION_MQ_PRIORITY_LOW), 0); + EXPECT_EQ(session_mq_publish_message_with_priority(sess, env->test_mq_topic_id, (void *)(long)env->plugin_id_1, STELLAR_MQ_PRIORITY_LOW), 0); } if(topic_id == env->test_mq_topic_id) { @@ -2022,12 +2024,12 @@ TEST(plugin_manager, test_session_mq_priority) { env.plugin_id_1=plugin_id_1; env.plugin_id_2=plugin_id_2; - env.intrinsc_tcp_topic_id=stellar_session_mq_get_topic_id(&st, TOPIC_TCP); + env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP); EXPECT_GE(env.intrinsc_tcp_topic_id, 0); EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_priority_plugin_1_on_msg, plugin_id_1), 0); EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_priority_plugin_2_on_msg, plugin_id_2), 0); - env.test_mq_topic_id=stellar_session_mq_create_topic(&st, "SESSION_PRIORITY_TOPIC", NULL, &env); + env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_PRIORITY_TOPIC", NULL, &env); EXPECT_GE(env.test_mq_topic_id, 0); EXPECT_EQ(stellar_session_mq_subscribe(&st, env.test_mq_topic_id, test_session_mq_priority_plugin_1_on_msg, plugin_id_1), 0); EXPECT_EQ(stellar_session_mq_subscribe(&st, env.test_mq_topic_id, test_session_mq_priority_plugin_2_on_msg, plugin_id_2), 0); @@ -2110,7 +2112,7 @@ TEST(plugin_manager, session_exdata_free_pub_msg) { env.plugin_id_1=stellar_session_plugin_register(&st, NULL, NULL, &env); EXPECT_GE(env.plugin_id_1,0); - env.intrinsc_tcp_topic_id=stellar_session_mq_get_topic_id(&st, TOPIC_TCP); + env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP); EXPECT_GE(env.intrinsc_tcp_topic_id, 0); EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_exdata_free_pub_msg_on_session, env.plugin_id_1), 0); |
