summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authoryangwei <[email protected]>2024-03-01 10:45:33 +0800
committeryangwei <[email protected]>2024-04-30 15:58:10 +0800
commitb1e2dd20c4470d3b4dfcb98fb0a9289f197b200e (patch)
tree9e0bb6a16e55df2fe4ca584f40793a521231b714 /src
parentdba8ffef7b3b69db106d75c86e784c57b88b92b8 (diff)
✨ feat(polling return with work state): 0: idle, 1: working
Diffstat (limited to 'src')
-rw-r--r--src/plugin_manager/CMakeLists.txt2
-rw-r--r--src/plugin_manager/plugin_manager.c37
-rw-r--r--src/plugin_manager/plugin_manager.h3
-rw-r--r--src/stellar_on_sapp/defer_loader.inf8
-rw-r--r--src/stellar_on_sapp/start_loader.inf9
-rw-r--r--src/stellar_on_sapp/stellar_on_sapp.h4
-rw-r--r--src/stellar_on_sapp/stellar_on_sapp_api.c6
-rw-r--r--src/stellar_on_sapp/stellar_on_sapp_loader.c86
8 files changed, 66 insertions, 89 deletions
diff --git a/src/plugin_manager/CMakeLists.txt b/src/plugin_manager/CMakeLists.txt
index 3db675a..cc17074 100644
--- a/src/plugin_manager/CMakeLists.txt
+++ b/src/plugin_manager/CMakeLists.txt
@@ -4,4 +4,4 @@ include_directories(${CMAKE_SOURCE_DIR}/deps)
include_directories(${CMAKE_SOURCE_DIR}/src/stellar_on_sapp)
add_library(plugin_manager STATIC plugin_manager.c)
-target_link_libraries(plugin_manager toml)
+target_link_libraries(plugin_manager toml bitmap)
diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c
index dc0939b..f14385f 100644
--- a/src/plugin_manager/plugin_manager.c
+++ b/src/plugin_manager/plugin_manager.c
@@ -9,6 +9,8 @@
#include "uthash/utlist.h"
#include "uthash/utarray.h"
+#include "bitmap/bitmap.h"
+
struct plugin_manager_schema
{
struct stellar *st;
@@ -81,7 +83,7 @@ struct plugin_manager_runtime
struct session *sess;
struct session_message *pending_mq;// message list
struct session_message *delivered_mq;// message list
- char *session_mq_status; //N * M bits, N topic, M subscriber
+ struct bitmap *session_mq_status; //N * M bits, N topic, M subscriber
struct plugin_exdata *plugin_exdata_array;
struct session_plugin_ctx_runtime *plugin_ctx_array;//N plugins TODO: call alloc and free
int current_session_plugin_id;
@@ -491,7 +493,7 @@ static int session_mq_set_message_status(struct session *sess, int topic_id, int
struct session_mq_subscriber_info *session_plugin_sub_info = (struct session_mq_subscriber_info *)utarray_eltptr(session_plugin_schema->registed_session_mq_subscriber_info, i);
if(topic_id==session_plugin_sub_info->topic_id)
{
- plug_mgr_rt->session_mq_status[topic_id+(session_plugin_sub_info->subscriber_idx)]=bit_value;
+ bitmap_set(plug_mgr_rt->session_mq_status, topic_id, session_plugin_sub_info->subscriber_idx, bit_value);
}
}
return 0;
@@ -577,7 +579,7 @@ static void plugin_manager_session_message_dispatch(struct session *sess)
int cur_sub_idx = 0;
DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
{
- if (plug_mgr_rt->session_mq_status[(mq_elt->topic_id) + cur_sub_idx] != 0)
+ if (bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->topic_id, cur_sub_idx) != 0)
{
plugin_ctx_rt=(plug_mgr_rt->plugin_ctx_array+sub_elt->session_plugin_id);
session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)sub_elt->session_plugin_id);
@@ -589,7 +591,7 @@ static void plugin_manager_session_message_dispatch(struct session *sess)
plugin_ctx_rt->state=ACTIVE;
}
}
- sub_elt->msg_cb(sess, mq_elt->topic_id, mq_elt->msg_data, plugin_ctx_rt->plugin_ctx,
+ if(sub_elt->msg_cb)sub_elt->msg_cb(sess, mq_elt->topic_id, mq_elt->msg_data, plugin_ctx_rt->plugin_ctx,
session_plugin_schema->plugin_env);
}
cur_sub_idx++;
@@ -648,8 +650,7 @@ struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_
rt->sess = sess;
rt->pending_mq = NULL;
rt->delivered_mq = NULL;
- rt->session_mq_status=CALLOC(char, plug_mgr->topic_num*plug_mgr->subscriber_num);
- memset(rt->session_mq_status, 1, plug_mgr->topic_num*plug_mgr->subscriber_num);
+ rt->session_mq_status=bitmap_new(plug_mgr->topic_num, plug_mgr->subscriber_num, 1);
rt->plugin_exdata_array = (struct plugin_exdata *)session_exdata_runtime_new(plug_mgr);
rt->plugin_ctx_array = CALLOC(struct session_plugin_ctx_runtime, utarray_len(plug_mgr->registered_session_plugin_array));
return rt;
@@ -671,21 +672,22 @@ void plugin_manager_session_runtime_free(struct plugin_manager_runtime *rt)
}
if(rt->session_mq_status != NULL)
{
- FREE(rt->session_mq_status);
+ bitmap_free(rt->session_mq_status);
}
- session_exdata_runtime_free(rt->plug_mgr, rt->sess, rt->plugin_exdata_array);
- FREE(rt->plugin_exdata_array);
unsigned int len = utarray_len(rt->plug_mgr->registered_session_plugin_array);
for(unsigned int i = 0; i < len; i++)
{
struct session_plugin_ctx_runtime *plugin_ctx_rt=(rt->plugin_ctx_array+i);
struct registered_session_plugin_schema *session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(rt->plug_mgr->registered_session_plugin_array, i);
- if(session_plugin_schema->on_ctx_free && plugin_ctx_rt->state!=EXIT)
+ if(session_plugin_schema->on_ctx_free && plugin_ctx_rt->state==ACTIVE)
{
session_plugin_schema->on_ctx_free(rt->sess, plugin_ctx_rt->plugin_ctx, session_plugin_schema->plugin_env);
}
}
FREE(rt->plugin_ctx_array);
+
+ session_exdata_runtime_free(rt->plug_mgr, rt->sess, rt->plugin_exdata_array);
+ FREE(rt->plugin_exdata_array);
FREE(rt);
}
@@ -751,18 +753,22 @@ int stellar_polling_plugin_register(struct stellar *st, plugin_on_polling_func o
}
// FIXME: scheduler with return value
-void plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr)
+int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr)
{
- if(plug_mgr->registered_polling_plugin_array == NULL)return;
+ if(plug_mgr->registered_polling_plugin_array == NULL)return 0;
struct registered_polling_plugin_schema *p=NULL;
+ int polling_state=0;
while ((p = (struct registered_polling_plugin_schema *)utarray_next(plug_mgr->registered_polling_plugin_array, p)))
{
if(p->on_polling)
{
- p->on_polling(p->plugin_env);
+ if(p->on_polling(p->plugin_env)==1)
+ {
+ polling_state=1;
+ }
}
}
- return;
+ return polling_state;
}
/*********************************************
@@ -816,6 +822,7 @@ void plugin_manager_on_session_ingress(struct session *sess,const struct packet
default:
break;
}
+ //TODO: check TCP topic active subscirber num, if 0, return APP_STATE_DROPME, to reduce tcp reassemble overhead
session_mq_publish_message(sess, topic_id ,(void *)pkt);
plugin_manager_session_message_dispatch(sess);
return;
@@ -845,7 +852,7 @@ void stellar_session_plugin_dettach_current_session(struct session *sess)
for(unsigned int i=0; i < plugin_subscriber_num; i++)
{
struct session_mq_subscriber_info *session_plugin_sub_info = (struct session_mq_subscriber_info *)utarray_eltptr(session_plugin_schema->registed_session_mq_subscriber_info, i);
- plug_mgr_rt->session_mq_status[session_plugin_sub_info->topic_id+session_plugin_sub_info->subscriber_idx]=0;//clear related session_mq_status
+ bitmap_set(plug_mgr_rt->session_mq_status, session_plugin_sub_info->topic_id, session_plugin_sub_info->subscriber_idx, 0);
}
}
diff --git a/src/plugin_manager/plugin_manager.h b/src/plugin_manager/plugin_manager.h
index c967e9e..6d3da99 100644
--- a/src/plugin_manager/plugin_manager.h
+++ b/src/plugin_manager/plugin_manager.h
@@ -10,7 +10,8 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr);
void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, struct packet *pkt);
-void plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr);
+//return polling work state, 0: idle, 1: working
+int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr);
//publish and dispatch session msg(msg, pkt) on session_mq
void plugin_manager_on_session_ingress(struct session *sess,const struct packet *pkt);
diff --git a/src/stellar_on_sapp/defer_loader.inf b/src/stellar_on_sapp/defer_loader.inf
index ebe9dc1..5d03294 100644
--- a/src/stellar_on_sapp/defer_loader.inf
+++ b/src/stellar_on_sapp/defer_loader.inf
@@ -8,14 +8,6 @@ DESTROY_FUNC=STELLAR_DEFER_LOADER_EXIT
FUNC_FLAG=ALL
FUNC_NAME=stellar_on_sapp_defer_entry
-#[TCP]
-#FUNC_FLAG=ALL
-#FUNC_NAME=stellar_on_sapp_defer_entry
-
-#[TCP]
-#FUNC_FLAG=ALL
-#FUNC_NAME=stellar_on_sapp_detach_check_entry
-
[UDP]
FUNC_FLAG=ALL
FUNC_NAME=stellar_on_sapp_defer_entry \ No newline at end of file
diff --git a/src/stellar_on_sapp/start_loader.inf b/src/stellar_on_sapp/start_loader.inf
index 0b029b8..7f42774 100644
--- a/src/stellar_on_sapp/start_loader.inf
+++ b/src/stellar_on_sapp/start_loader.inf
@@ -8,13 +8,10 @@ DESTROY_FUNC=STELLAR_START_LOADER_EXIT
FUNC_FLAG=ALL
FUNC_NAME=stellar_on_sapp_tcpall_entry
-#[TCP]
-#FUNC_FLAG=ALL
-#FUNC_NAME=stellar_on_sapp_tcp_entry
+[TCP]
+FUNC_FLAG=ALL
+FUNC_NAME=stellar_on_sapp_tcp_entry
-#[TCP]
-#FUNC_FLAG=ALL
-#FUNC_NAME=stellar_on_sapp_detach_check_entry
[UDP]
FUNC_FLAG=ALL
diff --git a/src/stellar_on_sapp/stellar_on_sapp.h b/src/stellar_on_sapp/stellar_on_sapp.h
index 0784458..095af00 100644
--- a/src/stellar_on_sapp/stellar_on_sapp.h
+++ b/src/stellar_on_sapp/stellar_on_sapp.h
@@ -1,7 +1,6 @@
#pragma once
#include "stellar_internal.h"
-#include "stellar/session.h"
struct streaminfo;
@@ -18,4 +17,5 @@ void session_poll_on_sapp(struct session *sess);
void packet_update_on_sapp(struct stellar *st, struct streaminfo *pstream, void *a_packet, enum packet_type type);
-void polling_on_sapp(struct stellar *st); \ No newline at end of file
+//return polling work state, 0: idle, 1: working
+int polling_on_sapp(struct stellar *st); \ No newline at end of file
diff --git a/src/stellar_on_sapp/stellar_on_sapp_api.c b/src/stellar_on_sapp/stellar_on_sapp_api.c
index e0706c0..707ceb9 100644
--- a/src/stellar_on_sapp/stellar_on_sapp_api.c
+++ b/src/stellar_on_sapp/stellar_on_sapp_api.c
@@ -127,7 +127,7 @@ unsigned char session_state_update_on_sapp(struct streaminfo *stream, struct ses
{
if(sess && a_packet)
{
- sess->state=SESSION_STATE_ACTIVE;
+ sess->state=(stream->pktstate == OP_STATE_PENDING) ? SESSION_STATE_OPENING : SESSION_STATE_ACTIVE;;
struct packet *pkt = sess->cur_pkt;
pkt->raw_pkt=(void *)get_current_rawpkt_from_streaminfo(stream);
pkt->type=type;
@@ -179,9 +179,9 @@ void packet_update_on_sapp(struct stellar *st, struct streaminfo *pstream, void
plugin_manager_on_packet(st->plug_mgr, &pkt);
}
-void polling_on_sapp(struct stellar *st)
+inline int polling_on_sapp(struct stellar *st)
{
- plugin_manager_on_polling(st->plug_mgr);
+ return plugin_manager_on_polling(st->plug_mgr);
}
/*********************************************
diff --git a/src/stellar_on_sapp/stellar_on_sapp_loader.c b/src/stellar_on_sapp/stellar_on_sapp_loader.c
index c8a4306..cd95d59 100644
--- a/src/stellar_on_sapp/stellar_on_sapp_loader.c
+++ b/src/stellar_on_sapp/stellar_on_sapp_loader.c
@@ -32,26 +32,20 @@ struct stellar *g_stellar=NULL;
int g_session_bridge_id=-1;
int g_streaminfo_exdata_id=-1;
-static int is_l7_type_tunnels(struct streaminfo *a_stream)
+
+static int stream_is_inner_most(struct streaminfo *a_stream)
{
- const struct streaminfo *pfather=NULL, *ptmp=a_stream;
- while(ptmp)
- {
- pfather = ptmp->pfather;
- switch(ptmp->type)
- {
- case STREAM_TYPE_SOCKS4:
- case STREAM_TYPE_SOCKS5:
- case STREAM_TYPE_HTTP_PROXY:
- return 1;
- default:
- break;
- }
- ptmp = pfather;
- }
- return 0;
+ unsigned short tunnel_type=0;
+ int tunnel_type_len=sizeof(unsigned short);
+ MESA_get_stream_opt(a_stream, MSO_STREAM_UP_LAYER_TUNNEL_TYPE, (void *)&tunnel_type, &tunnel_type_len);
+ if (tunnel_type&STREAM_TUNNEL_GPRS_TUNNEL) // tunnel_type set STREAM_TUNNEL_GPRS_TUNNEL means it is a outer of GTP tunnel
+ {
+ return 0;
+ }
+ return 1;
}
+
static void stellar_on_sapp_bridge_free(const struct streaminfo *stream, int bridge_id, void *data)
{
session_free_on_sapp((struct session *)data);
@@ -83,7 +77,7 @@ int STELLAR_DEFER_LOADER_INIT()
char stellar_on_sapp_defer_entry(struct streaminfo *pstream,void **pme, int thread_seq,void *a_packet)
{
- if(pstream->pktstate==OP_STATE_PENDING && is_l7_type_tunnels(pstream)==1)
+ if(pstream->pktstate==OP_STATE_PENDING && stream_is_inner_most(pstream)==0)
{
return APP_STATE_DROPME;
}
@@ -108,33 +102,16 @@ void STELLAR_DEFER_LOADER_EXIT(void)
static unsigned char loader_transfer_stream_entry(struct streaminfo *pstream, UCHAR state, void **pme, int thread_seq,void *a_packet, enum entry_type type)
{
- struct simple_stream_ctx *ctx=(struct simple_stream_ctx *)*pme;
struct tcpdetail *pdetail=(struct tcpdetail *)pstream->pdetail;
int is_ctrl_pkt=0;
const void *raw_pkt=NULL;
unsigned char entry_ret=APP_STATE_GIVEME;
- if(state == OP_STATE_PENDING && (is_l7_type_tunnels(pstream)==1))
+ if(state == OP_STATE_PENDING && (stream_is_inner_most(pstream)==0))
{
return APP_STATE_DROPME;
}
- if(*pme==NULL)
- {
- *pme=CALLOC(struct simple_stream_ctx,1);
- ctx=(struct simple_stream_ctx *)*pme;
- }
-
- if(a_packet!=NULL)
- {
- if(DIR_C2S == pstream->curdir){
- ctx->c2s_bytes += pdetail->datalen;
- ctx->c2s_pkts++;
- }else{
- ctx->s2c_bytes += pdetail->datalen;
- ctx->s2c_pkts++;
- }
- }
-
+ struct session *sess = (struct session *)stream_bridge_async_data_get(pstream, g_session_bridge_id);
//entry_type convert to packet_type
enum packet_type pkt_type=UNKNOWN;
switch (type)
@@ -155,29 +132,25 @@ static unsigned char loader_transfer_stream_entry(struct streaminfo *pstream, UC
switch (state)
{
case OP_STATE_PENDING:
- if (type == ENTRY_TYPE_TCPALL || type == ENTRY_TYPE_UDP)
- {
- ctx->sess = session_new_on_sapp(g_stellar, pstream);
- session_exdata_set(ctx->sess, g_streaminfo_exdata_id, pstream);
- stream_bridge_async_data_put(pstream, g_session_bridge_id, ctx->sess);
- }
- else
- {
- ctx->sess = (struct session *)stream_bridge_async_data_get(pstream, g_session_bridge_id);
- }
- entry_ret = session_state_update_on_sapp(pstream, ctx->sess, a_packet, pkt_type);
+ if(!sess)
+ {
+ sess = session_new_on_sapp(g_stellar, pstream);
+ session_exdata_set(sess, g_streaminfo_exdata_id, pstream);
+ stream_bridge_async_data_put(pstream, g_session_bridge_id, sess);
+ }
+ entry_ret = session_state_update_on_sapp(pstream, sess, a_packet, pkt_type);
break;
case OP_STATE_DATA:
raw_pkt = get_current_rawpkt_from_streaminfo(pstream);
get_opt_from_rawpkt(raw_pkt, RAW_PKT_GET_IS_CTRL_PKT, (void *)&is_ctrl_pkt);
if(is_ctrl_pkt==1)
{
- entry_ret=session_state_update_on_sapp(pstream, ctx->sess, a_packet, CONTROL);
+ entry_ret=session_state_update_on_sapp(pstream, sess, a_packet, CONTROL);
entry_ret = entry_ret|APP_STATE_DROPPKT;
}
else
{
- entry_ret=session_state_update_on_sapp(pstream, ctx->sess, a_packet, pkt_type);
+ entry_ret=session_state_update_on_sapp(pstream, sess, a_packet, pkt_type);
}
break;
case OP_STATE_CLOSE:
@@ -185,7 +158,7 @@ static unsigned char loader_transfer_stream_entry(struct streaminfo *pstream, UC
// for TCP stream, only trigger update when packet payload > 0
if(a_packet && pstream->ptcpdetail->datalen > 0)
{
- entry_ret=session_state_update_on_sapp(pstream, ctx->sess, a_packet, pkt_type);
+ entry_ret=session_state_update_on_sapp(pstream, sess, a_packet, pkt_type);
}
FREE(*pme);
break;
@@ -225,6 +198,13 @@ char stellar_on_sapp_ip6_entry( struct streaminfo *pstream,unsigned char routedi
char stellar_on_sapp_polling_entry(struct streaminfo *stream, void **pme, int thread_seq,void *a_packet)
{
- polling_on_sapp(g_stellar);
- return APP_STATE_GIVEME;
+ int ret = polling_on_sapp(g_stellar);
+ switch (ret)
+ {
+ case 1:
+ return POLLING_STATE_WORK;
+ default:
+ break;
+ }
+ return POLLING_STATE_IDLE;
} \ No newline at end of file