summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/stellar_on_sapp/stellar_on_sapp.h3
-rw-r--r--src/stellar_on_sapp/stellar_on_sapp_api.c63
-rw-r--r--src/stellar_on_sapp/stellar_on_sapp_loader.c26
3 files changed, 58 insertions, 34 deletions
diff --git a/src/stellar_on_sapp/stellar_on_sapp.h b/src/stellar_on_sapp/stellar_on_sapp.h
index f4f8159..d22ecd3 100644
--- a/src/stellar_on_sapp/stellar_on_sapp.h
+++ b/src/stellar_on_sapp/stellar_on_sapp.h
@@ -15,7 +15,8 @@ unsigned char session_state_update_on_sapp(struct streaminfo *stream, unsigned c
void session_defer_on_sapp(struct session *sess);
-void packet_update_on_sapp(struct stellar *st, struct streaminfo *pstream, void *a_packet, enum packet_type type);
+struct packet *stellar_get0_current_thread_packet(struct stellar *st);
+void packet_update_on_sapp(struct stellar *st, struct streaminfo *pstream, struct packet *pkt, void *iphdr, enum packet_type type);
//return polling work state, 0: idle, 1: working
int polling_on_sapp(struct stellar *st); \ No newline at end of file
diff --git a/src/stellar_on_sapp/stellar_on_sapp_api.c b/src/stellar_on_sapp/stellar_on_sapp_api.c
index 20f87a6..f551fc1 100644
--- a/src/stellar_on_sapp/stellar_on_sapp_api.c
+++ b/src/stellar_on_sapp/stellar_on_sapp_api.c
@@ -19,6 +19,7 @@ struct stellar
{
struct plugin_manager_schema *plug_mgr;
int tcp_stream_topic_id;
+ struct packet *per_thread_pkt;
};
struct packet
@@ -37,7 +38,6 @@ struct session
enum session_state state;
int session_direction;
struct streaminfo *pstream;
- struct packet cur_pkt;
struct stellar *st;
struct plugin_manager_runtime *plug_mgr_rt;
}__attribute__((aligned(sizeof(void*))));
@@ -77,6 +77,12 @@ struct stellar *stellar_init_on_sapp(const char *toml_conf_path)
}
st->plug_mgr=pm;
st->tcp_stream_topic_id=stellar_session_mq_get_topic_id(st, TOPIC_TCP_STREAM);
+ int tid=stellar_get_worker_thread_num(st);
+ st->per_thread_pkt=CALLOC(struct packet, tid);
+ for(int i=0; i<tid; i++)
+ {
+ st->per_thread_pkt[i].st=st;
+ }
return st;
}
@@ -84,6 +90,7 @@ void stellar_exit_on_sapp(struct stellar *st)
{
if(!st)return;
if(st->plug_mgr)plugin_manager_exit(st->plug_mgr);
+ FREE(st->per_thread_pkt);
FREE(st);
return;
};
@@ -115,9 +122,6 @@ struct session *session_new_on_sapp(struct stellar *st, struct streaminfo *strea
sess->state = SESSION_STATE_OPENING;
sess->pstream=stream;
sess->session_direction=-1;
- memset(&sess->cur_pkt, 0, sizeof(struct packet));
- sess->cur_pkt.st=st;
- sess->cur_pkt.sess=sess;
sess->plug_mgr_rt=plugin_manager_session_runtime_new(st->plug_mgr, sess);
return sess;
}
@@ -143,7 +147,7 @@ unsigned char session_state_update_on_sapp(struct streaminfo *stream, unsigned c
if(sess)
{
sess->state=(stream_state == OP_STATE_PENDING) ? SESSION_STATE_OPENING : SESSION_STATE_ACTIVE;
- struct packet *pkt = &sess->cur_pkt;
+ struct packet *pkt = stellar_get0_current_thread_packet(sess->st);
pkt->raw_pkt=raw_pkt;
pkt->type=type;
pkt->sess=sess;
@@ -164,9 +168,9 @@ unsigned char session_state_update_on_sapp(struct streaminfo *stream, unsigned c
void session_defer_on_sapp(struct session *sess)
{
if(sess==NULL)return;
+ struct packet *pkt = stellar_get0_current_thread_packet(sess->st);
if(sess->state != SESSION_STATE_CONTROL)
{
- struct packet *pkt = &sess->cur_pkt;
if(unlikely(pkt->raw_pkt==NULL))pkt->raw_pkt=get_current_rawpkt_from_streaminfo(sess->pstream);
if(pkt->raw_pkt)
{
@@ -174,43 +178,48 @@ void session_defer_on_sapp(struct session *sess)
plugin_manager_on_packet_egress(sess->st->plug_mgr, pkt);
}
}
- sess->cur_pkt.raw_pkt=NULL;//clear raw_pkt
+ pkt->raw_pkt=NULL;//clear raw_pkt
}
/*********************************************
* PACKET STATE UPDATE ON SAPP *
*********************************************/
#include <netinet/in.h>
-void packet_update_on_sapp(struct stellar *st, struct streaminfo *pstream, void *pkt_hdr, enum packet_type type)
-{
- struct packet pkt;
- pkt.type=type;
- pkt.raw_pkt=get_current_rawpkt_from_streaminfo(pstream);
- pkt.st=st;
- pkt.sess=NULL;
- pkt.pstream=pstream;
+
+struct packet *stellar_get0_current_thread_packet(struct stellar *st)
+{
+ int tid=get_current_worker_thread_id();
+ return &st->per_thread_pkt[tid];
+}
+
+void packet_update_on_sapp(struct stellar *st, struct streaminfo *pstream, struct packet *pkt, void *pkt_hdr, enum packet_type type)
+{
+ pkt->type=type;
+ pkt->raw_pkt=get_current_rawpkt_from_streaminfo(pstream);
+ pkt->sess=NULL;
+ pkt->pstream=pstream;
switch (type)
{
case IPv4:
{
struct iphdr *ip4_hdr=(struct iphdr *)pkt_hdr;
- pkt.ip_proto=ip4_hdr->protocol;
+ pkt->ip_proto=ip4_hdr->protocol;
break;
}
case IPv6:
{
struct ip6_hdr *ip6_hdr=(struct ip6_hdr *)pkt_hdr;
- pkt.ip_proto=ip6_hdr->ip6_nxt;
+ pkt->ip_proto=ip6_hdr->ip6_nxt;
break;
}
default:
return;
}
//FIXME: defer TCP/UDP packet on session update
- plugin_manager_on_packet_ingress(st->plug_mgr, &pkt);
- if(pkt.ip_proto!=IPPROTO_TCP && pkt.ip_proto!=IPPROTO_UDP)
+ plugin_manager_on_packet_ingress(st->plug_mgr, pkt);
+ if(pkt->ip_proto!=IPPROTO_TCP && pkt->ip_proto!=IPPROTO_UDP)
{
- plugin_manager_on_packet_egress(st->plug_mgr, &pkt);
+ plugin_manager_on_packet_egress(st->plug_mgr, pkt);
}
}
@@ -350,7 +359,7 @@ inline struct session *packet_get_session(const struct packet *pkt)
inline struct packet *session_get_current_packet(struct session *sess)
{
- return &sess->cur_pkt;
+ return stellar_get0_current_thread_packet(sess->st);
}
const char* session_get0_readable_addr(struct session *sess)
@@ -403,16 +412,17 @@ const char *session_get0_current_payload(struct session *sess, size_t *payload_l
inline const struct packet *session_get0_current_packet(struct session *sess)
{
- if(unlikely(sess->cur_pkt.raw_pkt == NULL))
+ struct packet *pkt=stellar_get0_current_thread_packet(sess->st);
+ if(unlikely(pkt->raw_pkt == NULL))
{
struct streaminfo *pstream=sess->pstream;
assert(pstream);
+ pkt->pstream=pstream;
+ pkt->sess=sess;
//attach packet to session
- sess->cur_pkt.raw_pkt=(void *)get_current_rawpkt_from_streaminfo(pstream);
+ pkt->raw_pkt=(void *)get_current_rawpkt_from_streaminfo(pstream);
}
-
- return &sess->cur_pkt;
-
+ return pkt;
}
int session_get_direction(struct session *sess)
@@ -420,7 +430,6 @@ int session_get_direction(struct session *sess)
if(sess->session_direction>=0)return sess->session_direction;
struct streaminfo *pstream=sess->pstream;
assert(pstream);
- // int c2s_router_dir=(sess->cur_pkt->a_stream->curdir==DIR_C2S)?(sess->cur_pkt->a_stream->routedir):((sess->cur_pkt->a_stream->routedir)^1);
u_int8_t c2s_router_dir = 0;
int dir_len = sizeof(c2s_router_dir);
MESA_get_stream_opt(pstream, MSO_STREAM_C2S_ROUTE_DIRECTION, (void *)&c2s_router_dir, &dir_len);
diff --git a/src/stellar_on_sapp/stellar_on_sapp_loader.c b/src/stellar_on_sapp/stellar_on_sapp_loader.c
index 1497093..16ff7f6 100644
--- a/src/stellar_on_sapp/stellar_on_sapp_loader.c
+++ b/src/stellar_on_sapp/stellar_on_sapp_loader.c
@@ -2,6 +2,7 @@
#include "stellar/stellar.h"
#include "stellar/session_exdata.h"
+#include "stellar/packet_exdata.h"
#include "stellar_on_sapp.h"
#include <MESA/stream.h>
@@ -18,7 +19,8 @@ enum entry_type
struct stellar *g_stellar=NULL;
int g_session_bridge_id=-1;
-int g_streaminfo_exdata_id=-1;
+int g_session_streaminfo_exdata_id=-1;
+int g_packet_streaminfo_exdata_id=-1;
static int stream_is_inner_most(struct streaminfo *a_stream)
@@ -47,8 +49,10 @@ int STELLAR_START_LOADER_INIT()
g_session_bridge_id=stream_bridge_build(STELLAR_BRIDEGE_NAME, "w");
if(g_session_bridge_id < 0)return -1;
stream_bridge_register_data_free_cb(g_session_bridge_id, stellar_on_sapp_bridge_free);
- g_streaminfo_exdata_id=stellar_session_exdata_new_index(g_stellar, STELLAR_EXDATA_NAME, NULL, NULL);
- if(g_streaminfo_exdata_id < 0)return -1;
+ g_session_streaminfo_exdata_id=stellar_session_exdata_new_index(g_stellar, STELLAR_EXDATA_NAME, NULL, NULL);
+ if(g_session_streaminfo_exdata_id < 0)return -1;
+ g_packet_streaminfo_exdata_id=stellar_packet_exdata_new_index(g_stellar, STELLAR_EXDATA_NAME, NULL, NULL);
+ if(g_session_streaminfo_exdata_id < 0)return -1;
return 0;
}
@@ -126,7 +130,7 @@ static unsigned char loader_transfer_stream_entry(struct streaminfo *pstream, UC
if(!sess)
{
sess = session_new_on_sapp(g_stellar, pstream);
- session_exdata_set(sess, g_streaminfo_exdata_id, pstream);
+ session_exdata_set(sess, g_session_streaminfo_exdata_id, pstream);
stream_bridge_async_data_put(pstream, g_session_bridge_id, sess);
}
entry_ret = session_state_update_on_sapp(pstream, state, sess, raw_pkt, pkt_type);
@@ -174,13 +178,23 @@ unsigned char stellar_on_sapp_tcp_entry(struct streaminfo *pstream,void **pme, i
char stellar_on_sapp_ip4_entry( struct streaminfo *pstream,unsigned char routedir ,int thread_seq , void *a_packet)
{
- if(stream_is_inner_most(pstream)==1)packet_update_on_sapp(g_stellar, pstream, a_packet, IPv4);
+ if(stream_is_inner_most(pstream)==1)
+ {
+ struct packet *pkt=stellar_get0_current_thread_packet(g_stellar);
+ packet_exdata_set(pkt, g_packet_streaminfo_exdata_id, pstream);
+ packet_update_on_sapp(g_stellar, pstream, pkt,a_packet, IPv4);
+ }
return APP_STATE_GIVEME;
}
char stellar_on_sapp_ip6_entry( struct streaminfo *pstream,unsigned char routedir ,int thread_seq , void *a_packet)
{
- if(stream_is_inner_most(pstream)==1)packet_update_on_sapp(g_stellar, pstream, a_packet, IPv6);
+ if(stream_is_inner_most(pstream)==1)
+ {
+ struct packet *pkt=stellar_get0_current_thread_packet(g_stellar);
+ packet_exdata_set(pkt, g_packet_streaminfo_exdata_id, pstream);
+ packet_update_on_sapp(g_stellar, pstream, pkt, a_packet, IPv6);
+ }
return APP_STATE_GIVEME;
}