summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryangwei <[email protected]>2024-07-09 19:22:03 +0800
committeryangwei <[email protected]>2024-07-09 19:22:03 +0800
commitbd599decb93fa9fd2955289cd96c362b703b2ad1 (patch)
tree0fa617c6ab28d44b240d796bfeb1addaefef571a
parent19969f6fc7383b6b3b62fc8a3b13a4ae8dd6dc8b (diff)
🎈 perf(session_state_update_on_sapp): drop tcp when topic inactive
-rw-r--r--include/stellar/session_mq.h2
-rw-r--r--include/stellar/stellar.h3
-rw-r--r--src/plugin_manager/plugin_manager.c19
-rw-r--r--src/plugin_manager/plugin_manager.h2
-rw-r--r--src/plugin_manager/plugin_manager_interna.h1
-rw-r--r--src/stellar_on_sapp/stellar_internal.h2
-rw-r--r--src/stellar_on_sapp/stellar_on_sapp_api.c18
-rw-r--r--test/plugin_manager/plugin_manager_gtest_main.cpp2
-rw-r--r--test/plugin_manager/plugin_manager_gtest_mock.h4
9 files changed, 37 insertions, 16 deletions
diff --git a/include/stellar/session_mq.h b/include/stellar/session_mq.h
index cddde81..a3a631a 100644
--- a/include/stellar/session_mq.h
+++ b/include/stellar/session_mq.h
@@ -22,3 +22,5 @@ int session_mq_publish_message(struct session *sess, int topic_id, void *msg);
int session_mq_ignore_message(struct session *sess, int topic_id, int plugin_id);
int session_mq_unignore_message(struct session *sess, int topic_id, int plugin_id);
+
+int session_mq_topic_is_active(struct session *sess, int topic_id); \ No newline at end of file
diff --git a/include/stellar/stellar.h b/include/stellar/stellar.h
index 5176239..e9322be 100644
--- a/include/stellar/stellar.h
+++ b/include/stellar/stellar.h
@@ -14,7 +14,8 @@ typedef void plugin_on_unload_func(void *plugin_env);
typedef void *session_ctx_new_func(struct session *sess, void *plugin_env);
typedef void session_ctx_free_func(struct session *sess, void *session_ctx, void *plugin_env);
-#define TOPIC_TCP "TCP"
+//intrinsic topic, publish packet as message
+#define TOPIC_TCP "TCP"
#define TOPIC_TCP_STREAM "TCP_STREAM"
#define TOPIC_UDP "UDP"
#define TOPIC_EGRESS "EGRESS"
diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c
index 07de496..a102e5c 100644
--- a/src/plugin_manager/plugin_manager.c
+++ b/src/plugin_manager/plugin_manager.c
@@ -226,7 +226,6 @@ void *stellar_exdata_get(UT_array *exdata_schema, struct stellar_exdata *exdata_
* SESSION EXDATA *
*******************************/
-//TODO: limit exdata new index in plugin init stage
int stellar_session_exdata_new_index(struct stellar *st, const char *name, session_exdata_free *free_func,void *free_arg)
{
struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
@@ -582,6 +581,7 @@ static void plugin_manager_session_message_dispatch(struct session *sess)
}
cur_sub_idx++;
}
+ if(cur_sub_idx==0)bitmap_set(plug_mgr_rt->session_topic_status, mq_elt->topic_id, 1, 0);
}
DL_DELETE(plug_mgr_rt->pending_mq, mq_elt);
DL_APPEND(plug_mgr_rt->delivered_mq, mq_elt);// move to delivered message list
@@ -590,6 +590,16 @@ static void plugin_manager_session_message_dispatch(struct session *sess)
return;
}
+int session_mq_topic_is_active(struct session *sess, int topic_id)
+{
+ struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess);
+ assert(plug_mgr_rt);
+ if(plug_mgr_rt->session_topic_status==NULL)return -1;//runtime free stage , mq_status alaway null, ignore publish message
+ if(topic_id >= plug_mgr_rt->plug_mgr->session_mq_topic_num)return -1;// topic_id out of range
+ if(bitmap_get(plug_mgr_rt->session_topic_status, topic_id, 1) == 0)return 0;
+ return 1;
+}
+
/*******************************
* PLUGIN MANAGER SESSION RUNTIME *
*******************************/
@@ -633,6 +643,7 @@ struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_
rt->pending_mq = NULL;
rt->delivered_mq = NULL;
rt->session_mq_status=bitmap_new(plug_mgr->session_mq_topic_num, plug_mgr->session_topic_subscriber_num, 1);
+ rt->session_topic_status=bitmap_new(plug_mgr->session_mq_topic_num, 1, 1);
rt->sess_exdata_array = (struct stellar_exdata *)session_exdata_runtime_new(plug_mgr);
if(plug_mgr->registered_session_plugin_array)
rt->plugin_ctx_array = CALLOC(struct session_plugin_ctx_runtime, utarray_len(plug_mgr->registered_session_plugin_array));
@@ -658,6 +669,11 @@ void plugin_manager_session_runtime_free(struct plugin_manager_runtime *rt)
bitmap_free(rt->session_mq_status);
rt->session_mq_status=NULL;
}
+ if(rt->session_topic_status != NULL)
+ {
+ bitmap_free(rt->session_topic_status);
+ rt->session_topic_status=NULL;
+ }
if (rt->plug_mgr->registered_session_plugin_array)
{
unsigned int len = utarray_len(rt->plug_mgr->registered_session_plugin_array);
@@ -810,7 +826,6 @@ void plugin_manager_on_session_ingress(struct session *sess, struct packet *pkt)
break;
}
plug_mgr_rt->pub_session_msg_cnt=0;
- //TODO: check TCP topic active subscirber num, if 0, return APP_STATE_DROPME, to reduce tcp reassemble overhead
session_mq_publish_message(sess, topic_id ,(void *)pkt);
plugin_manager_session_message_dispatch(sess);
return;
diff --git a/src/plugin_manager/plugin_manager.h b/src/plugin_manager/plugin_manager.h
index 2d07c25..23c4297 100644
--- a/src/plugin_manager/plugin_manager.h
+++ b/src/plugin_manager/plugin_manager.h
@@ -19,4 +19,4 @@ void plugin_manager_on_session_egress(struct session *sess,struct packet *pkt);
void plugin_manager_on_session_closing(struct session *sess);
struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_manager_schema *plug_mgr, struct session *sess);
-void plugin_manager_session_runtime_free(struct plugin_manager_runtime *plug_mgr_rt); \ No newline at end of file
+void plugin_manager_session_runtime_free(struct plugin_manager_runtime *plug_mgr_rt);
diff --git a/src/plugin_manager/plugin_manager_interna.h b/src/plugin_manager/plugin_manager_interna.h
index 6b2beba..15e4bff 100644
--- a/src/plugin_manager/plugin_manager_interna.h
+++ b/src/plugin_manager/plugin_manager_interna.h
@@ -104,6 +104,7 @@ struct plugin_manager_runtime
struct stellar_message *pending_mq;// message list
struct stellar_message *delivered_mq;// message list
struct bitmap *session_mq_status; //N * M bits, N topic, M subscriber
+ struct bitmap *session_topic_status; //N bits, N topic
struct stellar_exdata *sess_exdata_array;
struct session_plugin_ctx_runtime *plugin_ctx_array;//N plugins TODO: call alloc and free
int current_session_plugin_id;
diff --git a/src/stellar_on_sapp/stellar_internal.h b/src/stellar_on_sapp/stellar_internal.h
index 46c467d..3358e7c 100644
--- a/src/stellar_on_sapp/stellar_internal.h
+++ b/src/stellar_on_sapp/stellar_internal.h
@@ -6,8 +6,6 @@ struct plugin_manager_schema;
struct plugin_manager_runtime;
-struct stellar * packet_stellar_get(struct packet *pkt);
-
int stellar_plugin_manager_schema_set(struct stellar *st, struct plugin_manager_schema *pm);
struct plugin_manager_schema * stellar_plugin_manager_schema_get(struct stellar *st);
struct plugin_manager_runtime * session_plugin_manager_runtime_get(struct session *sess);
diff --git a/src/stellar_on_sapp/stellar_on_sapp_api.c b/src/stellar_on_sapp/stellar_on_sapp_api.c
index 5974ace..72935c6 100644
--- a/src/stellar_on_sapp/stellar_on_sapp_api.c
+++ b/src/stellar_on_sapp/stellar_on_sapp_api.c
@@ -3,11 +3,14 @@
#include "stellar_internal.h"
-#include "stellar/stellar.h"
#include "stellar/utils.h"
+#include "stellar/stellar.h"
+#include "stellar/session_mq.h"
+
#include "plugin_manager/plugin_manager.h"
+
#include <MESA/stream.h>
#include <assert.h>
@@ -15,6 +18,7 @@
struct stellar
{
struct plugin_manager_schema *plug_mgr;
+ int tcp_stream_topic_id;
};
struct packet
@@ -37,10 +41,6 @@ struct session
struct plugin_manager_runtime *plug_mgr_rt;
}__attribute__((aligned(sizeof(void*))));
-inline struct stellar * packet_stellar_get(struct packet *pkt)
-{
- return pkt->st;
-}
inline struct plugin_manager_schema * stellar_plugin_manager_schema_get(struct stellar *st)
{
@@ -70,6 +70,7 @@ struct stellar *stellar_init_on_sapp(const char *toml_conf_path)
return NULL;
}
st->plug_mgr=pm;
+ st->tcp_stream_topic_id=stellar_session_mq_get_topic_id(st, TOPIC_TCP_STREAM);
return st;
}
@@ -140,7 +141,12 @@ unsigned char session_state_update_on_sapp(struct streaminfo *stream, unsigned c
pkt->raw_pkt=raw_pkt;
pkt->type=type;
if(raw_pkt)plugin_manager_on_session_ingress(sess, pkt);
- return APP_STATE_GIVEME;
+
+ //check TCP topic active subscirber num, if 0, return APP_STATE_DROPME, to reduce tcp reassemble overhead
+ if(type == TCP_STREAM && session_mq_topic_is_active(sess, sess->st->tcp_stream_topic_id) == 0)
+ return APP_STATE_DROPME;
+ else
+ return APP_STATE_GIVEME;
}
else
return APP_STATE_DROPME;
diff --git a/test/plugin_manager/plugin_manager_gtest_main.cpp b/test/plugin_manager/plugin_manager_gtest_main.cpp
index aa1a359..4913586 100644
--- a/test/plugin_manager/plugin_manager_gtest_main.cpp
+++ b/test/plugin_manager/plugin_manager_gtest_main.cpp
@@ -1058,6 +1058,8 @@ TEST(plugin_manager, session_plugin_pub_msg_on_closing) {
EXPECT_EQ(env.test_mq_sub_called,env.N_session);
}
+//TODO: test session_mq_topic_is_active
+
/**********************************************
* TEST PLUGIN MANAGER ON POLLING PLUGIN INIT *
**********************************************/
diff --git a/test/plugin_manager/plugin_manager_gtest_mock.h b/test/plugin_manager/plugin_manager_gtest_mock.h
index 7d6ffc3..9962872 100644
--- a/test/plugin_manager/plugin_manager_gtest_mock.h
+++ b/test/plugin_manager/plugin_manager_gtest_mock.h
@@ -61,10 +61,6 @@ int stellar_get_current_thread_id(struct stellar *st)
return 0;
}
-struct stellar * packet_stellar_get(struct packet *pkt)
-{
- return pkt->st;
-}
struct plugin_manager_runtime * session_plugin_manager_runtime_get(struct session *sess)
{