diff options
| author | yangwei <[email protected]> | 2024-05-24 02:52:27 +0800 |
|---|---|---|
| committer | yangwei <[email protected]> | 2024-05-26 18:39:47 +0800 |
| commit | 6e3992676dbecf43d6552db703995714644aba50 (patch) | |
| tree | 61c8b0fe54fe3eeb00dcb3c9d16973492f3b0a57 | |
| parent | 41e35a2f98a51b7d11402d376cb9b055793a1af7 (diff) | |
✨ feat(packet exdata): clean pkt exdata after egress
| -rw-r--r-- | src/plugin_manager/plugin_manager.c | 87 | ||||
| -rw-r--r-- | src/plugin_manager/plugin_manager.h | 4 |
2 files changed, 77 insertions, 14 deletions
diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c index 9111fb3..65a7200 100644 --- a/src/plugin_manager/plugin_manager.c +++ b/src/plugin_manager/plugin_manager.c @@ -16,6 +16,7 @@ #include <threads.h> +struct per_thread_exdata_array; struct plugin_manager_schema { @@ -34,15 +35,22 @@ struct plugin_manager_schema int tcp_stream_topic_id; int egress_topic_id; int control_packet_topic_id; + struct per_thread_exdata_array *per_thread_pkt_exdata_array; }; - - struct stellar_exdata { void *exdata; }; +struct per_thread_exdata_array +{ + struct stellar_exdata *exdata_array; +}; + + + + struct stellar_exdata_schema { char *name; @@ -160,6 +168,7 @@ struct plugin_specific UT_icd plugin_specs_icd = {sizeof(struct plugin_specific), NULL, NULL, NULL}; + static struct plugin_specific *plugin_specs_load(const char *toml_conf_path, int *spec_num) { *spec_num = 0; @@ -218,6 +227,8 @@ PLUGIN_SPEC_LOAD_ERROR: return NULL; } +static struct per_thread_exdata_array *per_thread_packet_exdata_arrary_new(struct stellar *st, struct plugin_manager_schema *plug_mgr); + struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char *plugin_spec_file_path) { int spec_num; @@ -252,9 +263,12 @@ struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char } } FREE(specs); + pm->per_thread_pkt_exdata_array=per_thread_packet_exdata_arrary_new(st, pm); return pm; } +static void per_thread_packet_exdata_arrary_free(struct stellar *st, struct plugin_manager_schema *plug_mgr); + void plugin_manager_exit(struct plugin_manager_schema *plug_mgr) { struct plugin_specific *p=NULL; @@ -287,6 +301,7 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr) } utarray_free(plug_mgr->registered_session_plugin_array); } + per_thread_packet_exdata_arrary_free(plug_mgr->st, plug_mgr); FREE(plug_mgr); return; } @@ -361,7 +376,58 @@ void *stellar_exdata_get(UT_array *exdata_schema, struct stellar_exdata *exdata_ /******************************* * PACKET EXDATA * *******************************/ -__thread struct stellar_exdata *per_thread_pkt_exdata_arrary=NULL; + +static struct per_thread_exdata_array *per_thread_packet_exdata_arrary_new(struct stellar *st, struct plugin_manager_schema *plug_mgr) +{ + if(st == NULL || plug_mgr == NULL || plug_mgr->packet_exdata_schema_array == NULL || plug_mgr->per_thread_pkt_exdata_array)return NULL; + int thread_num=stellar_get_worker_thread_num(st); + struct per_thread_exdata_array *per_thread_pkt_exdata_array = CALLOC(struct per_thread_exdata_array, thread_num); + unsigned int len=utarray_len(plug_mgr->packet_exdata_schema_array); + for (int i = 0; i < thread_num; i++) + { + (per_thread_pkt_exdata_array+i)->exdata_array = CALLOC(struct stellar_exdata, len); + } + return per_thread_pkt_exdata_array; +} + +static void per_thread_packet_exdata_arrary_free(struct stellar *st, struct plugin_manager_schema *plug_mgr) +{ + if(st == NULL || plug_mgr == NULL || plug_mgr->packet_exdata_schema_array == NULL || plug_mgr->per_thread_pkt_exdata_array)return; + int thread_num=stellar_get_worker_thread_num(st); + for (int i = 0; i < thread_num; i++) + { + FREE((plug_mgr->per_thread_pkt_exdata_array+i)->exdata_array); + } + FREE(plug_mgr->per_thread_pkt_exdata_array); + return; +} + +static struct stellar_exdata *per_thread_packet_exdata_arrary_get(struct plugin_manager_schema *plug_mgr) +{ + if(plug_mgr==NULL || plug_mgr->packet_exdata_schema_array == NULL || plug_mgr->per_thread_pkt_exdata_array)return NULL; + int tid=stellar_get_current_thread_id(plug_mgr->st); + return (plug_mgr->per_thread_pkt_exdata_array+tid)->exdata_array; +} + +static void per_thread_packet_exdata_arrary_clean(struct plugin_manager_schema *plug_mgr, struct packet *pkt) +{ + if(plug_mgr==NULL || plug_mgr->packet_exdata_schema_array == NULL || plug_mgr->per_thread_pkt_exdata_array)return; + unsigned int len=utarray_len(plug_mgr->packet_exdata_schema_array); + struct stellar_exdata *per_thread_pkt_exdata_arrary = per_thread_packet_exdata_arrary_get(plug_mgr); + for (unsigned int i = 0; i < len; i++) + { + void *exdata = (per_thread_pkt_exdata_arrary + i)->exdata; + struct stellar_exdata_schema *schema = (struct stellar_exdata_schema *)utarray_eltptr(plug_mgr->packet_exdata_schema_array, i); + if (exdata) + { + if (schema->pkt_free_func) + { + schema->pkt_free_func(pkt, i, exdata, schema->free_arg); + } + (per_thread_pkt_exdata_arrary + i)->exdata=NULL; + } + } +} int stellar_packet_exdata_new_index(struct stellar *st, const char *name, packet_exdata_free *free_func,void *free_arg) { @@ -374,11 +440,7 @@ int packet_exdata_set(struct packet *pkt, int idx, void *ex_ptr) { if(pkt == NULL)return -1; struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(packet_stellar_get(pkt)); - if(per_thread_pkt_exdata_arrary == NULL) - { - per_thread_pkt_exdata_arrary=CALLOC(struct stellar_exdata, utarray_len(plug_mgr->packet_exdata_schema_array)); - } - stellar_exdata_set(plug_mgr->session_exdata_schema_array, per_thread_pkt_exdata_arrary, idx, ex_ptr); + stellar_exdata_set(plug_mgr->session_exdata_schema_array, per_thread_packet_exdata_arrary_get(plug_mgr), idx, ex_ptr); return 0; } @@ -386,7 +448,7 @@ void *packet_exdata_get(struct packet *pkt, int idx) { if(pkt == NULL)return NULL; struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(packet_stellar_get(pkt)); - return stellar_exdata_get( plug_mgr->session_exdata_schema_array, per_thread_pkt_exdata_arrary, idx); + return stellar_exdata_get( plug_mgr->session_exdata_schema_array, per_thread_packet_exdata_arrary_get(plug_mgr), idx); } @@ -394,7 +456,7 @@ void *packet_exdata_get(struct packet *pkt, int idx) * SESSION EXDATA * *******************************/ -//TODO: allow exdata new index in plugin init stage +//TODO: limit exdata new index in plugin init stage int stellar_session_exdata_new_index(struct stellar *st, const char *name, session_exdata_free *free_func,void *free_arg) { struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); @@ -877,7 +939,7 @@ int stellar_session_plugin_register(struct stellar *st, return (utarray_len(plug_mgr->registered_session_plugin_array)-1);// return session plugin_id, equals to session plugin arrary index } -void plugin_manager_on_session_ingress(struct session *sess,const struct packet *pkt) +void plugin_manager_on_session_ingress(struct session *sess, struct packet *pkt) { struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess); if(plug_mgr_rt==NULL)return; @@ -905,7 +967,7 @@ void plugin_manager_on_session_ingress(struct session *sess,const struct packet return; } -void plugin_manager_on_session_egress(struct session *sess,const struct packet *pkt) +void plugin_manager_on_session_egress(struct session *sess, struct packet *pkt) { struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess); if(plug_mgr_rt==NULL)return; @@ -913,6 +975,7 @@ void plugin_manager_on_session_egress(struct session *sess,const struct packet * plugin_manager_session_message_dispatch(sess); session_mq_free(plug_mgr_rt->delivered_mq, plug_mgr_rt->plug_mgr->session_mq_schema_array); plug_mgr_rt->delivered_mq=NULL; + per_thread_packet_exdata_arrary_clean(plug_mgr_rt->plug_mgr, pkt); return; } diff --git a/src/plugin_manager/plugin_manager.h b/src/plugin_manager/plugin_manager.h index 6d3da99..eb45a8b 100644 --- a/src/plugin_manager/plugin_manager.h +++ b/src/plugin_manager/plugin_manager.h @@ -14,8 +14,8 @@ void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, struct pac int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr); //publish and dispatch session msg(msg, pkt) on session_mq -void plugin_manager_on_session_ingress(struct session *sess,const struct packet *pkt); -void plugin_manager_on_session_egress(struct session *sess,const struct packet *pkt); +void plugin_manager_on_session_ingress(struct session *sess,struct packet *pkt); +void plugin_manager_on_session_egress(struct session *sess,struct packet *pkt); struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_manager_schema *plug_mgr, struct session *sess); void plugin_manager_session_runtime_free(struct plugin_manager_runtime *plug_mgr_rt);
\ No newline at end of file |
