summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryangwei <[email protected]>2024-09-19 15:58:39 +0800
committeryangwei <[email protected]>2024-09-19 15:58:39 +0800
commit17b537b19ad41326f1f0fc6fffcee2d04ef14405 (patch)
tree115b4bf0e270b21a169f68c911f78d3b228c52b0
parent6af61355e38060ec9de79045b09e606aa5acb126 (diff)
🧪 test(mq): add priority test case
-rw-r--r--infra/mq/mq.c4
-rw-r--r--infra/mq/mq_internal.h1
-rw-r--r--infra/mq/test/gtest_mq_main.cpp217
3 files changed, 104 insertions, 118 deletions
diff --git a/infra/mq/mq.c b/infra/mq/mq.c
index 76ae081..c4b315c 100644
--- a/infra/mq/mq.c
+++ b/infra/mq/mq.c
@@ -195,7 +195,7 @@ int mq_schema_subscribe(struct mq_schema *s, int topic_id, on_msg_cb_func *on_ms
return 0;
}
-static int mq_publish_message_with_priority(struct mq_runtime *rt, int topic_id, void *data, enum mq_priority priority)
+int mq_runtime_publish_message_with_priority(struct mq_runtime *rt, int topic_id, void *data, enum mq_priority priority)
{
if(rt==NULL || rt->schema == NULL || rt->schema->topic_array == NULL)return -1;
@@ -212,7 +212,7 @@ static int mq_publish_message_with_priority(struct mq_runtime *rt, int topic_id,
int mq_runtime_publish_message(struct mq_runtime *rt, int topic_id, void *data)
{
- return mq_publish_message_with_priority(rt, topic_id, data, STELLAR_MQ_PRIORITY_MEDIUM);
+ return mq_runtime_publish_message_with_priority(rt, topic_id, data, STELLAR_MQ_PRIORITY_MEDIUM);
}
struct mq_schema *mq_schema_new()
diff --git a/infra/mq/mq_internal.h b/infra/mq/mq_internal.h
index bbcbb0c..c246894 100644
--- a/infra/mq/mq_internal.h
+++ b/infra/mq/mq_internal.h
@@ -66,6 +66,7 @@ struct mq_runtime
struct mq_message *dealth_letter_queue;// dlq list
};
+int mq_runtime_publish_message_with_priority(struct mq_runtime *rt, int topic_id, void *data, enum mq_priority priority);
#ifdef __cplusplus
}
diff --git a/infra/mq/test/gtest_mq_main.cpp b/infra/mq/test/gtest_mq_main.cpp
index c09dd96..423b41d 100644
--- a/infra/mq/test/gtest_mq_main.cpp
+++ b/infra/mq/test/gtest_mq_main.cpp
@@ -4,6 +4,8 @@
#include "mq/mq_internal.h"
+#include "stellar/utils.h"
+
#define TOPIC_NAME_MAX 512
/*******************************************
@@ -358,8 +360,8 @@ TEST(mq_runtime, sub_with_dispatch_cb) {
EXPECT_EQ(env.sess_dispatch_called, env.N_session);
EXPECT_EQ(env.sess_msg_free_called, env.N_session);
}
-
#if 0
+//TODO: test case mq for overlimit
static void overlimit_packet_msg_free_cb_func(void *msg, void *msg_free_arg)
{
struct packet_plugin_env *env = (struct packet_plugin_env *)msg_free_arg;
@@ -406,7 +408,6 @@ static void overlimit_pub_on_packet(struct packet *pkt, void *plugin_env)
}
return;
}
-//TODO: test case mq for overlimit
TEST(mq_runtime, packet_plugins_pub_overlimit) {
@@ -479,160 +480,144 @@ TEST(mq_runtime, packet_plugins_pub_overlimit) {
}
-//TODO: test case, mq priority
+#endif
+
+struct test_priority_mq_env
+{
+ struct mq_schema *s;
+ struct mq_runtime *rt;
+ int N_round;
+ int current_round;
+ int tcp_topic_id;
+ int test_mq_topic_id;
+ int plugin_id_1_called;
+ int plugin_id_2_called;
+
+};
//test dettach session
-static void test_session_mq_priority_plugin_1_on_msg(int topic_id, const void *msg, void *plugin_env)
+static void test_session_mq_priority_plugin_1_on_msg(int topic_id, void *msg, void *plugin_env)
{
- struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ struct test_priority_mq_env *env = (struct test_priority_mq_env *)plugin_env;
env->plugin_id_1_called+=1;
- if(topic_id == env->intrinsc_tcp_topic_id)
- {
- struct session *sess = (struct session *)msg;
- struct test_session_called_ctx *ctx =
- (struct test_session_called_ctx *)session_exdata_get(sess, env->exdata_ctx_1_id);
- if (ctx == NULL)
- {
- ctx = CALLOC(struct test_session_called_ctx, 1);
- session_exdata_set(sess, env->exdata_ctx_1_id, ctx);
- }
- ctx->called+=1;
- EXPECT_EQ(ctx->called%3, 1);// intrinsc msg has high priority
- EXPECT_EQ(stellar_mq_publish_message_with_priority(env->plug_mgr->st, env->test_mq_topic_id, (void *)(long)env->plugin_id_1, STELLAR_MQ_PRIORITY_LOW), 0);
- }
- if(topic_id == env->test_mq_topic_id)
+ if(topic_id == env->tcp_topic_id)
{
- if(ctx->called%3 == 2)
+ EXPECT_EQ(env->plugin_id_1_called%3, 1);// tcp msg has high priority
+ if((long)msg%2==0)
{
- EXPECT_EQ((int)(long)msg, env->plugin_id_2);
+ EXPECT_EQ(mq_runtime_publish_message_with_priority(env->rt, env->test_mq_topic_id, (void *)(long)1, STELLAR_MQ_PRIORITY_LOW), 0);
}
- if(ctx->called%3 == 0)
+ else
{
- EXPECT_EQ((int )(long)msg, env->plugin_id_1);
+ EXPECT_EQ(mq_runtime_publish_message_with_priority(env->rt, env->test_mq_topic_id, (void *)(long)1, STELLAR_MQ_PRIORITY_HIGH), 0);
}
}
+ if(topic_id == env->test_mq_topic_id)
+ {
+ if (env->current_round % 2 == 0)
+ {
+ if (env->plugin_id_1_called % 3 == 2)
+ {
+ EXPECT_EQ((int)(long)msg, 2); // msg 2 has normal priority
+ }
+ if (env->plugin_id_1_called % 3 == 0)
+ {
+ EXPECT_EQ((int)(long)msg, 1); // msg 1 has low priority
+ }
+ }
+ else
+ {
+ if (env->plugin_id_1_called % 3 == 2)
+ {
+ EXPECT_EQ((int)(long)msg, 1); // msg 2 has normal priority
+ }
+ if (env->plugin_id_1_called % 3 == 0)
+ {
+ EXPECT_EQ((int)(long)msg, 2); // msg 1 has low priority
+ }
+ }
+ }
return;
}
-static void test_session_mq_priority_plugin_2_on_msg(int topic_id, const void *msg, void *plugin_env)
+static void test_session_mq_priority_plugin_2_on_msg(int topic_id, void *msg, void *plugin_env)
{
- struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ struct test_priority_mq_env *env = (struct test_priority_mq_env *)plugin_env;
env->plugin_id_2_called+=1;
-
- if(topic_id == env->intrinsc_tcp_topic_id)
+ if(topic_id == env->tcp_topic_id)
{
- struct session *sess = (struct session *)msg;
- struct test_session_called_ctx *ctx =
- (struct test_session_called_ctx *)session_exdata_get(sess, env->exdata_ctx_2_id);
- if (ctx == NULL)
- {
- ctx = CALLOC(struct test_session_called_ctx, 1);
- session_exdata_set(sess, env->exdata_ctx_2_id, ctx);
- }
- ctx->called+=1;
- EXPECT_EQ(ctx->called % 3, 1);
- // publish msg has normal priority
- EXPECT_EQ(stellar_mq_publish_message(env->plug_mgr->st, env->test_mq_topic_id, (void *)(long)env->plugin_id_2), 0);
+ EXPECT_EQ(env->plugin_id_2_called % 3, 1);
+ // tcp msg has normal priority
+ EXPECT_EQ(mq_runtime_publish_message(env->rt, env->test_mq_topic_id, (void *)(long)2), 0);
}
if(topic_id == env->test_mq_topic_id)
{
- if(ctx->called%3 == 2)
- {
- EXPECT_EQ((int)(long)msg, env->plugin_id_2);
- }
- if(ctx->called%3 == 0)
- {
- EXPECT_EQ((int)(long)msg, env->plugin_id_1);
- }
- }
+ if (env->current_round % 2 == 0)
+ {
+ if (env->plugin_id_2_called % 3 == 2)
+ {
+ EXPECT_EQ((int)(long)msg, 2); // msg 2 has normal priority
+ }
+ if (env->plugin_id_2_called % 3 == 0)
+ {
+ EXPECT_EQ((int)(long)msg, 1); // msg 1 has low priority
+ }
+ }
+ else
+ {
+ if (env->plugin_id_2_called % 3 == 2)
+ {
+ EXPECT_EQ((int)(long)msg, 1); // msg 2 has normal priority
+ }
+ if (env->plugin_id_2_called % 3 == 0)
+ {
+ EXPECT_EQ((int)(long)msg, 2); // msg 1 has low priority
+ }
+ }
+ }
return;
}
-TEST(mq_runtime, test_session_mq_priority) {
-
- struct stellar st={0};
- struct session_plugin_env env;
- memset(&env, 0, sizeof(struct session_plugin_env));
-
-// mock init stage
- struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL, MAX_MSG_PER_STAGE);
- whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
-
-// plugin manager register plugin
+TEST(mq_runtime, basic_mq_priority) {
- int plugin_id_1=stellar_plugin_register(&st, 0,NULL, NULL, &env);
- EXPECT_GE(plugin_id_1,0);
+ struct test_priority_mq_env env={};
- int plugin_id_2=stellar_plugin_register(&st, 0, NULL, NULL, &env);
- EXPECT_GE(plugin_id_2,0);
-
- env.plugin_id_1=plugin_id_1;
- env.plugin_id_2=plugin_id_2;
-
- env.exdata_ctx_1_id=stellar_exdata_new_index(&st, "SESSION_CTX_1", stellar_exdata_free_default, &env) ;
- env.exdata_ctx_2_id=stellar_exdata_new_index(&st, "SESSION_CTX_2", stellar_exdata_free_default, &env) ;
+ env.s=mq_schema_new();
- env.intrinsc_tcp_topic_id=stellar_mq_create_topic(&st, TOPIC_TCP_INPUT, NULL, NULL, NULL);
- EXPECT_GE(env.intrinsc_tcp_topic_id, 0);
- EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_priority_plugin_1_on_msg, plugin_id_1), 0);
- EXPECT_EQ(stellar_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_priority_plugin_2_on_msg, plugin_id_2), 0);
+ env.tcp_topic_id=mq_schema_create_topic(env.s, TOPIC_TCP, NULL, NULL, NULL, &env);
+ EXPECT_GE(env.tcp_topic_id, 0);
+
+ EXPECT_EQ(mq_schema_subscribe(env.s, env.tcp_topic_id, test_session_mq_priority_plugin_1_on_msg, &env), 0);
+ EXPECT_EQ(mq_schema_subscribe(env.s, env.tcp_topic_id, test_session_mq_priority_plugin_2_on_msg, &env), 0);
- env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_PRIORITY_TOPIC", NULL, &env);
+ env.test_mq_topic_id=mq_schema_create_topic(env.s, "SESSION_PRIORITY_TOPIC", NULL, NULL, NULL, &env);
EXPECT_GE(env.test_mq_topic_id, 0);
- EXPECT_EQ(stellar_mq_subscribe(&st, env.test_mq_topic_id, test_session_mq_priority_plugin_1_on_msg, plugin_id_1), 0);
- EXPECT_EQ(stellar_mq_subscribe(&st, env.test_mq_topic_id, test_session_mq_priority_plugin_2_on_msg, plugin_id_2), 0);
+ EXPECT_EQ(mq_schema_subscribe(env.s, env.test_mq_topic_id, test_session_mq_priority_plugin_1_on_msg, &env), 0);
+ EXPECT_EQ(mq_schema_subscribe(env.s, env.test_mq_topic_id, test_session_mq_priority_plugin_2_on_msg, &env), 0);
// mock packet and session
- env.plug_mgr=plug_mgr;
- env.N_per_session_pkt_cnt=10;
- env.N_session=10;
-
- struct packet pkt={&st, TCP, 6};
-
- struct session sess[env.N_session];
- memset(&sess, 0, sizeof(sess));
-
-// mock running stage
- for(int i=0; i < env.N_session; i++)
- {
- sess[i].state=SESSION_STATE_OPENING;
- sess[i].session_exdat_rt=session_exdata_runtime_new(plug_mgr);
- sess[i].type=SESSION_TYPE_TCP;
- }
+ env.rt=mq_runtime_new(env.s);
+ env.N_round=10;
- for (int j = 0; j < env.N_per_session_pkt_cnt; j++)
+ for (int i = 0; i < env.N_round; i++)
{
- plugin_manager_on_packet_input(plug_mgr, &pkt);
-
- for (int i = 0; i < env.N_session; i++)
- {
- sess[i].sess_pkt_cnt+=1;
- sess[i].state=SESSION_STATE_ACTIVE;
- stellar_mq_publish_message(&st, env.intrinsc_tcp_topic_id, &sess[i]);
- }
+ env.current_round=i;
+ mq_runtime_publish_message(env.rt, env.tcp_topic_id, (void *)(long)i);
+ mq_runtime_dispatch(env.rt);
- plugin_manager_on_packet_output(plug_mgr, &pkt);
}
- for(int i=0; i < env.N_session; i++)
- {
- sess[i].state=SESSION_STATE_CLOSING;
- session_exdata_runtime_free(sess[i].session_exdat_rt);
- }
-
-// mock exit stage
- plugin_manager_exit(plug_mgr);
-
- // each session publish TCP TOPIC per_session_pkt_cnt+1, and SESSION_PRIORITY_TOPIC 2*(msg per_session_pkt_cnt+1)
- EXPECT_EQ(env.plugin_id_1_called,env.N_session*((env.N_per_session_pkt_cnt)*3));
- EXPECT_EQ(env.plugin_id_2_called,env.N_session*((env.N_per_session_pkt_cnt)*3));
+ mq_runtime_free(env.rt);
+ mq_schema_free(env.s);
+ // publish TCP TOPIC N_round, and SESSION_PRIORITY_TOPIC*2
+ EXPECT_EQ(env.plugin_id_1_called,env.N_round*3);
+ EXPECT_EQ(env.plugin_id_2_called,env.N_round*3);
}
-#endif
/**********************************************
* GTEST MAIN *