summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/plugin_manager/plugin_manager.c60
-rw-r--r--src/plugin_manager/plugin_manager_interna.h10
2 files changed, 53 insertions, 17 deletions
diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c
index 2f20d51..1480c25 100644
--- a/src/plugin_manager/plugin_manager.c
+++ b/src/plugin_manager/plugin_manager.c
@@ -89,7 +89,7 @@ static void plugin_manager_per_thread_data_free(struct plugin_manger_per_thread_
{
p_data=per_thread_data+i;
if(p_data->per_thread_pkt_exdata_array.exdata_array)FREE(p_data->per_thread_pkt_exdata_array.exdata_array);
- if(p_data->per_thread_pkt_mq_array.mq)FREE(p_data->per_thread_pkt_mq_array.mq);
+ if(p_data->per_thread_pkt_mq.mq)FREE(p_data->per_thread_pkt_mq.mq);
}
FREE(per_thread_data);
return;
@@ -105,6 +105,7 @@ struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char
return NULL;
}
struct plugin_manager_schema *plug_mgr = CALLOC(struct plugin_manager_schema, 1);
+ plug_mgr->max_message_dispatch=MAX_MSG_PER_DISPATCH;
if(spec_num > 0)
{
utarray_new(plug_mgr->plugin_load_specs_array,&plugin_specs_icd);
@@ -465,6 +466,27 @@ UT_icd stellar_mq_subscriber_info_icd = {sizeof(struct stellar_mq_subscriber_inf
* PACKET MQ *
*******************************/
+static inline int stellar_current_thread_packet_mq_counter_inc(struct plugin_manager_schema *plug_mgr)
+{
+ if(plug_mgr==NULL)return -1;
+ int tid = stellar_get_current_thread_id(plug_mgr->st);
+ plug_mgr->per_thread_data[tid].per_thread_pkt_mq.pub_msg_cnt+=1;
+ return plug_mgr->per_thread_data[tid].per_thread_pkt_mq.pub_msg_cnt;
+}
+
+static inline void stellar_current_thread_packet_mq_counter_reset(struct plugin_manager_schema *plug_mgr)
+{
+ if(plug_mgr==NULL)return;
+ int tid = stellar_get_current_thread_id(plug_mgr->st);
+ plug_mgr->per_thread_data[tid].per_thread_pkt_mq.pub_msg_cnt=0;
+}
+
+static inline int stellar_current_thread_packet_mq_counter_get(struct plugin_manager_schema *plug_mgr)
+{
+ if(plug_mgr==NULL)return 0;
+ int tid = stellar_get_current_thread_id(plug_mgr->st);
+ return plug_mgr->per_thread_data[tid].per_thread_pkt_mq.pub_msg_cnt;
+}
int stellar_packet_mq_create_topic(struct stellar *st, const char *topic_name, packet_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
{
@@ -563,9 +585,13 @@ int packet_mq_publish_message(struct packet *pkt, int topic_id, void *msg)
struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
assert(plug_mgr);
int tid = stellar_get_current_thread_id(st);
- return stellar_mq_publish_message(topic_id, msg, plug_mgr->packet_mq_schema_array, &(plug_mgr->per_thread_data[tid].per_thread_pkt_mq_array.mq));
+ if(stellar_current_thread_packet_mq_counter_get(plug_mgr) >= plug_mgr->max_message_dispatch)return -1;//packet mq donot contain intrinisic msg
+ int ret=stellar_mq_publish_message(topic_id, msg, plug_mgr->packet_mq_schema_array, &(plug_mgr->per_thread_data[tid].per_thread_pkt_mq.mq));
+ if(ret==0)stellar_current_thread_packet_mq_counter_inc(plug_mgr);
+ return ret;
}
+// TODO: limit maximum pub message number in one loop
static void plugin_manager_packet_message_dispatch(struct packet *pkt)
{
@@ -578,7 +604,7 @@ static void plugin_manager_packet_message_dispatch(struct packet *pkt)
int tid = stellar_get_current_thread_id(st);
- struct stellar_message **mq= &(plug_mgr->per_thread_data[tid].per_thread_pkt_mq_array.mq);
+ struct stellar_message **mq= &(plug_mgr->per_thread_data[tid].per_thread_pkt_mq.mq);
struct stellar_message *mq_elt=NULL, *mq_tmp=NULL;
struct stellar_mq_subscriber *sub_elt, *sub_tmp;
@@ -597,7 +623,10 @@ static void plugin_manager_packet_message_dispatch(struct packet *pkt)
if (sub_elt->pkt_msg_cb)
{
packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr(plug_mgr->registered_packet_plugin_array, (unsigned int)sub_elt->plugin_idx);
- if(packet_plugin_schema)sub_elt->pkt_msg_cb(pkt, mq_elt->topic_id, mq_elt->msg_data, packet_plugin_schema->plugin_env);
+ if(packet_plugin_schema)
+ {
+ sub_elt->pkt_msg_cb(pkt, mq_elt->topic_id, mq_elt->msg_data, packet_plugin_schema->plugin_env);
+ }
}
}
if (topic->pkt_msg_free_cb)
@@ -670,8 +699,11 @@ int session_mq_publish_message(struct session *sess, int topic_id, void *data)
{
struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess);
assert(plug_mgr_rt);
- if(plug_mgr_rt->enable_session_mq==0)return -1;
- return stellar_mq_publish_message(topic_id, data, plug_mgr_rt->plug_mgr->session_mq_schema_array, &plug_mgr_rt->pending_mq);
+ if(plug_mgr_rt->session_mq_status==NULL)return -1;//runtime free stage , mq_status alaway null, ignore publish message
+ if(plug_mgr_rt->pub_session_msg_cnt >= plug_mgr_rt->plug_mgr->max_message_dispatch)return -1;
+ int ret=stellar_mq_publish_message(topic_id, data, plug_mgr_rt->plug_mgr->session_mq_schema_array, &plug_mgr_rt->pending_mq);
+ if(ret==0)plug_mgr_rt->pub_session_msg_cnt+=1;
+ return ret;
}
static int session_mq_set_message_status(struct session *sess, int topic_id, int plugin_id, int bit_value)
@@ -814,8 +846,10 @@ static void plugin_manager_session_message_dispatch(struct session *sess)
}
}
}
- if (sub_elt->sess_msg_cb && bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->topic_id, cur_sub_idx) != 0)// ctx_new maybe call detach, so need check again
- sub_elt->sess_msg_cb(sess, mq_elt->topic_id, mq_elt->msg_data, plugin_ctx_rt->plugin_ctx, session_plugin_schema->plugin_env);
+ if (sub_elt->sess_msg_cb && bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->topic_id, cur_sub_idx) != 0)// ctx_new maybe call detach, need check again
+ {
+ sub_elt->sess_msg_cb(sess, mq_elt->topic_id, mq_elt->msg_data, plugin_ctx_rt->plugin_ctx, session_plugin_schema->plugin_env);
+ }
}
}
cur_sub_idx++;
@@ -894,6 +928,7 @@ void plugin_manager_session_runtime_free(struct plugin_manager_runtime *rt)
if(rt->session_mq_status != NULL)
{
bitmap_free(rt->session_mq_status);
+ rt->session_mq_status=NULL;
}
if (rt->plug_mgr->registered_session_plugin_array)
{
@@ -922,6 +957,7 @@ void plugin_manager_session_runtime_free(struct plugin_manager_runtime *rt)
*********************************************/
+
UT_icd registered_packet_plugin_array_icd = {sizeof(struct registered_packet_plugin_schema), NULL, NULL, NULL};
int stellar_packet_plugin_register(struct stellar *st, unsigned char ip_proto, plugin_on_packet_func on_packet_cb, void *plugin_env)
@@ -945,6 +981,7 @@ void plugin_manager_on_packet_ingress(struct plugin_manager_schema *plug_mgr, st
if(plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return;
struct registered_packet_plugin_schema *p=NULL;
unsigned char ip_proto=packet_get_ip_protocol(pkt);
+ stellar_current_thread_packet_mq_counter_reset(plug_mgr);
while ((p = (struct registered_packet_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, p)))
{
if(p->ip_protocol == ip_proto && p->on_packet)
@@ -1053,11 +1090,10 @@ void plugin_manager_on_session_ingress(struct session *sess, struct packet *pkt)
default:
break;
}
- plug_mgr_rt->enable_session_mq=1;
+ plug_mgr_rt->pub_session_msg_cnt=0;
//TODO: check TCP topic active subscirber num, if 0, return APP_STATE_DROPME, to reduce tcp reassemble overhead
session_mq_publish_message(sess, topic_id ,(void *)pkt);
plugin_manager_session_message_dispatch(sess);
- //plug_mgr_rt->enable_session_mq=0;
return;
}
@@ -1065,10 +1101,8 @@ void plugin_manager_on_session_egress(struct session *sess, struct packet *pkt)
{
struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess);
if(plug_mgr_rt==NULL)return;
- //plug_mgr_rt->enable_session_mq=1;
session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->egress_topic_id ,pkt);
plugin_manager_session_message_dispatch(sess);
- plug_mgr_rt->enable_session_mq=0;
session_mq_free(plug_mgr_rt->sess,&plug_mgr_rt->delivered_mq, plug_mgr_rt->plug_mgr->session_mq_schema_array);
assert(plug_mgr_rt->pending_mq==NULL && plug_mgr_rt->delivered_mq==NULL);
return;
@@ -1078,7 +1112,6 @@ void plugin_manager_on_session_closing(struct session *sess)
{
struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess);
if(plug_mgr_rt==NULL)return;
- plug_mgr_rt->enable_session_mq=1;
switch (session_get_type(sess))
{
case SESSION_TYPE_TCP:
@@ -1092,7 +1125,6 @@ void plugin_manager_on_session_closing(struct session *sess)
break;
}
plugin_manager_session_message_dispatch(sess);
- plug_mgr_rt->enable_session_mq=0;
session_mq_free(plug_mgr_rt->sess,&plug_mgr_rt->delivered_mq, plug_mgr_rt->plug_mgr->session_mq_schema_array);
assert(plug_mgr_rt->pending_mq==NULL && plug_mgr_rt->delivered_mq==NULL);
return;
diff --git a/src/plugin_manager/plugin_manager_interna.h b/src/plugin_manager/plugin_manager_interna.h
index 24b8d42..0cf55d6 100644
--- a/src/plugin_manager/plugin_manager_interna.h
+++ b/src/plugin_manager/plugin_manager_interna.h
@@ -16,15 +16,16 @@ struct per_thread_exdata_array
};
struct stellar_message;
-struct per_thread_mq_array
+struct per_thread_mq
{
struct stellar_message *mq;
+ int pub_msg_cnt;
};
struct plugin_manger_per_thread_data
{
struct per_thread_exdata_array per_thread_pkt_exdata_array;
- struct per_thread_mq_array per_thread_pkt_mq_array;
+ struct per_thread_mq per_thread_pkt_mq;
};
struct plugin_manager_schema
@@ -47,6 +48,7 @@ struct plugin_manager_schema
int udp_topic_id;
int egress_topic_id;
int control_packet_topic_id;
+ int max_message_dispatch;
struct plugin_manger_per_thread_data *per_thread_data;
}__attribute__((aligned(sizeof(void*))));
@@ -133,7 +135,7 @@ struct plugin_manager_runtime
struct stellar_exdata *sess_exdata_array;
struct session_plugin_ctx_runtime *plugin_ctx_array;//N plugins TODO: call alloc and free
int current_session_plugin_id;
- int enable_session_mq;
+ int pub_session_msg_cnt;
}__attribute__((aligned(sizeof(void*))));
struct registered_packet_plugin_schema
@@ -172,6 +174,8 @@ struct registered_session_plugin_schema
* PLUGIN MANAGER INIT & EXIT *
*******************************/
+#define MAX_MSG_PER_DISPATCH 128
+
#include <dlfcn.h>
struct plugin_specific