From b3769f0b9f75581af79f6e2fd332beaee6a7336e Mon Sep 17 00:00:00 2001 From: yangwei Date: Fri, 27 Sep 2024 09:04:21 +0800 Subject: 🦄 refactor(mq internal api): merge duplicate code MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- infra/mq/mq.c | 105 ++++++++++++++++++++++++++-------------------------------- 1 file changed, 47 insertions(+), 58 deletions(-) diff --git a/infra/mq/mq.c b/infra/mq/mq.c index 5228572..ff503a8 100644 --- a/infra/mq/mq.c +++ b/infra/mq/mq.c @@ -27,11 +27,11 @@ int mq_schema_get_topic_id(struct mq_schema *s, const char *topic_name) { if(topic_name == NULL || s == NULL || s->topic_array == NULL )return -1; unsigned int len = utarray_len(s->topic_array); - struct mq_topic *t_schema; + struct mq_topic *topic; for(unsigned int i = 0; i < len; i++) { - t_schema = (struct mq_topic *)utarray_eltptr(s->topic_array, i); - if(strcmp(t_schema->topic_name, topic_name) == 0) + topic = (struct mq_topic *)utarray_eltptr(s->topic_array, i); + if(strcmp(topic->topic_name, topic_name) == 0) { return i; } @@ -39,17 +39,22 @@ int mq_schema_get_topic_id(struct mq_schema *s, const char *topic_name) return -1; } -int mq_schema_update_topic(struct mq_schema *s, int topic_id, on_msg_dispatch_cb_func *on_dispatch_cb, void *on_dispatch_arg, mq_msg_free_cb_func *msg_free_cb, void *msg_free_arg) +static struct mq_topic *mq_schema_get_topic(struct mq_schema *s, int topic_id) { - if(s == NULL || s->topic_array == NULL)return -1; + if(s==NULL || s->topic_array == NULL || topic_id < 0)return NULL; unsigned int len = utarray_len(s->topic_array); - if(len < (unsigned int)topic_id)return -1; - struct mq_topic *t_schema = (struct mq_topic *)utarray_eltptr(s->topic_array, (unsigned int)topic_id); - if(t_schema == NULL)return -1; - t_schema->dispatch_cb=on_dispatch_cb; - t_schema->dispatch_cb_arg=on_dispatch_arg; - t_schema->free_cb=msg_free_cb; - t_schema->free_cb_arg=msg_free_arg; + if (len <= (unsigned int)topic_id)return NULL; + return (struct mq_topic *)utarray_eltptr(s->topic_array, (unsigned int)topic_id); +} + +int mq_schema_update_topic(struct mq_schema *s, int topic_id, on_msg_dispatch_cb_func *on_dispatch_cb, void *on_dispatch_arg, mq_msg_free_cb_func *msg_free_cb, void *msg_free_arg) +{ + struct mq_topic *topic = mq_schema_get_topic(s, topic_id); + if(topic == NULL)return -1; + topic->dispatch_cb=on_dispatch_cb; + topic->dispatch_cb_arg=on_dispatch_arg; + topic->free_cb=msg_free_cb; + topic->free_cb_arg=msg_free_arg; return 0; } @@ -65,36 +70,27 @@ int mq_schema_create_topic(struct mq_schema *s, const char *topic_name, on_msg_d { return -1; } - struct mq_topic t_schema; - memset(&t_schema, 0, sizeof(struct mq_topic)); - t_schema.dispatch_cb=on_dispatch_cb; - t_schema.free_cb=msg_free_cb; - t_schema.topic_name=(char *)topic_name; - t_schema.topic_id=len;//topid_id equals arrary index - t_schema.dispatch_cb_arg=on_dispatch_arg; - t_schema.free_cb_arg=msg_free_arg; - t_schema.subscribers=NULL; - t_schema.subscriber_cnt=0; - utarray_push_back(s->topic_array, &t_schema); + struct mq_topic topic={}; + topic.dispatch_cb=on_dispatch_cb; + topic.free_cb=msg_free_cb; + topic.topic_name=(char *)topic_name; + topic.topic_id=len;//topid_id equals arrary index + topic.dispatch_cb_arg=on_dispatch_arg; + topic.free_cb_arg=msg_free_arg; + topic.subscribers=NULL; + topic.subscriber_cnt=0; + utarray_push_back(s->topic_array, &topic); s->mq_topic_num+=1; - return t_schema.topic_id; + return topic.topic_id; } int mq_schema_destroy_topic(struct mq_schema *s, int topic_id) { - if(s==NULL)return -1; - if(s->topic_array==NULL)return -1; - unsigned int len = utarray_len(s->topic_array); - if (len <= (unsigned int)topic_id) - return -1; - struct mq_topic *topic = - (struct mq_topic *)utarray_eltptr(s->topic_array, (unsigned int)topic_id); - struct mq_subscriber *sub_elt, *sub_tmp; - + struct mq_topic *topic = mq_schema_get_topic(s, topic_id); if(topic == NULL)return -1; - if (topic->is_destroyed == 1)return 0; + struct mq_subscriber *sub_elt, *sub_tmp; DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp) { DL_DELETE(topic->subscribers, sub_elt); @@ -124,27 +120,25 @@ static int mq_dispatch_one_message(struct mq_topic *topic, struct mq_message *mq return 0; } + + int mq_runtime_publish_message_at_once(struct mq_runtime *rt, int topic_id, void *msg) { - if(rt==NULL || rt->schema == NULL || rt->schema->topic_array == NULL)return -1; - //if(rt->is_cleaning==true)return -1; - unsigned int len = utarray_len(rt->schema->topic_array); - if (len <= (unsigned int)topic_id)return -1; + if(rt==NULL || rt->schema == NULL)return -1; - struct mq_topic *topic = (struct mq_topic *)utarray_eltptr(rt->schema->topic_array,(unsigned int)(topic_id)); + struct mq_topic *topic = mq_schema_get_topic(rt->schema, topic_id); if(topic==NULL)return -1; - struct mq_message *mq_elt = CALLOC(struct mq_message,1); - mq_elt->rt=rt; - mq_elt->header.topic_id = topic_id; - mq_elt->header.priority = STELLAR_MQ_PRIORITY_HIGH; - mq_elt->body = msg; - mq_dispatch_one_message(topic, mq_elt); + struct mq_message mq_elt; + mq_elt.rt=rt; + mq_elt.header.topic_id = topic_id; + mq_elt.header.priority = STELLAR_MQ_PRIORITY_HIGH; + mq_elt.body = msg; + mq_dispatch_one_message(topic, &mq_elt); if (topic->free_cb) { - topic->free_cb(mq_elt->body, topic->free_cb_arg); + topic->free_cb(mq_elt.body, topic->free_cb_arg); } - FREE(mq_elt); return 0; } @@ -205,14 +199,8 @@ void mq_runtime_dispatch(struct mq_runtime *rt) //return 0 if success, otherwise return -1. int mq_schema_subscribe(struct mq_schema *s, int topic_id, on_msg_cb_func *on_msg_cb, void *on_msg_cb_arg) { - - if(s == NULL || s->topic_array == NULL)return -1; - - unsigned int len = utarray_len(s->topic_array); - if (len <= (unsigned int)topic_id)return -1; - - struct mq_topic *topic = (struct mq_topic *)utarray_eltptr(s->topic_array, (unsigned int)topic_id); - if(topic==NULL)return -1; + struct mq_topic *topic = mq_schema_get_topic(s, topic_id); + if(topic==NULL)return -1; struct mq_subscriber *new_subscriber = CALLOC(struct mq_subscriber,1); new_subscriber->topic_subscriber_idx = topic->subscriber_cnt; @@ -227,12 +215,13 @@ int mq_schema_subscribe(struct mq_schema *s, int topic_id, on_msg_cb_func *on_ms int mq_runtime_publish_message_with_priority(struct mq_runtime *rt, int topic_id, void *data, enum mq_property priority) { - if(rt==NULL || rt->schema == NULL || rt->schema->topic_array == NULL)return -1; + if(rt==NULL || rt->schema == NULL)return -1; if(rt->is_cleaning==true)return -1; if(priority < STELLAR_MQ_PRIORITY_LOW || priority > STELLAR_MQ_PRIORITY_HIGH)return -1; - unsigned int len = utarray_len(rt->schema->topic_array); - if (len <= (unsigned int)topic_id)return -1; + struct mq_topic *topic = mq_schema_get_topic(rt->schema, topic_id); + if(topic==NULL)return -1; + struct mq_message *msg= CALLOC(struct mq_message,1); msg->rt=rt; msg->header.topic_id = topic_id; -- cgit v1.2.3