diff options
| author | yangwei <[email protected]> | 2024-07-23 17:40:24 +0800 |
|---|---|---|
| committer | yangwei <[email protected]> | 2024-07-23 18:14:04 +0800 |
| commit | e94c0f07702b003f6db0d57292f3200906e02bef (patch) | |
| tree | 67c91901a375d450ea0c787fbc7b2fddf9d9d6be | |
| parent | 4f9c5866f72dff2c01e82f86638afc40790eb09f (diff) | |
🧪 test(packet exdata): test case pub msg when exdata free
| -rw-r--r-- | src/plugin_manager/plugin_manager.c | 10 | ||||
| -rw-r--r-- | test/plugin_manager/plugin_manager_gtest_main.cpp | 72 |
2 files changed, 81 insertions, 1 deletions
diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c index 218b23e..7ab8f99 100644 --- a/src/plugin_manager/plugin_manager.c +++ b/src/plugin_manager/plugin_manager.c @@ -672,6 +672,7 @@ int packet_mq_publish_message(struct packet *pkt, int topic_id, void *data) struct stellar *st = packet_stellar_get(pkt); struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); int tid = stellar_get_current_thread_id(plug_mgr->st); + if(plug_mgr->per_thread_data[tid].pub_packet_msg_cnt == -1)return -1; if(plug_mgr->per_thread_data[tid].pub_packet_msg_cnt >= plug_mgr->max_message_dispatch)return -1; if(stellar_mq_publish_message(ON_PACKET_TOPIC ,topic_id, data, plug_mgr->stellar_mq_schema_array, plug_mgr->per_thread_data[tid].priority_mq,SESSION_MQ_PRIORITY_HIGH)==0) { @@ -721,6 +722,7 @@ int session_mq_publish_message_with_priority(struct session *sess, int topic_id, struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess); assert(plug_mgr_rt); if(plug_mgr_rt->session_mq_status==NULL)return -1;//runtime free stage , mq_status alaway null, ignore publish message + if(plug_mgr_rt->pub_session_msg_cnt == -1)return -1; if(plug_mgr_rt->pub_session_msg_cnt >= plug_mgr_rt->plug_mgr->max_message_dispatch)return -1; int tid = stellar_get_current_thread_id(plug_mgr_rt->plug_mgr->st); if(stellar_mq_publish_message(ON_SESSION_TOPIC ,topic_id, data, plug_mgr_rt->plug_mgr->stellar_mq_schema_array,plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq,priority)==0) @@ -949,10 +951,13 @@ void plugin_manager_on_packet_ingress(struct plugin_manager_schema *plug_mgr, st void plugin_manager_on_packet_egress(struct plugin_manager_schema *plug_mgr, struct packet *pkt) { if(plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return; - per_thread_packet_exdata_arrary_clean(plug_mgr, pkt); + int tid=stellar_get_current_thread_id(plug_mgr->st); + stellar_mq_dispatch(plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr->per_thread_data[tid].dealth_letter_queue, NULL, pkt); + plug_mgr->per_thread_data[tid].pub_packet_msg_cnt=-1;//disable packet message publish stellar_mq_free(NULL, pkt, &plug_mgr->per_thread_data[stellar_get_current_thread_id(plug_mgr->st)].dealth_letter_queue, plug_mgr->stellar_mq_schema_array); + per_thread_packet_exdata_arrary_clean(plug_mgr, pkt); } /********************************************* @@ -1053,6 +1058,7 @@ void plugin_manager_on_session_egress(struct session *sess, struct packet *pkt) session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->egress_topic_id ,pkt, SESSION_MQ_PRIORITY_HIGH); int tid=stellar_get_current_thread_id(plug_mgr_rt->plug_mgr->st); stellar_mq_dispatch(plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, sess, pkt); + plug_mgr_rt->pub_session_msg_cnt=-1;//disable session message publish stellar_mq_free(plug_mgr_rt->sess,pkt, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, plug_mgr_rt->plug_mgr->stellar_mq_schema_array); return; } @@ -1061,6 +1067,7 @@ void plugin_manager_on_session_closing(struct session *sess) { struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess); if(plug_mgr_rt==NULL)return; + plug_mgr_rt->pub_session_msg_cnt=0;// reset pub_msg_cnt switch (session_get_type(sess)) { case SESSION_TYPE_TCP: @@ -1075,6 +1082,7 @@ void plugin_manager_on_session_closing(struct session *sess) } int tid=stellar_get_current_thread_id(plug_mgr_rt->plug_mgr->st); stellar_mq_dispatch(plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, sess, NULL); + plug_mgr_rt->pub_session_msg_cnt=-1;//disable session message publish stellar_mq_free(plug_mgr_rt->sess,NULL,&plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, plug_mgr_rt->plug_mgr->stellar_mq_schema_array); return; } diff --git a/test/plugin_manager/plugin_manager_gtest_main.cpp b/test/plugin_manager/plugin_manager_gtest_main.cpp index 08cb376..90c3bf1 100644 --- a/test/plugin_manager/plugin_manager_gtest_main.cpp +++ b/test/plugin_manager/plugin_manager_gtest_main.cpp @@ -680,6 +680,77 @@ TEST(plugin_manager, packet_plugins_pub_overlimit) { } + +static void test_exdata_free_pub_msg_exdata_free(struct packet *pkt, int idx, void *ex_ptr, void *arg) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)arg; + EXPECT_EQ(env->packet_exdata_idx[idx], idx); + EXPECT_EQ(ex_ptr, pkt); + env->exdata_free_called[idx]+=1; + EXPECT_EQ(packet_mq_publish_message(pkt, env->packet_topic_id[0], pkt), 0 );// publish message in packet exdata_free is ok + env->msg_pub_cnt+=1; + return; +} + +static void test_exdata_free_pub_msg_free(struct packet *pkt, void *msg, void *msg_free_arg) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)msg_free_arg; + EXPECT_EQ(pkt, msg); + env->msg_free_cnt+=1; + EXPECT_EQ(packet_mq_publish_message(pkt, env->packet_topic_id[0], pkt), -1 );// publish message in packet msg_free is illegal + return; +} + +static void test_exdata_free_pub_msg_on_packet(struct packet *pkt, unsigned char ip_protocol, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + EXPECT_EQ(pkt->ip_proto, ip_protocol); + EXPECT_EQ(pkt->st, env->plug_mgr->st); + EXPECT_EQ(packet_exdata_set(pkt, env->packet_exdata_idx[0], pkt), 0); + env->basic_on_packet_called+=1; + return; +} + +static void test_exdata_free_pub_msg_on_packet_msg(struct packet *pkt, int topic_id, const void *msg, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_EQ(topic_id, env->packet_topic_id[0]); + env->msg_sub_cnt+=1; +} + +TEST(plugin_manager, packet_plugin_exdata_free_pub_msg) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + unsigned char ip_proto=6; + struct packet_plugin_env env; + memset(&env, 0, sizeof(struct packet_plugin_env)); + env.plug_mgr=plug_mgr; + int plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_exdata_free_pub_msg_on_packet, &env); + EXPECT_GE(plugin_id, PACKET_PULGIN_ID_BASE); + + env.packet_exdata_idx[0]=stellar_packet_exdata_new_index(&st, "PACKET_EXDATA", test_exdata_free_pub_msg_exdata_free, &env); + env.packet_topic_id[0]=stellar_packet_mq_create_topic(&st, "PACKET_TOPIC", test_exdata_free_pub_msg_free, &env); + + EXPECT_EQ(stellar_packet_mq_subscribe(&st, env.packet_topic_id[0], test_exdata_free_pub_msg_on_packet_msg, plugin_id),0); + + + struct packet pkt={&st, IPv4, ip_proto}; + plugin_manager_on_packet_ingress(plug_mgr, &pkt); + plugin_manager_on_packet_egress(plug_mgr, &pkt); + + plugin_manager_exit(plug_mgr); + + EXPECT_EQ(env.basic_on_packet_called, 1); + EXPECT_EQ(env.msg_pub_cnt, env.msg_sub_cnt); + EXPECT_EQ(env.msg_pub_cnt, env.msg_free_cnt); + EXPECT_EQ(env.msg_free_cnt, 1); + EXPECT_EQ(env.exdata_free_called[0], 1); +} + /********************************************** * TEST PLUGIN MANAGER ON SESSION PLUGIN INIT * **********************************************/ @@ -1256,6 +1327,7 @@ static void test_overlimit_session_msg_free(struct session *sess, void *msg, voi { struct session_plugin_env *env = (struct session_plugin_env *)msg_free_arg; EXPECT_EQ(sess->plug_mgr_rt->plug_mgr, env->plug_mgr); + EXPECT_EQ(session_mq_publish_message(sess, env->test_mq_topic_id, msg), -1);// illegal publish when msg_free if(msg) { EXPECT_EQ(env->test_mq_pub_called, *(int *)msg); |
