summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--examples/sapp_plugin/publisher_loader.c9
-rw-r--r--include/stellar/stellar.h4
-rw-r--r--src/adapter/session_manager.c48
-rw-r--r--src/adapter/session_manager.h1
-rw-r--r--src/adapter/stellar_internal.h2
5 files changed, 58 insertions, 6 deletions
diff --git a/examples/sapp_plugin/publisher_loader.c b/examples/sapp_plugin/publisher_loader.c
index 0003680..4334f9a 100644
--- a/examples/sapp_plugin/publisher_loader.c
+++ b/examples/sapp_plugin/publisher_loader.c
@@ -24,6 +24,10 @@ static void session_mq_topic_free(void *data, void *cb_arg)
return;
}
+struct stellar *g_stellar=NULL;
+int g_topic_id=-1;
+int g_sub_id=-1;
+
static int session_mq_loader_read(struct session *sess, int topic_id, const void *data, void *cb_arg)
{
struct simple_stream_ctx *ctx =(struct simple_stream_ctx *)data;
@@ -33,11 +37,10 @@ static int session_mq_loader_read(struct session *sess, int topic_id, const void
ctx->s2c_pkts, ctx->s2c_bytes);
printf("total-pkt=%u, ", ctx->c2s_pkts+ctx->s2c_pkts);
printf("total-count=%u\n", ctx->c2s_bytes+ctx->s2c_bytes);
+ session_mq_ignore_message(sess, topic_id, g_sub_id);
return 0;
}
-struct stellar *g_stellar=NULL;
-int g_topic_id=-1;
int PUBLISHER_LOADER_INIT()
{
g_stellar = stellar_init("./stellar_plugin/simple_plugin.toml");
@@ -52,7 +55,7 @@ int PUBLISHER_LOADER_INIT()
{
g_topic_id=session_mq_create_topic(g_stellar, "SIMPLE_MQ_TOPIC", session_mq_topic_free, NULL);
}
- session_mq_subscribe_topic(g_stellar , g_topic_id, session_mq_loader_read, NULL);
+ g_sub_id=session_mq_subscribe_topic(g_stellar , g_topic_id, session_mq_loader_read, NULL);
return 0;
}
diff --git a/include/stellar/stellar.h b/include/stellar/stellar.h
index 9751d93..c10c8ec 100644
--- a/include/stellar/stellar.h
+++ b/include/stellar/stellar.h
@@ -7,6 +7,10 @@ struct stellar;
int stellar_get_worker_thread_num(struct stellar *st);
int stellar_get_current_thread_id(struct stellar *st);
+int stellar_get_session_mq_topic_cnt(struct stellar *st);
+int stellar_get_session_mq_subscriber_cnt(struct stellar *st, int topic_id);
+
+
typedef void *plugin_init_callback(struct stellar *st);
typedef void plugin_exit_callback(void *plugin_ctx);
diff --git a/src/adapter/session_manager.c b/src/adapter/session_manager.c
index 1f4bdfa..368feff 100644
--- a/src/adapter/session_manager.c
+++ b/src/adapter/session_manager.c
@@ -196,6 +196,7 @@ int session_mq_create_topic(struct stellar *st, const char *topic_name, msg_free
t_schema.subscribers=NULL;
t_schema.sub_cnt=0;
utarray_push_back(st->session_mq_schema_array, &t_schema);
+ st->topic_num+=1;
return t_schema.topic_id;
}
@@ -236,8 +237,17 @@ int session_mq_publish_message(struct session *sess, int topic_id, void *data)
int session_mq_ignore_message(struct session *sess, int topic_id, int sub_id)
{
- // TODO: implement
- return -1;
+ if(topic_id < 0 || sub_id < 0)return -1;
+ if(topic_id >= sess->st->topic_num)return -1;
+ struct session_mq_topic_schema *topic = (struct session_mq_topic_schema *)utarray_eltptr(sess->st->session_mq_schema_array, (unsigned int)topic_id);
+ if(sub_id > topic->sub_cnt)return -1;
+ if(sess->mq_sub_status)
+ {
+ sess->mq_sub_status[topic_id+(sub_id-1)]=0;
+ return 0;
+ }
+ else
+ return -1;
}
int session_mq_subscribe_topic(struct stellar *st, int topic_id, on_msg_cb_func *sub_cb, void *cb_arg)
@@ -251,6 +261,27 @@ int session_mq_subscribe_topic(struct stellar *st, int topic_id, on_msg_cb_func
new_subscriber->cb_arg = cb_arg;
DL_APPEND(topic->subscribers, new_subscriber);
topic->sub_cnt+=1;
+ st->sub_num+=1;
+ return topic->sub_cnt;
+}
+
+
+int stellar_get_session_mq_topic_cnt(struct stellar *st)
+{
+ if(st->session_exdata_schema_array)
+ {
+ return utarray_len(st->session_mq_schema_array);
+ }
+ else
+ return 0;
+}
+
+int stellar_get_session_mq_subscriber_cnt(struct stellar *st, int topic_id)
+{
+ if(st->session_exdata_schema_array==NULL)return -1;
+ unsigned int len = utarray_len(st->session_mq_schema_array);
+ if (len <= (unsigned int)topic_id)return -1;
+ struct session_mq_topic_schema *topic = (struct session_mq_topic_schema *)utarray_eltptr(st->session_mq_schema_array, (unsigned int)topic_id);
return topic->sub_cnt;
}
@@ -359,6 +390,8 @@ struct session *session_new(struct stellar *st, enum session_type type, int thre
sess->type = type;
sess->state = SESSION_STATE_OPENING;
sess->mq = NULL;
+ sess->mq_sub_status=CALLOC(char, st->topic_num*st->sub_num);
+ memset(sess->mq_sub_status, 1, st->topic_num*st->sub_num);
sess->ex_data_rt = session_new_exdata_rt(st);
sess->ev_list=NULL;
sess->intrinsic_events=session_intrinsic_events_init(st, sess);
@@ -385,6 +418,10 @@ void session_free(struct session *sess)
{
session_mq_free(sess->mq);
}
+ if(sess->mq_sub_status != NULL)
+ {
+ FREE(sess->mq_sub_status);
+ }
session_free_exdata_runtime(sess, sess->ex_data_rt);
if(sess->cur_pkt)
{
@@ -508,9 +545,14 @@ void session_defer_loop(struct session *sess)
(unsigned int)(mq_elt->topic_id));
if (topic)
{
+ int cur_sub_idx = 0;
DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
{
- sub_elt->sub_cb(sess, mq_elt->topic_id, mq_elt->message, sub_elt->cb_arg);
+ if(sess->mq_sub_status[(mq_elt->topic_id)+cur_sub_idx] == 1)
+ {
+ sub_elt->sub_cb(sess, mq_elt->topic_id, mq_elt->message, sub_elt->cb_arg);
+ }
+ cur_sub_idx++;
}
if (topic->free_cb)
{
diff --git a/src/adapter/session_manager.h b/src/adapter/session_manager.h
index ccdee34..cf8f140 100644
--- a/src/adapter/session_manager.h
+++ b/src/adapter/session_manager.h
@@ -22,6 +22,7 @@ struct session
struct session_event *intrinsic_events;
struct session_exdata_runtime *ex_data_rt;
struct session_mq *mq;
+ char *mq_sub_status;
struct session_event *ev_list;
uint64_t set_detach_others;
};
diff --git a/src/adapter/stellar_internal.h b/src/adapter/stellar_internal.h
index fe843bc..02d1eb8 100644
--- a/src/adapter/stellar_internal.h
+++ b/src/adapter/stellar_internal.h
@@ -9,4 +9,6 @@ struct stellar
UT_array *plugin_specs_array;
UT_array *session_mq_schema_array;
UT_array *session_exdata_schema_array;
+ int topic_num;
+ int sub_num;
}; \ No newline at end of file