diff options
| -rw-r--r-- | src/stellar_on_sapp/stellar_on_sapp.h | 3 | ||||
| -rw-r--r-- | src/stellar_on_sapp/stellar_on_sapp_api.c | 63 | ||||
| -rw-r--r-- | src/stellar_on_sapp/stellar_on_sapp_loader.c | 26 |
3 files changed, 58 insertions, 34 deletions
diff --git a/src/stellar_on_sapp/stellar_on_sapp.h b/src/stellar_on_sapp/stellar_on_sapp.h index f4f8159..d22ecd3 100644 --- a/src/stellar_on_sapp/stellar_on_sapp.h +++ b/src/stellar_on_sapp/stellar_on_sapp.h @@ -15,7 +15,8 @@ unsigned char session_state_update_on_sapp(struct streaminfo *stream, unsigned c void session_defer_on_sapp(struct session *sess); -void packet_update_on_sapp(struct stellar *st, struct streaminfo *pstream, void *a_packet, enum packet_type type); +struct packet *stellar_get0_current_thread_packet(struct stellar *st); +void packet_update_on_sapp(struct stellar *st, struct streaminfo *pstream, struct packet *pkt, void *iphdr, enum packet_type type); //return polling work state, 0: idle, 1: working int polling_on_sapp(struct stellar *st);
\ No newline at end of file diff --git a/src/stellar_on_sapp/stellar_on_sapp_api.c b/src/stellar_on_sapp/stellar_on_sapp_api.c index 20f87a6..f551fc1 100644 --- a/src/stellar_on_sapp/stellar_on_sapp_api.c +++ b/src/stellar_on_sapp/stellar_on_sapp_api.c @@ -19,6 +19,7 @@ struct stellar { struct plugin_manager_schema *plug_mgr; int tcp_stream_topic_id; + struct packet *per_thread_pkt; }; struct packet @@ -37,7 +38,6 @@ struct session enum session_state state; int session_direction; struct streaminfo *pstream; - struct packet cur_pkt; struct stellar *st; struct plugin_manager_runtime *plug_mgr_rt; }__attribute__((aligned(sizeof(void*)))); @@ -77,6 +77,12 @@ struct stellar *stellar_init_on_sapp(const char *toml_conf_path) } st->plug_mgr=pm; st->tcp_stream_topic_id=stellar_session_mq_get_topic_id(st, TOPIC_TCP_STREAM); + int tid=stellar_get_worker_thread_num(st); + st->per_thread_pkt=CALLOC(struct packet, tid); + for(int i=0; i<tid; i++) + { + st->per_thread_pkt[i].st=st; + } return st; } @@ -84,6 +90,7 @@ void stellar_exit_on_sapp(struct stellar *st) { if(!st)return; if(st->plug_mgr)plugin_manager_exit(st->plug_mgr); + FREE(st->per_thread_pkt); FREE(st); return; }; @@ -115,9 +122,6 @@ struct session *session_new_on_sapp(struct stellar *st, struct streaminfo *strea sess->state = SESSION_STATE_OPENING; sess->pstream=stream; sess->session_direction=-1; - memset(&sess->cur_pkt, 0, sizeof(struct packet)); - sess->cur_pkt.st=st; - sess->cur_pkt.sess=sess; sess->plug_mgr_rt=plugin_manager_session_runtime_new(st->plug_mgr, sess); return sess; } @@ -143,7 +147,7 @@ unsigned char session_state_update_on_sapp(struct streaminfo *stream, unsigned c if(sess) { sess->state=(stream_state == OP_STATE_PENDING) ? SESSION_STATE_OPENING : SESSION_STATE_ACTIVE; - struct packet *pkt = &sess->cur_pkt; + struct packet *pkt = stellar_get0_current_thread_packet(sess->st); pkt->raw_pkt=raw_pkt; pkt->type=type; pkt->sess=sess; @@ -164,9 +168,9 @@ unsigned char session_state_update_on_sapp(struct streaminfo *stream, unsigned c void session_defer_on_sapp(struct session *sess) { if(sess==NULL)return; + struct packet *pkt = stellar_get0_current_thread_packet(sess->st); if(sess->state != SESSION_STATE_CONTROL) { - struct packet *pkt = &sess->cur_pkt; if(unlikely(pkt->raw_pkt==NULL))pkt->raw_pkt=get_current_rawpkt_from_streaminfo(sess->pstream); if(pkt->raw_pkt) { @@ -174,43 +178,48 @@ void session_defer_on_sapp(struct session *sess) plugin_manager_on_packet_egress(sess->st->plug_mgr, pkt); } } - sess->cur_pkt.raw_pkt=NULL;//clear raw_pkt + pkt->raw_pkt=NULL;//clear raw_pkt } /********************************************* * PACKET STATE UPDATE ON SAPP * *********************************************/ #include <netinet/in.h> -void packet_update_on_sapp(struct stellar *st, struct streaminfo *pstream, void *pkt_hdr, enum packet_type type) -{ - struct packet pkt; - pkt.type=type; - pkt.raw_pkt=get_current_rawpkt_from_streaminfo(pstream); - pkt.st=st; - pkt.sess=NULL; - pkt.pstream=pstream; + +struct packet *stellar_get0_current_thread_packet(struct stellar *st) +{ + int tid=get_current_worker_thread_id(); + return &st->per_thread_pkt[tid]; +} + +void packet_update_on_sapp(struct stellar *st, struct streaminfo *pstream, struct packet *pkt, void *pkt_hdr, enum packet_type type) +{ + pkt->type=type; + pkt->raw_pkt=get_current_rawpkt_from_streaminfo(pstream); + pkt->sess=NULL; + pkt->pstream=pstream; switch (type) { case IPv4: { struct iphdr *ip4_hdr=(struct iphdr *)pkt_hdr; - pkt.ip_proto=ip4_hdr->protocol; + pkt->ip_proto=ip4_hdr->protocol; break; } case IPv6: { struct ip6_hdr *ip6_hdr=(struct ip6_hdr *)pkt_hdr; - pkt.ip_proto=ip6_hdr->ip6_nxt; + pkt->ip_proto=ip6_hdr->ip6_nxt; break; } default: return; } //FIXME: defer TCP/UDP packet on session update - plugin_manager_on_packet_ingress(st->plug_mgr, &pkt); - if(pkt.ip_proto!=IPPROTO_TCP && pkt.ip_proto!=IPPROTO_UDP) + plugin_manager_on_packet_ingress(st->plug_mgr, pkt); + if(pkt->ip_proto!=IPPROTO_TCP && pkt->ip_proto!=IPPROTO_UDP) { - plugin_manager_on_packet_egress(st->plug_mgr, &pkt); + plugin_manager_on_packet_egress(st->plug_mgr, pkt); } } @@ -350,7 +359,7 @@ inline struct session *packet_get_session(const struct packet *pkt) inline struct packet *session_get_current_packet(struct session *sess) { - return &sess->cur_pkt; + return stellar_get0_current_thread_packet(sess->st); } const char* session_get0_readable_addr(struct session *sess) @@ -403,16 +412,17 @@ const char *session_get0_current_payload(struct session *sess, size_t *payload_l inline const struct packet *session_get0_current_packet(struct session *sess) { - if(unlikely(sess->cur_pkt.raw_pkt == NULL)) + struct packet *pkt=stellar_get0_current_thread_packet(sess->st); + if(unlikely(pkt->raw_pkt == NULL)) { struct streaminfo *pstream=sess->pstream; assert(pstream); + pkt->pstream=pstream; + pkt->sess=sess; //attach packet to session - sess->cur_pkt.raw_pkt=(void *)get_current_rawpkt_from_streaminfo(pstream); + pkt->raw_pkt=(void *)get_current_rawpkt_from_streaminfo(pstream); } - - return &sess->cur_pkt; - + return pkt; } int session_get_direction(struct session *sess) @@ -420,7 +430,6 @@ int session_get_direction(struct session *sess) if(sess->session_direction>=0)return sess->session_direction; struct streaminfo *pstream=sess->pstream; assert(pstream); - // int c2s_router_dir=(sess->cur_pkt->a_stream->curdir==DIR_C2S)?(sess->cur_pkt->a_stream->routedir):((sess->cur_pkt->a_stream->routedir)^1); u_int8_t c2s_router_dir = 0; int dir_len = sizeof(c2s_router_dir); MESA_get_stream_opt(pstream, MSO_STREAM_C2S_ROUTE_DIRECTION, (void *)&c2s_router_dir, &dir_len); diff --git a/src/stellar_on_sapp/stellar_on_sapp_loader.c b/src/stellar_on_sapp/stellar_on_sapp_loader.c index 1497093..16ff7f6 100644 --- a/src/stellar_on_sapp/stellar_on_sapp_loader.c +++ b/src/stellar_on_sapp/stellar_on_sapp_loader.c @@ -2,6 +2,7 @@ #include "stellar/stellar.h" #include "stellar/session_exdata.h" +#include "stellar/packet_exdata.h" #include "stellar_on_sapp.h" #include <MESA/stream.h> @@ -18,7 +19,8 @@ enum entry_type struct stellar *g_stellar=NULL; int g_session_bridge_id=-1; -int g_streaminfo_exdata_id=-1; +int g_session_streaminfo_exdata_id=-1; +int g_packet_streaminfo_exdata_id=-1; static int stream_is_inner_most(struct streaminfo *a_stream) @@ -47,8 +49,10 @@ int STELLAR_START_LOADER_INIT() g_session_bridge_id=stream_bridge_build(STELLAR_BRIDEGE_NAME, "w"); if(g_session_bridge_id < 0)return -1; stream_bridge_register_data_free_cb(g_session_bridge_id, stellar_on_sapp_bridge_free); - g_streaminfo_exdata_id=stellar_session_exdata_new_index(g_stellar, STELLAR_EXDATA_NAME, NULL, NULL); - if(g_streaminfo_exdata_id < 0)return -1; + g_session_streaminfo_exdata_id=stellar_session_exdata_new_index(g_stellar, STELLAR_EXDATA_NAME, NULL, NULL); + if(g_session_streaminfo_exdata_id < 0)return -1; + g_packet_streaminfo_exdata_id=stellar_packet_exdata_new_index(g_stellar, STELLAR_EXDATA_NAME, NULL, NULL); + if(g_session_streaminfo_exdata_id < 0)return -1; return 0; } @@ -126,7 +130,7 @@ static unsigned char loader_transfer_stream_entry(struct streaminfo *pstream, UC if(!sess) { sess = session_new_on_sapp(g_stellar, pstream); - session_exdata_set(sess, g_streaminfo_exdata_id, pstream); + session_exdata_set(sess, g_session_streaminfo_exdata_id, pstream); stream_bridge_async_data_put(pstream, g_session_bridge_id, sess); } entry_ret = session_state_update_on_sapp(pstream, state, sess, raw_pkt, pkt_type); @@ -174,13 +178,23 @@ unsigned char stellar_on_sapp_tcp_entry(struct streaminfo *pstream,void **pme, i char stellar_on_sapp_ip4_entry( struct streaminfo *pstream,unsigned char routedir ,int thread_seq , void *a_packet) { - if(stream_is_inner_most(pstream)==1)packet_update_on_sapp(g_stellar, pstream, a_packet, IPv4); + if(stream_is_inner_most(pstream)==1) + { + struct packet *pkt=stellar_get0_current_thread_packet(g_stellar); + packet_exdata_set(pkt, g_packet_streaminfo_exdata_id, pstream); + packet_update_on_sapp(g_stellar, pstream, pkt,a_packet, IPv4); + } return APP_STATE_GIVEME; } char stellar_on_sapp_ip6_entry( struct streaminfo *pstream,unsigned char routedir ,int thread_seq , void *a_packet) { - if(stream_is_inner_most(pstream)==1)packet_update_on_sapp(g_stellar, pstream, a_packet, IPv6); + if(stream_is_inner_most(pstream)==1) + { + struct packet *pkt=stellar_get0_current_thread_packet(g_stellar); + packet_exdata_set(pkt, g_packet_streaminfo_exdata_id, pstream); + packet_update_on_sapp(g_stellar, pstream, pkt, a_packet, IPv6); + } return APP_STATE_GIVEME; } |
