summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryangwei <[email protected]>2024-09-14 16:44:36 +0800
committeryangwei <[email protected]>2024-09-14 16:44:36 +0800
commit7305c64a8cfe66f37c335457cf4fbbd8a9e95e6a (patch)
tree732e98dea5068b54104d882a34ba4e05c6c11c13
parent5150a03512a165f7c2f8f9c67e8d9fbf10516c8a (diff)
🧪 test(mq): add dispatch_cb test case
-rw-r--r--infra/mq/test/gtest_mq_main.cpp96
1 files changed, 96 insertions, 0 deletions
diff --git a/infra/mq/test/gtest_mq_main.cpp b/infra/mq/test/gtest_mq_main.cpp
index 3b9dc77..23d93f2 100644
--- a/infra/mq/test/gtest_mq_main.cpp
+++ b/infra/mq/test/gtest_mq_main.cpp
@@ -301,6 +301,102 @@ TEST(plugin_manager, basic_pub_sub) {
mq_schema_free(s);
}
+/**********************************************
+ * MQ RUNTIME WITH DISPATCH *
+ **********************************************/
+
+struct session
+{
+ int id;
+ int cnt;
+};
+
+struct session_mq_test_env
+{
+ struct mq_schema *s;
+ struct mq_runtime *rt;
+ int N_session;
+ struct session sess[1024];
+ int intrinsc_tcp_input_topic_id;
+ int basic_on_tcp_called;
+ int sess_dispatch_called;
+ int test_mq_sub_called;
+ int sess_msg_free_called;
+};
+
+#define TOPIC_TCP "TCP"
+typedef void on_session_msg_cb_func(int topic_id, struct session *sess, void *module_ctx);
+
+static void pesudo_on_msg_dispatch(int topic_id,
+ void *msg,
+ on_msg_cb_func* on_msg_cb,
+ void *on_msg_cb_arg,
+ void *dispatch_arg)
+{
+ on_session_msg_cb_func *session_cb = (on_session_msg_cb_func *)on_msg_cb;
+ struct session *sess=(struct session *)msg;
+ EXPECT_TRUE(dispatch_arg==NULL);
+ session_cb(topic_id, sess, on_msg_cb_arg);
+ struct session_mq_test_env *env=(struct session_mq_test_env *)on_msg_cb_arg;
+ env->sess_dispatch_called+=1;
+}
+
+static void pesudo_tcp_session_msg_free(void *msg, void *msg_free_arg)
+{
+ struct session_mq_test_env *env=(struct session_mq_test_env *)msg_free_arg;
+ env->sess_msg_free_called+=1;
+
+}
+static int pesudo_tcp_session_subscribe(struct session_mq_test_env *env, on_session_msg_cb_func *on_session_cb)
+{
+ int topic_id=mq_schema_get_topic_id(env->s, TOPIC_TCP);
+ if(topic_id<0)
+ {
+ topic_id=mq_schema_create_topic(env->s, TOPIC_TCP, pesudo_on_msg_dispatch, NULL, pesudo_tcp_session_msg_free, env);
+ }
+ return mq_schema_subscribe(env->s, topic_id, (on_msg_cb_func *)on_session_cb, env);
+}
+
+static void test_basic_on_tcp_session(int topic_id, struct session *sess, void *plugin_env)
+{
+ struct session_mq_test_env *env = (struct session_mq_test_env *)plugin_env;
+ EXPECT_TRUE(env!=NULL);
+ if(sess)
+ {
+ env->basic_on_tcp_called+=1;
+ }
+ return;
+}
+
+TEST(mq_runtime, sub_with_dispatch_cb) {
+
+ struct session_mq_test_env env;
+ memset(&env, 0, sizeof(env));
+ env.N_session=10;
+
+ env.s=mq_schema_new();
+
+ EXPECT_EQ(pesudo_tcp_session_subscribe(&env, test_basic_on_tcp_session), 0);
+ env.intrinsc_tcp_input_topic_id=mq_schema_get_topic_id(env.s, TOPIC_TCP);
+
+ env.rt=mq_runtime_new(env.s);
+
+ for(int i=0; i <env.N_session; i++)
+ {
+ env.sess[i].id=i;
+ mq_runtime_publish_message(env.rt, env.intrinsc_tcp_input_topic_id, &env.sess[i]);
+ env.sess[i].cnt+=1;
+ mq_runtime_dispatch(env.rt);
+ }
+
+ mq_runtime_free(env.rt);
+ mq_schema_free(env.s);
+
+ EXPECT_EQ(env.basic_on_tcp_called, env.N_session);
+ EXPECT_EQ(env.sess_dispatch_called, env.N_session);
+ EXPECT_EQ(env.sess_msg_free_called, env.N_session);
+}
+
#if 0
static void overlimit_packet_msg_free_cb_func(void *msg, void *msg_free_arg)
{