summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryangwei <[email protected]>2024-07-10 11:41:14 +0800
committeryangwei <[email protected]>2024-07-10 12:10:40 +0800
commit6a43c2bc21427a6a728cdf517d9317fe56442b36 (patch)
treed5100a3c016d4ec8969f15560a627e75cc4a36f7
parentbe78e4bef7dcfe46613e0c9d2f2117b725508924 (diff)
🧪 test(plugin_manager gtest): add session_mq_topic_is_active test case
-rw-r--r--deps/bitmap/bitmap.c17
-rw-r--r--deps/bitmap/bitmap.h4
-rw-r--r--src/plugin_manager/plugin_manager.c38
-rw-r--r--test/plugin_manager/plugin_manager_gtest_main.cpp233
4 files changed, 279 insertions, 13 deletions
diff --git a/deps/bitmap/bitmap.c b/deps/bitmap/bitmap.c
index 59fb4b7..068bc3f 100644
--- a/deps/bitmap/bitmap.c
+++ b/deps/bitmap/bitmap.c
@@ -46,8 +46,21 @@ void bitmap_free(struct bitmap *bmp) {
}
}
-
-
+int bitmap_is_all_zero(struct bitmap *bmp, int x, int y, int length) {
+ if (x < 0 || y < 0 || x >= bmp->width || y >= bmp->height) {
+ return -1; // Return error code if coordinates are out of bounds
+ }
+ int idx = y * bmp->width + x;
+ if (idx + length > bmp->width * bmp->height) {
+ return -1; // Return error if range exceeds bitmap bounds
+ }
+ for (int i = 0; i < length; i++) {
+ if (bmp->data[(idx + i) / 8] & (1 << ((idx + i) % 8))) {
+ return 0; // Return 0 if any bit is not zero
+ }
+ }
+ return 1; // Return 1 if all bits are zero
+}
int test_bitmap() {
struct bitmap *bmp = bitmap_new(10, 5, 1); // Create a 10x5 bitmap
diff --git a/deps/bitmap/bitmap.h b/deps/bitmap/bitmap.h
index 210ea35..d690cb7 100644
--- a/deps/bitmap/bitmap.h
+++ b/deps/bitmap/bitmap.h
@@ -2,4 +2,6 @@ struct bitmap;
struct bitmap * bitmap_new(int width, int height, int value);
int bitmap_set(struct bitmap *bmp, int x, int y, int value);
int bitmap_get(struct bitmap *bmp, int x, int y);
-void bitmap_free(struct bitmap *bmp); \ No newline at end of file
+void bitmap_free(struct bitmap *bmp);
+
+int bitmap_is_all_zero(struct bitmap *bmp, int x, int y, int length); \ No newline at end of file
diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c
index 8af47ff..3f3fead 100644
--- a/src/plugin_manager/plugin_manager.c
+++ b/src/plugin_manager/plugin_manager.c
@@ -429,6 +429,23 @@ int session_mq_publish_message(struct session *sess, int topic_id, void *data)
return ret;
}
+static void session_mq_update_topic_status(struct plugin_manager_runtime *plug_mgr_rt, struct stellar_mq_topic_schema *topic)
+{
+ //update topic status
+ switch (bitmap_is_all_zero(plug_mgr_rt->session_mq_status, 0, topic->topic_id, topic->subscriber_cnt))
+ {
+ case 1:
+ bitmap_set(plug_mgr_rt->session_topic_status, 0, topic->topic_id, 0);
+ break;
+ case 0:
+ bitmap_set(plug_mgr_rt->session_topic_status, 0, topic->topic_id, 1);
+ break;
+ default:
+ break;
+ }
+ return;
+}
+
static int session_mq_set_message_status(struct session *sess, int topic_id, int plugin_id, int bit_value)
{
if(bit_value!=0 && bit_value!=1)return -1;
@@ -451,9 +468,10 @@ static int session_mq_set_message_status(struct session *sess, int topic_id, int
struct stellar_mq_subscriber_info *session_plugin_sub_info = (struct stellar_mq_subscriber_info *)utarray_eltptr(session_plugin_schema->registed_session_mq_subscriber_info, i);
if(topic_id==session_plugin_sub_info->topic_id)
{
- bitmap_set(plug_mgr_rt->session_mq_status, topic_id, session_plugin_sub_info->subscriber_idx, bit_value);
+ bitmap_set(plug_mgr_rt->session_mq_status, session_plugin_sub_info->subscriber_idx, topic_id, bit_value);
}
}
+ session_mq_update_topic_status(plug_mgr_rt, topic);
return 0;
}
return -1;
@@ -551,7 +569,7 @@ static void plugin_manager_session_message_dispatch(struct session *sess)
DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
{
plug_mgr_rt->current_session_plugin_id=sub_elt->plugin_idx;
- if (bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->header.topic_id, cur_sub_idx) != 0)
+ if (bitmap_get(plug_mgr_rt->session_mq_status, cur_sub_idx, mq_elt->header.topic_id) != 0)
{
plugin_ctx_rt=(plug_mgr_rt->plugin_ctx_array+sub_elt->plugin_idx);
session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)sub_elt->plugin_idx);
@@ -573,7 +591,7 @@ static void plugin_manager_session_message_dispatch(struct session *sess)
}
}
}
- if (sub_elt->sess_msg_cb && bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->header.topic_id, cur_sub_idx) != 0)// ctx_new maybe call detach, need check again
+ if (sub_elt->sess_msg_cb && bitmap_get(plug_mgr_rt->session_mq_status, cur_sub_idx, mq_elt->header.topic_id) != 0)// ctx_new maybe call detach, need check again
{
sub_elt->sess_msg_cb(sess, mq_elt->header.topic_id, mq_elt->body, plugin_ctx_rt->plugin_ctx, session_plugin_schema->plugin_env);
}
@@ -581,7 +599,7 @@ static void plugin_manager_session_message_dispatch(struct session *sess)
}
cur_sub_idx++;
}
- if(cur_sub_idx==0)bitmap_set(plug_mgr_rt->session_topic_status, mq_elt->header.topic_id, 1, 0);
+ if(cur_sub_idx==0)bitmap_set(plug_mgr_rt->session_topic_status, 0, mq_elt->header.topic_id, 0);
}
DL_DELETE(plug_mgr_rt->pending_mq, mq_elt);
DL_APPEND(plug_mgr_rt->delivered_mq, mq_elt);// move to delivered message list
@@ -596,7 +614,7 @@ int session_mq_topic_is_active(struct session *sess, int topic_id)
assert(plug_mgr_rt);
if(plug_mgr_rt->session_topic_status==NULL)return -1;//runtime free stage , mq_status alaway null, ignore publish message
if(topic_id >= plug_mgr_rt->plug_mgr->session_mq_topic_num)return -1;// topic_id out of range
- if(bitmap_get(plug_mgr_rt->session_topic_status, topic_id, 1) == 0)return 0;
+ if(bitmap_get(plug_mgr_rt->session_topic_status, 0, topic_id) == 0)return 0;
return 1;
}
@@ -642,8 +660,8 @@ struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_
rt->sess = sess;
rt->pending_mq = NULL;
rt->delivered_mq = NULL;
- rt->session_mq_status=bitmap_new(plug_mgr->session_mq_topic_num, plug_mgr->session_topic_subscriber_num, 1);
- rt->session_topic_status=bitmap_new(plug_mgr->session_mq_topic_num, 1, 1);
+ rt->session_mq_status=bitmap_new(plug_mgr->session_topic_subscriber_num, plug_mgr->session_mq_topic_num, 1);
+ rt->session_topic_status=bitmap_new(1, plug_mgr->session_mq_topic_num, 1);
rt->sess_exdata_array = (struct stellar_exdata *)session_exdata_runtime_new(plug_mgr);
if(plug_mgr->registered_session_plugin_array)
rt->plugin_ctx_array = CALLOC(struct session_plugin_ctx_runtime, utarray_len(plug_mgr->registered_session_plugin_array));
@@ -869,7 +887,7 @@ void stellar_session_plugin_dettach_current_session(struct session *sess)
struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess);
struct registered_session_plugin_schema *session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)plug_mgr_rt->current_session_plugin_id);
if(session_plugin_schema==NULL)return;
-
+ struct stellar_mq_topic_schema *topic=NULL;
unsigned int plugin_subscriber_num = utarray_len(session_plugin_schema->registed_session_mq_subscriber_info);
//FIXME: maybe no need to clear session_mq_status, check plugin_ctx before message dispatch
if(plug_mgr_rt->session_mq_status)
@@ -877,7 +895,9 @@ void stellar_session_plugin_dettach_current_session(struct session *sess)
for(unsigned int i=0; i < plugin_subscriber_num; i++)
{
struct stellar_mq_subscriber_info *session_plugin_sub_info = (struct stellar_mq_subscriber_info *)utarray_eltptr(session_plugin_schema->registed_session_mq_subscriber_info, i);
- bitmap_set(plug_mgr_rt->session_mq_status, session_plugin_sub_info->topic_id, session_plugin_sub_info->subscriber_idx, 0);
+ bitmap_set(plug_mgr_rt->session_mq_status, session_plugin_sub_info->subscriber_idx,session_plugin_sub_info->topic_id, 0);
+ topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->session_mq_schema_array, (unsigned int)session_plugin_sub_info->topic_id);
+ session_mq_update_topic_status(plug_mgr_rt, topic);
}
}
diff --git a/test/plugin_manager/plugin_manager_gtest_main.cpp b/test/plugin_manager/plugin_manager_gtest_main.cpp
index 4913586..c380953 100644
--- a/test/plugin_manager/plugin_manager_gtest_main.cpp
+++ b/test/plugin_manager/plugin_manager_gtest_main.cpp
@@ -295,6 +295,10 @@ struct session_plugin_env
int test_mq_sub_called;
int test_mq_free_called;
int test_mq_topic_id;
+ int plugin_id_1;
+ int plugin_id_2;
+ int plugin_id_1_called;
+ int plugin_id_2_called;
};
TEST(plugin_manager, no_plugin_register_runtime) {
@@ -1058,7 +1062,234 @@ TEST(plugin_manager, session_plugin_pub_msg_on_closing) {
EXPECT_EQ(env.test_mq_sub_called,env.N_session);
}
-//TODO: test session_mq_topic_is_active
+struct test_session_called_ctx
+{
+ int called;
+};
+
+static void *test_session_called_ctx_new(struct session *sess, void *plugin_env)
+{
+ struct test_session_called_ctx *ctx=CALLOC(struct test_session_called_ctx, 1);
+ return ctx;
+}
+
+static void test_session_called_ctx_free(struct session *sess, void *session_ctx, void *plugin_env)
+{
+ FREE(session_ctx);
+}
+
+//test session topic is active
+static void test_session_mq_topic_is_active_plugin_1_on_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ struct test_session_called_ctx *ctx=(struct test_session_called_ctx *)per_session_ctx;
+ env->plugin_id_1_called+=1;
+ ctx->called+=1;
+ EXPECT_EQ(env->plugin_id_1, sess->plug_mgr_rt->current_session_plugin_id);
+ EXPECT_EQ(env->intrinsc_tcp_topic_id, topic_id);
+
+ session_mq_ignore_message(sess, topic_id, env->plugin_id_1);
+ EXPECT_EQ(session_mq_topic_is_active(sess, topic_id), 1);
+ return;
+}
+
+static void test_session_mq_topic_is_active_plugin_2_on_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ struct test_session_called_ctx *ctx=(struct test_session_called_ctx *)per_session_ctx;
+ env->plugin_id_2_called+=1;
+ ctx->called+=1;
+ EXPECT_EQ(env->plugin_id_2, sess->plug_mgr_rt->current_session_plugin_id);
+ EXPECT_EQ(env->intrinsc_tcp_topic_id, topic_id);
+
+ EXPECT_EQ(session_mq_topic_is_active(sess, topic_id), 1);
+
+ if(ctx->called > env->N_per_session_pkt_cnt/2)
+ {
+ session_mq_ignore_message(sess, topic_id, env->plugin_id_2);
+ EXPECT_EQ(session_mq_topic_is_active(sess, topic_id), 0);
+ }
+ return;
+}
+
+TEST(plugin_manager, test_session_mq_topic_is_active) {
+
+ struct stellar st={0};
+ struct session_plugin_env env;
+ memset(&env, 0, sizeof(struct session_plugin_env));
+
+// pesudo init stage
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+// plugin manager register plugin
+
+ int plugin_id_1=stellar_session_plugin_register(&st, test_session_called_ctx_new, test_session_called_ctx_free, &env);
+ EXPECT_GE(plugin_id_1,0);
+
+ int plugin_id_2=stellar_session_plugin_register(&st, test_session_called_ctx_new, test_session_called_ctx_free, &env);
+ EXPECT_GE(plugin_id_2,0);
+
+ env.plugin_id_1=plugin_id_1;
+ env.plugin_id_2=plugin_id_2;
+
+ env.intrinsc_tcp_topic_id=stellar_session_mq_get_topic_id(&st, TOPIC_TCP);
+ EXPECT_GE(env.intrinsc_tcp_topic_id, 0);
+ EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_topic_is_active_plugin_1_on_msg, plugin_id_1), 0);
+ EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_topic_is_active_plugin_2_on_msg, plugin_id_2), 0);
+
+// pesudo packet and session
+
+ env.plug_mgr=plug_mgr;
+ env.N_per_session_pkt_cnt=10;
+ env.N_session=10;
+
+ struct packet pkt={&st, TCP, 6};
+
+ struct session sess[env.N_session];
+ memset(&sess, 0, sizeof(sess));
+
+// pesudo running stage
+ for(int i=0; i < env.N_session; i++)
+ {
+ sess[i].state=SESSION_STATE_OPENING;
+ sess[i].plug_mgr_rt=plugin_manager_session_runtime_new(plug_mgr, &sess[i]);
+ }
+
+ for (int j = 0; j < env.N_per_session_pkt_cnt; j++)
+ {
+ plugin_manager_on_packet_ingress(plug_mgr, &pkt);
+
+ for (int i = 0; i < env.N_session; i++)
+ {
+ sess[i].sess_pkt_cnt+=1;
+ sess[i].state=SESSION_STATE_ACTIVE;
+ plugin_manager_on_session_ingress(&sess[i], &pkt);
+ plugin_manager_on_session_egress(&sess[i], &pkt);
+ }
+
+ }
+
+ for(int i=0; i < env.N_session; i++)
+ {
+ sess[i].state=SESSION_STATE_CLOSING;
+ plugin_manager_on_session_closing(&sess[i]);
+ plugin_manager_session_runtime_free(sess[i].plug_mgr_rt);
+ }
+
+// pesudo exit stage
+ plugin_manager_exit(plug_mgr);
+
+ EXPECT_EQ(env.plugin_id_1_called,env.N_session*1);// per session called once, then ignore
+ EXPECT_EQ(env.plugin_id_2_called,env.N_session*(env.N_per_session_pkt_cnt/2+1));// per session called one half, then ignore
+}
+
+//test dettach session
+static void test_session_dettach_plugin_1_on_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ struct test_session_called_ctx *ctx=(struct test_session_called_ctx *)per_session_ctx;
+ env->plugin_id_1_called+=1;
+ ctx->called+=1;
+ EXPECT_EQ(env->plugin_id_1, sess->plug_mgr_rt->current_session_plugin_id);
+ EXPECT_EQ(env->intrinsc_tcp_topic_id, topic_id);
+
+ stellar_session_plugin_dettach_current_session(sess);
+ EXPECT_EQ(session_mq_topic_is_active(sess, topic_id), 1);
+ return;
+}
+
+static void test_session_dettach_plugin_2_on_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ struct test_session_called_ctx *ctx=(struct test_session_called_ctx *)per_session_ctx;
+ env->plugin_id_2_called+=1;
+ ctx->called+=1;
+ EXPECT_EQ(env->plugin_id_2, sess->plug_mgr_rt->current_session_plugin_id);
+ EXPECT_EQ(env->intrinsc_tcp_topic_id, topic_id);
+
+ EXPECT_EQ(session_mq_topic_is_active(sess, topic_id), 1);
+
+ if(ctx->called > env->N_per_session_pkt_cnt/2)
+ {
+ stellar_session_plugin_dettach_current_session(sess);
+ EXPECT_EQ(session_mq_topic_is_active(sess, topic_id), 0);
+ }
+
+ return;
+}
+
+TEST(plugin_manager, test_session_dettach) {
+
+ struct stellar st={0};
+ struct session_plugin_env env;
+ memset(&env, 0, sizeof(struct session_plugin_env));
+
+// pesudo init stage
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+// plugin manager register plugin
+
+ int plugin_id_1=stellar_session_plugin_register(&st, test_session_called_ctx_new, test_session_called_ctx_free, &env);
+ EXPECT_GE(plugin_id_1,0);
+
+ int plugin_id_2=stellar_session_plugin_register(&st, test_session_called_ctx_new, test_session_called_ctx_free, &env);
+ EXPECT_GE(plugin_id_2,0);
+
+ env.plugin_id_1=plugin_id_1;
+ env.plugin_id_2=plugin_id_2;
+
+ env.intrinsc_tcp_topic_id=stellar_session_mq_get_topic_id(&st, TOPIC_TCP);
+ EXPECT_GE(env.intrinsc_tcp_topic_id, 0);
+ EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_dettach_plugin_1_on_msg, plugin_id_1), 0);
+ EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_dettach_plugin_2_on_msg, plugin_id_2), 0);
+
+// pesudo packet and session
+
+ env.plug_mgr=plug_mgr;
+ env.N_per_session_pkt_cnt=10;
+ env.N_session=10;
+
+ struct packet pkt={&st, TCP, 6};
+
+ struct session sess[env.N_session];
+ memset(&sess, 0, sizeof(sess));
+
+// pesudo running stage
+ for(int i=0; i < env.N_session; i++)
+ {
+ sess[i].state=SESSION_STATE_OPENING;
+ sess[i].plug_mgr_rt=plugin_manager_session_runtime_new(plug_mgr, &sess[i]);
+ }
+
+ for (int j = 0; j < env.N_per_session_pkt_cnt; j++)
+ {
+ plugin_manager_on_packet_ingress(plug_mgr, &pkt);
+
+ for (int i = 0; i < env.N_session; i++)
+ {
+ sess[i].sess_pkt_cnt+=1;
+ sess[i].state=SESSION_STATE_ACTIVE;
+ plugin_manager_on_session_ingress(&sess[i], &pkt);
+ plugin_manager_on_session_egress(&sess[i], &pkt);
+ }
+
+ }
+
+ for(int i=0; i < env.N_session; i++)
+ {
+ sess[i].state=SESSION_STATE_CLOSING;
+ plugin_manager_on_session_closing(&sess[i]);
+ plugin_manager_session_runtime_free(sess[i].plug_mgr_rt);
+ }
+
+// pesudo exit stage
+ plugin_manager_exit(plug_mgr);
+
+ EXPECT_EQ(env.plugin_id_1_called,env.N_session*1);// per session called once, then ignore
+ EXPECT_EQ(env.plugin_id_2_called,env.N_session*(env.N_per_session_pkt_cnt/2+1));// per session called one half, then ignore
+}
/**********************************************
* TEST PLUGIN MANAGER ON POLLING PLUGIN INIT *