summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/plugin_manager/plugin_manager.c60
-rw-r--r--src/plugin_manager/plugin_manager_interna.h10
-rw-r--r--test/plugin_manager/plugin_manager_gtest_main.cpp290
3 files changed, 334 insertions, 26 deletions
diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c
index 2f20d51..1480c25 100644
--- a/src/plugin_manager/plugin_manager.c
+++ b/src/plugin_manager/plugin_manager.c
@@ -89,7 +89,7 @@ static void plugin_manager_per_thread_data_free(struct plugin_manger_per_thread_
{
p_data=per_thread_data+i;
if(p_data->per_thread_pkt_exdata_array.exdata_array)FREE(p_data->per_thread_pkt_exdata_array.exdata_array);
- if(p_data->per_thread_pkt_mq_array.mq)FREE(p_data->per_thread_pkt_mq_array.mq);
+ if(p_data->per_thread_pkt_mq.mq)FREE(p_data->per_thread_pkt_mq.mq);
}
FREE(per_thread_data);
return;
@@ -105,6 +105,7 @@ struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char
return NULL;
}
struct plugin_manager_schema *plug_mgr = CALLOC(struct plugin_manager_schema, 1);
+ plug_mgr->max_message_dispatch=MAX_MSG_PER_DISPATCH;
if(spec_num > 0)
{
utarray_new(plug_mgr->plugin_load_specs_array,&plugin_specs_icd);
@@ -465,6 +466,27 @@ UT_icd stellar_mq_subscriber_info_icd = {sizeof(struct stellar_mq_subscriber_inf
* PACKET MQ *
*******************************/
+static inline int stellar_current_thread_packet_mq_counter_inc(struct plugin_manager_schema *plug_mgr)
+{
+ if(plug_mgr==NULL)return -1;
+ int tid = stellar_get_current_thread_id(plug_mgr->st);
+ plug_mgr->per_thread_data[tid].per_thread_pkt_mq.pub_msg_cnt+=1;
+ return plug_mgr->per_thread_data[tid].per_thread_pkt_mq.pub_msg_cnt;
+}
+
+static inline void stellar_current_thread_packet_mq_counter_reset(struct plugin_manager_schema *plug_mgr)
+{
+ if(plug_mgr==NULL)return;
+ int tid = stellar_get_current_thread_id(plug_mgr->st);
+ plug_mgr->per_thread_data[tid].per_thread_pkt_mq.pub_msg_cnt=0;
+}
+
+static inline int stellar_current_thread_packet_mq_counter_get(struct plugin_manager_schema *plug_mgr)
+{
+ if(plug_mgr==NULL)return 0;
+ int tid = stellar_get_current_thread_id(plug_mgr->st);
+ return plug_mgr->per_thread_data[tid].per_thread_pkt_mq.pub_msg_cnt;
+}
int stellar_packet_mq_create_topic(struct stellar *st, const char *topic_name, packet_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
{
@@ -563,9 +585,13 @@ int packet_mq_publish_message(struct packet *pkt, int topic_id, void *msg)
struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
assert(plug_mgr);
int tid = stellar_get_current_thread_id(st);
- return stellar_mq_publish_message(topic_id, msg, plug_mgr->packet_mq_schema_array, &(plug_mgr->per_thread_data[tid].per_thread_pkt_mq_array.mq));
+ if(stellar_current_thread_packet_mq_counter_get(plug_mgr) >= plug_mgr->max_message_dispatch)return -1;//packet mq donot contain intrinisic msg
+ int ret=stellar_mq_publish_message(topic_id, msg, plug_mgr->packet_mq_schema_array, &(plug_mgr->per_thread_data[tid].per_thread_pkt_mq.mq));
+ if(ret==0)stellar_current_thread_packet_mq_counter_inc(plug_mgr);
+ return ret;
}
+// TODO: limit maximum pub message number in one loop
static void plugin_manager_packet_message_dispatch(struct packet *pkt)
{
@@ -578,7 +604,7 @@ static void plugin_manager_packet_message_dispatch(struct packet *pkt)
int tid = stellar_get_current_thread_id(st);
- struct stellar_message **mq= &(plug_mgr->per_thread_data[tid].per_thread_pkt_mq_array.mq);
+ struct stellar_message **mq= &(plug_mgr->per_thread_data[tid].per_thread_pkt_mq.mq);
struct stellar_message *mq_elt=NULL, *mq_tmp=NULL;
struct stellar_mq_subscriber *sub_elt, *sub_tmp;
@@ -597,7 +623,10 @@ static void plugin_manager_packet_message_dispatch(struct packet *pkt)
if (sub_elt->pkt_msg_cb)
{
packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr(plug_mgr->registered_packet_plugin_array, (unsigned int)sub_elt->plugin_idx);
- if(packet_plugin_schema)sub_elt->pkt_msg_cb(pkt, mq_elt->topic_id, mq_elt->msg_data, packet_plugin_schema->plugin_env);
+ if(packet_plugin_schema)
+ {
+ sub_elt->pkt_msg_cb(pkt, mq_elt->topic_id, mq_elt->msg_data, packet_plugin_schema->plugin_env);
+ }
}
}
if (topic->pkt_msg_free_cb)
@@ -670,8 +699,11 @@ int session_mq_publish_message(struct session *sess, int topic_id, void *data)
{
struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess);
assert(plug_mgr_rt);
- if(plug_mgr_rt->enable_session_mq==0)return -1;
- return stellar_mq_publish_message(topic_id, data, plug_mgr_rt->plug_mgr->session_mq_schema_array, &plug_mgr_rt->pending_mq);
+ if(plug_mgr_rt->session_mq_status==NULL)return -1;//runtime free stage , mq_status alaway null, ignore publish message
+ if(plug_mgr_rt->pub_session_msg_cnt >= plug_mgr_rt->plug_mgr->max_message_dispatch)return -1;
+ int ret=stellar_mq_publish_message(topic_id, data, plug_mgr_rt->plug_mgr->session_mq_schema_array, &plug_mgr_rt->pending_mq);
+ if(ret==0)plug_mgr_rt->pub_session_msg_cnt+=1;
+ return ret;
}
static int session_mq_set_message_status(struct session *sess, int topic_id, int plugin_id, int bit_value)
@@ -814,8 +846,10 @@ static void plugin_manager_session_message_dispatch(struct session *sess)
}
}
}
- if (sub_elt->sess_msg_cb && bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->topic_id, cur_sub_idx) != 0)// ctx_new maybe call detach, so need check again
- sub_elt->sess_msg_cb(sess, mq_elt->topic_id, mq_elt->msg_data, plugin_ctx_rt->plugin_ctx, session_plugin_schema->plugin_env);
+ if (sub_elt->sess_msg_cb && bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->topic_id, cur_sub_idx) != 0)// ctx_new maybe call detach, need check again
+ {
+ sub_elt->sess_msg_cb(sess, mq_elt->topic_id, mq_elt->msg_data, plugin_ctx_rt->plugin_ctx, session_plugin_schema->plugin_env);
+ }
}
}
cur_sub_idx++;
@@ -894,6 +928,7 @@ void plugin_manager_session_runtime_free(struct plugin_manager_runtime *rt)
if(rt->session_mq_status != NULL)
{
bitmap_free(rt->session_mq_status);
+ rt->session_mq_status=NULL;
}
if (rt->plug_mgr->registered_session_plugin_array)
{
@@ -922,6 +957,7 @@ void plugin_manager_session_runtime_free(struct plugin_manager_runtime *rt)
*********************************************/
+
UT_icd registered_packet_plugin_array_icd = {sizeof(struct registered_packet_plugin_schema), NULL, NULL, NULL};
int stellar_packet_plugin_register(struct stellar *st, unsigned char ip_proto, plugin_on_packet_func on_packet_cb, void *plugin_env)
@@ -945,6 +981,7 @@ void plugin_manager_on_packet_ingress(struct plugin_manager_schema *plug_mgr, st
if(plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return;
struct registered_packet_plugin_schema *p=NULL;
unsigned char ip_proto=packet_get_ip_protocol(pkt);
+ stellar_current_thread_packet_mq_counter_reset(plug_mgr);
while ((p = (struct registered_packet_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, p)))
{
if(p->ip_protocol == ip_proto && p->on_packet)
@@ -1053,11 +1090,10 @@ void plugin_manager_on_session_ingress(struct session *sess, struct packet *pkt)
default:
break;
}
- plug_mgr_rt->enable_session_mq=1;
+ plug_mgr_rt->pub_session_msg_cnt=0;
//TODO: check TCP topic active subscirber num, if 0, return APP_STATE_DROPME, to reduce tcp reassemble overhead
session_mq_publish_message(sess, topic_id ,(void *)pkt);
plugin_manager_session_message_dispatch(sess);
- //plug_mgr_rt->enable_session_mq=0;
return;
}
@@ -1065,10 +1101,8 @@ void plugin_manager_on_session_egress(struct session *sess, struct packet *pkt)
{
struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess);
if(plug_mgr_rt==NULL)return;
- //plug_mgr_rt->enable_session_mq=1;
session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->egress_topic_id ,pkt);
plugin_manager_session_message_dispatch(sess);
- plug_mgr_rt->enable_session_mq=0;
session_mq_free(plug_mgr_rt->sess,&plug_mgr_rt->delivered_mq, plug_mgr_rt->plug_mgr->session_mq_schema_array);
assert(plug_mgr_rt->pending_mq==NULL && plug_mgr_rt->delivered_mq==NULL);
return;
@@ -1078,7 +1112,6 @@ void plugin_manager_on_session_closing(struct session *sess)
{
struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess);
if(plug_mgr_rt==NULL)return;
- plug_mgr_rt->enable_session_mq=1;
switch (session_get_type(sess))
{
case SESSION_TYPE_TCP:
@@ -1092,7 +1125,6 @@ void plugin_manager_on_session_closing(struct session *sess)
break;
}
plugin_manager_session_message_dispatch(sess);
- plug_mgr_rt->enable_session_mq=0;
session_mq_free(plug_mgr_rt->sess,&plug_mgr_rt->delivered_mq, plug_mgr_rt->plug_mgr->session_mq_schema_array);
assert(plug_mgr_rt->pending_mq==NULL && plug_mgr_rt->delivered_mq==NULL);
return;
diff --git a/src/plugin_manager/plugin_manager_interna.h b/src/plugin_manager/plugin_manager_interna.h
index 24b8d42..0cf55d6 100644
--- a/src/plugin_manager/plugin_manager_interna.h
+++ b/src/plugin_manager/plugin_manager_interna.h
@@ -16,15 +16,16 @@ struct per_thread_exdata_array
};
struct stellar_message;
-struct per_thread_mq_array
+struct per_thread_mq
{
struct stellar_message *mq;
+ int pub_msg_cnt;
};
struct plugin_manger_per_thread_data
{
struct per_thread_exdata_array per_thread_pkt_exdata_array;
- struct per_thread_mq_array per_thread_pkt_mq_array;
+ struct per_thread_mq per_thread_pkt_mq;
};
struct plugin_manager_schema
@@ -47,6 +48,7 @@ struct plugin_manager_schema
int udp_topic_id;
int egress_topic_id;
int control_packet_topic_id;
+ int max_message_dispatch;
struct plugin_manger_per_thread_data *per_thread_data;
}__attribute__((aligned(sizeof(void*))));
@@ -133,7 +135,7 @@ struct plugin_manager_runtime
struct stellar_exdata *sess_exdata_array;
struct session_plugin_ctx_runtime *plugin_ctx_array;//N plugins TODO: call alloc and free
int current_session_plugin_id;
- int enable_session_mq;
+ int pub_session_msg_cnt;
}__attribute__((aligned(sizeof(void*))));
struct registered_packet_plugin_schema
@@ -172,6 +174,8 @@ struct registered_session_plugin_schema
* PLUGIN MANAGER INIT & EXIT *
*******************************/
+#define MAX_MSG_PER_DISPATCH 128
+
#include <dlfcn.h>
struct plugin_specific
diff --git a/test/plugin_manager/plugin_manager_gtest_main.cpp b/test/plugin_manager/plugin_manager_gtest_main.cpp
index efd131d..75f928c 100644
--- a/test/plugin_manager/plugin_manager_gtest_main.cpp
+++ b/test/plugin_manager/plugin_manager_gtest_main.cpp
@@ -55,7 +55,7 @@ void whitebox_test_plugin_manager_intrisic_metadata(struct stellar *st, struct p
for(int i=0; i<thread_num; i++)
{
EXPECT_TRUE(plug_mgr->per_thread_data[i].per_thread_pkt_exdata_array.exdata_array==NULL);
- EXPECT_TRUE(plug_mgr->per_thread_data[i].per_thread_pkt_mq_array.mq==NULL);
+ EXPECT_TRUE(plug_mgr->per_thread_data[i].per_thread_pkt_mq.mq==NULL);
}
//intrinsic topic
@@ -257,7 +257,7 @@ static void test_basic_on_packet(struct packet *pkt, unsigned char ip_protocol,
return;
}
-TEST(plugin_manager, basic_packet_plugin) {
+TEST(plugin_manager, packet_plugin_illegal_exdata) {
struct stellar st={0};
struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
@@ -389,7 +389,7 @@ static void test_packet_exdata_free(struct packet *pkt, int idx, void *ex_ptr, v
}
-TEST(plugin_manager, basic_packet_exdata) {
+TEST(plugin_manager, packet_plugins_share_exdata) {
struct stellar st={0};
struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
@@ -486,7 +486,7 @@ static void test_mq_pub_on_packet(struct packet *pkt, unsigned char ip_protocol,
return;
}
-TEST(plugin_manager, basic_packet_mq_pub_sub) {
+TEST(plugin_manager, packet_plugins_mq_pub_sub) {
struct stellar st={0};
struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
@@ -556,6 +556,124 @@ TEST(plugin_manager, basic_packet_mq_pub_sub) {
EXPECT_EQ(env.msg_sub_cnt, env.msg_pub_cnt*topic_sub_num);
}
+static void overlimit_packet_msg_free_cb_func(struct packet *pkt, void *msg, void *msg_free_arg)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)msg_free_arg;
+ env->msg_free_cnt+=1;
+ FREE(msg);
+ return;
+}
+
+static void overlimit_sub_on_packet_msg(struct packet *pkt, int topic_id, const void *msg, void *plugin_env)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
+ EXPECT_TRUE(env!=NULL);
+ EXPECT_EQ(pkt->st, env->plug_mgr->st);
+ env->msg_sub_cnt+=1;
+ return;
+}
+
+static void overlimit_pub_on_packet(struct packet *pkt, unsigned char ip_protocol, void *plugin_env)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
+ EXPECT_TRUE(env!=NULL);
+ EXPECT_EQ(pkt->ip_proto, ip_protocol);
+ EXPECT_EQ(pkt->st, env->plug_mgr->st);
+ int topic_id_num=(int)(sizeof(env->packet_topic_id) / sizeof(env->packet_topic_id[0]));
+ int cnt=0;
+ int *msg;
+ for(int i=0; i<topic_id_num; i++)
+ {
+ for(int j=0; j < MAX_MSG_PER_DISPATCH; j++)
+ {
+ msg=CALLOC(int, 1);
+ *msg=cnt;
+ int pub_ret=packet_mq_publish_message(pkt, env->packet_topic_id[i], msg);
+ if(cnt < MAX_MSG_PER_DISPATCH)
+ {
+ ASSERT_EQ(pub_ret, 0);
+ env->msg_pub_cnt+=1;
+ }
+ else
+ {
+ ASSERT_EQ(pub_ret, -1);
+ }
+ if(pub_ret!=0)FREE(msg);
+ cnt+=1;
+ }
+ }
+ return;
+}
+
+TEST(plugin_manager, packet_plugins_pub_overlimit) {
+
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+ unsigned char ip_proto=6;
+ struct packet_plugin_env env;
+ memset(&env, 0, sizeof(struct packet_plugin_env));
+ env.plug_mgr=plug_mgr;
+ char topic_name[PACKET_TOPIC_NUM][NAME_MAX];
+
+ int topic_id_num=(int)(sizeof(env.packet_topic_id) / sizeof(env.packet_topic_id[0]));
+
+ for(int i=0; i<topic_id_num; i++)
+ {
+ sprintf(topic_name[i], "PACKET_TOPIC_%d", i);
+ env.packet_topic_id[i]=stellar_packet_mq_create_topic(&st, topic_name[i], overlimit_packet_msg_free_cb_func, &env);
+ EXPECT_GE(env.packet_topic_id[i], 0);
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(
+ plug_mgr->packet_mq_schema_array, env.packet_topic_id[i]);
+ EXPECT_EQ(topic->free_cb, overlimit_packet_msg_free_cb_func);
+ EXPECT_EQ(topic->free_cb_arg, &env);
+ EXPECT_EQ(topic->topic_id, env.packet_topic_id[i]);
+ EXPECT_STREQ(topic->topic_name, topic_name[i]);
+ }
+ }
+
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ EXPECT_EQ(utarray_len(plug_mgr->packet_mq_schema_array), topic_id_num);
+ }
+
+ int pub_plugin_id=stellar_packet_plugin_register(&st, ip_proto, overlimit_pub_on_packet, &env);
+ EXPECT_GE(pub_plugin_id, PACKET_PULGIN_ID_BASE);
+
+ int topic_sub_num=(int)(sizeof(env.packet_mq_sub_plugin_id) / sizeof(env.packet_mq_sub_plugin_id[0]));
+
+ for (int i = 0; i < topic_sub_num; i++)
+ {
+ env.packet_mq_sub_plugin_id[i] = stellar_packet_plugin_register(&st, ip_proto, NULL, &env);// empty on_packet is ok
+ EXPECT_GE(env.packet_mq_sub_plugin_id[i], PACKET_PULGIN_ID_BASE);
+ for(int j = 0; j < topic_id_num; j++)
+ {
+ EXPECT_EQ(stellar_packet_mq_subscribe(&st, env.packet_topic_id[j], overlimit_sub_on_packet_msg, env.packet_mq_sub_plugin_id[i]), 0);
+ }
+ }
+
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), topic_sub_num+1);
+ }
+
+ struct packet pkt={&st, IPv4, ip_proto};
+
+ int N_packet=10;
+ for (int i = 0; i < N_packet; i++)
+ {
+ plugin_manager_on_packet_ingress(plug_mgr, &pkt);
+ plugin_manager_on_packet_egress(plug_mgr, &pkt);
+ }
+
+ plugin_manager_exit(plug_mgr);
+ EXPECT_EQ(N_packet*MAX_MSG_PER_DISPATCH, env.msg_pub_cnt);
+ EXPECT_EQ(env.msg_free_cnt, env.msg_pub_cnt);
+ EXPECT_EQ(env.msg_sub_cnt, env.msg_pub_cnt*topic_sub_num);
+}
/**********************************************
* TEST PLUGIN MANAGER ON SESSION PLUGIN INIT *
**********************************************/
@@ -1042,7 +1160,161 @@ TEST(plugin_manager, session_plugin_ignore_on_ctx_new_sub_other_msg) {
}
+struct test_overlimit_session_mq_ctx
+{
+ int pkt_cnt;
+ int pub_cnt;
+ int sub_cnt;
+};
+
+static void *test_overlimit_pub_session_ctx_new(struct session *sess, void *plugin_env)
+{
+ struct test_overlimit_session_mq_ctx *ctx=CALLOC(struct test_overlimit_session_mq_ctx, 1);
+ return ctx;
+}
+static void test_overlimit_pub_session_ctx_free(struct session *sess, void *session_ctx, void *plugin_env)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ struct test_overlimit_session_mq_ctx *ctx=(struct test_overlimit_session_mq_ctx *)session_ctx;
+ EXPECT_EQ(ctx->pkt_cnt, env->N_per_session_pkt_cnt);
+ FREE(ctx);
+ return;
+}
+
+static void *test_overlimit_sub_session_ctx_new(struct session *sess, void *plugin_env)
+{
+ struct test_overlimit_session_mq_ctx *ctx=CALLOC(struct test_overlimit_session_mq_ctx, 1);
+ return ctx;
+}
+
+static void test_overlimit_sub_session_ctx_free(struct session *sess, void *session_ctx, void *plugin_env)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ struct test_overlimit_session_mq_ctx *ctx=(struct test_overlimit_session_mq_ctx *)session_ctx;
+ EXPECT_EQ(ctx->sub_cnt, (env->N_per_session_pkt_cnt*(MAX_MSG_PER_DISPATCH-1))); //minus intrinsic msg
+ FREE(ctx);
+ return;
+}
+
+static void test_overlimit_pub_on_session(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ struct test_overlimit_session_mq_ctx *ctx=(struct test_overlimit_session_mq_ctx *)per_session_ctx;
+ EXPECT_TRUE(env!=NULL);
+ EXPECT_TRUE(ctx!=NULL);
+ EXPECT_EQ(sess->plug_mgr_rt->plug_mgr, env->plug_mgr);
+ int *pub_msg;
+ if (msg)
+ {
+ env->test_mq_pub_called += 1;
+ ctx->pkt_cnt += 1;
+ for(int i=0; i < MAX_MSG_PER_DISPATCH*2; i++)
+ {
+ pub_msg = CALLOC(int, 1);
+ *pub_msg = env->test_mq_pub_called;
+ if(i<(MAX_MSG_PER_DISPATCH-1))// minus intrinsic msg
+ {
+ EXPECT_EQ(session_mq_publish_message(sess, env->test_mq_topic_id, pub_msg), 0);
+ ctx->pub_cnt+=1;
+ }
+ else
+ {
+ EXPECT_EQ(session_mq_publish_message(sess, env->test_mq_topic_id, pub_msg), -1);
+ FREE(pub_msg);
+ }
+ }
+ }
+ return;
+}
+
+static void test_overlimit_on_sub_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ struct test_overlimit_session_mq_ctx *ctx=(struct test_overlimit_session_mq_ctx *)per_session_ctx;
+ EXPECT_TRUE(env!=NULL);
+ EXPECT_TRUE(ctx!=NULL);
+ EXPECT_EQ(sess->plug_mgr_rt->plug_mgr, env->plug_mgr);
+ EXPECT_EQ(*(int *)msg, env->test_mq_pub_called);
+ env->test_mq_sub_called+=1;
+ ctx->sub_cnt+=1;
+ return;
+}
+
+static void test_overlimit_session_msg_free(struct session *sess, void *msg, void *msg_free_arg)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)msg_free_arg;
+ EXPECT_EQ(sess->plug_mgr_rt->plug_mgr, env->plug_mgr);
+ if(msg)
+ {
+ EXPECT_EQ(env->test_mq_pub_called, *(int *)msg);
+ env->test_mq_free_called+=1;
+ FREE(msg);
+ }
+ return;
+}
+
+TEST(plugin_manager, session_plugin_pub_msg_overlimt) {
+
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+ unsigned char ip_proto=6;
+ struct session_plugin_env env;
+ memset(&env, 0, sizeof(struct session_plugin_env));
+ env.plug_mgr=plug_mgr;
+ env.N_per_session_pkt_cnt=10;
+ env.N_session=10;
+
+ env.test_mq_pub_plugin_id=stellar_session_plugin_register(&st, test_overlimit_pub_session_ctx_new, test_overlimit_pub_session_ctx_free, &env);
+ EXPECT_GE(env.test_mq_pub_plugin_id, 0);
+
+ env.intrinsc_tcp_topic_id=stellar_session_mq_get_topic_id(&st, TOPIC_TCP);
+ EXPECT_GE(env.intrinsc_tcp_topic_id, 0);
+ EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_overlimit_pub_on_session, env.test_mq_pub_plugin_id), 0);
+
+ env.test_mq_topic_id=stellar_session_mq_create_topic(&st, "SESSION_MQ_TOPIC", test_overlimit_session_msg_free, &env);
+ EXPECT_GE(env.test_mq_topic_id, 0);
+
+ env.test_mq_sub_plugin_id=stellar_session_plugin_register(&st, test_overlimit_sub_session_ctx_new, test_overlimit_sub_session_ctx_free, &env);
+ EXPECT_GE(env.test_mq_sub_plugin_id, 0);
+ EXPECT_EQ(stellar_session_mq_subscribe(&st, env.test_mq_topic_id, test_overlimit_on_sub_msg, env.test_mq_sub_plugin_id), 0);
+
+ struct packet pkt={&st, TCP, ip_proto};
+
+
+ struct session sess[env.N_session];
+
+ for(int i=0; i < env.N_session; i++)
+ sess[i].plug_mgr_rt=plugin_manager_session_runtime_new(plug_mgr, &sess[i]);
+
+ for (int j = 0; j < env.N_per_session_pkt_cnt; j++)
+ {
+ plugin_manager_on_packet_ingress(plug_mgr, &pkt);
+
+ for (int i = 0; i < env.N_session; i++)
+ {
+ plugin_manager_on_session_ingress(&sess[i], &pkt);
+ plugin_manager_on_session_egress(&sess[i], &pkt);
+ }
+
+ plugin_manager_on_packet_egress(plug_mgr, &pkt);
+ }
+
+ for(int i=0; i < env.N_session; i++)
+ {
+ plugin_manager_on_session_closing(&sess[i]);
+ plugin_manager_session_runtime_free(sess[i].plug_mgr_rt);
+ }
+
+ plugin_manager_exit(plug_mgr);
+
+ EXPECT_EQ(env.test_mq_pub_called,env.N_per_session_pkt_cnt*env.N_session);
+ EXPECT_EQ(env.test_mq_free_called, env.N_session*env.N_per_session_pkt_cnt*(MAX_MSG_PER_DISPATCH-1));
+ EXPECT_EQ(env.test_mq_sub_called, env.N_session*env.N_per_session_pkt_cnt*(MAX_MSG_PER_DISPATCH-1));
+
+}
static void *test_dettach_session_ctx_new(struct session *sess, void *plugin_env)
{
@@ -1125,13 +1397,13 @@ TEST(plugin_manager, session_plugin_on_ctx_new_then_dettach) {
-static void *test_invalid_send_msg_session_ctx_new(struct session *sess, void *plugin_env)
+static void *test_invalid_pub_msg_session_ctx_new(struct session *sess, void *plugin_env)
{
struct test_basic_ctx *ctx=CALLOC(struct test_basic_ctx, 1);
return ctx;
}
-static void test_invalid_send_msg_session_ctx_free(struct session *sess, void *session_ctx, void *plugin_env)
+static void test_invalid_pub_msg_session_ctx_free(struct session *sess, void *session_ctx, void *plugin_env)
{
struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
env->basic_ctx_free_called+=1;
@@ -1143,7 +1415,7 @@ static void test_invalid_send_msg_session_ctx_free(struct session *sess, void *s
FREE(ctx);
}
-static void test_invalid_send_msg_on_session(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env)
+static void test_invalid_pub_msg_on_session(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env)
{
struct test_basic_ctx *ctx=(struct test_basic_ctx *)per_session_ctx;
ctx->called+=1;
@@ -1160,12 +1432,12 @@ TEST(plugin_manager, session_plugin_pub_on_ctx_free) {
// plugin manager register plugin
- int plugin_id=stellar_session_plugin_register(&st, test_invalid_send_msg_session_ctx_new, test_invalid_send_msg_session_ctx_free, &env);
+ int plugin_id=stellar_session_plugin_register(&st, test_invalid_pub_msg_session_ctx_new, test_invalid_pub_msg_session_ctx_free, &env);
EXPECT_GE(plugin_id,0);
env.intrinsc_tcp_topic_id=stellar_session_mq_get_topic_id(&st, TOPIC_TCP);
EXPECT_GE(env.intrinsc_tcp_topic_id, 0);
- EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_invalid_send_msg_on_session, plugin_id), 0);
+ EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_invalid_pub_msg_on_session, plugin_id), 0);
env.test_mq_topic_id=stellar_session_mq_create_topic(&st, "SESSION_MQ_TOPIC", NULL, &env);
EXPECT_GE(env.test_mq_topic_id, 0);