summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryangwei <[email protected]>2024-05-24 02:52:27 +0800
committeryangwei <[email protected]>2024-05-26 18:39:47 +0800
commit6e3992676dbecf43d6552db703995714644aba50 (patch)
tree61c8b0fe54fe3eeb00dcb3c9d16973492f3b0a57
parent41e35a2f98a51b7d11402d376cb9b055793a1af7 (diff)
✨ feat(packet exdata): clean pkt exdata after egress
-rw-r--r--src/plugin_manager/plugin_manager.c87
-rw-r--r--src/plugin_manager/plugin_manager.h4
2 files changed, 77 insertions, 14 deletions
diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c
index 9111fb3..65a7200 100644
--- a/src/plugin_manager/plugin_manager.c
+++ b/src/plugin_manager/plugin_manager.c
@@ -16,6 +16,7 @@
#include <threads.h>
+struct per_thread_exdata_array;
struct plugin_manager_schema
{
@@ -34,15 +35,22 @@ struct plugin_manager_schema
int tcp_stream_topic_id;
int egress_topic_id;
int control_packet_topic_id;
+ struct per_thread_exdata_array *per_thread_pkt_exdata_array;
};
-
-
struct stellar_exdata
{
void *exdata;
};
+struct per_thread_exdata_array
+{
+ struct stellar_exdata *exdata_array;
+};
+
+
+
+
struct stellar_exdata_schema
{
char *name;
@@ -160,6 +168,7 @@ struct plugin_specific
UT_icd plugin_specs_icd = {sizeof(struct plugin_specific), NULL, NULL, NULL};
+
static struct plugin_specific *plugin_specs_load(const char *toml_conf_path, int *spec_num)
{
*spec_num = 0;
@@ -218,6 +227,8 @@ PLUGIN_SPEC_LOAD_ERROR:
return NULL;
}
+static struct per_thread_exdata_array *per_thread_packet_exdata_arrary_new(struct stellar *st, struct plugin_manager_schema *plug_mgr);
+
struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char *plugin_spec_file_path)
{
int spec_num;
@@ -252,9 +263,12 @@ struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char
}
}
FREE(specs);
+ pm->per_thread_pkt_exdata_array=per_thread_packet_exdata_arrary_new(st, pm);
return pm;
}
+static void per_thread_packet_exdata_arrary_free(struct stellar *st, struct plugin_manager_schema *plug_mgr);
+
void plugin_manager_exit(struct plugin_manager_schema *plug_mgr)
{
struct plugin_specific *p=NULL;
@@ -287,6 +301,7 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr)
}
utarray_free(plug_mgr->registered_session_plugin_array);
}
+ per_thread_packet_exdata_arrary_free(plug_mgr->st, plug_mgr);
FREE(plug_mgr);
return;
}
@@ -361,7 +376,58 @@ void *stellar_exdata_get(UT_array *exdata_schema, struct stellar_exdata *exdata_
/*******************************
* PACKET EXDATA *
*******************************/
-__thread struct stellar_exdata *per_thread_pkt_exdata_arrary=NULL;
+
+static struct per_thread_exdata_array *per_thread_packet_exdata_arrary_new(struct stellar *st, struct plugin_manager_schema *plug_mgr)
+{
+ if(st == NULL || plug_mgr == NULL || plug_mgr->packet_exdata_schema_array == NULL || plug_mgr->per_thread_pkt_exdata_array)return NULL;
+ int thread_num=stellar_get_worker_thread_num(st);
+ struct per_thread_exdata_array *per_thread_pkt_exdata_array = CALLOC(struct per_thread_exdata_array, thread_num);
+ unsigned int len=utarray_len(plug_mgr->packet_exdata_schema_array);
+ for (int i = 0; i < thread_num; i++)
+ {
+ (per_thread_pkt_exdata_array+i)->exdata_array = CALLOC(struct stellar_exdata, len);
+ }
+ return per_thread_pkt_exdata_array;
+}
+
+static void per_thread_packet_exdata_arrary_free(struct stellar *st, struct plugin_manager_schema *plug_mgr)
+{
+ if(st == NULL || plug_mgr == NULL || plug_mgr->packet_exdata_schema_array == NULL || plug_mgr->per_thread_pkt_exdata_array)return;
+ int thread_num=stellar_get_worker_thread_num(st);
+ for (int i = 0; i < thread_num; i++)
+ {
+ FREE((plug_mgr->per_thread_pkt_exdata_array+i)->exdata_array);
+ }
+ FREE(plug_mgr->per_thread_pkt_exdata_array);
+ return;
+}
+
+static struct stellar_exdata *per_thread_packet_exdata_arrary_get(struct plugin_manager_schema *plug_mgr)
+{
+ if(plug_mgr==NULL || plug_mgr->packet_exdata_schema_array == NULL || plug_mgr->per_thread_pkt_exdata_array)return NULL;
+ int tid=stellar_get_current_thread_id(plug_mgr->st);
+ return (plug_mgr->per_thread_pkt_exdata_array+tid)->exdata_array;
+}
+
+static void per_thread_packet_exdata_arrary_clean(struct plugin_manager_schema *plug_mgr, struct packet *pkt)
+{
+ if(plug_mgr==NULL || plug_mgr->packet_exdata_schema_array == NULL || plug_mgr->per_thread_pkt_exdata_array)return;
+ unsigned int len=utarray_len(plug_mgr->packet_exdata_schema_array);
+ struct stellar_exdata *per_thread_pkt_exdata_arrary = per_thread_packet_exdata_arrary_get(plug_mgr);
+ for (unsigned int i = 0; i < len; i++)
+ {
+ void *exdata = (per_thread_pkt_exdata_arrary + i)->exdata;
+ struct stellar_exdata_schema *schema = (struct stellar_exdata_schema *)utarray_eltptr(plug_mgr->packet_exdata_schema_array, i);
+ if (exdata)
+ {
+ if (schema->pkt_free_func)
+ {
+ schema->pkt_free_func(pkt, i, exdata, schema->free_arg);
+ }
+ (per_thread_pkt_exdata_arrary + i)->exdata=NULL;
+ }
+ }
+}
int stellar_packet_exdata_new_index(struct stellar *st, const char *name, packet_exdata_free *free_func,void *free_arg)
{
@@ -374,11 +440,7 @@ int packet_exdata_set(struct packet *pkt, int idx, void *ex_ptr)
{
if(pkt == NULL)return -1;
struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(packet_stellar_get(pkt));
- if(per_thread_pkt_exdata_arrary == NULL)
- {
- per_thread_pkt_exdata_arrary=CALLOC(struct stellar_exdata, utarray_len(plug_mgr->packet_exdata_schema_array));
- }
- stellar_exdata_set(plug_mgr->session_exdata_schema_array, per_thread_pkt_exdata_arrary, idx, ex_ptr);
+ stellar_exdata_set(plug_mgr->session_exdata_schema_array, per_thread_packet_exdata_arrary_get(plug_mgr), idx, ex_ptr);
return 0;
}
@@ -386,7 +448,7 @@ void *packet_exdata_get(struct packet *pkt, int idx)
{
if(pkt == NULL)return NULL;
struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(packet_stellar_get(pkt));
- return stellar_exdata_get( plug_mgr->session_exdata_schema_array, per_thread_pkt_exdata_arrary, idx);
+ return stellar_exdata_get( plug_mgr->session_exdata_schema_array, per_thread_packet_exdata_arrary_get(plug_mgr), idx);
}
@@ -394,7 +456,7 @@ void *packet_exdata_get(struct packet *pkt, int idx)
* SESSION EXDATA *
*******************************/
-//TODO: allow exdata new index in plugin init stage
+//TODO: limit exdata new index in plugin init stage
int stellar_session_exdata_new_index(struct stellar *st, const char *name, session_exdata_free *free_func,void *free_arg)
{
struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
@@ -877,7 +939,7 @@ int stellar_session_plugin_register(struct stellar *st,
return (utarray_len(plug_mgr->registered_session_plugin_array)-1);// return session plugin_id, equals to session plugin arrary index
}
-void plugin_manager_on_session_ingress(struct session *sess,const struct packet *pkt)
+void plugin_manager_on_session_ingress(struct session *sess, struct packet *pkt)
{
struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess);
if(plug_mgr_rt==NULL)return;
@@ -905,7 +967,7 @@ void plugin_manager_on_session_ingress(struct session *sess,const struct packet
return;
}
-void plugin_manager_on_session_egress(struct session *sess,const struct packet *pkt)
+void plugin_manager_on_session_egress(struct session *sess, struct packet *pkt)
{
struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess);
if(plug_mgr_rt==NULL)return;
@@ -913,6 +975,7 @@ void plugin_manager_on_session_egress(struct session *sess,const struct packet *
plugin_manager_session_message_dispatch(sess);
session_mq_free(plug_mgr_rt->delivered_mq, plug_mgr_rt->plug_mgr->session_mq_schema_array);
plug_mgr_rt->delivered_mq=NULL;
+ per_thread_packet_exdata_arrary_clean(plug_mgr_rt->plug_mgr, pkt);
return;
}
diff --git a/src/plugin_manager/plugin_manager.h b/src/plugin_manager/plugin_manager.h
index 6d3da99..eb45a8b 100644
--- a/src/plugin_manager/plugin_manager.h
+++ b/src/plugin_manager/plugin_manager.h
@@ -14,8 +14,8 @@ void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, struct pac
int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr);
//publish and dispatch session msg(msg, pkt) on session_mq
-void plugin_manager_on_session_ingress(struct session *sess,const struct packet *pkt);
-void plugin_manager_on_session_egress(struct session *sess,const struct packet *pkt);
+void plugin_manager_on_session_ingress(struct session *sess,struct packet *pkt);
+void plugin_manager_on_session_egress(struct session *sess,struct packet *pkt);
struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_manager_schema *plug_mgr, struct session *sess);
void plugin_manager_session_runtime_free(struct plugin_manager_runtime *plug_mgr_rt); \ No newline at end of file