summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authoryangwei <[email protected]>2024-07-01 17:17:45 +0800
committeryangwei <[email protected]>2024-07-01 18:03:21 +0800
commita7da42f66efc93349b795189d8dbdb1f6c9c2765 (patch)
treec85f15499fd86e39f251bac2b00f89a63de2546c /src
parent20b65aef017c25644da73f244168f25587f171b4 (diff)
↩ revert(packet mq & exdata): remove correlative code
Diffstat (limited to 'src')
-rw-r--r--src/plugin_manager/plugin_manager.c285
-rw-r--r--src/plugin_manager/plugin_manager.h1
-rw-r--r--src/plugin_manager/plugin_manager_interna.h28
-rw-r--r--src/stellar_on_sapp/stellar_on_sapp_api.c6
4 files changed, 0 insertions, 320 deletions
diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c
index 9f9f7bd..3eeeb21 100644
--- a/src/plugin_manager/plugin_manager.c
+++ b/src/plugin_manager/plugin_manager.c
@@ -70,32 +70,6 @@ PLUGIN_SPEC_LOAD_ERROR:
return NULL;
}
-
-static struct plugin_manger_per_thread_data *plugin_manager_per_thread_data_new(struct stellar *st)
-{
- if(st == NULL)return NULL;
- int thread_num=stellar_get_worker_thread_num(st);
- struct plugin_manger_per_thread_data *per_thread_data = CALLOC(struct plugin_manger_per_thread_data, thread_num);
- return per_thread_data;
-}
-
-
-static void plugin_manager_per_thread_data_free(struct plugin_manger_per_thread_data *per_thread_data, struct stellar *st)
-{
- if(per_thread_data == NULL || st == NULL)return;
- int thread_num=stellar_get_worker_thread_num(st);
- struct plugin_manger_per_thread_data *p_data;
- for (int i = 0; i < thread_num; i++)
- {
- p_data=per_thread_data+i;
- if(p_data->per_thread_pkt_exdata_array.exdata_array)FREE(p_data->per_thread_pkt_exdata_array.exdata_array);
- if(p_data->per_thread_pkt_mq.mq)FREE(p_data->per_thread_pkt_mq.mq);
- }
- FREE(per_thread_data);
- return;
-}
-
-
struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char *plugin_spec_file_path)
{
int spec_num;
@@ -131,7 +105,6 @@ struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char
}
}
FREE(specs);
- plug_mgr->per_thread_data = plugin_manager_per_thread_data_new(st);
return plug_mgr;
}
@@ -156,16 +129,6 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr)
}
utarray_free(plug_mgr->session_mq_schema_array);
}
- if(plug_mgr->packet_mq_schema_array)
- {
- for(unsigned int i = 0; i < utarray_len(plug_mgr->packet_mq_schema_array); i++)
- {
- stellar_packet_mq_destroy_topic(plug_mgr->st, i);
- }
- utarray_free(plug_mgr->packet_mq_schema_array);
- }
-
- if(plug_mgr->packet_exdata_schema_array)utarray_free(plug_mgr->packet_exdata_schema_array);
if(plug_mgr->session_exdata_schema_array)utarray_free(plug_mgr->session_exdata_schema_array);
if(plug_mgr->registered_polling_plugin_array)utarray_free(plug_mgr->registered_polling_plugin_array);
if(plug_mgr->registered_packet_plugin_array)
@@ -186,7 +149,6 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr)
}
utarray_free(plug_mgr->registered_session_plugin_array);
}
- plugin_manager_per_thread_data_free(plug_mgr->per_thread_data, plug_mgr->st);
FREE(plug_mgr);
return;
}
@@ -261,66 +223,6 @@ void *stellar_exdata_get(UT_array *exdata_schema, struct stellar_exdata *exdata_
}
/*******************************
- * PACKET EXDATA *
- *******************************/
-
-
-static struct stellar_exdata *per_thread_packet_exdata_arrary_get(struct plugin_manager_schema *plug_mgr)
-{
- if(plug_mgr==NULL || plug_mgr->packet_exdata_schema_array == NULL)return NULL;
- int tid=stellar_get_current_thread_id(plug_mgr->st);
- if(plug_mgr->per_thread_data[tid].per_thread_pkt_exdata_array.exdata_array == NULL)
- {
- unsigned int len = utarray_len(plug_mgr->packet_exdata_schema_array);
- plug_mgr->per_thread_data[tid].per_thread_pkt_exdata_array.exdata_array = CALLOC(struct stellar_exdata, len);
- }
- return plug_mgr->per_thread_data[tid].per_thread_pkt_exdata_array.exdata_array;
-}
-
-static void per_thread_packet_exdata_arrary_clean(struct plugin_manager_schema *plug_mgr, struct packet *pkt)
-{
- if(plug_mgr==NULL || plug_mgr->packet_exdata_schema_array == NULL)return;
- unsigned int len=utarray_len(plug_mgr->packet_exdata_schema_array);
- struct stellar_exdata *per_thread_pkt_exdata_arrary = per_thread_packet_exdata_arrary_get(plug_mgr);
- if(per_thread_pkt_exdata_arrary == NULL)return;
- for (unsigned int i = 0; i < len; i++)
- {
- void *exdata = (per_thread_pkt_exdata_arrary + i)->exdata;
- struct stellar_exdata_schema *schema = (struct stellar_exdata_schema *)utarray_eltptr(plug_mgr->packet_exdata_schema_array, i);
- if (exdata)
- {
- if (schema->pkt_free_func)
- {
- schema->pkt_free_func(pkt, i, exdata, schema->free_arg);
- }
- (per_thread_pkt_exdata_arrary + i)->exdata=NULL;
- }
- }
-}
-
-int stellar_packet_exdata_new_index(struct stellar *st, const char *name, packet_exdata_free *free_func,void *free_arg)
-{
- struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
- assert(plug_mgr);
- return stellar_exdata_new_index(st, name, &plug_mgr->packet_exdata_schema_array, (void*)free_func, free_arg);
-}
-
-int packet_exdata_set(struct packet *pkt, int idx, void *ex_ptr)
-{
- if(pkt == NULL)return -1;
- struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(packet_stellar_get(pkt));
- return stellar_exdata_set(plug_mgr->packet_exdata_schema_array, per_thread_packet_exdata_arrary_get(plug_mgr), idx, ex_ptr);
-}
-
-void *packet_exdata_get(struct packet *pkt, int idx)
-{
- if(pkt == NULL)return NULL;
- struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(packet_stellar_get(pkt));
- return stellar_exdata_get( plug_mgr->packet_exdata_schema_array, per_thread_packet_exdata_arrary_get(plug_mgr), idx);
-}
-
-
-/*******************************
* SESSION EXDATA *
*******************************/
@@ -462,184 +364,6 @@ int stellar_mq_publish_message(int topic_id, void *data, UT_array *mq_schema_arr
UT_icd stellar_mq_subscriber_info_icd = {sizeof(struct stellar_mq_subscriber_info), NULL, NULL, NULL};
-/*******************************
- * PACKET MQ *
- *******************************/
-
-static inline int stellar_current_thread_packet_mq_counter_inc(struct plugin_manager_schema *plug_mgr)
-{
- if(plug_mgr==NULL)return -1;
- int tid = stellar_get_current_thread_id(plug_mgr->st);
- plug_mgr->per_thread_data[tid].per_thread_pkt_mq.pub_msg_cnt+=1;
- return plug_mgr->per_thread_data[tid].per_thread_pkt_mq.pub_msg_cnt;
-}
-
-static inline void stellar_current_thread_packet_mq_counter_reset(struct plugin_manager_schema *plug_mgr)
-{
- if(plug_mgr==NULL)return;
- int tid = stellar_get_current_thread_id(plug_mgr->st);
- plug_mgr->per_thread_data[tid].per_thread_pkt_mq.pub_msg_cnt=0;
-}
-
-static inline int stellar_current_thread_packet_mq_counter_get(struct plugin_manager_schema *plug_mgr)
-{
- if(plug_mgr==NULL)return 0;
- int tid = stellar_get_current_thread_id(plug_mgr->st);
- return plug_mgr->per_thread_data[tid].per_thread_pkt_mq.pub_msg_cnt;
-}
-
-int stellar_packet_mq_create_topic(struct stellar *st, const char *topic_name, packet_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
-{
- struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
- assert(plug_mgr);
- int topic_id=stellar_mq_create_topic(st, topic_name, (void *)msg_free_cb, msg_free_arg, &plug_mgr->packet_mq_schema_array);
- if(topic_id>=0)plug_mgr->packet_mq_topic_num+=1;
- return topic_id;
-}
-
-int stellar_packet_mq_get_topic_id(struct stellar *st, const char *topic_name)
-{
- struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
- assert(plug_mgr);
- return stellar_mq_get_topic_id(topic_name, plug_mgr->packet_mq_schema_array);
-}
-
-int stellar_packet_mq_update_topic(struct stellar *st, int topic_id, packet_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
-{
- struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
- assert(plug_mgr);
- return stellar_mq_update_topic(topic_id, (void *)msg_free_cb, msg_free_arg, plug_mgr->packet_mq_schema_array);
-}
-
-int stellar_packet_mq_destroy_topic(struct stellar *st, int topic_id)
-{
- struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
- assert(plug_mgr);
- int ret = stellar_mq_destroy_topic(topic_id, plug_mgr->packet_mq_schema_array);
- if(ret==1)plug_mgr->packet_mq_topic_num-=1;
- return ret;
-}
-
-//return 0 if success, otherwise return -1.
-int stellar_packet_mq_subscribe(struct stellar *st, int topic_id, on_packet_msg_cb_func *plugin_on_msg_cb, int plugin_id) //packet plugin only
-{
- if(plugin_id < PACKET_PULGIN_ID_BASE || plugin_id >= POLLING_PULGIN_ID_BASE)return -1;// ignore session or polling plugin
- int plugin_idx=plugin_id-PACKET_PULGIN_ID_BASE;
- struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
- if(plug_mgr == NULL || plug_mgr->packet_mq_schema_array==NULL || plug_mgr->registered_packet_plugin_array == NULL)return -1;
-
- unsigned int len = utarray_len(plug_mgr->packet_mq_schema_array);
- if (len <= (unsigned int)topic_id)return -1;
- struct registered_packet_plugin_schema *packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr(plug_mgr->registered_packet_plugin_array, (unsigned)plugin_idx);
- if(packet_plugin_schema==NULL)return -1;
-
- struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->packet_mq_schema_array, (unsigned int)topic_id);
- if(topic==NULL)return -1;
-
- if(packet_plugin_schema->registed_packet_mq_subscriber_info==NULL)
- {
- utarray_new(packet_plugin_schema->registed_packet_mq_subscriber_info, &stellar_mq_subscriber_info_icd);
- }
-
- // if plugin already subscribe current topic, return 0
- struct stellar_mq_subscriber_info *p=NULL;
- while( (p=(struct stellar_mq_subscriber_info *)utarray_next(packet_plugin_schema->registed_packet_mq_subscriber_info,p)))
- {
- if(p->topic_id==topic_id)
- {
- struct stellar_mq_subscriber *tmp_subscriber=topic->subscribers;
- int cnt=0;
- while(tmp_subscriber)
- {
- if(cnt==p->subscriber_idx)
- {
- tmp_subscriber->pkt_msg_cb=plugin_on_msg_cb;
- return 0;
- }
- cnt++;
- tmp_subscriber=tmp_subscriber->next;
- }
- }
- };
-
- struct stellar_mq_subscriber *new_subscriber = CALLOC(struct stellar_mq_subscriber,1);
- new_subscriber->topic_subscriber_idx = topic->subscriber_cnt;
- new_subscriber->plugin_idx = plugin_idx;
- new_subscriber->pkt_msg_cb = plugin_on_msg_cb;
- DL_APPEND(topic->subscribers, new_subscriber);
-
- struct stellar_mq_subscriber_info sub_info;
- memset(&sub_info, 0, sizeof(struct stellar_mq_subscriber_info));
- sub_info.topic_id=topic_id;
- sub_info.subscriber_idx=topic->subscriber_cnt;
- utarray_push_back(packet_plugin_schema->registed_packet_mq_subscriber_info, &sub_info);
- topic->subscriber_cnt+=1;
- plug_mgr->packet_topic_subscriber_num+=1;
- return 0;
-}
-
-int packet_mq_publish_message(struct packet *pkt, int topic_id, void *msg)
-{
- struct stellar *st = packet_stellar_get(pkt);
- assert(st);
- struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
- assert(plug_mgr);
- int tid = stellar_get_current_thread_id(st);
- if(stellar_current_thread_packet_mq_counter_get(plug_mgr) >= plug_mgr->max_message_dispatch)return -1;//packet mq donot contain intrinisic msg
- int ret=stellar_mq_publish_message(topic_id, msg, plug_mgr->packet_mq_schema_array, &(plug_mgr->per_thread_data[tid].per_thread_pkt_mq.mq));
- if(ret==0)stellar_current_thread_packet_mq_counter_inc(plug_mgr);
- return ret;
-}
-
-// TODO: limit maximum pub message number in one loop
-static void plugin_manager_packet_message_dispatch(struct packet *pkt)
-{
-
- struct stellar *st = packet_stellar_get(pkt);
- assert(st);
- struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
- assert(plug_mgr);
-
- if(plug_mgr->packet_mq_schema_array==NULL)return;
-
- int tid = stellar_get_current_thread_id(st);
-
- struct stellar_message **mq= &(plug_mgr->per_thread_data[tid].per_thread_pkt_mq.mq);
-
- struct stellar_message *mq_elt=NULL, *mq_tmp=NULL;
- struct stellar_mq_subscriber *sub_elt, *sub_tmp;
- struct stellar_mq_topic_schema *topic;
- struct registered_packet_plugin_schema *packet_plugin_schema;
- while (*mq != NULL)
- {
- DL_FOREACH_SAFE(*mq, mq_elt, mq_tmp)
- {
- topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->packet_mq_schema_array,
- (unsigned int)(mq_elt->topic_id));
- if (topic)
- {
- DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
- {
- if (sub_elt->pkt_msg_cb)
- {
- packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr(plug_mgr->registered_packet_plugin_array, (unsigned int)sub_elt->plugin_idx);
- if(packet_plugin_schema)
- {
- sub_elt->pkt_msg_cb(pkt, mq_elt->topic_id, mq_elt->msg_data, packet_plugin_schema->plugin_env);
- }
- }
- }
- if (topic->pkt_msg_free_cb)
- {
- topic->pkt_msg_free_cb(pkt, mq_elt->msg_data, topic->free_cb_arg);
- }
- }
- DL_DELETE(*mq, mq_elt);
- FREE(mq_elt);
- }
- }
- return;
-}
/*******************************
* SESSION MQ *
@@ -984,7 +708,6 @@ void plugin_manager_on_packet_ingress(struct plugin_manager_schema *plug_mgr, st
if(plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return;
struct registered_packet_plugin_schema *p=NULL;
unsigned char ip_proto=packet_get_ip_protocol(pkt);
- stellar_current_thread_packet_mq_counter_reset(plug_mgr);
while ((p = (struct registered_packet_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, p)))
{
if(p->ip_protocol == ip_proto && p->on_packet)
@@ -992,17 +715,9 @@ void plugin_manager_on_packet_ingress(struct plugin_manager_schema *plug_mgr, st
p->on_packet(pkt, ip_proto, p->plugin_env);
}
}
- plugin_manager_packet_message_dispatch(pkt);
return;
}
-void plugin_manager_on_packet_egress(struct plugin_manager_schema *plug_mgr, struct packet *pkt)
-{
- if(plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return;
- plugin_manager_packet_message_dispatch(pkt);
- per_thread_packet_exdata_arrary_clean(plug_mgr, pkt);
-}
-
/*********************************************
* PLUGIN MANAGER POLLING PLUGIN *
*********************************************/
diff --git a/src/plugin_manager/plugin_manager.h b/src/plugin_manager/plugin_manager.h
index 6810da8..2d07c25 100644
--- a/src/plugin_manager/plugin_manager.h
+++ b/src/plugin_manager/plugin_manager.h
@@ -9,7 +9,6 @@ struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char
void plugin_manager_exit(struct plugin_manager_schema *plug_mgr);
void plugin_manager_on_packet_ingress(struct plugin_manager_schema *plug_mgr, struct packet *pkt);
-void plugin_manager_on_packet_egress(struct plugin_manager_schema *plug_mgr, struct packet *pkt);
//return polling work state, 0: idle, 1: working
int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr);
diff --git a/src/plugin_manager/plugin_manager_interna.h b/src/plugin_manager/plugin_manager_interna.h
index 0cf55d6..6b2beba 100644
--- a/src/plugin_manager/plugin_manager_interna.h
+++ b/src/plugin_manager/plugin_manager_interna.h
@@ -4,44 +4,22 @@
#include "stellar/session_exdata.h"
#include "stellar/session_mq.h"
-#include "stellar/packet_exdata.h"
-#include "stellar/packet_mq.h"
#include "bitmap/bitmap.h"
#include "uthash/utarray.h"
-struct per_thread_exdata_array
-{
- struct stellar_exdata *exdata_array;
-};
-
struct stellar_message;
-struct per_thread_mq
-{
- struct stellar_message *mq;
- int pub_msg_cnt;
-};
-
-struct plugin_manger_per_thread_data
-{
- struct per_thread_exdata_array per_thread_pkt_exdata_array;
- struct per_thread_mq per_thread_pkt_mq;
-};
struct plugin_manager_schema
{
struct stellar *st;
UT_array *plugin_load_specs_array;
- UT_array *packet_exdata_schema_array;
- UT_array *packet_mq_schema_array;
UT_array *session_exdata_schema_array;
UT_array *session_mq_schema_array;
UT_array *registered_session_plugin_array;
UT_array *registered_packet_plugin_array;
UT_array *registered_polling_plugin_array;
- int packet_mq_topic_num;
int session_mq_topic_num;
- int packet_topic_subscriber_num;
int session_topic_subscriber_num;
int tcp_topic_id;
int tcp_stream_topic_id;
@@ -49,12 +27,9 @@ struct plugin_manager_schema
int egress_topic_id;
int control_packet_topic_id;
int max_message_dispatch;
- struct plugin_manger_per_thread_data *per_thread_data;
}__attribute__((aligned(sizeof(void*))));
-
-
struct stellar_exdata
{
void *exdata;
@@ -69,7 +44,6 @@ struct stellar_exdata_schema
{
void *free_func;
session_exdata_free *sess_free_func;
- packet_exdata_free *pkt_free_func;
};
void *free_arg;
@@ -91,7 +65,6 @@ typedef struct stellar_mq_subscriber
union
{
on_session_msg_cb_func *sess_msg_cb;
- on_packet_msg_cb_func *pkt_msg_cb;
};
struct stellar_mq_subscriber *next, *prev;
}stellar_mq_subscriber __attribute__((aligned(sizeof(void*))));
@@ -108,7 +81,6 @@ struct stellar_mq_topic_schema
{
void *free_cb;
session_msg_free_cb_func *sess_msg_free_cb;
- packet_msg_free_cb_func *pkt_msg_free_cb;
};
struct stellar_mq_subscriber *subscribers;
}__attribute__((aligned(sizeof(void*))));
diff --git a/src/stellar_on_sapp/stellar_on_sapp_api.c b/src/stellar_on_sapp/stellar_on_sapp_api.c
index f7dd17d..5974ace 100644
--- a/src/stellar_on_sapp/stellar_on_sapp_api.c
+++ b/src/stellar_on_sapp/stellar_on_sapp_api.c
@@ -60,7 +60,6 @@ inline struct plugin_manager_runtime * session_plugin_manager_runtime_get(struct
/*********************************************
* STELLAR INIT & EXIT ON SAPP *
*********************************************/
-
struct stellar *stellar_init_on_sapp(const char *toml_conf_path)
{
struct stellar *st = CALLOC(struct stellar, 1);
@@ -157,7 +156,6 @@ void session_defer_on_sapp(struct session *sess)
if(pkt->raw_pkt)
{
plugin_manager_on_session_egress(sess, pkt);
- plugin_manager_on_packet_egress(sess->st->plug_mgr, pkt);
}
}
sess->cur_pkt.raw_pkt=NULL;//clear raw_pkt
@@ -193,10 +191,6 @@ void packet_update_on_sapp(struct stellar *st, struct streaminfo *pstream, void
}
//FIXME: defer TCP/UDP packet on session update
plugin_manager_on_packet_ingress(st->plug_mgr, &pkt);
- if(pkt.ip_proto!=IPPROTO_TCP && pkt.ip_proto!=IPPROTO_UDP)
- {
- plugin_manager_on_packet_egress(st->plug_mgr, &pkt);
- }
}
inline int polling_on_sapp(struct stellar *st)