summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryangwei <[email protected]>2024-07-09 19:22:35 +0800
committeryangwei <[email protected]>2024-07-09 19:32:21 +0800
commitbe78e4bef7dcfe46613e0c9d2f2117b725508924 (patch)
tree66bbeb67f6d3ff2818340953305e684641494300
parentbd599decb93fa9fd2955289cd96c362b703b2ad1 (diff)
🦄 refactor(stellar message define): define header and bodyPerf-drop-inactive-tcp
-rw-r--r--src/plugin_manager/plugin_manager.c18
-rw-r--r--src/plugin_manager/plugin_manager_interna.h10
2 files changed, 15 insertions, 13 deletions
diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c
index a102e5c..8af47ff 100644
--- a/src/plugin_manager/plugin_manager.c
+++ b/src/plugin_manager/plugin_manager.c
@@ -355,8 +355,8 @@ int stellar_mq_publish_message(int topic_id, void *data, UT_array *mq_schema_arr
unsigned int len = utarray_len(mq_schema_array);
if (len <= (unsigned int)topic_id)return -1;
struct stellar_message *msg= CALLOC(struct stellar_message,1);
- msg->topic_id = topic_id;
- msg->msg_data = data;
+ msg->header.topic_id = topic_id;
+ msg->body = data;
DL_APPEND(*mq, msg);
return 0;
}
@@ -375,10 +375,10 @@ void session_mq_free(struct session *sess, struct stellar_message **head, UT_arr
DL_FOREACH_SAFE(*head, mq_elt, tmp)
{
topic = (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array,
- (unsigned int)(mq_elt->topic_id));
+ (unsigned int)(mq_elt->header.topic_id));
if (topic && topic->free_cb)
{
- topic->sess_msg_free_cb(sess, mq_elt->msg_data, topic->free_cb_arg);
+ topic->sess_msg_free_cb(sess, mq_elt->body, topic->free_cb_arg);
}
DL_DELETE(*head, mq_elt);
FREE(mq_elt);
@@ -544,14 +544,14 @@ static void plugin_manager_session_message_dispatch(struct session *sess)
DL_FOREACH_SAFE(plug_mgr_rt->pending_mq, mq_elt, mq_tmp)
{
topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->session_mq_schema_array,
- (unsigned int)(mq_elt->topic_id));
+ (unsigned int)(mq_elt->header.topic_id));
if (topic)
{
int cur_sub_idx = 0;
DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
{
plug_mgr_rt->current_session_plugin_id=sub_elt->plugin_idx;
- if (bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->topic_id, cur_sub_idx) != 0)
+ if (bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->header.topic_id, cur_sub_idx) != 0)
{
plugin_ctx_rt=(plug_mgr_rt->plugin_ctx_array+sub_elt->plugin_idx);
session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)sub_elt->plugin_idx);
@@ -573,15 +573,15 @@ static void plugin_manager_session_message_dispatch(struct session *sess)
}
}
}
- if (sub_elt->sess_msg_cb && bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->topic_id, cur_sub_idx) != 0)// ctx_new maybe call detach, need check again
+ if (sub_elt->sess_msg_cb && bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->header.topic_id, cur_sub_idx) != 0)// ctx_new maybe call detach, need check again
{
- sub_elt->sess_msg_cb(sess, mq_elt->topic_id, mq_elt->msg_data, plugin_ctx_rt->plugin_ctx, session_plugin_schema->plugin_env);
+ sub_elt->sess_msg_cb(sess, mq_elt->header.topic_id, mq_elt->body, plugin_ctx_rt->plugin_ctx, session_plugin_schema->plugin_env);
}
}
}
cur_sub_idx++;
}
- if(cur_sub_idx==0)bitmap_set(plug_mgr_rt->session_topic_status, mq_elt->topic_id, 1, 0);
+ if(cur_sub_idx==0)bitmap_set(plug_mgr_rt->session_topic_status, mq_elt->header.topic_id, 1, 0);
}
DL_DELETE(plug_mgr_rt->pending_mq, mq_elt);
DL_APPEND(plug_mgr_rt->delivered_mq, mq_elt);// move to delivered message list
diff --git a/src/plugin_manager/plugin_manager_interna.h b/src/plugin_manager/plugin_manager_interna.h
index 15e4bff..50da9e8 100644
--- a/src/plugin_manager/plugin_manager_interna.h
+++ b/src/plugin_manager/plugin_manager_interna.h
@@ -50,13 +50,15 @@ struct stellar_exdata_schema
int idx;
}__attribute__((aligned(sizeof(void*))));
-
struct stellar_message
{
- int topic_id;
- void *msg_data;
+ struct
+ {
+ int topic_id;
+ } header;
+ void *body;
struct stellar_message *next, *prev;
-}__attribute__((aligned(sizeof(void*))));
+} __attribute__((aligned(sizeof(void *))));
typedef struct stellar_mq_subscriber
{