diff options
| author | yangwei <[email protected]> | 2024-05-24 04:57:48 +0800 |
|---|---|---|
| committer | yangwei <[email protected]> | 2024-05-26 18:39:47 +0800 |
| commit | dfa960c6ec7d99f5d833046d579599d10674bac2 (patch) | |
| tree | 4df0d1f872830b4e27eac4b1ae6437b591874f2a /src | |
| parent | 6e3992676dbecf43d6552db703995714644aba50 (diff) | |
✨ feat(packet mq): build success, to be tested
Diffstat (limited to 'src')
| -rw-r--r-- | src/plugin_manager/plugin_manager.c | 351 | ||||
| -rw-r--r-- | src/stellar_on_sapp/stellar_on_sapp_api.c | 1 |
2 files changed, 286 insertions, 66 deletions
diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c index 65a7200..cb57e37 100644 --- a/src/plugin_manager/plugin_manager.c +++ b/src/plugin_manager/plugin_manager.c @@ -17,27 +17,39 @@ struct per_thread_exdata_array; +struct per_thread_mq_array; struct plugin_manager_schema { struct stellar *st; UT_array *packet_exdata_schema_array; + UT_array *packet_mq_schema_array; UT_array *session_exdata_schema_array; UT_array *plugin_load_specs_array; UT_array *session_mq_schema_array; UT_array *registered_session_plugin_array; UT_array *registered_packet_plugin_array; UT_array *registered_polling_plugin_array; - int topic_num; - int subscriber_num; + int packet_mq_topic_num; + int session_mq_topic_num; + int packet_topic_subscriber_num; + int session_topic_subscriber_num; int tcp_topic_id; int udp_topic_id; int tcp_stream_topic_id; int egress_topic_id; int control_packet_topic_id; struct per_thread_exdata_array *per_thread_pkt_exdata_array; + struct per_thread_mq_array *per_thread_pkt_mq_array; }; +struct stellar_message; +struct per_thread_mq_array +{ + struct stellar_message *mq; +}; + + struct stellar_exdata { void *exdata; @@ -49,8 +61,6 @@ struct per_thread_exdata_array }; - - struct stellar_exdata_schema { char *name; @@ -76,7 +86,7 @@ struct stellar_message typedef struct stellar_mq_subscriber { int topic_subscriber_idx; - int session_plugin_id; + int plugin_id; union { on_session_msg_cb_func *sess_msg_cb; @@ -85,7 +95,6 @@ typedef struct stellar_mq_subscriber struct stellar_mq_subscriber *next, *prev; }stellar_mq_subscriber; -typedef void stellar_msg_free_cb_func(void *msg, void *msg_free_arg); struct stellar_mq_topic_schema { @@ -93,7 +102,12 @@ struct stellar_mq_topic_schema void *free_cb_arg; int topic_id; int subscriber_cnt; - stellar_msg_free_cb_func *free_cb; + union + { + void *free_cb; + session_msg_free_cb_func *sess_msg_free_cb; + packet_msg_free_cb_func *pkt_msg_free_cb; + }; struct stellar_mq_subscriber *subscribers; }; @@ -119,6 +133,7 @@ struct plugin_manager_runtime struct stellar_exdata *sess_exdata_array; struct session_plugin_ctx_runtime *plugin_ctx_array;//N plugins TODO: call alloc and free int current_session_plugin_id; + int enable_session_mq; }; struct registered_packet_plugin_schema @@ -126,6 +141,7 @@ struct registered_packet_plugin_schema char ip_protocol; plugin_on_packet_func *on_packet; void *plugin_env; + UT_array *registed_packet_mq_subscriber_info; }; struct registered_polling_plugin_schema @@ -228,6 +244,7 @@ PLUGIN_SPEC_LOAD_ERROR: } static struct per_thread_exdata_array *per_thread_packet_exdata_arrary_new(struct stellar *st, struct plugin_manager_schema *plug_mgr); +static struct per_thread_mq_array *per_thread_mq_arrary_new(struct stellar *st, struct plugin_manager_schema *plug_mgr); struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char *plugin_spec_file_path) { @@ -264,10 +281,12 @@ struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char } FREE(specs); pm->per_thread_pkt_exdata_array=per_thread_packet_exdata_arrary_new(st, pm); + pm->per_thread_pkt_mq_array=per_thread_mq_arrary_new(st, pm); return pm; } static void per_thread_packet_exdata_arrary_free(struct stellar *st, struct plugin_manager_schema *plug_mgr); +static void per_thread_mq_arrary_free(struct stellar *st, struct plugin_manager_schema *plug_mgr); void plugin_manager_exit(struct plugin_manager_schema *plug_mgr) { @@ -289,6 +308,15 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr) } utarray_free(plug_mgr->session_mq_schema_array); } + if(plug_mgr->packet_mq_schema_array) + { + for(unsigned int i = 0; i < utarray_len(plug_mgr->packet_mq_schema_array); i++) + { + stellar_packet_mq_destroy_topic(plug_mgr->st, i); + } + utarray_free(plug_mgr->packet_mq_schema_array); + } + if(plug_mgr->session_exdata_schema_array)utarray_free(plug_mgr->session_exdata_schema_array); if(plug_mgr->registered_packet_plugin_array)utarray_free(plug_mgr->registered_packet_plugin_array); if(plug_mgr->registered_polling_plugin_array)utarray_free(plug_mgr->registered_polling_plugin_array); @@ -392,7 +420,7 @@ static struct per_thread_exdata_array *per_thread_packet_exdata_arrary_new(struc static void per_thread_packet_exdata_arrary_free(struct stellar *st, struct plugin_manager_schema *plug_mgr) { - if(st == NULL || plug_mgr == NULL || plug_mgr->packet_exdata_schema_array == NULL || plug_mgr->per_thread_pkt_exdata_array)return; + if(st == NULL || plug_mgr == NULL || plug_mgr->per_thread_pkt_exdata_array)return; int thread_num=stellar_get_worker_thread_num(st); for (int i = 0; i < thread_num; i++) { @@ -505,35 +533,16 @@ static void stellar_mq_topic_schema_dtor(void *_elt) // FREE(elt); // free the item } -UT_icd session_mq_topic_schema_icd = {sizeof(struct stellar_mq_topic_schema), NULL, stellar_mq_topic_schema_copy, stellar_mq_topic_schema_dtor}; +UT_icd stellar_mq_topic_schema_icd = {sizeof(struct stellar_mq_topic_schema), NULL, stellar_mq_topic_schema_copy, stellar_mq_topic_schema_dtor}; -void session_mq_free(struct stellar_message *head, UT_array *mq_schema_array) +int stellar_mq_get_topic_id(const char *topic_name, UT_array *mq_schema_array) { - struct stellar_message *mq_elt, *tmp; - struct stellar_mq_topic_schema *topic; - DL_FOREACH_SAFE(head, mq_elt, tmp) - { - topic = (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array, - (unsigned int)(mq_elt->topic_id)); - if (topic && topic->free_cb) - { - topic->free_cb(mq_elt->msg_data, topic->free_cb_arg); - } - DL_DELETE(head, mq_elt); - FREE(mq_elt); - } - FREE(head); -} - -int stellar_session_mq_get_topic_id(struct stellar *st, const char *topic_name) -{ - struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);; - if(topic_name == NULL || plug_mgr == NULL || plug_mgr->session_mq_schema_array == NULL)return -1; - unsigned int len = utarray_len(plug_mgr->session_mq_schema_array); + if(topic_name == NULL || mq_schema_array == NULL )return -1; + unsigned int len = utarray_len(mq_schema_array); struct stellar_mq_topic_schema *t_schema; for(unsigned int i = 0; i < len; i++) { - t_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, i); + t_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array, i); if(strcmp(t_schema->topic_name, topic_name) == 0) { return i; @@ -542,27 +551,25 @@ int stellar_session_mq_get_topic_id(struct stellar *st, const char *topic_name) return -1; } -int stellar_session_mq_update_topic(struct stellar *st, int topic_id, session_msg_free_cb_func *msg_free_cb, void *msg_free_arg) +int stellar_mq_update_topic(int topic_id, void *msg_free_cb, void *msg_free_arg, UT_array *mq_schema_array) { - struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); - if(plug_mgr->session_mq_schema_array == NULL)return -1; - unsigned int len = utarray_len(plug_mgr->session_mq_schema_array); + if(mq_schema_array == NULL)return -1; + unsigned int len = utarray_len(mq_schema_array); if(len < (unsigned int)topic_id)return -1; - struct stellar_mq_topic_schema *t_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, (unsigned int)topic_id); + struct stellar_mq_topic_schema *t_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array, (unsigned int)topic_id); if(t_schema == NULL)return -1; t_schema->free_cb=msg_free_cb; t_schema->free_cb_arg=msg_free_arg; return 0; } -int stellar_session_mq_create_topic(struct stellar *st, const char *topic_name, session_msg_free_cb_func *msg_free_cb, void *msg_free_arg) +int stellar_mq_create_topic(struct stellar *st, const char *topic_name, void *msg_free_cb, void *msg_free_arg, UT_array **mq_schema_array) { - struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); - if(plug_mgr->session_mq_schema_array == NULL) + if(*mq_schema_array == NULL) { - utarray_new(plug_mgr->session_mq_schema_array, &session_mq_topic_schema_icd); + utarray_new(*mq_schema_array, &stellar_mq_topic_schema_icd); } - unsigned int len = utarray_len(plug_mgr->session_mq_schema_array); + unsigned int len = utarray_len(*mq_schema_array); if(stellar_session_mq_get_topic_id(st, topic_name) >= 0) { return -1; @@ -575,20 +582,18 @@ int stellar_session_mq_create_topic(struct stellar *st, const char *topic_name, t_schema.free_cb_arg=msg_free_arg; t_schema.subscribers=NULL; t_schema.subscriber_cnt=0; - utarray_push_back(plug_mgr->session_mq_schema_array, &t_schema); - plug_mgr->topic_num+=1; + utarray_push_back(*mq_schema_array, &t_schema); return t_schema.topic_id; } -int stellar_session_mq_destroy_topic(struct stellar *st, int topic_id) +int stellar_mq_destroy_topic(int topic_id, UT_array *mq_schema_array) { - struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); - if(plug_mgr->session_mq_schema_array==NULL)return 0; - unsigned int len = utarray_len(plug_mgr->session_mq_schema_array); + if(mq_schema_array==NULL)return 0; + unsigned int len = utarray_len(mq_schema_array); if (len <= (unsigned int)topic_id) return -1; struct stellar_mq_topic_schema *topic = - (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, (unsigned int)topic_id); + (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array, (unsigned int)topic_id); struct stellar_mq_subscriber *sub_elt, *sub_tmp; if (topic) @@ -602,20 +607,229 @@ int stellar_session_mq_destroy_topic(struct stellar *st, int topic_id) return 0; // success } -int session_mq_publish_message(struct session *sess, int topic_id, void *data) +int stellar_mq_publish_message(int topic_id, void *data, UT_array *mq_schema_array, struct stellar_message *mq) { - struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess); - if(plug_mgr_rt==NULL || topic_id < 0)return -1; - if(plug_mgr_rt->plug_mgr->session_mq_schema_array==NULL)return -1; - unsigned int len = utarray_len(plug_mgr_rt->plug_mgr->session_mq_schema_array); + if(mq_schema_array==NULL || topic_id < 0)return -1; + unsigned int len = utarray_len(mq_schema_array); if (len <= (unsigned int)topic_id)return -1; struct stellar_message *msg= CALLOC(struct stellar_message,1); msg->topic_id = topic_id; msg->msg_data = data; - DL_APPEND(plug_mgr_rt->pending_mq, msg); + DL_APPEND(mq, msg); return 0; } +UT_icd stellar_mq_subscriber_info_icd = {sizeof(struct stellar_mq_subscriber_info), NULL, NULL, NULL}; + +/******************************* + * PACKET MQ * + *******************************/ + +static struct per_thread_mq_array *per_thread_mq_arrary_new(struct stellar *st, struct plugin_manager_schema *plug_mgr) +{ + if(st == NULL || plug_mgr == NULL || plug_mgr->packet_mq_schema_array == NULL || plug_mgr->per_thread_pkt_mq_array)return NULL; + int thread_num=stellar_get_worker_thread_num(st); + struct per_thread_mq_array *per_thread_pkt_mq_array = CALLOC(struct per_thread_mq_array, thread_num); + return per_thread_pkt_mq_array; +} + +static void per_thread_mq_arrary_free(struct stellar *st, struct plugin_manager_schema *plug_mgr) +{ + if(plug_mgr == NULL || plug_mgr->per_thread_pkt_mq_array)return; + FREE(plug_mgr->per_thread_pkt_mq_array); + return; +} + +int stellar_packet_mq_create_topic(struct stellar *st, const char *topic_name, packet_msg_free_cb_func *msg_free_cb, void *msg_free_arg) +{ + struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); + assert(plug_mgr); + int topic_id=stellar_mq_create_topic(st, topic_name, (void *)msg_free_cb, msg_free_arg, &plug_mgr->packet_mq_schema_array); + if(topic_id>0)plug_mgr->packet_mq_topic_num+=1; + return topic_id; +} + +int stellar_packet_mq_get_topic_id(struct stellar *st, const char *topic_name) +{ + struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); + assert(plug_mgr); + return stellar_mq_get_topic_id(topic_name, plug_mgr->packet_mq_schema_array); +} + +int stellar_packet_mq_update_topic(struct stellar *st, int topic_id, packet_msg_free_cb_func *msg_free_cb, void *msg_free_arg) +{ + struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); + assert(plug_mgr); + return stellar_mq_update_topic(topic_id, (void *)msg_free_cb, msg_free_arg, plug_mgr->packet_mq_schema_array); +} + +int stellar_packet_mq_destroy_topic(struct stellar *st, int topic_id) +{ + struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); + assert(plug_mgr); + return stellar_mq_destroy_topic(topic_id, plug_mgr->packet_mq_schema_array); +} + +//return 0 if success, otherwise return -1. +int stellar_packet_mq_subscribe(struct stellar *st, int topic_id, on_packet_msg_cb_func *plugin_on_msg_cb, int plugin_id) //packet plugin only +{ + if(plugin_id < PACKET_PULGIN_ID_BASE || plugin_id >= POLLING_PULGIN_ID_BASE)return -1;// ignore session or polling plugin + struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); + if(plug_mgr->packet_mq_schema_array==NULL)return -1; + unsigned int len = utarray_len(plug_mgr->packet_mq_schema_array); + if (len <= (unsigned int)topic_id)return -1; + + struct registered_packet_plugin_schema *packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr(plug_mgr->registered_packet_plugin_array, (unsigned)plugin_id); + if(packet_plugin_schema==NULL)return -1; + + struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->packet_mq_schema_array, (unsigned int)topic_id); + if(topic==NULL)return -1; + + if(packet_plugin_schema->registed_packet_mq_subscriber_info==NULL) + { + utarray_new(packet_plugin_schema->registed_packet_mq_subscriber_info, &stellar_mq_subscriber_info_icd); + } + + // if plugin already subscribe current topic, return 0 + struct stellar_mq_subscriber_info *p=NULL; + while( (p=(struct stellar_mq_subscriber_info *)utarray_next(packet_plugin_schema->registed_packet_mq_subscriber_info,p))) + { + if(p->topic_id==topic_id) + return 0; + }; + + struct stellar_mq_subscriber *new_subscriber = CALLOC(struct stellar_mq_subscriber,1); + new_subscriber->topic_subscriber_idx = topic->subscriber_cnt; + new_subscriber->plugin_id = plugin_id; + new_subscriber->pkt_msg_cb = plugin_on_msg_cb; + DL_APPEND(topic->subscribers, new_subscriber); + + struct stellar_mq_subscriber_info sub_info; + memset(&sub_info, 0, sizeof(struct stellar_mq_subscriber_info)); + sub_info.topic_id=topic_id; + sub_info.subscriber_idx=topic->subscriber_cnt; + utarray_push_back(packet_plugin_schema->registed_packet_mq_subscriber_info, &sub_info); + topic->subscriber_cnt+=1; + plug_mgr->packet_topic_subscriber_num+=1; + return 0; +} + +int packet_mq_publish_message(struct packet *pkt, int topic_id, void *msg) +{ + struct stellar *st = packet_stellar_get(pkt); + assert(st); + struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); + assert(plug_mgr); + int tid = stellar_get_current_thread_id(st); + return stellar_mq_publish_message(topic_id, msg, plug_mgr->packet_mq_schema_array, (plug_mgr->per_thread_pkt_mq_array+tid)->mq); +} + +static void plugin_manager_packet_message_dispatch(struct packet *pkt) +{ + + struct stellar *st = packet_stellar_get(pkt); + assert(st); + struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); + assert(plug_mgr); + + if(plug_mgr->packet_mq_schema_array==NULL)return; + + int tid = stellar_get_current_thread_id(st); + + struct stellar_message *mq= (plug_mgr->per_thread_pkt_mq_array+tid)->mq; + + struct stellar_message *mq_elt=NULL, *mq_tmp=NULL; + struct stellar_mq_subscriber *sub_elt, *sub_tmp; + struct stellar_mq_topic_schema *topic; + struct registered_packet_plugin_schema *packet_plugin_schema; + while (mq != NULL) + { + DL_FOREACH_SAFE(mq, mq_elt, mq_tmp) + { + topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->packet_mq_schema_array, + (unsigned int)(mq_elt->topic_id)); + if (topic) + { + DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp) + { + if (sub_elt->pkt_msg_cb) + { + packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr(plug_mgr->registered_packet_plugin_array, (unsigned int)sub_elt->plugin_id); + sub_elt->pkt_msg_cb(pkt, mq_elt->topic_id, mq_elt->msg_data, packet_plugin_schema->plugin_env); + } + } + if (topic->pkt_msg_free_cb) + { + topic->pkt_msg_free_cb(pkt, mq_elt->msg_data, topic->free_cb_arg); + } + } + DL_DELETE(mq, mq_elt); + FREE(mq_elt); + } + } + return; +} + +/******************************* + * SESSION MQ * + *******************************/ + +void session_mq_free(struct session *sess, struct stellar_message *head, UT_array *mq_schema_array) +{ + struct stellar_message *mq_elt, *tmp; + struct stellar_mq_topic_schema *topic; + DL_FOREACH_SAFE(head, mq_elt, tmp) + { + topic = (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array, + (unsigned int)(mq_elt->topic_id)); + if (topic && topic->free_cb) + { + topic->sess_msg_free_cb(sess, mq_elt->msg_data, topic->free_cb_arg); + } + DL_DELETE(head, mq_elt); + FREE(mq_elt); + } + FREE(head); +} + +inline int stellar_session_mq_get_topic_id(struct stellar *st, const char *topic_name) +{ + struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); + assert(plug_mgr); + return stellar_mq_get_topic_id(topic_name, plug_mgr->session_mq_schema_array); +} + +int stellar_session_mq_update_topic(struct stellar *st, int topic_id, session_msg_free_cb_func *msg_free_cb, void *msg_free_arg) +{ + struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); + assert(plug_mgr); + return stellar_mq_update_topic(topic_id, (void *)msg_free_cb, msg_free_arg, plug_mgr->session_mq_schema_array); +} + +int stellar_session_mq_create_topic(struct stellar *st, const char *topic_name, session_msg_free_cb_func *msg_free_cb, void *msg_free_arg) +{ + struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); + assert(plug_mgr); + int topic_id=stellar_mq_create_topic(st, topic_name, (void *)msg_free_cb, msg_free_arg, &plug_mgr->session_mq_schema_array); + if(topic_id>0)plug_mgr->session_mq_topic_num+=1; + return topic_id; +} + +int stellar_session_mq_destroy_topic(struct stellar *st, int topic_id) +{ + struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); + assert(plug_mgr); + return stellar_mq_destroy_topic(topic_id, plug_mgr->session_mq_schema_array); +} + +int session_mq_publish_message(struct session *sess, int topic_id, void *data) +{ + struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess); + assert(plug_mgr_rt); + if(plug_mgr_rt->enable_session_mq==0)return -1; + return stellar_mq_publish_message(topic_id, data, plug_mgr_rt->plug_mgr->session_mq_schema_array, plug_mgr_rt->pending_mq); +} + static int session_mq_set_message_status(struct session *sess, int topic_id, int plugin_id, int bit_value) { if(bit_value!=0 && bit_value!=1)return -1; @@ -623,7 +837,7 @@ static int session_mq_set_message_status(struct session *sess, int topic_id, int if(topic_id < 0 || plugin_id < 0)return -1; struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess); if(plug_mgr_rt==NULL)return -1; - if(topic_id >= plug_mgr_rt->plug_mgr->topic_num)return -1;// topic_id out of range + if(topic_id >= plug_mgr_rt->plug_mgr->session_mq_topic_num)return -1;// topic_id out of range struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->session_mq_schema_array, (unsigned int)topic_id); if(topic==NULL)return -1; @@ -658,7 +872,6 @@ int session_mq_unignore_message(struct session *sess, int topic_id, int plugin_i return session_mq_set_message_status(sess, topic_id, plugin_id, 1); } -UT_icd session_mq_subscriber_info_icd = {sizeof(struct stellar_mq_subscriber_info), NULL, NULL, NULL}; int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_session_msg_cb_func *plugin_on_msg_cb, int plugin_id) { @@ -676,7 +889,7 @@ int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_session_ms if(session_plugin_schema->registed_session_mq_subscriber_info==NULL) { - utarray_new(session_plugin_schema->registed_session_mq_subscriber_info, &session_mq_subscriber_info_icd); + utarray_new(session_plugin_schema->registed_session_mq_subscriber_info, &stellar_mq_subscriber_info_icd); } // if plugin already subscribe current topic, return 0 @@ -689,7 +902,7 @@ int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_session_ms struct stellar_mq_subscriber *new_subscriber = CALLOC(struct stellar_mq_subscriber,1); new_subscriber->topic_subscriber_idx = topic->subscriber_cnt; - new_subscriber->session_plugin_id = plugin_id; + new_subscriber->plugin_id = plugin_id; new_subscriber->sess_msg_cb = plugin_on_msg_cb; DL_APPEND(topic->subscribers, new_subscriber); @@ -699,7 +912,7 @@ int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_session_ms sub_info.subscriber_idx=topic->subscriber_cnt; utarray_push_back(session_plugin_schema->registed_session_mq_subscriber_info, &sub_info); topic->subscriber_cnt+=1; - plug_mgr->subscriber_num+=1; + plug_mgr->session_topic_subscriber_num+=1; return 0; } @@ -726,8 +939,8 @@ static void plugin_manager_session_message_dispatch(struct session *sess) { if (bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->topic_id, cur_sub_idx) != 0) { - plugin_ctx_rt=(plug_mgr_rt->plugin_ctx_array+sub_elt->session_plugin_id); - session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)sub_elt->session_plugin_id); + plugin_ctx_rt=(plug_mgr_rt->plugin_ctx_array+sub_elt->plugin_id); + session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)sub_elt->plugin_id); if(plugin_ctx_rt->state==INIT) { if(session_plugin_schema->on_ctx_new) @@ -791,7 +1004,7 @@ struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_ rt->sess = sess; rt->pending_mq = NULL; rt->delivered_mq = NULL; - rt->session_mq_status=bitmap_new(plug_mgr->topic_num, plug_mgr->subscriber_num, 1); + rt->session_mq_status=bitmap_new(plug_mgr->session_mq_topic_num, plug_mgr->session_topic_subscriber_num, 1); rt->sess_exdata_array = (struct stellar_exdata *)session_exdata_runtime_new(plug_mgr); rt->plugin_ctx_array = CALLOC(struct session_plugin_ctx_runtime, utarray_len(plug_mgr->registered_session_plugin_array)); return rt; @@ -803,12 +1016,12 @@ void plugin_manager_session_runtime_free(struct plugin_manager_runtime *rt) if(rt==NULL)return; if(rt->pending_mq != NULL) { - session_mq_free(rt->pending_mq, rt->plug_mgr->session_mq_schema_array); + session_mq_free(rt->sess, rt->pending_mq, rt->plug_mgr->session_mq_schema_array); rt->pending_mq=NULL; } if(rt->delivered_mq != NULL) { - session_mq_free(rt->delivered_mq, rt->plug_mgr->session_mq_schema_array); + session_mq_free(rt->sess, rt->delivered_mq, rt->plug_mgr->session_mq_schema_array); rt->delivered_mq=NULL; } if(rt->session_mq_status != NULL) @@ -868,6 +1081,7 @@ void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, struct pac p->on_packet(pkt, ip_proto, p->plugin_env); } } + plugin_manager_packet_message_dispatch(pkt); return; } @@ -961,9 +1175,11 @@ void plugin_manager_on_session_ingress(struct session *sess, struct packet *pkt) default: break; } + plug_mgr_rt->enable_session_mq=1; //TODO: check TCP topic active subscirber num, if 0, return APP_STATE_DROPME, to reduce tcp reassemble overhead session_mq_publish_message(sess, topic_id ,(void *)pkt); plugin_manager_session_message_dispatch(sess); + plug_mgr_rt->enable_session_mq=0; return; } @@ -971,10 +1187,13 @@ void plugin_manager_on_session_egress(struct session *sess, struct packet *pkt) { struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess); if(plug_mgr_rt==NULL)return; + plug_mgr_rt->enable_session_mq=1; session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->egress_topic_id ,(void *)pkt); plugin_manager_session_message_dispatch(sess); - session_mq_free(plug_mgr_rt->delivered_mq, plug_mgr_rt->plug_mgr->session_mq_schema_array); + session_mq_free(plug_mgr_rt->sess,plug_mgr_rt->delivered_mq, plug_mgr_rt->plug_mgr->session_mq_schema_array); plug_mgr_rt->delivered_mq=NULL; + plug_mgr_rt->enable_session_mq=0; + plugin_manager_packet_message_dispatch(pkt); per_thread_packet_exdata_arrary_clean(plug_mgr_rt->plug_mgr, pkt); return; } diff --git a/src/stellar_on_sapp/stellar_on_sapp_api.c b/src/stellar_on_sapp/stellar_on_sapp_api.c index b3eed53..68bb2a7 100644 --- a/src/stellar_on_sapp/stellar_on_sapp_api.c +++ b/src/stellar_on_sapp/stellar_on_sapp_api.c @@ -111,6 +111,7 @@ struct session *session_new_on_sapp(struct stellar *st, struct streaminfo *strea sess->pstream=stream; sess->session_direction=-1; memset(&sess->cur_pkt, 0, sizeof(struct packet)); + sess->cur_pkt.st=st; sess->plug_mgr_rt=plugin_manager_session_runtime_new(st->plug_mgr, sess); return sess; } |
