summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryangwei <[email protected]>2023-12-28 17:43:26 +0800
committer刘学利 <[email protected]>2023-12-28 12:46:51 +0000
commitc131eeda82a32b0db6f4dc56adcc5e5d6815c9fb (patch)
treea4a975b9a345d1fdf639f88deecdea8101195993
parenta9afd24fafcf867b6fa272e6651c41dac5407567 (diff)
✨ feat(session event dispatch): update dispatch logic and add examplev1.0.8
-rw-r--r--examples/sapp_plugin/publisher_loader.c3
-rw-r--r--examples/stellar_plugin/simple_stellar_plugin.c18
-rw-r--r--src/adapter/adapter.c3
-rw-r--r--src/adapter/session_manager.c66
4 files changed, 61 insertions, 29 deletions
diff --git a/examples/sapp_plugin/publisher_loader.c b/examples/sapp_plugin/publisher_loader.c
index d2db719..0003680 100644
--- a/examples/sapp_plugin/publisher_loader.c
+++ b/examples/sapp_plugin/publisher_loader.c
@@ -110,6 +110,7 @@ static unsigned char loader_transfer_stream_entry(struct streaminfo *pstream, UC
case OP_STATE_PENDING:
ctx->sess=adapter_session_new(g_stellar, pstream);
entry_ret=adapter_session_state_update(pstream, ctx->sess, a_packet, SESSION_STATE_OPENING);
+ adapter_session_poll(ctx->sess);
break;
case OP_STATE_DATA:
msg = stream_ctx_dup((const struct simple_stream_ctx *)*pme);
@@ -118,9 +119,11 @@ static unsigned char loader_transfer_stream_entry(struct streaminfo *pstream, UC
FREE(msg);
}
entry_ret=adapter_session_state_update(pstream, ctx->sess, a_packet, SESSION_STATE_ACTIVE);
+ adapter_session_poll(ctx->sess);
break;
case OP_STATE_CLOSE:
entry_ret=adapter_session_state_update(pstream, ctx->sess, a_packet, SESSION_STATE_CLOSING);
+ adapter_session_poll(ctx->sess);
adapter_session_free(ctx->sess);
print_stream_ctx(pstream, ctx);
FREE(*pme);
diff --git a/examples/stellar_plugin/simple_stellar_plugin.c b/examples/stellar_plugin/simple_stellar_plugin.c
index 7e8b7ee..50f38f0 100644
--- a/examples/stellar_plugin/simple_stellar_plugin.c
+++ b/examples/stellar_plugin/simple_stellar_plugin.c
@@ -18,6 +18,7 @@ struct mq_message_stat
uint32_t c2s_bytes;
uint32_t s2c_pkts;
uint32_t s2c_bytes;
+ struct session_event* ud_ev;
};
extern int simple_mq_plugin_entry(struct session *sess, int events, const struct packet *pkt, void *cb_arg);
@@ -51,6 +52,16 @@ static void print_session_ctx(struct session *sess, struct mq_message_stat *ctx,
return;
}
+int simple_event_plugin_user_defined_ev(struct session *sess, int events, const struct packet *pkt, void *cb_arg)
+{
+ if(cb_arg == NULL)return -1;
+ struct simple_stellar_plugin_ctx *plugin_ctx=(struct simple_stellar_plugin_ctx *)cb_arg;
+ struct mq_message_stat *mg_stat = (struct mq_message_stat *)session_get_ex_data(sess, plugin_ctx->exdata_idx);
+ printf("%s(plug:%d)trigger-----------%20s\n", __FUNCTION__, plugin_ctx->plugin_id, session_get0_readable_addr(sess));
+ print_session_ctx(sess, mg_stat, plugin_ctx->plugin_id);
+ return 0;
+}
+
int simple_event_plugin_entry(struct session *sess, int events, const struct packet *pkt, void *cb_arg)
{
if(cb_arg == NULL)return -1;
@@ -62,6 +73,12 @@ int simple_event_plugin_entry(struct session *sess, int events, const struct pac
session_set_ex_data(sess, plugin_ctx->exdata_idx, mg_stat);
}
+ if ((events & SESS_EV_OPENING))
+ {
+ mg_stat->ud_ev=session_event_new(plugin_ctx->st, sess, (SESS_EV_TCP|SESS_EV_UDP|SESS_EV_OPENING|SESS_EV_PACKET|SESS_EV_CLOSING), simple_event_plugin_user_defined_ev, cb_arg);
+ session_event_add(mg_stat->ud_ev, NULL);
+ }
+
if (pkt)
{
size_t payload_len = 0;
@@ -81,6 +98,7 @@ int simple_event_plugin_entry(struct session *sess, int events, const struct pac
if (mg_stat != NULL && (events & SESS_EV_CLOSING))
{
print_session_ctx(sess, mg_stat, plugin_ctx->plugin_id);
+ session_event_free(mg_stat->ud_ev);
}
return 0;
}
diff --git a/src/adapter/adapter.c b/src/adapter/adapter.c
index 603e62b..518b16f 100644
--- a/src/adapter/adapter.c
+++ b/src/adapter/adapter.c
@@ -266,8 +266,7 @@ const char *session_get0_current_payload(struct session *sess, size_t *payload_l
#include <assert.h>
const struct packet *session_get0_current_packet(struct session *sess)
{
- assert(sess->cur_pkt);
- assert(sess->cur_pkt->a_stream);
+ if(sess->cur_pkt == NULL || sess->cur_pkt->a_stream == NULL)return NULL;
sess->cur_pkt->raw_pkt=(void *)get_current_rawpkt_from_streaminfo(sess->cur_pkt->a_stream);
return sess->cur_pkt;
}
diff --git a/src/adapter/session_manager.c b/src/adapter/session_manager.c
index b552875..b552b96 100644
--- a/src/adapter/session_manager.c
+++ b/src/adapter/session_manager.c
@@ -427,7 +427,10 @@ int session_event_add(struct session_event *ev, const struct timeval *timeout)
{
ev->timeout = *timeout;
}
- DL_APPEND(ev->sess->ev_list, ev);
+ if(ev->sess)
+ {
+ DL_APPEND(ev->sess->ev_list, ev);
+ }
return 0;
}
@@ -445,7 +448,9 @@ int session_event_assign(struct session_event *ev, struct stellar *st, struct se
int session_event_del(struct session_event *ev)
{
+ if(ev->is_delete==1)return 0;
ev->is_delete=1;
+
if(ev->is_intrinsic==0 && (ev->prev != NULL || ev->next != NULL))
{
DL_DELETE(ev->sess->ev_list, ev);
@@ -479,38 +484,45 @@ struct session_event *session_get_intrinsic_event(struct session *sess, int plug
void session_defer_loop(struct session *sess)
{
+ struct session_mq *mq_elt=NULL, *mq_tmp=NULL;
+ struct session_mq_subscriber *sub_elt, *sub_tmp;
+ struct session_mq_topic_schema *topic;
+
struct session_event *ev=NULL, *ev_tmp=NULL;
int events=(1<<sess->state|(sess->type==SESSION_TYPE_TCP?SESS_EV_TCP:SESS_EV_UDP));
const struct packet *pkt= session_get0_current_packet(sess);
- DL_FOREACH_SAFE(sess->ev_list, ev, ev_tmp)
- {
- if(ev->session_event_cb != NULL && (ev->events & events) && (ev->is_delete == 0))
- {
- ev->session_event_cb(sess, events, pkt, ev->cb_arg);
- }
- }
- struct session_mq *mq_elt=NULL, *mq_tmp=NULL;
- struct session_mq_subscriber *sub_elt, *sub_tmp;
- struct session_mq_topic_schema *topic;
- DL_FOREACH_SAFE(sess->mq, mq_elt, mq_tmp)
+ while (sess->mq != NULL || sess->ev_list != NULL)
{
- topic=(struct session_mq_topic_schema *)utarray_eltptr(sess->st->session_mq_schema_array,(unsigned int)(mq_elt->topic_id));
- if(topic)
- {
- DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
- {
- sub_elt->sub_cb(sess, mq_elt->topic_id, mq_elt->message, sub_elt->cb_arg);
- }
- if(topic->free_cb)
- {
- topic->free_cb(mq_elt->message, topic->cb_arg);
- }
- }
- DL_DELETE(sess->mq, mq_elt);
- free(mq_elt);
+ DL_FOREACH_SAFE(sess->mq, mq_elt, mq_tmp)
+ {
+ topic = (struct session_mq_topic_schema *)utarray_eltptr(sess->st->session_mq_schema_array,
+ (unsigned int)(mq_elt->topic_id));
+ if (topic)
+ {
+ DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
+ {
+ sub_elt->sub_cb(sess, mq_elt->topic_id, mq_elt->message, sub_elt->cb_arg);
+ }
+ if (topic->free_cb)
+ {
+ topic->free_cb(mq_elt->message, topic->cb_arg);
+ }
+ }
+ DL_DELETE(sess->mq, mq_elt);
+ free(mq_elt);
+ }
+
+ DL_FOREACH_SAFE(sess->ev_list, ev, ev_tmp)
+ {
+ if (ev->session_event_cb != NULL && (ev->events & events) && (ev->is_delete == 0))
+ {
+ ev->session_event_cb(sess, events, pkt, ev->cb_arg);
+ DL_DELETE(sess->ev_list, ev);
+ }
+ }
}
- return;
+ return;
}
int stellar_worker_thread_periodic_add(struct stellar *st, stellar_periodic_cb_func *periodic_cb, void *cb_arg, const struct timeval *interval)