summaryrefslogtreecommitdiff
path: root/src/plugin_manager/plugin_manager.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/plugin_manager/plugin_manager.c')
-rw-r--r--src/plugin_manager/plugin_manager.c38
1 files changed, 29 insertions, 9 deletions
diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c
index 8af47ff..3f3fead 100644
--- a/src/plugin_manager/plugin_manager.c
+++ b/src/plugin_manager/plugin_manager.c
@@ -429,6 +429,23 @@ int session_mq_publish_message(struct session *sess, int topic_id, void *data)
return ret;
}
+static void session_mq_update_topic_status(struct plugin_manager_runtime *plug_mgr_rt, struct stellar_mq_topic_schema *topic)
+{
+ //update topic status
+ switch (bitmap_is_all_zero(plug_mgr_rt->session_mq_status, 0, topic->topic_id, topic->subscriber_cnt))
+ {
+ case 1:
+ bitmap_set(plug_mgr_rt->session_topic_status, 0, topic->topic_id, 0);
+ break;
+ case 0:
+ bitmap_set(plug_mgr_rt->session_topic_status, 0, topic->topic_id, 1);
+ break;
+ default:
+ break;
+ }
+ return;
+}
+
static int session_mq_set_message_status(struct session *sess, int topic_id, int plugin_id, int bit_value)
{
if(bit_value!=0 && bit_value!=1)return -1;
@@ -451,9 +468,10 @@ static int session_mq_set_message_status(struct session *sess, int topic_id, int
struct stellar_mq_subscriber_info *session_plugin_sub_info = (struct stellar_mq_subscriber_info *)utarray_eltptr(session_plugin_schema->registed_session_mq_subscriber_info, i);
if(topic_id==session_plugin_sub_info->topic_id)
{
- bitmap_set(plug_mgr_rt->session_mq_status, topic_id, session_plugin_sub_info->subscriber_idx, bit_value);
+ bitmap_set(plug_mgr_rt->session_mq_status, session_plugin_sub_info->subscriber_idx, topic_id, bit_value);
}
}
+ session_mq_update_topic_status(plug_mgr_rt, topic);
return 0;
}
return -1;
@@ -551,7 +569,7 @@ static void plugin_manager_session_message_dispatch(struct session *sess)
DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
{
plug_mgr_rt->current_session_plugin_id=sub_elt->plugin_idx;
- if (bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->header.topic_id, cur_sub_idx) != 0)
+ if (bitmap_get(plug_mgr_rt->session_mq_status, cur_sub_idx, mq_elt->header.topic_id) != 0)
{
plugin_ctx_rt=(plug_mgr_rt->plugin_ctx_array+sub_elt->plugin_idx);
session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)sub_elt->plugin_idx);
@@ -573,7 +591,7 @@ static void plugin_manager_session_message_dispatch(struct session *sess)
}
}
}
- if (sub_elt->sess_msg_cb && bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->header.topic_id, cur_sub_idx) != 0)// ctx_new maybe call detach, need check again
+ if (sub_elt->sess_msg_cb && bitmap_get(plug_mgr_rt->session_mq_status, cur_sub_idx, mq_elt->header.topic_id) != 0)// ctx_new maybe call detach, need check again
{
sub_elt->sess_msg_cb(sess, mq_elt->header.topic_id, mq_elt->body, plugin_ctx_rt->plugin_ctx, session_plugin_schema->plugin_env);
}
@@ -581,7 +599,7 @@ static void plugin_manager_session_message_dispatch(struct session *sess)
}
cur_sub_idx++;
}
- if(cur_sub_idx==0)bitmap_set(plug_mgr_rt->session_topic_status, mq_elt->header.topic_id, 1, 0);
+ if(cur_sub_idx==0)bitmap_set(plug_mgr_rt->session_topic_status, 0, mq_elt->header.topic_id, 0);
}
DL_DELETE(plug_mgr_rt->pending_mq, mq_elt);
DL_APPEND(plug_mgr_rt->delivered_mq, mq_elt);// move to delivered message list
@@ -596,7 +614,7 @@ int session_mq_topic_is_active(struct session *sess, int topic_id)
assert(plug_mgr_rt);
if(plug_mgr_rt->session_topic_status==NULL)return -1;//runtime free stage , mq_status alaway null, ignore publish message
if(topic_id >= plug_mgr_rt->plug_mgr->session_mq_topic_num)return -1;// topic_id out of range
- if(bitmap_get(plug_mgr_rt->session_topic_status, topic_id, 1) == 0)return 0;
+ if(bitmap_get(plug_mgr_rt->session_topic_status, 0, topic_id) == 0)return 0;
return 1;
}
@@ -642,8 +660,8 @@ struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_
rt->sess = sess;
rt->pending_mq = NULL;
rt->delivered_mq = NULL;
- rt->session_mq_status=bitmap_new(plug_mgr->session_mq_topic_num, plug_mgr->session_topic_subscriber_num, 1);
- rt->session_topic_status=bitmap_new(plug_mgr->session_mq_topic_num, 1, 1);
+ rt->session_mq_status=bitmap_new(plug_mgr->session_topic_subscriber_num, plug_mgr->session_mq_topic_num, 1);
+ rt->session_topic_status=bitmap_new(1, plug_mgr->session_mq_topic_num, 1);
rt->sess_exdata_array = (struct stellar_exdata *)session_exdata_runtime_new(plug_mgr);
if(plug_mgr->registered_session_plugin_array)
rt->plugin_ctx_array = CALLOC(struct session_plugin_ctx_runtime, utarray_len(plug_mgr->registered_session_plugin_array));
@@ -869,7 +887,7 @@ void stellar_session_plugin_dettach_current_session(struct session *sess)
struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess);
struct registered_session_plugin_schema *session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)plug_mgr_rt->current_session_plugin_id);
if(session_plugin_schema==NULL)return;
-
+ struct stellar_mq_topic_schema *topic=NULL;
unsigned int plugin_subscriber_num = utarray_len(session_plugin_schema->registed_session_mq_subscriber_info);
//FIXME: maybe no need to clear session_mq_status, check plugin_ctx before message dispatch
if(plug_mgr_rt->session_mq_status)
@@ -877,7 +895,9 @@ void stellar_session_plugin_dettach_current_session(struct session *sess)
for(unsigned int i=0; i < plugin_subscriber_num; i++)
{
struct stellar_mq_subscriber_info *session_plugin_sub_info = (struct stellar_mq_subscriber_info *)utarray_eltptr(session_plugin_schema->registed_session_mq_subscriber_info, i);
- bitmap_set(plug_mgr_rt->session_mq_status, session_plugin_sub_info->topic_id, session_plugin_sub_info->subscriber_idx, 0);
+ bitmap_set(plug_mgr_rt->session_mq_status, session_plugin_sub_info->subscriber_idx,session_plugin_sub_info->topic_id, 0);
+ topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->session_mq_schema_array, (unsigned int)session_plugin_sub_info->topic_id);
+ session_mq_update_topic_status(plug_mgr_rt, topic);
}
}