diff options
| author | yangwei <[email protected]> | 2024-07-16 18:24:23 +0800 |
|---|---|---|
| committer | yangwei <[email protected]> | 2024-07-17 11:28:35 +0800 |
| commit | 161aa7da1e82d939ba09f8dc211ffc5f216f963b (patch) | |
| tree | ac116d4baff4a6580d0d4c57d07d84eed5199449 | |
| parent | cc862437769735aa5ea31f5b875ea8e25afed726 (diff) | |
🦄 refactor(packet mq & exdata): backport packet mq & exdata
| -rw-r--r-- | .gitlab-ci.yml | 2 | ||||
| -rw-r--r-- | examples/stellar_plugin/simple_stellar_plugin.c | 39 | ||||
| -rw-r--r-- | include/stellar/packet_exdata.h | 8 | ||||
| -rw-r--r-- | include/stellar/packet_mq.h | 22 | ||||
| -rw-r--r-- | src/plugin_manager/plugin_manager.c | 590 | ||||
| -rw-r--r-- | src/plugin_manager/plugin_manager.h | 2 | ||||
| -rw-r--r-- | src/plugin_manager/plugin_manager_interna.h | 40 | ||||
| -rw-r--r-- | src/stellar_on_sapp/stellar_internal.h | 4 | ||||
| -rw-r--r-- | src/stellar_on_sapp/stellar_on_sapp_api.c | 13 | ||||
| -rw-r--r-- | test/plugin_manager/plugin_manager_gtest_main.cpp | 579 | ||||
| -rw-r--r-- | test/plugin_manager/plugin_manager_gtest_mock.h | 4 |
11 files changed, 1056 insertions, 247 deletions
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 57264b9..7cdd2fd 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -122,7 +122,7 @@ test_in_centos8: script: - *everything_before_script - ls -l /opt/MESA/lib && echo "/opt/MESA/lib" >> /etc/ld.so.conf - - cd build; make test + - cd build; ctest -V dependencies: - develop_build_for_centos8 - release_build_for_centos8 diff --git a/examples/stellar_plugin/simple_stellar_plugin.c b/examples/stellar_plugin/simple_stellar_plugin.c index 467a38b..5625cc9 100644 --- a/examples/stellar_plugin/simple_stellar_plugin.c +++ b/examples/stellar_plugin/simple_stellar_plugin.c @@ -3,6 +3,8 @@ #include "stellar/utils.h" #include "stellar/session_exdata.h" #include "stellar/session_mq.h" +#include "stellar/packet_exdata.h" +#include "stellar/packet_mq.h" #include <stdio.h> #include <string.h> @@ -14,6 +16,8 @@ struct simple_stellar_plugin_env struct stellar *st; int session_plugin_id; int session_exdata_idx; + int packet_exdata_idx; + int packet_topic_id; int stat_topic_id; int egress_topic_id; int tcp_topic_id; @@ -130,6 +134,8 @@ static void simple_plugin_on_session_msg(struct session *sess, int topic_id, con void simple_plugin_on_packet(struct packet *pkt, unsigned char ip_protocol, void *plugin_env) { struct simple_stellar_plugin_env *env = (struct simple_stellar_plugin_env *)plugin_env; + packet_exdata_set(pkt, env->packet_exdata_idx, env); + packet_mq_publish_message(pkt, env->packet_topic_id, env); switch (ip_protocol) { case IPPROTO_TCP: @@ -152,6 +158,15 @@ void simple_plugin_on_packet(struct packet *pkt, unsigned char ip_protocol, voi return; } +void simple_plugin_packet_get_exdata(struct packet *pkt, unsigned char ip_protocol, void *plugin_env) +{ + struct simple_stellar_plugin_env *env = (struct simple_stellar_plugin_env *)plugin_env; + struct simple_stellar_plugin_env *exdata = (struct simple_stellar_plugin_env *)packet_exdata_get(pkt, env->packet_exdata_idx); + assert(memcmp(env, exdata, sizeof(struct simple_stellar_plugin_env)) == 0); + return; +} + + int simple_plugin_on_polling(void *plugin_env) { struct simple_stellar_plugin_env *env = (struct simple_stellar_plugin_env *)plugin_env; @@ -203,6 +218,30 @@ void *simple_session_packet_plugin_init(struct stellar *st) exit(-1); } + env->packet_topic_id=stellar_packet_mq_get_topic_id(st, "TOPIC_PACKET_ENV"); + if(env->packet_topic_id < 0) + { + env->packet_topic_id=stellar_packet_mq_create_topic(st, "TOPIC_PACKET_ENV", simple_plugin_packet_msg_free, env); + } + + tcp_plugin_id=stellar_packet_plugin_register(st, IPPROTO_TCP, simple_plugin_packet_get_exdata, env); + udp_plugin_id=stellar_packet_plugin_register(st, IPPROTO_UDP, simple_plugin_packet_get_exdata, env); + icmp_plugin_id=stellar_packet_plugin_register(st, IPPROTO_ICMP, simple_plugin_packet_get_exdata, env); + icmp6_plugin_id=stellar_packet_plugin_register(st, IPPROTO_ICMPV6, simple_plugin_packet_get_exdata, env); + + if(tcp_plugin_id < 0 || udp_plugin_id < 0 || icmp_plugin_id < 0 || icmp6_plugin_id < 0) + { + perror("register packet plugin get exdata return invalid plugin id\n"); + exit(-1); + } + + stellar_packet_mq_subscribe(st, env->packet_topic_id, simple_plugin_on_packet_msg_cb, tcp_plugin_id); + stellar_packet_mq_subscribe(st, env->packet_topic_id, simple_plugin_on_packet_msg_cb, udp_plugin_id); + stellar_packet_mq_subscribe(st, env->packet_topic_id, simple_plugin_on_packet_msg_cb, icmp_plugin_id); + stellar_packet_mq_subscribe(st, env->packet_topic_id, simple_plugin_on_packet_msg_cb, icmp6_plugin_id); + + env->packet_exdata_idx=stellar_packet_exdata_new_index(st, "EXDATA_PACKET_ENV", simple_plugin_packet_exdata_free, env); + int polling_plugin_id=stellar_polling_plugin_register(st, simple_plugin_on_polling, env); if(polling_plugin_id < 0) { diff --git a/include/stellar/packet_exdata.h b/include/stellar/packet_exdata.h new file mode 100644 index 0000000..7620eae --- /dev/null +++ b/include/stellar/packet_exdata.h @@ -0,0 +1,8 @@ +#pragma once + +#include "stellar.h" + +typedef void packet_exdata_free(struct packet *pkt, int idx, void *ex_ptr, void *arg); +int stellar_packet_exdata_new_index(struct stellar *st, const char *name, packet_exdata_free *free_func,void *arg); +int packet_exdata_set(struct packet *pkt, int idx, void *ex_ptr); +void *packet_exdata_get(struct packet *pkt, int idx);
\ No newline at end of file diff --git a/include/stellar/packet_mq.h b/include/stellar/packet_mq.h new file mode 100644 index 0000000..94bb39b --- /dev/null +++ b/include/stellar/packet_mq.h @@ -0,0 +1,22 @@ +#pragma once + +#include "stellar.h" + +//session mq +typedef void packet_msg_free_cb_func(struct packet *pkt, void *msg, void *msg_free_arg); +typedef void on_packet_msg_cb_func(struct packet *pkt, int topic_id, const void *msg, void *plugin_env); + +//return topic_id +int stellar_packet_mq_create_topic(struct stellar *st, const char *topic_name, packet_msg_free_cb_func *msg_free_cb, void *msg_free_arg); + +int stellar_packet_mq_get_topic_id(struct stellar *st, const char *topic_name); + +int stellar_packet_mq_update_topic(struct stellar *st, int topic_id, packet_msg_free_cb_func *msg_free_cb, void *msg_free_arg); + +int stellar_packet_mq_destroy_topic(struct stellar *st, int topic_id); + +//return 0 if success, otherwise return -1. +int stellar_packet_mq_subscribe(struct stellar *st, int topic_id, on_packet_msg_cb_func *plugin_on_msg_cb, int plugin_id); //packet plugin only + +int packet_mq_publish_message(struct packet *pkt, int topic_id, void *msg); + diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c index 83b1ad6..218b23e 100644 --- a/src/plugin_manager/plugin_manager.c +++ b/src/plugin_manager/plugin_manager.c @@ -1,17 +1,12 @@ #include "plugin_manager_interna.h" - #include "stellar_internal.h" - #include "stellar/session.h" - #include "stellar/utils.h" #include "toml/toml.h" #include "uthash/utlist.h" - UT_icd plugin_specs_icd = {sizeof(struct plugin_specific), NULL, NULL, NULL}; - static struct plugin_specific *plugin_specs_load(const char *toml_conf_path, int *spec_num) { *spec_num = 0; @@ -70,6 +65,28 @@ PLUGIN_SPEC_LOAD_ERROR: return NULL; } +static struct plugin_manger_per_thread_data *plugin_manager_per_thread_data_new(struct stellar *st) +{ + if(st == NULL)return NULL; + int thread_num=stellar_get_worker_thread_num(st); + struct plugin_manger_per_thread_data *per_thread_data = CALLOC(struct plugin_manger_per_thread_data, thread_num); + return per_thread_data; +} + +static void plugin_manager_per_thread_data_free(struct plugin_manger_per_thread_data *per_thread_data, struct stellar *st) +{ + if(per_thread_data == NULL || st == NULL)return; + int thread_num=stellar_get_worker_thread_num(st); + struct plugin_manger_per_thread_data *p_data; + for (int i = 0; i < thread_num; i++) + { + p_data=per_thread_data+i; + if(p_data->per_thread_pkt_exdata_array.exdata_array)FREE(p_data->per_thread_pkt_exdata_array.exdata_array); + } + FREE(per_thread_data); + return; +} + struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char *plugin_spec_file_path) { int spec_num; @@ -105,9 +122,11 @@ struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char } } FREE(specs); + plug_mgr->per_thread_data = plugin_manager_per_thread_data_new(st); return plug_mgr; } +static int stellar_mq_destroy_topic(int topic_id, UT_array *mq_schema_array); void plugin_manager_exit(struct plugin_manager_schema *plug_mgr) { @@ -121,14 +140,15 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr) } utarray_free(plug_mgr->plugin_load_specs_array); } - if(plug_mgr->session_mq_schema_array) + if(plug_mgr->stellar_mq_schema_array) { - for(unsigned int i = 0; i < utarray_len(plug_mgr->session_mq_schema_array); i++) + for(unsigned int i = 0; i < utarray_len(plug_mgr->stellar_mq_schema_array); i++) { - stellar_session_mq_destroy_topic(plug_mgr->st, i); + stellar_mq_destroy_topic( i, plug_mgr->stellar_mq_schema_array); } - utarray_free(plug_mgr->session_mq_schema_array); + utarray_free(plug_mgr->stellar_mq_schema_array); } + if(plug_mgr->packet_exdata_schema_array)utarray_free(plug_mgr->packet_exdata_schema_array); if(plug_mgr->session_exdata_schema_array)utarray_free(plug_mgr->session_exdata_schema_array); if(plug_mgr->registered_polling_plugin_array)utarray_free(plug_mgr->registered_polling_plugin_array); if(plug_mgr->registered_packet_plugin_array) @@ -149,6 +169,7 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr) } utarray_free(plug_mgr->registered_session_plugin_array); } + plugin_manager_per_thread_data_free(plug_mgr->per_thread_data, plug_mgr->st); FREE(plug_mgr); return; } @@ -223,9 +244,65 @@ void *stellar_exdata_get(UT_array *exdata_schema, struct stellar_exdata *exdata_ } /******************************* - * SESSION EXDATA * + * PACKET EXDATA * *******************************/ +static struct stellar_exdata *per_thread_packet_exdata_arrary_get(struct plugin_manager_schema *plug_mgr) +{ + if(plug_mgr==NULL || plug_mgr->packet_exdata_schema_array == NULL)return NULL; + int tid=stellar_get_current_thread_id(plug_mgr->st); + if(plug_mgr->per_thread_data[tid].per_thread_pkt_exdata_array.exdata_array == NULL) + { + unsigned int len = utarray_len(plug_mgr->packet_exdata_schema_array); + plug_mgr->per_thread_data[tid].per_thread_pkt_exdata_array.exdata_array = CALLOC(struct stellar_exdata, len); + } + return plug_mgr->per_thread_data[tid].per_thread_pkt_exdata_array.exdata_array; +} + +static void per_thread_packet_exdata_arrary_clean(struct plugin_manager_schema *plug_mgr, struct packet *pkt) +{ + if(plug_mgr==NULL || plug_mgr->packet_exdata_schema_array == NULL)return; + unsigned int len=utarray_len(plug_mgr->packet_exdata_schema_array); + struct stellar_exdata *per_thread_pkt_exdata_arrary = per_thread_packet_exdata_arrary_get(plug_mgr); + if(per_thread_pkt_exdata_arrary == NULL)return; + for (unsigned int i = 0; i < len; i++) + { + void *exdata = (per_thread_pkt_exdata_arrary + i)->exdata; + struct stellar_exdata_schema *schema = (struct stellar_exdata_schema *)utarray_eltptr(plug_mgr->packet_exdata_schema_array, i); + if (exdata) + { + if (schema->pkt_free_func) + { + schema->pkt_free_func(pkt, i, exdata, schema->free_arg); + } + (per_thread_pkt_exdata_arrary + i)->exdata=NULL; + } + } +} + +int stellar_packet_exdata_new_index(struct stellar *st, const char *name, packet_exdata_free *free_func,void *free_arg) +{ + struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); + assert(plug_mgr); + return stellar_exdata_new_index(st, name, &plug_mgr->packet_exdata_schema_array, (void*)free_func, free_arg); +} +int packet_exdata_set(struct packet *pkt, int idx, void *ex_ptr) +{ + if(pkt == NULL)return -1; + struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(packet_stellar_get(pkt)); + return stellar_exdata_set(plug_mgr->packet_exdata_schema_array, per_thread_packet_exdata_arrary_get(plug_mgr), idx, ex_ptr); +} + +void *packet_exdata_get(struct packet *pkt, int idx) +{ + if(pkt == NULL)return NULL; + struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(packet_stellar_get(pkt)); + return stellar_exdata_get( plug_mgr->packet_exdata_schema_array, per_thread_packet_exdata_arrary_get(plug_mgr), idx); +} + +/******************************* + * SESSION EXDATA * + *******************************/ int stellar_session_exdata_new_index(struct stellar *st, const char *name, session_exdata_free *free_func,void *free_arg) { struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); @@ -252,7 +329,6 @@ void *session_exdata_get(struct session *sess, int idx) /******************************* * STELLAR MQ * *******************************/ - static void stellar_mq_topic_schema_copy(void *_dst, const void *_src) { struct stellar_mq_topic_schema *dst = (struct stellar_mq_topic_schema *)_dst, @@ -326,7 +402,7 @@ int stellar_mq_create_topic(struct stellar *st, const char *topic_name, void *ms return t_schema.topic_id; } -int stellar_mq_destroy_topic(int topic_id, UT_array *mq_schema_array) +static int stellar_mq_destroy_topic(int topic_id, UT_array *mq_schema_array) { if(mq_schema_array==NULL)return -1; unsigned int len = utarray_len(mq_schema_array); @@ -349,27 +425,176 @@ int stellar_mq_destroy_topic(int topic_id, UT_array *mq_schema_array) return 1; // success } -static int stellar_mq_publish_message(int topic_id, void *data, UT_array *mq_schema_array, struct stellar_message *mq[], enum session_mq_priority priority) +static int stellar_mq_publish_message(enum stellar_topic_type type, int topic_id, void *data, UT_array *stellar_mq_schema, struct stellar_message *priority_mq[], enum session_mq_priority priority) { - if(mq_schema_array==NULL || topic_id < 0)return -1; - unsigned int len = utarray_len(mq_schema_array); + if(stellar_mq_schema==NULL || topic_id < 0)return -1; + unsigned int len = utarray_len(stellar_mq_schema); if (len <= (unsigned int)topic_id)return -1; struct stellar_message *msg= CALLOC(struct stellar_message,1); msg->header.topic_id = topic_id; + msg->header.type=type; msg->header.priority = priority; msg->body = data; - DL_APPEND(mq[priority], msg); + DL_APPEND(priority_mq[priority], msg); return 0; } UT_icd stellar_mq_subscriber_info_icd = {sizeof(struct stellar_mq_subscriber_info), NULL, NULL, NULL}; +static int stellar_mq_subscribe(struct plugin_manager_schema *plug_mgr, int topic_id, void *plugin_on_msg_cb, int plugin_idx, UT_array *registed_mq_subscriber_info) +{ + if(plug_mgr == NULL || plug_mgr->stellar_mq_schema_array==NULL || registed_mq_subscriber_info == NULL)return -1; -/******************************* - * SESSION MQ * - *******************************/ + unsigned int len = utarray_len(plug_mgr->stellar_mq_schema_array); + if (len <= (unsigned int)topic_id)return -1; + + struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id); + if(topic==NULL)return -1; + + // if plugin already subscribe current topic, return 0 + struct stellar_mq_subscriber_info *p=NULL; + while( (p=(struct stellar_mq_subscriber_info *)utarray_next(registed_mq_subscriber_info,p))) + { + if(p->topic_id==topic_id) + { + struct stellar_mq_subscriber *tmp_subscriber=topic->subscribers; + int cnt=0; + while(tmp_subscriber) + { + if(cnt==p->subscriber_idx) + { + tmp_subscriber->msg_cb=plugin_on_msg_cb; + return 0; + } + cnt++; + tmp_subscriber=tmp_subscriber->next; + } + } + }; + + struct stellar_mq_subscriber *new_subscriber = CALLOC(struct stellar_mq_subscriber,1); + new_subscriber->topic_subscriber_idx = topic->subscriber_cnt; + new_subscriber->plugin_idx = plugin_idx; + new_subscriber->msg_cb = plugin_on_msg_cb; + DL_APPEND(topic->subscribers, new_subscriber); + + struct stellar_mq_subscriber_info sub_info; + memset(&sub_info, 0, sizeof(struct stellar_mq_subscriber_info)); + sub_info.topic_id=topic_id; + sub_info.subscriber_idx=topic->subscriber_cnt; + utarray_push_back(registed_mq_subscriber_info, &sub_info); + topic->subscriber_cnt+=1; + plug_mgr->session_topic_subscriber_num+=1; + return 0; +} + +static void stellar_mq_dispatch_one_session_message(struct session *sess, struct stellar_message *mq_elt) +{ + struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess); + struct stellar_mq_subscriber *sub_elt, *sub_tmp; + struct registered_session_plugin_schema *session_plugin_schema; + struct session_plugin_ctx_runtime *plugin_ctx_rt; + struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->stellar_mq_schema_array, + (unsigned int)(mq_elt->header.topic_id)); + + if (topic) + { + int cur_sub_idx = 0; + DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp) + { + plug_mgr_rt->current_session_plugin_id = sub_elt->plugin_idx; + if (bitmap_get(plug_mgr_rt->session_mq_status, cur_sub_idx, mq_elt->header.topic_id) != 0) + { + plugin_ctx_rt = (plug_mgr_rt->plugin_ctx_array + sub_elt->plugin_idx); + session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr( + plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)sub_elt->plugin_idx); + if (session_plugin_schema) + { + if (plugin_ctx_rt->state == INIT) + { + if (session_plugin_schema->on_ctx_new) + { + plugin_ctx_rt->plugin_ctx = + session_plugin_schema->on_ctx_new(sess, session_plugin_schema->plugin_env); + if (plugin_ctx_rt->state == EXIT && session_plugin_schema->on_ctx_free) + { + session_plugin_schema->on_ctx_free(sess, plugin_ctx_rt->plugin_ctx, + session_plugin_schema->plugin_env); + plugin_ctx_rt->plugin_ctx = NULL; + } + else + { + plugin_ctx_rt->state = ACTIVE; + } + } + } + if (sub_elt->sess_msg_cb && + bitmap_get(plug_mgr_rt->session_mq_status, cur_sub_idx, mq_elt->header.topic_id) != + 0) // ctx_new maybe call detach, need check again + { + sub_elt->sess_msg_cb(sess, mq_elt->header.topic_id, mq_elt->body, plugin_ctx_rt->plugin_ctx, + session_plugin_schema->plugin_env); + } + } + } + cur_sub_idx++; + } + if (cur_sub_idx == 0) + bitmap_set(plug_mgr_rt->session_topic_status, 0, mq_elt->header.topic_id, 0); + } +} -void session_mq_free(struct session *sess, struct stellar_message **head, UT_array *mq_schema_array) +static void stellar_mq_dispatch_one_packet_message(struct packet *pkt, struct stellar_message *mq_elt) +{ + struct stellar *st = packet_stellar_get(pkt); + struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); + struct stellar_mq_subscriber *sub_elt, *sub_tmp; + struct registered_packet_plugin_schema *packet_plugin_schema; + struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, + (unsigned int)(mq_elt->header.topic_id)); + if (topic) + { + DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp) + { + if (sub_elt->pkt_msg_cb) + { + packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr( + plug_mgr->registered_packet_plugin_array, (unsigned int)sub_elt->plugin_idx); + if (packet_plugin_schema) + { + sub_elt->pkt_msg_cb(pkt, mq_elt->header.topic_id, mq_elt->body, packet_plugin_schema->plugin_env); + } + } + } + } +} + +static void stellar_mq_dispatch(struct stellar_message *priority_mq[], struct stellar_message ** dealth_letter_queue, struct session *sess, struct packet *pkt) +{ + struct stellar_message *mq_elt=NULL, *mq_tmp=NULL; + int cur_priority = SESSION_MQ_PRIORITY_HIGH; + while(cur_priority >= SESSION_MQ_PRIORITY_LOW) + { + if(priority_mq[cur_priority]==NULL) + { + cur_priority--; + continue; + } + DL_FOREACH_SAFE(priority_mq[cur_priority], mq_elt, mq_tmp) + { + if(mq_elt->header.type==ON_SESSION_TOPIC)stellar_mq_dispatch_one_session_message(sess, mq_elt); + if(mq_elt->header.type==ON_PACKET_TOPIC)stellar_mq_dispatch_one_packet_message(pkt, mq_elt); + DL_DELETE(priority_mq[mq_elt->header.priority], mq_elt); + DL_APPEND(*dealth_letter_queue, mq_elt); // move to dlq list + + cur_priority=SESSION_MQ_PRIORITY_HIGH; + break; + } + } + return; +} + +static void stellar_mq_free(struct session *sess, struct packet *pkt, struct stellar_message **head, UT_array *mq_schema_array) { struct stellar_message *mq_elt, *tmp; struct stellar_mq_topic_schema *topic; @@ -379,33 +604,105 @@ void session_mq_free(struct session *sess, struct stellar_message **head, UT_arr (unsigned int)(mq_elt->header.topic_id)); if (topic && topic->free_cb) { - topic->sess_msg_free_cb(sess, mq_elt->body, topic->free_cb_arg); + if(mq_elt->header.type==ON_SESSION_TOPIC)topic->sess_msg_free_cb(sess, mq_elt->body, topic->free_cb_arg); + if(mq_elt->header.type==ON_PACKET_TOPIC)topic->pkt_msg_free_cb(pkt, mq_elt->body, topic->free_cb_arg); } DL_DELETE(*head, mq_elt); FREE(mq_elt); } - FREE(*head); } +/******************************* + * PACKET MQ * + *******************************/ +int stellar_packet_mq_create_topic(struct stellar *st, const char *topic_name, packet_msg_free_cb_func *msg_free_cb, void *msg_free_arg) +{ + struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); + assert(plug_mgr); + int topic_id=stellar_mq_create_topic(st, topic_name, (void *)msg_free_cb, msg_free_arg, &plug_mgr->stellar_mq_schema_array); + if(topic_id>=0)plug_mgr->packet_mq_topic_num+=1; + return topic_id; +} + +int stellar_packet_mq_get_topic_id(struct stellar *st, const char *topic_name) +{ + struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); + assert(plug_mgr); + return stellar_mq_get_topic_id(topic_name, plug_mgr->stellar_mq_schema_array); +} + +int stellar_packet_mq_update_topic(struct stellar *st, int topic_id, packet_msg_free_cb_func *msg_free_cb, void *msg_free_arg) +{ + struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); + assert(plug_mgr); + return stellar_mq_update_topic(topic_id, (void *)msg_free_cb, msg_free_arg, plug_mgr->stellar_mq_schema_array); +} + +int stellar_packet_mq_destroy_topic(struct stellar *st, int topic_id) +{ + struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); + assert(plug_mgr); + int ret = stellar_mq_destroy_topic(topic_id, plug_mgr->stellar_mq_schema_array); + if(ret==1)plug_mgr->packet_mq_topic_num-=1; + return ret; +} + +//return 0 if success, otherwise return -1. +int stellar_packet_mq_subscribe(struct stellar *st, int topic_id, on_packet_msg_cb_func *plugin_on_msg_cb, int plugin_id) +{ + if(plugin_id < PACKET_PULGIN_ID_BASE || plugin_id >= POLLING_PULGIN_ID_BASE)return -1;// ignore session or polling plugin + int plugin_idx=plugin_id-PACKET_PULGIN_ID_BASE; + + struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); + if(plug_mgr == NULL || plug_mgr->registered_packet_plugin_array == NULL)return -1; + + struct registered_packet_plugin_schema *packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr(plug_mgr->registered_packet_plugin_array, (unsigned)plugin_idx); + if(packet_plugin_schema==NULL)return -1; + + if(packet_plugin_schema->registed_packet_mq_subscriber_info==NULL) + { + utarray_new(packet_plugin_schema->registed_packet_mq_subscriber_info, &stellar_mq_subscriber_info_icd); + } + + return stellar_mq_subscribe(plug_mgr,topic_id, (void *)plugin_on_msg_cb, plugin_idx, packet_plugin_schema->registed_packet_mq_subscriber_info); +} + +int packet_mq_publish_message(struct packet *pkt, int topic_id, void *data) +{ + struct stellar *st = packet_stellar_get(pkt); + struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); + int tid = stellar_get_current_thread_id(plug_mgr->st); + if(plug_mgr->per_thread_data[tid].pub_packet_msg_cnt >= plug_mgr->max_message_dispatch)return -1; + if(stellar_mq_publish_message(ON_PACKET_TOPIC ,topic_id, data, plug_mgr->stellar_mq_schema_array, plug_mgr->per_thread_data[tid].priority_mq,SESSION_MQ_PRIORITY_HIGH)==0) + { + plug_mgr->per_thread_data[tid].pub_packet_msg_cnt+=1; + return 0; + } + return -1; +} + +/******************************* + * SESSION MQ * + *******************************/ inline int stellar_session_mq_get_topic_id(struct stellar *st, const char *topic_name) { struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); assert(plug_mgr); - return stellar_mq_get_topic_id(topic_name, plug_mgr->session_mq_schema_array); + return stellar_mq_get_topic_id(topic_name, plug_mgr->stellar_mq_schema_array); } int stellar_session_mq_update_topic(struct stellar *st, int topic_id, session_msg_free_cb_func *msg_free_cb, void *msg_free_arg) { struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); assert(plug_mgr); - return stellar_mq_update_topic(topic_id, (void *)msg_free_cb, msg_free_arg, plug_mgr->session_mq_schema_array); + return stellar_mq_update_topic(topic_id, (void *)msg_free_cb, msg_free_arg, plug_mgr->stellar_mq_schema_array); } int stellar_session_mq_create_topic(struct stellar *st, const char *topic_name, session_msg_free_cb_func *msg_free_cb, void *msg_free_arg) { struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); assert(plug_mgr); - int topic_id=stellar_mq_create_topic(st, topic_name, (void *)msg_free_cb, msg_free_arg, &plug_mgr->session_mq_schema_array); + int topic_id=stellar_mq_create_topic(st, topic_name, (void *)msg_free_cb, msg_free_arg, &plug_mgr->stellar_mq_schema_array); if(topic_id>=0)plug_mgr->session_mq_topic_num+=1; return topic_id; } @@ -414,26 +711,27 @@ int stellar_session_mq_destroy_topic(struct stellar *st, int topic_id) { struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); assert(plug_mgr); - int ret = stellar_mq_destroy_topic(topic_id, plug_mgr->session_mq_schema_array); + int ret = stellar_mq_destroy_topic(topic_id, plug_mgr->stellar_mq_schema_array); if(ret==1)plug_mgr->session_mq_topic_num-=1; return ret; } - - int session_mq_publish_message_with_priority(struct session *sess, int topic_id, void *data, enum session_mq_priority priority) { struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess); assert(plug_mgr_rt); if(plug_mgr_rt->session_mq_status==NULL)return -1;//runtime free stage , mq_status alaway null, ignore publish message if(plug_mgr_rt->pub_session_msg_cnt >= plug_mgr_rt->plug_mgr->max_message_dispatch)return -1; - int ret=stellar_mq_publish_message(topic_id, data, plug_mgr_rt->plug_mgr->session_mq_schema_array, plug_mgr_rt->priority_mq, priority); - if(ret==0)plug_mgr_rt->pub_session_msg_cnt+=1; - return ret; + int tid = stellar_get_current_thread_id(plug_mgr_rt->plug_mgr->st); + if(stellar_mq_publish_message(ON_SESSION_TOPIC ,topic_id, data, plug_mgr_rt->plug_mgr->stellar_mq_schema_array,plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq,priority)==0) + { + plug_mgr_rt->pub_session_msg_cnt+=1; + return 0; + } + return -1; } - -int session_mq_publish_message(struct session *sess, int topic_id, void *data) +inline int session_mq_publish_message(struct session *sess, int topic_id, void *data) { return session_mq_publish_message_with_priority(sess, topic_id, data, SESSION_MQ_PRIORITY_NORMAL); } @@ -463,7 +761,7 @@ static int session_mq_set_message_status(struct session *sess, int topic_id, int struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess); if(plug_mgr_rt==NULL)return -1; if(topic_id >= plug_mgr_rt->plug_mgr->session_mq_topic_num)return -1;// topic_id out of range - struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->session_mq_schema_array, (unsigned int)topic_id); + struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id); if(topic==NULL)return -1; struct registered_session_plugin_schema *session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)plugin_id); @@ -486,170 +784,32 @@ static int session_mq_set_message_status(struct session *sess, int topic_id, int return -1; } - -int session_mq_ignore_message(struct session *sess, int topic_id, int plugin_id) +inline int session_mq_ignore_message(struct session *sess, int topic_id, int plugin_id) { return session_mq_set_message_status(sess, topic_id, plugin_id, 0); } -int session_mq_unignore_message(struct session *sess, int topic_id, int plugin_id) +inline int session_mq_unignore_message(struct session *sess, int topic_id, int plugin_id) { return session_mq_set_message_status(sess, topic_id, plugin_id, 1); } - int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_session_msg_cb_func *plugin_on_msg_cb, int plugin_id) { - if(plugin_id >= PACKET_PULGIN_ID_BASE)return -1;// ignore packet plugin + if(plugin_id >= PACKET_PULGIN_ID_BASE || plugin_on_msg_cb == NULL)return -1;// ignore packet plugin struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); - if(plug_mgr == NULL || plug_mgr->session_mq_schema_array==NULL || plug_mgr->registered_session_plugin_array == NULL)return -1; - unsigned int len = utarray_len(plug_mgr->session_mq_schema_array); - if (len <= (unsigned int)topic_id)return -1; + if(plug_mgr == NULL || plug_mgr->registered_session_plugin_array == NULL)return -1; struct registered_session_plugin_schema *session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr->registered_session_plugin_array, (unsigned)plugin_id); - if(session_plugin_schema==NULL)return -1; - - struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, (unsigned int)topic_id); - if(topic==NULL)return -1; - + if(session_plugin_schema==NULL)return -1; if(session_plugin_schema->registed_session_mq_subscriber_info==NULL) { utarray_new(session_plugin_schema->registed_session_mq_subscriber_info, &stellar_mq_subscriber_info_icd); - } - - // if plugin already subscribe current topic, return 0 - struct stellar_mq_subscriber_info *p=NULL; - while( (p=(struct stellar_mq_subscriber_info *)utarray_next(session_plugin_schema->registed_session_mq_subscriber_info,p))) - { - if(p->topic_id==topic_id) - { - struct stellar_mq_subscriber *tmp_subscriber=topic->subscribers; - int cnt=0; - while(tmp_subscriber) - { - if(cnt==p->subscriber_idx) - { - tmp_subscriber->sess_msg_cb=plugin_on_msg_cb; - return 0; - } - cnt++; - tmp_subscriber=tmp_subscriber->next; - } - } - }; - - struct stellar_mq_subscriber *new_subscriber = CALLOC(struct stellar_mq_subscriber,1); - new_subscriber->topic_subscriber_idx = topic->subscriber_cnt; - new_subscriber->plugin_idx = plugin_id; - new_subscriber->sess_msg_cb = plugin_on_msg_cb; - DL_APPEND(topic->subscribers, new_subscriber); - - struct stellar_mq_subscriber_info sub_info; - memset(&sub_info, 0, sizeof(struct stellar_mq_subscriber_info)); - sub_info.topic_id=topic_id; - sub_info.subscriber_idx=topic->subscriber_cnt; - utarray_push_back(session_plugin_schema->registed_session_mq_subscriber_info, &sub_info); - topic->subscriber_cnt+=1; - plug_mgr->session_topic_subscriber_num+=1; - return 0; -} - -static void session_mq_dispatch_one_message(struct session *sess, struct stellar_message *mq_elt) -{ - struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess); - struct stellar_mq_subscriber *sub_elt, *sub_tmp; - struct registered_session_plugin_schema *session_plugin_schema; - struct session_plugin_ctx_runtime *plugin_ctx_rt; - struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr( - plug_mgr_rt->plug_mgr->session_mq_schema_array, (unsigned int)(mq_elt->header.topic_id)); - - if (topic) - { - int cur_sub_idx = 0; - DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp) - { - plug_mgr_rt->current_session_plugin_id = sub_elt->plugin_idx; - if (bitmap_get(plug_mgr_rt->session_mq_status, cur_sub_idx, mq_elt->header.topic_id) != 0) - { - plugin_ctx_rt = (plug_mgr_rt->plugin_ctx_array + sub_elt->plugin_idx); - session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr( - plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)sub_elt->plugin_idx); - if (session_plugin_schema) - { - if (plugin_ctx_rt->state == INIT) - { - if (session_plugin_schema->on_ctx_new) - { - plugin_ctx_rt->plugin_ctx = - session_plugin_schema->on_ctx_new(sess, session_plugin_schema->plugin_env); - if (plugin_ctx_rt->state == EXIT && session_plugin_schema->on_ctx_free) - { - session_plugin_schema->on_ctx_free(sess, plugin_ctx_rt->plugin_ctx, - session_plugin_schema->plugin_env); - plugin_ctx_rt->plugin_ctx = NULL; - } - else - { - plugin_ctx_rt->state = ACTIVE; - } - } - } - if (sub_elt->sess_msg_cb && - bitmap_get(plug_mgr_rt->session_mq_status, cur_sub_idx, mq_elt->header.topic_id) != - 0) // ctx_new maybe call detach, need check again - { - sub_elt->sess_msg_cb(sess, mq_elt->header.topic_id, mq_elt->body, plugin_ctx_rt->plugin_ctx, - session_plugin_schema->plugin_env); - } - } - } - cur_sub_idx++; - } - if (cur_sub_idx == 0) - bitmap_set(plug_mgr_rt->session_topic_status, 0, mq_elt->header.topic_id, 0); - } -} - -static void plugin_manager_session_message_dispatch(struct session *sess) -{ - struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess); - if(plug_mgr_rt==NULL)return; - - struct stellar_message *mq_elt=NULL, *mq_tmp=NULL; - - int cur_priority = SESSION_MQ_PRIORITY_HIGH; - while(cur_priority >= SESSION_MQ_PRIORITY_LOW) - { - if(plug_mgr_rt->priority_mq[cur_priority]==NULL) - { - cur_priority--; - continue; - } - DL_FOREACH_SAFE(plug_mgr_rt->priority_mq[cur_priority], mq_elt, mq_tmp) - { - session_mq_dispatch_one_message(sess, mq_elt); - DL_DELETE(plug_mgr_rt->priority_mq[mq_elt->header.priority], mq_elt); - DL_APPEND(plug_mgr_rt->dealth_letter_queue, mq_elt); // move to dlq list - - cur_priority=SESSION_MQ_PRIORITY_HIGH; - break; - } - } - -#if 0 - while (plug_mgr_rt->pending_mq != NULL) - { - DL_FOREACH_SAFE(plug_mgr_rt->pending_mq, mq_elt, mq_tmp) - { - session_mq_dispatch_one_message(sess, mq_elt); - DL_DELETE(plug_mgr_rt->pending_mq[mq_elt->header.priority], mq_elt); - DL_APPEND(plug_mgr_rt->delivered_mq, mq_elt); // move to delivered message list - } - } -#endif - return; + } + //session plugin id equals to plugin idx + return stellar_mq_subscribe(plug_mgr,topic_id, (void *)plugin_on_msg_cb, plugin_id, session_plugin_schema->registed_session_mq_subscriber_info); } int session_mq_topic_is_active(struct session *sess, int topic_id) @@ -665,7 +825,6 @@ int session_mq_topic_is_active(struct session *sess, int topic_id) /******************************* * PLUGIN MANAGER SESSION RUNTIME * *******************************/ - static struct stellar_exdata *session_exdata_runtime_new(struct plugin_manager_schema *plug_mgr) { struct stellar_exdata *exdata_rt = NULL; @@ -702,8 +861,6 @@ struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_ struct plugin_manager_runtime *rt = CALLOC(struct plugin_manager_runtime, 1); rt->plug_mgr = plug_mgr; rt->sess = sess; - memset(rt->priority_mq, 0, sizeof(rt->priority_mq)); - rt->dealth_letter_queue = NULL; rt->session_mq_status=bitmap_new(plug_mgr->session_topic_subscriber_num, plug_mgr->session_mq_topic_num, 1); rt->session_topic_status=bitmap_new(1, plug_mgr->session_mq_topic_num, 1); rt->sess_exdata_array = (struct stellar_exdata *)session_exdata_runtime_new(plug_mgr); @@ -716,21 +873,7 @@ struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_ void plugin_manager_session_runtime_free(struct plugin_manager_runtime *rt) { if(rt==NULL)return; - assert(rt->priority_mq[SESSION_MQ_PRIORITY_HIGH]==NULL); - assert(rt->priority_mq[SESSION_MQ_PRIORITY_NORMAL]==NULL); - assert(rt->priority_mq[SESSION_MQ_PRIORITY_LOW]==NULL); - for(int i=0; i < SESSION_MQ_PRIORITY_MAX; i++) - { - if(rt->priority_mq[i] != NULL) - { - session_mq_free(rt->sess, &rt->priority_mq[i], rt->plug_mgr->session_mq_schema_array); - } - } - assert(rt->dealth_letter_queue==NULL); - if(rt->dealth_letter_queue != NULL) - { - session_mq_free(rt->sess, &rt->dealth_letter_queue, rt->plug_mgr->session_mq_schema_array); - } + if(rt->session_mq_status != NULL) { bitmap_free(rt->session_mq_status); @@ -752,7 +895,7 @@ void plugin_manager_session_runtime_free(struct plugin_manager_runtime *rt) if (session_plugin_schema->on_ctx_free && plugin_ctx_rt->state == ACTIVE) { session_plugin_schema->on_ctx_free(rt->sess, plugin_ctx_rt->plugin_ctx, - session_plugin_schema->plugin_env); + session_plugin_schema->plugin_env); } } FREE(rt->plugin_ctx_array); @@ -766,9 +909,6 @@ void plugin_manager_session_runtime_free(struct plugin_manager_runtime *rt) /********************************************* * PLUGIN MANAGER PACKET PLUGIN * *********************************************/ - - - UT_icd registered_packet_plugin_array_icd = {sizeof(struct registered_packet_plugin_schema), NULL, NULL, NULL}; int stellar_packet_plugin_register(struct stellar *st, unsigned char ip_proto, plugin_on_packet_func on_packet_cb, void *plugin_env) @@ -792,6 +932,9 @@ void plugin_manager_on_packet_ingress(struct plugin_manager_schema *plug_mgr, st if(plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return; struct registered_packet_plugin_schema *p=NULL; unsigned char ip_proto=packet_get_ip_protocol(pkt); + int tid=stellar_get_current_thread_id(plug_mgr->st); + //TODO : provide public api to reset pub_msg_cnt + plug_mgr->per_thread_data[tid].pub_packet_msg_cnt=0;//reset pub_msg_cnt while ((p = (struct registered_packet_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, p))) { if(p->ip_protocol == ip_proto && p->on_packet) @@ -799,14 +942,22 @@ void plugin_manager_on_packet_ingress(struct plugin_manager_schema *plug_mgr, st p->on_packet(pkt, ip_proto, p->plugin_env); } } + stellar_mq_dispatch(plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr->per_thread_data[tid].dealth_letter_queue, NULL, pkt); return; } +void plugin_manager_on_packet_egress(struct plugin_manager_schema *plug_mgr, struct packet *pkt) +{ + if(plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return; + per_thread_packet_exdata_arrary_clean(plug_mgr, pkt); + stellar_mq_free(NULL, pkt, + &plug_mgr->per_thread_data[stellar_get_current_thread_id(plug_mgr->st)].dealth_letter_queue, + plug_mgr->stellar_mq_schema_array); +} + /********************************************* * PLUGIN MANAGER POLLING PLUGIN * *********************************************/ - - UT_icd registered_polling_plugin_array_icd = {sizeof(struct registered_polling_plugin_schema), NULL, NULL, NULL}; int stellar_polling_plugin_register(struct stellar *st, plugin_on_polling_func on_polling, void *plugin_env) @@ -845,12 +996,8 @@ int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr) /********************************************* * PLUGIN MANAGER SESSION PLUGIN * *********************************************/ - - - UT_icd registered_session_plugin_schema_icd = {sizeof(struct registered_session_plugin_schema), NULL, NULL, NULL}; - int stellar_session_plugin_register(struct stellar *st, session_ctx_new_func session_ctx_new, session_ctx_free_func session_ctx_free, @@ -894,7 +1041,8 @@ void plugin_manager_on_session_ingress(struct session *sess, struct packet *pkt) } plug_mgr_rt->pub_session_msg_cnt=0; session_mq_publish_message_with_priority(sess, topic_id ,(void *)pkt, SESSION_MQ_PRIORITY_HIGH); - plugin_manager_session_message_dispatch(sess); + int tid=stellar_get_current_thread_id(plug_mgr_rt->plug_mgr->st); + stellar_mq_dispatch(plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, sess, pkt); return; } @@ -903,12 +1051,9 @@ void plugin_manager_on_session_egress(struct session *sess, struct packet *pkt) struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess); if(plug_mgr_rt==NULL)return; session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->egress_topic_id ,pkt, SESSION_MQ_PRIORITY_HIGH); - plugin_manager_session_message_dispatch(sess); - session_mq_free(plug_mgr_rt->sess,&plug_mgr_rt->dealth_letter_queue, plug_mgr_rt->plug_mgr->session_mq_schema_array); - assert(plug_mgr_rt->priority_mq[SESSION_MQ_PRIORITY_HIGH]==NULL); - assert(plug_mgr_rt->priority_mq[SESSION_MQ_PRIORITY_NORMAL]==NULL); - assert(plug_mgr_rt->priority_mq[SESSION_MQ_PRIORITY_LOW]==NULL); - assert(plug_mgr_rt->dealth_letter_queue==NULL); + int tid=stellar_get_current_thread_id(plug_mgr_rt->plug_mgr->st); + stellar_mq_dispatch(plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, sess, pkt); + stellar_mq_free(plug_mgr_rt->sess,pkt, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, plug_mgr_rt->plug_mgr->stellar_mq_schema_array); return; } @@ -928,12 +1073,9 @@ void plugin_manager_on_session_closing(struct session *sess) default: break; } - plugin_manager_session_message_dispatch(sess); - session_mq_free(plug_mgr_rt->sess,&plug_mgr_rt->dealth_letter_queue, plug_mgr_rt->plug_mgr->session_mq_schema_array); - assert(plug_mgr_rt->priority_mq[SESSION_MQ_PRIORITY_HIGH]==NULL); - assert(plug_mgr_rt->priority_mq[SESSION_MQ_PRIORITY_NORMAL]==NULL); - assert(plug_mgr_rt->priority_mq[SESSION_MQ_PRIORITY_LOW]==NULL); - assert(plug_mgr_rt->dealth_letter_queue==NULL); + int tid=stellar_get_current_thread_id(plug_mgr_rt->plug_mgr->st); + stellar_mq_dispatch(plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, sess, NULL); + stellar_mq_free(plug_mgr_rt->sess,NULL,&plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, plug_mgr_rt->plug_mgr->stellar_mq_schema_array); return; } @@ -952,11 +1094,10 @@ void stellar_session_plugin_dettach_current_session(struct session *sess) { struct stellar_mq_subscriber_info *session_plugin_sub_info = (struct stellar_mq_subscriber_info *)utarray_eltptr(session_plugin_schema->registed_session_mq_subscriber_info, i); bitmap_set(plug_mgr_rt->session_mq_status, session_plugin_sub_info->subscriber_idx,session_plugin_sub_info->topic_id, 0); - topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->session_mq_schema_array, (unsigned int)session_plugin_sub_info->topic_id); + topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->stellar_mq_schema_array, (unsigned int)session_plugin_sub_info->topic_id); session_mq_update_topic_status(plug_mgr_rt, topic); } } - //dettach in ctx INIT, do not call on_ctx_free immidiately if(plug_mgr_rt->plugin_ctx_array[plug_mgr_rt->current_session_plugin_id].state != INIT && (session_plugin_schema->on_ctx_free)) { @@ -966,5 +1107,4 @@ void stellar_session_plugin_dettach_current_session(struct session *sess) plug_mgr_rt->plugin_ctx_array[plug_mgr_rt->current_session_plugin_id].state=EXIT; return; -} - +}
\ No newline at end of file diff --git a/src/plugin_manager/plugin_manager.h b/src/plugin_manager/plugin_manager.h index 23c4297..cf07167 100644 --- a/src/plugin_manager/plugin_manager.h +++ b/src/plugin_manager/plugin_manager.h @@ -9,7 +9,7 @@ struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char void plugin_manager_exit(struct plugin_manager_schema *plug_mgr); void plugin_manager_on_packet_ingress(struct plugin_manager_schema *plug_mgr, struct packet *pkt); - +void plugin_manager_on_packet_egress(struct plugin_manager_schema *plug_mgr, struct packet *pkt); //return polling work state, 0: idle, 1: working int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr); diff --git a/src/plugin_manager/plugin_manager_interna.h b/src/plugin_manager/plugin_manager_interna.h index 82b2e7c..4f97e9a 100644 --- a/src/plugin_manager/plugin_manager_interna.h +++ b/src/plugin_manager/plugin_manager_interna.h @@ -5,21 +5,44 @@ #include "stellar/session_exdata.h" #include "stellar/session_mq.h" + +#include "stellar/packet_exdata.h" +#include "stellar/packet_mq.h" + + #include "bitmap/bitmap.h" #include "uthash/utarray.h" +struct per_thread_exdata_array +{ + struct stellar_exdata *exdata_array; +}; + struct stellar_message; +struct plugin_manger_per_thread_data +{ + struct per_thread_exdata_array per_thread_pkt_exdata_array; + struct stellar_message *priority_mq[SESSION_MQ_PRIORITY_MAX];// message list + struct stellar_message *dealth_letter_queue;// dlq list + long long pub_packet_msg_cnt; +}; + + + struct plugin_manager_schema { struct stellar *st; UT_array *plugin_load_specs_array; + UT_array *packet_exdata_schema_array; UT_array *session_exdata_schema_array; - UT_array *session_mq_schema_array; + UT_array *stellar_mq_schema_array; UT_array *registered_session_plugin_array; UT_array *registered_packet_plugin_array; UT_array *registered_polling_plugin_array; + int packet_mq_topic_num; int session_mq_topic_num; + int packet_topic_subscriber_num; int session_topic_subscriber_num; int tcp_topic_id; int tcp_stream_topic_id; @@ -27,6 +50,7 @@ struct plugin_manager_schema int egress_topic_id; int control_packet_topic_id; int max_message_dispatch; + struct plugin_manger_per_thread_data *per_thread_data; }__attribute__((aligned(sizeof(void*)))); @@ -44,17 +68,26 @@ struct stellar_exdata_schema { void *free_func; session_exdata_free *sess_free_func; + packet_exdata_free *pkt_free_func; }; void *free_arg; int idx; }__attribute__((aligned(sizeof(void*)))); + +enum stellar_topic_type +{ + ON_SESSION_TOPIC, + ON_PACKET_TOPIC, +}; + struct stellar_message { struct { int topic_id; + enum stellar_topic_type type; enum session_mq_priority priority; } header; void *body; @@ -68,6 +101,8 @@ typedef struct stellar_mq_subscriber union { on_session_msg_cb_func *sess_msg_cb; + on_packet_msg_cb_func *pkt_msg_cb; + void *msg_cb; }; struct stellar_mq_subscriber *next, *prev; }stellar_mq_subscriber __attribute__((aligned(sizeof(void*)))); @@ -84,6 +119,7 @@ struct stellar_mq_topic_schema { void *free_cb; session_msg_free_cb_func *sess_msg_free_cb; + packet_msg_free_cb_func *pkt_msg_free_cb; }; struct stellar_mq_subscriber *subscribers; }__attribute__((aligned(sizeof(void*)))); @@ -104,8 +140,6 @@ struct plugin_manager_runtime { struct plugin_manager_schema *plug_mgr; struct session *sess; - struct stellar_message *priority_mq[SESSION_MQ_PRIORITY_MAX];// message list - struct stellar_message *dealth_letter_queue;// dlq list struct bitmap *session_mq_status; //N * M bits, N topic, M subscriber struct bitmap *session_topic_status; //N bits, N topic struct stellar_exdata *sess_exdata_array; diff --git a/src/stellar_on_sapp/stellar_internal.h b/src/stellar_on_sapp/stellar_internal.h index 3358e7c..e42dfed 100644 --- a/src/stellar_on_sapp/stellar_internal.h +++ b/src/stellar_on_sapp/stellar_internal.h @@ -22,4 +22,6 @@ enum packet_type CONTROL, }; -enum packet_type packet_get_type(const struct packet *pkt);
\ No newline at end of file +enum packet_type packet_get_type(const struct packet *pkt); + +struct stellar * packet_stellar_get(struct packet *pkt);
\ No newline at end of file diff --git a/src/stellar_on_sapp/stellar_on_sapp_api.c b/src/stellar_on_sapp/stellar_on_sapp_api.c index 72935c6..04a67e8 100644 --- a/src/stellar_on_sapp/stellar_on_sapp_api.c +++ b/src/stellar_on_sapp/stellar_on_sapp_api.c @@ -41,6 +41,11 @@ struct session struct plugin_manager_runtime *plug_mgr_rt; }__attribute__((aligned(sizeof(void*)))); +inline struct stellar * packet_stellar_get(struct packet *pkt) +{ + return pkt->st; +} + inline struct plugin_manager_schema * stellar_plugin_manager_schema_get(struct stellar *st) { @@ -140,6 +145,8 @@ unsigned char session_state_update_on_sapp(struct streaminfo *stream, unsigned c struct packet *pkt = &sess->cur_pkt; pkt->raw_pkt=raw_pkt; pkt->type=type; + pkt->sess=sess; + pkt->st=sess->st; if(raw_pkt)plugin_manager_on_session_ingress(sess, pkt); //check TCP topic active subscirber num, if 0, return APP_STATE_DROPME, to reduce tcp reassemble overhead @@ -162,6 +169,7 @@ void session_defer_on_sapp(struct session *sess) if(pkt->raw_pkt) { plugin_manager_on_session_egress(sess, pkt); + plugin_manager_on_packet_egress(sess->st->plug_mgr, pkt); } } sess->cur_pkt.raw_pkt=NULL;//clear raw_pkt @@ -197,6 +205,11 @@ void packet_update_on_sapp(struct stellar *st, struct streaminfo *pstream, void } //FIXME: defer TCP/UDP packet on session update plugin_manager_on_packet_ingress(st->plug_mgr, &pkt); + if(pkt.ip_proto!=IPPROTO_TCP && pkt.ip_proto!=IPPROTO_UDP) + { + plugin_manager_on_packet_egress(st->plug_mgr, &pkt); + } + } inline int polling_on_sapp(struct stellar *st) diff --git a/test/plugin_manager/plugin_manager_gtest_main.cpp b/test/plugin_manager/plugin_manager_gtest_main.cpp index eb1cebc..381c6d5 100644 --- a/test/plugin_manager/plugin_manager_gtest_main.cpp +++ b/test/plugin_manager/plugin_manager_gtest_main.cpp @@ -3,6 +3,9 @@ #include "stellar/utils.h" #include "plugin_manager_gtest_mock.h" +#define STELLAR_INTRINSIC_TOPIC_NUM 5 +#define TOPIC_NAME_MAX 512 + void whitebox_test_plugin_manager_intrisic_metadata(struct stellar *st, struct plugin_manager_schema *plug_mgr) { SCOPED_TRACE("whitebox test intrisic metadata"); @@ -15,34 +18,37 @@ void whitebox_test_plugin_manager_intrisic_metadata(struct stellar *st, struct p EXPECT_TRUE(plug_mgr->plugin_load_specs_array==NULL); //packet exdata & mq schema null + EXPECT_TRUE(plug_mgr->packet_exdata_schema_array==NULL); //session exdata schema null EXPECT_TRUE(plug_mgr->session_exdata_schema_array==NULL); //session mq schema not null - EXPECT_TRUE(plug_mgr->session_mq_schema_array!=NULL); + EXPECT_TRUE(plug_mgr->stellar_mq_schema_array!=NULL); //registered plugin array null EXPECT_TRUE(plug_mgr->registered_polling_plugin_array==NULL); EXPECT_TRUE(plug_mgr->registered_packet_plugin_array==NULL); EXPECT_TRUE(plug_mgr->registered_session_plugin_array==NULL); - int intrinsic_topic_num=utarray_len(plug_mgr->session_mq_schema_array); + EXPECT_EQ(plug_mgr->packet_mq_topic_num, 0); + + int intrinsic_topic_num=utarray_len(plug_mgr->stellar_mq_schema_array); EXPECT_EQ(plug_mgr->session_mq_topic_num, intrinsic_topic_num);//TCP,UDP,TCP_STREAM,EGRESS,CONTROL - struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, (unsigned int)plug_mgr->tcp_topic_id); + struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)plug_mgr->tcp_topic_id); EXPECT_STREQ(topic->topic_name, TOPIC_TCP); - topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, (unsigned int)plug_mgr->tcp_stream_topic_id); + topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)plug_mgr->tcp_stream_topic_id); EXPECT_STREQ(topic->topic_name, TOPIC_TCP_STREAM); - topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, (unsigned int)plug_mgr->udp_topic_id); + topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)plug_mgr->udp_topic_id); EXPECT_STREQ(topic->topic_name, TOPIC_UDP); - topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, (unsigned int)plug_mgr->egress_topic_id); + topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)plug_mgr->egress_topic_id); EXPECT_STREQ(topic->topic_name, TOPIC_EGRESS); - topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, (unsigned int)plug_mgr->control_packet_topic_id); + topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)plug_mgr->control_packet_topic_id); EXPECT_STREQ(topic->topic_name, TOPIC_CONTROL_PACKET); //intrinsic topic @@ -51,6 +57,16 @@ void whitebox_test_plugin_manager_intrisic_metadata(struct stellar *st, struct p EXPECT_GE(stellar_session_mq_get_topic_id(st, TOPIC_UDP), 0); EXPECT_GE(stellar_session_mq_get_topic_id(st, TOPIC_EGRESS), 0); EXPECT_GE(stellar_session_mq_get_topic_id(st, TOPIC_CONTROL_PACKET), 0); + + EXPECT_TRUE(plug_mgr->per_thread_data!=NULL); + int thread_num=stellar_get_worker_thread_num(st); + for(int i=0; i<thread_num; i++) + { + EXPECT_TRUE(plug_mgr->per_thread_data[i].per_thread_pkt_exdata_array.exdata_array==NULL); + EXPECT_TRUE(plug_mgr->per_thread_data[i].dealth_letter_queue==NULL); + for(int j=0; j<SESSION_MQ_PRIORITY_MAX; j++) + EXPECT_TRUE(plug_mgr->per_thread_data[i].priority_mq[j]==NULL); + } } /*********************************** @@ -67,25 +83,220 @@ TEST(plugin_manager_init, init_with_null_toml) { plugin_manager_exit(plug_mgr); } +/****************************************** + * TEST PLUGIN MANAGER PACKET PLUGIN INIT * + ******************************************/ + +static void test_mock_packet_exdata_free(struct packet *pkt, int idx, void *ex_ptr, void *arg){} + +static void test_mock_overwrite_packet_exdata_free(struct packet *pkt, int idx, void *ex_ptr, void *arg){} + +TEST(plugin_manager_init, packet_exdata_new_index_overwrite) { + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + const char *exdata_name="PACKET_EXDATA"; + int exdata_idx=stellar_packet_exdata_new_index(&st,exdata_name, test_mock_packet_exdata_free, &st); + EXPECT_GE(exdata_idx, 0); + int overwrite_idx=stellar_packet_exdata_new_index(&st,exdata_name, test_mock_overwrite_packet_exdata_free, plug_mgr); + EXPECT_GE(overwrite_idx, 0); + EXPECT_EQ(overwrite_idx, exdata_idx); + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + struct stellar_exdata_schema *exdata_schema = (struct stellar_exdata_schema *)utarray_eltptr( + plug_mgr->packet_exdata_schema_array, (unsigned int)exdata_idx); + EXPECT_EQ(exdata_schema->free_func, (void *)test_mock_overwrite_packet_exdata_free); + EXPECT_EQ(exdata_schema->free_arg, plug_mgr); + EXPECT_EQ(exdata_schema->idx, exdata_idx); + EXPECT_STREQ(exdata_schema->name, exdata_name); + + int exdata_num = utarray_len(plug_mgr->packet_exdata_schema_array); + EXPECT_EQ(exdata_num, 1); + } + + plugin_manager_exit(plug_mgr); +} + +void test_mock_packet_msg_free(struct packet *pkt, void *msg, void *msg_free_arg){} +void test_mock_overwrite_packet_msg_free(struct packet *pkt, void *msg, void *msg_free_arg){} + +TEST(plugin_manager_init, packet_mq_topic_create_and_update) { + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + const char *topic_name="PACKET_TOPIC"; + + EXPECT_EQ(stellar_packet_mq_get_topic_id(&st, topic_name), -1); // illegal topic_name + + int topic_id = stellar_packet_mq_create_topic(&st, topic_name, test_mock_packet_msg_free, &st); + EXPECT_GE(topic_id, 0); + struct stellar_mq_topic_schema *topic_schema = NULL; + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + topic_schema = + (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id); + EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_packet_msg_free); + EXPECT_EQ(topic_schema->free_cb_arg, &st); + EXPECT_EQ(topic_schema->topic_id, topic_id); + EXPECT_STREQ(topic_schema->topic_name, topic_name); + } + + EXPECT_EQ(stellar_packet_mq_get_topic_id(&st, topic_name), topic_id); + EXPECT_EQ(stellar_packet_mq_create_topic(&st, topic_name, test_mock_overwrite_packet_msg_free, plug_mgr), + -1); // duplicate create, return error + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + topic_schema = + (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id); + EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_packet_msg_free); + EXPECT_EQ(topic_schema->free_cb_arg, &st); + EXPECT_EQ(topic_schema->topic_id, topic_id); + EXPECT_STREQ(topic_schema->topic_name, topic_name); + } + + EXPECT_EQ(stellar_packet_mq_update_topic(&st, topic_id, test_mock_overwrite_packet_msg_free, plug_mgr), 0); + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + topic_schema = + (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id); + EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_overwrite_packet_msg_free); + EXPECT_EQ(topic_schema->free_cb_arg, plug_mgr); + EXPECT_EQ(topic_schema->topic_id, topic_id); + EXPECT_STREQ(topic_schema->topic_name, topic_name); + EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), 1+STELLAR_INTRINSIC_TOPIC_NUM); + } + + EXPECT_EQ(stellar_packet_mq_destroy_topic(&st, 10), -1); // illgeal topic_id + + EXPECT_EQ(stellar_packet_mq_destroy_topic(&st, topic_id), 1); + EXPECT_EQ(stellar_packet_mq_destroy_topic(&st, topic_id), 0); // duplicate destroy, return 0; + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), 1+STELLAR_INTRINSIC_TOPIC_NUM); // destory won't delete the topic schema + EXPECT_EQ(plug_mgr->packet_mq_topic_num, 0); + } + plugin_manager_exit(plug_mgr); +} + +void test_mock_on_packet_msg(struct packet *pkt, int topic_id, const void *msg, void *plugin_env){} + +void test_mock_overwrite_on_packet_msg(struct packet *pkt, int topic_id, const void *msg, void *plugin_env){} + +TEST(plugin_manager_init, packet_mq_subscribe) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + + const char *topic_name="PACKET_TOPIC"; + + int topic_id=stellar_packet_mq_create_topic(&st, topic_name, test_mock_packet_msg_free, &st); + EXPECT_GE(topic_id, 0); + + EXPECT_EQ(stellar_packet_mq_subscribe(&st, topic_id, test_mock_on_packet_msg, 10+PACKET_PULGIN_ID_BASE),-1);//illgeal plugin_id + EXPECT_EQ(stellar_packet_mq_subscribe(&st, 10, test_mock_on_packet_msg, 10+PACKET_PULGIN_ID_BASE),-1);//illgeal topic_id & plugin_id + + int plugin_id=stellar_packet_plugin_register(&st, 6, NULL, &st); + EXPECT_GE(plugin_id, PACKET_PULGIN_ID_BASE); + + EXPECT_EQ(stellar_packet_mq_subscribe(&st, topic_id, test_mock_on_packet_msg, plugin_id),0); + EXPECT_EQ(stellar_packet_mq_subscribe(&st, topic_id, test_mock_overwrite_on_packet_msg, plugin_id),0);//duplicate subscribe, return 0, won't overwrite + struct stellar_mq_topic_schema *topic_schema; + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + topic_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id); + EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_packet_msg_free); + EXPECT_EQ(topic_schema->free_cb_arg, &st); + EXPECT_EQ(topic_schema->topic_id, topic_id); + EXPECT_STREQ(topic_schema->topic_name, topic_name); + } + + EXPECT_EQ(topic_schema->subscriber_cnt, 1); + EXPECT_EQ(topic_schema->subscribers->pkt_msg_cb, (void *)test_mock_overwrite_on_packet_msg); + + plugin_manager_exit(plug_mgr); +} + + /******************************************* * TEST PLUGIN MANAGER PACKET PLUGIN RUNTIME* *******************************************/ #define PACKET_PROTO_PLUGIN_NUM 128 +#define PACKET_EXDATA_NUM 2 +#define PACKET_TOPIC_NUM 2 +#define PACKET_MQ_SUB_NUM 2 struct packet_plugin_env { struct plugin_manager_schema *plug_mgr; int basic_on_packet_called; int proto_filter_plugin_id[PACKET_PROTO_PLUGIN_NUM]; int proto_filter_plugin_called[PACKET_PROTO_PLUGIN_NUM]; + int exdata_set_on_packet_called; + int exdata_get_on_packet_called; + unsigned int packet_exdata_idx[PACKET_EXDATA_NUM]; + int exdata_free_called[PACKET_EXDATA_NUM]; + unsigned int packet_topic_id[PACKET_TOPIC_NUM]; + unsigned int packet_mq_sub_plugin_id[PACKET_MQ_SUB_NUM]; + int msg_pub_cnt; + int msg_sub_cnt; + int msg_free_cnt; }; +static void test_basic_on_packet(struct packet *pkt, unsigned char ip_protocol, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + EXPECT_EQ(pkt->ip_proto, ip_protocol); + EXPECT_EQ(pkt->st, env->plug_mgr->st); + EXPECT_EQ(packet_exdata_set(pkt, 2, pkt), -1);// illegal set + EXPECT_EQ(packet_exdata_get(pkt, 2), nullptr);// illegal get + env->basic_on_packet_called+=1; + return; +} + +TEST(plugin_manager, packet_plugin_illegal_exdata) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + unsigned char ip_proto=6; + struct packet_plugin_env env; + memset(&env, 0, sizeof(struct packet_plugin_env)); + env.plug_mgr=plug_mgr; + int plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_basic_on_packet, &env); + EXPECT_GE(plugin_id, PACKET_PULGIN_ID_BASE); + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + int packet_plugin_num = utarray_len(plug_mgr->registered_packet_plugin_array); + EXPECT_EQ(packet_plugin_num, 1); + } + + struct packet pkt={&st, IPv4, ip_proto}; + plugin_manager_on_packet_ingress(plug_mgr, &pkt); + plugin_manager_on_packet_egress(plug_mgr, &pkt); + + plugin_manager_exit(plug_mgr); + + EXPECT_EQ(env.basic_on_packet_called, 1); +} + static void test_proto_filter_on_packet(struct packet *pkt, unsigned char ip_protocol, void *plugin_env) { struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; EXPECT_TRUE(env!=NULL); EXPECT_EQ(pkt->ip_proto, ip_protocol); EXPECT_EQ(pkt->st, env->plug_mgr->st); + EXPECT_EQ(packet_exdata_set(pkt, 2, pkt), -1);// illegal set + EXPECT_EQ(packet_exdata_get(pkt, 2), nullptr);// illegal get env->proto_filter_plugin_called[ip_protocol]+=1; return; } @@ -123,6 +334,7 @@ TEST(plugin_manager, packet_plugins_with_proto_filter) { { pkt.ip_proto = i; plugin_manager_on_packet_ingress(plug_mgr, &pkt); + plugin_manager_on_packet_egress(plug_mgr, &pkt); } } plugin_manager_exit(plug_mgr); @@ -133,6 +345,341 @@ TEST(plugin_manager, packet_plugins_with_proto_filter) { } } +static void test_exdata_set_on_packet(struct packet *pkt, unsigned char ip_protocol, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + EXPECT_EQ(pkt->ip_proto, ip_protocol); + EXPECT_EQ(pkt->st, env->plug_mgr->st); + env->exdata_set_on_packet_called+=1; + + int exdata_idx_len=(int)(sizeof(env->packet_exdata_idx) / sizeof(env->packet_exdata_idx[0])); + + for(int i=0; i<exdata_idx_len; i++) + { + long long *exdata_val=CALLOC(long long , 1); + *exdata_val=i; + EXPECT_EQ(packet_exdata_set(pkt, env->packet_exdata_idx[i], exdata_val), 0); + } + return; +} + +static void test_exdata_get_on_packet(struct packet *pkt, unsigned char ip_protocol, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + EXPECT_EQ(pkt->ip_proto, ip_protocol); + EXPECT_EQ(pkt->st, env->plug_mgr->st); + + int exdata_idx_len=(int)(sizeof(env->packet_exdata_idx) / sizeof(env->packet_exdata_idx[0])); + + for(int i=0; i<exdata_idx_len; i++) + { + long long *exdata_val=(long long *)packet_exdata_get(pkt, env->packet_exdata_idx[i]); + EXPECT_EQ(*exdata_val, i); + } + env->exdata_get_on_packet_called+=1; + return; +} + +static void test_packet_exdata_free(struct packet *pkt, int idx, void *ex_ptr, void *arg) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)arg; + EXPECT_EQ(env->packet_exdata_idx[idx], idx); + EXPECT_EQ(*(long long *)ex_ptr, idx); + FREE(ex_ptr); + env->exdata_free_called[idx]+=1; + return; +} + + +TEST(plugin_manager, packet_plugins_share_exdata) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + unsigned char ip_proto=6; + struct packet_plugin_env env; + memset(&env, 0, sizeof(struct packet_plugin_env)); + env.plug_mgr=plug_mgr; + + char exdata_name[PACKET_EXDATA_NUM][TOPIC_NAME_MAX]; + int exdata_idx_len=(int)(sizeof(env.packet_exdata_idx) / sizeof(env.packet_exdata_idx[0])); + for(int i=0; i<exdata_idx_len; i++) + { + sprintf(exdata_name[i], "PACKET_EXDATA_%d", i); + env.packet_exdata_idx[i]=stellar_packet_exdata_new_index(&st, exdata_name[i], test_packet_exdata_free, &env); + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + struct stellar_exdata_schema *exdata_schema = (struct stellar_exdata_schema *)utarray_eltptr( + plug_mgr->packet_exdata_schema_array, env.packet_exdata_idx[i]); + + EXPECT_EQ(exdata_schema->free_func, (void *)test_packet_exdata_free); + EXPECT_EQ(exdata_schema->free_arg, &env); + EXPECT_EQ(exdata_schema->idx, env.packet_exdata_idx[i]); + EXPECT_STREQ(exdata_schema->name, exdata_name[i]); + } + } + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + EXPECT_EQ(utarray_len(plug_mgr->packet_exdata_schema_array), exdata_idx_len); + } + + int exdata_set_plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_exdata_set_on_packet, &env); + EXPECT_GE(exdata_set_plugin_id, PACKET_PULGIN_ID_BASE); + + int exdata_get_plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_exdata_get_on_packet, &env); + EXPECT_GE(exdata_get_plugin_id, PACKET_PULGIN_ID_BASE); + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), 2); // Fix plugin number + } + + struct packet pkt={&st, IPv4, ip_proto}; + + int N_packet=10; + + for(int i=0; i < N_packet; i++) + { + plugin_manager_on_packet_ingress(plug_mgr, &pkt); + plugin_manager_on_packet_egress(plug_mgr, &pkt); + } + plugin_manager_exit(plug_mgr); + + EXPECT_EQ(env.exdata_set_on_packet_called, N_packet); + EXPECT_EQ(env.exdata_get_on_packet_called, N_packet); + + for(int i=0; i < exdata_idx_len; i++) + { + EXPECT_EQ(env.exdata_free_called[i], N_packet); + } +} + +static void test_packet_msg_free_cb_func(struct packet *pkt, void *msg, void *msg_free_arg) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)msg_free_arg; + EXPECT_EQ(pkt, msg); + env->msg_free_cnt+=1; + return; +} + +static void test_mq_on_packet_msg(struct packet *pkt, int topic_id, const void *msg, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + EXPECT_EQ(pkt->st, env->plug_mgr->st); + env->msg_sub_cnt+=1; + return; +} + +static void test_mq_pub_on_packet(struct packet *pkt, unsigned char ip_protocol, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + EXPECT_EQ(pkt->ip_proto, ip_protocol); + EXPECT_EQ(pkt->st, env->plug_mgr->st); + int topic_id_num=(int)(sizeof(env->packet_topic_id) / sizeof(env->packet_topic_id[0])); + for(int i=0; i<topic_id_num; i++) + { + EXPECT_EQ(packet_mq_publish_message(pkt, env->packet_topic_id[i], pkt), 0); + env->msg_pub_cnt+=1; + } + return; +} + +TEST(plugin_manager, packet_plugins_mq_pub_sub) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + unsigned char ip_proto=6; + struct packet_plugin_env env; + memset(&env, 0, sizeof(struct packet_plugin_env)); + env.plug_mgr=plug_mgr; + char topic_name[PACKET_TOPIC_NUM][TOPIC_NAME_MAX]; + + int topic_id_num=(int)(sizeof(env.packet_topic_id) / sizeof(env.packet_topic_id[0])); + + for(int i=0; i<topic_id_num; i++) + { + sprintf(topic_name[i], "PACKET_TOPIC_%d", i); + env.packet_topic_id[i]=stellar_packet_mq_create_topic(&st, topic_name[i], test_packet_msg_free_cb_func, &env); + EXPECT_GE(env.packet_topic_id[i], 0); + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr( + plug_mgr->stellar_mq_schema_array, env.packet_topic_id[i]); + EXPECT_EQ(topic->free_cb, test_packet_msg_free_cb_func); + EXPECT_EQ(topic->free_cb_arg, &env); + EXPECT_EQ(topic->topic_id, env.packet_topic_id[i]); + EXPECT_STREQ(topic->topic_name, topic_name[i]); + } + } + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), topic_id_num+STELLAR_INTRINSIC_TOPIC_NUM); + } + + int pub_plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_mq_pub_on_packet, &env); + EXPECT_GE(pub_plugin_id, PACKET_PULGIN_ID_BASE); + + int topic_sub_num=(int)(sizeof(env.packet_mq_sub_plugin_id) / sizeof(env.packet_mq_sub_plugin_id[0])); + + for (int i = 0; i < topic_sub_num; i++) + { + env.packet_mq_sub_plugin_id[i] = stellar_packet_plugin_register(&st, ip_proto, NULL, &env);// empty on_packet is ok + EXPECT_GE(env.packet_mq_sub_plugin_id[i], PACKET_PULGIN_ID_BASE); + for(int j = 0; j < topic_id_num; j++) + { + EXPECT_EQ(stellar_packet_mq_subscribe(&st, env.packet_topic_id[j], test_mq_on_packet_msg, env.packet_mq_sub_plugin_id[i]), 0); + } + } + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), topic_sub_num+1); + } + + struct packet pkt={&st, IPv4, ip_proto}; + + int N_packet=10; + for (int i = 0; i < N_packet; i++) + { + plugin_manager_on_packet_ingress(plug_mgr, &pkt); + plugin_manager_on_packet_egress(plug_mgr, &pkt); + } + + plugin_manager_exit(plug_mgr); + EXPECT_EQ(N_packet*topic_id_num, env.msg_pub_cnt); + EXPECT_EQ(env.msg_free_cnt, env.msg_pub_cnt); + EXPECT_EQ(env.msg_sub_cnt, env.msg_pub_cnt*topic_sub_num); +} + +static void overlimit_packet_msg_free_cb_func(struct packet *pkt, void *msg, void *msg_free_arg) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)msg_free_arg; + env->msg_free_cnt+=1; + FREE(msg); + return; +} + +static void overlimit_sub_on_packet_msg(struct packet *pkt, int topic_id, const void *msg, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + EXPECT_EQ(pkt->st, env->plug_mgr->st); + env->msg_sub_cnt+=1; + return; +} + +static void overlimit_pub_on_packet(struct packet *pkt, unsigned char ip_protocol, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + EXPECT_EQ(pkt->ip_proto, ip_protocol); + EXPECT_EQ(pkt->st, env->plug_mgr->st); + int topic_id_num=(int)(sizeof(env->packet_topic_id) / sizeof(env->packet_topic_id[0])); + int cnt=0; + int *msg; + for(int i=0; i<topic_id_num; i++) + { + for(int j=0; j < MAX_MSG_PER_DISPATCH; j++) + { + msg=CALLOC(int, 1); + *msg=cnt; + int pub_ret=packet_mq_publish_message(pkt, env->packet_topic_id[i], msg); + if(cnt < MAX_MSG_PER_DISPATCH) + { + ASSERT_EQ(pub_ret, 0); + env->msg_pub_cnt+=1; + } + else + { + ASSERT_EQ(pub_ret, -1); + } + if(pub_ret!=0)FREE(msg); + cnt+=1; + } + } + return; +} + +TEST(plugin_manager, packet_plugins_pub_overlimit) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + unsigned char ip_proto=6; + struct packet_plugin_env env; + memset(&env, 0, sizeof(struct packet_plugin_env)); + env.plug_mgr=plug_mgr; + char topic_name[PACKET_TOPIC_NUM][TOPIC_NAME_MAX]; + + int topic_id_num=(int)(sizeof(env.packet_topic_id) / sizeof(env.packet_topic_id[0])); + + for(int i=0; i<topic_id_num; i++) + { + sprintf(topic_name[i], "PACKET_TOPIC_%d", i); + env.packet_topic_id[i]=stellar_packet_mq_create_topic(&st, topic_name[i], overlimit_packet_msg_free_cb_func, &env); + EXPECT_GE(env.packet_topic_id[i], 0); + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr( + plug_mgr->stellar_mq_schema_array, env.packet_topic_id[i]); + EXPECT_EQ(topic->free_cb, overlimit_packet_msg_free_cb_func); + EXPECT_EQ(topic->free_cb_arg, &env); + EXPECT_EQ(topic->topic_id, env.packet_topic_id[i]); + EXPECT_STREQ(topic->topic_name, topic_name[i]); + } + } + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), topic_id_num+STELLAR_INTRINSIC_TOPIC_NUM); + } + + int pub_plugin_id=stellar_packet_plugin_register(&st, ip_proto, overlimit_pub_on_packet, &env); + EXPECT_GE(pub_plugin_id, PACKET_PULGIN_ID_BASE); + + int topic_sub_num=(int)(sizeof(env.packet_mq_sub_plugin_id) / sizeof(env.packet_mq_sub_plugin_id[0])); + + for (int i = 0; i < topic_sub_num; i++) + { + env.packet_mq_sub_plugin_id[i] = stellar_packet_plugin_register(&st, ip_proto, NULL, &env);// empty on_packet is ok + EXPECT_GE(env.packet_mq_sub_plugin_id[i], PACKET_PULGIN_ID_BASE); + for(int j = 0; j < topic_id_num; j++) + { + EXPECT_EQ(stellar_packet_mq_subscribe(&st, env.packet_topic_id[j], overlimit_sub_on_packet_msg, env.packet_mq_sub_plugin_id[i]), 0); + } + } + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), topic_sub_num+1); + } + + struct packet pkt={&st, IPv4, ip_proto}; + + int N_packet=10; + for (int i = 0; i < N_packet; i++) + { + plugin_manager_on_packet_ingress(plug_mgr, &pkt); + plugin_manager_on_packet_egress(plug_mgr, &pkt); + } + + plugin_manager_exit(plug_mgr); + EXPECT_EQ(N_packet*MAX_MSG_PER_DISPATCH, env.msg_pub_cnt); + EXPECT_EQ(env.msg_free_cnt, env.msg_pub_cnt); + EXPECT_EQ(env.msg_sub_cnt, env.msg_pub_cnt*topic_sub_num); +} + + /********************************************** * TEST PLUGIN MANAGER ON SESSION PLUGIN INIT * **********************************************/ @@ -185,7 +732,7 @@ TEST(plugin_manager_init, session_mq_topic_create_and_update) { { SCOPED_TRACE("White-box test, check stellar internal schema"); topic_schema = - (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, (unsigned int)topic_id); + (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id); EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_session_msg_free); EXPECT_EQ(topic_schema->free_cb_arg, &st); EXPECT_EQ(topic_schema->topic_id, topic_id); @@ -198,7 +745,7 @@ TEST(plugin_manager_init, session_mq_topic_create_and_update) { { SCOPED_TRACE("White-box test, check stellar internal schema"); topic_schema = - (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, (unsigned int)topic_id); + (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id); EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_session_msg_free); EXPECT_EQ(topic_schema->free_cb_arg, &st); EXPECT_EQ(topic_schema->topic_id, topic_id); @@ -210,13 +757,13 @@ TEST(plugin_manager_init, session_mq_topic_create_and_update) { { SCOPED_TRACE("White-box test, check stellar internal schema"); topic_schema = - (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, (unsigned int)topic_id); + (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id); EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_overwrite_session_msg_free); EXPECT_EQ(topic_schema->free_cb_arg, plug_mgr); EXPECT_EQ(topic_schema->topic_id, topic_id); EXPECT_STREQ(topic_schema->topic_name, topic_name); - EXPECT_EQ(utarray_len(plug_mgr->session_mq_schema_array), 6); // 5 intrinsic topic + 1 created topic + EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), STELLAR_INTRINSIC_TOPIC_NUM+1); // 5 intrinsic topic + 1 created topic } EXPECT_EQ(stellar_session_mq_destroy_topic(&st, 10), -1);// illgeal session topic_id @@ -226,9 +773,9 @@ TEST(plugin_manager_init, session_mq_topic_create_and_update) { { SCOPED_TRACE("White-box test, check stellar internal schema"); - EXPECT_EQ(utarray_len(plug_mgr->session_mq_schema_array), 6);//destory won't delete the topic schema + EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), STELLAR_INTRINSIC_TOPIC_NUM+1);//destory won't delete the topic schema } - EXPECT_EQ(plug_mgr->session_mq_topic_num, 5);//intrinsic topic number + EXPECT_EQ(plug_mgr->session_mq_topic_num, STELLAR_INTRINSIC_TOPIC_NUM);//intrinsic topic number plugin_manager_exit(plug_mgr); } @@ -260,7 +807,7 @@ TEST(plugin_manager_init, session_mq_subscribe_overwrite) { struct stellar_mq_topic_schema *topic_schema; { SCOPED_TRACE("White-box test, check stellar internal schema"); - topic_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array,(unsigned int)topic_id); + topic_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array,(unsigned int)topic_id); EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_session_msg_free); EXPECT_EQ(topic_schema->free_cb_arg, &st); EXPECT_EQ(topic_schema->topic_id, topic_id); @@ -615,8 +1162,8 @@ TEST(plugin_manager, session_plugin_ignore_on_ctx_new_sub_other_msg) { plugin_manager_exit(plug_mgr); - EXPECT_TRUE(env.test_mq_free_called == env.test_mq_pub_called && env.test_mq_sub_called == env.N_session*env.N_per_session_pkt_cnt); - + EXPECT_EQ(env.test_mq_free_called, env.test_mq_pub_called); + EXPECT_EQ(env.test_mq_sub_called, env.N_session*env.N_per_session_pkt_cnt); } struct test_overlimit_session_mq_ctx diff --git a/test/plugin_manager/plugin_manager_gtest_mock.h b/test/plugin_manager/plugin_manager_gtest_mock.h index 9962872..7d6ffc3 100644 --- a/test/plugin_manager/plugin_manager_gtest_mock.h +++ b/test/plugin_manager/plugin_manager_gtest_mock.h @@ -61,6 +61,10 @@ int stellar_get_current_thread_id(struct stellar *st) return 0; } +struct stellar * packet_stellar_get(struct packet *pkt) +{ + return pkt->st; +} struct plugin_manager_runtime * session_plugin_manager_runtime_get(struct session *sess) { |
