diff options
| author | yangwei <[email protected]> | 2024-07-09 19:22:03 +0800 |
|---|---|---|
| committer | yangwei <[email protected]> | 2024-07-09 19:22:03 +0800 |
| commit | bd599decb93fa9fd2955289cd96c362b703b2ad1 (patch) | |
| tree | 0fa617c6ab28d44b240d796bfeb1addaefef571a | |
| parent | 19969f6fc7383b6b3b62fc8a3b13a4ae8dd6dc8b (diff) | |
🎈 perf(session_state_update_on_sapp): drop tcp when topic inactive
| -rw-r--r-- | include/stellar/session_mq.h | 2 | ||||
| -rw-r--r-- | include/stellar/stellar.h | 3 | ||||
| -rw-r--r-- | src/plugin_manager/plugin_manager.c | 19 | ||||
| -rw-r--r-- | src/plugin_manager/plugin_manager.h | 2 | ||||
| -rw-r--r-- | src/plugin_manager/plugin_manager_interna.h | 1 | ||||
| -rw-r--r-- | src/stellar_on_sapp/stellar_internal.h | 2 | ||||
| -rw-r--r-- | src/stellar_on_sapp/stellar_on_sapp_api.c | 18 | ||||
| -rw-r--r-- | test/plugin_manager/plugin_manager_gtest_main.cpp | 2 | ||||
| -rw-r--r-- | test/plugin_manager/plugin_manager_gtest_mock.h | 4 |
9 files changed, 37 insertions, 16 deletions
diff --git a/include/stellar/session_mq.h b/include/stellar/session_mq.h index cddde81..a3a631a 100644 --- a/include/stellar/session_mq.h +++ b/include/stellar/session_mq.h @@ -22,3 +22,5 @@ int session_mq_publish_message(struct session *sess, int topic_id, void *msg); int session_mq_ignore_message(struct session *sess, int topic_id, int plugin_id); int session_mq_unignore_message(struct session *sess, int topic_id, int plugin_id); + +int session_mq_topic_is_active(struct session *sess, int topic_id);
\ No newline at end of file diff --git a/include/stellar/stellar.h b/include/stellar/stellar.h index 5176239..e9322be 100644 --- a/include/stellar/stellar.h +++ b/include/stellar/stellar.h @@ -14,7 +14,8 @@ typedef void plugin_on_unload_func(void *plugin_env); typedef void *session_ctx_new_func(struct session *sess, void *plugin_env); typedef void session_ctx_free_func(struct session *sess, void *session_ctx, void *plugin_env); -#define TOPIC_TCP "TCP" +//intrinsic topic, publish packet as message +#define TOPIC_TCP "TCP" #define TOPIC_TCP_STREAM "TCP_STREAM" #define TOPIC_UDP "UDP" #define TOPIC_EGRESS "EGRESS" diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c index 07de496..a102e5c 100644 --- a/src/plugin_manager/plugin_manager.c +++ b/src/plugin_manager/plugin_manager.c @@ -226,7 +226,6 @@ void *stellar_exdata_get(UT_array *exdata_schema, struct stellar_exdata *exdata_ * SESSION EXDATA * *******************************/ -//TODO: limit exdata new index in plugin init stage int stellar_session_exdata_new_index(struct stellar *st, const char *name, session_exdata_free *free_func,void *free_arg) { struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); @@ -582,6 +581,7 @@ static void plugin_manager_session_message_dispatch(struct session *sess) } cur_sub_idx++; } + if(cur_sub_idx==0)bitmap_set(plug_mgr_rt->session_topic_status, mq_elt->topic_id, 1, 0); } DL_DELETE(plug_mgr_rt->pending_mq, mq_elt); DL_APPEND(plug_mgr_rt->delivered_mq, mq_elt);// move to delivered message list @@ -590,6 +590,16 @@ static void plugin_manager_session_message_dispatch(struct session *sess) return; } +int session_mq_topic_is_active(struct session *sess, int topic_id) +{ + struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess); + assert(plug_mgr_rt); + if(plug_mgr_rt->session_topic_status==NULL)return -1;//runtime free stage , mq_status alaway null, ignore publish message + if(topic_id >= plug_mgr_rt->plug_mgr->session_mq_topic_num)return -1;// topic_id out of range + if(bitmap_get(plug_mgr_rt->session_topic_status, topic_id, 1) == 0)return 0; + return 1; +} + /******************************* * PLUGIN MANAGER SESSION RUNTIME * *******************************/ @@ -633,6 +643,7 @@ struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_ rt->pending_mq = NULL; rt->delivered_mq = NULL; rt->session_mq_status=bitmap_new(plug_mgr->session_mq_topic_num, plug_mgr->session_topic_subscriber_num, 1); + rt->session_topic_status=bitmap_new(plug_mgr->session_mq_topic_num, 1, 1); rt->sess_exdata_array = (struct stellar_exdata *)session_exdata_runtime_new(plug_mgr); if(plug_mgr->registered_session_plugin_array) rt->plugin_ctx_array = CALLOC(struct session_plugin_ctx_runtime, utarray_len(plug_mgr->registered_session_plugin_array)); @@ -658,6 +669,11 @@ void plugin_manager_session_runtime_free(struct plugin_manager_runtime *rt) bitmap_free(rt->session_mq_status); rt->session_mq_status=NULL; } + if(rt->session_topic_status != NULL) + { + bitmap_free(rt->session_topic_status); + rt->session_topic_status=NULL; + } if (rt->plug_mgr->registered_session_plugin_array) { unsigned int len = utarray_len(rt->plug_mgr->registered_session_plugin_array); @@ -810,7 +826,6 @@ void plugin_manager_on_session_ingress(struct session *sess, struct packet *pkt) break; } plug_mgr_rt->pub_session_msg_cnt=0; - //TODO: check TCP topic active subscirber num, if 0, return APP_STATE_DROPME, to reduce tcp reassemble overhead session_mq_publish_message(sess, topic_id ,(void *)pkt); plugin_manager_session_message_dispatch(sess); return; diff --git a/src/plugin_manager/plugin_manager.h b/src/plugin_manager/plugin_manager.h index 2d07c25..23c4297 100644 --- a/src/plugin_manager/plugin_manager.h +++ b/src/plugin_manager/plugin_manager.h @@ -19,4 +19,4 @@ void plugin_manager_on_session_egress(struct session *sess,struct packet *pkt); void plugin_manager_on_session_closing(struct session *sess); struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_manager_schema *plug_mgr, struct session *sess); -void plugin_manager_session_runtime_free(struct plugin_manager_runtime *plug_mgr_rt);
\ No newline at end of file +void plugin_manager_session_runtime_free(struct plugin_manager_runtime *plug_mgr_rt); diff --git a/src/plugin_manager/plugin_manager_interna.h b/src/plugin_manager/plugin_manager_interna.h index 6b2beba..15e4bff 100644 --- a/src/plugin_manager/plugin_manager_interna.h +++ b/src/plugin_manager/plugin_manager_interna.h @@ -104,6 +104,7 @@ struct plugin_manager_runtime struct stellar_message *pending_mq;// message list struct stellar_message *delivered_mq;// message list struct bitmap *session_mq_status; //N * M bits, N topic, M subscriber + struct bitmap *session_topic_status; //N bits, N topic struct stellar_exdata *sess_exdata_array; struct session_plugin_ctx_runtime *plugin_ctx_array;//N plugins TODO: call alloc and free int current_session_plugin_id; diff --git a/src/stellar_on_sapp/stellar_internal.h b/src/stellar_on_sapp/stellar_internal.h index 46c467d..3358e7c 100644 --- a/src/stellar_on_sapp/stellar_internal.h +++ b/src/stellar_on_sapp/stellar_internal.h @@ -6,8 +6,6 @@ struct plugin_manager_schema; struct plugin_manager_runtime; -struct stellar * packet_stellar_get(struct packet *pkt); - int stellar_plugin_manager_schema_set(struct stellar *st, struct plugin_manager_schema *pm); struct plugin_manager_schema * stellar_plugin_manager_schema_get(struct stellar *st); struct plugin_manager_runtime * session_plugin_manager_runtime_get(struct session *sess); diff --git a/src/stellar_on_sapp/stellar_on_sapp_api.c b/src/stellar_on_sapp/stellar_on_sapp_api.c index 5974ace..72935c6 100644 --- a/src/stellar_on_sapp/stellar_on_sapp_api.c +++ b/src/stellar_on_sapp/stellar_on_sapp_api.c @@ -3,11 +3,14 @@ #include "stellar_internal.h" -#include "stellar/stellar.h" #include "stellar/utils.h" +#include "stellar/stellar.h" +#include "stellar/session_mq.h" + #include "plugin_manager/plugin_manager.h" + #include <MESA/stream.h> #include <assert.h> @@ -15,6 +18,7 @@ struct stellar { struct plugin_manager_schema *plug_mgr; + int tcp_stream_topic_id; }; struct packet @@ -37,10 +41,6 @@ struct session struct plugin_manager_runtime *plug_mgr_rt; }__attribute__((aligned(sizeof(void*)))); -inline struct stellar * packet_stellar_get(struct packet *pkt) -{ - return pkt->st; -} inline struct plugin_manager_schema * stellar_plugin_manager_schema_get(struct stellar *st) { @@ -70,6 +70,7 @@ struct stellar *stellar_init_on_sapp(const char *toml_conf_path) return NULL; } st->plug_mgr=pm; + st->tcp_stream_topic_id=stellar_session_mq_get_topic_id(st, TOPIC_TCP_STREAM); return st; } @@ -140,7 +141,12 @@ unsigned char session_state_update_on_sapp(struct streaminfo *stream, unsigned c pkt->raw_pkt=raw_pkt; pkt->type=type; if(raw_pkt)plugin_manager_on_session_ingress(sess, pkt); - return APP_STATE_GIVEME; + + //check TCP topic active subscirber num, if 0, return APP_STATE_DROPME, to reduce tcp reassemble overhead + if(type == TCP_STREAM && session_mq_topic_is_active(sess, sess->st->tcp_stream_topic_id) == 0) + return APP_STATE_DROPME; + else + return APP_STATE_GIVEME; } else return APP_STATE_DROPME; diff --git a/test/plugin_manager/plugin_manager_gtest_main.cpp b/test/plugin_manager/plugin_manager_gtest_main.cpp index aa1a359..4913586 100644 --- a/test/plugin_manager/plugin_manager_gtest_main.cpp +++ b/test/plugin_manager/plugin_manager_gtest_main.cpp @@ -1058,6 +1058,8 @@ TEST(plugin_manager, session_plugin_pub_msg_on_closing) { EXPECT_EQ(env.test_mq_sub_called,env.N_session); } +//TODO: test session_mq_topic_is_active + /********************************************** * TEST PLUGIN MANAGER ON POLLING PLUGIN INIT * **********************************************/ diff --git a/test/plugin_manager/plugin_manager_gtest_mock.h b/test/plugin_manager/plugin_manager_gtest_mock.h index 7d6ffc3..9962872 100644 --- a/test/plugin_manager/plugin_manager_gtest_mock.h +++ b/test/plugin_manager/plugin_manager_gtest_mock.h @@ -61,10 +61,6 @@ int stellar_get_current_thread_id(struct stellar *st) return 0; } -struct stellar * packet_stellar_get(struct packet *pkt) -{ - return pkt->st; -} struct plugin_manager_runtime * session_plugin_manager_runtime_get(struct session *sess) { |
