summaryrefslogtreecommitdiff
path: root/src/plugin_manager/plugin_manager.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/plugin_manager/plugin_manager.c')
-rw-r--r--src/plugin_manager/plugin_manager.c28
1 files changed, 17 insertions, 11 deletions
diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c
index f450c3a..5331e7c 100644
--- a/src/plugin_manager/plugin_manager.c
+++ b/src/plugin_manager/plugin_manager.c
@@ -406,7 +406,7 @@ int stellar_mq_create_topic(struct stellar *st, const char *topic_name, void *ms
utarray_new(*mq_schema_array, &stellar_mq_topic_schema_icd);
}
unsigned int len = utarray_len(*mq_schema_array);
- if(stellar_session_mq_get_topic_id(st, topic_name) >= 0)
+ if(stellar_mq_get_topic_id(topic_name, *mq_schema_array) >= 0)
{
return -1;
}
@@ -432,15 +432,17 @@ int stellar_mq_destroy_topic(int topic_id, UT_array *mq_schema_array)
(struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array, (unsigned int)topic_id);
struct stellar_mq_subscriber *sub_elt, *sub_tmp;
- if (topic)
+ if(topic == NULL)return -1;
+
+ if (topic->is_destroyed == 1)return 0;
+
+ DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
{
- DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
- {
- DL_DELETE(topic->subscribers, sub_elt);
- FREE(sub_elt);
- }
+ DL_DELETE(topic->subscribers, sub_elt);
+ FREE(sub_elt);
}
- return 0; // success
+ topic->is_destroyed = 1;
+ return 1; // success
}
int stellar_mq_publish_message(int topic_id, void *data, UT_array *mq_schema_array, struct stellar_message **mq)
@@ -489,7 +491,9 @@ int stellar_packet_mq_destroy_topic(struct stellar *st, int topic_id)
{
struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
assert(plug_mgr);
- return stellar_mq_destroy_topic(topic_id, plug_mgr->packet_mq_schema_array);
+ int ret = stellar_mq_destroy_topic(topic_id, plug_mgr->packet_mq_schema_array);
+ if(ret==1)plug_mgr->packet_mq_topic_num-=1;
+ return ret;
}
//return 0 if success, otherwise return -1.
@@ -642,7 +646,9 @@ int stellar_session_mq_destroy_topic(struct stellar *st, int topic_id)
{
struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
assert(plug_mgr);
- return stellar_mq_destroy_topic(topic_id, plug_mgr->session_mq_schema_array);
+ int ret = stellar_mq_destroy_topic(topic_id, plug_mgr->session_mq_schema_array);
+ if(ret==1)plug_mgr->session_mq_topic_num-=1;
+ return ret;
}
int session_mq_publish_message(struct session *sess, int topic_id, void *data)
@@ -774,7 +780,7 @@ static void plugin_manager_session_message_dispatch(struct session *sess)
plugin_ctx_rt->state = ACTIVE;
}
}
- if (sub_elt->sess_msg_cb)
+ if (sub_elt->sess_msg_cb && bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->topic_id, cur_sub_idx) != 0)// ctx_new maybe call detach, so need check again
sub_elt->sess_msg_cb(sess, mq_elt->topic_id, mq_elt->msg_data, plugin_ctx_rt->plugin_ctx, session_plugin_schema->plugin_env);
}
}