diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/plugin_manager/plugin_manager.c | 60 | ||||
| -rw-r--r-- | src/plugin_manager/plugin_manager_interna.h | 10 |
2 files changed, 53 insertions, 17 deletions
diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c index 2f20d51..1480c25 100644 --- a/src/plugin_manager/plugin_manager.c +++ b/src/plugin_manager/plugin_manager.c @@ -89,7 +89,7 @@ static void plugin_manager_per_thread_data_free(struct plugin_manger_per_thread_ { p_data=per_thread_data+i; if(p_data->per_thread_pkt_exdata_array.exdata_array)FREE(p_data->per_thread_pkt_exdata_array.exdata_array); - if(p_data->per_thread_pkt_mq_array.mq)FREE(p_data->per_thread_pkt_mq_array.mq); + if(p_data->per_thread_pkt_mq.mq)FREE(p_data->per_thread_pkt_mq.mq); } FREE(per_thread_data); return; @@ -105,6 +105,7 @@ struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char return NULL; } struct plugin_manager_schema *plug_mgr = CALLOC(struct plugin_manager_schema, 1); + plug_mgr->max_message_dispatch=MAX_MSG_PER_DISPATCH; if(spec_num > 0) { utarray_new(plug_mgr->plugin_load_specs_array,&plugin_specs_icd); @@ -465,6 +466,27 @@ UT_icd stellar_mq_subscriber_info_icd = {sizeof(struct stellar_mq_subscriber_inf * PACKET MQ * *******************************/ +static inline int stellar_current_thread_packet_mq_counter_inc(struct plugin_manager_schema *plug_mgr) +{ + if(plug_mgr==NULL)return -1; + int tid = stellar_get_current_thread_id(plug_mgr->st); + plug_mgr->per_thread_data[tid].per_thread_pkt_mq.pub_msg_cnt+=1; + return plug_mgr->per_thread_data[tid].per_thread_pkt_mq.pub_msg_cnt; +} + +static inline void stellar_current_thread_packet_mq_counter_reset(struct plugin_manager_schema *plug_mgr) +{ + if(plug_mgr==NULL)return; + int tid = stellar_get_current_thread_id(plug_mgr->st); + plug_mgr->per_thread_data[tid].per_thread_pkt_mq.pub_msg_cnt=0; +} + +static inline int stellar_current_thread_packet_mq_counter_get(struct plugin_manager_schema *plug_mgr) +{ + if(plug_mgr==NULL)return 0; + int tid = stellar_get_current_thread_id(plug_mgr->st); + return plug_mgr->per_thread_data[tid].per_thread_pkt_mq.pub_msg_cnt; +} int stellar_packet_mq_create_topic(struct stellar *st, const char *topic_name, packet_msg_free_cb_func *msg_free_cb, void *msg_free_arg) { @@ -563,9 +585,13 @@ int packet_mq_publish_message(struct packet *pkt, int topic_id, void *msg) struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); assert(plug_mgr); int tid = stellar_get_current_thread_id(st); - return stellar_mq_publish_message(topic_id, msg, plug_mgr->packet_mq_schema_array, &(plug_mgr->per_thread_data[tid].per_thread_pkt_mq_array.mq)); + if(stellar_current_thread_packet_mq_counter_get(plug_mgr) >= plug_mgr->max_message_dispatch)return -1;//packet mq donot contain intrinisic msg + int ret=stellar_mq_publish_message(topic_id, msg, plug_mgr->packet_mq_schema_array, &(plug_mgr->per_thread_data[tid].per_thread_pkt_mq.mq)); + if(ret==0)stellar_current_thread_packet_mq_counter_inc(plug_mgr); + return ret; } +// TODO: limit maximum pub message number in one loop static void plugin_manager_packet_message_dispatch(struct packet *pkt) { @@ -578,7 +604,7 @@ static void plugin_manager_packet_message_dispatch(struct packet *pkt) int tid = stellar_get_current_thread_id(st); - struct stellar_message **mq= &(plug_mgr->per_thread_data[tid].per_thread_pkt_mq_array.mq); + struct stellar_message **mq= &(plug_mgr->per_thread_data[tid].per_thread_pkt_mq.mq); struct stellar_message *mq_elt=NULL, *mq_tmp=NULL; struct stellar_mq_subscriber *sub_elt, *sub_tmp; @@ -597,7 +623,10 @@ static void plugin_manager_packet_message_dispatch(struct packet *pkt) if (sub_elt->pkt_msg_cb) { packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr(plug_mgr->registered_packet_plugin_array, (unsigned int)sub_elt->plugin_idx); - if(packet_plugin_schema)sub_elt->pkt_msg_cb(pkt, mq_elt->topic_id, mq_elt->msg_data, packet_plugin_schema->plugin_env); + if(packet_plugin_schema) + { + sub_elt->pkt_msg_cb(pkt, mq_elt->topic_id, mq_elt->msg_data, packet_plugin_schema->plugin_env); + } } } if (topic->pkt_msg_free_cb) @@ -670,8 +699,11 @@ int session_mq_publish_message(struct session *sess, int topic_id, void *data) { struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess); assert(plug_mgr_rt); - if(plug_mgr_rt->enable_session_mq==0)return -1; - return stellar_mq_publish_message(topic_id, data, plug_mgr_rt->plug_mgr->session_mq_schema_array, &plug_mgr_rt->pending_mq); + if(plug_mgr_rt->session_mq_status==NULL)return -1;//runtime free stage , mq_status alaway null, ignore publish message + if(plug_mgr_rt->pub_session_msg_cnt >= plug_mgr_rt->plug_mgr->max_message_dispatch)return -1; + int ret=stellar_mq_publish_message(topic_id, data, plug_mgr_rt->plug_mgr->session_mq_schema_array, &plug_mgr_rt->pending_mq); + if(ret==0)plug_mgr_rt->pub_session_msg_cnt+=1; + return ret; } static int session_mq_set_message_status(struct session *sess, int topic_id, int plugin_id, int bit_value) @@ -814,8 +846,10 @@ static void plugin_manager_session_message_dispatch(struct session *sess) } } } - if (sub_elt->sess_msg_cb && bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->topic_id, cur_sub_idx) != 0)// ctx_new maybe call detach, so need check again - sub_elt->sess_msg_cb(sess, mq_elt->topic_id, mq_elt->msg_data, plugin_ctx_rt->plugin_ctx, session_plugin_schema->plugin_env); + if (sub_elt->sess_msg_cb && bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->topic_id, cur_sub_idx) != 0)// ctx_new maybe call detach, need check again + { + sub_elt->sess_msg_cb(sess, mq_elt->topic_id, mq_elt->msg_data, plugin_ctx_rt->plugin_ctx, session_plugin_schema->plugin_env); + } } } cur_sub_idx++; @@ -894,6 +928,7 @@ void plugin_manager_session_runtime_free(struct plugin_manager_runtime *rt) if(rt->session_mq_status != NULL) { bitmap_free(rt->session_mq_status); + rt->session_mq_status=NULL; } if (rt->plug_mgr->registered_session_plugin_array) { @@ -922,6 +957,7 @@ void plugin_manager_session_runtime_free(struct plugin_manager_runtime *rt) *********************************************/ + UT_icd registered_packet_plugin_array_icd = {sizeof(struct registered_packet_plugin_schema), NULL, NULL, NULL}; int stellar_packet_plugin_register(struct stellar *st, unsigned char ip_proto, plugin_on_packet_func on_packet_cb, void *plugin_env) @@ -945,6 +981,7 @@ void plugin_manager_on_packet_ingress(struct plugin_manager_schema *plug_mgr, st if(plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return; struct registered_packet_plugin_schema *p=NULL; unsigned char ip_proto=packet_get_ip_protocol(pkt); + stellar_current_thread_packet_mq_counter_reset(plug_mgr); while ((p = (struct registered_packet_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, p))) { if(p->ip_protocol == ip_proto && p->on_packet) @@ -1053,11 +1090,10 @@ void plugin_manager_on_session_ingress(struct session *sess, struct packet *pkt) default: break; } - plug_mgr_rt->enable_session_mq=1; + plug_mgr_rt->pub_session_msg_cnt=0; //TODO: check TCP topic active subscirber num, if 0, return APP_STATE_DROPME, to reduce tcp reassemble overhead session_mq_publish_message(sess, topic_id ,(void *)pkt); plugin_manager_session_message_dispatch(sess); - //plug_mgr_rt->enable_session_mq=0; return; } @@ -1065,10 +1101,8 @@ void plugin_manager_on_session_egress(struct session *sess, struct packet *pkt) { struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess); if(plug_mgr_rt==NULL)return; - //plug_mgr_rt->enable_session_mq=1; session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->egress_topic_id ,pkt); plugin_manager_session_message_dispatch(sess); - plug_mgr_rt->enable_session_mq=0; session_mq_free(plug_mgr_rt->sess,&plug_mgr_rt->delivered_mq, plug_mgr_rt->plug_mgr->session_mq_schema_array); assert(plug_mgr_rt->pending_mq==NULL && plug_mgr_rt->delivered_mq==NULL); return; @@ -1078,7 +1112,6 @@ void plugin_manager_on_session_closing(struct session *sess) { struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess); if(plug_mgr_rt==NULL)return; - plug_mgr_rt->enable_session_mq=1; switch (session_get_type(sess)) { case SESSION_TYPE_TCP: @@ -1092,7 +1125,6 @@ void plugin_manager_on_session_closing(struct session *sess) break; } plugin_manager_session_message_dispatch(sess); - plug_mgr_rt->enable_session_mq=0; session_mq_free(plug_mgr_rt->sess,&plug_mgr_rt->delivered_mq, plug_mgr_rt->plug_mgr->session_mq_schema_array); assert(plug_mgr_rt->pending_mq==NULL && plug_mgr_rt->delivered_mq==NULL); return; diff --git a/src/plugin_manager/plugin_manager_interna.h b/src/plugin_manager/plugin_manager_interna.h index 24b8d42..0cf55d6 100644 --- a/src/plugin_manager/plugin_manager_interna.h +++ b/src/plugin_manager/plugin_manager_interna.h @@ -16,15 +16,16 @@ struct per_thread_exdata_array }; struct stellar_message; -struct per_thread_mq_array +struct per_thread_mq { struct stellar_message *mq; + int pub_msg_cnt; }; struct plugin_manger_per_thread_data { struct per_thread_exdata_array per_thread_pkt_exdata_array; - struct per_thread_mq_array per_thread_pkt_mq_array; + struct per_thread_mq per_thread_pkt_mq; }; struct plugin_manager_schema @@ -47,6 +48,7 @@ struct plugin_manager_schema int udp_topic_id; int egress_topic_id; int control_packet_topic_id; + int max_message_dispatch; struct plugin_manger_per_thread_data *per_thread_data; }__attribute__((aligned(sizeof(void*)))); @@ -133,7 +135,7 @@ struct plugin_manager_runtime struct stellar_exdata *sess_exdata_array; struct session_plugin_ctx_runtime *plugin_ctx_array;//N plugins TODO: call alloc and free int current_session_plugin_id; - int enable_session_mq; + int pub_session_msg_cnt; }__attribute__((aligned(sizeof(void*)))); struct registered_packet_plugin_schema @@ -172,6 +174,8 @@ struct registered_session_plugin_schema * PLUGIN MANAGER INIT & EXIT * *******************************/ +#define MAX_MSG_PER_DISPATCH 128 + #include <dlfcn.h> struct plugin_specific |
