diff options
| author | yangwei <[email protected]> | 2024-09-25 18:36:31 +0800 |
|---|---|---|
| committer | yangwei <[email protected]> | 2024-09-25 18:36:31 +0800 |
| commit | 75d60bb3eaaf626b6e143fb989a972042e225a1a (patch) | |
| tree | 2eb5502942997b8c875cf9f4822f32dcaab2e239 | |
| parent | 74f77f34112170d081868203441e71b2c78a5452 (diff) | |
✨ feat(add publish at once API): define in mq_internal.h temporarily
| -rw-r--r-- | infra/mq/mq.c | 51 | ||||
| -rw-r--r-- | infra/mq/mq_internal.h | 3 |
2 files changed, 39 insertions, 15 deletions
diff --git a/infra/mq/mq.c b/infra/mq/mq.c index 9e75907..5228572 100644 --- a/infra/mq/mq.c +++ b/infra/mq/mq.c @@ -106,25 +106,46 @@ int mq_schema_destroy_topic(struct mq_schema *s, int topic_id) } -static void mq_dispatch_one_message(struct mq_schema *s, struct mq_message *mq_elt) +static int mq_dispatch_one_message(struct mq_topic *topic, struct mq_message *mq_elt) { struct mq_subscriber *sub_elt, *sub_tmp; - struct mq_topic *topic = (struct mq_topic *)utarray_eltptr(s->topic_array,(unsigned int)(mq_elt->header.topic_id)); - if (topic) + if(topic==NULL || mq_elt==NULL)return -1; + DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp) { - DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp) + if (sub_elt->msg_cb) { - if (sub_elt->msg_cb) - { - if(topic->dispatch_cb)topic->dispatch_cb(mq_elt->header.topic_id, - mq_elt->body, - sub_elt->msg_cb, - sub_elt->msg_cb_arg, - topic->dispatch_cb_arg); - else sub_elt->msg_cb(mq_elt->header.topic_id, mq_elt->body, sub_elt->msg_cb_arg); - } + if (topic->dispatch_cb) + topic->dispatch_cb(mq_elt->header.topic_id, mq_elt->body, sub_elt->msg_cb, sub_elt->msg_cb_arg, + topic->dispatch_cb_arg); + else + sub_elt->msg_cb(mq_elt->header.topic_id, mq_elt->body, sub_elt->msg_cb_arg); } } + return 0; +} + +int mq_runtime_publish_message_at_once(struct mq_runtime *rt, int topic_id, void *msg) +{ + if(rt==NULL || rt->schema == NULL || rt->schema->topic_array == NULL)return -1; + //if(rt->is_cleaning==true)return -1; + unsigned int len = utarray_len(rt->schema->topic_array); + if (len <= (unsigned int)topic_id)return -1; + + struct mq_topic *topic = (struct mq_topic *)utarray_eltptr(rt->schema->topic_array,(unsigned int)(topic_id)); + if(topic==NULL)return -1; + + struct mq_message *mq_elt = CALLOC(struct mq_message,1); + mq_elt->rt=rt; + mq_elt->header.topic_id = topic_id; + mq_elt->header.priority = STELLAR_MQ_PRIORITY_HIGH; + mq_elt->body = msg; + mq_dispatch_one_message(topic, mq_elt); + if (topic->free_cb) + { + topic->free_cb(mq_elt->body, topic->free_cb_arg); + } + FREE(mq_elt); + return 0; } void mq_runtime_clean(struct mq_runtime *rt) @@ -155,6 +176,7 @@ void mq_runtime_clean(struct mq_runtime *rt) void mq_runtime_dispatch(struct mq_runtime *rt) { + struct mq_topic *topic=NULL; struct mq_message *mq_elt=NULL, *mq_tmp=NULL; int cur_priority = STELLAR_MQ_PRIORITY_HIGH; while(cur_priority >= STELLAR_MQ_PRIORITY_LOW) @@ -168,7 +190,8 @@ void mq_runtime_dispatch(struct mq_runtime *rt) { DL_DELETE(rt->priority_mq[mq_elt->header.priority], mq_elt); rt->priority_mq_len[mq_elt->header.priority]-=1; - mq_dispatch_one_message(rt->schema, mq_elt); + topic = (struct mq_topic *)utarray_eltptr(rt->schema->topic_array,(unsigned int)(mq_elt->header.topic_id)); + mq_dispatch_one_message(topic, mq_elt); DL_APPEND(rt->priority_mq[STELLAR_MQ_DEATH_LETTER], mq_elt); // move to dlq list rt->priority_mq_len[STELLAR_MQ_DEATH_LETTER]+=1; cur_priority=STELLAR_MQ_PRIORITY_HIGH; diff --git a/infra/mq/mq_internal.h b/infra/mq/mq_internal.h index 4e7a31f..4d60e37 100644 --- a/infra/mq/mq_internal.h +++ b/infra/mq/mq_internal.h @@ -70,7 +70,8 @@ struct mq_runtime bool is_cleaning; }; -int mq_runtime_publish_message_with_priority(struct mq_runtime *rt, int topic_id, void *data, enum mq_property priority); +int mq_runtime_publish_message_with_priority(struct mq_runtime *rt, int topic_id, void *msg, enum mq_property priority); +int mq_runtime_publish_message_at_once(struct mq_runtime *rt, int topic_id, void *msg); #ifdef __cplusplus } |
