summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryangwei <[email protected]>2024-09-10 10:18:05 +0800
committeryangwei <[email protected]>2024-09-10 14:44:38 +0800
commite9825c3988575be0bc6b422568e81335eb3cb5ce (patch)
tree4cbb21495a4d51c90a6d563ba4cf154f748a2361
parent6403e832de204d262647cbc9fa964473c7802386 (diff)
🦄 refactor(stellar api): split exdata and mq
-rw-r--r--include/stellar/exdata.h (renamed from infra/exdata/exdata.h)17
-rw-r--r--include/stellar/mq.h52
-rw-r--r--include/stellar/stellar.h30
-rw-r--r--include/stellar/stellar_exdata.h34
-rw-r--r--include/stellar/stellar_mq.h45
-rw-r--r--infra/CMakeLists.txt2
-rw-r--r--infra/exdata/exdata.c10
-rw-r--r--infra/exdata/exdata_internal.h2
-rw-r--r--infra/mq/CMakeLists.txt3
-rw-r--r--infra/mq/mq.c212
-rw-r--r--infra/mq/mq_interna.h74
-rw-r--r--infra/plugin_manager/plugin_manager.c282
-rw-r--r--infra/plugin_manager/plugin_manager.h17
-rw-r--r--infra/plugin_manager/plugin_manager_interna.h97
-rw-r--r--infra/stellar_core.c20
15 files changed, 422 insertions, 475 deletions
diff --git a/infra/exdata/exdata.h b/include/stellar/exdata.h
index 941128a..8233c8b 100644
--- a/infra/exdata/exdata.h
+++ b/include/stellar/exdata.h
@@ -1,5 +1,10 @@
#pragma once
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
typedef void exdata_free(int idx, void *ex_ptr, void *arg);
struct exdata_schema;
@@ -7,16 +12,20 @@ struct exdata_schema;
struct exdata_schema *exdata_schema_new();
void exdata_schema_free(struct exdata_schema *s);
-int exdata_new_index(struct exdata_schema *schema, const char *name, exdata_free *free_func,void *free_arg);
+int exdata_schema_new_index(struct exdata_schema *schema, const char *name, exdata_free *free_func,void *free_arg);
int exdata_schema_get_idx_by_name(struct exdata_schema *schema, const char *name);
struct exdata_runtime;
-struct exdata_runtime *exdata_handle_new(struct exdata_schema *h);
-void exdata_handle_free(struct exdata_runtime *h);
-void exdata_handle_reset(struct exdata_runtime *h);
+struct exdata_runtime *exdata_runtime_new(struct exdata_schema *h);
+void exdata_runtime_free(struct exdata_runtime *h);
+void exdata_runtime_reset(struct exdata_runtime *h);//call free_func, and set ex_ptr to NULL
int exdata_set(struct exdata_runtime *h, int idx, void *ex_ptr);
void *exdata_get(struct exdata_runtime *h, int idx);
+
+#ifdef __cplusplus
+}
+#endif \ No newline at end of file
diff --git a/include/stellar/mq.h b/include/stellar/mq.h
new file mode 100644
index 0000000..57e860a
--- /dev/null
+++ b/include/stellar/mq.h
@@ -0,0 +1,52 @@
+#pragma once
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+struct mq_schema;
+struct mq_schema *mq_schema_new();
+void mq_schema_free(struct mq_schema *s);
+
+struct mq_runtime;
+struct mq_runtime *mq_runtime_new(struct mq_schema *s);
+void mq_runtime_free(struct mq_runtime *s);
+
+typedef void mq_msg_free_cb_func(void *msg, void *msg_free_arg);
+typedef void on_msg_cb_func(int topic_id, const void *msg, void *on_msg_arg);
+typedef void on_msg_dispatch_cb_func(int topic_id,
+ const void *msg,
+ on_msg_cb_func* on_msg_cb,
+ void *on_msg_cb_arg,
+ void *dispatch_arg);
+
+//return topic_id
+int mq_schema_create_topic(struct mq_schema *s,
+ const char *topic_name,
+ on_msg_dispatch_cb_func *on_dispatch_cb,
+ void *on_dispatch_arg,
+ mq_msg_free_cb_func *msg_free_cb,
+ void *msg_free_arg);
+
+int mq_schema_get_topic_id(struct mq_schema *s, const char *topic_name);
+
+int mq_schema_update_topic(struct mq_schema *s,
+ int topic_id,
+ on_msg_dispatch_cb_func *on_dispatch_cb,
+ void *on_dispatch_arg,
+ mq_msg_free_cb_func *msg_free_cb,
+ void *msg_free_arg);
+
+int mq_schema_destroy_topic(struct mq_schema *s, int topic_id);
+
+//return 0 if success, otherwise return -1.
+int mq_schema_subscribe(struct mq_schema *s, int topic_id, on_msg_cb_func *on_msg_cb, void * on_msg_cb_arg);
+
+
+int mq_runtime_publish_message(struct mq_runtime *rt, int topic_id, void *msg);
+void mq_runtime_dispatch(struct mq_runtime *rt);
+
+#ifdef __cplusplus
+}
+#endif \ No newline at end of file
diff --git a/include/stellar/stellar.h b/include/stellar/stellar.h
index 63f7ab3..cc66cfe 100644
--- a/include/stellar/stellar.h
+++ b/include/stellar/stellar.h
@@ -7,32 +7,24 @@ extern "C"
#include <stdint.h>
-struct stellar;
+#include "stellar/mq.h"
+#include "stellar/log.h"
+#include "stellar/packet.h"
-/**********************************************
- * PLUGIN API *
- **********************************************/
+struct stellar;
//return plugin_env
typedef void *plugin_on_load_func(struct stellar *st);
typedef void plugin_on_unload_func(void *plugin_env);
-/**********************************************
- * PLUGIN EVENT API *
- **********************************************/
-struct packet;
typedef void plugin_on_packet_func(struct packet *pkt, void *on_packet_cb_arg);
+//return 0 if success, otherwise return -1.
+int stellar_raw_packet_subscribe(struct stellar *st, plugin_on_packet_func *on_packet_cb, void *on_packet_cb_arg);
-int stellar_on_raw_packet_register(struct stellar *st, plugin_on_packet_func *on_packet_cb, void *on_packet_cb_arg);
-
-//return polling work result, 0: no work, 1: work
+//return on_polling state, 0: idle, 1: working
typedef int plugin_on_polling_func(void *polling_arg);
-//return polling plugin_id
-int stellar_on_polling_register(struct stellar *st, plugin_on_polling_func on_polling, void *polling_arg);
-
-/**********************************************
- * STELLAR DEV API *
- **********************************************/
+//return 0 if success, otherwise return -1.
+int stellar_polling_subscribe(struct stellar *st, plugin_on_polling_func on_polling, void *polling_arg);
void stellar_emit_datapath_telemetry(struct packet *pkt, const char * module, const char *str);
@@ -41,7 +33,6 @@ uint16_t stellar_get_current_thread_index();
// only send user build packet, can't send packet which come from network
void stellar_send_build_packet(struct stellar *st, struct packet *pkt);
-struct stellar;
struct stellar *stellar_new(const char *stellar_cfg_file, const char *plugin_cfg_file, const char *log_cfg_file);
void stellar_run(struct stellar *st);
void stellar_free(struct stellar *st);
@@ -49,6 +40,9 @@ void stellar_loopbreak(struct stellar *st);
void stellar_reload_log_level(struct stellar *st);
struct logger *stellar_get_logger(struct stellar *st);
+struct mq_schema *stellar_get_mq_schema(struct stellar *st);
+struct mq_runtime *stellar_get_mq_runtime(struct stellar *st);
+
#ifdef __cplusplus
}
#endif
diff --git a/include/stellar/stellar_exdata.h b/include/stellar/stellar_exdata.h
deleted file mode 100644
index 8ed67bb..0000000
--- a/include/stellar/stellar_exdata.h
+++ /dev/null
@@ -1,34 +0,0 @@
-#pragma once
-
-#include "utils.h"
-#include "stellar.h"
-
-#ifdef __cplusplus
-extern "C"
-{
-#endif
-
-typedef void stellar_exdata_free(int idx, void *ex_ptr, void *arg);
-
-
-inline static void stellar_exdata_free_default(int idx __unused, void *ex_ptr, void *arg __unused)
-{
- if(ex_ptr)FREE(ex_ptr);
-}
-
-struct packet;
-int stellar_exdata_new_index(struct stellar *st, const char *name, stellar_exdata_free *free_func,void *arg);
-
-//packet exdata api
-int packet_exdata_set(struct packet *pkt, int idx, void *ex_ptr);
-void *packet_exdata_get(struct packet *pkt, int idx);
-
-struct session;
-
-//session exdata api
-int session_exdata_set(struct session *sess, int idx, void *ex_ptr);
-void *session_exdata_get(struct session *sess, int idx);
-
-#ifdef __cplusplus
-}
-#endif \ No newline at end of file
diff --git a/include/stellar/stellar_mq.h b/include/stellar/stellar_mq.h
deleted file mode 100644
index 3d27c9f..0000000
--- a/include/stellar/stellar_mq.h
+++ /dev/null
@@ -1,45 +0,0 @@
-#pragma once
-
-#include "utils.h"
-#include "stellar.h"
-
-#ifdef __cplusplus
-extern "C"
-{
-#endif
-
-//topic api
-typedef void stellar_msg_free_cb_func(void *msg, void *msg_free_arg);
-
-inline static void stellar_msg_free_default(void *msg, void *msg_free_arg __unused)
-{
- if(msg)FREE(msg);
-}
-
-typedef void on_msg_cb_func(int topic_id, const void *msg, void *on_msg_arg);
-typedef void on_msg_dispatch_cb_func(int topic_id, const void *msg, on_msg_cb_func* on_msg_cb, void *on_msg_cb_arg, void *dispatch_arg);
-
-//return topic_id
-int stellar_mq_create_topic(struct stellar *st, const char *topic_name, on_msg_dispatch_cb_func *on_dispatch_cb, void *on_dispatch_arg, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg);
-int stellar_mq_get_topic_id(struct stellar *st, const char *topic_name);
-int stellar_mq_update_topic(struct stellar *st, int topic_id, on_msg_dispatch_cb_func *on_dispatch_cb, void *on_dispatch_arg, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg);
-int stellar_mq_destroy_topic(struct stellar *st, int topic_id);
-
-
-enum stellar_mq_priority
-{
- STELLAR_MQ_PRIORITY_LOW,
- STELLAR_MQ_PRIORITY_NORMAL,
- STELLAR_MQ_PRIORITY_HIGH,
- STELLAR_MQ_PRIORITY_MAX,
-};
-
-
-//return 0 if success, otherwise return -1.
-int stellar_mq_subscribe(struct stellar *st, int topic_id, on_msg_cb_func *on_msg_cb, void * on_msg_cb_arg);
-int stellar_mq_publish_message(struct stellar *st, int topic_id, void *msg);
-int stellar_mq_publish_message_with_priority(struct stellar *st, int topic_id, void *msg, enum stellar_mq_priority priority);
-
-#ifdef __cplusplus
-}
-#endif \ No newline at end of file
diff --git a/infra/CMakeLists.txt b/infra/CMakeLists.txt
index cc4974e..aec3d43 100644
--- a/infra/CMakeLists.txt
+++ b/infra/CMakeLists.txt
@@ -1,4 +1,4 @@
-set(INFRA exdata tuple packet_parser packet_io ip_reassembly tcp_reassembly session_manager plugin_manager)
+set(INFRA exdata mq tuple packet_parser packet_io ip_reassembly tcp_reassembly session_manager plugin_manager)
set(DEPS bitmap dablooms interval_tree logger nmx_pool rbtree timeout toml)
#set(DECODERS http lpi)
set(WHOLE_ARCHIVE ${DEPS} ${INFRA} ${DECODERS})
diff --git a/infra/exdata/exdata.c b/infra/exdata/exdata.c
index e1b75df..e183c82 100644
--- a/infra/exdata/exdata.c
+++ b/infra/exdata/exdata.c
@@ -57,7 +57,7 @@ static void stellar_exdata_met_dtor(void *_elt)
UT_icd stellar_exdata_meta_icd = {sizeof(struct exdata_meta), NULL, stellar_exdata_met_copy, stellar_exdata_met_dtor};
-int exdata_new_index(struct exdata_schema *s, const char *name, exdata_free *free_func,void *free_arg)
+int exdata_schema_new_index(struct exdata_schema *s, const char *name, exdata_free *free_func,void *free_arg)
{
if(s==NULL || name==NULL)return -1;
if(s->exdata_meta_array == NULL)
@@ -90,7 +90,7 @@ int exdata_new_index(struct exdata_schema *s, const char *name, exdata_free *fre
/*******************************
* STELLAR EXDATA HANDLE API *
*******************************/
-struct exdata_runtime *exdata_handle_new(struct exdata_schema *s)
+struct exdata_runtime *exdata_runtime_new(struct exdata_schema *s)
{
if(s==NULL || s->exdata_meta_array==NULL)return NULL;
struct exdata_runtime *h = CALLOC(struct exdata_runtime, 1);
@@ -103,7 +103,7 @@ struct exdata_runtime *exdata_handle_new(struct exdata_schema *s)
return h;
}
-void exdata_handle_reset(struct exdata_runtime *h)
+void exdata_runtime_reset(struct exdata_runtime *h)
{
if(h==NULL||h->schema==NULL||h->exdata_array==NULL)return;
unsigned int len=utarray_len(h->schema->exdata_meta_array);
@@ -125,10 +125,10 @@ void exdata_handle_reset(struct exdata_runtime *h)
return;
}
-void exdata_handle_free(struct exdata_runtime *h)
+void exdata_runtime_free(struct exdata_runtime *h)
{
if(h==NULL)return;
- exdata_handle_reset(h);
+ exdata_runtime_reset(h);
if(h->exdata_array)FREE(h->exdata_array);
FREE(h);
}
diff --git a/infra/exdata/exdata_internal.h b/infra/exdata/exdata_internal.h
index 4b8e398..a4b7a4f 100644
--- a/infra/exdata/exdata_internal.h
+++ b/infra/exdata/exdata_internal.h
@@ -2,7 +2,7 @@
#include "uthash/utarray.h"
-#include "exdata.h"
+#include "stellar/exdata.h"
struct exdata_schema
{
diff --git a/infra/mq/CMakeLists.txt b/infra/mq/CMakeLists.txt
new file mode 100644
index 0000000..727073a
--- /dev/null
+++ b/infra/mq/CMakeLists.txt
@@ -0,0 +1,3 @@
+add_library(mq mq.c)
+
+#add_subdirectory(test) \ No newline at end of file
diff --git a/infra/mq/mq.c b/infra/mq/mq.c
new file mode 100644
index 0000000..9f49eb1
--- /dev/null
+++ b/infra/mq/mq.c
@@ -0,0 +1,212 @@
+#include "mq_interna.h"
+
+#include "stellar/utils.h"
+
+#include "uthash/utlist.h"
+
+/*******************************
+ * STELLAR MQ *
+ *******************************/
+static void stellar_mq_topic_schema_copy(void *_dst, const void *_src)
+{
+ struct stellar_mq_topic_schema *dst = (struct stellar_mq_topic_schema *)_dst,
+ *src = (struct stellar_mq_topic_schema *)_src;
+ memcpy(_dst, _src, sizeof(struct stellar_mq_topic_schema));
+ dst->topic_name = src->topic_name ? strdup(src->topic_name) : NULL;
+}
+
+static void stellar_mq_topic_schema_dtor(void *_elt)
+{
+ struct stellar_mq_topic_schema *elt = (struct stellar_mq_topic_schema *)_elt;
+ if (elt->topic_name)
+ FREE(elt->topic_name);
+ // FREE(elt); // free the item
+}
+
+UT_icd stellar_mq_topic_schema_icd = {sizeof(struct stellar_mq_topic_schema), NULL, stellar_mq_topic_schema_copy, stellar_mq_topic_schema_dtor};
+
+int mq_schema_get_topic_id(struct mq_schema *s, const char *topic_name)
+{
+ if(topic_name == NULL || s == NULL || s->topic_array == NULL )return -1;
+ unsigned int len = utarray_len(s->topic_array);
+ struct stellar_mq_topic_schema *t_schema;
+ for(unsigned int i = 0; i < len; i++)
+ {
+ t_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(s->topic_array, i);
+ if(strcmp(t_schema->topic_name, topic_name) == 0)
+ {
+ return i;
+ }
+ }
+ return -1;
+}
+
+int mq_schema_update_topic(struct mq_schema *s, int topic_id, on_msg_dispatch_cb_func *on_dispatch_cb, void *on_dispatch_arg, mq_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
+{
+ if(s == NULL || s->topic_array == NULL)return -1;
+ unsigned int len = utarray_len(s->topic_array);
+ if(len < (unsigned int)topic_id)return -1;
+ struct stellar_mq_topic_schema *t_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(s->topic_array, (unsigned int)topic_id);
+ if(t_schema == NULL)return -1;
+ t_schema->dispatch_cb=on_dispatch_cb;
+ t_schema->dispatch_cb_arg=on_dispatch_arg;
+ t_schema->free_cb=msg_free_cb;
+ t_schema->free_cb_arg=msg_free_arg;
+ return 0;
+}
+
+int mq_schema_create_topic(struct mq_schema *s, const char *topic_name, on_msg_dispatch_cb_func *on_dispatch_cb, void *on_dispatch_arg, mq_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
+{
+ if(s==NULL)return -1;
+ if(s->topic_array == NULL)
+ {
+ utarray_new(s->topic_array, &stellar_mq_topic_schema_icd);
+ }
+ unsigned int len = utarray_len(s->topic_array);
+ if(mq_schema_get_topic_id(s, topic_name) >= 0)
+ {
+ return -1;
+ }
+ struct stellar_mq_topic_schema t_schema;
+ memset(&t_schema, 0, sizeof(struct stellar_mq_topic_schema));
+ t_schema.dispatch_cb=on_dispatch_cb;
+ t_schema.free_cb=msg_free_cb;
+ t_schema.topic_name=(char *)topic_name;
+ t_schema.topic_id=len;//topid_id equals arrary index
+ t_schema.dispatch_cb_arg=on_dispatch_arg;
+ t_schema.free_cb_arg=msg_free_arg;
+ t_schema.subscribers=NULL;
+ t_schema.subscriber_cnt=0;
+ utarray_push_back(s->topic_array, &t_schema);
+ s->stellar_mq_topic_num+=1;
+ return t_schema.topic_id;
+}
+
+int mq_schema_destroy_topic(struct mq_schema *s, int topic_id)
+{
+ if(s==NULL)return -1;
+ if(s->topic_array==NULL)return -1;
+ unsigned int len = utarray_len(s->topic_array);
+ if (len <= (unsigned int)topic_id)
+ return -1;
+ struct stellar_mq_topic_schema *topic =
+ (struct stellar_mq_topic_schema *)utarray_eltptr(s->topic_array, (unsigned int)topic_id);
+ struct stellar_mq_subscriber *sub_elt, *sub_tmp;
+
+ if(topic == NULL)return -1;
+
+ if (topic->is_destroyed == 1)return 0;
+
+ DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
+ {
+ DL_DELETE(topic->subscribers, sub_elt);
+ FREE(sub_elt);
+ }
+ topic->is_destroyed = 1;
+ s->stellar_mq_topic_num-=1;
+ return 1; // success
+}
+
+
+static void stellar_mq_dispatch_one_message(struct mq_schema *s, struct stellar_message *mq_elt)
+{
+ struct stellar_mq_subscriber *sub_elt, *sub_tmp;
+ struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(s->topic_array,
+ (unsigned int)(mq_elt->header.topic_id));
+ if (topic)
+ {
+ DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
+ {
+ if (sub_elt->plugin_msg_cb)
+ {
+ if(topic->dispatch_cb)topic->dispatch_cb(mq_elt->header.topic_id,mq_elt->body, sub_elt->plugin_msg_cb, topic->dispatch_cb_arg, sub_elt->plugin_msg_cb_arg);
+ else sub_elt->plugin_msg_cb(mq_elt->header.topic_id, mq_elt->body, sub_elt->plugin_msg_cb_arg);
+ }
+ }
+ }
+}
+
+static void mq_runtime_clean_dlq(struct mq_runtime *rt)
+{
+ struct stellar_message *mq_elt, *tmp;
+ struct stellar_mq_topic_schema *topic;
+ DL_FOREACH_SAFE(rt->dealth_letter_queue, mq_elt, tmp)
+ {
+ topic = (struct stellar_mq_topic_schema *)utarray_eltptr(rt->schema->topic_array,
+ (unsigned int)(mq_elt->header.topic_id));
+ if (topic && topic->free_cb)
+ {
+ topic->free_cb(mq_elt->body, topic->free_cb_arg);
+ }
+ DL_DELETE(rt->dealth_letter_queue, mq_elt);
+ FREE(mq_elt);
+ }
+}
+
+void stellar_mq_dispatch(struct mq_runtime *rt)
+{
+ struct stellar_message *mq_elt=NULL, *mq_tmp=NULL;
+ int cur_priority = STELLAR_MQ_PRIORITY_HIGH;
+ while(cur_priority >= STELLAR_MQ_PRIORITY_LOW)
+ {
+ if(rt->priority_mq[cur_priority]==NULL)
+ {
+ cur_priority--;
+ continue;
+ }
+ DL_FOREACH_SAFE(rt->priority_mq[cur_priority], mq_elt, mq_tmp)
+ {
+ stellar_mq_dispatch_one_message(rt->schema, mq_elt);
+ DL_DELETE(rt->priority_mq[mq_elt->header.priority], mq_elt);
+ DL_APPEND(rt->dealth_letter_queue, mq_elt); // move to dlq list
+
+ cur_priority=STELLAR_MQ_PRIORITY_HIGH;
+ break;
+ }
+ }
+ mq_runtime_clean_dlq(rt);
+ return;
+}
+
+//return 0 if success, otherwise return -1.
+int stellar_mq_subscribe(struct mq_schema *s, int topic_id, on_msg_cb_func *on_msg_cb, void *on_msg_cb_arg)
+{
+
+ if(s == NULL || s->topic_array == NULL)return -1;
+
+ unsigned int len = utarray_len(s->topic_array);
+ if (len <= (unsigned int)topic_id)return -1;
+
+ struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(s->topic_array, (unsigned int)topic_id);
+ if(topic==NULL)return -1;
+
+ struct stellar_mq_subscriber *new_subscriber = CALLOC(struct stellar_mq_subscriber,1);
+ new_subscriber->topic_subscriber_idx = topic->subscriber_cnt;
+ new_subscriber->plugin_msg_cb = on_msg_cb;
+ new_subscriber->plugin_msg_cb_arg = on_msg_cb_arg;
+ DL_APPEND(topic->subscribers, new_subscriber);
+
+ topic->subscriber_cnt+=1;
+ s->mq_topic_subscriber_num+=1;
+ return 0;
+}
+
+int stellar_mq_publish_message_with_priority(struct mq_runtime *rt, int topic_id, void *data, enum stellar_mq_priority priority)
+{
+ if(rt==NULL || rt->schema == NULL || rt->schema->topic_array == NULL)return -1;
+
+ unsigned int len = utarray_len(rt->schema->topic_array);
+ if (len <= (unsigned int)topic_id)return -1;
+ struct stellar_message *msg= CALLOC(struct stellar_message,1);
+ msg->rt=rt;
+ msg->header.topic_id = topic_id;
+ msg->header.priority = priority;
+ msg->body = data;
+ DL_APPEND(rt->priority_mq[priority], msg);
+ return 0;
+}
+
+int stellar_mq_publish_message(struct mq_runtime *rt, int topic_id, void *data)
+{
+ return stellar_mq_publish_message_with_priority(rt, topic_id, data, STELLAR_MQ_PRIORITY_MEDIUM);
+} \ No newline at end of file
diff --git a/infra/mq/mq_interna.h b/infra/mq/mq_interna.h
new file mode 100644
index 0000000..1a46d3e
--- /dev/null
+++ b/infra/mq/mq_interna.h
@@ -0,0 +1,74 @@
+#pragma once
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+#include "stellar/mq.h"
+
+#include "uthash/utarray.h"
+
+struct mq_schema
+{
+ UT_array *topic_array;
+ int stellar_mq_topic_num;
+ int mq_topic_subscriber_num;
+};
+
+enum stellar_mq_priority
+{
+ STELLAR_MQ_PRIORITY_LOW = 0,
+ STELLAR_MQ_PRIORITY_MEDIUM,
+ STELLAR_MQ_PRIORITY_HIGH,
+ STELLAR_MQ_PRIORITY_MAX
+};
+
+struct stellar_message
+{
+ struct mq_runtime *rt;
+ struct
+ {
+ int topic_id;
+ enum stellar_mq_priority priority;
+ } header;
+ void *body;
+ struct stellar_message *next, *prev;
+} __attribute__((aligned(sizeof(void *))));
+
+typedef struct stellar_mq_subscriber
+{
+ int topic_subscriber_idx;
+ int plugin_idx;
+ on_msg_cb_func *plugin_msg_cb;
+ void *plugin_msg_cb_arg;
+ struct stellar_mq_subscriber *next, *prev;
+}stellar_mq_subscriber __attribute__((aligned(sizeof(void*))));
+
+
+struct stellar_mq_topic_schema
+{
+ char *topic_name;
+ int topic_id;
+ int subscriber_cnt;
+ int is_destroyed;
+ on_msg_dispatch_cb_func *dispatch_cb;
+ void *dispatch_cb_arg;
+ mq_msg_free_cb_func *free_cb;
+ void *free_cb_arg;
+ struct stellar_mq_subscriber *subscribers;
+}__attribute__((aligned(sizeof(void*))));
+
+
+
+struct mq_runtime
+{
+ struct mq_schema *schema;
+ struct stellar_message *priority_mq[STELLAR_MQ_PRIORITY_MAX];// message list
+ struct stellar_message *dealth_letter_queue;// dlq list
+};
+
+
+#ifdef __cplusplus
+}
+#endif \ No newline at end of file
diff --git a/infra/plugin_manager/plugin_manager.c b/infra/plugin_manager/plugin_manager.c
index 0c50319..f9773a5 100644
--- a/infra/plugin_manager/plugin_manager.c
+++ b/infra/plugin_manager/plugin_manager.c
@@ -1,16 +1,12 @@
#include "plugin_manager_interna.h"
-#include "stellar/stellar_exdata.h"
#include "stellar/utils.h"
#include "toml/toml.h"
-#include "uthash/utlist.h"
-
+#include <dlfcn.h>
#include <stdbool.h>
#include "stellar_core.h"
-#include "packet_private.h"
-#include "session_private.h"
-
+#if 0
void stellar_per_stage_message_counter_incby(struct plugin_manager_schema *plug_mgr, int tid, long long increment)
{
plug_mgr->per_thread_data[tid].pub_packet_msg_cnt+=increment;
@@ -26,6 +22,7 @@ bool stellar_per_stage_message_counter_overlimt(struct plugin_manager_schema *pl
if(plug_mgr->per_thread_data[tid].pub_packet_msg_cnt >= plug_mgr->max_message_dispatch)return true;
return false;
}
+#endif
UT_icd plugin_specs_icd = {sizeof(struct plugin_specific), NULL, NULL, NULL};
@@ -87,6 +84,7 @@ PLUGIN_SPEC_LOAD_ERROR:
return NULL;
}
+#if 0
static struct plugin_manager_per_thread_data *plugin_manager_per_thread_data_new(struct stellar *st)
{
if(st == NULL)return NULL;
@@ -94,7 +92,6 @@ static struct plugin_manager_per_thread_data *plugin_manager_per_thread_data_new
struct plugin_manager_per_thread_data *per_thread_data = CALLOC(struct plugin_manager_per_thread_data, thread_num);
return per_thread_data;
}
-
static void plugin_manager_per_thread_data_free(struct plugin_manager_per_thread_data *per_thread_data, struct stellar *st)
{
if(per_thread_data == NULL || st == NULL)return;
@@ -108,8 +105,9 @@ static void plugin_manager_per_thread_data_free(struct plugin_manager_per_thread
FREE(per_thread_data);
return;
}
+#endif
-struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char *plugin_spec_file_path, unsigned int max_msg_per_stage)
+struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char *plugin_spec_file_path)
{
int spec_num;
struct plugin_specific *specs = plugin_specs_load(plugin_spec_file_path, &spec_num);
@@ -118,7 +116,7 @@ struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char
return NULL;
}
struct plugin_manager_schema *plug_mgr = CALLOC(struct plugin_manager_schema, 1);
- plug_mgr->max_message_dispatch=max_msg_per_stage;
+ //plug_mgr->max_message_dispatch=max_msg_per_stage;
if(spec_num > 0)
{
utarray_new(plug_mgr->plugin_load_specs_array,&plugin_specs_icd);
@@ -128,7 +126,7 @@ struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char
plug_mgr->st = st;
stellar_set_plugin_manger(st, plug_mgr);
- plug_mgr->exdata_schema=exdata_schema_new();
+ //plug_mgr->exdata_schema=exdata_schema_new();
for(int i = 0; i < spec_num; i++)
{
@@ -139,7 +137,7 @@ struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char
}
}
FREE(specs);
- plug_mgr->per_thread_data = plugin_manager_per_thread_data_new(st);
+ //plug_mgr->per_thread_data = plugin_manager_per_thread_data_new(st);
return plug_mgr;
}
@@ -156,6 +154,7 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr)
}
utarray_free(plug_mgr->plugin_load_specs_array);
}
+#if 0
if(plug_mgr->stellar_mq_schema_array)
{
for(unsigned int i = 0; i < utarray_len(plug_mgr->stellar_mq_schema_array); i++)
@@ -164,6 +163,7 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr)
}
utarray_free(plug_mgr->stellar_mq_schema_array);
}
+
//if(plug_mgr->stellar_exdata_schema_array)utarray_free(plug_mgr->stellar_exdata_schema_array);
if(plug_mgr->registered_polling_plugin_array)utarray_free(plug_mgr->registered_polling_plugin_array);
if(plug_mgr->registered_packet_plugin_array)
@@ -175,8 +175,9 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr)
}
utarray_free(plug_mgr->registered_packet_plugin_array);
}
- plugin_manager_per_thread_data_free(plug_mgr->per_thread_data, plug_mgr->st);
- exdata_schema_free(plug_mgr->exdata_schema);
+#endif
+ //plugin_manager_per_thread_data_free(plug_mgr->per_thread_data, plug_mgr->st);
+ //exdata_schema_free(plug_mgr->exdata_schema);
FREE(plug_mgr);
return;
}
@@ -184,7 +185,7 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr)
/*******************************
* STELLAR EXDATA *
*******************************/
-
+#if 0
int stellar_exdata_new_index(struct stellar *st, const char *name, stellar_exdata_free *free_func,void *free_arg)
{
if(st==NULL || name==NULL)return -1;
@@ -245,238 +246,10 @@ void *session_exdata_get(struct session *sess, int idx)
if(sess_exdata == NULL)return NULL;
return exdata_get(sess_exdata, idx);
}
+#endif
-/*******************************
- * STELLAR MQ *
- *******************************/
-static void stellar_mq_topic_schema_copy(void *_dst, const void *_src)
-{
- struct stellar_mq_topic_schema *dst = (struct stellar_mq_topic_schema *)_dst,
- *src = (struct stellar_mq_topic_schema *)_src;
- memcpy(_dst, _src, sizeof(struct stellar_mq_topic_schema));
- dst->topic_name = src->topic_name ? strdup(src->topic_name) : NULL;
-}
-
-static void stellar_mq_topic_schema_dtor(void *_elt)
-{
- struct stellar_mq_topic_schema *elt = (struct stellar_mq_topic_schema *)_elt;
- if (elt->topic_name)
- FREE(elt->topic_name);
- // FREE(elt); // free the item
-}
-
-UT_icd stellar_mq_topic_schema_icd = {sizeof(struct stellar_mq_topic_schema), NULL, stellar_mq_topic_schema_copy, stellar_mq_topic_schema_dtor};
-
-int stellar_mq_get_topic_id(struct stellar *st, const char *topic_name)
-{
- struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
- UT_array *mq_schema_array=plug_mgr->stellar_mq_schema_array;
-
- if(topic_name == NULL || mq_schema_array == NULL )return -1;
- unsigned int len = utarray_len(mq_schema_array);
- struct stellar_mq_topic_schema *t_schema;
- for(unsigned int i = 0; i < len; i++)
- {
- t_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array, i);
- if(strcmp(t_schema->topic_name, topic_name) == 0)
- {
- return i;
- }
- }
- return -1;
-}
-
-int stellar_mq_update_topic(struct stellar *st, int topic_id, on_msg_dispatch_cb_func *on_dispatch_cb, void *on_dispatch_arg, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
-{
- struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
- UT_array *mq_schema_array=plug_mgr->stellar_mq_schema_array;
- if(mq_schema_array == NULL)return -1;
- unsigned int len = utarray_len(mq_schema_array);
- if(len < (unsigned int)topic_id)return -1;
- struct stellar_mq_topic_schema *t_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array, (unsigned int)topic_id);
- if(t_schema == NULL)return -1;
- t_schema->dispatch_cb=on_dispatch_cb;
- t_schema->dispatch_cb_arg=on_dispatch_arg;
- t_schema->free_cb=msg_free_cb;
- t_schema->free_cb_arg=msg_free_arg;
- return 0;
-}
-
-int stellar_mq_create_topic(struct stellar *st, const char *topic_name, on_msg_dispatch_cb_func *on_dispatch_cb, void *on_dispatch_arg, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
-{
- struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
- if(plug_mgr->stellar_mq_schema_array == NULL)
- {
- utarray_new(plug_mgr->stellar_mq_schema_array, &stellar_mq_topic_schema_icd);
- }
- unsigned int len = utarray_len(plug_mgr->stellar_mq_schema_array);
- if(stellar_mq_get_topic_id(st, topic_name) >= 0)
- {
- return -1;
- }
- struct stellar_mq_topic_schema t_schema;
- memset(&t_schema, 0, sizeof(struct stellar_mq_topic_schema));
- t_schema.dispatch_cb=on_dispatch_cb;
- t_schema.free_cb=msg_free_cb;
- t_schema.topic_name=(char *)topic_name;
- t_schema.topic_id=len;//topid_id equals arrary index
- t_schema.dispatch_cb_arg=on_dispatch_arg;
- t_schema.free_cb_arg=msg_free_arg;
- t_schema.subscribers=NULL;
- t_schema.subscriber_cnt=0;
- utarray_push_back(plug_mgr->stellar_mq_schema_array, &t_schema);
- plug_mgr->stellar_mq_topic_num+=1;
- return t_schema.topic_id;
-}
-
-int stellar_mq_destroy_topic(struct stellar *st, int topic_id)
-{
- struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
- if(plug_mgr->stellar_mq_schema_array==NULL)return -1;
- unsigned int len = utarray_len(plug_mgr->stellar_mq_schema_array);
- if (len <= (unsigned int)topic_id)
- return -1;
- struct stellar_mq_topic_schema *topic =
- (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id);
- struct stellar_mq_subscriber *sub_elt, *sub_tmp;
-
- if(topic == NULL)return -1;
-
- if (topic->is_destroyed == 1)return 0;
-
- DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
- {
- DL_DELETE(topic->subscribers, sub_elt);
- FREE(sub_elt);
- }
- topic->is_destroyed = 1;
- plug_mgr->stellar_mq_topic_num-=1;
- return 1; // success
-}
-
-
-static void stellar_mq_dispatch_one_message(struct stellar_message *mq_elt)
-{
- struct plugin_manager_schema *plug_mgr = (struct plugin_manager_schema *)stellar_get_plugin_manager(mq_elt->st);
- struct stellar_mq_subscriber *sub_elt, *sub_tmp;
- struct registered_plugin_schema *plugin_schema;
- struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array,
- (unsigned int)(mq_elt->header.topic_id));
- if (topic)
- {
- DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
- {
- if (sub_elt->plugin_msg_cb)
- {
- plugin_schema = (struct registered_plugin_schema *)utarray_eltptr(
- plug_mgr->registered_packet_plugin_array, (unsigned int)sub_elt->plugin_idx);
- if (plugin_schema)
- {
- if(topic->dispatch_cb)topic->dispatch_cb(mq_elt->header.topic_id,mq_elt->body, sub_elt->plugin_msg_cb, topic->dispatch_cb_arg, plugin_schema->plugin_env);
- else sub_elt->plugin_msg_cb(mq_elt->header.topic_id, mq_elt->body, plugin_schema->plugin_env);
- }
- }
- }
- }
-}
-
-static void stellar_mq_dispatch(struct stellar_message *priority_mq[], struct stellar_message ** dealth_letter_queue)
-{
- struct stellar_message *mq_elt=NULL, *mq_tmp=NULL;
- int cur_priority = STELLAR_MQ_PRIORITY_HIGH;
- while(cur_priority >= STELLAR_MQ_PRIORITY_LOW)
- {
- if(priority_mq[cur_priority]==NULL)
- {
- cur_priority--;
- continue;
- }
- DL_FOREACH_SAFE(priority_mq[cur_priority], mq_elt, mq_tmp)
- {
- stellar_mq_dispatch_one_message(mq_elt);
- DL_DELETE(priority_mq[mq_elt->header.priority], mq_elt);
- DL_APPEND(*dealth_letter_queue, mq_elt); // move to dlq list
-
- cur_priority=STELLAR_MQ_PRIORITY_HIGH;
- break;
- }
- }
- return;
-}
-
-static void stellar_mq_free(struct stellar_message **head, UT_array *mq_schema_array)
-{
- struct stellar_message *mq_elt, *tmp;
- struct stellar_mq_topic_schema *topic;
- DL_FOREACH_SAFE(*head, mq_elt, tmp)
- {
- topic = (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array,
- (unsigned int)(mq_elt->header.topic_id));
- if (topic && topic->free_cb)
- {
- topic->free_cb(mq_elt->body, topic->free_cb_arg);
- }
- DL_DELETE(*head, mq_elt);
- FREE(mq_elt);
- }
-}
-
-UT_icd stellar_mq_subscriber_info_icd = {sizeof(struct stellar_mq_subscriber_info), NULL, NULL, NULL};
-
-//return 0 if success, otherwise return -1.
-int stellar_mq_subscribe(struct stellar *st, int topic_id, on_msg_cb_func *on_msg_cb, void *on_msg_cb_arg)
-{
-
- struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
- if(plug_mgr == NULL || plug_mgr->registered_packet_plugin_array == NULL)return -1;
-
- unsigned int len = utarray_len(plug_mgr->stellar_mq_schema_array);
- if (len <= (unsigned int)topic_id)return -1;
-
- struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id);
- if(topic==NULL)return -1;
-
- struct stellar_mq_subscriber *new_subscriber = CALLOC(struct stellar_mq_subscriber,1);
- new_subscriber->topic_subscriber_idx = topic->subscriber_cnt;
- new_subscriber->plugin_msg_cb = on_msg_cb;
- new_subscriber->plugin_msg_cb_arg = on_msg_cb_arg;
- DL_APPEND(topic->subscribers, new_subscriber);
-
- struct stellar_mq_subscriber_info sub_info;
- memset(&sub_info, 0, sizeof(struct stellar_mq_subscriber_info));
- sub_info.topic_id=topic_id;
- sub_info.subscriber_idx=topic->subscriber_cnt;
- topic->subscriber_cnt+=1;
- plug_mgr->mq_topic_subscriber_num+=1;
- return 0;
-}
-
-int stellar_mq_publish_message_with_priority(struct stellar *st, int topic_id, void *data, enum stellar_mq_priority priority)
-{
- if(st==NULL)return -1;
- struct plugin_manager_schema *plug_mgr = (struct plugin_manager_schema *)stellar_get_plugin_manager(st);
- if(plug_mgr==NULL || plug_mgr->stellar_mq_schema_array == NULL)return -1;
-
- int tid = stellar_get_current_thread_index();
- if(stellar_per_stage_message_counter_overlimt(plug_mgr, tid)==true)return -1;
-
- unsigned int len = utarray_len(plug_mgr->stellar_mq_schema_array);
- if (len <= (unsigned int)topic_id)return -1;
- struct stellar_message *msg= CALLOC(struct stellar_message,1);
- msg->st=plug_mgr->st;
- msg->header.topic_id = topic_id;
- msg->header.priority = priority;
- msg->body = data;
- DL_APPEND(plug_mgr->per_thread_data[tid].priority_mq[priority], msg);
- stellar_per_stage_message_counter_incby(plug_mgr, tid, 1);
- return 0;
-}
-
-int stellar_mq_publish_message(struct stellar *st, int topic_id, void *data)
-{
- return stellar_mq_publish_message_with_priority(st, topic_id, data, STELLAR_MQ_PRIORITY_NORMAL);
-}
+#if 0
/*******************************
* PLUGIN MANAGER SESSION RUNTIME *
*******************************/
@@ -491,7 +264,6 @@ void session_exdata_runtime_free(struct exdata_runtime *exdata_h)
return exdata_handle_free(exdata_h);
}
-
/*********************************************
* PLUGIN MANAGER PLUGIN *
*********************************************/
@@ -518,8 +290,8 @@ static void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, str
if(plug_mgr==NULL || plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return;
struct registered_plugin_schema *p=NULL;
- int tid=stellar_get_current_thread_index();
- stellar_per_stage_message_counter_set(plug_mgr, tid, 0);
+ //int tid=stellar_get_current_thread_index();
+ //stellar_per_stage_message_counter_set(plug_mgr, tid, 0);
while ((p = (struct registered_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, p)))
{
if(p->on_packet[in_out])
@@ -527,7 +299,7 @@ static void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, str
p->on_packet[in_out](pkt, p->plugin_env);
}
}
- stellar_mq_dispatch(plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr->per_thread_data[tid].dealth_letter_queue);
+ //stellar_mq_dispatch(plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr->per_thread_data[tid].dealth_letter_queue);
return;
}
@@ -540,11 +312,11 @@ void plugin_manager_on_packet_output(struct plugin_manager_schema *plug_mgr, str
{
if(plug_mgr == NULL || plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return;
plugin_manager_on_packet(plug_mgr, pkt, PACKET_STAGE_OUTPUT);
- int tid=stellar_get_current_thread_index();
- stellar_per_stage_message_counter_set(plug_mgr, tid, -1);
- stellar_mq_free(&plug_mgr->per_thread_data[tid].dealth_letter_queue,
- plug_mgr->stellar_mq_schema_array);
- per_thread_packet_exdata_arrary_clean(plug_mgr);
+ //int tid=stellar_get_current_thread_index();
+ //stellar_per_stage_message_counter_set(plug_mgr, tid, -1);
+ //stellar_mq_free(&plug_mgr->per_thread_data[tid].dealth_letter_queue,
+ // plug_mgr->stellar_mq_schema_array);
+ //per_thread_packet_exdata_arrary_clean(plug_mgr);
}
/*********************************************
@@ -552,7 +324,7 @@ void plugin_manager_on_packet_output(struct plugin_manager_schema *plug_mgr, str
*********************************************/
UT_icd registered_polling_plugin_array_icd = {sizeof(struct registered_polling_plugin_schema), NULL, NULL, NULL};
-int stellar_polling_plugin_register(struct stellar *st, plugin_on_polling_func on_polling, void *plugin_env)
+int stellar_on_polling_register(struct stellar *st, plugin_on_polling_func on_polling, void *plugin_env)
{
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
if(plug_mgr->registered_polling_plugin_array == NULL)
@@ -584,3 +356,5 @@ int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr)
}
return polling_state;
}
+
+#endif \ No newline at end of file
diff --git a/infra/plugin_manager/plugin_manager.h b/infra/plugin_manager/plugin_manager.h
index 1ecb68c..f2e9147 100644
--- a/infra/plugin_manager/plugin_manager.h
+++ b/infra/plugin_manager/plugin_manager.h
@@ -7,22 +7,19 @@ extern "C"
{
#endif
-#define MAX_MSG_PER_STAGE 256
-
struct plugin_manager_schema;
struct plugin_manager_runtime;
-struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char *plugin_spec_file_path, unsigned int max_msg_per_stage);
+struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char *plugin_spec_file_path);
void plugin_manager_exit(struct plugin_manager_schema *plug_mgr);
-void plugin_manager_on_packet_input(struct plugin_manager_schema *plug_mgr, struct packet *pkt);
-void plugin_manager_on_packet_output(struct plugin_manager_schema *plug_mgr, struct packet *pkt);
-//return polling work state, 0: idle, 1: working
-int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr);
+//TODO
+void *plugin_manager_get_plugin_env(const char *plugin_name);
-struct exdata_runtime;
-struct exdata_runtime *session_exdata_runtime_new(struct stellar *st);
-void session_exdata_runtime_free(struct exdata_runtime *exdata_h);
+//void plugin_manager_on_packet_input(struct plugin_manager_schema *plug_mgr, struct packet *pkt);
+//void plugin_manager_on_packet_output(struct plugin_manager_schema *plug_mgr, struct packet *pkt);
+//return polling work state, 0: idle, 1: working
+//int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr);
#ifdef __cplusplus
}
diff --git a/infra/plugin_manager/plugin_manager_interna.h b/infra/plugin_manager/plugin_manager_interna.h
index 74b1662..b918e0e 100644
--- a/infra/plugin_manager/plugin_manager_interna.h
+++ b/infra/plugin_manager/plugin_manager_interna.h
@@ -5,110 +5,21 @@ extern "C"
{
#endif
-#include "plugin_manager.h"
-
#include "stellar/stellar.h"
-#include "stellar/stellar_mq.h"
#include "uthash/utarray.h"
-#include "exdata/exdata.h"
-
-struct stellar_message;
-struct plugin_manager_per_thread_data
-{
- struct exdata_runtime *exdata_array;
- struct stellar_message *priority_mq[STELLAR_MQ_PRIORITY_MAX];// message list
- struct stellar_message *dealth_letter_queue;// dlq list
- unsigned long long pub_packet_msg_cnt;
-};
+/*******************************
+ * PLUGIN MANAGER INIT & EXIT *
+ *******************************/
struct plugin_manager_schema
{
struct stellar *st;
- UT_array *plugin_load_specs_array;
- struct exdata_schema *exdata_schema;
- UT_array *stellar_mq_schema_array;
- UT_array *registered_packet_plugin_array;
- UT_array *registered_polling_plugin_array;
- int stellar_mq_topic_num;
- int mq_topic_subscriber_num;
- unsigned int max_message_dispatch;
- struct plugin_manager_per_thread_data *per_thread_data;
-}__attribute__((aligned(sizeof(void*))));
-
-
-struct stellar_message
-{
- struct stellar *st;
- struct
- {
- int topic_id;
- enum stellar_mq_priority priority;
- } header;
- void *body;
- struct stellar_message *next, *prev;
-} __attribute__((aligned(sizeof(void *))));
-
-typedef struct stellar_mq_subscriber
-{
- int topic_subscriber_idx;
- int plugin_idx;
- on_msg_cb_func *plugin_msg_cb;
- void *plugin_msg_cb_arg;
- struct stellar_mq_subscriber *next, *prev;
-}stellar_mq_subscriber __attribute__((aligned(sizeof(void*))));
-
-
-struct stellar_mq_topic_schema
-{
- char *topic_name;
- int topic_id;
- int subscriber_cnt;
- int is_destroyed;
- on_msg_dispatch_cb_func *dispatch_cb;
- void *dispatch_cb_arg;
- stellar_msg_free_cb_func *free_cb;
- void *free_cb_arg;
- struct stellar_mq_subscriber *subscribers;
-}__attribute__((aligned(sizeof(void*))));
-
-
-enum packet_stage
-{
- PACKET_STAGE_INPUT=0,
- PACKET_STAGE_OUTPUT,
- PACKET_STAGE_MAX
-};
-
-struct registered_plugin_schema
-{
- char ip_protocol;
- plugin_on_packet_func *on_packet[PACKET_STAGE_MAX];
- void *plugin_env;
- UT_array *registed_mq_subscriber_info;
+ UT_array *plugin_load_specs_array;
}__attribute__((aligned(sizeof(void*))));
-struct registered_polling_plugin_schema
-{
- plugin_on_polling_func *on_polling;
- void *plugin_env;
-}__attribute__((aligned(sizeof(void*))));
-
-struct stellar_mq_subscriber_info
-{
- int topic_id;
- int subscriber_idx;
-}__attribute__((aligned(sizeof(void*))));
-
-
-/*******************************
- * PLUGIN MANAGER INIT & EXIT *
- *******************************/
-
-#include <dlfcn.h>
-
struct plugin_specific
{
char plugin_name[256];
diff --git a/infra/stellar_core.c b/infra/stellar_core.c
index ac307aa..f72574e 100644
--- a/infra/stellar_core.c
+++ b/infra/stellar_core.c
@@ -109,7 +109,7 @@ static inline void clean_session(struct session_manager *sess_mgr, uint64_t now_
for (uint64_t j = 0; j < nr_sess_cleaned; j++)
{
sess = cleaned_sess[j];
- session_exdata_runtime_free(session_get_user_data(sess));
+ //session_exdata_runtime_free(session_get_user_data(sess));
session_manager_free_session(sess_mgr, sess);
}
}
@@ -183,7 +183,7 @@ static void *worker_thread(void *arg)
defraged_pkt = NULL;
pkt = &packets[i];
- plugin_manager_on_packet_input(plug_mgr, pkt);
+ //plugin_manager_on_packet_input(plug_mgr, pkt);
if (packet_is_fragment(pkt))
{
defraged_pkt = ip_reassembly_packet(ip_reass, pkt, now_ms);
@@ -194,7 +194,7 @@ static void *worker_thread(void *arg)
else
{
pkt = defraged_pkt;
- plugin_manager_on_packet_input(plug_mgr, defraged_pkt);
+ //plugin_manager_on_packet_input(plug_mgr, defraged_pkt);
}
}
@@ -206,8 +206,8 @@ static void *worker_thread(void *arg)
{
goto fast_path;
}
- struct exdata_runtime *per_sess_exdata=session_exdata_runtime_new(st);
- session_set_user_data(sess, per_sess_exdata);
+ //struct exdata_runtime *per_sess_exdata=session_exdata_runtime_new(st);
+ //session_set_user_data(sess, per_sess_exdata);
}
else
{
@@ -220,12 +220,12 @@ static void *worker_thread(void *arg)
fast_path:
if (pkt == defraged_pkt)
{
- plugin_manager_on_packet_output(plug_mgr, defraged_pkt);
- plugin_manager_on_packet_output(plug_mgr, &packets[i]);
+ //plugin_manager_on_packet_output(plug_mgr, defraged_pkt);
+ //plugin_manager_on_packet_output(plug_mgr, &packets[i]);
}
else
{
- plugin_manager_on_packet_output(plug_mgr, pkt);
+ //plugin_manager_on_packet_output(plug_mgr, pkt);
}
if (sess)
@@ -272,7 +272,7 @@ static void *worker_thread(void *arg)
idle_tasks:
clean_session(sess_mgr, now_ms);
ip_reassembly_expire(ip_reass, now_ms);
- plugin_manager_on_polling(plug_mgr);
+ //plugin_manager_on_polling(plug_mgr);
stellar_stat_merge(runtime->stat, &thr_stat, thr_idx, now_ms);
if (nr_pkt_received == 0)
@@ -448,7 +448,7 @@ struct stellar *stellar_new(const char *stellar_cfg_file, const char *plugin_cfg
CORE_LOG_ERROR("unable to create stellar stat");
goto error_out;
}
- runtime->plug_mgr = plugin_manager_init(st, plugin_cfg_file, MAX_MSG_PER_STAGE);
+ runtime->plug_mgr = plugin_manager_init(st, plugin_cfg_file);
if (runtime->plug_mgr == NULL)
{
CORE_LOG_ERROR("unable to create plugin manager");