diff options
| author | yangwei <[email protected]> | 2024-05-29 00:21:05 +0800 |
|---|---|---|
| committer | yangwei <[email protected]> | 2024-05-30 06:00:30 +0800 |
| commit | a7adf2fe90bf44403dc486ae10e1af5ae6cafc37 (patch) | |
| tree | 8ff4c17bba47d07d0ab117315f8f79d70aeeb54a /src | |
| parent | af179a089c5fe9c0a4a681c2845eccc97bdf0565 (diff) | |
✨ feat(packet direction unknonw): set session in session packet
Diffstat (limited to 'src')
| -rw-r--r-- | src/plugin_manager/plugin_manager.c | 67 | ||||
| -rw-r--r-- | src/plugin_manager/plugin_manager_interna.h | 1 | ||||
| -rw-r--r-- | src/stellar_on_sapp/stellar_on_sapp_api.c | 10 | ||||
| -rw-r--r-- | src/stellar_on_sapp/stellar_on_sapp_loader.c | 6 |
4 files changed, 54 insertions, 30 deletions
diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c index 7f11fb5..1760c6d 100644 --- a/src/plugin_manager/plugin_manager.c +++ b/src/plugin_manager/plugin_manager.c @@ -227,6 +227,8 @@ int stellar_exdata_new_index(struct stellar *st, const char *name, UT_array **ex t_schema = (struct stellar_exdata_schema *)utarray_eltptr(*exdata_schema, i); if(strcmp(t_schema->name, name) == 0) { + t_schema->free_func=free_func; + t_schema->free_arg=free_arg; return t_schema->idx; } } @@ -424,7 +426,7 @@ int stellar_mq_create_topic(struct stellar *st, const char *topic_name, void *ms int stellar_mq_destroy_topic(int topic_id, UT_array *mq_schema_array) { - if(mq_schema_array==NULL)return 0; + if(mq_schema_array==NULL)return -1; unsigned int len = utarray_len(mq_schema_array); if (len <= (unsigned int)topic_id) return -1; @@ -535,7 +537,6 @@ int stellar_packet_mq_subscribe(struct stellar *st, int topic_id, on_packet_msg_ cnt++; tmp_subscriber=tmp_subscriber->next; } - return -1; } }; @@ -615,11 +616,11 @@ static void plugin_manager_packet_message_dispatch(struct packet *pkt) * SESSION MQ * *******************************/ -void session_mq_free(struct session *sess, struct stellar_message *head, UT_array *mq_schema_array) +void session_mq_free(struct session *sess, struct stellar_message **head, UT_array *mq_schema_array) { struct stellar_message *mq_elt, *tmp; struct stellar_mq_topic_schema *topic; - DL_FOREACH_SAFE(head, mq_elt, tmp) + DL_FOREACH_SAFE(*head, mq_elt, tmp) { topic = (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array, (unsigned int)(mq_elt->topic_id)); @@ -627,10 +628,10 @@ void session_mq_free(struct session *sess, struct stellar_message *head, UT_arra { topic->sess_msg_free_cb(sess, mq_elt->msg_data, topic->free_cb_arg); } - DL_DELETE(head, mq_elt); + DL_DELETE(*head, mq_elt); FREE(mq_elt); } - FREE(head); + FREE(*head); } inline int stellar_session_mq_get_topic_id(struct stellar *st, const char *topic_name) @@ -720,7 +721,8 @@ int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_session_ms { if(plugin_id >= PACKET_PULGIN_ID_BASE)return -1;// ignore packet plugin struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); - if(plug_mgr->session_mq_schema_array==NULL)return -1; + if(plug_mgr == NULL || plug_mgr->session_mq_schema_array==NULL || plug_mgr->registered_session_plugin_array == NULL)return -1; + unsigned int len = utarray_len(plug_mgr->session_mq_schema_array); if (len <= (unsigned int)topic_id)return -1; @@ -739,8 +741,18 @@ int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_session_ms struct stellar_mq_subscriber_info *p=NULL; while( (p=(struct stellar_mq_subscriber_info *)utarray_next(session_plugin_schema->registed_session_mq_subscriber_info,p))) { - if(p->topic_id==topic_id) - return 0; + struct stellar_mq_subscriber *tmp_subscriber=topic->subscribers; + int cnt=0; + while(tmp_subscriber) + { + if(cnt==p->subscriber_idx) + { + tmp_subscriber->sess_msg_cb=plugin_on_msg_cb; + return 0; + } + cnt++; + tmp_subscriber=tmp_subscriber->next; + } }; struct stellar_mq_subscriber *new_subscriber = CALLOC(struct stellar_mq_subscriber,1); @@ -791,7 +803,15 @@ static void plugin_manager_session_message_dispatch(struct session *sess) if (session_plugin_schema->on_ctx_new) { plugin_ctx_rt->plugin_ctx = session_plugin_schema->on_ctx_new(sess, session_plugin_schema->plugin_env); - plugin_ctx_rt->state = ACTIVE; + if(plugin_ctx_rt->state == EXIT && session_plugin_schema->on_ctx_free) + { + session_plugin_schema->on_ctx_free(sess, plugin_ctx_rt->plugin_ctx, session_plugin_schema->plugin_env); + plugin_ctx_rt->plugin_ctx=NULL; + } + else + { + plugin_ctx_rt->state = ACTIVE; + } } } if (sub_elt->sess_msg_cb && bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->topic_id, cur_sub_idx) != 0)// ctx_new maybe call detach, so need check again @@ -860,15 +880,15 @@ struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_ void plugin_manager_session_runtime_free(struct plugin_manager_runtime *rt) { if(rt==NULL)return; + assert(rt->pending_mq==NULL); if(rt->pending_mq != NULL) { - session_mq_free(rt->sess, rt->pending_mq, rt->plug_mgr->session_mq_schema_array); - rt->pending_mq=NULL; + session_mq_free(rt->sess, &rt->pending_mq, rt->plug_mgr->session_mq_schema_array); } + assert(rt->delivered_mq==NULL); if(rt->delivered_mq != NULL) { - session_mq_free(rt->sess, rt->delivered_mq, rt->plug_mgr->session_mq_schema_array); - rt->delivered_mq=NULL; + session_mq_free(rt->sess, &rt->delivered_mq, rt->plug_mgr->session_mq_schema_array); } if(rt->session_mq_status != NULL) { @@ -1032,7 +1052,7 @@ void plugin_manager_on_session_ingress(struct session *sess, struct packet *pkt) //TODO: check TCP topic active subscirber num, if 0, return APP_STATE_DROPME, to reduce tcp reassemble overhead session_mq_publish_message(sess, topic_id ,(void *)pkt); plugin_manager_session_message_dispatch(sess); - plug_mgr_rt->enable_session_mq=0; + //plug_mgr_rt->enable_session_mq=0; return; } @@ -1040,12 +1060,13 @@ void plugin_manager_on_session_egress(struct session *sess, struct packet *pkt) { struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess); if(plug_mgr_rt==NULL)return; - plug_mgr_rt->enable_session_mq=1; + //plug_mgr_rt->enable_session_mq=1; session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->egress_topic_id ,(void *)pkt); plugin_manager_session_message_dispatch(sess); - session_mq_free(plug_mgr_rt->sess,plug_mgr_rt->delivered_mq, plug_mgr_rt->plug_mgr->session_mq_schema_array); - plug_mgr_rt->delivered_mq=NULL; plug_mgr_rt->enable_session_mq=0; + session_mq_free(plug_mgr_rt->sess,&plug_mgr_rt->delivered_mq, plug_mgr_rt->plug_mgr->session_mq_schema_array); + assert(plug_mgr_rt->pending_mq==NULL); + assert(plug_mgr_rt->delivered_mq==NULL); return; } @@ -1066,12 +1087,14 @@ void stellar_session_plugin_dettach_current_session(struct session *sess) } } - if(session_plugin_schema->on_ctx_free) + //dettach in ctx INIT, do not call on_ctx_free immidiately + if(plug_mgr_rt->plugin_ctx_array[plug_mgr_rt->current_session_plugin_id].state != INIT && session_plugin_schema->on_ctx_free) { - session_plugin_schema->on_ctx_free(sess, (plug_mgr_rt->plugin_ctx_array+plug_mgr_rt->current_session_plugin_id)->plugin_ctx, session_plugin_schema->plugin_env); + session_plugin_schema->on_ctx_free(sess, plug_mgr_rt->plugin_ctx_array[plug_mgr_rt->current_session_plugin_id].plugin_ctx, session_plugin_schema->plugin_env); + plug_mgr_rt->plugin_ctx_array[plug_mgr_rt->current_session_plugin_id].plugin_ctx=NULL; + plug_mgr_rt->plugin_ctx_array[plug_mgr_rt->current_session_plugin_id].state=EXIT; } - (plug_mgr_rt->plugin_ctx_array+plug_mgr_rt->current_session_plugin_id)->plugin_ctx=NULL; - (plug_mgr_rt->plugin_ctx_array+plug_mgr_rt->current_session_plugin_id)->state=EXIT; + return; } diff --git a/src/plugin_manager/plugin_manager_interna.h b/src/plugin_manager/plugin_manager_interna.h index 0c9e055..24b8d42 100644 --- a/src/plugin_manager/plugin_manager_interna.h +++ b/src/plugin_manager/plugin_manager_interna.h @@ -164,6 +164,7 @@ struct registered_session_plugin_schema UT_array *registed_session_mq_subscriber_info; }__attribute__((aligned(sizeof(void*)))); +#define SESSION_PULGIN_ID_BASE 0x00000 #define PACKET_PULGIN_ID_BASE 0x10000 #define POLLING_PULGIN_ID_BASE 0x20000 diff --git a/src/stellar_on_sapp/stellar_on_sapp_api.c b/src/stellar_on_sapp/stellar_on_sapp_api.c index f9e5637..9f4f0ae 100644 --- a/src/stellar_on_sapp/stellar_on_sapp_api.c +++ b/src/stellar_on_sapp/stellar_on_sapp_api.c @@ -22,6 +22,7 @@ struct packet enum packet_type type; unsigned char ip_proto; const void *raw_pkt; + struct session *sess; struct stellar *st; }__attribute__((aligned(sizeof(void*)))) ; @@ -110,6 +111,7 @@ struct session *session_new_on_sapp(struct stellar *st, struct streaminfo *strea sess->session_direction=-1; memset(&sess->cur_pkt, 0, sizeof(struct packet)); sess->cur_pkt.st=st; + sess->cur_pkt.sess=sess; sess->plug_mgr_rt=plugin_manager_session_runtime_new(st->plug_mgr, sess); return sess; } @@ -170,6 +172,7 @@ void packet_update_on_sapp(struct stellar *st, struct streaminfo *pstream, void pkt.type=type; pkt.raw_pkt=get_current_rawpkt_from_streaminfo(pstream); pkt.st=st; + pkt.sess=NULL; switch (type) { case IPv4: @@ -187,7 +190,7 @@ void packet_update_on_sapp(struct stellar *st, struct streaminfo *pstream, void default: return; } - //TODO: exclude non innermost packet + //FIXME: defer TCP/UDP packet on session update plugin_manager_on_packet_ingress(st->plug_mgr, &pkt); if(pkt.ip_proto!=IPPROTO_TCP && pkt.ip_proto!=IPPROTO_UDP) { @@ -226,8 +229,9 @@ inline enum packet_type packet_get_type(const struct packet *pkt) int packet_get_direction(const struct packet *pkt) { - struct session *sess = container_of(pkt, struct session, cur_pkt); - assert(sess); + struct session *sess = pkt->sess; + if(sess==NULL)return PACKET_DIRECTION_UNKNOWN; + const struct streaminfo *pstream=sess->pstream; if(pstream->curdir==DIR_C2S) { diff --git a/src/stellar_on_sapp/stellar_on_sapp_loader.c b/src/stellar_on_sapp/stellar_on_sapp_loader.c index 111c44d..0bbb83a 100644 --- a/src/stellar_on_sapp/stellar_on_sapp_loader.c +++ b/src/stellar_on_sapp/stellar_on_sapp_loader.c @@ -74,12 +74,8 @@ char stellar_on_sapp_defer_entry(struct streaminfo *pstream,void **pme, int thre if(sess) { session_defer_on_sapp(sess); - return APP_STATE_GIVEME; - } - else - { - return APP_STATE_DROPME; } + return APP_STATE_GIVEME; } void STELLAR_DEFER_LOADER_EXIT(void) |
