diff options
| author | yangwei <[email protected]> | 2023-12-28 17:43:26 +0800 |
|---|---|---|
| committer | yangwei <[email protected]> | 2023-12-28 19:48:18 +0800 |
| commit | 48ba62a3bcbcfc6d42eb04a9de19f6e1442a067b (patch) | |
| tree | 8ba69f06a58d2b5aace7f0936ee77f30c8b4e193 | |
| parent | 37117f9f350c0c7b624b8f74f0b03cb8ee20be72 (diff) | |
✨ feat(session event dispatch): update dispatch logic and add examplev1.0.7
| -rw-r--r-- | examples/sapp_plugin/publisher_loader.c | 3 | ||||
| -rw-r--r-- | examples/stellar_plugin/simple_stellar_plugin.c | 18 | ||||
| -rw-r--r-- | src/adapter/adapter.c | 3 | ||||
| -rw-r--r-- | src/adapter/session_manager.c | 66 |
4 files changed, 61 insertions, 29 deletions
diff --git a/examples/sapp_plugin/publisher_loader.c b/examples/sapp_plugin/publisher_loader.c index d2db719..0003680 100644 --- a/examples/sapp_plugin/publisher_loader.c +++ b/examples/sapp_plugin/publisher_loader.c @@ -110,6 +110,7 @@ static unsigned char loader_transfer_stream_entry(struct streaminfo *pstream, UC case OP_STATE_PENDING: ctx->sess=adapter_session_new(g_stellar, pstream); entry_ret=adapter_session_state_update(pstream, ctx->sess, a_packet, SESSION_STATE_OPENING); + adapter_session_poll(ctx->sess); break; case OP_STATE_DATA: msg = stream_ctx_dup((const struct simple_stream_ctx *)*pme); @@ -118,9 +119,11 @@ static unsigned char loader_transfer_stream_entry(struct streaminfo *pstream, UC FREE(msg); } entry_ret=adapter_session_state_update(pstream, ctx->sess, a_packet, SESSION_STATE_ACTIVE); + adapter_session_poll(ctx->sess); break; case OP_STATE_CLOSE: entry_ret=adapter_session_state_update(pstream, ctx->sess, a_packet, SESSION_STATE_CLOSING); + adapter_session_poll(ctx->sess); adapter_session_free(ctx->sess); print_stream_ctx(pstream, ctx); FREE(*pme); diff --git a/examples/stellar_plugin/simple_stellar_plugin.c b/examples/stellar_plugin/simple_stellar_plugin.c index 7e8b7ee..50f38f0 100644 --- a/examples/stellar_plugin/simple_stellar_plugin.c +++ b/examples/stellar_plugin/simple_stellar_plugin.c @@ -18,6 +18,7 @@ struct mq_message_stat uint32_t c2s_bytes; uint32_t s2c_pkts; uint32_t s2c_bytes; + struct session_event* ud_ev; }; extern int simple_mq_plugin_entry(struct session *sess, int events, const struct packet *pkt, void *cb_arg); @@ -51,6 +52,16 @@ static void print_session_ctx(struct session *sess, struct mq_message_stat *ctx, return; } +int simple_event_plugin_user_defined_ev(struct session *sess, int events, const struct packet *pkt, void *cb_arg) +{ + if(cb_arg == NULL)return -1; + struct simple_stellar_plugin_ctx *plugin_ctx=(struct simple_stellar_plugin_ctx *)cb_arg; + struct mq_message_stat *mg_stat = (struct mq_message_stat *)session_get_ex_data(sess, plugin_ctx->exdata_idx); + printf("%s(plug:%d)trigger-----------%20s\n", __FUNCTION__, plugin_ctx->plugin_id, session_get0_readable_addr(sess)); + print_session_ctx(sess, mg_stat, plugin_ctx->plugin_id); + return 0; +} + int simple_event_plugin_entry(struct session *sess, int events, const struct packet *pkt, void *cb_arg) { if(cb_arg == NULL)return -1; @@ -62,6 +73,12 @@ int simple_event_plugin_entry(struct session *sess, int events, const struct pac session_set_ex_data(sess, plugin_ctx->exdata_idx, mg_stat); } + if ((events & SESS_EV_OPENING)) + { + mg_stat->ud_ev=session_event_new(plugin_ctx->st, sess, (SESS_EV_TCP|SESS_EV_UDP|SESS_EV_OPENING|SESS_EV_PACKET|SESS_EV_CLOSING), simple_event_plugin_user_defined_ev, cb_arg); + session_event_add(mg_stat->ud_ev, NULL); + } + if (pkt) { size_t payload_len = 0; @@ -81,6 +98,7 @@ int simple_event_plugin_entry(struct session *sess, int events, const struct pac if (mg_stat != NULL && (events & SESS_EV_CLOSING)) { print_session_ctx(sess, mg_stat, plugin_ctx->plugin_id); + session_event_free(mg_stat->ud_ev); } return 0; } diff --git a/src/adapter/adapter.c b/src/adapter/adapter.c index 603e62b..518b16f 100644 --- a/src/adapter/adapter.c +++ b/src/adapter/adapter.c @@ -266,8 +266,7 @@ const char *session_get0_current_payload(struct session *sess, size_t *payload_l #include <assert.h> const struct packet *session_get0_current_packet(struct session *sess) { - assert(sess->cur_pkt); - assert(sess->cur_pkt->a_stream); + if(sess->cur_pkt == NULL || sess->cur_pkt->a_stream == NULL)return NULL; sess->cur_pkt->raw_pkt=(void *)get_current_rawpkt_from_streaminfo(sess->cur_pkt->a_stream); return sess->cur_pkt; } diff --git a/src/adapter/session_manager.c b/src/adapter/session_manager.c index b552875..b552b96 100644 --- a/src/adapter/session_manager.c +++ b/src/adapter/session_manager.c @@ -427,7 +427,10 @@ int session_event_add(struct session_event *ev, const struct timeval *timeout) { ev->timeout = *timeout; } - DL_APPEND(ev->sess->ev_list, ev); + if(ev->sess) + { + DL_APPEND(ev->sess->ev_list, ev); + } return 0; } @@ -445,7 +448,9 @@ int session_event_assign(struct session_event *ev, struct stellar *st, struct se int session_event_del(struct session_event *ev) { + if(ev->is_delete==1)return 0; ev->is_delete=1; + if(ev->is_intrinsic==0 && (ev->prev != NULL || ev->next != NULL)) { DL_DELETE(ev->sess->ev_list, ev); @@ -479,38 +484,45 @@ struct session_event *session_get_intrinsic_event(struct session *sess, int plug void session_defer_loop(struct session *sess) { + struct session_mq *mq_elt=NULL, *mq_tmp=NULL; + struct session_mq_subscriber *sub_elt, *sub_tmp; + struct session_mq_topic_schema *topic; + struct session_event *ev=NULL, *ev_tmp=NULL; int events=(1<<sess->state|(sess->type==SESSION_TYPE_TCP?SESS_EV_TCP:SESS_EV_UDP)); const struct packet *pkt= session_get0_current_packet(sess); - DL_FOREACH_SAFE(sess->ev_list, ev, ev_tmp) - { - if(ev->session_event_cb != NULL && (ev->events & events) && (ev->is_delete == 0)) - { - ev->session_event_cb(sess, events, pkt, ev->cb_arg); - } - } - struct session_mq *mq_elt=NULL, *mq_tmp=NULL; - struct session_mq_subscriber *sub_elt, *sub_tmp; - struct session_mq_topic_schema *topic; - DL_FOREACH_SAFE(sess->mq, mq_elt, mq_tmp) + while (sess->mq != NULL || sess->ev_list != NULL) { - topic=(struct session_mq_topic_schema *)utarray_eltptr(sess->st->session_mq_schema_array,(unsigned int)(mq_elt->topic_id)); - if(topic) - { - DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp) - { - sub_elt->sub_cb(sess, mq_elt->topic_id, mq_elt->message, sub_elt->cb_arg); - } - if(topic->free_cb) - { - topic->free_cb(mq_elt->message, topic->cb_arg); - } - } - DL_DELETE(sess->mq, mq_elt); - free(mq_elt); + DL_FOREACH_SAFE(sess->mq, mq_elt, mq_tmp) + { + topic = (struct session_mq_topic_schema *)utarray_eltptr(sess->st->session_mq_schema_array, + (unsigned int)(mq_elt->topic_id)); + if (topic) + { + DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp) + { + sub_elt->sub_cb(sess, mq_elt->topic_id, mq_elt->message, sub_elt->cb_arg); + } + if (topic->free_cb) + { + topic->free_cb(mq_elt->message, topic->cb_arg); + } + } + DL_DELETE(sess->mq, mq_elt); + free(mq_elt); + } + + DL_FOREACH_SAFE(sess->ev_list, ev, ev_tmp) + { + if (ev->session_event_cb != NULL && (ev->events & events) && (ev->is_delete == 0)) + { + ev->session_event_cb(sess, events, pkt, ev->cb_arg); + DL_DELETE(sess->ev_list, ev); + } + } } - return; + return; } int stellar_worker_thread_periodic_add(struct stellar *st, stellar_periodic_cb_func *periodic_cb, void *cb_arg, const struct timeval *interval) |
