summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryangwei <[email protected]>2024-06-03 04:13:41 +0800
committeryangwei <[email protected]>2024-06-03 04:13:41 +0800
commit3c7e53b142981b9bfa66033151a7a32e074c615f (patch)
tree4dc027f95002626d6dd1c05dd8ce8ae1f0637ff7
parent30eb96cb24a971c30059961b13065ddd01f1d0d2 (diff)
✨ feat(plugin manager intrisic mq): send empty msg when session closingFeature-closing-send-sesison-msg
-rw-r--r--src/plugin_manager/plugin_manager.c29
-rw-r--r--src/plugin_manager/plugin_manager.h1
-rw-r--r--src/stellar_on_sapp/stellar_on_sapp_api.c1
-rw-r--r--test/plugin_manager/plugin_manager_gtest_main.cpp174
-rw-r--r--test/plugin_manager/plugin_manager_gtest_mock.h13
5 files changed, 203 insertions, 15 deletions
diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c
index 2cd5a81..2f20d51 100644
--- a/src/plugin_manager/plugin_manager.c
+++ b/src/plugin_manager/plugin_manager.c
@@ -1066,12 +1066,35 @@ void plugin_manager_on_session_egress(struct session *sess, struct packet *pkt)
struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess);
if(plug_mgr_rt==NULL)return;
//plug_mgr_rt->enable_session_mq=1;
- session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->egress_topic_id ,(void *)pkt);
+ session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->egress_topic_id ,pkt);
plugin_manager_session_message_dispatch(sess);
plug_mgr_rt->enable_session_mq=0;
session_mq_free(plug_mgr_rt->sess,&plug_mgr_rt->delivered_mq, plug_mgr_rt->plug_mgr->session_mq_schema_array);
- assert(plug_mgr_rt->pending_mq==NULL);
- assert(plug_mgr_rt->delivered_mq==NULL);
+ assert(plug_mgr_rt->pending_mq==NULL && plug_mgr_rt->delivered_mq==NULL);
+ return;
+}
+
+void plugin_manager_on_session_closing(struct session *sess)
+{
+ struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess);
+ if(plug_mgr_rt==NULL)return;
+ plug_mgr_rt->enable_session_mq=1;
+ switch (session_get_type(sess))
+ {
+ case SESSION_TYPE_TCP:
+ session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->tcp_topic_id ,NULL);
+ session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->tcp_stream_topic_id , NULL);
+ break;
+ case SESSION_TYPE_UDP:
+ session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->udp_topic_id ,NULL);
+ break;
+ default:
+ break;
+ }
+ plugin_manager_session_message_dispatch(sess);
+ plug_mgr_rt->enable_session_mq=0;
+ session_mq_free(plug_mgr_rt->sess,&plug_mgr_rt->delivered_mq, plug_mgr_rt->plug_mgr->session_mq_schema_array);
+ assert(plug_mgr_rt->pending_mq==NULL && plug_mgr_rt->delivered_mq==NULL);
return;
}
diff --git a/src/plugin_manager/plugin_manager.h b/src/plugin_manager/plugin_manager.h
index 1305f19..6810da8 100644
--- a/src/plugin_manager/plugin_manager.h
+++ b/src/plugin_manager/plugin_manager.h
@@ -17,6 +17,7 @@ int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr);
//publish and dispatch session msg(msg, pkt) on session_mq
void plugin_manager_on_session_ingress(struct session *sess,struct packet *pkt);
void plugin_manager_on_session_egress(struct session *sess,struct packet *pkt);
+void plugin_manager_on_session_closing(struct session *sess);
struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_manager_schema *plug_mgr, struct session *sess);
void plugin_manager_session_runtime_free(struct plugin_manager_runtime *plug_mgr_rt); \ No newline at end of file
diff --git a/src/stellar_on_sapp/stellar_on_sapp_api.c b/src/stellar_on_sapp/stellar_on_sapp_api.c
index a7b590a..9f2794d 100644
--- a/src/stellar_on_sapp/stellar_on_sapp_api.c
+++ b/src/stellar_on_sapp/stellar_on_sapp_api.c
@@ -119,6 +119,7 @@ struct session *session_new_on_sapp(struct stellar *st, struct streaminfo *strea
void session_free_on_sapp(struct session *sess)
{
sess->state = SESSION_STATE_CLOSING;
+ plugin_manager_on_session_closing(sess);
if(sess->plug_mgr_rt)
{
plugin_manager_session_runtime_free(sess->plug_mgr_rt);
diff --git a/test/plugin_manager/plugin_manager_gtest_main.cpp b/test/plugin_manager/plugin_manager_gtest_main.cpp
index 132d417..2745354 100644
--- a/test/plugin_manager/plugin_manager_gtest_main.cpp
+++ b/test/plugin_manager/plugin_manager_gtest_main.cpp
@@ -762,7 +762,10 @@ TEST(plugin_manager, no_plugin_runtime) {
}
for(int i=0; i < env.N_session; i++)
+ {
+ plugin_manager_on_session_closing(&sess[i]);
plugin_manager_session_runtime_free(sess[i].plug_mgr_rt);
+ }
//exit stage
plugin_manager_exit(plug_mgr);
@@ -787,7 +790,7 @@ static void test_basic_session_ctx_free(struct session *sess, void *session_ctx,
struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
env->basic_ctx_free_called+=1;
struct test_basic_ctx *ctx=(struct test_basic_ctx *)session_ctx;
- EXPECT_EQ(ctx->called, env->N_per_session_pkt_cnt*2);//ingress + egress
+ EXPECT_EQ(ctx->called, env->N_per_session_pkt_cnt*2);//ingress + egress + closing
FREE(ctx);
return;
}
@@ -802,8 +805,11 @@ static void test_basic_on_session_ingress(struct session *sess, int topic_id, co
EXPECT_EQ(session_exdata_set(sess, 2, sess), -1);// illegal set
EXPECT_EQ(session_exdata_get(sess, 2), nullptr);// illegal get
EXPECT_EQ(session_exdata_set(sess, env->basic_exdata_idx, env), 0);
- env->basic_on_session_ingress_called+=1;
- ctx->called+=1;
+ if(msg)
+ {
+ env->basic_on_session_ingress_called+=1;
+ ctx->called+=1;
+ }
return;
}
@@ -816,6 +822,7 @@ static void test_basic_on_session_egress(struct session *sess, int topic_id, con
EXPECT_EQ(sess->plug_mgr_rt->plug_mgr, env->plug_mgr);
EXPECT_EQ(session_exdata_set(sess, 2, sess), -1);// illegal set
EXPECT_EQ(session_exdata_get(sess, 2), nullptr);// illegal get
+ EXPECT_TRUE(msg!=NULL);
EXPECT_EQ(session_exdata_get(sess, env->basic_exdata_idx), env);
env->basic_on_session_egress_called+=1;
ctx->called+=1;
@@ -880,7 +887,10 @@ TEST(plugin_manager, basic_session_plugin) {
}
for(int i=0; i < env.N_session; i++)
+ {
+ plugin_manager_on_session_closing(&sess[i]);
plugin_manager_session_runtime_free(sess[i].plug_mgr_rt);
+ }
plugin_manager_exit(plug_mgr);
@@ -934,12 +944,15 @@ static void test_mq_pub_on_session(struct session *sess, int topic_id, const voi
EXPECT_TRUE(env!=NULL);
EXPECT_TRUE(ctx!=NULL);
EXPECT_EQ(sess->plug_mgr_rt->plug_mgr, env->plug_mgr);
- env->test_mq_pub_called+=1;
- ctx->called+=1;
- int *pub_msg=(int *)CALLOC(int, 1);
- *pub_msg=env->test_mq_pub_called;
- EXPECT_EQ(session_mq_publish_message(sess, env->test_mq_topic_id, pub_msg),0);
- return;
+ if (msg)
+ {
+ env->test_mq_pub_called += 1;
+ ctx->called += 1;
+ int *pub_msg = (int *)CALLOC(int, 1);
+ *pub_msg = env->test_mq_pub_called;
+ EXPECT_EQ(session_mq_publish_message(sess, env->test_mq_topic_id, pub_msg), 0);
+ }
+ return;
}
static void test_mq_on_sub_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env)
@@ -960,8 +973,11 @@ static void test_session_msg_free(struct session *sess, void *msg, void *msg_fre
struct session_plugin_env *env = (struct session_plugin_env *)msg_free_arg;
EXPECT_EQ(sess->plug_mgr_rt->plug_mgr, env->plug_mgr);
EXPECT_EQ(env->test_mq_pub_called, *(int *)msg);
- env->test_mq_free_called+=1;
- FREE(msg);
+ if(msg)
+ {
+ env->test_mq_free_called+=1;
+ FREE(msg);
+ }
return;
}
@@ -1014,7 +1030,10 @@ TEST(plugin_manager, basic_session_plugin_mq) {
}
for(int i=0; i < env.N_session; i++)
+ {
+ plugin_manager_on_session_closing(&sess[i]);
plugin_manager_session_runtime_free(sess[i].plug_mgr_rt);
+ }
plugin_manager_exit(plug_mgr);
@@ -1093,7 +1112,10 @@ TEST(plugin_manager, session_plugin_ctx_new_dettach) {
}
for(int i=0; i < env.N_session; i++)
+ {
+ plugin_manager_on_session_closing(&sess[i]);
plugin_manager_session_runtime_free(sess[i].plug_mgr_rt);
+ }
plugin_manager_exit(plug_mgr);
@@ -1113,7 +1135,7 @@ static void test_invalid_send_msg_session_ctx_free(struct session *sess, void *s
struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
env->basic_ctx_free_called+=1;
struct test_basic_ctx *ctx=(struct test_basic_ctx *)session_ctx;
- EXPECT_EQ(ctx->called,env->N_per_session_pkt_cnt);
+ EXPECT_EQ(ctx->called,(env->N_per_session_pkt_cnt+1));
EXPECT_EQ(session_mq_publish_message(sess, env->test_mq_topic_id, ctx), -1);// illegal publish when session closing
@@ -1178,10 +1200,138 @@ TEST(plugin_manager, session_plugin_invalid_send_msg) {
}
for(int i=0; i < env.N_session; i++)
+ {
+ plugin_manager_on_session_closing(&sess[i]);
plugin_manager_session_runtime_free(sess[i].plug_mgr_rt);
+ }
// pesudo exit stage
plugin_manager_exit(plug_mgr);
+
+ EXPECT_EQ(env.basic_ctx_free_called,env.N_session);
+}
+
+
+struct test_session_closing_ctx
+{
+ int pkt_called;
+ int session_free_called;
+ int userdefine_on_msg_called;
+};
+
+
+static void *test_session_closing_ctx_new(struct session *sess, void *plugin_env)
+{
+ struct test_session_closing_ctx *ctx=CALLOC(struct test_session_closing_ctx, 1);
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ env->basic_ctx_new_called+=1;
+ return ctx;
+}
+
+static void test_session_closing_ctx_free(struct session *sess, void *session_ctx, void *plugin_env)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ env->basic_ctx_free_called+=1;
+ struct test_session_closing_ctx *ctx=(struct test_session_closing_ctx *)session_ctx;
+ EXPECT_EQ(ctx->pkt_called,env->N_per_session_pkt_cnt);
+ EXPECT_EQ(ctx->session_free_called,1);
+ FREE(ctx);
+}
+
+static void test_session_closing_on_intrisic_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env)
+{
+ struct test_session_closing_ctx *ctx=(struct test_session_closing_ctx *)per_session_ctx;
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ if(msg)ctx->pkt_called+=1;
+ if(session_get_current_state(sess)==SESSION_STATE_CLOSING)
+ {
+ EXPECT_EQ(msg,nullptr);
+ ctx->session_free_called+=1;
+ session_mq_publish_message(sess, env->test_mq_topic_id, env);
+ env->test_mq_pub_called+=1;
+ }
+}
+
+static void test_session_closing_on_userdefine_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env)
+{
+ struct test_session_closing_ctx *ctx=(struct test_session_closing_ctx *)per_session_ctx;
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ ctx->userdefine_on_msg_called+=1;
+ EXPECT_EQ(msg, plugin_env);
+ EXPECT_EQ(session_get_current_state(sess),SESSION_STATE_CLOSING);
+ env->test_mq_sub_called+=1;
+}
+
+TEST(plugin_manager, session_closing) {
+
+ struct stellar st={0};
+ struct session_plugin_env env;
+ memset(&env, 0, sizeof(struct session_plugin_env));
+
+// pesudo init stage
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+// plugin manager register plugin
+
+ int plugin_id=stellar_session_plugin_register(&st, test_session_closing_ctx_new, test_session_closing_ctx_free, &env);
+ EXPECT_GE(plugin_id,0);
+
+ env.intrinsc_tcp_topic_id=stellar_session_mq_get_topic_id(&st, TOPIC_TCP);
+ EXPECT_GE(env.intrinsc_tcp_topic_id, 0);
+ EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_closing_on_intrisic_msg, plugin_id), 0);
+
+ env.test_mq_topic_id=stellar_session_mq_create_topic(&st, "SESSION_CLOSING_TOPIC", NULL, &env);
+ EXPECT_GE(env.test_mq_topic_id, 0);
+ EXPECT_EQ(stellar_session_mq_subscribe(&st, env.test_mq_topic_id, test_session_closing_on_userdefine_msg, plugin_id), 0);
+
+// pesudo packet and session
+
+ env.plug_mgr=plug_mgr;
+ env.N_per_session_pkt_cnt=10;
+ env.N_session=10;
+
+ struct packet pkt={&st, TCP, 6};
+
+ struct session sess[env.N_session];
+ memset(&sess, 0, sizeof(sess));
+
+// pesudo running stage
+ for(int i=0; i < env.N_session; i++)
+ {
+ sess[i].state=SESSION_STATE_OPENING;
+ sess[i].plug_mgr_rt=plugin_manager_session_runtime_new(plug_mgr, &sess[i]);
+ }
+
+ for (int j = 0; j < env.N_per_session_pkt_cnt; j++)
+ {
+ plugin_manager_on_packet_ingress(plug_mgr, &pkt);
+
+ for (int i = 0; i < env.N_session; i++)
+ {
+ sess[i].sess_pkt_cnt+=1;
+ sess[i].state=SESSION_STATE_ACTIVE;
+ plugin_manager_on_session_ingress(&sess[i], &pkt);
+ plugin_manager_on_session_egress(&sess[i], &pkt);
+ }
+
+ plugin_manager_on_packet_egress(plug_mgr, &pkt);
+ }
+
+ for(int i=0; i < env.N_session; i++)
+ {
+ sess[i].state=SESSION_STATE_CLOSING;
+ plugin_manager_on_session_closing(&sess[i]);
+ plugin_manager_session_runtime_free(sess[i].plug_mgr_rt);
+ }
+
+// pesudo exit stage
+ plugin_manager_exit(plug_mgr);
+
+ EXPECT_EQ(env.basic_ctx_new_called,env.N_session);
+ EXPECT_EQ(env.basic_ctx_free_called,env.N_session);
+ EXPECT_EQ(env.test_mq_pub_called,env.N_session);
+ EXPECT_EQ(env.test_mq_sub_called,env.N_session);
}
/**********************************************
diff --git a/test/plugin_manager/plugin_manager_gtest_mock.h b/test/plugin_manager/plugin_manager_gtest_mock.h
index a0d28b7..7d6ffc3 100644
--- a/test/plugin_manager/plugin_manager_gtest_mock.h
+++ b/test/plugin_manager/plugin_manager_gtest_mock.h
@@ -5,6 +5,7 @@ extern "C"
#include "plugin_manager/plugin_manager_interna.h"
#include "stellar_on_sapp/stellar_internal.h"
+#include "stellar/session.h"
@@ -24,8 +25,20 @@ struct packet
struct session
{
struct plugin_manager_runtime *plug_mgr_rt;
+ enum session_type type;
+ enum session_state state;
int sess_pkt_cnt;
};
+enum session_state session_get_current_state(struct session *sess)
+{
+ return sess->state;
+}
+
+enum session_type session_get_type(struct session *sess)
+{
+ return sess->type;
+
+}
struct plugin_manager_schema * stellar_plugin_manager_schema_get(struct stellar *st)
{