summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryangwei <[email protected]>2024-05-28 22:35:41 +0800
committeryangwei <[email protected]>2024-05-29 04:43:10 +0800
commitaf179a089c5fe9c0a4a681c2845eccc97bdf0565 (patch)
tree316a59febc098b08e31d790f3f4e93aacc2399c3
parent66fc0f662c68baa99522ddbb2601fe0216da818f (diff)
🐞 fix(plugin manager on packet egress): update trigger logic in loader
-rw-r--r--src/plugin_manager/plugin_manager.c20
-rw-r--r--src/plugin_manager/plugin_manager_interna.h24
-rw-r--r--src/stellar_on_sapp/stellar_on_sapp.h2
-rw-r--r--src/stellar_on_sapp/stellar_on_sapp_api.c13
-rw-r--r--src/stellar_on_sapp/stellar_on_sapp_loader.c14
-rw-r--r--test/plugin_manager/plugin_manager_gtest_main.cpp76
6 files changed, 105 insertions, 44 deletions
diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c
index 5331e7c..7f11fb5 100644
--- a/src/plugin_manager/plugin_manager.c
+++ b/src/plugin_manager/plugin_manager.c
@@ -502,10 +502,10 @@ int stellar_packet_mq_subscribe(struct stellar *st, int topic_id, on_packet_msg_
if(plugin_id < PACKET_PULGIN_ID_BASE || plugin_id >= POLLING_PULGIN_ID_BASE)return -1;// ignore session or polling plugin
int plugin_idx=plugin_id-PACKET_PULGIN_ID_BASE;
struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
- if(plug_mgr->packet_mq_schema_array==NULL)return -1;
+ if(plug_mgr == NULL || plug_mgr->packet_mq_schema_array==NULL || plug_mgr->registered_packet_plugin_array == NULL)return -1;
+
unsigned int len = utarray_len(plug_mgr->packet_mq_schema_array);
if (len <= (unsigned int)topic_id)return -1;
-
struct registered_packet_plugin_schema *packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr(plug_mgr->registered_packet_plugin_array, (unsigned)plugin_idx);
if(packet_plugin_schema==NULL)return -1;
@@ -522,7 +522,21 @@ int stellar_packet_mq_subscribe(struct stellar *st, int topic_id, on_packet_msg_
while( (p=(struct stellar_mq_subscriber_info *)utarray_next(packet_plugin_schema->registed_packet_mq_subscriber_info,p)))
{
if(p->topic_id==topic_id)
- return 0;
+ {
+ struct stellar_mq_subscriber *tmp_subscriber=topic->subscribers;
+ int cnt=0;
+ while(tmp_subscriber)
+ {
+ if(cnt==p->subscriber_idx)
+ {
+ tmp_subscriber->pkt_msg_cb=plugin_on_msg_cb;
+ return 0;
+ }
+ cnt++;
+ tmp_subscriber=tmp_subscriber->next;
+ }
+ return -1;
+ }
};
struct stellar_mq_subscriber *new_subscriber = CALLOC(struct stellar_mq_subscriber,1);
diff --git a/src/plugin_manager/plugin_manager_interna.h b/src/plugin_manager/plugin_manager_interna.h
index b720e74..0c9e055 100644
--- a/src/plugin_manager/plugin_manager_interna.h
+++ b/src/plugin_manager/plugin_manager_interna.h
@@ -48,7 +48,7 @@ struct plugin_manager_schema
int egress_topic_id;
int control_packet_topic_id;
struct plugin_manger_per_thread_data *per_thread_data;
-};
+}__attribute__((aligned(sizeof(void*))));
@@ -72,7 +72,7 @@ struct stellar_exdata_schema
void *free_arg;
int idx;
-};
+}__attribute__((aligned(sizeof(void*))));
struct stellar_message
@@ -80,7 +80,7 @@ struct stellar_message
int topic_id;
void *msg_data;
struct stellar_message *next, *prev;
-};
+}__attribute__((aligned(sizeof(void*))));
typedef struct stellar_mq_subscriber
{
@@ -92,7 +92,7 @@ typedef struct stellar_mq_subscriber
on_packet_msg_cb_func *pkt_msg_cb;
};
struct stellar_mq_subscriber *next, *prev;
-}stellar_mq_subscriber;
+}stellar_mq_subscriber __attribute__((aligned(sizeof(void*))));
struct stellar_mq_topic_schema
@@ -109,7 +109,7 @@ struct stellar_mq_topic_schema
packet_msg_free_cb_func *pkt_msg_free_cb;
};
struct stellar_mq_subscriber *subscribers;
-};
+}__attribute__((aligned(sizeof(void*))));
enum plugin_ctx_state
{ INIT, ACTIVE, EXIT };
@@ -119,7 +119,7 @@ struct session_plugin_ctx_runtime
enum plugin_ctx_state state;
int session_plugin_id;
void *plugin_ctx;
-};
+}__attribute__((aligned(sizeof(void*))));
@@ -134,7 +134,7 @@ struct plugin_manager_runtime
struct session_plugin_ctx_runtime *plugin_ctx_array;//N plugins TODO: call alloc and free
int current_session_plugin_id;
int enable_session_mq;
-};
+}__attribute__((aligned(sizeof(void*))));
struct registered_packet_plugin_schema
{
@@ -142,19 +142,19 @@ struct registered_packet_plugin_schema
plugin_on_packet_func *on_packet;
void *plugin_env;
UT_array *registed_packet_mq_subscriber_info;
-};
+}__attribute__((aligned(sizeof(void*))));
struct registered_polling_plugin_schema
{
plugin_on_polling_func *on_polling;
void *plugin_env;
-};
+}__attribute__((aligned(sizeof(void*))));
struct stellar_mq_subscriber_info
{
int topic_id;
int subscriber_idx;
-};
+}__attribute__((aligned(sizeof(void*))));
struct registered_session_plugin_schema
{
@@ -162,7 +162,7 @@ struct registered_session_plugin_schema
session_ctx_free_func *on_ctx_free;
void *plugin_env;
UT_array *registed_session_mq_subscriber_info;
-};
+}__attribute__((aligned(sizeof(void*))));
#define PACKET_PULGIN_ID_BASE 0x10000
#define POLLING_PULGIN_ID_BASE 0x20000
@@ -179,4 +179,4 @@ struct plugin_specific
plugin_on_load_func *load_cb;
plugin_on_unload_func *unload_cb;
void *plugin_ctx;
-}; \ No newline at end of file
+}__attribute__((aligned(sizeof(void*)))); \ No newline at end of file
diff --git a/src/stellar_on_sapp/stellar_on_sapp.h b/src/stellar_on_sapp/stellar_on_sapp.h
index f0d0f6f..f4f8159 100644
--- a/src/stellar_on_sapp/stellar_on_sapp.h
+++ b/src/stellar_on_sapp/stellar_on_sapp.h
@@ -13,7 +13,7 @@ void session_free_on_sapp(struct session *sess);
unsigned char session_state_update_on_sapp(struct streaminfo *stream, unsigned char stream_state, struct session *sess, const void *raw_pkt, enum packet_type type);
-void session_poll_on_sapp(struct session *sess);
+void session_defer_on_sapp(struct session *sess);
void packet_update_on_sapp(struct stellar *st, struct streaminfo *pstream, void *a_packet, enum packet_type type);
diff --git a/src/stellar_on_sapp/stellar_on_sapp_api.c b/src/stellar_on_sapp/stellar_on_sapp_api.c
index d40112c..f9e5637 100644
--- a/src/stellar_on_sapp/stellar_on_sapp_api.c
+++ b/src/stellar_on_sapp/stellar_on_sapp_api.c
@@ -21,10 +21,9 @@ struct packet
{
enum packet_type type;
unsigned char ip_proto;
- unsigned char pad[3];
const void *raw_pkt;
struct stellar *st;
-};
+}__attribute__((aligned(sizeof(void*)))) ;
struct session
{
@@ -35,7 +34,7 @@ struct session
struct packet cur_pkt;
struct stellar *st;
struct plugin_manager_runtime *plug_mgr_rt;
-};
+}__attribute__((aligned(sizeof(void*))));
inline struct stellar * packet_stellar_get(struct packet *pkt)
{
@@ -145,7 +144,7 @@ unsigned char session_state_update_on_sapp(struct streaminfo *stream, unsigned c
return APP_STATE_DROPME;
}
-void session_poll_on_sapp(struct session *sess)
+void session_defer_on_sapp(struct session *sess)
{
if(sess==NULL)return;
if(sess->state != SESSION_STATE_CONTROL)
@@ -155,6 +154,7 @@ void session_poll_on_sapp(struct session *sess)
if(pkt->raw_pkt)
{
plugin_manager_on_session_egress(sess, pkt);
+ plugin_manager_on_packet_egress(sess->st->plug_mgr, pkt);
}
}
sess->cur_pkt.raw_pkt=NULL;//clear raw_pkt
@@ -187,7 +187,12 @@ void packet_update_on_sapp(struct stellar *st, struct streaminfo *pstream, void
default:
return;
}
+ //TODO: exclude non innermost packet
plugin_manager_on_packet_ingress(st->plug_mgr, &pkt);
+ if(pkt.ip_proto!=IPPROTO_TCP && pkt.ip_proto!=IPPROTO_UDP)
+ {
+ plugin_manager_on_packet_egress(st->plug_mgr, &pkt);
+ }
}
inline int polling_on_sapp(struct stellar *st)
diff --git a/src/stellar_on_sapp/stellar_on_sapp_loader.c b/src/stellar_on_sapp/stellar_on_sapp_loader.c
index 1ff58cc..111c44d 100644
--- a/src/stellar_on_sapp/stellar_on_sapp_loader.c
+++ b/src/stellar_on_sapp/stellar_on_sapp_loader.c
@@ -5,14 +5,6 @@
#include <MESA/stream.h>
-struct simple_stream_ctx
-{
- uint32_t c2s_pkts;
- uint32_t c2s_bytes;
- uint32_t s2c_pkts;
- uint32_t s2c_bytes;
- struct session *sess;
-};
#define STELLAR_PLUGIN_PATH "./stellar_plugin/spec.toml"
#define STELLAR_BRIDEGE_NAME "STELLAR_SESSION"
@@ -81,7 +73,7 @@ char stellar_on_sapp_defer_entry(struct streaminfo *pstream,void **pme, int thre
struct session *sess = (struct session *)stream_bridge_async_data_get(pstream, g_session_bridge_id);
if(sess)
{
- session_poll_on_sapp(sess);
+ session_defer_on_sapp(sess);
return APP_STATE_GIVEME;
}
else
@@ -187,13 +179,13 @@ unsigned char stellar_on_sapp_tcp_entry(struct streaminfo *pstream,void **pme, i
char stellar_on_sapp_ip4_entry( struct streaminfo *pstream,unsigned char routedir,int thread_seq, void *a_packet)
{
- packet_update_on_sapp(g_stellar, pstream, a_packet, IPv4);
+ if(stream_is_inner_most(pstream)==1)packet_update_on_sapp(g_stellar, pstream, a_packet, IPv4);
return APP_STATE_GIVEME;
}
char stellar_on_sapp_ip6_entry( struct streaminfo *pstream,unsigned char routedir,int thread_seq, void *a_packet)
{
- packet_update_on_sapp(g_stellar, pstream, a_packet, IPv6);
+ if(stream_is_inner_most(pstream)==1)packet_update_on_sapp(g_stellar, pstream, a_packet, IPv6);
return APP_STATE_GIVEME;
}
diff --git a/test/plugin_manager/plugin_manager_gtest_main.cpp b/test/plugin_manager/plugin_manager_gtest_main.cpp
index 0fc40d2..8494e7b 100644
--- a/test/plugin_manager/plugin_manager_gtest_main.cpp
+++ b/test/plugin_manager/plugin_manager_gtest_main.cpp
@@ -3,8 +3,10 @@
#include "plugin_manager_gtest_mock.h"
#include "stellar/utils.h"
-void test_init_plugin_manager_intrisic_metadata(struct stellar *st, struct plugin_manager_schema *plug_mgr)
+void whitebox_test_plugin_manager_intrisic_metadata(struct stellar *st, struct plugin_manager_schema *plug_mgr)
{
+ SCOPED_TRACE("whitebox test intrisic metadata");
+
EXPECT_TRUE(plug_mgr!=NULL);
EXPECT_EQ(plug_mgr->st, st);
@@ -56,11 +58,16 @@ void test_init_plugin_manager_intrisic_metadata(struct stellar *st, struct plugi
}
}
+
+/***********************************
+ * TEST PLUGIN MANAGER INIT & EXIT *
+ ***********************************/
+
TEST(plugin_manager_init, init_without_toml) {
struct stellar st={0};
struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
- test_init_plugin_manager_intrisic_metadata(&st, plug_mgr);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
plugin_manager_exit(plug_mgr);
}
@@ -74,10 +81,11 @@ static void test_mock_overwrite_exdata_free(struct packet *pkt, int idx, void *e
return;
}
+
TEST(plugin_manager_init, packet_exdata_new_index_overwrite) {
struct stellar st={0};
struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
- test_init_plugin_manager_intrisic_metadata(&st, plug_mgr);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
const char *exdata_name="PACKET_EXDATA";
int exdata_idx=stellar_packet_exdata_new_index(&st,exdata_name, test_mock_exdata_free, &st);
@@ -92,7 +100,8 @@ TEST(plugin_manager_init, packet_exdata_new_index_overwrite) {
EXPECT_EQ(exdata_schema->idx, exdata_idx);
EXPECT_STREQ(exdata_schema->name, exdata_name);
- int exdata_num=utarray_len(plug_mgr->packet_exdata_schema_array);
+
+ int exdata_num=utarray_len(plug_mgr->packet_exdata_schema_array);
EXPECT_EQ(exdata_num, 1);
plugin_manager_exit(plug_mgr);
@@ -107,14 +116,10 @@ void test_mock_overwrite_packet_msg_free(struct packet *pkt, void *msg, void *ms
return;
}
-
-
TEST(plugin_manager_init, packet_mq_topic_create_and_update) {
struct stellar st={0};
struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
- test_init_plugin_manager_intrisic_metadata(&st, plug_mgr);
-
-
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
const char *topic_name="PACKET_TOPIC";
@@ -171,6 +176,43 @@ void test_mock_overwrite_on_packet_msg(struct packet *pkt, int topic_id, const v
return;
}
+TEST(plugin_manager_init, packet_mq_subscribe) {
+
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+
+ const char *topic_name="PACKET_TOPIC";
+
+ int topic_id=stellar_packet_mq_create_topic(&st, topic_name, test_mock_packet_msg_free, &st);
+ EXPECT_GE(topic_id, 0);
+
+ EXPECT_EQ(stellar_packet_mq_subscribe(&st, topic_id, test_mock_on_packet_msg, 10+PACKET_PULGIN_ID_BASE),-1);//illgeal plugin_id
+ EXPECT_EQ(stellar_packet_mq_subscribe(&st, 10, test_mock_on_packet_msg, 10+PACKET_PULGIN_ID_BASE),-1);//illgeal topic_id & plugin_id
+
+ int plugin_id=stellar_packet_plugin_register(&st, 6, NULL, &st);
+ EXPECT_GE(plugin_id, PACKET_PULGIN_ID_BASE);
+
+ EXPECT_EQ(stellar_packet_mq_subscribe(&st, topic_id, test_mock_on_packet_msg, plugin_id),0);
+ EXPECT_EQ(stellar_packet_mq_subscribe(&st, topic_id, test_mock_overwrite_on_packet_msg, plugin_id),0);//duplicate subscribe, return 0, won't overwrite
+
+ struct stellar_mq_topic_schema *topic_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->packet_mq_schema_array,(unsigned int)topic_id);
+ EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_packet_msg_free);
+ EXPECT_EQ(topic_schema->free_cb_arg, &st);
+ EXPECT_EQ(topic_schema->topic_id, topic_id);
+ EXPECT_STREQ(topic_schema->topic_name, topic_name);
+
+ EXPECT_EQ(topic_schema->subscriber_cnt, 1);
+ EXPECT_EQ(topic_schema->subscribers->pkt_msg_cb, (void *)test_mock_overwrite_on_packet_msg);
+
+ plugin_manager_exit(plug_mgr);
+}
+
+/***********************************
+ * TEST PLUGIN MANAGER ON PACKET *
+ ***********************************/
+
#define PACKET_PROTO_PLUGIN_NUM 128
#define PACKET_EXDATA_NUM 2
#define PACKET_TOPIC_NUM 2
@@ -208,7 +250,7 @@ TEST(plugin_manager, basic_packet_plugin) {
struct stellar st={0};
struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
- test_init_plugin_manager_intrisic_metadata(&st, plug_mgr);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
unsigned char ip_proto=6;
struct basic_plugin_env env;
@@ -245,7 +287,7 @@ TEST(plugin_manager, packet_plugin_proto_filter) {
struct stellar st={0};
struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
- test_init_plugin_manager_intrisic_metadata(&st, plug_mgr);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
struct basic_plugin_env env;
memset(&env, 0, sizeof(struct basic_plugin_env));
@@ -337,7 +379,7 @@ TEST(plugin_manager, packet_exdata) {
struct stellar st={0};
struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
- test_init_plugin_manager_intrisic_metadata(&st, plug_mgr);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
unsigned char ip_proto=6;
struct basic_plugin_env env;
@@ -426,7 +468,7 @@ TEST(plugin_manager, packet_mq) {
struct stellar st={0};
struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
- test_init_plugin_manager_intrisic_metadata(&st, plug_mgr);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
unsigned char ip_proto=6;
struct basic_plugin_env env;
@@ -484,6 +526,14 @@ TEST(plugin_manager, packet_mq) {
EXPECT_EQ(env.msg_sub_cnt, env.msg_pub_cnt*topic_sub_num);
}
+/***********************************
+ * TEST PLUGIN MANAGER ON SESSION *
+ ***********************************/
+
+
+/***********************************
+ * TEST PLUGIN MANAGER ON POLLING *
+ ***********************************/
int main(int argc, char ** argv)