From 3c7e53b142981b9bfa66033151a7a32e074c615f Mon Sep 17 00:00:00 2001 From: yangwei Date: Mon, 3 Jun 2024 04:13:41 +0800 Subject: ✨ feat(plugin manager intrisic mq): send empty msg when session closing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/plugin_manager/plugin_manager.c | 29 +++- src/plugin_manager/plugin_manager.h | 1 + src/stellar_on_sapp/stellar_on_sapp_api.c | 1 + test/plugin_manager/plugin_manager_gtest_main.cpp | 174 ++++++++++++++++++++-- test/plugin_manager/plugin_manager_gtest_mock.h | 13 ++ 5 files changed, 203 insertions(+), 15 deletions(-) diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c index 2cd5a81..2f20d51 100644 --- a/src/plugin_manager/plugin_manager.c +++ b/src/plugin_manager/plugin_manager.c @@ -1066,12 +1066,35 @@ void plugin_manager_on_session_egress(struct session *sess, struct packet *pkt) struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess); if(plug_mgr_rt==NULL)return; //plug_mgr_rt->enable_session_mq=1; - session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->egress_topic_id ,(void *)pkt); + session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->egress_topic_id ,pkt); plugin_manager_session_message_dispatch(sess); plug_mgr_rt->enable_session_mq=0; session_mq_free(plug_mgr_rt->sess,&plug_mgr_rt->delivered_mq, plug_mgr_rt->plug_mgr->session_mq_schema_array); - assert(plug_mgr_rt->pending_mq==NULL); - assert(plug_mgr_rt->delivered_mq==NULL); + assert(plug_mgr_rt->pending_mq==NULL && plug_mgr_rt->delivered_mq==NULL); + return; +} + +void plugin_manager_on_session_closing(struct session *sess) +{ + struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess); + if(plug_mgr_rt==NULL)return; + plug_mgr_rt->enable_session_mq=1; + switch (session_get_type(sess)) + { + case SESSION_TYPE_TCP: + session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->tcp_topic_id ,NULL); + session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->tcp_stream_topic_id , NULL); + break; + case SESSION_TYPE_UDP: + session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->udp_topic_id ,NULL); + break; + default: + break; + } + plugin_manager_session_message_dispatch(sess); + plug_mgr_rt->enable_session_mq=0; + session_mq_free(plug_mgr_rt->sess,&plug_mgr_rt->delivered_mq, plug_mgr_rt->plug_mgr->session_mq_schema_array); + assert(plug_mgr_rt->pending_mq==NULL && plug_mgr_rt->delivered_mq==NULL); return; } diff --git a/src/plugin_manager/plugin_manager.h b/src/plugin_manager/plugin_manager.h index 1305f19..6810da8 100644 --- a/src/plugin_manager/plugin_manager.h +++ b/src/plugin_manager/plugin_manager.h @@ -17,6 +17,7 @@ int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr); //publish and dispatch session msg(msg, pkt) on session_mq void plugin_manager_on_session_ingress(struct session *sess,struct packet *pkt); void plugin_manager_on_session_egress(struct session *sess,struct packet *pkt); +void plugin_manager_on_session_closing(struct session *sess); struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_manager_schema *plug_mgr, struct session *sess); void plugin_manager_session_runtime_free(struct plugin_manager_runtime *plug_mgr_rt); \ No newline at end of file diff --git a/src/stellar_on_sapp/stellar_on_sapp_api.c b/src/stellar_on_sapp/stellar_on_sapp_api.c index a7b590a..9f2794d 100644 --- a/src/stellar_on_sapp/stellar_on_sapp_api.c +++ b/src/stellar_on_sapp/stellar_on_sapp_api.c @@ -119,6 +119,7 @@ struct session *session_new_on_sapp(struct stellar *st, struct streaminfo *strea void session_free_on_sapp(struct session *sess) { sess->state = SESSION_STATE_CLOSING; + plugin_manager_on_session_closing(sess); if(sess->plug_mgr_rt) { plugin_manager_session_runtime_free(sess->plug_mgr_rt); diff --git a/test/plugin_manager/plugin_manager_gtest_main.cpp b/test/plugin_manager/plugin_manager_gtest_main.cpp index 132d417..2745354 100644 --- a/test/plugin_manager/plugin_manager_gtest_main.cpp +++ b/test/plugin_manager/plugin_manager_gtest_main.cpp @@ -762,7 +762,10 @@ TEST(plugin_manager, no_plugin_runtime) { } for(int i=0; i < env.N_session; i++) + { + plugin_manager_on_session_closing(&sess[i]); plugin_manager_session_runtime_free(sess[i].plug_mgr_rt); + } //exit stage plugin_manager_exit(plug_mgr); @@ -787,7 +790,7 @@ static void test_basic_session_ctx_free(struct session *sess, void *session_ctx, struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; env->basic_ctx_free_called+=1; struct test_basic_ctx *ctx=(struct test_basic_ctx *)session_ctx; - EXPECT_EQ(ctx->called, env->N_per_session_pkt_cnt*2);//ingress + egress + EXPECT_EQ(ctx->called, env->N_per_session_pkt_cnt*2);//ingress + egress + closing FREE(ctx); return; } @@ -802,8 +805,11 @@ static void test_basic_on_session_ingress(struct session *sess, int topic_id, co EXPECT_EQ(session_exdata_set(sess, 2, sess), -1);// illegal set EXPECT_EQ(session_exdata_get(sess, 2), nullptr);// illegal get EXPECT_EQ(session_exdata_set(sess, env->basic_exdata_idx, env), 0); - env->basic_on_session_ingress_called+=1; - ctx->called+=1; + if(msg) + { + env->basic_on_session_ingress_called+=1; + ctx->called+=1; + } return; } @@ -816,6 +822,7 @@ static void test_basic_on_session_egress(struct session *sess, int topic_id, con EXPECT_EQ(sess->plug_mgr_rt->plug_mgr, env->plug_mgr); EXPECT_EQ(session_exdata_set(sess, 2, sess), -1);// illegal set EXPECT_EQ(session_exdata_get(sess, 2), nullptr);// illegal get + EXPECT_TRUE(msg!=NULL); EXPECT_EQ(session_exdata_get(sess, env->basic_exdata_idx), env); env->basic_on_session_egress_called+=1; ctx->called+=1; @@ -880,7 +887,10 @@ TEST(plugin_manager, basic_session_plugin) { } for(int i=0; i < env.N_session; i++) + { + plugin_manager_on_session_closing(&sess[i]); plugin_manager_session_runtime_free(sess[i].plug_mgr_rt); + } plugin_manager_exit(plug_mgr); @@ -934,12 +944,15 @@ static void test_mq_pub_on_session(struct session *sess, int topic_id, const voi EXPECT_TRUE(env!=NULL); EXPECT_TRUE(ctx!=NULL); EXPECT_EQ(sess->plug_mgr_rt->plug_mgr, env->plug_mgr); - env->test_mq_pub_called+=1; - ctx->called+=1; - int *pub_msg=(int *)CALLOC(int, 1); - *pub_msg=env->test_mq_pub_called; - EXPECT_EQ(session_mq_publish_message(sess, env->test_mq_topic_id, pub_msg),0); - return; + if (msg) + { + env->test_mq_pub_called += 1; + ctx->called += 1; + int *pub_msg = (int *)CALLOC(int, 1); + *pub_msg = env->test_mq_pub_called; + EXPECT_EQ(session_mq_publish_message(sess, env->test_mq_topic_id, pub_msg), 0); + } + return; } static void test_mq_on_sub_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env) @@ -960,8 +973,11 @@ static void test_session_msg_free(struct session *sess, void *msg, void *msg_fre struct session_plugin_env *env = (struct session_plugin_env *)msg_free_arg; EXPECT_EQ(sess->plug_mgr_rt->plug_mgr, env->plug_mgr); EXPECT_EQ(env->test_mq_pub_called, *(int *)msg); - env->test_mq_free_called+=1; - FREE(msg); + if(msg) + { + env->test_mq_free_called+=1; + FREE(msg); + } return; } @@ -1014,7 +1030,10 @@ TEST(plugin_manager, basic_session_plugin_mq) { } for(int i=0; i < env.N_session; i++) + { + plugin_manager_on_session_closing(&sess[i]); plugin_manager_session_runtime_free(sess[i].plug_mgr_rt); + } plugin_manager_exit(plug_mgr); @@ -1093,7 +1112,10 @@ TEST(plugin_manager, session_plugin_ctx_new_dettach) { } for(int i=0; i < env.N_session; i++) + { + plugin_manager_on_session_closing(&sess[i]); plugin_manager_session_runtime_free(sess[i].plug_mgr_rt); + } plugin_manager_exit(plug_mgr); @@ -1113,7 +1135,7 @@ static void test_invalid_send_msg_session_ctx_free(struct session *sess, void *s struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; env->basic_ctx_free_called+=1; struct test_basic_ctx *ctx=(struct test_basic_ctx *)session_ctx; - EXPECT_EQ(ctx->called,env->N_per_session_pkt_cnt); + EXPECT_EQ(ctx->called,(env->N_per_session_pkt_cnt+1)); EXPECT_EQ(session_mq_publish_message(sess, env->test_mq_topic_id, ctx), -1);// illegal publish when session closing @@ -1178,10 +1200,138 @@ TEST(plugin_manager, session_plugin_invalid_send_msg) { } for(int i=0; i < env.N_session; i++) + { + plugin_manager_on_session_closing(&sess[i]); plugin_manager_session_runtime_free(sess[i].plug_mgr_rt); + } // pesudo exit stage plugin_manager_exit(plug_mgr); + + EXPECT_EQ(env.basic_ctx_free_called,env.N_session); +} + + +struct test_session_closing_ctx +{ + int pkt_called; + int session_free_called; + int userdefine_on_msg_called; +}; + + +static void *test_session_closing_ctx_new(struct session *sess, void *plugin_env) +{ + struct test_session_closing_ctx *ctx=CALLOC(struct test_session_closing_ctx, 1); + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + env->basic_ctx_new_called+=1; + return ctx; +} + +static void test_session_closing_ctx_free(struct session *sess, void *session_ctx, void *plugin_env) +{ + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + env->basic_ctx_free_called+=1; + struct test_session_closing_ctx *ctx=(struct test_session_closing_ctx *)session_ctx; + EXPECT_EQ(ctx->pkt_called,env->N_per_session_pkt_cnt); + EXPECT_EQ(ctx->session_free_called,1); + FREE(ctx); +} + +static void test_session_closing_on_intrisic_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env) +{ + struct test_session_closing_ctx *ctx=(struct test_session_closing_ctx *)per_session_ctx; + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + if(msg)ctx->pkt_called+=1; + if(session_get_current_state(sess)==SESSION_STATE_CLOSING) + { + EXPECT_EQ(msg,nullptr); + ctx->session_free_called+=1; + session_mq_publish_message(sess, env->test_mq_topic_id, env); + env->test_mq_pub_called+=1; + } +} + +static void test_session_closing_on_userdefine_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env) +{ + struct test_session_closing_ctx *ctx=(struct test_session_closing_ctx *)per_session_ctx; + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + ctx->userdefine_on_msg_called+=1; + EXPECT_EQ(msg, plugin_env); + EXPECT_EQ(session_get_current_state(sess),SESSION_STATE_CLOSING); + env->test_mq_sub_called+=1; +} + +TEST(plugin_manager, session_closing) { + + struct stellar st={0}; + struct session_plugin_env env; + memset(&env, 0, sizeof(struct session_plugin_env)); + +// pesudo init stage + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + +// plugin manager register plugin + + int plugin_id=stellar_session_plugin_register(&st, test_session_closing_ctx_new, test_session_closing_ctx_free, &env); + EXPECT_GE(plugin_id,0); + + env.intrinsc_tcp_topic_id=stellar_session_mq_get_topic_id(&st, TOPIC_TCP); + EXPECT_GE(env.intrinsc_tcp_topic_id, 0); + EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_closing_on_intrisic_msg, plugin_id), 0); + + env.test_mq_topic_id=stellar_session_mq_create_topic(&st, "SESSION_CLOSING_TOPIC", NULL, &env); + EXPECT_GE(env.test_mq_topic_id, 0); + EXPECT_EQ(stellar_session_mq_subscribe(&st, env.test_mq_topic_id, test_session_closing_on_userdefine_msg, plugin_id), 0); + +// pesudo packet and session + + env.plug_mgr=plug_mgr; + env.N_per_session_pkt_cnt=10; + env.N_session=10; + + struct packet pkt={&st, TCP, 6}; + + struct session sess[env.N_session]; + memset(&sess, 0, sizeof(sess)); + +// pesudo running stage + for(int i=0; i < env.N_session; i++) + { + sess[i].state=SESSION_STATE_OPENING; + sess[i].plug_mgr_rt=plugin_manager_session_runtime_new(plug_mgr, &sess[i]); + } + + for (int j = 0; j < env.N_per_session_pkt_cnt; j++) + { + plugin_manager_on_packet_ingress(plug_mgr, &pkt); + + for (int i = 0; i < env.N_session; i++) + { + sess[i].sess_pkt_cnt+=1; + sess[i].state=SESSION_STATE_ACTIVE; + plugin_manager_on_session_ingress(&sess[i], &pkt); + plugin_manager_on_session_egress(&sess[i], &pkt); + } + + plugin_manager_on_packet_egress(plug_mgr, &pkt); + } + + for(int i=0; i < env.N_session; i++) + { + sess[i].state=SESSION_STATE_CLOSING; + plugin_manager_on_session_closing(&sess[i]); + plugin_manager_session_runtime_free(sess[i].plug_mgr_rt); + } + +// pesudo exit stage + plugin_manager_exit(plug_mgr); + + EXPECT_EQ(env.basic_ctx_new_called,env.N_session); + EXPECT_EQ(env.basic_ctx_free_called,env.N_session); + EXPECT_EQ(env.test_mq_pub_called,env.N_session); + EXPECT_EQ(env.test_mq_sub_called,env.N_session); } /********************************************** diff --git a/test/plugin_manager/plugin_manager_gtest_mock.h b/test/plugin_manager/plugin_manager_gtest_mock.h index a0d28b7..7d6ffc3 100644 --- a/test/plugin_manager/plugin_manager_gtest_mock.h +++ b/test/plugin_manager/plugin_manager_gtest_mock.h @@ -5,6 +5,7 @@ extern "C" #include "plugin_manager/plugin_manager_interna.h" #include "stellar_on_sapp/stellar_internal.h" +#include "stellar/session.h" @@ -24,8 +25,20 @@ struct packet struct session { struct plugin_manager_runtime *plug_mgr_rt; + enum session_type type; + enum session_state state; int sess_pkt_cnt; }; +enum session_state session_get_current_state(struct session *sess) +{ + return sess->state; +} + +enum session_type session_get_type(struct session *sess) +{ + return sess->type; + +} struct plugin_manager_schema * stellar_plugin_manager_schema_get(struct stellar *st) { -- cgit v1.2.3