diff options
| -rw-r--r-- | examples/sapp_plugin/publisher_loader.c | 17 | ||||
| -rw-r--r-- | include/stellar/session.h | 2 | ||||
| -rw-r--r-- | src/adapter/adapter.c | 13 | ||||
| -rw-r--r-- | src/adapter/adapter.h | 2 | ||||
| -rw-r--r-- | src/adapter/session_manager.c | 16 | ||||
| -rw-r--r-- | src/adapter/session_manager.h | 1 | ||||
| -rw-r--r-- | src/stellar_on_sapp/stellar_on_sapp.c | 19 |
7 files changed, 49 insertions, 21 deletions
diff --git a/examples/sapp_plugin/publisher_loader.c b/examples/sapp_plugin/publisher_loader.c index 9f1e8ab..d2db719 100644 --- a/examples/sapp_plugin/publisher_loader.c +++ b/examples/sapp_plugin/publisher_loader.c @@ -83,10 +83,11 @@ static struct simple_stream_ctx *stream_ctx_dup(const struct simple_stream_ctx * return ctx; } -static void loader_transfer_stream_entry(struct streaminfo *pstream, UCHAR state, void **pme, int thread_seq,void *a_packet) +static unsigned char loader_transfer_stream_entry(struct streaminfo *pstream, UCHAR state, void **pme, int thread_seq,void *a_packet) { struct simple_stream_ctx *ctx=(struct simple_stream_ctx *)*pme; struct tcpdetail *pdetail=(struct tcpdetail *)pstream->pdetail; + unsigned char entry_ret=APP_STATE_GIVEME; if(*pme==NULL) { *pme=CALLOC(struct simple_stream_ctx,1); @@ -108,7 +109,7 @@ static void loader_transfer_stream_entry(struct streaminfo *pstream, UCHAR state { case OP_STATE_PENDING: ctx->sess=adapter_session_new(g_stellar, pstream); - adapter_session_state_update(pstream, ctx->sess, a_packet, SESSION_STATE_OPENING); + entry_ret=adapter_session_state_update(pstream, ctx->sess, a_packet, SESSION_STATE_OPENING); break; case OP_STATE_DATA: msg = stream_ctx_dup((const struct simple_stream_ctx *)*pme); @@ -116,10 +117,10 @@ static void loader_transfer_stream_entry(struct streaminfo *pstream, UCHAR state { FREE(msg); } - adapter_session_state_update(pstream, ctx->sess, a_packet, SESSION_STATE_ACTIVE); + entry_ret=adapter_session_state_update(pstream, ctx->sess, a_packet, SESSION_STATE_ACTIVE); break; case OP_STATE_CLOSE: - adapter_session_state_update(pstream, ctx->sess, a_packet, SESSION_STATE_CLOSING); + entry_ret=adapter_session_state_update(pstream, ctx->sess, a_packet, SESSION_STATE_CLOSING); adapter_session_free(ctx->sess); print_stream_ctx(pstream, ctx); FREE(*pme); @@ -127,17 +128,15 @@ static void loader_transfer_stream_entry(struct streaminfo *pstream, UCHAR state default: break; } - return; + return entry_ret; } char publisher_loader_udp_stream_entry(struct streaminfo *pstream,void **pme, int thread_seq,void *a_packet) { - loader_transfer_stream_entry(pstream, pstream->opstate, pme, thread_seq, a_packet); - return APP_STATE_GIVEME; + return loader_transfer_stream_entry(pstream, pstream->opstate, pme, thread_seq, a_packet); } char publisher_loader_tcpall_stream_entry(struct streaminfo *pstream,void **pme, int thread_seq,void *a_packet) { - loader_transfer_stream_entry(pstream, pstream->pktstate, pme, thread_seq, a_packet); - return APP_STATE_GIVEME; + return loader_transfer_stream_entry(pstream, pstream->pktstate, pme, thread_seq, a_packet); } diff --git a/include/stellar/session.h b/include/stellar/session.h index f365e6a..964218d 100644 --- a/include/stellar/session.h +++ b/include/stellar/session.h @@ -132,6 +132,8 @@ struct session_event; int stellar_plugin_register(struct stellar *st, int events, session_event_cb_func *cb, void *cb_arg);// register intrinsic event struct session_event *session_get_intrinsic_event(struct session *sess, int plugin_id); +void session_plugin_detach_others(struct session *sess, int plugin_id); + #include <sys/time.h> struct session_event *session_event_new(struct stellar *st, struct session *sess, int events, session_event_cb_func *cb, void *cb_arg); diff --git a/src/adapter/adapter.c b/src/adapter/adapter.c index c3333a0..c99d0a5 100644 --- a/src/adapter/adapter.c +++ b/src/adapter/adapter.c @@ -177,7 +177,7 @@ void adapter_session_free(struct session *sess) return session_free(sess); } -void adapter_session_state_update(struct streaminfo *stream, struct session *sess, void *a_packet, enum session_state state) +unsigned char adapter_session_state_update(struct streaminfo *stream, struct session *sess, void *a_packet, enum session_state state) { struct packet *pkt = NULL; if(sess) @@ -199,8 +199,17 @@ void adapter_session_state_update(struct streaminfo *stream, struct session *ses FREE(pkt); session_set_current_packet(sess, NULL); } + if(sess->set_detach_others==1) + { + return APP_STATE_GIVEME|APP_STATE_KILL_OTHER; + } + else + { + return APP_STATE_GIVEME; + } } - return; + else + return APP_STATE_DROPME; } const char* session_get0_readable_addr(struct session *sess) diff --git a/src/adapter/adapter.h b/src/adapter/adapter.h index d281be3..3559f1a 100644 --- a/src/adapter/adapter.h +++ b/src/adapter/adapter.h @@ -12,6 +12,6 @@ void stellar_exit(struct stellar *st); struct session *adapter_session_new(struct stellar *st, struct streaminfo *stream); void adapter_session_free(struct session *sess); -void adapter_session_state_update(struct streaminfo *stream, struct session *sess, void *a_packet, enum session_state state); +unsigned char adapter_session_state_update(struct streaminfo *stream, struct session *sess, void *a_packet, enum session_state state); void adapter_session_poll(struct session *sess);
\ No newline at end of file diff --git a/src/adapter/session_manager.c b/src/adapter/session_manager.c index 04cc760..d4315ac 100644 --- a/src/adapter/session_manager.c +++ b/src/adapter/session_manager.c @@ -322,6 +322,22 @@ void session_dispatch(struct session *sess, enum session_state state, struct pa return; } +void session_plugin_detach_others(struct session *sess, int plugin_id) +{ + struct session_event *intrinsic_events = (struct session_event *)session_get_ex_data(sess, sess->st->intrinsic_session_event_exdata_idx); + if(sess->st->plugin_specs_array == NULL)return; + unsigned int len = utarray_len(sess->st->intrinsic_session_event_schema_array); + for(unsigned int i = 0; i < len; i++) + { + if((int)i!=plugin_id) + { + (intrinsic_events+i)->events = 0; + } + } + sess->set_detach_others=1; + return; +} + static struct session_event* session_intrinsic_events_init(struct stellar *st, struct session *sess) { if(st->intrinsic_session_event_schema_array == NULL)return NULL; diff --git a/src/adapter/session_manager.h b/src/adapter/session_manager.h index d923c47..e1e06a1 100644 --- a/src/adapter/session_manager.h +++ b/src/adapter/session_manager.h @@ -17,6 +17,7 @@ struct session struct session_exdata_runtime *ex_data_rt; struct session_mq *mq; struct session_event *ev_list; + uint64_t set_detach_others; }; struct session *session_new(struct stellar *st, enum session_type type, int thread_id); diff --git a/src/stellar_on_sapp/stellar_on_sapp.c b/src/stellar_on_sapp/stellar_on_sapp.c index 6843f32..6d8a58b 100644 --- a/src/stellar_on_sapp/stellar_on_sapp.c +++ b/src/stellar_on_sapp/stellar_on_sapp.c @@ -93,12 +93,13 @@ void STELLAR_DEFER_LOADER_EXIT(void) return; } -static char loader_transfer_stream_entry(struct streaminfo *pstream, UCHAR state, void **pme, int thread_seq,void *a_packet) +static unsigned char loader_transfer_stream_entry(struct streaminfo *pstream, UCHAR state, void **pme, int thread_seq,void *a_packet) { struct simple_stream_ctx *ctx=(struct simple_stream_ctx *)*pme; struct tcpdetail *pdetail=(struct tcpdetail *)pstream->pdetail; int is_ctrl_pkt=0; const void *raw_pkt=NULL; + unsigned char entry_ret=APP_STATE_GIVEME; if(state == OP_STATE_PENDING && (is_l7_type_tunnels(pstream)==1)) { return APP_STATE_DROPME; @@ -126,19 +127,19 @@ static char loader_transfer_stream_entry(struct streaminfo *pstream, UCHAR state ctx->sess=adapter_session_new(g_stellar, pstream); session_set_ex_data(ctx->sess, g_streaminfo_exdata_id, pstream); stream_bridge_async_data_put(pstream, g_session_bridge_id, ctx->sess); - adapter_session_state_update(pstream, ctx->sess, a_packet, SESSION_STATE_OPENING); + entry_ret=adapter_session_state_update(pstream, ctx->sess, a_packet, SESSION_STATE_OPENING); break; case OP_STATE_DATA: raw_pkt = get_current_rawpkt_from_streaminfo(pstream); get_opt_from_rawpkt(raw_pkt, RAW_PKT_GET_IS_CTRL_PKT, (void *)&is_ctrl_pkt); if(is_ctrl_pkt==1) { - adapter_session_state_update(pstream, ctx->sess, a_packet, SESSION_STATE_CONTROL); - return APP_STATE_GIVEME|APP_STATE_DROPPKT; + entry_ret=adapter_session_state_update(pstream, ctx->sess, a_packet, SESSION_STATE_CONTROL); + entry_ret = entry_ret|APP_STATE_DROPPKT; } else { - adapter_session_state_update(pstream, ctx->sess, a_packet, SESSION_STATE_ACTIVE); + entry_ret=adapter_session_state_update(pstream, ctx->sess, a_packet, SESSION_STATE_ACTIVE); } break; case OP_STATE_CLOSE: @@ -147,20 +148,20 @@ static char loader_transfer_stream_entry(struct streaminfo *pstream, UCHAR state default: break; } - return APP_STATE_GIVEME; + return entry_ret; } -char stellar_on_sapp_udp_entry(struct streaminfo *pstream,void **pme, int thread_seq,void *a_packet) +unsigned char stellar_on_sapp_udp_entry(struct streaminfo *pstream,void **pme, int thread_seq,void *a_packet) { return loader_transfer_stream_entry(pstream, pstream->opstate, pme, thread_seq, a_packet); } -char stellar_on_sapp_tcpall_entry(struct streaminfo *pstream,void **pme, int thread_seq,void *a_packet) +unsigned char stellar_on_sapp_tcpall_entry(struct streaminfo *pstream,void **pme, int thread_seq,void *a_packet) { return loader_transfer_stream_entry(pstream, pstream->pktstate, pme, thread_seq, a_packet); } -char stellar_on_sapp_tcp_entry(struct streaminfo *pstream,void **pme, int thread_seq,void *a_packet) +unsigned char stellar_on_sapp_tcp_entry(struct streaminfo *pstream,void **pme, int thread_seq,void *a_packet) { return loader_transfer_stream_entry(pstream, pstream->opstate, pme, thread_seq, a_packet); } |
