diff options
Diffstat (limited to 'src/plugin_manager/plugin_manager.c')
| -rw-r--r-- | src/plugin_manager/plugin_manager.c | 38 |
1 files changed, 29 insertions, 9 deletions
diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c index 8af47ff..3f3fead 100644 --- a/src/plugin_manager/plugin_manager.c +++ b/src/plugin_manager/plugin_manager.c @@ -429,6 +429,23 @@ int session_mq_publish_message(struct session *sess, int topic_id, void *data) return ret; } +static void session_mq_update_topic_status(struct plugin_manager_runtime *plug_mgr_rt, struct stellar_mq_topic_schema *topic) +{ + //update topic status + switch (bitmap_is_all_zero(plug_mgr_rt->session_mq_status, 0, topic->topic_id, topic->subscriber_cnt)) + { + case 1: + bitmap_set(plug_mgr_rt->session_topic_status, 0, topic->topic_id, 0); + break; + case 0: + bitmap_set(plug_mgr_rt->session_topic_status, 0, topic->topic_id, 1); + break; + default: + break; + } + return; +} + static int session_mq_set_message_status(struct session *sess, int topic_id, int plugin_id, int bit_value) { if(bit_value!=0 && bit_value!=1)return -1; @@ -451,9 +468,10 @@ static int session_mq_set_message_status(struct session *sess, int topic_id, int struct stellar_mq_subscriber_info *session_plugin_sub_info = (struct stellar_mq_subscriber_info *)utarray_eltptr(session_plugin_schema->registed_session_mq_subscriber_info, i); if(topic_id==session_plugin_sub_info->topic_id) { - bitmap_set(plug_mgr_rt->session_mq_status, topic_id, session_plugin_sub_info->subscriber_idx, bit_value); + bitmap_set(plug_mgr_rt->session_mq_status, session_plugin_sub_info->subscriber_idx, topic_id, bit_value); } } + session_mq_update_topic_status(plug_mgr_rt, topic); return 0; } return -1; @@ -551,7 +569,7 @@ static void plugin_manager_session_message_dispatch(struct session *sess) DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp) { plug_mgr_rt->current_session_plugin_id=sub_elt->plugin_idx; - if (bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->header.topic_id, cur_sub_idx) != 0) + if (bitmap_get(plug_mgr_rt->session_mq_status, cur_sub_idx, mq_elt->header.topic_id) != 0) { plugin_ctx_rt=(plug_mgr_rt->plugin_ctx_array+sub_elt->plugin_idx); session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)sub_elt->plugin_idx); @@ -573,7 +591,7 @@ static void plugin_manager_session_message_dispatch(struct session *sess) } } } - if (sub_elt->sess_msg_cb && bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->header.topic_id, cur_sub_idx) != 0)// ctx_new maybe call detach, need check again + if (sub_elt->sess_msg_cb && bitmap_get(plug_mgr_rt->session_mq_status, cur_sub_idx, mq_elt->header.topic_id) != 0)// ctx_new maybe call detach, need check again { sub_elt->sess_msg_cb(sess, mq_elt->header.topic_id, mq_elt->body, plugin_ctx_rt->plugin_ctx, session_plugin_schema->plugin_env); } @@ -581,7 +599,7 @@ static void plugin_manager_session_message_dispatch(struct session *sess) } cur_sub_idx++; } - if(cur_sub_idx==0)bitmap_set(plug_mgr_rt->session_topic_status, mq_elt->header.topic_id, 1, 0); + if(cur_sub_idx==0)bitmap_set(plug_mgr_rt->session_topic_status, 0, mq_elt->header.topic_id, 0); } DL_DELETE(plug_mgr_rt->pending_mq, mq_elt); DL_APPEND(plug_mgr_rt->delivered_mq, mq_elt);// move to delivered message list @@ -596,7 +614,7 @@ int session_mq_topic_is_active(struct session *sess, int topic_id) assert(plug_mgr_rt); if(plug_mgr_rt->session_topic_status==NULL)return -1;//runtime free stage , mq_status alaway null, ignore publish message if(topic_id >= plug_mgr_rt->plug_mgr->session_mq_topic_num)return -1;// topic_id out of range - if(bitmap_get(plug_mgr_rt->session_topic_status, topic_id, 1) == 0)return 0; + if(bitmap_get(plug_mgr_rt->session_topic_status, 0, topic_id) == 0)return 0; return 1; } @@ -642,8 +660,8 @@ struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_ rt->sess = sess; rt->pending_mq = NULL; rt->delivered_mq = NULL; - rt->session_mq_status=bitmap_new(plug_mgr->session_mq_topic_num, plug_mgr->session_topic_subscriber_num, 1); - rt->session_topic_status=bitmap_new(plug_mgr->session_mq_topic_num, 1, 1); + rt->session_mq_status=bitmap_new(plug_mgr->session_topic_subscriber_num, plug_mgr->session_mq_topic_num, 1); + rt->session_topic_status=bitmap_new(1, plug_mgr->session_mq_topic_num, 1); rt->sess_exdata_array = (struct stellar_exdata *)session_exdata_runtime_new(plug_mgr); if(plug_mgr->registered_session_plugin_array) rt->plugin_ctx_array = CALLOC(struct session_plugin_ctx_runtime, utarray_len(plug_mgr->registered_session_plugin_array)); @@ -869,7 +887,7 @@ void stellar_session_plugin_dettach_current_session(struct session *sess) struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess); struct registered_session_plugin_schema *session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)plug_mgr_rt->current_session_plugin_id); if(session_plugin_schema==NULL)return; - + struct stellar_mq_topic_schema *topic=NULL; unsigned int plugin_subscriber_num = utarray_len(session_plugin_schema->registed_session_mq_subscriber_info); //FIXME: maybe no need to clear session_mq_status, check plugin_ctx before message dispatch if(plug_mgr_rt->session_mq_status) @@ -877,7 +895,9 @@ void stellar_session_plugin_dettach_current_session(struct session *sess) for(unsigned int i=0; i < plugin_subscriber_num; i++) { struct stellar_mq_subscriber_info *session_plugin_sub_info = (struct stellar_mq_subscriber_info *)utarray_eltptr(session_plugin_schema->registed_session_mq_subscriber_info, i); - bitmap_set(plug_mgr_rt->session_mq_status, session_plugin_sub_info->topic_id, session_plugin_sub_info->subscriber_idx, 0); + bitmap_set(plug_mgr_rt->session_mq_status, session_plugin_sub_info->subscriber_idx,session_plugin_sub_info->topic_id, 0); + topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->session_mq_schema_array, (unsigned int)session_plugin_sub_info->topic_id); + session_mq_update_topic_status(plug_mgr_rt, topic); } } |
