From 656dcf70d214513636df15191de903c7eb0abb91 Mon Sep 17 00:00:00 2001 From: yangwei Date: Thu, 18 Jan 2024 17:46:41 +0800 Subject: ✨ feat(session_mq_ignore_message): code implementation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- examples/sapp_plugin/publisher_loader.c | 9 ++++--- include/stellar/stellar.h | 4 +++ src/adapter/session_manager.c | 48 ++++++++++++++++++++++++++++++--- src/adapter/session_manager.h | 1 + src/adapter/stellar_internal.h | 2 ++ 5 files changed, 58 insertions(+), 6 deletions(-) diff --git a/examples/sapp_plugin/publisher_loader.c b/examples/sapp_plugin/publisher_loader.c index 0003680..4334f9a 100644 --- a/examples/sapp_plugin/publisher_loader.c +++ b/examples/sapp_plugin/publisher_loader.c @@ -24,6 +24,10 @@ static void session_mq_topic_free(void *data, void *cb_arg) return; } +struct stellar *g_stellar=NULL; +int g_topic_id=-1; +int g_sub_id=-1; + static int session_mq_loader_read(struct session *sess, int topic_id, const void *data, void *cb_arg) { struct simple_stream_ctx *ctx =(struct simple_stream_ctx *)data; @@ -33,11 +37,10 @@ static int session_mq_loader_read(struct session *sess, int topic_id, const void ctx->s2c_pkts, ctx->s2c_bytes); printf("total-pkt=%u, ", ctx->c2s_pkts+ctx->s2c_pkts); printf("total-count=%u\n", ctx->c2s_bytes+ctx->s2c_bytes); + session_mq_ignore_message(sess, topic_id, g_sub_id); return 0; } -struct stellar *g_stellar=NULL; -int g_topic_id=-1; int PUBLISHER_LOADER_INIT() { g_stellar = stellar_init("./stellar_plugin/simple_plugin.toml"); @@ -52,7 +55,7 @@ int PUBLISHER_LOADER_INIT() { g_topic_id=session_mq_create_topic(g_stellar, "SIMPLE_MQ_TOPIC", session_mq_topic_free, NULL); } - session_mq_subscribe_topic(g_stellar , g_topic_id, session_mq_loader_read, NULL); + g_sub_id=session_mq_subscribe_topic(g_stellar , g_topic_id, session_mq_loader_read, NULL); return 0; } diff --git a/include/stellar/stellar.h b/include/stellar/stellar.h index 9751d93..c10c8ec 100644 --- a/include/stellar/stellar.h +++ b/include/stellar/stellar.h @@ -7,6 +7,10 @@ struct stellar; int stellar_get_worker_thread_num(struct stellar *st); int stellar_get_current_thread_id(struct stellar *st); +int stellar_get_session_mq_topic_cnt(struct stellar *st); +int stellar_get_session_mq_subscriber_cnt(struct stellar *st, int topic_id); + + typedef void *plugin_init_callback(struct stellar *st); typedef void plugin_exit_callback(void *plugin_ctx); diff --git a/src/adapter/session_manager.c b/src/adapter/session_manager.c index 1f4bdfa..368feff 100644 --- a/src/adapter/session_manager.c +++ b/src/adapter/session_manager.c @@ -196,6 +196,7 @@ int session_mq_create_topic(struct stellar *st, const char *topic_name, msg_free t_schema.subscribers=NULL; t_schema.sub_cnt=0; utarray_push_back(st->session_mq_schema_array, &t_schema); + st->topic_num+=1; return t_schema.topic_id; } @@ -236,8 +237,17 @@ int session_mq_publish_message(struct session *sess, int topic_id, void *data) int session_mq_ignore_message(struct session *sess, int topic_id, int sub_id) { - // TODO: implement - return -1; + if(topic_id < 0 || sub_id < 0)return -1; + if(topic_id >= sess->st->topic_num)return -1; + struct session_mq_topic_schema *topic = (struct session_mq_topic_schema *)utarray_eltptr(sess->st->session_mq_schema_array, (unsigned int)topic_id); + if(sub_id > topic->sub_cnt)return -1; + if(sess->mq_sub_status) + { + sess->mq_sub_status[topic_id+(sub_id-1)]=0; + return 0; + } + else + return -1; } int session_mq_subscribe_topic(struct stellar *st, int topic_id, on_msg_cb_func *sub_cb, void *cb_arg) @@ -251,6 +261,27 @@ int session_mq_subscribe_topic(struct stellar *st, int topic_id, on_msg_cb_func new_subscriber->cb_arg = cb_arg; DL_APPEND(topic->subscribers, new_subscriber); topic->sub_cnt+=1; + st->sub_num+=1; + return topic->sub_cnt; +} + + +int stellar_get_session_mq_topic_cnt(struct stellar *st) +{ + if(st->session_exdata_schema_array) + { + return utarray_len(st->session_mq_schema_array); + } + else + return 0; +} + +int stellar_get_session_mq_subscriber_cnt(struct stellar *st, int topic_id) +{ + if(st->session_exdata_schema_array==NULL)return -1; + unsigned int len = utarray_len(st->session_mq_schema_array); + if (len <= (unsigned int)topic_id)return -1; + struct session_mq_topic_schema *topic = (struct session_mq_topic_schema *)utarray_eltptr(st->session_mq_schema_array, (unsigned int)topic_id); return topic->sub_cnt; } @@ -359,6 +390,8 @@ struct session *session_new(struct stellar *st, enum session_type type, int thre sess->type = type; sess->state = SESSION_STATE_OPENING; sess->mq = NULL; + sess->mq_sub_status=CALLOC(char, st->topic_num*st->sub_num); + memset(sess->mq_sub_status, 1, st->topic_num*st->sub_num); sess->ex_data_rt = session_new_exdata_rt(st); sess->ev_list=NULL; sess->intrinsic_events=session_intrinsic_events_init(st, sess); @@ -385,6 +418,10 @@ void session_free(struct session *sess) { session_mq_free(sess->mq); } + if(sess->mq_sub_status != NULL) + { + FREE(sess->mq_sub_status); + } session_free_exdata_runtime(sess, sess->ex_data_rt); if(sess->cur_pkt) { @@ -508,9 +545,14 @@ void session_defer_loop(struct session *sess) (unsigned int)(mq_elt->topic_id)); if (topic) { + int cur_sub_idx = 0; DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp) { - sub_elt->sub_cb(sess, mq_elt->topic_id, mq_elt->message, sub_elt->cb_arg); + if(sess->mq_sub_status[(mq_elt->topic_id)+cur_sub_idx] == 1) + { + sub_elt->sub_cb(sess, mq_elt->topic_id, mq_elt->message, sub_elt->cb_arg); + } + cur_sub_idx++; } if (topic->free_cb) { diff --git a/src/adapter/session_manager.h b/src/adapter/session_manager.h index ccdee34..cf8f140 100644 --- a/src/adapter/session_manager.h +++ b/src/adapter/session_manager.h @@ -22,6 +22,7 @@ struct session struct session_event *intrinsic_events; struct session_exdata_runtime *ex_data_rt; struct session_mq *mq; + char *mq_sub_status; struct session_event *ev_list; uint64_t set_detach_others; }; diff --git a/src/adapter/stellar_internal.h b/src/adapter/stellar_internal.h index fe843bc..02d1eb8 100644 --- a/src/adapter/stellar_internal.h +++ b/src/adapter/stellar_internal.h @@ -9,4 +9,6 @@ struct stellar UT_array *plugin_specs_array; UT_array *session_mq_schema_array; UT_array *session_exdata_schema_array; + int topic_num; + int sub_num; }; \ No newline at end of file -- cgit v1.2.3