summaryrefslogtreecommitdiff
path: root/src/plugin_manager/plugin_manager.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/plugin_manager/plugin_manager.c')
-rw-r--r--src/plugin_manager/plugin_manager.c167
1 files changed, 111 insertions, 56 deletions
diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c
index 9f9d8cf..17aedb1 100644
--- a/src/plugin_manager/plugin_manager.c
+++ b/src/plugin_manager/plugin_manager.c
@@ -349,15 +349,16 @@ int stellar_mq_destroy_topic(int topic_id, UT_array *mq_schema_array)
return 1; // success
}
-int stellar_mq_publish_message(int topic_id, void *data, UT_array *mq_schema_array, struct stellar_message **mq)
+static int stellar_mq_publish_message(int topic_id, void *data, UT_array *mq_schema_array, struct stellar_message *mq[], enum session_mq_priority priority)
{
if(mq_schema_array==NULL || topic_id < 0)return -1;
unsigned int len = utarray_len(mq_schema_array);
if (len <= (unsigned int)topic_id)return -1;
struct stellar_message *msg= CALLOC(struct stellar_message,1);
msg->header.topic_id = topic_id;
+ msg->header.priority = priority;
msg->body = data;
- DL_APPEND(*mq, msg);
+ DL_APPEND(mq[priority], msg);
return 0;
}
@@ -418,17 +419,25 @@ int stellar_session_mq_destroy_topic(struct stellar *st, int topic_id)
return ret;
}
-int session_mq_publish_message(struct session *sess, int topic_id, void *data)
+
+
+int session_mq_publish_message_with_priority(struct session *sess, int topic_id, void *data, enum session_mq_priority priority)
{
struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess);
assert(plug_mgr_rt);
if(plug_mgr_rt->session_mq_status==NULL)return -1;//runtime free stage , mq_status alaway null, ignore publish message
if(plug_mgr_rt->pub_session_msg_cnt >= plug_mgr_rt->plug_mgr->max_message_dispatch)return -1;
- int ret=stellar_mq_publish_message(topic_id, data, plug_mgr_rt->plug_mgr->session_mq_schema_array, &plug_mgr_rt->pending_mq);
+ int ret=stellar_mq_publish_message(topic_id, data, plug_mgr_rt->plug_mgr->session_mq_schema_array, plug_mgr_rt->priority_mq, priority);
if(ret==0)plug_mgr_rt->pub_session_msg_cnt+=1;
return ret;
}
+
+int session_mq_publish_message(struct session *sess, int topic_id, void *data)
+{
+ return session_mq_publish_message_with_priority(sess, topic_id, data, SESSION_MQ_PRIORITY_NORMAL);
+}
+
static void session_mq_update_topic_status(struct plugin_manager_runtime *plug_mgr_rt, struct stellar_mq_topic_schema *topic)
{
//update topic status
@@ -547,64 +556,99 @@ int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_session_ms
return 0;
}
-static void plugin_manager_session_message_dispatch(struct session *sess)
+static void session_mq_dispatch_one_message(struct session *sess, struct stellar_message *mq_elt)
{
struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess);
- if(plug_mgr_rt==NULL)return;
-
- struct stellar_message *mq_elt=NULL, *mq_tmp=NULL;
- struct stellar_mq_subscriber *sub_elt, *sub_tmp;
- struct stellar_mq_topic_schema *topic;
+ struct stellar_mq_subscriber *sub_elt, *sub_tmp;
struct registered_session_plugin_schema *session_plugin_schema;
struct session_plugin_ctx_runtime *plugin_ctx_rt;
- while (plug_mgr_rt->pending_mq != NULL)
+ struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(
+ plug_mgr_rt->plug_mgr->session_mq_schema_array, (unsigned int)(mq_elt->header.topic_id));
+
+ if (topic)
{
- DL_FOREACH_SAFE(plug_mgr_rt->pending_mq, mq_elt, mq_tmp)
+ int cur_sub_idx = 0;
+ DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
{
- topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->session_mq_schema_array,
- (unsigned int)(mq_elt->header.topic_id));
- if (topic)
+ plug_mgr_rt->current_session_plugin_id = sub_elt->plugin_idx;
+ if (bitmap_get(plug_mgr_rt->session_mq_status, cur_sub_idx, mq_elt->header.topic_id) != 0)
{
- int cur_sub_idx = 0;
- DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
+ plugin_ctx_rt = (plug_mgr_rt->plugin_ctx_array + sub_elt->plugin_idx);
+ session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(
+ plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)sub_elt->plugin_idx);
+ if (session_plugin_schema)
{
- plug_mgr_rt->current_session_plugin_id=sub_elt->plugin_idx;
- if (bitmap_get(plug_mgr_rt->session_mq_status, cur_sub_idx, mq_elt->header.topic_id) != 0)
+ if (plugin_ctx_rt->state == INIT)
{
- plugin_ctx_rt=(plug_mgr_rt->plugin_ctx_array+sub_elt->plugin_idx);
- session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)sub_elt->plugin_idx);
- if (session_plugin_schema)
+ if (session_plugin_schema->on_ctx_new)
{
- if (plugin_ctx_rt->state == INIT)
+ plugin_ctx_rt->plugin_ctx =
+ session_plugin_schema->on_ctx_new(sess, session_plugin_schema->plugin_env);
+ if (plugin_ctx_rt->state == EXIT && session_plugin_schema->on_ctx_free)
{
- if (session_plugin_schema->on_ctx_new)
- {
- plugin_ctx_rt->plugin_ctx = session_plugin_schema->on_ctx_new(sess, session_plugin_schema->plugin_env);
- if(plugin_ctx_rt->state == EXIT && session_plugin_schema->on_ctx_free)
- {
- session_plugin_schema->on_ctx_free(sess, plugin_ctx_rt->plugin_ctx, session_plugin_schema->plugin_env);
- plugin_ctx_rt->plugin_ctx=NULL;
- }
- else
- {
- plugin_ctx_rt->state = ACTIVE;
- }
- }
+ session_plugin_schema->on_ctx_free(sess, plugin_ctx_rt->plugin_ctx,
+ session_plugin_schema->plugin_env);
+ plugin_ctx_rt->plugin_ctx = NULL;
+ }
+ else
+ {
+ plugin_ctx_rt->state = ACTIVE;
}
- if (sub_elt->sess_msg_cb && bitmap_get(plug_mgr_rt->session_mq_status, cur_sub_idx, mq_elt->header.topic_id) != 0)// ctx_new maybe call detach, need check again
- {
- sub_elt->sess_msg_cb(sess, mq_elt->header.topic_id, mq_elt->body, plugin_ctx_rt->plugin_ctx, session_plugin_schema->plugin_env);
- }
}
}
- cur_sub_idx++;
+ if (sub_elt->sess_msg_cb &&
+ bitmap_get(plug_mgr_rt->session_mq_status, cur_sub_idx, mq_elt->header.topic_id) !=
+ 0) // ctx_new maybe call detach, need check again
+ {
+ sub_elt->sess_msg_cb(sess, mq_elt->header.topic_id, mq_elt->body, plugin_ctx_rt->plugin_ctx,
+ session_plugin_schema->plugin_env);
+ }
}
- if(cur_sub_idx==0)bitmap_set(plug_mgr_rt->session_topic_status, 0, mq_elt->header.topic_id, 0);
}
- DL_DELETE(plug_mgr_rt->pending_mq, mq_elt);
- DL_APPEND(plug_mgr_rt->delivered_mq, mq_elt);// move to delivered message list
+ cur_sub_idx++;
}
+ if (cur_sub_idx == 0)
+ bitmap_set(plug_mgr_rt->session_topic_status, 0, mq_elt->header.topic_id, 0);
}
+}
+
+static void plugin_manager_session_message_dispatch(struct session *sess)
+{
+ struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess);
+ if(plug_mgr_rt==NULL)return;
+
+ struct stellar_message *mq_elt=NULL, *mq_tmp=NULL;
+
+ int cur_priority = SESSION_MQ_PRIORITY_HIGH;
+ while(cur_priority >= SESSION_MQ_PRIORITY_LOW)
+ {
+ if(plug_mgr_rt->priority_mq[cur_priority]==NULL)
+ {
+ cur_priority--;
+ continue;
+ }
+ DL_FOREACH_SAFE(plug_mgr_rt->priority_mq[cur_priority], mq_elt, mq_tmp)
+ {
+ session_mq_dispatch_one_message(sess, mq_elt);
+ DL_DELETE(plug_mgr_rt->priority_mq[mq_elt->header.priority], mq_elt);
+ DL_APPEND(plug_mgr_rt->dealth_letter_queue, mq_elt); // move to dlq list
+
+ cur_priority=SESSION_MQ_PRIORITY_HIGH;
+ break;
+ }
+ }
+
+#if 0
+ while (plug_mgr_rt->pending_mq != NULL)
+ {
+ DL_FOREACH_SAFE(plug_mgr_rt->pending_mq, mq_elt, mq_tmp)
+ {
+ session_mq_dispatch_one_message(sess, mq_elt);
+ DL_DELETE(plug_mgr_rt->pending_mq[mq_elt->header.priority], mq_elt);
+ DL_APPEND(plug_mgr_rt->delivered_mq, mq_elt); // move to delivered message list
+ }
+ }
+#endif
return;
}
@@ -658,8 +702,8 @@ struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_
struct plugin_manager_runtime *rt = CALLOC(struct plugin_manager_runtime, 1);
rt->plug_mgr = plug_mgr;
rt->sess = sess;
- rt->pending_mq = NULL;
- rt->delivered_mq = NULL;
+ memset(rt->priority_mq, 0, sizeof(rt->priority_mq));
+ rt->dealth_letter_queue = NULL;
rt->session_mq_status=bitmap_new(plug_mgr->session_topic_subscriber_num, plug_mgr->session_mq_topic_num, 1);
rt->session_topic_status=bitmap_new(1, plug_mgr->session_mq_topic_num, 1);
rt->sess_exdata_array = (struct stellar_exdata *)session_exdata_runtime_new(plug_mgr);
@@ -672,15 +716,20 @@ struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_
void plugin_manager_session_runtime_free(struct plugin_manager_runtime *rt)
{
if(rt==NULL)return;
- assert(rt->pending_mq==NULL);
- if(rt->pending_mq != NULL)
+ assert(rt->priority_mq[SESSION_MQ_PRIORITY_HIGH]==NULL);
+ assert(rt->priority_mq[SESSION_MQ_PRIORITY_NORMAL]==NULL);
+ assert(rt->priority_mq[SESSION_MQ_PRIORITY_LOW]==NULL);
+ for(int i=0; i < SESSION_MQ_PRIORITY_MAX; i++)
{
- session_mq_free(rt->sess, &rt->pending_mq, rt->plug_mgr->session_mq_schema_array);
+ if(rt->priority_mq[i] != NULL)
+ {
+ session_mq_free(rt->sess, &rt->priority_mq[i], rt->plug_mgr->session_mq_schema_array);
+ }
}
- assert(rt->delivered_mq==NULL);
- if(rt->delivered_mq != NULL)
+ assert(rt->dealth_letter_queue==NULL);
+ if(rt->dealth_letter_queue != NULL)
{
- session_mq_free(rt->sess, &rt->delivered_mq, rt->plug_mgr->session_mq_schema_array);
+ session_mq_free(rt->sess, &rt->dealth_letter_queue, rt->plug_mgr->session_mq_schema_array);
}
if(rt->session_mq_status != NULL)
{
@@ -855,8 +904,11 @@ void plugin_manager_on_session_egress(struct session *sess, struct packet *pkt)
if(plug_mgr_rt==NULL)return;
session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->egress_topic_id ,pkt);
plugin_manager_session_message_dispatch(sess);
- session_mq_free(plug_mgr_rt->sess,&plug_mgr_rt->delivered_mq, plug_mgr_rt->plug_mgr->session_mq_schema_array);
- assert(plug_mgr_rt->pending_mq==NULL && plug_mgr_rt->delivered_mq==NULL);
+ session_mq_free(plug_mgr_rt->sess,&plug_mgr_rt->dealth_letter_queue, plug_mgr_rt->plug_mgr->session_mq_schema_array);
+ assert(plug_mgr_rt->priority_mq[SESSION_MQ_PRIORITY_HIGH]==NULL);
+ assert(plug_mgr_rt->priority_mq[SESSION_MQ_PRIORITY_NORMAL]==NULL);
+ assert(plug_mgr_rt->priority_mq[SESSION_MQ_PRIORITY_LOW]==NULL);
+ assert(plug_mgr_rt->dealth_letter_queue==NULL);
return;
}
@@ -877,8 +929,11 @@ void plugin_manager_on_session_closing(struct session *sess)
break;
}
plugin_manager_session_message_dispatch(sess);
- session_mq_free(plug_mgr_rt->sess,&plug_mgr_rt->delivered_mq, plug_mgr_rt->plug_mgr->session_mq_schema_array);
- assert(plug_mgr_rt->pending_mq==NULL && plug_mgr_rt->delivered_mq==NULL);
+ session_mq_free(plug_mgr_rt->sess,&plug_mgr_rt->dealth_letter_queue, plug_mgr_rt->plug_mgr->session_mq_schema_array);
+ assert(plug_mgr_rt->priority_mq[SESSION_MQ_PRIORITY_HIGH]==NULL);
+ assert(plug_mgr_rt->priority_mq[SESSION_MQ_PRIORITY_NORMAL]==NULL);
+ assert(plug_mgr_rt->priority_mq[SESSION_MQ_PRIORITY_LOW]==NULL);
+ assert(plug_mgr_rt->dealth_letter_queue==NULL);
return;
}