diff options
Diffstat (limited to 'src/plugin_manager/plugin_manager.c')
| -rw-r--r-- | src/plugin_manager/plugin_manager.c | 18 |
1 files changed, 9 insertions, 9 deletions
diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c index a102e5c..8af47ff 100644 --- a/src/plugin_manager/plugin_manager.c +++ b/src/plugin_manager/plugin_manager.c @@ -355,8 +355,8 @@ int stellar_mq_publish_message(int topic_id, void *data, UT_array *mq_schema_arr unsigned int len = utarray_len(mq_schema_array); if (len <= (unsigned int)topic_id)return -1; struct stellar_message *msg= CALLOC(struct stellar_message,1); - msg->topic_id = topic_id; - msg->msg_data = data; + msg->header.topic_id = topic_id; + msg->body = data; DL_APPEND(*mq, msg); return 0; } @@ -375,10 +375,10 @@ void session_mq_free(struct session *sess, struct stellar_message **head, UT_arr DL_FOREACH_SAFE(*head, mq_elt, tmp) { topic = (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array, - (unsigned int)(mq_elt->topic_id)); + (unsigned int)(mq_elt->header.topic_id)); if (topic && topic->free_cb) { - topic->sess_msg_free_cb(sess, mq_elt->msg_data, topic->free_cb_arg); + topic->sess_msg_free_cb(sess, mq_elt->body, topic->free_cb_arg); } DL_DELETE(*head, mq_elt); FREE(mq_elt); @@ -544,14 +544,14 @@ static void plugin_manager_session_message_dispatch(struct session *sess) DL_FOREACH_SAFE(plug_mgr_rt->pending_mq, mq_elt, mq_tmp) { topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->session_mq_schema_array, - (unsigned int)(mq_elt->topic_id)); + (unsigned int)(mq_elt->header.topic_id)); if (topic) { int cur_sub_idx = 0; DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp) { plug_mgr_rt->current_session_plugin_id=sub_elt->plugin_idx; - if (bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->topic_id, cur_sub_idx) != 0) + if (bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->header.topic_id, cur_sub_idx) != 0) { plugin_ctx_rt=(plug_mgr_rt->plugin_ctx_array+sub_elt->plugin_idx); session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)sub_elt->plugin_idx); @@ -573,15 +573,15 @@ static void plugin_manager_session_message_dispatch(struct session *sess) } } } - if (sub_elt->sess_msg_cb && bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->topic_id, cur_sub_idx) != 0)// ctx_new maybe call detach, need check again + if (sub_elt->sess_msg_cb && bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->header.topic_id, cur_sub_idx) != 0)// ctx_new maybe call detach, need check again { - sub_elt->sess_msg_cb(sess, mq_elt->topic_id, mq_elt->msg_data, plugin_ctx_rt->plugin_ctx, session_plugin_schema->plugin_env); + sub_elt->sess_msg_cb(sess, mq_elt->header.topic_id, mq_elt->body, plugin_ctx_rt->plugin_ctx, session_plugin_schema->plugin_env); } } } cur_sub_idx++; } - if(cur_sub_idx==0)bitmap_set(plug_mgr_rt->session_topic_status, mq_elt->topic_id, 1, 0); + if(cur_sub_idx==0)bitmap_set(plug_mgr_rt->session_topic_status, mq_elt->header.topic_id, 1, 0); } DL_DELETE(plug_mgr_rt->pending_mq, mq_elt); DL_APPEND(plug_mgr_rt->delivered_mq, mq_elt);// move to delivered message list |
