diff options
| author | yangwei <[email protected]> | 2024-05-27 21:13:51 +0800 |
|---|---|---|
| committer | yangwei <[email protected]> | 2024-05-28 04:40:31 +0800 |
| commit | 0737ab92295eca2690e875db240f1b0af495d7dc (patch) | |
| tree | 388b4a678d90d4196b34f41e0517383ed1a23d92 /src/plugin_manager | |
| parent | 307b2f601cff543a585eb8f04d8b9102ab96196c (diff) | |
🧪 test(example plugin): add packet mq example
Diffstat (limited to 'src/plugin_manager')
| -rw-r--r-- | src/plugin_manager/plugin_manager.c | 155 |
1 files changed, 95 insertions, 60 deletions
diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c index cb57e37..744b160 100644 --- a/src/plugin_manager/plugin_manager.c +++ b/src/plugin_manager/plugin_manager.c @@ -19,6 +19,12 @@ struct per_thread_exdata_array; struct per_thread_mq_array; +struct plugin_manger_per_thread_data +{ + struct per_thread_exdata_array *per_thread_pkt_exdata_array; + struct per_thread_mq_array *per_thread_pkt_mq_array; +}; + struct plugin_manager_schema { struct stellar *st; @@ -39,8 +45,7 @@ struct plugin_manager_schema int tcp_stream_topic_id; int egress_topic_id; int control_packet_topic_id; - struct per_thread_exdata_array *per_thread_pkt_exdata_array; - struct per_thread_mq_array *per_thread_pkt_mq_array; + struct plugin_manger_per_thread_data *per_thread_data; }; struct stellar_message; @@ -86,7 +91,7 @@ struct stellar_message typedef struct stellar_mq_subscriber { int topic_subscriber_idx; - int plugin_id; + int plugin_idx; union { on_session_msg_cb_func *sess_msg_cb; @@ -244,7 +249,16 @@ PLUGIN_SPEC_LOAD_ERROR: } static struct per_thread_exdata_array *per_thread_packet_exdata_arrary_new(struct stellar *st, struct plugin_manager_schema *plug_mgr); -static struct per_thread_mq_array *per_thread_mq_arrary_new(struct stellar *st, struct plugin_manager_schema *plug_mgr); +static struct per_thread_mq_array *per_thread_mq_arrary_new(struct stellar *st); + +static struct plugin_manger_per_thread_data *plugin_manager_per_thread_data_new(struct stellar *st, struct plugin_manager_schema *plug_mgr) +{ + if(st == NULL || plug_mgr == NULL)return NULL; + struct plugin_manger_per_thread_data *per_thread_data = CALLOC(struct plugin_manger_per_thread_data, 1); + per_thread_data->per_thread_pkt_exdata_array = per_thread_packet_exdata_arrary_new(st, plug_mgr); + per_thread_data->per_thread_pkt_mq_array = per_thread_mq_arrary_new(st); + return per_thread_data; +} struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char *plugin_spec_file_path) { @@ -254,39 +268,47 @@ struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char { return NULL; } - struct plugin_manager_schema *pm = CALLOC(struct plugin_manager_schema, 1); + struct plugin_manager_schema *plug_mgr = CALLOC(struct plugin_manager_schema, 1); if(spec_num > 0) { - utarray_new(pm->plugin_load_specs_array,&plugin_specs_icd); - utarray_reserve(pm->plugin_load_specs_array, spec_num); + utarray_new(plug_mgr->plugin_load_specs_array,&plugin_specs_icd); + utarray_reserve(plug_mgr->plugin_load_specs_array, spec_num); } - pm->st = st; - stellar_plugin_manager_schema_set(st, pm); + plug_mgr->st = st; + stellar_plugin_manager_schema_set(st, plug_mgr); - pm->tcp_topic_id=stellar_session_mq_create_topic(st, TOPIC_TCP, NULL, NULL); - pm->tcp_stream_topic_id=stellar_session_mq_create_topic(st, TOPIC_TCP_STREAM, NULL, NULL); - pm->udp_topic_id=stellar_session_mq_create_topic(st, TOPIC_UDP, NULL, NULL); - pm->egress_topic_id=stellar_session_mq_create_topic(st, TOPIC_EGRESS, NULL, NULL); - pm->control_packet_topic_id=stellar_session_mq_create_topic(st, TOPIC_CONTROL_PACKET, NULL, NULL); + plug_mgr->tcp_topic_id=stellar_session_mq_create_topic(st, TOPIC_TCP, NULL, NULL); + plug_mgr->tcp_stream_topic_id=stellar_session_mq_create_topic(st, TOPIC_TCP_STREAM, NULL, NULL); + plug_mgr->udp_topic_id=stellar_session_mq_create_topic(st, TOPIC_UDP, NULL, NULL); + plug_mgr->egress_topic_id=stellar_session_mq_create_topic(st, TOPIC_EGRESS, NULL, NULL); + plug_mgr->control_packet_topic_id=stellar_session_mq_create_topic(st, TOPIC_CONTROL_PACKET, NULL, NULL); for(int i = 0; i < spec_num; i++) { if (specs[i].load_cb != NULL) { specs[i].plugin_ctx=specs[i].load_cb(st); - utarray_push_back(pm->plugin_load_specs_array, &specs[i]); + utarray_push_back(plug_mgr->plugin_load_specs_array, &specs[i]); } } FREE(specs); - pm->per_thread_pkt_exdata_array=per_thread_packet_exdata_arrary_new(st, pm); - pm->per_thread_pkt_mq_array=per_thread_mq_arrary_new(st, pm); - return pm; + plug_mgr->per_thread_data = plugin_manager_per_thread_data_new(st, plug_mgr); + return plug_mgr; } -static void per_thread_packet_exdata_arrary_free(struct stellar *st, struct plugin_manager_schema *plug_mgr); -static void per_thread_mq_arrary_free(struct stellar *st, struct plugin_manager_schema *plug_mgr); +static void per_thread_packet_exdata_arrary_free(struct stellar *st, struct per_thread_exdata_array *exdata_array); +static void per_thread_mq_arrary_free(struct per_thread_mq_array *mq_array); + +static void plugin_manager_per_thread_data_free(struct plugin_manger_per_thread_data *per_thread_data, struct stellar *st) +{ + if(per_thread_data == NULL || st == NULL)return; + per_thread_packet_exdata_arrary_free(st, per_thread_data->per_thread_pkt_exdata_array); + per_thread_mq_arrary_free(per_thread_data->per_thread_pkt_mq_array); + FREE(per_thread_data); + return; +} void plugin_manager_exit(struct plugin_manager_schema *plug_mgr) { @@ -317,9 +339,18 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr) utarray_free(plug_mgr->packet_mq_schema_array); } + if(plug_mgr->packet_exdata_schema_array)utarray_free(plug_mgr->packet_exdata_schema_array); if(plug_mgr->session_exdata_schema_array)utarray_free(plug_mgr->session_exdata_schema_array); - if(plug_mgr->registered_packet_plugin_array)utarray_free(plug_mgr->registered_packet_plugin_array); if(plug_mgr->registered_polling_plugin_array)utarray_free(plug_mgr->registered_polling_plugin_array); + if(plug_mgr->registered_packet_plugin_array) + { + struct registered_packet_plugin_schema *s = NULL; + while ((s = (struct registered_packet_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, s))) + { + if(s->registed_packet_mq_subscriber_info)utarray_free(s->registed_packet_mq_subscriber_info); + } + utarray_free(plug_mgr->registered_packet_plugin_array); + } if(plug_mgr->registered_session_plugin_array) { struct registered_session_plugin_schema *s = NULL; @@ -329,7 +360,7 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr) } utarray_free(plug_mgr->registered_session_plugin_array); } - per_thread_packet_exdata_arrary_free(plug_mgr->st, plug_mgr); + plugin_manager_per_thread_data_free(plug_mgr->per_thread_data, plug_mgr->st); FREE(plug_mgr); return; } @@ -407,7 +438,7 @@ void *stellar_exdata_get(UT_array *exdata_schema, struct stellar_exdata *exdata_ static struct per_thread_exdata_array *per_thread_packet_exdata_arrary_new(struct stellar *st, struct plugin_manager_schema *plug_mgr) { - if(st == NULL || plug_mgr == NULL || plug_mgr->packet_exdata_schema_array == NULL || plug_mgr->per_thread_pkt_exdata_array)return NULL; + if(st == NULL || plug_mgr == NULL || plug_mgr->packet_exdata_schema_array == NULL )return NULL; int thread_num=stellar_get_worker_thread_num(st); struct per_thread_exdata_array *per_thread_pkt_exdata_array = CALLOC(struct per_thread_exdata_array, thread_num); unsigned int len=utarray_len(plug_mgr->packet_exdata_schema_array); @@ -418,28 +449,28 @@ static struct per_thread_exdata_array *per_thread_packet_exdata_arrary_new(struc return per_thread_pkt_exdata_array; } -static void per_thread_packet_exdata_arrary_free(struct stellar *st, struct plugin_manager_schema *plug_mgr) +static void per_thread_packet_exdata_arrary_free(struct stellar *st, struct per_thread_exdata_array *exdata_array) { - if(st == NULL || plug_mgr == NULL || plug_mgr->per_thread_pkt_exdata_array)return; + if(st == NULL || exdata_array == NULL)return; int thread_num=stellar_get_worker_thread_num(st); for (int i = 0; i < thread_num; i++) { - FREE((plug_mgr->per_thread_pkt_exdata_array+i)->exdata_array); + FREE((exdata_array+i)->exdata_array); } - FREE(plug_mgr->per_thread_pkt_exdata_array); + FREE(exdata_array); return; } static struct stellar_exdata *per_thread_packet_exdata_arrary_get(struct plugin_manager_schema *plug_mgr) { - if(plug_mgr==NULL || plug_mgr->packet_exdata_schema_array == NULL || plug_mgr->per_thread_pkt_exdata_array)return NULL; + if(plug_mgr==NULL || plug_mgr->packet_exdata_schema_array == NULL)return NULL; int tid=stellar_get_current_thread_id(plug_mgr->st); - return (plug_mgr->per_thread_pkt_exdata_array+tid)->exdata_array; + return (plug_mgr->per_thread_data->per_thread_pkt_exdata_array+tid)->exdata_array; } static void per_thread_packet_exdata_arrary_clean(struct plugin_manager_schema *plug_mgr, struct packet *pkt) { - if(plug_mgr==NULL || plug_mgr->packet_exdata_schema_array == NULL || plug_mgr->per_thread_pkt_exdata_array)return; + if(plug_mgr==NULL || plug_mgr->packet_exdata_schema_array == NULL || plug_mgr->per_thread_data->per_thread_pkt_exdata_array == NULL)return; unsigned int len=utarray_len(plug_mgr->packet_exdata_schema_array); struct stellar_exdata *per_thread_pkt_exdata_arrary = per_thread_packet_exdata_arrary_get(plug_mgr); for (unsigned int i = 0; i < len; i++) @@ -607,7 +638,7 @@ int stellar_mq_destroy_topic(int topic_id, UT_array *mq_schema_array) return 0; // success } -int stellar_mq_publish_message(int topic_id, void *data, UT_array *mq_schema_array, struct stellar_message *mq) +int stellar_mq_publish_message(int topic_id, void *data, UT_array *mq_schema_array, struct stellar_message **mq) { if(mq_schema_array==NULL || topic_id < 0)return -1; unsigned int len = utarray_len(mq_schema_array); @@ -615,7 +646,7 @@ int stellar_mq_publish_message(int topic_id, void *data, UT_array *mq_schema_arr struct stellar_message *msg= CALLOC(struct stellar_message,1); msg->topic_id = topic_id; msg->msg_data = data; - DL_APPEND(mq, msg); + DL_APPEND(*mq, msg); return 0; } @@ -625,18 +656,18 @@ UT_icd stellar_mq_subscriber_info_icd = {sizeof(struct stellar_mq_subscriber_inf * PACKET MQ * *******************************/ -static struct per_thread_mq_array *per_thread_mq_arrary_new(struct stellar *st, struct plugin_manager_schema *plug_mgr) +static struct per_thread_mq_array *per_thread_mq_arrary_new(struct stellar *st) { - if(st == NULL || plug_mgr == NULL || plug_mgr->packet_mq_schema_array == NULL || plug_mgr->per_thread_pkt_mq_array)return NULL; + if(st == NULL)return NULL; int thread_num=stellar_get_worker_thread_num(st); struct per_thread_mq_array *per_thread_pkt_mq_array = CALLOC(struct per_thread_mq_array, thread_num); return per_thread_pkt_mq_array; } -static void per_thread_mq_arrary_free(struct stellar *st, struct plugin_manager_schema *plug_mgr) +static void per_thread_mq_arrary_free(struct per_thread_mq_array *mq_array) { - if(plug_mgr == NULL || plug_mgr->per_thread_pkt_mq_array)return; - FREE(plug_mgr->per_thread_pkt_mq_array); + if(mq_array == NULL)return; + FREE(mq_array); return; } @@ -674,12 +705,13 @@ int stellar_packet_mq_destroy_topic(struct stellar *st, int topic_id) int stellar_packet_mq_subscribe(struct stellar *st, int topic_id, on_packet_msg_cb_func *plugin_on_msg_cb, int plugin_id) //packet plugin only { if(plugin_id < PACKET_PULGIN_ID_BASE || plugin_id >= POLLING_PULGIN_ID_BASE)return -1;// ignore session or polling plugin + int plugin_idx=plugin_id-PACKET_PULGIN_ID_BASE; struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); if(plug_mgr->packet_mq_schema_array==NULL)return -1; unsigned int len = utarray_len(plug_mgr->packet_mq_schema_array); if (len <= (unsigned int)topic_id)return -1; - struct registered_packet_plugin_schema *packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr(plug_mgr->registered_packet_plugin_array, (unsigned)plugin_id); + struct registered_packet_plugin_schema *packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr(plug_mgr->registered_packet_plugin_array, (unsigned)plugin_idx); if(packet_plugin_schema==NULL)return -1; struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->packet_mq_schema_array, (unsigned int)topic_id); @@ -700,7 +732,7 @@ int stellar_packet_mq_subscribe(struct stellar *st, int topic_id, on_packet_msg_ struct stellar_mq_subscriber *new_subscriber = CALLOC(struct stellar_mq_subscriber,1); new_subscriber->topic_subscriber_idx = topic->subscriber_cnt; - new_subscriber->plugin_id = plugin_id; + new_subscriber->plugin_idx = plugin_idx; new_subscriber->pkt_msg_cb = plugin_on_msg_cb; DL_APPEND(topic->subscribers, new_subscriber); @@ -721,7 +753,7 @@ int packet_mq_publish_message(struct packet *pkt, int topic_id, void *msg) struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st); assert(plug_mgr); int tid = stellar_get_current_thread_id(st); - return stellar_mq_publish_message(topic_id, msg, plug_mgr->packet_mq_schema_array, (plug_mgr->per_thread_pkt_mq_array+tid)->mq); + return stellar_mq_publish_message(topic_id, msg, plug_mgr->packet_mq_schema_array, &(plug_mgr->per_thread_data->per_thread_pkt_mq_array+tid)->mq); } static void plugin_manager_packet_message_dispatch(struct packet *pkt) @@ -736,15 +768,15 @@ static void plugin_manager_packet_message_dispatch(struct packet *pkt) int tid = stellar_get_current_thread_id(st); - struct stellar_message *mq= (plug_mgr->per_thread_pkt_mq_array+tid)->mq; + struct stellar_message **mq= &(plug_mgr->per_thread_data->per_thread_pkt_mq_array+tid)->mq; struct stellar_message *mq_elt=NULL, *mq_tmp=NULL; struct stellar_mq_subscriber *sub_elt, *sub_tmp; struct stellar_mq_topic_schema *topic; struct registered_packet_plugin_schema *packet_plugin_schema; - while (mq != NULL) + while (*mq != NULL) { - DL_FOREACH_SAFE(mq, mq_elt, mq_tmp) + DL_FOREACH_SAFE(*mq, mq_elt, mq_tmp) { topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->packet_mq_schema_array, (unsigned int)(mq_elt->topic_id)); @@ -754,8 +786,8 @@ static void plugin_manager_packet_message_dispatch(struct packet *pkt) { if (sub_elt->pkt_msg_cb) { - packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr(plug_mgr->registered_packet_plugin_array, (unsigned int)sub_elt->plugin_id); - sub_elt->pkt_msg_cb(pkt, mq_elt->topic_id, mq_elt->msg_data, packet_plugin_schema->plugin_env); + packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr(plug_mgr->registered_packet_plugin_array, (unsigned int)sub_elt->plugin_idx); + if(packet_plugin_schema)sub_elt->pkt_msg_cb(pkt, mq_elt->topic_id, mq_elt->msg_data, packet_plugin_schema->plugin_env); } } if (topic->pkt_msg_free_cb) @@ -763,7 +795,7 @@ static void plugin_manager_packet_message_dispatch(struct packet *pkt) topic->pkt_msg_free_cb(pkt, mq_elt->msg_data, topic->free_cb_arg); } } - DL_DELETE(mq, mq_elt); + DL_DELETE(*mq, mq_elt); FREE(mq_elt); } } @@ -827,7 +859,7 @@ int session_mq_publish_message(struct session *sess, int topic_id, void *data) struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess); assert(plug_mgr_rt); if(plug_mgr_rt->enable_session_mq==0)return -1; - return stellar_mq_publish_message(topic_id, data, plug_mgr_rt->plug_mgr->session_mq_schema_array, plug_mgr_rt->pending_mq); + return stellar_mq_publish_message(topic_id, data, plug_mgr_rt->plug_mgr->session_mq_schema_array, &plug_mgr_rt->pending_mq); } static int session_mq_set_message_status(struct session *sess, int topic_id, int plugin_id, int bit_value) @@ -902,7 +934,7 @@ int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_session_ms struct stellar_mq_subscriber *new_subscriber = CALLOC(struct stellar_mq_subscriber,1); new_subscriber->topic_subscriber_idx = topic->subscriber_cnt; - new_subscriber->plugin_id = plugin_id; + new_subscriber->plugin_idx = plugin_id; new_subscriber->sess_msg_cb = plugin_on_msg_cb; DL_APPEND(topic->subscribers, new_subscriber); @@ -939,18 +971,21 @@ static void plugin_manager_session_message_dispatch(struct session *sess) { if (bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->topic_id, cur_sub_idx) != 0) { - plugin_ctx_rt=(plug_mgr_rt->plugin_ctx_array+sub_elt->plugin_id); - session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)sub_elt->plugin_id); - if(plugin_ctx_rt->state==INIT) - { - if(session_plugin_schema->on_ctx_new) - { - plugin_ctx_rt->plugin_ctx=session_plugin_schema->on_ctx_new(sess, session_plugin_schema->plugin_env); - plugin_ctx_rt->state=ACTIVE; - } - } - if(sub_elt->sess_msg_cb)sub_elt->sess_msg_cb(sess, mq_elt->topic_id, mq_elt->msg_data, plugin_ctx_rt->plugin_ctx, - session_plugin_schema->plugin_env); + plugin_ctx_rt=(plug_mgr_rt->plugin_ctx_array+sub_elt->plugin_idx); + session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)sub_elt->plugin_idx); + if (session_plugin_schema) + { + if (plugin_ctx_rt->state == INIT) + { + if (session_plugin_schema->on_ctx_new) + { + plugin_ctx_rt->plugin_ctx = session_plugin_schema->on_ctx_new(sess, session_plugin_schema->plugin_env); + plugin_ctx_rt->state = ACTIVE; + } + } + if (sub_elt->sess_msg_cb) + sub_elt->sess_msg_cb(sess, mq_elt->topic_id, mq_elt->msg_data, plugin_ctx_rt->plugin_ctx, session_plugin_schema->plugin_env); + } } cur_sub_idx++; } |
