summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryangwei <[email protected]>2024-09-25 18:36:31 +0800
committeryangwei <[email protected]>2024-09-25 18:36:31 +0800
commit75d60bb3eaaf626b6e143fb989a972042e225a1a (patch)
tree2eb5502942997b8c875cf9f4822f32dcaab2e239
parent74f77f34112170d081868203441e71b2c78a5452 (diff)
✨ feat(add publish at once API): define in mq_internal.h temporarily
-rw-r--r--infra/mq/mq.c51
-rw-r--r--infra/mq/mq_internal.h3
2 files changed, 39 insertions, 15 deletions
diff --git a/infra/mq/mq.c b/infra/mq/mq.c
index 9e75907..5228572 100644
--- a/infra/mq/mq.c
+++ b/infra/mq/mq.c
@@ -106,25 +106,46 @@ int mq_schema_destroy_topic(struct mq_schema *s, int topic_id)
}
-static void mq_dispatch_one_message(struct mq_schema *s, struct mq_message *mq_elt)
+static int mq_dispatch_one_message(struct mq_topic *topic, struct mq_message *mq_elt)
{
struct mq_subscriber *sub_elt, *sub_tmp;
- struct mq_topic *topic = (struct mq_topic *)utarray_eltptr(s->topic_array,(unsigned int)(mq_elt->header.topic_id));
- if (topic)
+ if(topic==NULL || mq_elt==NULL)return -1;
+ DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
{
- DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
+ if (sub_elt->msg_cb)
{
- if (sub_elt->msg_cb)
- {
- if(topic->dispatch_cb)topic->dispatch_cb(mq_elt->header.topic_id,
- mq_elt->body,
- sub_elt->msg_cb,
- sub_elt->msg_cb_arg,
- topic->dispatch_cb_arg);
- else sub_elt->msg_cb(mq_elt->header.topic_id, mq_elt->body, sub_elt->msg_cb_arg);
- }
+ if (topic->dispatch_cb)
+ topic->dispatch_cb(mq_elt->header.topic_id, mq_elt->body, sub_elt->msg_cb, sub_elt->msg_cb_arg,
+ topic->dispatch_cb_arg);
+ else
+ sub_elt->msg_cb(mq_elt->header.topic_id, mq_elt->body, sub_elt->msg_cb_arg);
}
}
+ return 0;
+}
+
+int mq_runtime_publish_message_at_once(struct mq_runtime *rt, int topic_id, void *msg)
+{
+ if(rt==NULL || rt->schema == NULL || rt->schema->topic_array == NULL)return -1;
+ //if(rt->is_cleaning==true)return -1;
+ unsigned int len = utarray_len(rt->schema->topic_array);
+ if (len <= (unsigned int)topic_id)return -1;
+
+ struct mq_topic *topic = (struct mq_topic *)utarray_eltptr(rt->schema->topic_array,(unsigned int)(topic_id));
+ if(topic==NULL)return -1;
+
+ struct mq_message *mq_elt = CALLOC(struct mq_message,1);
+ mq_elt->rt=rt;
+ mq_elt->header.topic_id = topic_id;
+ mq_elt->header.priority = STELLAR_MQ_PRIORITY_HIGH;
+ mq_elt->body = msg;
+ mq_dispatch_one_message(topic, mq_elt);
+ if (topic->free_cb)
+ {
+ topic->free_cb(mq_elt->body, topic->free_cb_arg);
+ }
+ FREE(mq_elt);
+ return 0;
}
void mq_runtime_clean(struct mq_runtime *rt)
@@ -155,6 +176,7 @@ void mq_runtime_clean(struct mq_runtime *rt)
void mq_runtime_dispatch(struct mq_runtime *rt)
{
+ struct mq_topic *topic=NULL;
struct mq_message *mq_elt=NULL, *mq_tmp=NULL;
int cur_priority = STELLAR_MQ_PRIORITY_HIGH;
while(cur_priority >= STELLAR_MQ_PRIORITY_LOW)
@@ -168,7 +190,8 @@ void mq_runtime_dispatch(struct mq_runtime *rt)
{
DL_DELETE(rt->priority_mq[mq_elt->header.priority], mq_elt);
rt->priority_mq_len[mq_elt->header.priority]-=1;
- mq_dispatch_one_message(rt->schema, mq_elt);
+ topic = (struct mq_topic *)utarray_eltptr(rt->schema->topic_array,(unsigned int)(mq_elt->header.topic_id));
+ mq_dispatch_one_message(topic, mq_elt);
DL_APPEND(rt->priority_mq[STELLAR_MQ_DEATH_LETTER], mq_elt); // move to dlq list
rt->priority_mq_len[STELLAR_MQ_DEATH_LETTER]+=1;
cur_priority=STELLAR_MQ_PRIORITY_HIGH;
diff --git a/infra/mq/mq_internal.h b/infra/mq/mq_internal.h
index 4e7a31f..4d60e37 100644
--- a/infra/mq/mq_internal.h
+++ b/infra/mq/mq_internal.h
@@ -70,7 +70,8 @@ struct mq_runtime
bool is_cleaning;
};
-int mq_runtime_publish_message_with_priority(struct mq_runtime *rt, int topic_id, void *data, enum mq_property priority);
+int mq_runtime_publish_message_with_priority(struct mq_runtime *rt, int topic_id, void *msg, enum mq_property priority);
+int mq_runtime_publish_message_at_once(struct mq_runtime *rt, int topic_id, void *msg);
#ifdef __cplusplus
}