summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryangwei <[email protected]>2024-07-23 17:40:24 +0800
committeryangwei <[email protected]>2024-07-23 18:14:04 +0800
commite94c0f07702b003f6db0d57292f3200906e02bef (patch)
tree67c91901a375d450ea0c787fbc7b2fddf9d9d6be
parent4f9c5866f72dff2c01e82f86638afc40790eb09f (diff)
🧪 test(packet exdata): test case pub msg when exdata free
-rw-r--r--src/plugin_manager/plugin_manager.c10
-rw-r--r--test/plugin_manager/plugin_manager_gtest_main.cpp72
2 files changed, 81 insertions, 1 deletions
diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c
index 218b23e..7ab8f99 100644
--- a/src/plugin_manager/plugin_manager.c
+++ b/src/plugin_manager/plugin_manager.c
@@ -672,6 +672,7 @@ int packet_mq_publish_message(struct packet *pkt, int topic_id, void *data)
struct stellar *st = packet_stellar_get(pkt);
struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
int tid = stellar_get_current_thread_id(plug_mgr->st);
+ if(plug_mgr->per_thread_data[tid].pub_packet_msg_cnt == -1)return -1;
if(plug_mgr->per_thread_data[tid].pub_packet_msg_cnt >= plug_mgr->max_message_dispatch)return -1;
if(stellar_mq_publish_message(ON_PACKET_TOPIC ,topic_id, data, plug_mgr->stellar_mq_schema_array, plug_mgr->per_thread_data[tid].priority_mq,SESSION_MQ_PRIORITY_HIGH)==0)
{
@@ -721,6 +722,7 @@ int session_mq_publish_message_with_priority(struct session *sess, int topic_id,
struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess);
assert(plug_mgr_rt);
if(plug_mgr_rt->session_mq_status==NULL)return -1;//runtime free stage , mq_status alaway null, ignore publish message
+ if(plug_mgr_rt->pub_session_msg_cnt == -1)return -1;
if(plug_mgr_rt->pub_session_msg_cnt >= plug_mgr_rt->plug_mgr->max_message_dispatch)return -1;
int tid = stellar_get_current_thread_id(plug_mgr_rt->plug_mgr->st);
if(stellar_mq_publish_message(ON_SESSION_TOPIC ,topic_id, data, plug_mgr_rt->plug_mgr->stellar_mq_schema_array,plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq,priority)==0)
@@ -949,10 +951,13 @@ void plugin_manager_on_packet_ingress(struct plugin_manager_schema *plug_mgr, st
void plugin_manager_on_packet_egress(struct plugin_manager_schema *plug_mgr, struct packet *pkt)
{
if(plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return;
- per_thread_packet_exdata_arrary_clean(plug_mgr, pkt);
+ int tid=stellar_get_current_thread_id(plug_mgr->st);
+ stellar_mq_dispatch(plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr->per_thread_data[tid].dealth_letter_queue, NULL, pkt);
+ plug_mgr->per_thread_data[tid].pub_packet_msg_cnt=-1;//disable packet message publish
stellar_mq_free(NULL, pkt,
&plug_mgr->per_thread_data[stellar_get_current_thread_id(plug_mgr->st)].dealth_letter_queue,
plug_mgr->stellar_mq_schema_array);
+ per_thread_packet_exdata_arrary_clean(plug_mgr, pkt);
}
/*********************************************
@@ -1053,6 +1058,7 @@ void plugin_manager_on_session_egress(struct session *sess, struct packet *pkt)
session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->egress_topic_id ,pkt, SESSION_MQ_PRIORITY_HIGH);
int tid=stellar_get_current_thread_id(plug_mgr_rt->plug_mgr->st);
stellar_mq_dispatch(plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, sess, pkt);
+ plug_mgr_rt->pub_session_msg_cnt=-1;//disable session message publish
stellar_mq_free(plug_mgr_rt->sess,pkt, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, plug_mgr_rt->plug_mgr->stellar_mq_schema_array);
return;
}
@@ -1061,6 +1067,7 @@ void plugin_manager_on_session_closing(struct session *sess)
{
struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess);
if(plug_mgr_rt==NULL)return;
+ plug_mgr_rt->pub_session_msg_cnt=0;// reset pub_msg_cnt
switch (session_get_type(sess))
{
case SESSION_TYPE_TCP:
@@ -1075,6 +1082,7 @@ void plugin_manager_on_session_closing(struct session *sess)
}
int tid=stellar_get_current_thread_id(plug_mgr_rt->plug_mgr->st);
stellar_mq_dispatch(plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, sess, NULL);
+ plug_mgr_rt->pub_session_msg_cnt=-1;//disable session message publish
stellar_mq_free(plug_mgr_rt->sess,NULL,&plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, plug_mgr_rt->plug_mgr->stellar_mq_schema_array);
return;
}
diff --git a/test/plugin_manager/plugin_manager_gtest_main.cpp b/test/plugin_manager/plugin_manager_gtest_main.cpp
index 08cb376..90c3bf1 100644
--- a/test/plugin_manager/plugin_manager_gtest_main.cpp
+++ b/test/plugin_manager/plugin_manager_gtest_main.cpp
@@ -680,6 +680,77 @@ TEST(plugin_manager, packet_plugins_pub_overlimit) {
}
+
+static void test_exdata_free_pub_msg_exdata_free(struct packet *pkt, int idx, void *ex_ptr, void *arg)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)arg;
+ EXPECT_EQ(env->packet_exdata_idx[idx], idx);
+ EXPECT_EQ(ex_ptr, pkt);
+ env->exdata_free_called[idx]+=1;
+ EXPECT_EQ(packet_mq_publish_message(pkt, env->packet_topic_id[0], pkt), 0 );// publish message in packet exdata_free is ok
+ env->msg_pub_cnt+=1;
+ return;
+}
+
+static void test_exdata_free_pub_msg_free(struct packet *pkt, void *msg, void *msg_free_arg)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)msg_free_arg;
+ EXPECT_EQ(pkt, msg);
+ env->msg_free_cnt+=1;
+ EXPECT_EQ(packet_mq_publish_message(pkt, env->packet_topic_id[0], pkt), -1 );// publish message in packet msg_free is illegal
+ return;
+}
+
+static void test_exdata_free_pub_msg_on_packet(struct packet *pkt, unsigned char ip_protocol, void *plugin_env)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
+ EXPECT_TRUE(env!=NULL);
+ EXPECT_EQ(pkt->ip_proto, ip_protocol);
+ EXPECT_EQ(pkt->st, env->plug_mgr->st);
+ EXPECT_EQ(packet_exdata_set(pkt, env->packet_exdata_idx[0], pkt), 0);
+ env->basic_on_packet_called+=1;
+ return;
+}
+
+static void test_exdata_free_pub_msg_on_packet_msg(struct packet *pkt, int topic_id, const void *msg, void *plugin_env)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
+ EXPECT_EQ(topic_id, env->packet_topic_id[0]);
+ env->msg_sub_cnt+=1;
+}
+
+TEST(plugin_manager, packet_plugin_exdata_free_pub_msg) {
+
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+ unsigned char ip_proto=6;
+ struct packet_plugin_env env;
+ memset(&env, 0, sizeof(struct packet_plugin_env));
+ env.plug_mgr=plug_mgr;
+ int plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_exdata_free_pub_msg_on_packet, &env);
+ EXPECT_GE(plugin_id, PACKET_PULGIN_ID_BASE);
+
+ env.packet_exdata_idx[0]=stellar_packet_exdata_new_index(&st, "PACKET_EXDATA", test_exdata_free_pub_msg_exdata_free, &env);
+ env.packet_topic_id[0]=stellar_packet_mq_create_topic(&st, "PACKET_TOPIC", test_exdata_free_pub_msg_free, &env);
+
+ EXPECT_EQ(stellar_packet_mq_subscribe(&st, env.packet_topic_id[0], test_exdata_free_pub_msg_on_packet_msg, plugin_id),0);
+
+
+ struct packet pkt={&st, IPv4, ip_proto};
+ plugin_manager_on_packet_ingress(plug_mgr, &pkt);
+ plugin_manager_on_packet_egress(plug_mgr, &pkt);
+
+ plugin_manager_exit(plug_mgr);
+
+ EXPECT_EQ(env.basic_on_packet_called, 1);
+ EXPECT_EQ(env.msg_pub_cnt, env.msg_sub_cnt);
+ EXPECT_EQ(env.msg_pub_cnt, env.msg_free_cnt);
+ EXPECT_EQ(env.msg_free_cnt, 1);
+ EXPECT_EQ(env.exdata_free_called[0], 1);
+}
+
/**********************************************
* TEST PLUGIN MANAGER ON SESSION PLUGIN INIT *
**********************************************/
@@ -1256,6 +1327,7 @@ static void test_overlimit_session_msg_free(struct session *sess, void *msg, voi
{
struct session_plugin_env *env = (struct session_plugin_env *)msg_free_arg;
EXPECT_EQ(sess->plug_mgr_rt->plug_mgr, env->plug_mgr);
+ EXPECT_EQ(session_mq_publish_message(sess, env->test_mq_topic_id, msg), -1);// illegal publish when msg_free
if(msg)
{
EXPECT_EQ(env->test_mq_pub_called, *(int *)msg);