summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryangwei <[email protected]>2024-05-24 04:57:48 +0800
committeryangwei <[email protected]>2024-05-26 18:39:47 +0800
commitdfa960c6ec7d99f5d833046d579599d10674bac2 (patch)
tree4df0d1f872830b4e27eac4b1ae6437b591874f2a
parent6e3992676dbecf43d6552db703995714644aba50 (diff)
✨ feat(packet mq): build success, to be tested
-rw-r--r--include/stellar/packet_mq.h4
-rw-r--r--include/stellar/session_mq.h2
-rw-r--r--src/plugin_manager/plugin_manager.c351
-rw-r--r--src/stellar_on_sapp/stellar_on_sapp_api.c1
4 files changed, 289 insertions, 69 deletions
diff --git a/include/stellar/packet_mq.h b/include/stellar/packet_mq.h
index b21c46a..94bb39b 100644
--- a/include/stellar/packet_mq.h
+++ b/include/stellar/packet_mq.h
@@ -3,11 +3,11 @@
#include "stellar.h"
//session mq
-typedef void packet_msg_free_cb_func(void *msg, void *msg_free_arg);
+typedef void packet_msg_free_cb_func(struct packet *pkt, void *msg, void *msg_free_arg);
typedef void on_packet_msg_cb_func(struct packet *pkt, int topic_id, const void *msg, void *plugin_env);
//return topic_id
-int stellar_pakcet_mq_create_topic(struct stellar *st, const char *topic_name, packet_msg_free_cb_func *msg_free_cb, void *msg_free_arg);
+int stellar_packet_mq_create_topic(struct stellar *st, const char *topic_name, packet_msg_free_cb_func *msg_free_cb, void *msg_free_arg);
int stellar_packet_mq_get_topic_id(struct stellar *st, const char *topic_name);
diff --git a/include/stellar/session_mq.h b/include/stellar/session_mq.h
index a59e6e5..cddde81 100644
--- a/include/stellar/session_mq.h
+++ b/include/stellar/session_mq.h
@@ -3,7 +3,7 @@
#include "stellar.h"
//session mq
-typedef void session_msg_free_cb_func(void *msg, void *msg_free_arg);
+typedef void session_msg_free_cb_func(struct session *sess, void *msg, void *msg_free_arg);
typedef void on_session_msg_cb_func(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env);
//return topic_id
diff --git a/src/plugin_manager/plugin_manager.c b/src/plugin_manager/plugin_manager.c
index 65a7200..cb57e37 100644
--- a/src/plugin_manager/plugin_manager.c
+++ b/src/plugin_manager/plugin_manager.c
@@ -17,27 +17,39 @@
struct per_thread_exdata_array;
+struct per_thread_mq_array;
struct plugin_manager_schema
{
struct stellar *st;
UT_array *packet_exdata_schema_array;
+ UT_array *packet_mq_schema_array;
UT_array *session_exdata_schema_array;
UT_array *plugin_load_specs_array;
UT_array *session_mq_schema_array;
UT_array *registered_session_plugin_array;
UT_array *registered_packet_plugin_array;
UT_array *registered_polling_plugin_array;
- int topic_num;
- int subscriber_num;
+ int packet_mq_topic_num;
+ int session_mq_topic_num;
+ int packet_topic_subscriber_num;
+ int session_topic_subscriber_num;
int tcp_topic_id;
int udp_topic_id;
int tcp_stream_topic_id;
int egress_topic_id;
int control_packet_topic_id;
struct per_thread_exdata_array *per_thread_pkt_exdata_array;
+ struct per_thread_mq_array *per_thread_pkt_mq_array;
};
+struct stellar_message;
+struct per_thread_mq_array
+{
+ struct stellar_message *mq;
+};
+
+
struct stellar_exdata
{
void *exdata;
@@ -49,8 +61,6 @@ struct per_thread_exdata_array
};
-
-
struct stellar_exdata_schema
{
char *name;
@@ -76,7 +86,7 @@ struct stellar_message
typedef struct stellar_mq_subscriber
{
int topic_subscriber_idx;
- int session_plugin_id;
+ int plugin_id;
union
{
on_session_msg_cb_func *sess_msg_cb;
@@ -85,7 +95,6 @@ typedef struct stellar_mq_subscriber
struct stellar_mq_subscriber *next, *prev;
}stellar_mq_subscriber;
-typedef void stellar_msg_free_cb_func(void *msg, void *msg_free_arg);
struct stellar_mq_topic_schema
{
@@ -93,7 +102,12 @@ struct stellar_mq_topic_schema
void *free_cb_arg;
int topic_id;
int subscriber_cnt;
- stellar_msg_free_cb_func *free_cb;
+ union
+ {
+ void *free_cb;
+ session_msg_free_cb_func *sess_msg_free_cb;
+ packet_msg_free_cb_func *pkt_msg_free_cb;
+ };
struct stellar_mq_subscriber *subscribers;
};
@@ -119,6 +133,7 @@ struct plugin_manager_runtime
struct stellar_exdata *sess_exdata_array;
struct session_plugin_ctx_runtime *plugin_ctx_array;//N plugins TODO: call alloc and free
int current_session_plugin_id;
+ int enable_session_mq;
};
struct registered_packet_plugin_schema
@@ -126,6 +141,7 @@ struct registered_packet_plugin_schema
char ip_protocol;
plugin_on_packet_func *on_packet;
void *plugin_env;
+ UT_array *registed_packet_mq_subscriber_info;
};
struct registered_polling_plugin_schema
@@ -228,6 +244,7 @@ PLUGIN_SPEC_LOAD_ERROR:
}
static struct per_thread_exdata_array *per_thread_packet_exdata_arrary_new(struct stellar *st, struct plugin_manager_schema *plug_mgr);
+static struct per_thread_mq_array *per_thread_mq_arrary_new(struct stellar *st, struct plugin_manager_schema *plug_mgr);
struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char *plugin_spec_file_path)
{
@@ -264,10 +281,12 @@ struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char
}
FREE(specs);
pm->per_thread_pkt_exdata_array=per_thread_packet_exdata_arrary_new(st, pm);
+ pm->per_thread_pkt_mq_array=per_thread_mq_arrary_new(st, pm);
return pm;
}
static void per_thread_packet_exdata_arrary_free(struct stellar *st, struct plugin_manager_schema *plug_mgr);
+static void per_thread_mq_arrary_free(struct stellar *st, struct plugin_manager_schema *plug_mgr);
void plugin_manager_exit(struct plugin_manager_schema *plug_mgr)
{
@@ -289,6 +308,15 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr)
}
utarray_free(plug_mgr->session_mq_schema_array);
}
+ if(plug_mgr->packet_mq_schema_array)
+ {
+ for(unsigned int i = 0; i < utarray_len(plug_mgr->packet_mq_schema_array); i++)
+ {
+ stellar_packet_mq_destroy_topic(plug_mgr->st, i);
+ }
+ utarray_free(plug_mgr->packet_mq_schema_array);
+ }
+
if(plug_mgr->session_exdata_schema_array)utarray_free(plug_mgr->session_exdata_schema_array);
if(plug_mgr->registered_packet_plugin_array)utarray_free(plug_mgr->registered_packet_plugin_array);
if(plug_mgr->registered_polling_plugin_array)utarray_free(plug_mgr->registered_polling_plugin_array);
@@ -392,7 +420,7 @@ static struct per_thread_exdata_array *per_thread_packet_exdata_arrary_new(struc
static void per_thread_packet_exdata_arrary_free(struct stellar *st, struct plugin_manager_schema *plug_mgr)
{
- if(st == NULL || plug_mgr == NULL || plug_mgr->packet_exdata_schema_array == NULL || plug_mgr->per_thread_pkt_exdata_array)return;
+ if(st == NULL || plug_mgr == NULL || plug_mgr->per_thread_pkt_exdata_array)return;
int thread_num=stellar_get_worker_thread_num(st);
for (int i = 0; i < thread_num; i++)
{
@@ -505,35 +533,16 @@ static void stellar_mq_topic_schema_dtor(void *_elt)
// FREE(elt); // free the item
}
-UT_icd session_mq_topic_schema_icd = {sizeof(struct stellar_mq_topic_schema), NULL, stellar_mq_topic_schema_copy, stellar_mq_topic_schema_dtor};
+UT_icd stellar_mq_topic_schema_icd = {sizeof(struct stellar_mq_topic_schema), NULL, stellar_mq_topic_schema_copy, stellar_mq_topic_schema_dtor};
-void session_mq_free(struct stellar_message *head, UT_array *mq_schema_array)
+int stellar_mq_get_topic_id(const char *topic_name, UT_array *mq_schema_array)
{
- struct stellar_message *mq_elt, *tmp;
- struct stellar_mq_topic_schema *topic;
- DL_FOREACH_SAFE(head, mq_elt, tmp)
- {
- topic = (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array,
- (unsigned int)(mq_elt->topic_id));
- if (topic && topic->free_cb)
- {
- topic->free_cb(mq_elt->msg_data, topic->free_cb_arg);
- }
- DL_DELETE(head, mq_elt);
- FREE(mq_elt);
- }
- FREE(head);
-}
-
-int stellar_session_mq_get_topic_id(struct stellar *st, const char *topic_name)
-{
- struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);;
- if(topic_name == NULL || plug_mgr == NULL || plug_mgr->session_mq_schema_array == NULL)return -1;
- unsigned int len = utarray_len(plug_mgr->session_mq_schema_array);
+ if(topic_name == NULL || mq_schema_array == NULL )return -1;
+ unsigned int len = utarray_len(mq_schema_array);
struct stellar_mq_topic_schema *t_schema;
for(unsigned int i = 0; i < len; i++)
{
- t_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, i);
+ t_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array, i);
if(strcmp(t_schema->topic_name, topic_name) == 0)
{
return i;
@@ -542,27 +551,25 @@ int stellar_session_mq_get_topic_id(struct stellar *st, const char *topic_name)
return -1;
}
-int stellar_session_mq_update_topic(struct stellar *st, int topic_id, session_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
+int stellar_mq_update_topic(int topic_id, void *msg_free_cb, void *msg_free_arg, UT_array *mq_schema_array)
{
- struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
- if(plug_mgr->session_mq_schema_array == NULL)return -1;
- unsigned int len = utarray_len(plug_mgr->session_mq_schema_array);
+ if(mq_schema_array == NULL)return -1;
+ unsigned int len = utarray_len(mq_schema_array);
if(len < (unsigned int)topic_id)return -1;
- struct stellar_mq_topic_schema *t_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, (unsigned int)topic_id);
+ struct stellar_mq_topic_schema *t_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array, (unsigned int)topic_id);
if(t_schema == NULL)return -1;
t_schema->free_cb=msg_free_cb;
t_schema->free_cb_arg=msg_free_arg;
return 0;
}
-int stellar_session_mq_create_topic(struct stellar *st, const char *topic_name, session_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
+int stellar_mq_create_topic(struct stellar *st, const char *topic_name, void *msg_free_cb, void *msg_free_arg, UT_array **mq_schema_array)
{
- struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
- if(plug_mgr->session_mq_schema_array == NULL)
+ if(*mq_schema_array == NULL)
{
- utarray_new(plug_mgr->session_mq_schema_array, &session_mq_topic_schema_icd);
+ utarray_new(*mq_schema_array, &stellar_mq_topic_schema_icd);
}
- unsigned int len = utarray_len(plug_mgr->session_mq_schema_array);
+ unsigned int len = utarray_len(*mq_schema_array);
if(stellar_session_mq_get_topic_id(st, topic_name) >= 0)
{
return -1;
@@ -575,20 +582,18 @@ int stellar_session_mq_create_topic(struct stellar *st, const char *topic_name,
t_schema.free_cb_arg=msg_free_arg;
t_schema.subscribers=NULL;
t_schema.subscriber_cnt=0;
- utarray_push_back(plug_mgr->session_mq_schema_array, &t_schema);
- plug_mgr->topic_num+=1;
+ utarray_push_back(*mq_schema_array, &t_schema);
return t_schema.topic_id;
}
-int stellar_session_mq_destroy_topic(struct stellar *st, int topic_id)
+int stellar_mq_destroy_topic(int topic_id, UT_array *mq_schema_array)
{
- struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
- if(plug_mgr->session_mq_schema_array==NULL)return 0;
- unsigned int len = utarray_len(plug_mgr->session_mq_schema_array);
+ if(mq_schema_array==NULL)return 0;
+ unsigned int len = utarray_len(mq_schema_array);
if (len <= (unsigned int)topic_id)
return -1;
struct stellar_mq_topic_schema *topic =
- (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, (unsigned int)topic_id);
+ (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array, (unsigned int)topic_id);
struct stellar_mq_subscriber *sub_elt, *sub_tmp;
if (topic)
@@ -602,20 +607,229 @@ int stellar_session_mq_destroy_topic(struct stellar *st, int topic_id)
return 0; // success
}
-int session_mq_publish_message(struct session *sess, int topic_id, void *data)
+int stellar_mq_publish_message(int topic_id, void *data, UT_array *mq_schema_array, struct stellar_message *mq)
{
- struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess);
- if(plug_mgr_rt==NULL || topic_id < 0)return -1;
- if(plug_mgr_rt->plug_mgr->session_mq_schema_array==NULL)return -1;
- unsigned int len = utarray_len(plug_mgr_rt->plug_mgr->session_mq_schema_array);
+ if(mq_schema_array==NULL || topic_id < 0)return -1;
+ unsigned int len = utarray_len(mq_schema_array);
if (len <= (unsigned int)topic_id)return -1;
struct stellar_message *msg= CALLOC(struct stellar_message,1);
msg->topic_id = topic_id;
msg->msg_data = data;
- DL_APPEND(plug_mgr_rt->pending_mq, msg);
+ DL_APPEND(mq, msg);
return 0;
}
+UT_icd stellar_mq_subscriber_info_icd = {sizeof(struct stellar_mq_subscriber_info), NULL, NULL, NULL};
+
+/*******************************
+ * PACKET MQ *
+ *******************************/
+
+static struct per_thread_mq_array *per_thread_mq_arrary_new(struct stellar *st, struct plugin_manager_schema *plug_mgr)
+{
+ if(st == NULL || plug_mgr == NULL || plug_mgr->packet_mq_schema_array == NULL || plug_mgr->per_thread_pkt_mq_array)return NULL;
+ int thread_num=stellar_get_worker_thread_num(st);
+ struct per_thread_mq_array *per_thread_pkt_mq_array = CALLOC(struct per_thread_mq_array, thread_num);
+ return per_thread_pkt_mq_array;
+}
+
+static void per_thread_mq_arrary_free(struct stellar *st, struct plugin_manager_schema *plug_mgr)
+{
+ if(plug_mgr == NULL || plug_mgr->per_thread_pkt_mq_array)return;
+ FREE(plug_mgr->per_thread_pkt_mq_array);
+ return;
+}
+
+int stellar_packet_mq_create_topic(struct stellar *st, const char *topic_name, packet_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
+{
+ struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
+ assert(plug_mgr);
+ int topic_id=stellar_mq_create_topic(st, topic_name, (void *)msg_free_cb, msg_free_arg, &plug_mgr->packet_mq_schema_array);
+ if(topic_id>0)plug_mgr->packet_mq_topic_num+=1;
+ return topic_id;
+}
+
+int stellar_packet_mq_get_topic_id(struct stellar *st, const char *topic_name)
+{
+ struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
+ assert(plug_mgr);
+ return stellar_mq_get_topic_id(topic_name, plug_mgr->packet_mq_schema_array);
+}
+
+int stellar_packet_mq_update_topic(struct stellar *st, int topic_id, packet_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
+{
+ struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
+ assert(plug_mgr);
+ return stellar_mq_update_topic(topic_id, (void *)msg_free_cb, msg_free_arg, plug_mgr->packet_mq_schema_array);
+}
+
+int stellar_packet_mq_destroy_topic(struct stellar *st, int topic_id)
+{
+ struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
+ assert(plug_mgr);
+ return stellar_mq_destroy_topic(topic_id, plug_mgr->packet_mq_schema_array);
+}
+
+//return 0 if success, otherwise return -1.
+int stellar_packet_mq_subscribe(struct stellar *st, int topic_id, on_packet_msg_cb_func *plugin_on_msg_cb, int plugin_id) //packet plugin only
+{
+ if(plugin_id < PACKET_PULGIN_ID_BASE || plugin_id >= POLLING_PULGIN_ID_BASE)return -1;// ignore session or polling plugin
+ struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
+ if(plug_mgr->packet_mq_schema_array==NULL)return -1;
+ unsigned int len = utarray_len(plug_mgr->packet_mq_schema_array);
+ if (len <= (unsigned int)topic_id)return -1;
+
+ struct registered_packet_plugin_schema *packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr(plug_mgr->registered_packet_plugin_array, (unsigned)plugin_id);
+ if(packet_plugin_schema==NULL)return -1;
+
+ struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->packet_mq_schema_array, (unsigned int)topic_id);
+ if(topic==NULL)return -1;
+
+ if(packet_plugin_schema->registed_packet_mq_subscriber_info==NULL)
+ {
+ utarray_new(packet_plugin_schema->registed_packet_mq_subscriber_info, &stellar_mq_subscriber_info_icd);
+ }
+
+ // if plugin already subscribe current topic, return 0
+ struct stellar_mq_subscriber_info *p=NULL;
+ while( (p=(struct stellar_mq_subscriber_info *)utarray_next(packet_plugin_schema->registed_packet_mq_subscriber_info,p)))
+ {
+ if(p->topic_id==topic_id)
+ return 0;
+ };
+
+ struct stellar_mq_subscriber *new_subscriber = CALLOC(struct stellar_mq_subscriber,1);
+ new_subscriber->topic_subscriber_idx = topic->subscriber_cnt;
+ new_subscriber->plugin_id = plugin_id;
+ new_subscriber->pkt_msg_cb = plugin_on_msg_cb;
+ DL_APPEND(topic->subscribers, new_subscriber);
+
+ struct stellar_mq_subscriber_info sub_info;
+ memset(&sub_info, 0, sizeof(struct stellar_mq_subscriber_info));
+ sub_info.topic_id=topic_id;
+ sub_info.subscriber_idx=topic->subscriber_cnt;
+ utarray_push_back(packet_plugin_schema->registed_packet_mq_subscriber_info, &sub_info);
+ topic->subscriber_cnt+=1;
+ plug_mgr->packet_topic_subscriber_num+=1;
+ return 0;
+}
+
+int packet_mq_publish_message(struct packet *pkt, int topic_id, void *msg)
+{
+ struct stellar *st = packet_stellar_get(pkt);
+ assert(st);
+ struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
+ assert(plug_mgr);
+ int tid = stellar_get_current_thread_id(st);
+ return stellar_mq_publish_message(topic_id, msg, plug_mgr->packet_mq_schema_array, (plug_mgr->per_thread_pkt_mq_array+tid)->mq);
+}
+
+static void plugin_manager_packet_message_dispatch(struct packet *pkt)
+{
+
+ struct stellar *st = packet_stellar_get(pkt);
+ assert(st);
+ struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
+ assert(plug_mgr);
+
+ if(plug_mgr->packet_mq_schema_array==NULL)return;
+
+ int tid = stellar_get_current_thread_id(st);
+
+ struct stellar_message *mq= (plug_mgr->per_thread_pkt_mq_array+tid)->mq;
+
+ struct stellar_message *mq_elt=NULL, *mq_tmp=NULL;
+ struct stellar_mq_subscriber *sub_elt, *sub_tmp;
+ struct stellar_mq_topic_schema *topic;
+ struct registered_packet_plugin_schema *packet_plugin_schema;
+ while (mq != NULL)
+ {
+ DL_FOREACH_SAFE(mq, mq_elt, mq_tmp)
+ {
+ topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->packet_mq_schema_array,
+ (unsigned int)(mq_elt->topic_id));
+ if (topic)
+ {
+ DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
+ {
+ if (sub_elt->pkt_msg_cb)
+ {
+ packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr(plug_mgr->registered_packet_plugin_array, (unsigned int)sub_elt->plugin_id);
+ sub_elt->pkt_msg_cb(pkt, mq_elt->topic_id, mq_elt->msg_data, packet_plugin_schema->plugin_env);
+ }
+ }
+ if (topic->pkt_msg_free_cb)
+ {
+ topic->pkt_msg_free_cb(pkt, mq_elt->msg_data, topic->free_cb_arg);
+ }
+ }
+ DL_DELETE(mq, mq_elt);
+ FREE(mq_elt);
+ }
+ }
+ return;
+}
+
+/*******************************
+ * SESSION MQ *
+ *******************************/
+
+void session_mq_free(struct session *sess, struct stellar_message *head, UT_array *mq_schema_array)
+{
+ struct stellar_message *mq_elt, *tmp;
+ struct stellar_mq_topic_schema *topic;
+ DL_FOREACH_SAFE(head, mq_elt, tmp)
+ {
+ topic = (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array,
+ (unsigned int)(mq_elt->topic_id));
+ if (topic && topic->free_cb)
+ {
+ topic->sess_msg_free_cb(sess, mq_elt->msg_data, topic->free_cb_arg);
+ }
+ DL_DELETE(head, mq_elt);
+ FREE(mq_elt);
+ }
+ FREE(head);
+}
+
+inline int stellar_session_mq_get_topic_id(struct stellar *st, const char *topic_name)
+{
+ struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
+ assert(plug_mgr);
+ return stellar_mq_get_topic_id(topic_name, plug_mgr->session_mq_schema_array);
+}
+
+int stellar_session_mq_update_topic(struct stellar *st, int topic_id, session_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
+{
+ struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
+ assert(plug_mgr);
+ return stellar_mq_update_topic(topic_id, (void *)msg_free_cb, msg_free_arg, plug_mgr->session_mq_schema_array);
+}
+
+int stellar_session_mq_create_topic(struct stellar *st, const char *topic_name, session_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
+{
+ struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
+ assert(plug_mgr);
+ int topic_id=stellar_mq_create_topic(st, topic_name, (void *)msg_free_cb, msg_free_arg, &plug_mgr->session_mq_schema_array);
+ if(topic_id>0)plug_mgr->session_mq_topic_num+=1;
+ return topic_id;
+}
+
+int stellar_session_mq_destroy_topic(struct stellar *st, int topic_id)
+{
+ struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
+ assert(plug_mgr);
+ return stellar_mq_destroy_topic(topic_id, plug_mgr->session_mq_schema_array);
+}
+
+int session_mq_publish_message(struct session *sess, int topic_id, void *data)
+{
+ struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess);
+ assert(plug_mgr_rt);
+ if(plug_mgr_rt->enable_session_mq==0)return -1;
+ return stellar_mq_publish_message(topic_id, data, plug_mgr_rt->plug_mgr->session_mq_schema_array, plug_mgr_rt->pending_mq);
+}
+
static int session_mq_set_message_status(struct session *sess, int topic_id, int plugin_id, int bit_value)
{
if(bit_value!=0 && bit_value!=1)return -1;
@@ -623,7 +837,7 @@ static int session_mq_set_message_status(struct session *sess, int topic_id, int
if(topic_id < 0 || plugin_id < 0)return -1;
struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess);
if(plug_mgr_rt==NULL)return -1;
- if(topic_id >= plug_mgr_rt->plug_mgr->topic_num)return -1;// topic_id out of range
+ if(topic_id >= plug_mgr_rt->plug_mgr->session_mq_topic_num)return -1;// topic_id out of range
struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->session_mq_schema_array, (unsigned int)topic_id);
if(topic==NULL)return -1;
@@ -658,7 +872,6 @@ int session_mq_unignore_message(struct session *sess, int topic_id, int plugin_i
return session_mq_set_message_status(sess, topic_id, plugin_id, 1);
}
-UT_icd session_mq_subscriber_info_icd = {sizeof(struct stellar_mq_subscriber_info), NULL, NULL, NULL};
int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_session_msg_cb_func *plugin_on_msg_cb, int plugin_id)
{
@@ -676,7 +889,7 @@ int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_session_ms
if(session_plugin_schema->registed_session_mq_subscriber_info==NULL)
{
- utarray_new(session_plugin_schema->registed_session_mq_subscriber_info, &session_mq_subscriber_info_icd);
+ utarray_new(session_plugin_schema->registed_session_mq_subscriber_info, &stellar_mq_subscriber_info_icd);
}
// if plugin already subscribe current topic, return 0
@@ -689,7 +902,7 @@ int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_session_ms
struct stellar_mq_subscriber *new_subscriber = CALLOC(struct stellar_mq_subscriber,1);
new_subscriber->topic_subscriber_idx = topic->subscriber_cnt;
- new_subscriber->session_plugin_id = plugin_id;
+ new_subscriber->plugin_id = plugin_id;
new_subscriber->sess_msg_cb = plugin_on_msg_cb;
DL_APPEND(topic->subscribers, new_subscriber);
@@ -699,7 +912,7 @@ int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_session_ms
sub_info.subscriber_idx=topic->subscriber_cnt;
utarray_push_back(session_plugin_schema->registed_session_mq_subscriber_info, &sub_info);
topic->subscriber_cnt+=1;
- plug_mgr->subscriber_num+=1;
+ plug_mgr->session_topic_subscriber_num+=1;
return 0;
}
@@ -726,8 +939,8 @@ static void plugin_manager_session_message_dispatch(struct session *sess)
{
if (bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->topic_id, cur_sub_idx) != 0)
{
- plugin_ctx_rt=(plug_mgr_rt->plugin_ctx_array+sub_elt->session_plugin_id);
- session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)sub_elt->session_plugin_id);
+ plugin_ctx_rt=(plug_mgr_rt->plugin_ctx_array+sub_elt->plugin_id);
+ session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)sub_elt->plugin_id);
if(plugin_ctx_rt->state==INIT)
{
if(session_plugin_schema->on_ctx_new)
@@ -791,7 +1004,7 @@ struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_
rt->sess = sess;
rt->pending_mq = NULL;
rt->delivered_mq = NULL;
- rt->session_mq_status=bitmap_new(plug_mgr->topic_num, plug_mgr->subscriber_num, 1);
+ rt->session_mq_status=bitmap_new(plug_mgr->session_mq_topic_num, plug_mgr->session_topic_subscriber_num, 1);
rt->sess_exdata_array = (struct stellar_exdata *)session_exdata_runtime_new(plug_mgr);
rt->plugin_ctx_array = CALLOC(struct session_plugin_ctx_runtime, utarray_len(plug_mgr->registered_session_plugin_array));
return rt;
@@ -803,12 +1016,12 @@ void plugin_manager_session_runtime_free(struct plugin_manager_runtime *rt)
if(rt==NULL)return;
if(rt->pending_mq != NULL)
{
- session_mq_free(rt->pending_mq, rt->plug_mgr->session_mq_schema_array);
+ session_mq_free(rt->sess, rt->pending_mq, rt->plug_mgr->session_mq_schema_array);
rt->pending_mq=NULL;
}
if(rt->delivered_mq != NULL)
{
- session_mq_free(rt->delivered_mq, rt->plug_mgr->session_mq_schema_array);
+ session_mq_free(rt->sess, rt->delivered_mq, rt->plug_mgr->session_mq_schema_array);
rt->delivered_mq=NULL;
}
if(rt->session_mq_status != NULL)
@@ -868,6 +1081,7 @@ void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, struct pac
p->on_packet(pkt, ip_proto, p->plugin_env);
}
}
+ plugin_manager_packet_message_dispatch(pkt);
return;
}
@@ -961,9 +1175,11 @@ void plugin_manager_on_session_ingress(struct session *sess, struct packet *pkt)
default:
break;
}
+ plug_mgr_rt->enable_session_mq=1;
//TODO: check TCP topic active subscirber num, if 0, return APP_STATE_DROPME, to reduce tcp reassemble overhead
session_mq_publish_message(sess, topic_id ,(void *)pkt);
plugin_manager_session_message_dispatch(sess);
+ plug_mgr_rt->enable_session_mq=0;
return;
}
@@ -971,10 +1187,13 @@ void plugin_manager_on_session_egress(struct session *sess, struct packet *pkt)
{
struct plugin_manager_runtime *plug_mgr_rt = session_plugin_manager_runtime_get(sess);
if(plug_mgr_rt==NULL)return;
+ plug_mgr_rt->enable_session_mq=1;
session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->egress_topic_id ,(void *)pkt);
plugin_manager_session_message_dispatch(sess);
- session_mq_free(plug_mgr_rt->delivered_mq, plug_mgr_rt->plug_mgr->session_mq_schema_array);
+ session_mq_free(plug_mgr_rt->sess,plug_mgr_rt->delivered_mq, plug_mgr_rt->plug_mgr->session_mq_schema_array);
plug_mgr_rt->delivered_mq=NULL;
+ plug_mgr_rt->enable_session_mq=0;
+ plugin_manager_packet_message_dispatch(pkt);
per_thread_packet_exdata_arrary_clean(plug_mgr_rt->plug_mgr, pkt);
return;
}
diff --git a/src/stellar_on_sapp/stellar_on_sapp_api.c b/src/stellar_on_sapp/stellar_on_sapp_api.c
index b3eed53..68bb2a7 100644
--- a/src/stellar_on_sapp/stellar_on_sapp_api.c
+++ b/src/stellar_on_sapp/stellar_on_sapp_api.c
@@ -111,6 +111,7 @@ struct session *session_new_on_sapp(struct stellar *st, struct streaminfo *strea
sess->pstream=stream;
sess->session_direction=-1;
memset(&sess->cur_pkt, 0, sizeof(struct packet));
+ sess->cur_pkt.st=st;
sess->plug_mgr_rt=plugin_manager_session_runtime_new(st->plug_mgr, sess);
return sess;
}