diff options
| -rw-r--r-- | src/plugin_manager/plugin_manager.c | 60 | ||||
| -rw-r--r-- | src/plugin_manager/plugin_manager_interna.h | 10 | ||||
| -rw-r--r-- | test/plugin_manager/plugin_manager_gtest_main.cpp | 290 |
3 files changed, 334 insertions, 26 deletions
diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c index 2f20d51..1480c25 100644 --- a/src/plugin_manager/plugin_manager.c +++ b/src/plugin_manager/plugin_manager.c @@ -89,7 +89,7 @@ static void plugin_manager_per_thread_data_free(struct plugin_manger_per_thread_ { p_data=per_thread_data+i; if(p_data->per_thread_pkt_exdata_array.exdata_array)FREE(p_data->per_thread_pkt_exdata_array.exdata_array); - if(p_data->per_thread_pkt_mq_array.mq)FREE(p_data->per_thread_pkt_mq_array.mq); + if(p_data->per_thread_pkt_mq.mq)FREE(p_data->per_thread_pkt_mq.mq); } FREE(per_thread_data); return; @@ -105,6 +105,7 @@ struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char return NULL; } struct plugin_manager_schema *plug_mgr = CALLOC(struct plugin_manager_schema, 1); + plug_mgr->max_message_dispatch=MAX_MSG_PER_DISPATCH; if(spec_num > 0) { utarray_new(plug_mgr->plugin_load_specs_array,&plugin_specs_icd); @@ -465,6 +466,27 @@ UT_icd stellar_mq_subscriber_info_icd = {sizeof(struct stellar_mq_subscriber_inf * PACKET MQ * *******************************/ +static inline int stellar_current_thread_packet_mq_counter_inc(struct plugin_manager_schema *plug_mgr) +{ + if(plug_mgr==NULL)return -1; + int tid = stellar_get_current_thread_id(plug_mgr->st); + plug_mgr->per_thread_data[tid].per_thread_pkt_mq.pub_msg_cnt+=1; + return plug_mgr->per_thread_data[tid].per_thread_pkt_mq.pub_msg_cnt; +} + +static inline void stellar_current_thread_packet_mq_counter_reset(struct plugin_manager_schema *plug_mgr) +{ + if(plug_mgr==NULL)return; + int tid = stellar_get_current_thread_id(plug_mgr->st); + plug_mgr->per_thread_data[tid].per_thread_pkt_mq.pub_msg_cnt=0; +} + +static inline int stellar_current_thread_packet_mq_counter_get(struct plugin_manager_schema *plug_mgr) +{ + if(plug_mgr==NULL)return 0; + int tid = stellar_get_current_thread_id(plug_mgr->st); + return plug_mgr->per_thread_data[tid].per_thread_pkt_mq.pub_msg_cnt; +} int stellar_packet_mq_create_topic(struct stellar *st, const char *topic_name, packet_msg_free_cb_func *msg_free_cb, void *msg_free_arg) { @@ -563,9 +585,13 @@ int packet_mq_publish_message(struct packet *pkt, int topic_id, void *msg) struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); assert(plug_mgr); int tid = stellar_get_current_thread_id(st); - return stellar_mq_publish_message(topic_id, msg, plug_mgr->packet_mq_schema_array, &(plug_mgr->per_thread_data[tid].per_thread_pkt_mq_array.mq)); + if(stellar_current_thread_packet_mq_counter_get(plug_mgr) >= plug_mgr->max_message_dispatch)return -1;//packet mq donot contain intrinisic msg + int ret=stellar_mq_publish_message(topic_id, msg, plug_mgr->packet_mq_schema_array, &(plug_mgr->per_thread_data[tid].per_thread_pkt_mq.mq)); + if(ret==0)stellar_current_thread_packet_mq_counter_inc(plug_mgr); + return ret; } +// TODO: limit maximum pub message number in one loop static void plugin_manager_packet_message_dispatch(struct packet *pkt) { @@ -578,7 +604,7 @@ static void plugin_manager_packet_message_dispatch(struct packet *pkt) int tid = stellar_get_current_thread_id(st); - struct stellar_message **mq= &(plug_mgr->per_thread_data[tid].per_thread_pkt_mq_array.mq); + struct stellar_message **mq= &(plug_mgr->per_thread_data[tid].per_thread_pkt_mq.mq); struct stellar_message *mq_elt=NULL, *mq_tmp=NULL; struct stellar_mq_subscriber *sub_elt, *sub_tmp; @@ -597,7 +623,10 @@ static void plugin_manager_packet_message_dispatch(struct packet *pkt) if (sub_elt->pkt_msg_cb) { packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr(plug_mgr->registered_packet_plugin_array, (unsigned int)sub_elt->plugin_idx); - if(packet_plugin_schema)sub_elt->pkt_msg_cb(pkt, mq_elt->topic_id, mq_elt->msg_data, packet_plugin_schema->plugin_env); + if(packet_plugin_schema) + { + sub_elt->pkt_msg_cb(pkt, mq_elt->topic_id, mq_elt->msg_data, packet_plugin_schema->plugin_env); + } } } if (topic->pkt_msg_free_cb) @@ -670,8 +699,11 @@ int session_mq_publish_message(struct session *sess, int topic_id, void *data) { struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess); assert(plug_mgr_rt); - if(plug_mgr_rt->enable_session_mq==0)return -1; - return stellar_mq_publish_message(topic_id, data, plug_mgr_rt->plug_mgr->session_mq_schema_array, &plug_mgr_rt->pending_mq); + if(plug_mgr_rt->session_mq_status==NULL)return -1;//runtime free stage , mq_status alaway null, ignore publish message + if(plug_mgr_rt->pub_session_msg_cnt >= plug_mgr_rt->plug_mgr->max_message_dispatch)return -1; + int ret=stellar_mq_publish_message(topic_id, data, plug_mgr_rt->plug_mgr->session_mq_schema_array, &plug_mgr_rt->pending_mq); + if(ret==0)plug_mgr_rt->pub_session_msg_cnt+=1; + return ret; } static int session_mq_set_message_status(struct session *sess, int topic_id, int plugin_id, int bit_value) @@ -814,8 +846,10 @@ static void plugin_manager_session_message_dispatch(struct session *sess) } } } - if (sub_elt->sess_msg_cb && bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->topic_id, cur_sub_idx) != 0)// ctx_new maybe call detach, so need check again - sub_elt->sess_msg_cb(sess, mq_elt->topic_id, mq_elt->msg_data, plugin_ctx_rt->plugin_ctx, session_plugin_schema->plugin_env); + if (sub_elt->sess_msg_cb && bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->topic_id, cur_sub_idx) != 0)// ctx_new maybe call detach, need check again + { + sub_elt->sess_msg_cb(sess, mq_elt->topic_id, mq_elt->msg_data, plugin_ctx_rt->plugin_ctx, session_plugin_schema->plugin_env); + } } } cur_sub_idx++; @@ -894,6 +928,7 @@ void plugin_manager_session_runtime_free(struct plugin_manager_runtime *rt) if(rt->session_mq_status != NULL) { bitmap_free(rt->session_mq_status); + rt->session_mq_status=NULL; } if (rt->plug_mgr->registered_session_plugin_array) { @@ -922,6 +957,7 @@ void plugin_manager_session_runtime_free(struct plugin_manager_runtime *rt) *********************************************/ + UT_icd registered_packet_plugin_array_icd = {sizeof(struct registered_packet_plugin_schema), NULL, NULL, NULL}; int stellar_packet_plugin_register(struct stellar *st, unsigned char ip_proto, plugin_on_packet_func on_packet_cb, void *plugin_env) @@ -945,6 +981,7 @@ void plugin_manager_on_packet_ingress(struct plugin_manager_schema *plug_mgr, st if(plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return; struct registered_packet_plugin_schema *p=NULL; unsigned char ip_proto=packet_get_ip_protocol(pkt); + stellar_current_thread_packet_mq_counter_reset(plug_mgr); while ((p = (struct registered_packet_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, p))) { if(p->ip_protocol == ip_proto && p->on_packet) @@ -1053,11 +1090,10 @@ void plugin_manager_on_session_ingress(struct session *sess, struct packet *pkt) default: break; } - plug_mgr_rt->enable_session_mq=1; + plug_mgr_rt->pub_session_msg_cnt=0; //TODO: check TCP topic active subscirber num, if 0, return APP_STATE_DROPME, to reduce tcp reassemble overhead session_mq_publish_message(sess, topic_id ,(void *)pkt); plugin_manager_session_message_dispatch(sess); - //plug_mgr_rt->enable_session_mq=0; return; } @@ -1065,10 +1101,8 @@ void plugin_manager_on_session_egress(struct session *sess, struct packet *pkt) { struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess); if(plug_mgr_rt==NULL)return; - //plug_mgr_rt->enable_session_mq=1; session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->egress_topic_id ,pkt); plugin_manager_session_message_dispatch(sess); - plug_mgr_rt->enable_session_mq=0; session_mq_free(plug_mgr_rt->sess,&plug_mgr_rt->delivered_mq, plug_mgr_rt->plug_mgr->session_mq_schema_array); assert(plug_mgr_rt->pending_mq==NULL && plug_mgr_rt->delivered_mq==NULL); return; @@ -1078,7 +1112,6 @@ void plugin_manager_on_session_closing(struct session *sess) { struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess); if(plug_mgr_rt==NULL)return; - plug_mgr_rt->enable_session_mq=1; switch (session_get_type(sess)) { case SESSION_TYPE_TCP: @@ -1092,7 +1125,6 @@ void plugin_manager_on_session_closing(struct session *sess) break; } plugin_manager_session_message_dispatch(sess); - plug_mgr_rt->enable_session_mq=0; session_mq_free(plug_mgr_rt->sess,&plug_mgr_rt->delivered_mq, plug_mgr_rt->plug_mgr->session_mq_schema_array); assert(plug_mgr_rt->pending_mq==NULL && plug_mgr_rt->delivered_mq==NULL); return; diff --git a/src/plugin_manager/plugin_manager_interna.h b/src/plugin_manager/plugin_manager_interna.h index 24b8d42..0cf55d6 100644 --- a/src/plugin_manager/plugin_manager_interna.h +++ b/src/plugin_manager/plugin_manager_interna.h @@ -16,15 +16,16 @@ struct per_thread_exdata_array }; struct stellar_message; -struct per_thread_mq_array +struct per_thread_mq { struct stellar_message *mq; + int pub_msg_cnt; }; struct plugin_manger_per_thread_data { struct per_thread_exdata_array per_thread_pkt_exdata_array; - struct per_thread_mq_array per_thread_pkt_mq_array; + struct per_thread_mq per_thread_pkt_mq; }; struct plugin_manager_schema @@ -47,6 +48,7 @@ struct plugin_manager_schema int udp_topic_id; int egress_topic_id; int control_packet_topic_id; + int max_message_dispatch; struct plugin_manger_per_thread_data *per_thread_data; }__attribute__((aligned(sizeof(void*)))); @@ -133,7 +135,7 @@ struct plugin_manager_runtime struct stellar_exdata *sess_exdata_array; struct session_plugin_ctx_runtime *plugin_ctx_array;//N plugins TODO: call alloc and free int current_session_plugin_id; - int enable_session_mq; + int pub_session_msg_cnt; }__attribute__((aligned(sizeof(void*)))); struct registered_packet_plugin_schema @@ -172,6 +174,8 @@ struct registered_session_plugin_schema * PLUGIN MANAGER INIT & EXIT * *******************************/ +#define MAX_MSG_PER_DISPATCH 128 + #include <dlfcn.h> struct plugin_specific diff --git a/test/plugin_manager/plugin_manager_gtest_main.cpp b/test/plugin_manager/plugin_manager_gtest_main.cpp index efd131d..75f928c 100644 --- a/test/plugin_manager/plugin_manager_gtest_main.cpp +++ b/test/plugin_manager/plugin_manager_gtest_main.cpp @@ -55,7 +55,7 @@ void whitebox_test_plugin_manager_intrisic_metadata(struct stellar *st, struct p for(int i=0; i<thread_num; i++) { EXPECT_TRUE(plug_mgr->per_thread_data[i].per_thread_pkt_exdata_array.exdata_array==NULL); - EXPECT_TRUE(plug_mgr->per_thread_data[i].per_thread_pkt_mq_array.mq==NULL); + EXPECT_TRUE(plug_mgr->per_thread_data[i].per_thread_pkt_mq.mq==NULL); } //intrinsic topic @@ -257,7 +257,7 @@ static void test_basic_on_packet(struct packet *pkt, unsigned char ip_protocol, return; } -TEST(plugin_manager, basic_packet_plugin) { +TEST(plugin_manager, packet_plugin_illegal_exdata) { struct stellar st={0}; struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); @@ -389,7 +389,7 @@ static void test_packet_exdata_free(struct packet *pkt, int idx, void *ex_ptr, v } -TEST(plugin_manager, basic_packet_exdata) { +TEST(plugin_manager, packet_plugins_share_exdata) { struct stellar st={0}; struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); @@ -486,7 +486,7 @@ static void test_mq_pub_on_packet(struct packet *pkt, unsigned char ip_protocol, return; } -TEST(plugin_manager, basic_packet_mq_pub_sub) { +TEST(plugin_manager, packet_plugins_mq_pub_sub) { struct stellar st={0}; struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); @@ -556,6 +556,124 @@ TEST(plugin_manager, basic_packet_mq_pub_sub) { EXPECT_EQ(env.msg_sub_cnt, env.msg_pub_cnt*topic_sub_num); } +static void overlimit_packet_msg_free_cb_func(struct packet *pkt, void *msg, void *msg_free_arg) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)msg_free_arg; + env->msg_free_cnt+=1; + FREE(msg); + return; +} + +static void overlimit_sub_on_packet_msg(struct packet *pkt, int topic_id, const void *msg, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + EXPECT_EQ(pkt->st, env->plug_mgr->st); + env->msg_sub_cnt+=1; + return; +} + +static void overlimit_pub_on_packet(struct packet *pkt, unsigned char ip_protocol, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + EXPECT_EQ(pkt->ip_proto, ip_protocol); + EXPECT_EQ(pkt->st, env->plug_mgr->st); + int topic_id_num=(int)(sizeof(env->packet_topic_id) / sizeof(env->packet_topic_id[0])); + int cnt=0; + int *msg; + for(int i=0; i<topic_id_num; i++) + { + for(int j=0; j < MAX_MSG_PER_DISPATCH; j++) + { + msg=CALLOC(int, 1); + *msg=cnt; + int pub_ret=packet_mq_publish_message(pkt, env->packet_topic_id[i], msg); + if(cnt < MAX_MSG_PER_DISPATCH) + { + ASSERT_EQ(pub_ret, 0); + env->msg_pub_cnt+=1; + } + else + { + ASSERT_EQ(pub_ret, -1); + } + if(pub_ret!=0)FREE(msg); + cnt+=1; + } + } + return; +} + +TEST(plugin_manager, packet_plugins_pub_overlimit) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + unsigned char ip_proto=6; + struct packet_plugin_env env; + memset(&env, 0, sizeof(struct packet_plugin_env)); + env.plug_mgr=plug_mgr; + char topic_name[PACKET_TOPIC_NUM][NAME_MAX]; + + int topic_id_num=(int)(sizeof(env.packet_topic_id) / sizeof(env.packet_topic_id[0])); + + for(int i=0; i<topic_id_num; i++) + { + sprintf(topic_name[i], "PACKET_TOPIC_%d", i); + env.packet_topic_id[i]=stellar_packet_mq_create_topic(&st, topic_name[i], overlimit_packet_msg_free_cb_func, &env); + EXPECT_GE(env.packet_topic_id[i], 0); + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr( + plug_mgr->packet_mq_schema_array, env.packet_topic_id[i]); + EXPECT_EQ(topic->free_cb, overlimit_packet_msg_free_cb_func); + EXPECT_EQ(topic->free_cb_arg, &env); + EXPECT_EQ(topic->topic_id, env.packet_topic_id[i]); + EXPECT_STREQ(topic->topic_name, topic_name[i]); + } + } + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + EXPECT_EQ(utarray_len(plug_mgr->packet_mq_schema_array), topic_id_num); + } + + int pub_plugin_id=stellar_packet_plugin_register(&st, ip_proto, overlimit_pub_on_packet, &env); + EXPECT_GE(pub_plugin_id, PACKET_PULGIN_ID_BASE); + + int topic_sub_num=(int)(sizeof(env.packet_mq_sub_plugin_id) / sizeof(env.packet_mq_sub_plugin_id[0])); + + for (int i = 0; i < topic_sub_num; i++) + { + env.packet_mq_sub_plugin_id[i] = stellar_packet_plugin_register(&st, ip_proto, NULL, &env);// empty on_packet is ok + EXPECT_GE(env.packet_mq_sub_plugin_id[i], PACKET_PULGIN_ID_BASE); + for(int j = 0; j < topic_id_num; j++) + { + EXPECT_EQ(stellar_packet_mq_subscribe(&st, env.packet_topic_id[j], overlimit_sub_on_packet_msg, env.packet_mq_sub_plugin_id[i]), 0); + } + } + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), topic_sub_num+1); + } + + struct packet pkt={&st, IPv4, ip_proto}; + + int N_packet=10; + for (int i = 0; i < N_packet; i++) + { + plugin_manager_on_packet_ingress(plug_mgr, &pkt); + plugin_manager_on_packet_egress(plug_mgr, &pkt); + } + + plugin_manager_exit(plug_mgr); + EXPECT_EQ(N_packet*MAX_MSG_PER_DISPATCH, env.msg_pub_cnt); + EXPECT_EQ(env.msg_free_cnt, env.msg_pub_cnt); + EXPECT_EQ(env.msg_sub_cnt, env.msg_pub_cnt*topic_sub_num); +} /********************************************** * TEST PLUGIN MANAGER ON SESSION PLUGIN INIT * **********************************************/ @@ -1042,7 +1160,161 @@ TEST(plugin_manager, session_plugin_ignore_on_ctx_new_sub_other_msg) { } +struct test_overlimit_session_mq_ctx +{ + int pkt_cnt; + int pub_cnt; + int sub_cnt; +}; + +static void *test_overlimit_pub_session_ctx_new(struct session *sess, void *plugin_env) +{ + struct test_overlimit_session_mq_ctx *ctx=CALLOC(struct test_overlimit_session_mq_ctx, 1); + return ctx; +} +static void test_overlimit_pub_session_ctx_free(struct session *sess, void *session_ctx, void *plugin_env) +{ + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + struct test_overlimit_session_mq_ctx *ctx=(struct test_overlimit_session_mq_ctx *)session_ctx; + EXPECT_EQ(ctx->pkt_cnt, env->N_per_session_pkt_cnt); + FREE(ctx); + return; +} + +static void *test_overlimit_sub_session_ctx_new(struct session *sess, void *plugin_env) +{ + struct test_overlimit_session_mq_ctx *ctx=CALLOC(struct test_overlimit_session_mq_ctx, 1); + return ctx; +} + +static void test_overlimit_sub_session_ctx_free(struct session *sess, void *session_ctx, void *plugin_env) +{ + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + struct test_overlimit_session_mq_ctx *ctx=(struct test_overlimit_session_mq_ctx *)session_ctx; + EXPECT_EQ(ctx->sub_cnt, (env->N_per_session_pkt_cnt*(MAX_MSG_PER_DISPATCH-1))); //minus intrinsic msg + FREE(ctx); + return; +} + +static void test_overlimit_pub_on_session(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env) +{ + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + struct test_overlimit_session_mq_ctx *ctx=(struct test_overlimit_session_mq_ctx *)per_session_ctx; + EXPECT_TRUE(env!=NULL); + EXPECT_TRUE(ctx!=NULL); + EXPECT_EQ(sess->plug_mgr_rt->plug_mgr, env->plug_mgr); + int *pub_msg; + if (msg) + { + env->test_mq_pub_called += 1; + ctx->pkt_cnt += 1; + for(int i=0; i < MAX_MSG_PER_DISPATCH*2; i++) + { + pub_msg = CALLOC(int, 1); + *pub_msg = env->test_mq_pub_called; + if(i<(MAX_MSG_PER_DISPATCH-1))// minus intrinsic msg + { + EXPECT_EQ(session_mq_publish_message(sess, env->test_mq_topic_id, pub_msg), 0); + ctx->pub_cnt+=1; + } + else + { + EXPECT_EQ(session_mq_publish_message(sess, env->test_mq_topic_id, pub_msg), -1); + FREE(pub_msg); + } + } + } + return; +} + +static void test_overlimit_on_sub_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env) +{ + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + struct test_overlimit_session_mq_ctx *ctx=(struct test_overlimit_session_mq_ctx *)per_session_ctx; + EXPECT_TRUE(env!=NULL); + EXPECT_TRUE(ctx!=NULL); + EXPECT_EQ(sess->plug_mgr_rt->plug_mgr, env->plug_mgr); + EXPECT_EQ(*(int *)msg, env->test_mq_pub_called); + env->test_mq_sub_called+=1; + ctx->sub_cnt+=1; + return; +} + +static void test_overlimit_session_msg_free(struct session *sess, void *msg, void *msg_free_arg) +{ + struct session_plugin_env *env = (struct session_plugin_env *)msg_free_arg; + EXPECT_EQ(sess->plug_mgr_rt->plug_mgr, env->plug_mgr); + if(msg) + { + EXPECT_EQ(env->test_mq_pub_called, *(int *)msg); + env->test_mq_free_called+=1; + FREE(msg); + } + return; +} + +TEST(plugin_manager, session_plugin_pub_msg_overlimt) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + unsigned char ip_proto=6; + struct session_plugin_env env; + memset(&env, 0, sizeof(struct session_plugin_env)); + env.plug_mgr=plug_mgr; + env.N_per_session_pkt_cnt=10; + env.N_session=10; + + env.test_mq_pub_plugin_id=stellar_session_plugin_register(&st, test_overlimit_pub_session_ctx_new, test_overlimit_pub_session_ctx_free, &env); + EXPECT_GE(env.test_mq_pub_plugin_id, 0); + + env.intrinsc_tcp_topic_id=stellar_session_mq_get_topic_id(&st, TOPIC_TCP); + EXPECT_GE(env.intrinsc_tcp_topic_id, 0); + EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_overlimit_pub_on_session, env.test_mq_pub_plugin_id), 0); + + env.test_mq_topic_id=stellar_session_mq_create_topic(&st, "SESSION_MQ_TOPIC", test_overlimit_session_msg_free, &env); + EXPECT_GE(env.test_mq_topic_id, 0); + + env.test_mq_sub_plugin_id=stellar_session_plugin_register(&st, test_overlimit_sub_session_ctx_new, test_overlimit_sub_session_ctx_free, &env); + EXPECT_GE(env.test_mq_sub_plugin_id, 0); + EXPECT_EQ(stellar_session_mq_subscribe(&st, env.test_mq_topic_id, test_overlimit_on_sub_msg, env.test_mq_sub_plugin_id), 0); + + struct packet pkt={&st, TCP, ip_proto}; + + + struct session sess[env.N_session]; + + for(int i=0; i < env.N_session; i++) + sess[i].plug_mgr_rt=plugin_manager_session_runtime_new(plug_mgr, &sess[i]); + + for (int j = 0; j < env.N_per_session_pkt_cnt; j++) + { + plugin_manager_on_packet_ingress(plug_mgr, &pkt); + + for (int i = 0; i < env.N_session; i++) + { + plugin_manager_on_session_ingress(&sess[i], &pkt); + plugin_manager_on_session_egress(&sess[i], &pkt); + } + + plugin_manager_on_packet_egress(plug_mgr, &pkt); + } + + for(int i=0; i < env.N_session; i++) + { + plugin_manager_on_session_closing(&sess[i]); + plugin_manager_session_runtime_free(sess[i].plug_mgr_rt); + } + + plugin_manager_exit(plug_mgr); + + EXPECT_EQ(env.test_mq_pub_called,env.N_per_session_pkt_cnt*env.N_session); + EXPECT_EQ(env.test_mq_free_called, env.N_session*env.N_per_session_pkt_cnt*(MAX_MSG_PER_DISPATCH-1)); + EXPECT_EQ(env.test_mq_sub_called, env.N_session*env.N_per_session_pkt_cnt*(MAX_MSG_PER_DISPATCH-1)); + +} static void *test_dettach_session_ctx_new(struct session *sess, void *plugin_env) { @@ -1125,13 +1397,13 @@ TEST(plugin_manager, session_plugin_on_ctx_new_then_dettach) { -static void *test_invalid_send_msg_session_ctx_new(struct session *sess, void *plugin_env) +static void *test_invalid_pub_msg_session_ctx_new(struct session *sess, void *plugin_env) { struct test_basic_ctx *ctx=CALLOC(struct test_basic_ctx, 1); return ctx; } -static void test_invalid_send_msg_session_ctx_free(struct session *sess, void *session_ctx, void *plugin_env) +static void test_invalid_pub_msg_session_ctx_free(struct session *sess, void *session_ctx, void *plugin_env) { struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; env->basic_ctx_free_called+=1; @@ -1143,7 +1415,7 @@ static void test_invalid_send_msg_session_ctx_free(struct session *sess, void *s FREE(ctx); } -static void test_invalid_send_msg_on_session(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env) +static void test_invalid_pub_msg_on_session(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env) { struct test_basic_ctx *ctx=(struct test_basic_ctx *)per_session_ctx; ctx->called+=1; @@ -1160,12 +1432,12 @@ TEST(plugin_manager, session_plugin_pub_on_ctx_free) { // plugin manager register plugin - int plugin_id=stellar_session_plugin_register(&st, test_invalid_send_msg_session_ctx_new, test_invalid_send_msg_session_ctx_free, &env); + int plugin_id=stellar_session_plugin_register(&st, test_invalid_pub_msg_session_ctx_new, test_invalid_pub_msg_session_ctx_free, &env); EXPECT_GE(plugin_id,0); env.intrinsc_tcp_topic_id=stellar_session_mq_get_topic_id(&st, TOPIC_TCP); EXPECT_GE(env.intrinsc_tcp_topic_id, 0); - EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_invalid_send_msg_on_session, plugin_id), 0); + EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_invalid_pub_msg_on_session, plugin_id), 0); env.test_mq_topic_id=stellar_session_mq_create_topic(&st, "SESSION_MQ_TOPIC", NULL, &env); EXPECT_GE(env.test_mq_topic_id, 0); |
