diff options
| author | yangwei <[email protected]> | 2024-10-18 14:04:19 +0800 |
|---|---|---|
| committer | yangwei <[email protected]> | 2024-10-18 14:04:19 +0800 |
| commit | 722ae7483b516692141f44c8ce1ce4d3a172b056 (patch) | |
| tree | 2adb077ddb827c1422c491c7bacef8a1204d9ef7 /infra | |
| parent | 65ae531ee34e89b6c763d64be01a16474054a3d2 (diff) | |
✨ feat(mq): add mq_runtime_defer, default disable
Diffstat (limited to 'infra')
| -rw-r--r-- | infra/module_manager/module_manager.c | 1 | ||||
| -rw-r--r-- | infra/mq/mq.c | 40 | ||||
| -rw-r--r-- | infra/mq/mq_internal.h | 11 | ||||
| -rw-r--r-- | infra/mq/test/gtest_mq_main.cpp | 5 |
4 files changed, 40 insertions, 17 deletions
diff --git a/infra/module_manager/module_manager.c b/infra/module_manager/module_manager.c index f05256e..e5ac28c 100644 --- a/infra/module_manager/module_manager.c +++ b/infra/module_manager/module_manager.c @@ -270,6 +270,5 @@ void stellar_polling_dispatch(struct stellar_module_manager *mod_mgr) if(mod_mgr==NULL)return; stellar_module_manager_polling_active(mod_mgr); mq_runtime_dispatch(local_mq_rt); - mq_runtime_clean(local_mq_rt); return; }
\ No newline at end of file diff --git a/infra/mq/mq.c b/infra/mq/mq.c index e17e996..33d64df 100644 --- a/infra/mq/mq.c +++ b/infra/mq/mq.c @@ -149,7 +149,7 @@ void mq_runtime_clean(struct mq_runtime *rt) struct mq_topic *topic; rt->is_cleaning=true; - for (int i = 0; i < MQ_TYPE_MAX; i++) + for (int i = 0; i < MQ_MAX; i++) { DL_FOREACH_SAFE(rt->mq[i], mq_elt, tmp) { @@ -170,19 +170,27 @@ void mq_runtime_dispatch(struct mq_runtime *rt) { struct mq_topic *topic=NULL; struct mq_message *mq_elt=NULL, *mq_tmp=NULL; - while (rt->mq_len[MQ_TYPE_MAILBOX]) + while (rt->mq_len[MQ_MAILBOX]) { - DL_FOREACH_SAFE(rt->mq[MQ_TYPE_MAILBOX], mq_elt, mq_tmp) + DL_FOREACH_SAFE(rt->mq[MQ_MAILBOX], mq_elt, mq_tmp) { - DL_DELETE(rt->mq[MQ_TYPE_MAILBOX], mq_elt); - rt->mq_len[MQ_TYPE_MAILBOX] -= 1; + DL_DELETE(rt->mq[MQ_MAILBOX], mq_elt); + rt->mq_len[MQ_MAILBOX] -= 1; topic = (struct mq_topic *)utarray_eltptr(rt->schema->topic_array, (unsigned int)(mq_elt->header.topic_id)); mq_dispatch_one_message(topic, mq_elt); - DL_APPEND(rt->mq[MQ_TYPE_DEATH_LETTER], mq_elt); // move to dlq list - rt->mq_len[MQ_TYPE_DEATH_LETTER] += 1; + if (rt->defer_enabled==true) + { + DL_APPEND(rt->mq[MQ_DEATH_LETTER], mq_elt); // move to dlq list + rt->mq_len[MQ_DEATH_LETTER] += 1; + } + else + { + if(topic->free_cb)topic->free_cb(mq_elt->body, topic->free_cb_arg); + FREE(mq_elt); + } } } - //mq_runtime_clean(rt); + mq_runtime_clean(rt); return; } @@ -215,8 +223,14 @@ int mq_runtime_publish_message(struct mq_runtime *rt, int topic_id, void *data) msg->rt=rt; msg->header.topic_id = topic_id; msg->body = data; - DL_APPEND(rt->mq[MQ_TYPE_MAILBOX], msg); - rt->mq_len[MQ_TYPE_MAILBOX]+=1; + DL_APPEND(rt->mq[MQ_MAILBOX], msg); + rt->mq_len[MQ_MAILBOX]+=1; + + if(rt->defer_enabled==false) + { + mq_runtime_dispatch(rt); + } + return 0; } @@ -250,6 +264,12 @@ struct mq_runtime *mq_runtime_new(struct mq_schema *s) return rt; } +void mq_runtime_defer(struct mq_runtime *rt) +{ + if(rt==NULL)return; + rt->defer_enabled=true; +} + void mq_runtime_free(struct mq_runtime *rt) { if(rt==NULL)return; diff --git a/infra/mq/mq_internal.h b/infra/mq/mq_internal.h index 342c79b..d0cb695 100644 --- a/infra/mq/mq_internal.h +++ b/infra/mq/mq_internal.h @@ -54,17 +54,18 @@ struct mq_schema enum mq_property { - MQ_TYPE_MAILBOX = 0, - MQ_TYPE_DEATH_LETTER = 1, - MQ_TYPE_MAX, + MQ_MAILBOX = 0, + MQ_DEATH_LETTER = 1, + MQ_MAX, }; struct mq_runtime { struct mq_schema *schema; - struct mq_message *mq[MQ_TYPE_MAX];// message queue - size_t mq_len[MQ_TYPE_MAX]; + struct mq_message *mq[MQ_MAX];// message queue + size_t mq_len[MQ_MAX]; bool is_cleaning; + bool defer_enabled; }; int mq_runtime_publish_message_immediate(struct mq_runtime *rt, int topic_id, void *msg); diff --git a/infra/mq/test/gtest_mq_main.cpp b/infra/mq/test/gtest_mq_main.cpp index 06050b2..6c8d5c8 100644 --- a/infra/mq/test/gtest_mq_main.cpp +++ b/infra/mq/test/gtest_mq_main.cpp @@ -161,7 +161,7 @@ void test_pub_and_clean_on_msg(int topic_id, void *msg, void *sub_arg) return; } -TEST(mq_runtime, pub_then_clean) { +TEST(mq_runtime, defer_pub_then_clean) { struct test_pub_and_clean_env env={}; env.s = mq_schema_new(); @@ -174,6 +174,8 @@ TEST(mq_runtime, pub_then_clean) { env.rt=mq_runtime_new(env.s); EXPECT_TRUE(env.rt!=NULL); + mq_runtime_defer(env.rt); + for(int i=0; i<env.N_round;i++) { env.current_round=i; @@ -370,6 +372,7 @@ TEST(mq_runtime, pub_on_msg_free) env.N_round=10; env.rt=mq_runtime_new(env.s); EXPECT_TRUE(env.rt!=NULL); + mq_runtime_defer(env.rt); for(int i=0; i<env.N_round;i++) { env.current_round=i; |
