summaryrefslogtreecommitdiff
path: root/src/plugin_manager
diff options
context:
space:
mode:
authoryangwei <[email protected]>2024-05-27 21:13:51 +0800
committeryangwei <[email protected]>2024-05-28 04:40:31 +0800
commit0737ab92295eca2690e875db240f1b0af495d7dc (patch)
tree388b4a678d90d4196b34f41e0517383ed1a23d92 /src/plugin_manager
parent307b2f601cff543a585eb8f04d8b9102ab96196c (diff)
🧪 test(example plugin): add packet mq example
Diffstat (limited to 'src/plugin_manager')
-rw-r--r--src/plugin_manager/plugin_manager.c155
1 files changed, 95 insertions, 60 deletions
diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c
index cb57e37..744b160 100644
--- a/src/plugin_manager/plugin_manager.c
+++ b/src/plugin_manager/plugin_manager.c
@@ -19,6 +19,12 @@
struct per_thread_exdata_array;
struct per_thread_mq_array;
+struct plugin_manger_per_thread_data
+{
+ struct per_thread_exdata_array *per_thread_pkt_exdata_array;
+ struct per_thread_mq_array *per_thread_pkt_mq_array;
+};
+
struct plugin_manager_schema
{
struct stellar *st;
@@ -39,8 +45,7 @@ struct plugin_manager_schema
int tcp_stream_topic_id;
int egress_topic_id;
int control_packet_topic_id;
- struct per_thread_exdata_array *per_thread_pkt_exdata_array;
- struct per_thread_mq_array *per_thread_pkt_mq_array;
+ struct plugin_manger_per_thread_data *per_thread_data;
};
struct stellar_message;
@@ -86,7 +91,7 @@ struct stellar_message
typedef struct stellar_mq_subscriber
{
int topic_subscriber_idx;
- int plugin_id;
+ int plugin_idx;
union
{
on_session_msg_cb_func *sess_msg_cb;
@@ -244,7 +249,16 @@ PLUGIN_SPEC_LOAD_ERROR:
}
static struct per_thread_exdata_array *per_thread_packet_exdata_arrary_new(struct stellar *st, struct plugin_manager_schema *plug_mgr);
-static struct per_thread_mq_array *per_thread_mq_arrary_new(struct stellar *st, struct plugin_manager_schema *plug_mgr);
+static struct per_thread_mq_array *per_thread_mq_arrary_new(struct stellar *st);
+
+static struct plugin_manger_per_thread_data *plugin_manager_per_thread_data_new(struct stellar *st, struct plugin_manager_schema *plug_mgr)
+{
+ if(st == NULL || plug_mgr == NULL)return NULL;
+ struct plugin_manger_per_thread_data *per_thread_data = CALLOC(struct plugin_manger_per_thread_data, 1);
+ per_thread_data->per_thread_pkt_exdata_array = per_thread_packet_exdata_arrary_new(st, plug_mgr);
+ per_thread_data->per_thread_pkt_mq_array = per_thread_mq_arrary_new(st);
+ return per_thread_data;
+}
struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char *plugin_spec_file_path)
{
@@ -254,39 +268,47 @@ struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char
{
return NULL;
}
- struct plugin_manager_schema *pm = CALLOC(struct plugin_manager_schema, 1);
+ struct plugin_manager_schema *plug_mgr = CALLOC(struct plugin_manager_schema, 1);
if(spec_num > 0)
{
- utarray_new(pm->plugin_load_specs_array,&plugin_specs_icd);
- utarray_reserve(pm->plugin_load_specs_array, spec_num);
+ utarray_new(plug_mgr->plugin_load_specs_array,&plugin_specs_icd);
+ utarray_reserve(plug_mgr->plugin_load_specs_array, spec_num);
}
- pm->st = st;
- stellar_plugin_manager_schema_set(st, pm);
+ plug_mgr->st = st;
+ stellar_plugin_manager_schema_set(st, plug_mgr);
- pm->tcp_topic_id=stellar_session_mq_create_topic(st, TOPIC_TCP, NULL, NULL);
- pm->tcp_stream_topic_id=stellar_session_mq_create_topic(st, TOPIC_TCP_STREAM, NULL, NULL);
- pm->udp_topic_id=stellar_session_mq_create_topic(st, TOPIC_UDP, NULL, NULL);
- pm->egress_topic_id=stellar_session_mq_create_topic(st, TOPIC_EGRESS, NULL, NULL);
- pm->control_packet_topic_id=stellar_session_mq_create_topic(st, TOPIC_CONTROL_PACKET, NULL, NULL);
+ plug_mgr->tcp_topic_id=stellar_session_mq_create_topic(st, TOPIC_TCP, NULL, NULL);
+ plug_mgr->tcp_stream_topic_id=stellar_session_mq_create_topic(st, TOPIC_TCP_STREAM, NULL, NULL);
+ plug_mgr->udp_topic_id=stellar_session_mq_create_topic(st, TOPIC_UDP, NULL, NULL);
+ plug_mgr->egress_topic_id=stellar_session_mq_create_topic(st, TOPIC_EGRESS, NULL, NULL);
+ plug_mgr->control_packet_topic_id=stellar_session_mq_create_topic(st, TOPIC_CONTROL_PACKET, NULL, NULL);
for(int i = 0; i < spec_num; i++)
{
if (specs[i].load_cb != NULL)
{
specs[i].plugin_ctx=specs[i].load_cb(st);
- utarray_push_back(pm->plugin_load_specs_array, &specs[i]);
+ utarray_push_back(plug_mgr->plugin_load_specs_array, &specs[i]);
}
}
FREE(specs);
- pm->per_thread_pkt_exdata_array=per_thread_packet_exdata_arrary_new(st, pm);
- pm->per_thread_pkt_mq_array=per_thread_mq_arrary_new(st, pm);
- return pm;
+ plug_mgr->per_thread_data = plugin_manager_per_thread_data_new(st, plug_mgr);
+ return plug_mgr;
}
-static void per_thread_packet_exdata_arrary_free(struct stellar *st, struct plugin_manager_schema *plug_mgr);
-static void per_thread_mq_arrary_free(struct stellar *st, struct plugin_manager_schema *plug_mgr);
+static void per_thread_packet_exdata_arrary_free(struct stellar *st, struct per_thread_exdata_array *exdata_array);
+static void per_thread_mq_arrary_free(struct per_thread_mq_array *mq_array);
+
+static void plugin_manager_per_thread_data_free(struct plugin_manger_per_thread_data *per_thread_data, struct stellar *st)
+{
+ if(per_thread_data == NULL || st == NULL)return;
+ per_thread_packet_exdata_arrary_free(st, per_thread_data->per_thread_pkt_exdata_array);
+ per_thread_mq_arrary_free(per_thread_data->per_thread_pkt_mq_array);
+ FREE(per_thread_data);
+ return;
+}
void plugin_manager_exit(struct plugin_manager_schema *plug_mgr)
{
@@ -317,9 +339,18 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr)
utarray_free(plug_mgr->packet_mq_schema_array);
}
+ if(plug_mgr->packet_exdata_schema_array)utarray_free(plug_mgr->packet_exdata_schema_array);
if(plug_mgr->session_exdata_schema_array)utarray_free(plug_mgr->session_exdata_schema_array);
- if(plug_mgr->registered_packet_plugin_array)utarray_free(plug_mgr->registered_packet_plugin_array);
if(plug_mgr->registered_polling_plugin_array)utarray_free(plug_mgr->registered_polling_plugin_array);
+ if(plug_mgr->registered_packet_plugin_array)
+ {
+ struct registered_packet_plugin_schema *s = NULL;
+ while ((s = (struct registered_packet_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, s)))
+ {
+ if(s->registed_packet_mq_subscriber_info)utarray_free(s->registed_packet_mq_subscriber_info);
+ }
+ utarray_free(plug_mgr->registered_packet_plugin_array);
+ }
if(plug_mgr->registered_session_plugin_array)
{
struct registered_session_plugin_schema *s = NULL;
@@ -329,7 +360,7 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr)
}
utarray_free(plug_mgr->registered_session_plugin_array);
}
- per_thread_packet_exdata_arrary_free(plug_mgr->st, plug_mgr);
+ plugin_manager_per_thread_data_free(plug_mgr->per_thread_data, plug_mgr->st);
FREE(plug_mgr);
return;
}
@@ -407,7 +438,7 @@ void *stellar_exdata_get(UT_array *exdata_schema, struct stellar_exdata *exdata_
static struct per_thread_exdata_array *per_thread_packet_exdata_arrary_new(struct stellar *st, struct plugin_manager_schema *plug_mgr)
{
- if(st == NULL || plug_mgr == NULL || plug_mgr->packet_exdata_schema_array == NULL || plug_mgr->per_thread_pkt_exdata_array)return NULL;
+ if(st == NULL || plug_mgr == NULL || plug_mgr->packet_exdata_schema_array == NULL )return NULL;
int thread_num=stellar_get_worker_thread_num(st);
struct per_thread_exdata_array *per_thread_pkt_exdata_array = CALLOC(struct per_thread_exdata_array, thread_num);
unsigned int len=utarray_len(plug_mgr->packet_exdata_schema_array);
@@ -418,28 +449,28 @@ static struct per_thread_exdata_array *per_thread_packet_exdata_arrary_new(struc
return per_thread_pkt_exdata_array;
}
-static void per_thread_packet_exdata_arrary_free(struct stellar *st, struct plugin_manager_schema *plug_mgr)
+static void per_thread_packet_exdata_arrary_free(struct stellar *st, struct per_thread_exdata_array *exdata_array)
{
- if(st == NULL || plug_mgr == NULL || plug_mgr->per_thread_pkt_exdata_array)return;
+ if(st == NULL || exdata_array == NULL)return;
int thread_num=stellar_get_worker_thread_num(st);
for (int i = 0; i < thread_num; i++)
{
- FREE((plug_mgr->per_thread_pkt_exdata_array+i)->exdata_array);
+ FREE((exdata_array+i)->exdata_array);
}
- FREE(plug_mgr->per_thread_pkt_exdata_array);
+ FREE(exdata_array);
return;
}
static struct stellar_exdata *per_thread_packet_exdata_arrary_get(struct plugin_manager_schema *plug_mgr)
{
- if(plug_mgr==NULL || plug_mgr->packet_exdata_schema_array == NULL || plug_mgr->per_thread_pkt_exdata_array)return NULL;
+ if(plug_mgr==NULL || plug_mgr->packet_exdata_schema_array == NULL)return NULL;
int tid=stellar_get_current_thread_id(plug_mgr->st);
- return (plug_mgr->per_thread_pkt_exdata_array+tid)->exdata_array;
+ return (plug_mgr->per_thread_data->per_thread_pkt_exdata_array+tid)->exdata_array;
}
static void per_thread_packet_exdata_arrary_clean(struct plugin_manager_schema *plug_mgr, struct packet *pkt)
{
- if(plug_mgr==NULL || plug_mgr->packet_exdata_schema_array == NULL || plug_mgr->per_thread_pkt_exdata_array)return;
+ if(plug_mgr==NULL || plug_mgr->packet_exdata_schema_array == NULL || plug_mgr->per_thread_data->per_thread_pkt_exdata_array == NULL)return;
unsigned int len=utarray_len(plug_mgr->packet_exdata_schema_array);
struct stellar_exdata *per_thread_pkt_exdata_arrary = per_thread_packet_exdata_arrary_get(plug_mgr);
for (unsigned int i = 0; i < len; i++)
@@ -607,7 +638,7 @@ int stellar_mq_destroy_topic(int topic_id, UT_array *mq_schema_array)
return 0; // success
}
-int stellar_mq_publish_message(int topic_id, void *data, UT_array *mq_schema_array, struct stellar_message *mq)
+int stellar_mq_publish_message(int topic_id, void *data, UT_array *mq_schema_array, struct stellar_message **mq)
{
if(mq_schema_array==NULL || topic_id < 0)return -1;
unsigned int len = utarray_len(mq_schema_array);
@@ -615,7 +646,7 @@ int stellar_mq_publish_message(int topic_id, void *data, UT_array *mq_schema_arr
struct stellar_message *msg= CALLOC(struct stellar_message,1);
msg->topic_id = topic_id;
msg->msg_data = data;
- DL_APPEND(mq, msg);
+ DL_APPEND(*mq, msg);
return 0;
}
@@ -625,18 +656,18 @@ UT_icd stellar_mq_subscriber_info_icd = {sizeof(struct stellar_mq_subscriber_inf
* PACKET MQ *
*******************************/
-static struct per_thread_mq_array *per_thread_mq_arrary_new(struct stellar *st, struct plugin_manager_schema *plug_mgr)
+static struct per_thread_mq_array *per_thread_mq_arrary_new(struct stellar *st)
{
- if(st == NULL || plug_mgr == NULL || plug_mgr->packet_mq_schema_array == NULL || plug_mgr->per_thread_pkt_mq_array)return NULL;
+ if(st == NULL)return NULL;
int thread_num=stellar_get_worker_thread_num(st);
struct per_thread_mq_array *per_thread_pkt_mq_array = CALLOC(struct per_thread_mq_array, thread_num);
return per_thread_pkt_mq_array;
}
-static void per_thread_mq_arrary_free(struct stellar *st, struct plugin_manager_schema *plug_mgr)
+static void per_thread_mq_arrary_free(struct per_thread_mq_array *mq_array)
{
- if(plug_mgr == NULL || plug_mgr->per_thread_pkt_mq_array)return;
- FREE(plug_mgr->per_thread_pkt_mq_array);
+ if(mq_array == NULL)return;
+ FREE(mq_array);
return;
}
@@ -674,12 +705,13 @@ int stellar_packet_mq_destroy_topic(struct stellar *st, int topic_id)
int stellar_packet_mq_subscribe(struct stellar *st, int topic_id, on_packet_msg_cb_func *plugin_on_msg_cb, int plugin_id) //packet plugin only
{
if(plugin_id < PACKET_PULGIN_ID_BASE || plugin_id >= POLLING_PULGIN_ID_BASE)return -1;// ignore session or polling plugin
+ int plugin_idx=plugin_id-PACKET_PULGIN_ID_BASE;
struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
if(plug_mgr->packet_mq_schema_array==NULL)return -1;
unsigned int len = utarray_len(plug_mgr->packet_mq_schema_array);
if (len <= (unsigned int)topic_id)return -1;
- struct registered_packet_plugin_schema *packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr(plug_mgr->registered_packet_plugin_array, (unsigned)plugin_id);
+ struct registered_packet_plugin_schema *packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr(plug_mgr->registered_packet_plugin_array, (unsigned)plugin_idx);
if(packet_plugin_schema==NULL)return -1;
struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->packet_mq_schema_array, (unsigned int)topic_id);
@@ -700,7 +732,7 @@ int stellar_packet_mq_subscribe(struct stellar *st, int topic_id, on_packet_msg_
struct stellar_mq_subscriber *new_subscriber = CALLOC(struct stellar_mq_subscriber,1);
new_subscriber->topic_subscriber_idx = topic->subscriber_cnt;
- new_subscriber->plugin_id = plugin_id;
+ new_subscriber->plugin_idx = plugin_idx;
new_subscriber->pkt_msg_cb = plugin_on_msg_cb;
DL_APPEND(topic->subscribers, new_subscriber);
@@ -721,7 +753,7 @@ int packet_mq_publish_message(struct packet *pkt, int topic_id, void *msg)
struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
assert(plug_mgr);
int tid = stellar_get_current_thread_id(st);
- return stellar_mq_publish_message(topic_id, msg, plug_mgr->packet_mq_schema_array, (plug_mgr->per_thread_pkt_mq_array+tid)->mq);
+ return stellar_mq_publish_message(topic_id, msg, plug_mgr->packet_mq_schema_array, &(plug_mgr->per_thread_data->per_thread_pkt_mq_array+tid)->mq);
}
static void plugin_manager_packet_message_dispatch(struct packet *pkt)
@@ -736,15 +768,15 @@ static void plugin_manager_packet_message_dispatch(struct packet *pkt)
int tid = stellar_get_current_thread_id(st);
- struct stellar_message *mq= (plug_mgr->per_thread_pkt_mq_array+tid)->mq;
+ struct stellar_message **mq= &(plug_mgr->per_thread_data->per_thread_pkt_mq_array+tid)->mq;
struct stellar_message *mq_elt=NULL, *mq_tmp=NULL;
struct stellar_mq_subscriber *sub_elt, *sub_tmp;
struct stellar_mq_topic_schema *topic;
struct registered_packet_plugin_schema *packet_plugin_schema;
- while (mq != NULL)
+ while (*mq != NULL)
{
- DL_FOREACH_SAFE(mq, mq_elt, mq_tmp)
+ DL_FOREACH_SAFE(*mq, mq_elt, mq_tmp)
{
topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->packet_mq_schema_array,
(unsigned int)(mq_elt->topic_id));
@@ -754,8 +786,8 @@ static void plugin_manager_packet_message_dispatch(struct packet *pkt)
{
if (sub_elt->pkt_msg_cb)
{
- packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr(plug_mgr->registered_packet_plugin_array, (unsigned int)sub_elt->plugin_id);
- sub_elt->pkt_msg_cb(pkt, mq_elt->topic_id, mq_elt->msg_data, packet_plugin_schema->plugin_env);
+ packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr(plug_mgr->registered_packet_plugin_array, (unsigned int)sub_elt->plugin_idx);
+ if(packet_plugin_schema)sub_elt->pkt_msg_cb(pkt, mq_elt->topic_id, mq_elt->msg_data, packet_plugin_schema->plugin_env);
}
}
if (topic->pkt_msg_free_cb)
@@ -763,7 +795,7 @@ static void plugin_manager_packet_message_dispatch(struct packet *pkt)
topic->pkt_msg_free_cb(pkt, mq_elt->msg_data, topic->free_cb_arg);
}
}
- DL_DELETE(mq, mq_elt);
+ DL_DELETE(*mq, mq_elt);
FREE(mq_elt);
}
}
@@ -827,7 +859,7 @@ int session_mq_publish_message(struct session *sess, int topic_id, void *data)
struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess);
assert(plug_mgr_rt);
if(plug_mgr_rt->enable_session_mq==0)return -1;
- return stellar_mq_publish_message(topic_id, data, plug_mgr_rt->plug_mgr->session_mq_schema_array, plug_mgr_rt->pending_mq);
+ return stellar_mq_publish_message(topic_id, data, plug_mgr_rt->plug_mgr->session_mq_schema_array, &plug_mgr_rt->pending_mq);
}
static int session_mq_set_message_status(struct session *sess, int topic_id, int plugin_id, int bit_value)
@@ -902,7 +934,7 @@ int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_session_ms
struct stellar_mq_subscriber *new_subscriber = CALLOC(struct stellar_mq_subscriber,1);
new_subscriber->topic_subscriber_idx = topic->subscriber_cnt;
- new_subscriber->plugin_id = plugin_id;
+ new_subscriber->plugin_idx = plugin_id;
new_subscriber->sess_msg_cb = plugin_on_msg_cb;
DL_APPEND(topic->subscribers, new_subscriber);
@@ -939,18 +971,21 @@ static void plugin_manager_session_message_dispatch(struct session *sess)
{
if (bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->topic_id, cur_sub_idx) != 0)
{
- plugin_ctx_rt=(plug_mgr_rt->plugin_ctx_array+sub_elt->plugin_id);
- session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)sub_elt->plugin_id);
- if(plugin_ctx_rt->state==INIT)
- {
- if(session_plugin_schema->on_ctx_new)
- {
- plugin_ctx_rt->plugin_ctx=session_plugin_schema->on_ctx_new(sess, session_plugin_schema->plugin_env);
- plugin_ctx_rt->state=ACTIVE;
- }
- }
- if(sub_elt->sess_msg_cb)sub_elt->sess_msg_cb(sess, mq_elt->topic_id, mq_elt->msg_data, plugin_ctx_rt->plugin_ctx,
- session_plugin_schema->plugin_env);
+ plugin_ctx_rt=(plug_mgr_rt->plugin_ctx_array+sub_elt->plugin_idx);
+ session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)sub_elt->plugin_idx);
+ if (session_plugin_schema)
+ {
+ if (plugin_ctx_rt->state == INIT)
+ {
+ if (session_plugin_schema->on_ctx_new)
+ {
+ plugin_ctx_rt->plugin_ctx = session_plugin_schema->on_ctx_new(sess, session_plugin_schema->plugin_env);
+ plugin_ctx_rt->state = ACTIVE;
+ }
+ }
+ if (sub_elt->sess_msg_cb)
+ sub_elt->sess_msg_cb(sess, mq_elt->topic_id, mq_elt->msg_data, plugin_ctx_rt->plugin_ctx, session_plugin_schema->plugin_env);
+ }
}
cur_sub_idx++;
}