summaryrefslogtreecommitdiff
path: root/src/plugin_manager
diff options
context:
space:
mode:
authoryangwei <[email protected]>2024-05-29 00:21:05 +0800
committeryangwei <[email protected]>2024-05-30 06:00:30 +0800
commita7adf2fe90bf44403dc486ae10e1af5ae6cafc37 (patch)
tree8ff4c17bba47d07d0ab117315f8f79d70aeeb54a /src/plugin_manager
parentaf179a089c5fe9c0a4a681c2845eccc97bdf0565 (diff)
✨ feat(packet direction unknonw): set session in session packet
Diffstat (limited to 'src/plugin_manager')
-rw-r--r--src/plugin_manager/plugin_manager.c67
-rw-r--r--src/plugin_manager/plugin_manager_interna.h1
2 files changed, 46 insertions, 22 deletions
diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c
index 7f11fb5..1760c6d 100644
--- a/src/plugin_manager/plugin_manager.c
+++ b/src/plugin_manager/plugin_manager.c
@@ -227,6 +227,8 @@ int stellar_exdata_new_index(struct stellar *st, const char *name, UT_array **ex
t_schema = (struct stellar_exdata_schema *)utarray_eltptr(*exdata_schema, i);
if(strcmp(t_schema->name, name) == 0)
{
+ t_schema->free_func=free_func;
+ t_schema->free_arg=free_arg;
return t_schema->idx;
}
}
@@ -424,7 +426,7 @@ int stellar_mq_create_topic(struct stellar *st, const char *topic_name, void *ms
int stellar_mq_destroy_topic(int topic_id, UT_array *mq_schema_array)
{
- if(mq_schema_array==NULL)return 0;
+ if(mq_schema_array==NULL)return -1;
unsigned int len = utarray_len(mq_schema_array);
if (len <= (unsigned int)topic_id)
return -1;
@@ -535,7 +537,6 @@ int stellar_packet_mq_subscribe(struct stellar *st, int topic_id, on_packet_msg_
cnt++;
tmp_subscriber=tmp_subscriber->next;
}
- return -1;
}
};
@@ -615,11 +616,11 @@ static void plugin_manager_packet_message_dispatch(struct packet *pkt)
* SESSION MQ *
*******************************/
-void session_mq_free(struct session *sess, struct stellar_message *head, UT_array *mq_schema_array)
+void session_mq_free(struct session *sess, struct stellar_message **head, UT_array *mq_schema_array)
{
struct stellar_message *mq_elt, *tmp;
struct stellar_mq_topic_schema *topic;
- DL_FOREACH_SAFE(head, mq_elt, tmp)
+ DL_FOREACH_SAFE(*head, mq_elt, tmp)
{
topic = (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array,
(unsigned int)(mq_elt->topic_id));
@@ -627,10 +628,10 @@ void session_mq_free(struct session *sess, struct stellar_message *head, UT_arra
{
topic->sess_msg_free_cb(sess, mq_elt->msg_data, topic->free_cb_arg);
}
- DL_DELETE(head, mq_elt);
+ DL_DELETE(*head, mq_elt);
FREE(mq_elt);
}
- FREE(head);
+ FREE(*head);
}
inline int stellar_session_mq_get_topic_id(struct stellar *st, const char *topic_name)
@@ -720,7 +721,8 @@ int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_session_ms
{
if(plugin_id >= PACKET_PULGIN_ID_BASE)return -1;// ignore packet plugin
struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
- if(plug_mgr->session_mq_schema_array==NULL)return -1;
+ if(plug_mgr == NULL || plug_mgr->session_mq_schema_array==NULL || plug_mgr->registered_session_plugin_array == NULL)return -1;
+
unsigned int len = utarray_len(plug_mgr->session_mq_schema_array);
if (len <= (unsigned int)topic_id)return -1;
@@ -739,8 +741,18 @@ int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_session_ms
struct stellar_mq_subscriber_info *p=NULL;
while( (p=(struct stellar_mq_subscriber_info *)utarray_next(session_plugin_schema->registed_session_mq_subscriber_info,p)))
{
- if(p->topic_id==topic_id)
- return 0;
+ struct stellar_mq_subscriber *tmp_subscriber=topic->subscribers;
+ int cnt=0;
+ while(tmp_subscriber)
+ {
+ if(cnt==p->subscriber_idx)
+ {
+ tmp_subscriber->sess_msg_cb=plugin_on_msg_cb;
+ return 0;
+ }
+ cnt++;
+ tmp_subscriber=tmp_subscriber->next;
+ }
};
struct stellar_mq_subscriber *new_subscriber = CALLOC(struct stellar_mq_subscriber,1);
@@ -791,7 +803,15 @@ static void plugin_manager_session_message_dispatch(struct session *sess)
if (session_plugin_schema->on_ctx_new)
{
plugin_ctx_rt->plugin_ctx = session_plugin_schema->on_ctx_new(sess, session_plugin_schema->plugin_env);
- plugin_ctx_rt->state = ACTIVE;
+ if(plugin_ctx_rt->state == EXIT && session_plugin_schema->on_ctx_free)
+ {
+ session_plugin_schema->on_ctx_free(sess, plugin_ctx_rt->plugin_ctx, session_plugin_schema->plugin_env);
+ plugin_ctx_rt->plugin_ctx=NULL;
+ }
+ else
+ {
+ plugin_ctx_rt->state = ACTIVE;
+ }
}
}
if (sub_elt->sess_msg_cb && bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->topic_id, cur_sub_idx) != 0)// ctx_new maybe call detach, so need check again
@@ -860,15 +880,15 @@ struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_
void plugin_manager_session_runtime_free(struct plugin_manager_runtime *rt)
{
if(rt==NULL)return;
+ assert(rt->pending_mq==NULL);
if(rt->pending_mq != NULL)
{
- session_mq_free(rt->sess, rt->pending_mq, rt->plug_mgr->session_mq_schema_array);
- rt->pending_mq=NULL;
+ session_mq_free(rt->sess, &rt->pending_mq, rt->plug_mgr->session_mq_schema_array);
}
+ assert(rt->delivered_mq==NULL);
if(rt->delivered_mq != NULL)
{
- session_mq_free(rt->sess, rt->delivered_mq, rt->plug_mgr->session_mq_schema_array);
- rt->delivered_mq=NULL;
+ session_mq_free(rt->sess, &rt->delivered_mq, rt->plug_mgr->session_mq_schema_array);
}
if(rt->session_mq_status != NULL)
{
@@ -1032,7 +1052,7 @@ void plugin_manager_on_session_ingress(struct session *sess, struct packet *pkt)
//TODO: check TCP topic active subscirber num, if 0, return APP_STATE_DROPME, to reduce tcp reassemble overhead
session_mq_publish_message(sess, topic_id ,(void *)pkt);
plugin_manager_session_message_dispatch(sess);
- plug_mgr_rt->enable_session_mq=0;
+ //plug_mgr_rt->enable_session_mq=0;
return;
}
@@ -1040,12 +1060,13 @@ void plugin_manager_on_session_egress(struct session *sess, struct packet *pkt)
{
struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess);
if(plug_mgr_rt==NULL)return;
- plug_mgr_rt->enable_session_mq=1;
+ //plug_mgr_rt->enable_session_mq=1;
session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->egress_topic_id ,(void *)pkt);
plugin_manager_session_message_dispatch(sess);
- session_mq_free(plug_mgr_rt->sess,plug_mgr_rt->delivered_mq, plug_mgr_rt->plug_mgr->session_mq_schema_array);
- plug_mgr_rt->delivered_mq=NULL;
plug_mgr_rt->enable_session_mq=0;
+ session_mq_free(plug_mgr_rt->sess,&plug_mgr_rt->delivered_mq, plug_mgr_rt->plug_mgr->session_mq_schema_array);
+ assert(plug_mgr_rt->pending_mq==NULL);
+ assert(plug_mgr_rt->delivered_mq==NULL);
return;
}
@@ -1066,12 +1087,14 @@ void stellar_session_plugin_dettach_current_session(struct session *sess)
}
}
- if(session_plugin_schema->on_ctx_free)
+ //dettach in ctx INIT, do not call on_ctx_free immidiately
+ if(plug_mgr_rt->plugin_ctx_array[plug_mgr_rt->current_session_plugin_id].state != INIT && session_plugin_schema->on_ctx_free)
{
- session_plugin_schema->on_ctx_free(sess, (plug_mgr_rt->plugin_ctx_array+plug_mgr_rt->current_session_plugin_id)->plugin_ctx, session_plugin_schema->plugin_env);
+ session_plugin_schema->on_ctx_free(sess, plug_mgr_rt->plugin_ctx_array[plug_mgr_rt->current_session_plugin_id].plugin_ctx, session_plugin_schema->plugin_env);
+ plug_mgr_rt->plugin_ctx_array[plug_mgr_rt->current_session_plugin_id].plugin_ctx=NULL;
+ plug_mgr_rt->plugin_ctx_array[plug_mgr_rt->current_session_plugin_id].state=EXIT;
}
- (plug_mgr_rt->plugin_ctx_array+plug_mgr_rt->current_session_plugin_id)->plugin_ctx=NULL;
- (plug_mgr_rt->plugin_ctx_array+plug_mgr_rt->current_session_plugin_id)->state=EXIT;
+
return;
}
diff --git a/src/plugin_manager/plugin_manager_interna.h b/src/plugin_manager/plugin_manager_interna.h
index 0c9e055..24b8d42 100644
--- a/src/plugin_manager/plugin_manager_interna.h
+++ b/src/plugin_manager/plugin_manager_interna.h
@@ -164,6 +164,7 @@ struct registered_session_plugin_schema
UT_array *registed_session_mq_subscriber_info;
}__attribute__((aligned(sizeof(void*))));
+#define SESSION_PULGIN_ID_BASE 0x00000
#define PACKET_PULGIN_ID_BASE 0x10000
#define POLLING_PULGIN_ID_BASE 0x20000