diff options
| author | yangwei <[email protected]> | 2024-05-28 22:35:41 +0800 |
|---|---|---|
| committer | yangwei <[email protected]> | 2024-05-29 04:43:10 +0800 |
| commit | af179a089c5fe9c0a4a681c2845eccc97bdf0565 (patch) | |
| tree | 316a59febc098b08e31d790f3f4e93aacc2399c3 | |
| parent | 66fc0f662c68baa99522ddbb2601fe0216da818f (diff) | |
🐞 fix(plugin manager on packet egress): update trigger logic in loader
| -rw-r--r-- | src/plugin_manager/plugin_manager.c | 20 | ||||
| -rw-r--r-- | src/plugin_manager/plugin_manager_interna.h | 24 | ||||
| -rw-r--r-- | src/stellar_on_sapp/stellar_on_sapp.h | 2 | ||||
| -rw-r--r-- | src/stellar_on_sapp/stellar_on_sapp_api.c | 13 | ||||
| -rw-r--r-- | src/stellar_on_sapp/stellar_on_sapp_loader.c | 14 | ||||
| -rw-r--r-- | test/plugin_manager/plugin_manager_gtest_main.cpp | 76 |
6 files changed, 105 insertions, 44 deletions
diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c index 5331e7c..7f11fb5 100644 --- a/src/plugin_manager/plugin_manager.c +++ b/src/plugin_manager/plugin_manager.c @@ -502,10 +502,10 @@ int stellar_packet_mq_subscribe(struct stellar *st, int topic_id, on_packet_msg_ if(plugin_id < PACKET_PULGIN_ID_BASE || plugin_id >= POLLING_PULGIN_ID_BASE)return -1;// ignore session or polling plugin int plugin_idx=plugin_id-PACKET_PULGIN_ID_BASE; struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); - if(plug_mgr->packet_mq_schema_array==NULL)return -1; + if(plug_mgr == NULL || plug_mgr->packet_mq_schema_array==NULL || plug_mgr->registered_packet_plugin_array == NULL)return -1; + unsigned int len = utarray_len(plug_mgr->packet_mq_schema_array); if (len <= (unsigned int)topic_id)return -1; - struct registered_packet_plugin_schema *packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr(plug_mgr->registered_packet_plugin_array, (unsigned)plugin_idx); if(packet_plugin_schema==NULL)return -1; @@ -522,7 +522,21 @@ int stellar_packet_mq_subscribe(struct stellar *st, int topic_id, on_packet_msg_ while( (p=(struct stellar_mq_subscriber_info *)utarray_next(packet_plugin_schema->registed_packet_mq_subscriber_info,p))) { if(p->topic_id==topic_id) - return 0; + { + struct stellar_mq_subscriber *tmp_subscriber=topic->subscribers; + int cnt=0; + while(tmp_subscriber) + { + if(cnt==p->subscriber_idx) + { + tmp_subscriber->pkt_msg_cb=plugin_on_msg_cb; + return 0; + } + cnt++; + tmp_subscriber=tmp_subscriber->next; + } + return -1; + } }; struct stellar_mq_subscriber *new_subscriber = CALLOC(struct stellar_mq_subscriber,1); diff --git a/src/plugin_manager/plugin_manager_interna.h b/src/plugin_manager/plugin_manager_interna.h index b720e74..0c9e055 100644 --- a/src/plugin_manager/plugin_manager_interna.h +++ b/src/plugin_manager/plugin_manager_interna.h @@ -48,7 +48,7 @@ struct plugin_manager_schema int egress_topic_id; int control_packet_topic_id; struct plugin_manger_per_thread_data *per_thread_data; -}; +}__attribute__((aligned(sizeof(void*)))); @@ -72,7 +72,7 @@ struct stellar_exdata_schema void *free_arg; int idx; -}; +}__attribute__((aligned(sizeof(void*)))); struct stellar_message @@ -80,7 +80,7 @@ struct stellar_message int topic_id; void *msg_data; struct stellar_message *next, *prev; -}; +}__attribute__((aligned(sizeof(void*)))); typedef struct stellar_mq_subscriber { @@ -92,7 +92,7 @@ typedef struct stellar_mq_subscriber on_packet_msg_cb_func *pkt_msg_cb; }; struct stellar_mq_subscriber *next, *prev; -}stellar_mq_subscriber; +}stellar_mq_subscriber __attribute__((aligned(sizeof(void*)))); struct stellar_mq_topic_schema @@ -109,7 +109,7 @@ struct stellar_mq_topic_schema packet_msg_free_cb_func *pkt_msg_free_cb; }; struct stellar_mq_subscriber *subscribers; -}; +}__attribute__((aligned(sizeof(void*)))); enum plugin_ctx_state { INIT, ACTIVE, EXIT }; @@ -119,7 +119,7 @@ struct session_plugin_ctx_runtime enum plugin_ctx_state state; int session_plugin_id; void *plugin_ctx; -}; +}__attribute__((aligned(sizeof(void*)))); @@ -134,7 +134,7 @@ struct plugin_manager_runtime struct session_plugin_ctx_runtime *plugin_ctx_array;//N plugins TODO: call alloc and free int current_session_plugin_id; int enable_session_mq; -}; +}__attribute__((aligned(sizeof(void*)))); struct registered_packet_plugin_schema { @@ -142,19 +142,19 @@ struct registered_packet_plugin_schema plugin_on_packet_func *on_packet; void *plugin_env; UT_array *registed_packet_mq_subscriber_info; -}; +}__attribute__((aligned(sizeof(void*)))); struct registered_polling_plugin_schema { plugin_on_polling_func *on_polling; void *plugin_env; -}; +}__attribute__((aligned(sizeof(void*)))); struct stellar_mq_subscriber_info { int topic_id; int subscriber_idx; -}; +}__attribute__((aligned(sizeof(void*)))); struct registered_session_plugin_schema { @@ -162,7 +162,7 @@ struct registered_session_plugin_schema session_ctx_free_func *on_ctx_free; void *plugin_env; UT_array *registed_session_mq_subscriber_info; -}; +}__attribute__((aligned(sizeof(void*)))); #define PACKET_PULGIN_ID_BASE 0x10000 #define POLLING_PULGIN_ID_BASE 0x20000 @@ -179,4 +179,4 @@ struct plugin_specific plugin_on_load_func *load_cb; plugin_on_unload_func *unload_cb; void *plugin_ctx; -};
\ No newline at end of file +}__attribute__((aligned(sizeof(void*))));
\ No newline at end of file diff --git a/src/stellar_on_sapp/stellar_on_sapp.h b/src/stellar_on_sapp/stellar_on_sapp.h index f0d0f6f..f4f8159 100644 --- a/src/stellar_on_sapp/stellar_on_sapp.h +++ b/src/stellar_on_sapp/stellar_on_sapp.h @@ -13,7 +13,7 @@ void session_free_on_sapp(struct session *sess); unsigned char session_state_update_on_sapp(struct streaminfo *stream, unsigned char stream_state, struct session *sess, const void *raw_pkt, enum packet_type type); -void session_poll_on_sapp(struct session *sess); +void session_defer_on_sapp(struct session *sess); void packet_update_on_sapp(struct stellar *st, struct streaminfo *pstream, void *a_packet, enum packet_type type); diff --git a/src/stellar_on_sapp/stellar_on_sapp_api.c b/src/stellar_on_sapp/stellar_on_sapp_api.c index d40112c..f9e5637 100644 --- a/src/stellar_on_sapp/stellar_on_sapp_api.c +++ b/src/stellar_on_sapp/stellar_on_sapp_api.c @@ -21,10 +21,9 @@ struct packet { enum packet_type type; unsigned char ip_proto; - unsigned char pad[3]; const void *raw_pkt; struct stellar *st; -}; +}__attribute__((aligned(sizeof(void*)))) ; struct session { @@ -35,7 +34,7 @@ struct session struct packet cur_pkt; struct stellar *st; struct plugin_manager_runtime *plug_mgr_rt; -}; +}__attribute__((aligned(sizeof(void*)))); inline struct stellar * packet_stellar_get(struct packet *pkt) { @@ -145,7 +144,7 @@ unsigned char session_state_update_on_sapp(struct streaminfo *stream, unsigned c return APP_STATE_DROPME; } -void session_poll_on_sapp(struct session *sess) +void session_defer_on_sapp(struct session *sess) { if(sess==NULL)return; if(sess->state != SESSION_STATE_CONTROL) @@ -155,6 +154,7 @@ void session_poll_on_sapp(struct session *sess) if(pkt->raw_pkt) { plugin_manager_on_session_egress(sess, pkt); + plugin_manager_on_packet_egress(sess->st->plug_mgr, pkt); } } sess->cur_pkt.raw_pkt=NULL;//clear raw_pkt @@ -187,7 +187,12 @@ void packet_update_on_sapp(struct stellar *st, struct streaminfo *pstream, void default: return; } + //TODO: exclude non innermost packet plugin_manager_on_packet_ingress(st->plug_mgr, &pkt); + if(pkt.ip_proto!=IPPROTO_TCP && pkt.ip_proto!=IPPROTO_UDP) + { + plugin_manager_on_packet_egress(st->plug_mgr, &pkt); + } } inline int polling_on_sapp(struct stellar *st) diff --git a/src/stellar_on_sapp/stellar_on_sapp_loader.c b/src/stellar_on_sapp/stellar_on_sapp_loader.c index 1ff58cc..111c44d 100644 --- a/src/stellar_on_sapp/stellar_on_sapp_loader.c +++ b/src/stellar_on_sapp/stellar_on_sapp_loader.c @@ -5,14 +5,6 @@ #include <MESA/stream.h> -struct simple_stream_ctx -{ - uint32_t c2s_pkts; - uint32_t c2s_bytes; - uint32_t s2c_pkts; - uint32_t s2c_bytes; - struct session *sess; -}; #define STELLAR_PLUGIN_PATH "./stellar_plugin/spec.toml" #define STELLAR_BRIDEGE_NAME "STELLAR_SESSION" @@ -81,7 +73,7 @@ char stellar_on_sapp_defer_entry(struct streaminfo *pstream,void **pme, int thre struct session *sess = (struct session *)stream_bridge_async_data_get(pstream, g_session_bridge_id); if(sess) { - session_poll_on_sapp(sess); + session_defer_on_sapp(sess); return APP_STATE_GIVEME; } else @@ -187,13 +179,13 @@ unsigned char stellar_on_sapp_tcp_entry(struct streaminfo *pstream,void **pme, i char stellar_on_sapp_ip4_entry( struct streaminfo *pstream,unsigned char routedir,int thread_seq, void *a_packet) { - packet_update_on_sapp(g_stellar, pstream, a_packet, IPv4); + if(stream_is_inner_most(pstream)==1)packet_update_on_sapp(g_stellar, pstream, a_packet, IPv4); return APP_STATE_GIVEME; } char stellar_on_sapp_ip6_entry( struct streaminfo *pstream,unsigned char routedir,int thread_seq, void *a_packet) { - packet_update_on_sapp(g_stellar, pstream, a_packet, IPv6); + if(stream_is_inner_most(pstream)==1)packet_update_on_sapp(g_stellar, pstream, a_packet, IPv6); return APP_STATE_GIVEME; } diff --git a/test/plugin_manager/plugin_manager_gtest_main.cpp b/test/plugin_manager/plugin_manager_gtest_main.cpp index 0fc40d2..8494e7b 100644 --- a/test/plugin_manager/plugin_manager_gtest_main.cpp +++ b/test/plugin_manager/plugin_manager_gtest_main.cpp @@ -3,8 +3,10 @@ #include "plugin_manager_gtest_mock.h" #include "stellar/utils.h" -void test_init_plugin_manager_intrisic_metadata(struct stellar *st, struct plugin_manager_schema *plug_mgr) +void whitebox_test_plugin_manager_intrisic_metadata(struct stellar *st, struct plugin_manager_schema *plug_mgr) { + SCOPED_TRACE("whitebox test intrisic metadata"); + EXPECT_TRUE(plug_mgr!=NULL); EXPECT_EQ(plug_mgr->st, st); @@ -56,11 +58,16 @@ void test_init_plugin_manager_intrisic_metadata(struct stellar *st, struct plugi } } + +/*********************************** + * TEST PLUGIN MANAGER INIT & EXIT * + ***********************************/ + TEST(plugin_manager_init, init_without_toml) { struct stellar st={0}; struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); - test_init_plugin_manager_intrisic_metadata(&st, plug_mgr); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); plugin_manager_exit(plug_mgr); } @@ -74,10 +81,11 @@ static void test_mock_overwrite_exdata_free(struct packet *pkt, int idx, void *e return; } + TEST(plugin_manager_init, packet_exdata_new_index_overwrite) { struct stellar st={0}; struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); - test_init_plugin_manager_intrisic_metadata(&st, plug_mgr); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); const char *exdata_name="PACKET_EXDATA"; int exdata_idx=stellar_packet_exdata_new_index(&st,exdata_name, test_mock_exdata_free, &st); @@ -92,7 +100,8 @@ TEST(plugin_manager_init, packet_exdata_new_index_overwrite) { EXPECT_EQ(exdata_schema->idx, exdata_idx); EXPECT_STREQ(exdata_schema->name, exdata_name); - int exdata_num=utarray_len(plug_mgr->packet_exdata_schema_array); + + int exdata_num=utarray_len(plug_mgr->packet_exdata_schema_array); EXPECT_EQ(exdata_num, 1); plugin_manager_exit(plug_mgr); @@ -107,14 +116,10 @@ void test_mock_overwrite_packet_msg_free(struct packet *pkt, void *msg, void *ms return; } - - TEST(plugin_manager_init, packet_mq_topic_create_and_update) { struct stellar st={0}; struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); - test_init_plugin_manager_intrisic_metadata(&st, plug_mgr); - - + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); const char *topic_name="PACKET_TOPIC"; @@ -171,6 +176,43 @@ void test_mock_overwrite_on_packet_msg(struct packet *pkt, int topic_id, const v return; } +TEST(plugin_manager_init, packet_mq_subscribe) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + + const char *topic_name="PACKET_TOPIC"; + + int topic_id=stellar_packet_mq_create_topic(&st, topic_name, test_mock_packet_msg_free, &st); + EXPECT_GE(topic_id, 0); + + EXPECT_EQ(stellar_packet_mq_subscribe(&st, topic_id, test_mock_on_packet_msg, 10+PACKET_PULGIN_ID_BASE),-1);//illgeal plugin_id + EXPECT_EQ(stellar_packet_mq_subscribe(&st, 10, test_mock_on_packet_msg, 10+PACKET_PULGIN_ID_BASE),-1);//illgeal topic_id & plugin_id + + int plugin_id=stellar_packet_plugin_register(&st, 6, NULL, &st); + EXPECT_GE(plugin_id, PACKET_PULGIN_ID_BASE); + + EXPECT_EQ(stellar_packet_mq_subscribe(&st, topic_id, test_mock_on_packet_msg, plugin_id),0); + EXPECT_EQ(stellar_packet_mq_subscribe(&st, topic_id, test_mock_overwrite_on_packet_msg, plugin_id),0);//duplicate subscribe, return 0, won't overwrite + + struct stellar_mq_topic_schema *topic_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->packet_mq_schema_array,(unsigned int)topic_id); + EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_packet_msg_free); + EXPECT_EQ(topic_schema->free_cb_arg, &st); + EXPECT_EQ(topic_schema->topic_id, topic_id); + EXPECT_STREQ(topic_schema->topic_name, topic_name); + + EXPECT_EQ(topic_schema->subscriber_cnt, 1); + EXPECT_EQ(topic_schema->subscribers->pkt_msg_cb, (void *)test_mock_overwrite_on_packet_msg); + + plugin_manager_exit(plug_mgr); +} + +/*********************************** + * TEST PLUGIN MANAGER ON PACKET * + ***********************************/ + #define PACKET_PROTO_PLUGIN_NUM 128 #define PACKET_EXDATA_NUM 2 #define PACKET_TOPIC_NUM 2 @@ -208,7 +250,7 @@ TEST(plugin_manager, basic_packet_plugin) { struct stellar st={0}; struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); - test_init_plugin_manager_intrisic_metadata(&st, plug_mgr); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); unsigned char ip_proto=6; struct basic_plugin_env env; @@ -245,7 +287,7 @@ TEST(plugin_manager, packet_plugin_proto_filter) { struct stellar st={0}; struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); - test_init_plugin_manager_intrisic_metadata(&st, plug_mgr); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); struct basic_plugin_env env; memset(&env, 0, sizeof(struct basic_plugin_env)); @@ -337,7 +379,7 @@ TEST(plugin_manager, packet_exdata) { struct stellar st={0}; struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); - test_init_plugin_manager_intrisic_metadata(&st, plug_mgr); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); unsigned char ip_proto=6; struct basic_plugin_env env; @@ -426,7 +468,7 @@ TEST(plugin_manager, packet_mq) { struct stellar st={0}; struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); - test_init_plugin_manager_intrisic_metadata(&st, plug_mgr); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); unsigned char ip_proto=6; struct basic_plugin_env env; @@ -484,6 +526,14 @@ TEST(plugin_manager, packet_mq) { EXPECT_EQ(env.msg_sub_cnt, env.msg_pub_cnt*topic_sub_num); } +/*********************************** + * TEST PLUGIN MANAGER ON SESSION * + ***********************************/ + + +/*********************************** + * TEST PLUGIN MANAGER ON POLLING * + ***********************************/ int main(int argc, char ** argv) |
