diff options
| author | yangwei <[email protected]> | 2024-07-01 17:17:45 +0800 |
|---|---|---|
| committer | yangwei <[email protected]> | 2024-07-01 18:03:21 +0800 |
| commit | a7da42f66efc93349b795189d8dbdb1f6c9c2765 (patch) | |
| tree | c85f15499fd86e39f251bac2b00f89a63de2546c /src | |
| parent | 20b65aef017c25644da73f244168f25587f171b4 (diff) | |
↩ revert(packet mq & exdata): remove correlative code
Diffstat (limited to 'src')
| -rw-r--r-- | src/plugin_manager/plugin_manager.c | 285 | ||||
| -rw-r--r-- | src/plugin_manager/plugin_manager.h | 1 | ||||
| -rw-r--r-- | src/plugin_manager/plugin_manager_interna.h | 28 | ||||
| -rw-r--r-- | src/stellar_on_sapp/stellar_on_sapp_api.c | 6 |
4 files changed, 0 insertions, 320 deletions
diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c index 9f9f7bd..3eeeb21 100644 --- a/src/plugin_manager/plugin_manager.c +++ b/src/plugin_manager/plugin_manager.c @@ -70,32 +70,6 @@ PLUGIN_SPEC_LOAD_ERROR: return NULL; } - -static struct plugin_manger_per_thread_data *plugin_manager_per_thread_data_new(struct stellar *st) -{ - if(st == NULL)return NULL; - int thread_num=stellar_get_worker_thread_num(st); - struct plugin_manger_per_thread_data *per_thread_data = CALLOC(struct plugin_manger_per_thread_data, thread_num); - return per_thread_data; -} - - -static void plugin_manager_per_thread_data_free(struct plugin_manger_per_thread_data *per_thread_data, struct stellar *st) -{ - if(per_thread_data == NULL || st == NULL)return; - int thread_num=stellar_get_worker_thread_num(st); - struct plugin_manger_per_thread_data *p_data; - for (int i = 0; i < thread_num; i++) - { - p_data=per_thread_data+i; - if(p_data->per_thread_pkt_exdata_array.exdata_array)FREE(p_data->per_thread_pkt_exdata_array.exdata_array); - if(p_data->per_thread_pkt_mq.mq)FREE(p_data->per_thread_pkt_mq.mq); - } - FREE(per_thread_data); - return; -} - - struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char *plugin_spec_file_path) { int spec_num; @@ -131,7 +105,6 @@ struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char } } FREE(specs); - plug_mgr->per_thread_data = plugin_manager_per_thread_data_new(st); return plug_mgr; } @@ -156,16 +129,6 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr) } utarray_free(plug_mgr->session_mq_schema_array); } - if(plug_mgr->packet_mq_schema_array) - { - for(unsigned int i = 0; i < utarray_len(plug_mgr->packet_mq_schema_array); i++) - { - stellar_packet_mq_destroy_topic(plug_mgr->st, i); - } - utarray_free(plug_mgr->packet_mq_schema_array); - } - - if(plug_mgr->packet_exdata_schema_array)utarray_free(plug_mgr->packet_exdata_schema_array); if(plug_mgr->session_exdata_schema_array)utarray_free(plug_mgr->session_exdata_schema_array); if(plug_mgr->registered_polling_plugin_array)utarray_free(plug_mgr->registered_polling_plugin_array); if(plug_mgr->registered_packet_plugin_array) @@ -186,7 +149,6 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr) } utarray_free(plug_mgr->registered_session_plugin_array); } - plugin_manager_per_thread_data_free(plug_mgr->per_thread_data, plug_mgr->st); FREE(plug_mgr); return; } @@ -261,66 +223,6 @@ void *stellar_exdata_get(UT_array *exdata_schema, struct stellar_exdata *exdata_ } /******************************* - * PACKET EXDATA * - *******************************/ - - -static struct stellar_exdata *per_thread_packet_exdata_arrary_get(struct plugin_manager_schema *plug_mgr) -{ - if(plug_mgr==NULL || plug_mgr->packet_exdata_schema_array == NULL)return NULL; - int tid=stellar_get_current_thread_id(plug_mgr->st); - if(plug_mgr->per_thread_data[tid].per_thread_pkt_exdata_array.exdata_array == NULL) - { - unsigned int len = utarray_len(plug_mgr->packet_exdata_schema_array); - plug_mgr->per_thread_data[tid].per_thread_pkt_exdata_array.exdata_array = CALLOC(struct stellar_exdata, len); - } - return plug_mgr->per_thread_data[tid].per_thread_pkt_exdata_array.exdata_array; -} - -static void per_thread_packet_exdata_arrary_clean(struct plugin_manager_schema *plug_mgr, struct packet *pkt) -{ - if(plug_mgr==NULL || plug_mgr->packet_exdata_schema_array == NULL)return; - unsigned int len=utarray_len(plug_mgr->packet_exdata_schema_array); - struct stellar_exdata *per_thread_pkt_exdata_arrary = per_thread_packet_exdata_arrary_get(plug_mgr); - if(per_thread_pkt_exdata_arrary == NULL)return; - for (unsigned int i = 0; i < len; i++) - { - void *exdata = (per_thread_pkt_exdata_arrary + i)->exdata; - struct stellar_exdata_schema *schema = (struct stellar_exdata_schema *)utarray_eltptr(plug_mgr->packet_exdata_schema_array, i); - if (exdata) - { - if (schema->pkt_free_func) - { - schema->pkt_free_func(pkt, i, exdata, schema->free_arg); - } - (per_thread_pkt_exdata_arrary + i)->exdata=NULL; - } - } -} - -int stellar_packet_exdata_new_index(struct stellar *st, const char *name, packet_exdata_free *free_func,void *free_arg) -{ - struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); - assert(plug_mgr); - return stellar_exdata_new_index(st, name, &plug_mgr->packet_exdata_schema_array, (void*)free_func, free_arg); -} - -int packet_exdata_set(struct packet *pkt, int idx, void *ex_ptr) -{ - if(pkt == NULL)return -1; - struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(packet_stellar_get(pkt)); - return stellar_exdata_set(plug_mgr->packet_exdata_schema_array, per_thread_packet_exdata_arrary_get(plug_mgr), idx, ex_ptr); -} - -void *packet_exdata_get(struct packet *pkt, int idx) -{ - if(pkt == NULL)return NULL; - struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(packet_stellar_get(pkt)); - return stellar_exdata_get( plug_mgr->packet_exdata_schema_array, per_thread_packet_exdata_arrary_get(plug_mgr), idx); -} - - -/******************************* * SESSION EXDATA * *******************************/ @@ -462,184 +364,6 @@ int stellar_mq_publish_message(int topic_id, void *data, UT_array *mq_schema_arr UT_icd stellar_mq_subscriber_info_icd = {sizeof(struct stellar_mq_subscriber_info), NULL, NULL, NULL}; -/******************************* - * PACKET MQ * - *******************************/ - -static inline int stellar_current_thread_packet_mq_counter_inc(struct plugin_manager_schema *plug_mgr) -{ - if(plug_mgr==NULL)return -1; - int tid = stellar_get_current_thread_id(plug_mgr->st); - plug_mgr->per_thread_data[tid].per_thread_pkt_mq.pub_msg_cnt+=1; - return plug_mgr->per_thread_data[tid].per_thread_pkt_mq.pub_msg_cnt; -} - -static inline void stellar_current_thread_packet_mq_counter_reset(struct plugin_manager_schema *plug_mgr) -{ - if(plug_mgr==NULL)return; - int tid = stellar_get_current_thread_id(plug_mgr->st); - plug_mgr->per_thread_data[tid].per_thread_pkt_mq.pub_msg_cnt=0; -} - -static inline int stellar_current_thread_packet_mq_counter_get(struct plugin_manager_schema *plug_mgr) -{ - if(plug_mgr==NULL)return 0; - int tid = stellar_get_current_thread_id(plug_mgr->st); - return plug_mgr->per_thread_data[tid].per_thread_pkt_mq.pub_msg_cnt; -} - -int stellar_packet_mq_create_topic(struct stellar *st, const char *topic_name, packet_msg_free_cb_func *msg_free_cb, void *msg_free_arg) -{ - struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); - assert(plug_mgr); - int topic_id=stellar_mq_create_topic(st, topic_name, (void *)msg_free_cb, msg_free_arg, &plug_mgr->packet_mq_schema_array); - if(topic_id>=0)plug_mgr->packet_mq_topic_num+=1; - return topic_id; -} - -int stellar_packet_mq_get_topic_id(struct stellar *st, const char *topic_name) -{ - struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); - assert(plug_mgr); - return stellar_mq_get_topic_id(topic_name, plug_mgr->packet_mq_schema_array); -} - -int stellar_packet_mq_update_topic(struct stellar *st, int topic_id, packet_msg_free_cb_func *msg_free_cb, void *msg_free_arg) -{ - struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); - assert(plug_mgr); - return stellar_mq_update_topic(topic_id, (void *)msg_free_cb, msg_free_arg, plug_mgr->packet_mq_schema_array); -} - -int stellar_packet_mq_destroy_topic(struct stellar *st, int topic_id) -{ - struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); - assert(plug_mgr); - int ret = stellar_mq_destroy_topic(topic_id, plug_mgr->packet_mq_schema_array); - if(ret==1)plug_mgr->packet_mq_topic_num-=1; - return ret; -} - -//return 0 if success, otherwise return -1. -int stellar_packet_mq_subscribe(struct stellar *st, int topic_id, on_packet_msg_cb_func *plugin_on_msg_cb, int plugin_id) //packet plugin only -{ - if(plugin_id < PACKET_PULGIN_ID_BASE || plugin_id >= POLLING_PULGIN_ID_BASE)return -1;// ignore session or polling plugin - int plugin_idx=plugin_id-PACKET_PULGIN_ID_BASE; - struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); - if(plug_mgr == NULL || plug_mgr->packet_mq_schema_array==NULL || plug_mgr->registered_packet_plugin_array == NULL)return -1; - - unsigned int len = utarray_len(plug_mgr->packet_mq_schema_array); - if (len <= (unsigned int)topic_id)return -1; - struct registered_packet_plugin_schema *packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr(plug_mgr->registered_packet_plugin_array, (unsigned)plugin_idx); - if(packet_plugin_schema==NULL)return -1; - - struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->packet_mq_schema_array, (unsigned int)topic_id); - if(topic==NULL)return -1; - - if(packet_plugin_schema->registed_packet_mq_subscriber_info==NULL) - { - utarray_new(packet_plugin_schema->registed_packet_mq_subscriber_info, &stellar_mq_subscriber_info_icd); - } - - // if plugin already subscribe current topic, return 0 - struct stellar_mq_subscriber_info *p=NULL; - while( (p=(struct stellar_mq_subscriber_info *)utarray_next(packet_plugin_schema->registed_packet_mq_subscriber_info,p))) - { - if(p->topic_id==topic_id) - { - struct stellar_mq_subscriber *tmp_subscriber=topic->subscribers; - int cnt=0; - while(tmp_subscriber) - { - if(cnt==p->subscriber_idx) - { - tmp_subscriber->pkt_msg_cb=plugin_on_msg_cb; - return 0; - } - cnt++; - tmp_subscriber=tmp_subscriber->next; - } - } - }; - - struct stellar_mq_subscriber *new_subscriber = CALLOC(struct stellar_mq_subscriber,1); - new_subscriber->topic_subscriber_idx = topic->subscriber_cnt; - new_subscriber->plugin_idx = plugin_idx; - new_subscriber->pkt_msg_cb = plugin_on_msg_cb; - DL_APPEND(topic->subscribers, new_subscriber); - - struct stellar_mq_subscriber_info sub_info; - memset(&sub_info, 0, sizeof(struct stellar_mq_subscriber_info)); - sub_info.topic_id=topic_id; - sub_info.subscriber_idx=topic->subscriber_cnt; - utarray_push_back(packet_plugin_schema->registed_packet_mq_subscriber_info, &sub_info); - topic->subscriber_cnt+=1; - plug_mgr->packet_topic_subscriber_num+=1; - return 0; -} - -int packet_mq_publish_message(struct packet *pkt, int topic_id, void *msg) -{ - struct stellar *st = packet_stellar_get(pkt); - assert(st); - struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); - assert(plug_mgr); - int tid = stellar_get_current_thread_id(st); - if(stellar_current_thread_packet_mq_counter_get(plug_mgr) >= plug_mgr->max_message_dispatch)return -1;//packet mq donot contain intrinisic msg - int ret=stellar_mq_publish_message(topic_id, msg, plug_mgr->packet_mq_schema_array, &(plug_mgr->per_thread_data[tid].per_thread_pkt_mq.mq)); - if(ret==0)stellar_current_thread_packet_mq_counter_inc(plug_mgr); - return ret; -} - -// TODO: limit maximum pub message number in one loop -static void plugin_manager_packet_message_dispatch(struct packet *pkt) -{ - - struct stellar *st = packet_stellar_get(pkt); - assert(st); - struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); - assert(plug_mgr); - - if(plug_mgr->packet_mq_schema_array==NULL)return; - - int tid = stellar_get_current_thread_id(st); - - struct stellar_message **mq= &(plug_mgr->per_thread_data[tid].per_thread_pkt_mq.mq); - - struct stellar_message *mq_elt=NULL, *mq_tmp=NULL; - struct stellar_mq_subscriber *sub_elt, *sub_tmp; - struct stellar_mq_topic_schema *topic; - struct registered_packet_plugin_schema *packet_plugin_schema; - while (*mq != NULL) - { - DL_FOREACH_SAFE(*mq, mq_elt, mq_tmp) - { - topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->packet_mq_schema_array, - (unsigned int)(mq_elt->topic_id)); - if (topic) - { - DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp) - { - if (sub_elt->pkt_msg_cb) - { - packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr(plug_mgr->registered_packet_plugin_array, (unsigned int)sub_elt->plugin_idx); - if(packet_plugin_schema) - { - sub_elt->pkt_msg_cb(pkt, mq_elt->topic_id, mq_elt->msg_data, packet_plugin_schema->plugin_env); - } - } - } - if (topic->pkt_msg_free_cb) - { - topic->pkt_msg_free_cb(pkt, mq_elt->msg_data, topic->free_cb_arg); - } - } - DL_DELETE(*mq, mq_elt); - FREE(mq_elt); - } - } - return; -} /******************************* * SESSION MQ * @@ -984,7 +708,6 @@ void plugin_manager_on_packet_ingress(struct plugin_manager_schema *plug_mgr, st if(plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return; struct registered_packet_plugin_schema *p=NULL; unsigned char ip_proto=packet_get_ip_protocol(pkt); - stellar_current_thread_packet_mq_counter_reset(plug_mgr); while ((p = (struct registered_packet_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, p))) { if(p->ip_protocol == ip_proto && p->on_packet) @@ -992,17 +715,9 @@ void plugin_manager_on_packet_ingress(struct plugin_manager_schema *plug_mgr, st p->on_packet(pkt, ip_proto, p->plugin_env); } } - plugin_manager_packet_message_dispatch(pkt); return; } -void plugin_manager_on_packet_egress(struct plugin_manager_schema *plug_mgr, struct packet *pkt) -{ - if(plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return; - plugin_manager_packet_message_dispatch(pkt); - per_thread_packet_exdata_arrary_clean(plug_mgr, pkt); -} - /********************************************* * PLUGIN MANAGER POLLING PLUGIN * *********************************************/ diff --git a/src/plugin_manager/plugin_manager.h b/src/plugin_manager/plugin_manager.h index 6810da8..2d07c25 100644 --- a/src/plugin_manager/plugin_manager.h +++ b/src/plugin_manager/plugin_manager.h @@ -9,7 +9,6 @@ struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char void plugin_manager_exit(struct plugin_manager_schema *plug_mgr); void plugin_manager_on_packet_ingress(struct plugin_manager_schema *plug_mgr, struct packet *pkt); -void plugin_manager_on_packet_egress(struct plugin_manager_schema *plug_mgr, struct packet *pkt); //return polling work state, 0: idle, 1: working int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr); diff --git a/src/plugin_manager/plugin_manager_interna.h b/src/plugin_manager/plugin_manager_interna.h index 0cf55d6..6b2beba 100644 --- a/src/plugin_manager/plugin_manager_interna.h +++ b/src/plugin_manager/plugin_manager_interna.h @@ -4,44 +4,22 @@ #include "stellar/session_exdata.h" #include "stellar/session_mq.h" -#include "stellar/packet_exdata.h" -#include "stellar/packet_mq.h" #include "bitmap/bitmap.h" #include "uthash/utarray.h" -struct per_thread_exdata_array -{ - struct stellar_exdata *exdata_array; -}; - struct stellar_message; -struct per_thread_mq -{ - struct stellar_message *mq; - int pub_msg_cnt; -}; - -struct plugin_manger_per_thread_data -{ - struct per_thread_exdata_array per_thread_pkt_exdata_array; - struct per_thread_mq per_thread_pkt_mq; -}; struct plugin_manager_schema { struct stellar *st; UT_array *plugin_load_specs_array; - UT_array *packet_exdata_schema_array; - UT_array *packet_mq_schema_array; UT_array *session_exdata_schema_array; UT_array *session_mq_schema_array; UT_array *registered_session_plugin_array; UT_array *registered_packet_plugin_array; UT_array *registered_polling_plugin_array; - int packet_mq_topic_num; int session_mq_topic_num; - int packet_topic_subscriber_num; int session_topic_subscriber_num; int tcp_topic_id; int tcp_stream_topic_id; @@ -49,12 +27,9 @@ struct plugin_manager_schema int egress_topic_id; int control_packet_topic_id; int max_message_dispatch; - struct plugin_manger_per_thread_data *per_thread_data; }__attribute__((aligned(sizeof(void*)))); - - struct stellar_exdata { void *exdata; @@ -69,7 +44,6 @@ struct stellar_exdata_schema { void *free_func; session_exdata_free *sess_free_func; - packet_exdata_free *pkt_free_func; }; void *free_arg; @@ -91,7 +65,6 @@ typedef struct stellar_mq_subscriber union { on_session_msg_cb_func *sess_msg_cb; - on_packet_msg_cb_func *pkt_msg_cb; }; struct stellar_mq_subscriber *next, *prev; }stellar_mq_subscriber __attribute__((aligned(sizeof(void*)))); @@ -108,7 +81,6 @@ struct stellar_mq_topic_schema { void *free_cb; session_msg_free_cb_func *sess_msg_free_cb; - packet_msg_free_cb_func *pkt_msg_free_cb; }; struct stellar_mq_subscriber *subscribers; }__attribute__((aligned(sizeof(void*)))); diff --git a/src/stellar_on_sapp/stellar_on_sapp_api.c b/src/stellar_on_sapp/stellar_on_sapp_api.c index f7dd17d..5974ace 100644 --- a/src/stellar_on_sapp/stellar_on_sapp_api.c +++ b/src/stellar_on_sapp/stellar_on_sapp_api.c @@ -60,7 +60,6 @@ inline struct plugin_manager_runtime * session_plugin_manager_runtime_get(struct /********************************************* * STELLAR INIT & EXIT ON SAPP * *********************************************/ - struct stellar *stellar_init_on_sapp(const char *toml_conf_path) { struct stellar *st = CALLOC(struct stellar, 1); @@ -157,7 +156,6 @@ void session_defer_on_sapp(struct session *sess) if(pkt->raw_pkt) { plugin_manager_on_session_egress(sess, pkt); - plugin_manager_on_packet_egress(sess->st->plug_mgr, pkt); } } sess->cur_pkt.raw_pkt=NULL;//clear raw_pkt @@ -193,10 +191,6 @@ void packet_update_on_sapp(struct stellar *st, struct streaminfo *pstream, void } //FIXME: defer TCP/UDP packet on session update plugin_manager_on_packet_ingress(st->plug_mgr, &pkt); - if(pkt.ip_proto!=IPPROTO_TCP && pkt.ip_proto!=IPPROTO_UDP) - { - plugin_manager_on_packet_egress(st->plug_mgr, &pkt); - } } inline int polling_on_sapp(struct stellar *st) |
