diff options
| author | yangwei <[email protected]> | 2024-05-28 22:35:41 +0800 |
|---|---|---|
| committer | yangwei <[email protected]> | 2024-05-29 04:43:10 +0800 |
| commit | af179a089c5fe9c0a4a681c2845eccc97bdf0565 (patch) | |
| tree | 316a59febc098b08e31d790f3f4e93aacc2399c3 /src | |
| parent | 66fc0f662c68baa99522ddbb2601fe0216da818f (diff) | |
🐞 fix(plugin manager on packet egress): update trigger logic in loader
Diffstat (limited to 'src')
| -rw-r--r-- | src/plugin_manager/plugin_manager.c | 20 | ||||
| -rw-r--r-- | src/plugin_manager/plugin_manager_interna.h | 24 | ||||
| -rw-r--r-- | src/stellar_on_sapp/stellar_on_sapp.h | 2 | ||||
| -rw-r--r-- | src/stellar_on_sapp/stellar_on_sapp_api.c | 13 | ||||
| -rw-r--r-- | src/stellar_on_sapp/stellar_on_sapp_loader.c | 14 |
5 files changed, 42 insertions, 31 deletions
diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c index 5331e7c..7f11fb5 100644 --- a/src/plugin_manager/plugin_manager.c +++ b/src/plugin_manager/plugin_manager.c @@ -502,10 +502,10 @@ int stellar_packet_mq_subscribe(struct stellar *st, int topic_id, on_packet_msg_ if(plugin_id < PACKET_PULGIN_ID_BASE || plugin_id >= POLLING_PULGIN_ID_BASE)return -1;// ignore session or polling plugin int plugin_idx=plugin_id-PACKET_PULGIN_ID_BASE; struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); - if(plug_mgr->packet_mq_schema_array==NULL)return -1; + if(plug_mgr == NULL || plug_mgr->packet_mq_schema_array==NULL || plug_mgr->registered_packet_plugin_array == NULL)return -1; + unsigned int len = utarray_len(plug_mgr->packet_mq_schema_array); if (len <= (unsigned int)topic_id)return -1; - struct registered_packet_plugin_schema *packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr(plug_mgr->registered_packet_plugin_array, (unsigned)plugin_idx); if(packet_plugin_schema==NULL)return -1; @@ -522,7 +522,21 @@ int stellar_packet_mq_subscribe(struct stellar *st, int topic_id, on_packet_msg_ while( (p=(struct stellar_mq_subscriber_info *)utarray_next(packet_plugin_schema->registed_packet_mq_subscriber_info,p))) { if(p->topic_id==topic_id) - return 0; + { + struct stellar_mq_subscriber *tmp_subscriber=topic->subscribers; + int cnt=0; + while(tmp_subscriber) + { + if(cnt==p->subscriber_idx) + { + tmp_subscriber->pkt_msg_cb=plugin_on_msg_cb; + return 0; + } + cnt++; + tmp_subscriber=tmp_subscriber->next; + } + return -1; + } }; struct stellar_mq_subscriber *new_subscriber = CALLOC(struct stellar_mq_subscriber,1); diff --git a/src/plugin_manager/plugin_manager_interna.h b/src/plugin_manager/plugin_manager_interna.h index b720e74..0c9e055 100644 --- a/src/plugin_manager/plugin_manager_interna.h +++ b/src/plugin_manager/plugin_manager_interna.h @@ -48,7 +48,7 @@ struct plugin_manager_schema int egress_topic_id; int control_packet_topic_id; struct plugin_manger_per_thread_data *per_thread_data; -}; +}__attribute__((aligned(sizeof(void*)))); @@ -72,7 +72,7 @@ struct stellar_exdata_schema void *free_arg; int idx; -}; +}__attribute__((aligned(sizeof(void*)))); struct stellar_message @@ -80,7 +80,7 @@ struct stellar_message int topic_id; void *msg_data; struct stellar_message *next, *prev; -}; +}__attribute__((aligned(sizeof(void*)))); typedef struct stellar_mq_subscriber { @@ -92,7 +92,7 @@ typedef struct stellar_mq_subscriber on_packet_msg_cb_func *pkt_msg_cb; }; struct stellar_mq_subscriber *next, *prev; -}stellar_mq_subscriber; +}stellar_mq_subscriber __attribute__((aligned(sizeof(void*)))); struct stellar_mq_topic_schema @@ -109,7 +109,7 @@ struct stellar_mq_topic_schema packet_msg_free_cb_func *pkt_msg_free_cb; }; struct stellar_mq_subscriber *subscribers; -}; +}__attribute__((aligned(sizeof(void*)))); enum plugin_ctx_state { INIT, ACTIVE, EXIT }; @@ -119,7 +119,7 @@ struct session_plugin_ctx_runtime enum plugin_ctx_state state; int session_plugin_id; void *plugin_ctx; -}; +}__attribute__((aligned(sizeof(void*)))); @@ -134,7 +134,7 @@ struct plugin_manager_runtime struct session_plugin_ctx_runtime *plugin_ctx_array;//N plugins TODO: call alloc and free int current_session_plugin_id; int enable_session_mq; -}; +}__attribute__((aligned(sizeof(void*)))); struct registered_packet_plugin_schema { @@ -142,19 +142,19 @@ struct registered_packet_plugin_schema plugin_on_packet_func *on_packet; void *plugin_env; UT_array *registed_packet_mq_subscriber_info; -}; +}__attribute__((aligned(sizeof(void*)))); struct registered_polling_plugin_schema { plugin_on_polling_func *on_polling; void *plugin_env; -}; +}__attribute__((aligned(sizeof(void*)))); struct stellar_mq_subscriber_info { int topic_id; int subscriber_idx; -}; +}__attribute__((aligned(sizeof(void*)))); struct registered_session_plugin_schema { @@ -162,7 +162,7 @@ struct registered_session_plugin_schema session_ctx_free_func *on_ctx_free; void *plugin_env; UT_array *registed_session_mq_subscriber_info; -}; +}__attribute__((aligned(sizeof(void*)))); #define PACKET_PULGIN_ID_BASE 0x10000 #define POLLING_PULGIN_ID_BASE 0x20000 @@ -179,4 +179,4 @@ struct plugin_specific plugin_on_load_func *load_cb; plugin_on_unload_func *unload_cb; void *plugin_ctx; -};
\ No newline at end of file +}__attribute__((aligned(sizeof(void*))));
\ No newline at end of file diff --git a/src/stellar_on_sapp/stellar_on_sapp.h b/src/stellar_on_sapp/stellar_on_sapp.h index f0d0f6f..f4f8159 100644 --- a/src/stellar_on_sapp/stellar_on_sapp.h +++ b/src/stellar_on_sapp/stellar_on_sapp.h @@ -13,7 +13,7 @@ void session_free_on_sapp(struct session *sess); unsigned char session_state_update_on_sapp(struct streaminfo *stream, unsigned char stream_state, struct session *sess, const void *raw_pkt, enum packet_type type); -void session_poll_on_sapp(struct session *sess); +void session_defer_on_sapp(struct session *sess); void packet_update_on_sapp(struct stellar *st, struct streaminfo *pstream, void *a_packet, enum packet_type type); diff --git a/src/stellar_on_sapp/stellar_on_sapp_api.c b/src/stellar_on_sapp/stellar_on_sapp_api.c index d40112c..f9e5637 100644 --- a/src/stellar_on_sapp/stellar_on_sapp_api.c +++ b/src/stellar_on_sapp/stellar_on_sapp_api.c @@ -21,10 +21,9 @@ struct packet { enum packet_type type; unsigned char ip_proto; - unsigned char pad[3]; const void *raw_pkt; struct stellar *st; -}; +}__attribute__((aligned(sizeof(void*)))) ; struct session { @@ -35,7 +34,7 @@ struct session struct packet cur_pkt; struct stellar *st; struct plugin_manager_runtime *plug_mgr_rt; -}; +}__attribute__((aligned(sizeof(void*)))); inline struct stellar * packet_stellar_get(struct packet *pkt) { @@ -145,7 +144,7 @@ unsigned char session_state_update_on_sapp(struct streaminfo *stream, unsigned c return APP_STATE_DROPME; } -void session_poll_on_sapp(struct session *sess) +void session_defer_on_sapp(struct session *sess) { if(sess==NULL)return; if(sess->state != SESSION_STATE_CONTROL) @@ -155,6 +154,7 @@ void session_poll_on_sapp(struct session *sess) if(pkt->raw_pkt) { plugin_manager_on_session_egress(sess, pkt); + plugin_manager_on_packet_egress(sess->st->plug_mgr, pkt); } } sess->cur_pkt.raw_pkt=NULL;//clear raw_pkt @@ -187,7 +187,12 @@ void packet_update_on_sapp(struct stellar *st, struct streaminfo *pstream, void default: return; } + //TODO: exclude non innermost packet plugin_manager_on_packet_ingress(st->plug_mgr, &pkt); + if(pkt.ip_proto!=IPPROTO_TCP && pkt.ip_proto!=IPPROTO_UDP) + { + plugin_manager_on_packet_egress(st->plug_mgr, &pkt); + } } inline int polling_on_sapp(struct stellar *st) diff --git a/src/stellar_on_sapp/stellar_on_sapp_loader.c b/src/stellar_on_sapp/stellar_on_sapp_loader.c index 1ff58cc..111c44d 100644 --- a/src/stellar_on_sapp/stellar_on_sapp_loader.c +++ b/src/stellar_on_sapp/stellar_on_sapp_loader.c @@ -5,14 +5,6 @@ #include <MESA/stream.h> -struct simple_stream_ctx -{ - uint32_t c2s_pkts; - uint32_t c2s_bytes; - uint32_t s2c_pkts; - uint32_t s2c_bytes; - struct session *sess; -}; #define STELLAR_PLUGIN_PATH "./stellar_plugin/spec.toml" #define STELLAR_BRIDEGE_NAME "STELLAR_SESSION" @@ -81,7 +73,7 @@ char stellar_on_sapp_defer_entry(struct streaminfo *pstream,void **pme, int thre struct session *sess = (struct session *)stream_bridge_async_data_get(pstream, g_session_bridge_id); if(sess) { - session_poll_on_sapp(sess); + session_defer_on_sapp(sess); return APP_STATE_GIVEME; } else @@ -187,13 +179,13 @@ unsigned char stellar_on_sapp_tcp_entry(struct streaminfo *pstream,void **pme, i char stellar_on_sapp_ip4_entry( struct streaminfo *pstream,unsigned char routedir,int thread_seq, void *a_packet) { - packet_update_on_sapp(g_stellar, pstream, a_packet, IPv4); + if(stream_is_inner_most(pstream)==1)packet_update_on_sapp(g_stellar, pstream, a_packet, IPv4); return APP_STATE_GIVEME; } char stellar_on_sapp_ip6_entry( struct streaminfo *pstream,unsigned char routedir,int thread_seq, void *a_packet) { - packet_update_on_sapp(g_stellar, pstream, a_packet, IPv6); + if(stream_is_inner_most(pstream)==1)packet_update_on_sapp(g_stellar, pstream, a_packet, IPv6); return APP_STATE_GIVEME; } |
