summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryangwei <[email protected]>2024-10-18 14:04:19 +0800
committeryangwei <[email protected]>2024-10-18 14:04:19 +0800
commit722ae7483b516692141f44c8ce1ce4d3a172b056 (patch)
tree2adb077ddb827c1422c491c7bacef8a1204d9ef7
parent65ae531ee34e89b6c763d64be01a16474054a3d2 (diff)
✨ feat(mq): add mq_runtime_defer, default disable
-rw-r--r--include/stellar/mq.h1
-rw-r--r--infra/module_manager/module_manager.c1
-rw-r--r--infra/mq/mq.c40
-rw-r--r--infra/mq/mq_internal.h11
-rw-r--r--infra/mq/test/gtest_mq_main.cpp5
5 files changed, 41 insertions, 17 deletions
diff --git a/include/stellar/mq.h b/include/stellar/mq.h
index aa7b416..0a0bb9e 100644
--- a/include/stellar/mq.h
+++ b/include/stellar/mq.h
@@ -42,6 +42,7 @@ int mq_schema_subscribe(struct mq_schema *s, int topic_id, on_msg_cb_func *on_ms
struct mq_runtime;
struct mq_runtime *mq_runtime_new(struct mq_schema *s);
+void mq_runtime_defer(struct mq_runtime *rt);
void mq_runtime_free(struct mq_runtime *s);
// return 0 if success, otherwise return -1
diff --git a/infra/module_manager/module_manager.c b/infra/module_manager/module_manager.c
index f05256e..e5ac28c 100644
--- a/infra/module_manager/module_manager.c
+++ b/infra/module_manager/module_manager.c
@@ -270,6 +270,5 @@ void stellar_polling_dispatch(struct stellar_module_manager *mod_mgr)
if(mod_mgr==NULL)return;
stellar_module_manager_polling_active(mod_mgr);
mq_runtime_dispatch(local_mq_rt);
- mq_runtime_clean(local_mq_rt);
return;
} \ No newline at end of file
diff --git a/infra/mq/mq.c b/infra/mq/mq.c
index e17e996..33d64df 100644
--- a/infra/mq/mq.c
+++ b/infra/mq/mq.c
@@ -149,7 +149,7 @@ void mq_runtime_clean(struct mq_runtime *rt)
struct mq_topic *topic;
rt->is_cleaning=true;
- for (int i = 0; i < MQ_TYPE_MAX; i++)
+ for (int i = 0; i < MQ_MAX; i++)
{
DL_FOREACH_SAFE(rt->mq[i], mq_elt, tmp)
{
@@ -170,19 +170,27 @@ void mq_runtime_dispatch(struct mq_runtime *rt)
{
struct mq_topic *topic=NULL;
struct mq_message *mq_elt=NULL, *mq_tmp=NULL;
- while (rt->mq_len[MQ_TYPE_MAILBOX])
+ while (rt->mq_len[MQ_MAILBOX])
{
- DL_FOREACH_SAFE(rt->mq[MQ_TYPE_MAILBOX], mq_elt, mq_tmp)
+ DL_FOREACH_SAFE(rt->mq[MQ_MAILBOX], mq_elt, mq_tmp)
{
- DL_DELETE(rt->mq[MQ_TYPE_MAILBOX], mq_elt);
- rt->mq_len[MQ_TYPE_MAILBOX] -= 1;
+ DL_DELETE(rt->mq[MQ_MAILBOX], mq_elt);
+ rt->mq_len[MQ_MAILBOX] -= 1;
topic = (struct mq_topic *)utarray_eltptr(rt->schema->topic_array, (unsigned int)(mq_elt->header.topic_id));
mq_dispatch_one_message(topic, mq_elt);
- DL_APPEND(rt->mq[MQ_TYPE_DEATH_LETTER], mq_elt); // move to dlq list
- rt->mq_len[MQ_TYPE_DEATH_LETTER] += 1;
+ if (rt->defer_enabled==true)
+ {
+ DL_APPEND(rt->mq[MQ_DEATH_LETTER], mq_elt); // move to dlq list
+ rt->mq_len[MQ_DEATH_LETTER] += 1;
+ }
+ else
+ {
+ if(topic->free_cb)topic->free_cb(mq_elt->body, topic->free_cb_arg);
+ FREE(mq_elt);
+ }
}
}
- //mq_runtime_clean(rt);
+ mq_runtime_clean(rt);
return;
}
@@ -215,8 +223,14 @@ int mq_runtime_publish_message(struct mq_runtime *rt, int topic_id, void *data)
msg->rt=rt;
msg->header.topic_id = topic_id;
msg->body = data;
- DL_APPEND(rt->mq[MQ_TYPE_MAILBOX], msg);
- rt->mq_len[MQ_TYPE_MAILBOX]+=1;
+ DL_APPEND(rt->mq[MQ_MAILBOX], msg);
+ rt->mq_len[MQ_MAILBOX]+=1;
+
+ if(rt->defer_enabled==false)
+ {
+ mq_runtime_dispatch(rt);
+ }
+
return 0;
}
@@ -250,6 +264,12 @@ struct mq_runtime *mq_runtime_new(struct mq_schema *s)
return rt;
}
+void mq_runtime_defer(struct mq_runtime *rt)
+{
+ if(rt==NULL)return;
+ rt->defer_enabled=true;
+}
+
void mq_runtime_free(struct mq_runtime *rt)
{
if(rt==NULL)return;
diff --git a/infra/mq/mq_internal.h b/infra/mq/mq_internal.h
index 342c79b..d0cb695 100644
--- a/infra/mq/mq_internal.h
+++ b/infra/mq/mq_internal.h
@@ -54,17 +54,18 @@ struct mq_schema
enum mq_property
{
- MQ_TYPE_MAILBOX = 0,
- MQ_TYPE_DEATH_LETTER = 1,
- MQ_TYPE_MAX,
+ MQ_MAILBOX = 0,
+ MQ_DEATH_LETTER = 1,
+ MQ_MAX,
};
struct mq_runtime
{
struct mq_schema *schema;
- struct mq_message *mq[MQ_TYPE_MAX];// message queue
- size_t mq_len[MQ_TYPE_MAX];
+ struct mq_message *mq[MQ_MAX];// message queue
+ size_t mq_len[MQ_MAX];
bool is_cleaning;
+ bool defer_enabled;
};
int mq_runtime_publish_message_immediate(struct mq_runtime *rt, int topic_id, void *msg);
diff --git a/infra/mq/test/gtest_mq_main.cpp b/infra/mq/test/gtest_mq_main.cpp
index 06050b2..6c8d5c8 100644
--- a/infra/mq/test/gtest_mq_main.cpp
+++ b/infra/mq/test/gtest_mq_main.cpp
@@ -161,7 +161,7 @@ void test_pub_and_clean_on_msg(int topic_id, void *msg, void *sub_arg)
return;
}
-TEST(mq_runtime, pub_then_clean) {
+TEST(mq_runtime, defer_pub_then_clean) {
struct test_pub_and_clean_env env={};
env.s = mq_schema_new();
@@ -174,6 +174,8 @@ TEST(mq_runtime, pub_then_clean) {
env.rt=mq_runtime_new(env.s);
EXPECT_TRUE(env.rt!=NULL);
+ mq_runtime_defer(env.rt);
+
for(int i=0; i<env.N_round;i++)
{
env.current_round=i;
@@ -370,6 +372,7 @@ TEST(mq_runtime, pub_on_msg_free)
env.N_round=10;
env.rt=mq_runtime_new(env.s);
EXPECT_TRUE(env.rt!=NULL);
+ mq_runtime_defer(env.rt);
for(int i=0; i<env.N_round;i++)
{
env.current_round=i;