diff options
| author | yangwei <[email protected]> | 2024-05-29 00:21:05 +0800 |
|---|---|---|
| committer | yangwei <[email protected]> | 2024-05-30 06:00:30 +0800 |
| commit | a7adf2fe90bf44403dc486ae10e1af5ae6cafc37 (patch) | |
| tree | 8ff4c17bba47d07d0ab117315f8f79d70aeeb54a /src/plugin_manager/plugin_manager.c | |
| parent | af179a089c5fe9c0a4a681c2845eccc97bdf0565 (diff) | |
✨ feat(packet direction unknonw): set session in session packet
Diffstat (limited to 'src/plugin_manager/plugin_manager.c')
| -rw-r--r-- | src/plugin_manager/plugin_manager.c | 67 |
1 files changed, 45 insertions, 22 deletions
diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c index 7f11fb5..1760c6d 100644 --- a/src/plugin_manager/plugin_manager.c +++ b/src/plugin_manager/plugin_manager.c @@ -227,6 +227,8 @@ int stellar_exdata_new_index(struct stellar *st, const char *name, UT_array **ex t_schema = (struct stellar_exdata_schema *)utarray_eltptr(*exdata_schema, i); if(strcmp(t_schema->name, name) == 0) { + t_schema->free_func=free_func; + t_schema->free_arg=free_arg; return t_schema->idx; } } @@ -424,7 +426,7 @@ int stellar_mq_create_topic(struct stellar *st, const char *topic_name, void *ms int stellar_mq_destroy_topic(int topic_id, UT_array *mq_schema_array) { - if(mq_schema_array==NULL)return 0; + if(mq_schema_array==NULL)return -1; unsigned int len = utarray_len(mq_schema_array); if (len <= (unsigned int)topic_id) return -1; @@ -535,7 +537,6 @@ int stellar_packet_mq_subscribe(struct stellar *st, int topic_id, on_packet_msg_ cnt++; tmp_subscriber=tmp_subscriber->next; } - return -1; } }; @@ -615,11 +616,11 @@ static void plugin_manager_packet_message_dispatch(struct packet *pkt) * SESSION MQ * *******************************/ -void session_mq_free(struct session *sess, struct stellar_message *head, UT_array *mq_schema_array) +void session_mq_free(struct session *sess, struct stellar_message **head, UT_array *mq_schema_array) { struct stellar_message *mq_elt, *tmp; struct stellar_mq_topic_schema *topic; - DL_FOREACH_SAFE(head, mq_elt, tmp) + DL_FOREACH_SAFE(*head, mq_elt, tmp) { topic = (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array, (unsigned int)(mq_elt->topic_id)); @@ -627,10 +628,10 @@ void session_mq_free(struct session *sess, struct stellar_message *head, UT_arra { topic->sess_msg_free_cb(sess, mq_elt->msg_data, topic->free_cb_arg); } - DL_DELETE(head, mq_elt); + DL_DELETE(*head, mq_elt); FREE(mq_elt); } - FREE(head); + FREE(*head); } inline int stellar_session_mq_get_topic_id(struct stellar *st, const char *topic_name) @@ -720,7 +721,8 @@ int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_session_ms { if(plugin_id >= PACKET_PULGIN_ID_BASE)return -1;// ignore packet plugin struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); - if(plug_mgr->session_mq_schema_array==NULL)return -1; + if(plug_mgr == NULL || plug_mgr->session_mq_schema_array==NULL || plug_mgr->registered_session_plugin_array == NULL)return -1; + unsigned int len = utarray_len(plug_mgr->session_mq_schema_array); if (len <= (unsigned int)topic_id)return -1; @@ -739,8 +741,18 @@ int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_session_ms struct stellar_mq_subscriber_info *p=NULL; while( (p=(struct stellar_mq_subscriber_info *)utarray_next(session_plugin_schema->registed_session_mq_subscriber_info,p))) { - if(p->topic_id==topic_id) - return 0; + struct stellar_mq_subscriber *tmp_subscriber=topic->subscribers; + int cnt=0; + while(tmp_subscriber) + { + if(cnt==p->subscriber_idx) + { + tmp_subscriber->sess_msg_cb=plugin_on_msg_cb; + return 0; + } + cnt++; + tmp_subscriber=tmp_subscriber->next; + } }; struct stellar_mq_subscriber *new_subscriber = CALLOC(struct stellar_mq_subscriber,1); @@ -791,7 +803,15 @@ static void plugin_manager_session_message_dispatch(struct session *sess) if (session_plugin_schema->on_ctx_new) { plugin_ctx_rt->plugin_ctx = session_plugin_schema->on_ctx_new(sess, session_plugin_schema->plugin_env); - plugin_ctx_rt->state = ACTIVE; + if(plugin_ctx_rt->state == EXIT && session_plugin_schema->on_ctx_free) + { + session_plugin_schema->on_ctx_free(sess, plugin_ctx_rt->plugin_ctx, session_plugin_schema->plugin_env); + plugin_ctx_rt->plugin_ctx=NULL; + } + else + { + plugin_ctx_rt->state = ACTIVE; + } } } if (sub_elt->sess_msg_cb && bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->topic_id, cur_sub_idx) != 0)// ctx_new maybe call detach, so need check again @@ -860,15 +880,15 @@ struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_ void plugin_manager_session_runtime_free(struct plugin_manager_runtime *rt) { if(rt==NULL)return; + assert(rt->pending_mq==NULL); if(rt->pending_mq != NULL) { - session_mq_free(rt->sess, rt->pending_mq, rt->plug_mgr->session_mq_schema_array); - rt->pending_mq=NULL; + session_mq_free(rt->sess, &rt->pending_mq, rt->plug_mgr->session_mq_schema_array); } + assert(rt->delivered_mq==NULL); if(rt->delivered_mq != NULL) { - session_mq_free(rt->sess, rt->delivered_mq, rt->plug_mgr->session_mq_schema_array); - rt->delivered_mq=NULL; + session_mq_free(rt->sess, &rt->delivered_mq, rt->plug_mgr->session_mq_schema_array); } if(rt->session_mq_status != NULL) { @@ -1032,7 +1052,7 @@ void plugin_manager_on_session_ingress(struct session *sess, struct packet *pkt) //TODO: check TCP topic active subscirber num, if 0, return APP_STATE_DROPME, to reduce tcp reassemble overhead session_mq_publish_message(sess, topic_id ,(void *)pkt); plugin_manager_session_message_dispatch(sess); - plug_mgr_rt->enable_session_mq=0; + //plug_mgr_rt->enable_session_mq=0; return; } @@ -1040,12 +1060,13 @@ void plugin_manager_on_session_egress(struct session *sess, struct packet *pkt) { struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess); if(plug_mgr_rt==NULL)return; - plug_mgr_rt->enable_session_mq=1; + //plug_mgr_rt->enable_session_mq=1; session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->egress_topic_id ,(void *)pkt); plugin_manager_session_message_dispatch(sess); - session_mq_free(plug_mgr_rt->sess,plug_mgr_rt->delivered_mq, plug_mgr_rt->plug_mgr->session_mq_schema_array); - plug_mgr_rt->delivered_mq=NULL; plug_mgr_rt->enable_session_mq=0; + session_mq_free(plug_mgr_rt->sess,&plug_mgr_rt->delivered_mq, plug_mgr_rt->plug_mgr->session_mq_schema_array); + assert(plug_mgr_rt->pending_mq==NULL); + assert(plug_mgr_rt->delivered_mq==NULL); return; } @@ -1066,12 +1087,14 @@ void stellar_session_plugin_dettach_current_session(struct session *sess) } } - if(session_plugin_schema->on_ctx_free) + //dettach in ctx INIT, do not call on_ctx_free immidiately + if(plug_mgr_rt->plugin_ctx_array[plug_mgr_rt->current_session_plugin_id].state != INIT && session_plugin_schema->on_ctx_free) { - session_plugin_schema->on_ctx_free(sess, (plug_mgr_rt->plugin_ctx_array+plug_mgr_rt->current_session_plugin_id)->plugin_ctx, session_plugin_schema->plugin_env); + session_plugin_schema->on_ctx_free(sess, plug_mgr_rt->plugin_ctx_array[plug_mgr_rt->current_session_plugin_id].plugin_ctx, session_plugin_schema->plugin_env); + plug_mgr_rt->plugin_ctx_array[plug_mgr_rt->current_session_plugin_id].plugin_ctx=NULL; + plug_mgr_rt->plugin_ctx_array[plug_mgr_rt->current_session_plugin_id].state=EXIT; } - (plug_mgr_rt->plugin_ctx_array+plug_mgr_rt->current_session_plugin_id)->plugin_ctx=NULL; - (plug_mgr_rt->plugin_ctx_array+plug_mgr_rt->current_session_plugin_id)->state=EXIT; + return; } |
