diff options
| author | yangwei <[email protected]> | 2024-07-12 19:47:18 +0800 |
|---|---|---|
| committer | yangwei <[email protected]> | 2024-07-12 20:22:12 +0800 |
| commit | 271e569dbf8a58e36a357226b585a15515655b15 (patch) | |
| tree | 19c0c6ee13c69b5281a7d8ddfcca9c17617c1b9f /src | |
| parent | 0913c1cd4bcc8e9f682902c9babe5a66f672121e (diff) | |
✨ feat(session mq with priority): support 3 priority mq, default to normalFeature-session-mq-priority
Diffstat (limited to 'src')
| -rw-r--r-- | src/plugin_manager/plugin_manager.c | 167 | ||||
| -rw-r--r-- | src/plugin_manager/plugin_manager_interna.h | 5 |
2 files changed, 114 insertions, 58 deletions
diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c index 9f9d8cf..17aedb1 100644 --- a/src/plugin_manager/plugin_manager.c +++ b/src/plugin_manager/plugin_manager.c @@ -349,15 +349,16 @@ int stellar_mq_destroy_topic(int topic_id, UT_array *mq_schema_array) return 1; // success } -int stellar_mq_publish_message(int topic_id, void *data, UT_array *mq_schema_array, struct stellar_message **mq) +static int stellar_mq_publish_message(int topic_id, void *data, UT_array *mq_schema_array, struct stellar_message *mq[], enum session_mq_priority priority) { if(mq_schema_array==NULL || topic_id < 0)return -1; unsigned int len = utarray_len(mq_schema_array); if (len <= (unsigned int)topic_id)return -1; struct stellar_message *msg= CALLOC(struct stellar_message,1); msg->header.topic_id = topic_id; + msg->header.priority = priority; msg->body = data; - DL_APPEND(*mq, msg); + DL_APPEND(mq[priority], msg); return 0; } @@ -418,17 +419,25 @@ int stellar_session_mq_destroy_topic(struct stellar *st, int topic_id) return ret; } -int session_mq_publish_message(struct session *sess, int topic_id, void *data) + + +int session_mq_publish_message_with_priority(struct session *sess, int topic_id, void *data, enum session_mq_priority priority) { struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess); assert(plug_mgr_rt); if(plug_mgr_rt->session_mq_status==NULL)return -1;//runtime free stage , mq_status alaway null, ignore publish message if(plug_mgr_rt->pub_session_msg_cnt >= plug_mgr_rt->plug_mgr->max_message_dispatch)return -1; - int ret=stellar_mq_publish_message(topic_id, data, plug_mgr_rt->plug_mgr->session_mq_schema_array, &plug_mgr_rt->pending_mq); + int ret=stellar_mq_publish_message(topic_id, data, plug_mgr_rt->plug_mgr->session_mq_schema_array, plug_mgr_rt->priority_mq, priority); if(ret==0)plug_mgr_rt->pub_session_msg_cnt+=1; return ret; } + +int session_mq_publish_message(struct session *sess, int topic_id, void *data) +{ + return session_mq_publish_message_with_priority(sess, topic_id, data, SESSION_MQ_PRIORITY_NORMAL); +} + static void session_mq_update_topic_status(struct plugin_manager_runtime *plug_mgr_rt, struct stellar_mq_topic_schema *topic) { //update topic status @@ -547,64 +556,99 @@ int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_session_ms return 0; } -static void plugin_manager_session_message_dispatch(struct session *sess) +static void session_mq_dispatch_one_message(struct session *sess, struct stellar_message *mq_elt) { struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess); - if(plug_mgr_rt==NULL)return; - - struct stellar_message *mq_elt=NULL, *mq_tmp=NULL; - struct stellar_mq_subscriber *sub_elt, *sub_tmp; - struct stellar_mq_topic_schema *topic; + struct stellar_mq_subscriber *sub_elt, *sub_tmp; struct registered_session_plugin_schema *session_plugin_schema; struct session_plugin_ctx_runtime *plugin_ctx_rt; - while (plug_mgr_rt->pending_mq != NULL) + struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr( + plug_mgr_rt->plug_mgr->session_mq_schema_array, (unsigned int)(mq_elt->header.topic_id)); + + if (topic) { - DL_FOREACH_SAFE(plug_mgr_rt->pending_mq, mq_elt, mq_tmp) + int cur_sub_idx = 0; + DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp) { - topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->session_mq_schema_array, - (unsigned int)(mq_elt->header.topic_id)); - if (topic) + plug_mgr_rt->current_session_plugin_id = sub_elt->plugin_idx; + if (bitmap_get(plug_mgr_rt->session_mq_status, cur_sub_idx, mq_elt->header.topic_id) != 0) { - int cur_sub_idx = 0; - DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp) + plugin_ctx_rt = (plug_mgr_rt->plugin_ctx_array + sub_elt->plugin_idx); + session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr( + plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)sub_elt->plugin_idx); + if (session_plugin_schema) { - plug_mgr_rt->current_session_plugin_id=sub_elt->plugin_idx; - if (bitmap_get(plug_mgr_rt->session_mq_status, cur_sub_idx, mq_elt->header.topic_id) != 0) + if (plugin_ctx_rt->state == INIT) { - plugin_ctx_rt=(plug_mgr_rt->plugin_ctx_array+sub_elt->plugin_idx); - session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)sub_elt->plugin_idx); - if (session_plugin_schema) + if (session_plugin_schema->on_ctx_new) { - if (plugin_ctx_rt->state == INIT) + plugin_ctx_rt->plugin_ctx = + session_plugin_schema->on_ctx_new(sess, session_plugin_schema->plugin_env); + if (plugin_ctx_rt->state == EXIT && session_plugin_schema->on_ctx_free) { - if (session_plugin_schema->on_ctx_new) - { - plugin_ctx_rt->plugin_ctx = session_plugin_schema->on_ctx_new(sess, session_plugin_schema->plugin_env); - if(plugin_ctx_rt->state == EXIT && session_plugin_schema->on_ctx_free) - { - session_plugin_schema->on_ctx_free(sess, plugin_ctx_rt->plugin_ctx, session_plugin_schema->plugin_env); - plugin_ctx_rt->plugin_ctx=NULL; - } - else - { - plugin_ctx_rt->state = ACTIVE; - } - } + session_plugin_schema->on_ctx_free(sess, plugin_ctx_rt->plugin_ctx, + session_plugin_schema->plugin_env); + plugin_ctx_rt->plugin_ctx = NULL; + } + else + { + plugin_ctx_rt->state = ACTIVE; } - if (sub_elt->sess_msg_cb && bitmap_get(plug_mgr_rt->session_mq_status, cur_sub_idx, mq_elt->header.topic_id) != 0)// ctx_new maybe call detach, need check again - { - sub_elt->sess_msg_cb(sess, mq_elt->header.topic_id, mq_elt->body, plugin_ctx_rt->plugin_ctx, session_plugin_schema->plugin_env); - } } } - cur_sub_idx++; + if (sub_elt->sess_msg_cb && + bitmap_get(plug_mgr_rt->session_mq_status, cur_sub_idx, mq_elt->header.topic_id) != + 0) // ctx_new maybe call detach, need check again + { + sub_elt->sess_msg_cb(sess, mq_elt->header.topic_id, mq_elt->body, plugin_ctx_rt->plugin_ctx, + session_plugin_schema->plugin_env); + } } - if(cur_sub_idx==0)bitmap_set(plug_mgr_rt->session_topic_status, 0, mq_elt->header.topic_id, 0); } - DL_DELETE(plug_mgr_rt->pending_mq, mq_elt); - DL_APPEND(plug_mgr_rt->delivered_mq, mq_elt);// move to delivered message list + cur_sub_idx++; } + if (cur_sub_idx == 0) + bitmap_set(plug_mgr_rt->session_topic_status, 0, mq_elt->header.topic_id, 0); } +} + +static void plugin_manager_session_message_dispatch(struct session *sess) +{ + struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess); + if(plug_mgr_rt==NULL)return; + + struct stellar_message *mq_elt=NULL, *mq_tmp=NULL; + + int cur_priority = SESSION_MQ_PRIORITY_HIGH; + while(cur_priority >= SESSION_MQ_PRIORITY_LOW) + { + if(plug_mgr_rt->priority_mq[cur_priority]==NULL) + { + cur_priority--; + continue; + } + DL_FOREACH_SAFE(plug_mgr_rt->priority_mq[cur_priority], mq_elt, mq_tmp) + { + session_mq_dispatch_one_message(sess, mq_elt); + DL_DELETE(plug_mgr_rt->priority_mq[mq_elt->header.priority], mq_elt); + DL_APPEND(plug_mgr_rt->dealth_letter_queue, mq_elt); // move to dlq list + + cur_priority=SESSION_MQ_PRIORITY_HIGH; + break; + } + } + +#if 0 + while (plug_mgr_rt->pending_mq != NULL) + { + DL_FOREACH_SAFE(plug_mgr_rt->pending_mq, mq_elt, mq_tmp) + { + session_mq_dispatch_one_message(sess, mq_elt); + DL_DELETE(plug_mgr_rt->pending_mq[mq_elt->header.priority], mq_elt); + DL_APPEND(plug_mgr_rt->delivered_mq, mq_elt); // move to delivered message list + } + } +#endif return; } @@ -658,8 +702,8 @@ struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_ struct plugin_manager_runtime *rt = CALLOC(struct plugin_manager_runtime, 1); rt->plug_mgr = plug_mgr; rt->sess = sess; - rt->pending_mq = NULL; - rt->delivered_mq = NULL; + memset(rt->priority_mq, 0, sizeof(rt->priority_mq)); + rt->dealth_letter_queue = NULL; rt->session_mq_status=bitmap_new(plug_mgr->session_topic_subscriber_num, plug_mgr->session_mq_topic_num, 1); rt->session_topic_status=bitmap_new(1, plug_mgr->session_mq_topic_num, 1); rt->sess_exdata_array = (struct stellar_exdata *)session_exdata_runtime_new(plug_mgr); @@ -672,15 +716,20 @@ struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_ void plugin_manager_session_runtime_free(struct plugin_manager_runtime *rt) { if(rt==NULL)return; - assert(rt->pending_mq==NULL); - if(rt->pending_mq != NULL) + assert(rt->priority_mq[SESSION_MQ_PRIORITY_HIGH]==NULL); + assert(rt->priority_mq[SESSION_MQ_PRIORITY_NORMAL]==NULL); + assert(rt->priority_mq[SESSION_MQ_PRIORITY_LOW]==NULL); + for(int i=0; i < SESSION_MQ_PRIORITY_MAX; i++) { - session_mq_free(rt->sess, &rt->pending_mq, rt->plug_mgr->session_mq_schema_array); + if(rt->priority_mq[i] != NULL) + { + session_mq_free(rt->sess, &rt->priority_mq[i], rt->plug_mgr->session_mq_schema_array); + } } - assert(rt->delivered_mq==NULL); - if(rt->delivered_mq != NULL) + assert(rt->dealth_letter_queue==NULL); + if(rt->dealth_letter_queue != NULL) { - session_mq_free(rt->sess, &rt->delivered_mq, rt->plug_mgr->session_mq_schema_array); + session_mq_free(rt->sess, &rt->dealth_letter_queue, rt->plug_mgr->session_mq_schema_array); } if(rt->session_mq_status != NULL) { @@ -855,8 +904,11 @@ void plugin_manager_on_session_egress(struct session *sess, struct packet *pkt) if(plug_mgr_rt==NULL)return; session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->egress_topic_id ,pkt); plugin_manager_session_message_dispatch(sess); - session_mq_free(plug_mgr_rt->sess,&plug_mgr_rt->delivered_mq, plug_mgr_rt->plug_mgr->session_mq_schema_array); - assert(plug_mgr_rt->pending_mq==NULL && plug_mgr_rt->delivered_mq==NULL); + session_mq_free(plug_mgr_rt->sess,&plug_mgr_rt->dealth_letter_queue, plug_mgr_rt->plug_mgr->session_mq_schema_array); + assert(plug_mgr_rt->priority_mq[SESSION_MQ_PRIORITY_HIGH]==NULL); + assert(plug_mgr_rt->priority_mq[SESSION_MQ_PRIORITY_NORMAL]==NULL); + assert(plug_mgr_rt->priority_mq[SESSION_MQ_PRIORITY_LOW]==NULL); + assert(plug_mgr_rt->dealth_letter_queue==NULL); return; } @@ -877,8 +929,11 @@ void plugin_manager_on_session_closing(struct session *sess) break; } plugin_manager_session_message_dispatch(sess); - session_mq_free(plug_mgr_rt->sess,&plug_mgr_rt->delivered_mq, plug_mgr_rt->plug_mgr->session_mq_schema_array); - assert(plug_mgr_rt->pending_mq==NULL && plug_mgr_rt->delivered_mq==NULL); + session_mq_free(plug_mgr_rt->sess,&plug_mgr_rt->dealth_letter_queue, plug_mgr_rt->plug_mgr->session_mq_schema_array); + assert(plug_mgr_rt->priority_mq[SESSION_MQ_PRIORITY_HIGH]==NULL); + assert(plug_mgr_rt->priority_mq[SESSION_MQ_PRIORITY_NORMAL]==NULL); + assert(plug_mgr_rt->priority_mq[SESSION_MQ_PRIORITY_LOW]==NULL); + assert(plug_mgr_rt->dealth_letter_queue==NULL); return; } diff --git a/src/plugin_manager/plugin_manager_interna.h b/src/plugin_manager/plugin_manager_interna.h index 50da9e8..82b2e7c 100644 --- a/src/plugin_manager/plugin_manager_interna.h +++ b/src/plugin_manager/plugin_manager_interna.h @@ -55,6 +55,7 @@ struct stellar_message struct { int topic_id; + enum session_mq_priority priority; } header; void *body; struct stellar_message *next, *prev; @@ -103,8 +104,8 @@ struct plugin_manager_runtime { struct plugin_manager_schema *plug_mgr; struct session *sess; - struct stellar_message *pending_mq;// message list - struct stellar_message *delivered_mq;// message list + struct stellar_message *priority_mq[SESSION_MQ_PRIORITY_MAX];// message list + struct stellar_message *dealth_letter_queue;// dlq list struct bitmap *session_mq_status; //N * M bits, N topic, M subscriber struct bitmap *session_topic_status; //N bits, N topic struct stellar_exdata *sess_exdata_array; |
