summaryrefslogtreecommitdiff
path: root/src/plugin_manager/plugin_manager.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/plugin_manager/plugin_manager.c')
-rw-r--r--src/plugin_manager/plugin_manager.c18
1 files changed, 9 insertions, 9 deletions
diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c
index a102e5c..8af47ff 100644
--- a/src/plugin_manager/plugin_manager.c
+++ b/src/plugin_manager/plugin_manager.c
@@ -355,8 +355,8 @@ int stellar_mq_publish_message(int topic_id, void *data, UT_array *mq_schema_arr
unsigned int len = utarray_len(mq_schema_array);
if (len <= (unsigned int)topic_id)return -1;
struct stellar_message *msg= CALLOC(struct stellar_message,1);
- msg->topic_id = topic_id;
- msg->msg_data = data;
+ msg->header.topic_id = topic_id;
+ msg->body = data;
DL_APPEND(*mq, msg);
return 0;
}
@@ -375,10 +375,10 @@ void session_mq_free(struct session *sess, struct stellar_message **head, UT_arr
DL_FOREACH_SAFE(*head, mq_elt, tmp)
{
topic = (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array,
- (unsigned int)(mq_elt->topic_id));
+ (unsigned int)(mq_elt->header.topic_id));
if (topic && topic->free_cb)
{
- topic->sess_msg_free_cb(sess, mq_elt->msg_data, topic->free_cb_arg);
+ topic->sess_msg_free_cb(sess, mq_elt->body, topic->free_cb_arg);
}
DL_DELETE(*head, mq_elt);
FREE(mq_elt);
@@ -544,14 +544,14 @@ static void plugin_manager_session_message_dispatch(struct session *sess)
DL_FOREACH_SAFE(plug_mgr_rt->pending_mq, mq_elt, mq_tmp)
{
topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->session_mq_schema_array,
- (unsigned int)(mq_elt->topic_id));
+ (unsigned int)(mq_elt->header.topic_id));
if (topic)
{
int cur_sub_idx = 0;
DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
{
plug_mgr_rt->current_session_plugin_id=sub_elt->plugin_idx;
- if (bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->topic_id, cur_sub_idx) != 0)
+ if (bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->header.topic_id, cur_sub_idx) != 0)
{
plugin_ctx_rt=(plug_mgr_rt->plugin_ctx_array+sub_elt->plugin_idx);
session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)sub_elt->plugin_idx);
@@ -573,15 +573,15 @@ static void plugin_manager_session_message_dispatch(struct session *sess)
}
}
}
- if (sub_elt->sess_msg_cb && bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->topic_id, cur_sub_idx) != 0)// ctx_new maybe call detach, need check again
+ if (sub_elt->sess_msg_cb && bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->header.topic_id, cur_sub_idx) != 0)// ctx_new maybe call detach, need check again
{
- sub_elt->sess_msg_cb(sess, mq_elt->topic_id, mq_elt->msg_data, plugin_ctx_rt->plugin_ctx, session_plugin_schema->plugin_env);
+ sub_elt->sess_msg_cb(sess, mq_elt->header.topic_id, mq_elt->body, plugin_ctx_rt->plugin_ctx, session_plugin_schema->plugin_env);
}
}
}
cur_sub_idx++;
}
- if(cur_sub_idx==0)bitmap_set(plug_mgr_rt->session_topic_status, mq_elt->topic_id, 1, 0);
+ if(cur_sub_idx==0)bitmap_set(plug_mgr_rt->session_topic_status, mq_elt->header.topic_id, 1, 0);
}
DL_DELETE(plug_mgr_rt->pending_mq, mq_elt);
DL_APPEND(plug_mgr_rt->delivered_mq, mq_elt);// move to delivered message list