summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryangwei <[email protected]>2023-11-16 19:21:44 +0800
committeryangwei <[email protected]>2023-11-16 19:21:44 +0800
commit892678bc8a4852543a9169bbc32c7d8562b39cb8 (patch)
treee91af2035c79e63b5383b9556c1e8870f656fccf
parent1457a9f4fe5d0c29b3b6fdaf6133f289f2a3b2e7 (diff)
✨ feat(session management): added session_plugin_detach_othersv1.0.1
Implemented a new interface session_plugin_detach_others to modify a session's behavior. This function sets the registered events of all plugins associated with the session to 0, except for the specified plugin_id, effectively detaching other plugins.
-rw-r--r--examples/sapp_plugin/publisher_loader.c17
-rw-r--r--include/stellar/session.h2
-rw-r--r--src/adapter/adapter.c13
-rw-r--r--src/adapter/adapter.h2
-rw-r--r--src/adapter/session_manager.c16
-rw-r--r--src/adapter/session_manager.h1
-rw-r--r--src/stellar_on_sapp/stellar_on_sapp.c19
7 files changed, 49 insertions, 21 deletions
diff --git a/examples/sapp_plugin/publisher_loader.c b/examples/sapp_plugin/publisher_loader.c
index 9f1e8ab..d2db719 100644
--- a/examples/sapp_plugin/publisher_loader.c
+++ b/examples/sapp_plugin/publisher_loader.c
@@ -83,10 +83,11 @@ static struct simple_stream_ctx *stream_ctx_dup(const struct simple_stream_ctx *
return ctx;
}
-static void loader_transfer_stream_entry(struct streaminfo *pstream, UCHAR state, void **pme, int thread_seq,void *a_packet)
+static unsigned char loader_transfer_stream_entry(struct streaminfo *pstream, UCHAR state, void **pme, int thread_seq,void *a_packet)
{
struct simple_stream_ctx *ctx=(struct simple_stream_ctx *)*pme;
struct tcpdetail *pdetail=(struct tcpdetail *)pstream->pdetail;
+ unsigned char entry_ret=APP_STATE_GIVEME;
if(*pme==NULL)
{
*pme=CALLOC(struct simple_stream_ctx,1);
@@ -108,7 +109,7 @@ static void loader_transfer_stream_entry(struct streaminfo *pstream, UCHAR state
{
case OP_STATE_PENDING:
ctx->sess=adapter_session_new(g_stellar, pstream);
- adapter_session_state_update(pstream, ctx->sess, a_packet, SESSION_STATE_OPENING);
+ entry_ret=adapter_session_state_update(pstream, ctx->sess, a_packet, SESSION_STATE_OPENING);
break;
case OP_STATE_DATA:
msg = stream_ctx_dup((const struct simple_stream_ctx *)*pme);
@@ -116,10 +117,10 @@ static void loader_transfer_stream_entry(struct streaminfo *pstream, UCHAR state
{
FREE(msg);
}
- adapter_session_state_update(pstream, ctx->sess, a_packet, SESSION_STATE_ACTIVE);
+ entry_ret=adapter_session_state_update(pstream, ctx->sess, a_packet, SESSION_STATE_ACTIVE);
break;
case OP_STATE_CLOSE:
- adapter_session_state_update(pstream, ctx->sess, a_packet, SESSION_STATE_CLOSING);
+ entry_ret=adapter_session_state_update(pstream, ctx->sess, a_packet, SESSION_STATE_CLOSING);
adapter_session_free(ctx->sess);
print_stream_ctx(pstream, ctx);
FREE(*pme);
@@ -127,17 +128,15 @@ static void loader_transfer_stream_entry(struct streaminfo *pstream, UCHAR state
default:
break;
}
- return;
+ return entry_ret;
}
char publisher_loader_udp_stream_entry(struct streaminfo *pstream,void **pme, int thread_seq,void *a_packet)
{
- loader_transfer_stream_entry(pstream, pstream->opstate, pme, thread_seq, a_packet);
- return APP_STATE_GIVEME;
+ return loader_transfer_stream_entry(pstream, pstream->opstate, pme, thread_seq, a_packet);
}
char publisher_loader_tcpall_stream_entry(struct streaminfo *pstream,void **pme, int thread_seq,void *a_packet)
{
- loader_transfer_stream_entry(pstream, pstream->pktstate, pme, thread_seq, a_packet);
- return APP_STATE_GIVEME;
+ return loader_transfer_stream_entry(pstream, pstream->pktstate, pme, thread_seq, a_packet);
}
diff --git a/include/stellar/session.h b/include/stellar/session.h
index f365e6a..964218d 100644
--- a/include/stellar/session.h
+++ b/include/stellar/session.h
@@ -132,6 +132,8 @@ struct session_event;
int stellar_plugin_register(struct stellar *st, int events, session_event_cb_func *cb, void *cb_arg);// register intrinsic event
struct session_event *session_get_intrinsic_event(struct session *sess, int plugin_id);
+void session_plugin_detach_others(struct session *sess, int plugin_id);
+
#include <sys/time.h>
struct session_event *session_event_new(struct stellar *st, struct session *sess, int events, session_event_cb_func *cb, void *cb_arg);
diff --git a/src/adapter/adapter.c b/src/adapter/adapter.c
index c3333a0..c99d0a5 100644
--- a/src/adapter/adapter.c
+++ b/src/adapter/adapter.c
@@ -177,7 +177,7 @@ void adapter_session_free(struct session *sess)
return session_free(sess);
}
-void adapter_session_state_update(struct streaminfo *stream, struct session *sess, void *a_packet, enum session_state state)
+unsigned char adapter_session_state_update(struct streaminfo *stream, struct session *sess, void *a_packet, enum session_state state)
{
struct packet *pkt = NULL;
if(sess)
@@ -199,8 +199,17 @@ void adapter_session_state_update(struct streaminfo *stream, struct session *ses
FREE(pkt);
session_set_current_packet(sess, NULL);
}
+ if(sess->set_detach_others==1)
+ {
+ return APP_STATE_GIVEME|APP_STATE_KILL_OTHER;
+ }
+ else
+ {
+ return APP_STATE_GIVEME;
+ }
}
- return;
+ else
+ return APP_STATE_DROPME;
}
const char* session_get0_readable_addr(struct session *sess)
diff --git a/src/adapter/adapter.h b/src/adapter/adapter.h
index d281be3..3559f1a 100644
--- a/src/adapter/adapter.h
+++ b/src/adapter/adapter.h
@@ -12,6 +12,6 @@ void stellar_exit(struct stellar *st);
struct session *adapter_session_new(struct stellar *st, struct streaminfo *stream);
void adapter_session_free(struct session *sess);
-void adapter_session_state_update(struct streaminfo *stream, struct session *sess, void *a_packet, enum session_state state);
+unsigned char adapter_session_state_update(struct streaminfo *stream, struct session *sess, void *a_packet, enum session_state state);
void adapter_session_poll(struct session *sess); \ No newline at end of file
diff --git a/src/adapter/session_manager.c b/src/adapter/session_manager.c
index 04cc760..d4315ac 100644
--- a/src/adapter/session_manager.c
+++ b/src/adapter/session_manager.c
@@ -322,6 +322,22 @@ void session_dispatch(struct session *sess, enum session_state state, struct pa
return;
}
+void session_plugin_detach_others(struct session *sess, int plugin_id)
+{
+ struct session_event *intrinsic_events = (struct session_event *)session_get_ex_data(sess, sess->st->intrinsic_session_event_exdata_idx);
+ if(sess->st->plugin_specs_array == NULL)return;
+ unsigned int len = utarray_len(sess->st->intrinsic_session_event_schema_array);
+ for(unsigned int i = 0; i < len; i++)
+ {
+ if((int)i!=plugin_id)
+ {
+ (intrinsic_events+i)->events = 0;
+ }
+ }
+ sess->set_detach_others=1;
+ return;
+}
+
static struct session_event* session_intrinsic_events_init(struct stellar *st, struct session *sess)
{
if(st->intrinsic_session_event_schema_array == NULL)return NULL;
diff --git a/src/adapter/session_manager.h b/src/adapter/session_manager.h
index d923c47..e1e06a1 100644
--- a/src/adapter/session_manager.h
+++ b/src/adapter/session_manager.h
@@ -17,6 +17,7 @@ struct session
struct session_exdata_runtime *ex_data_rt;
struct session_mq *mq;
struct session_event *ev_list;
+ uint64_t set_detach_others;
};
struct session *session_new(struct stellar *st, enum session_type type, int thread_id);
diff --git a/src/stellar_on_sapp/stellar_on_sapp.c b/src/stellar_on_sapp/stellar_on_sapp.c
index 6843f32..6d8a58b 100644
--- a/src/stellar_on_sapp/stellar_on_sapp.c
+++ b/src/stellar_on_sapp/stellar_on_sapp.c
@@ -93,12 +93,13 @@ void STELLAR_DEFER_LOADER_EXIT(void)
return;
}
-static char loader_transfer_stream_entry(struct streaminfo *pstream, UCHAR state, void **pme, int thread_seq,void *a_packet)
+static unsigned char loader_transfer_stream_entry(struct streaminfo *pstream, UCHAR state, void **pme, int thread_seq,void *a_packet)
{
struct simple_stream_ctx *ctx=(struct simple_stream_ctx *)*pme;
struct tcpdetail *pdetail=(struct tcpdetail *)pstream->pdetail;
int is_ctrl_pkt=0;
const void *raw_pkt=NULL;
+ unsigned char entry_ret=APP_STATE_GIVEME;
if(state == OP_STATE_PENDING && (is_l7_type_tunnels(pstream)==1))
{
return APP_STATE_DROPME;
@@ -126,19 +127,19 @@ static char loader_transfer_stream_entry(struct streaminfo *pstream, UCHAR state
ctx->sess=adapter_session_new(g_stellar, pstream);
session_set_ex_data(ctx->sess, g_streaminfo_exdata_id, pstream);
stream_bridge_async_data_put(pstream, g_session_bridge_id, ctx->sess);
- adapter_session_state_update(pstream, ctx->sess, a_packet, SESSION_STATE_OPENING);
+ entry_ret=adapter_session_state_update(pstream, ctx->sess, a_packet, SESSION_STATE_OPENING);
break;
case OP_STATE_DATA:
raw_pkt = get_current_rawpkt_from_streaminfo(pstream);
get_opt_from_rawpkt(raw_pkt, RAW_PKT_GET_IS_CTRL_PKT, (void *)&is_ctrl_pkt);
if(is_ctrl_pkt==1)
{
- adapter_session_state_update(pstream, ctx->sess, a_packet, SESSION_STATE_CONTROL);
- return APP_STATE_GIVEME|APP_STATE_DROPPKT;
+ entry_ret=adapter_session_state_update(pstream, ctx->sess, a_packet, SESSION_STATE_CONTROL);
+ entry_ret = entry_ret|APP_STATE_DROPPKT;
}
else
{
- adapter_session_state_update(pstream, ctx->sess, a_packet, SESSION_STATE_ACTIVE);
+ entry_ret=adapter_session_state_update(pstream, ctx->sess, a_packet, SESSION_STATE_ACTIVE);
}
break;
case OP_STATE_CLOSE:
@@ -147,20 +148,20 @@ static char loader_transfer_stream_entry(struct streaminfo *pstream, UCHAR state
default:
break;
}
- return APP_STATE_GIVEME;
+ return entry_ret;
}
-char stellar_on_sapp_udp_entry(struct streaminfo *pstream,void **pme, int thread_seq,void *a_packet)
+unsigned char stellar_on_sapp_udp_entry(struct streaminfo *pstream,void **pme, int thread_seq,void *a_packet)
{
return loader_transfer_stream_entry(pstream, pstream->opstate, pme, thread_seq, a_packet);
}
-char stellar_on_sapp_tcpall_entry(struct streaminfo *pstream,void **pme, int thread_seq,void *a_packet)
+unsigned char stellar_on_sapp_tcpall_entry(struct streaminfo *pstream,void **pme, int thread_seq,void *a_packet)
{
return loader_transfer_stream_entry(pstream, pstream->pktstate, pme, thread_seq, a_packet);
}
-char stellar_on_sapp_tcp_entry(struct streaminfo *pstream,void **pme, int thread_seq,void *a_packet)
+unsigned char stellar_on_sapp_tcp_entry(struct streaminfo *pstream,void **pme, int thread_seq,void *a_packet)
{
return loader_transfer_stream_entry(pstream, pstream->opstate, pme, thread_seq, a_packet);
}