summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryangwei <[email protected]>2024-05-17 16:55:46 +0800
committeryangwei <[email protected]>2024-05-17 00:00:59 +0800
commit13d9d10f8317608fe55d9a671d97aaa4d58a70ad (patch)
treeff41339121afcf3495cbc6acc9e2a8effcdc238b
parent13fc7e3f77441b9697b0d6606499c9a30833c3fe (diff)
✨ feat(plugin manager): integrated plugin manager, build success
-rw-r--r--deps/CMakeLists.txt3
-rw-r--r--deps/bitmap/CMakeLists.txt3
-rw-r--r--deps/bitmap/bitmap.c59
-rw-r--r--deps/bitmap/bitmap.h5
-rw-r--r--include/stellar/session.h10
-rw-r--r--include/stellar/session_exdata.h17
-rw-r--r--include/stellar/session_mq.h33
-rw-r--r--include/stellar/stellar.h42
-rw-r--r--include/stellar/utils.h36
-rw-r--r--src/plugin/CMakeLists.txt3
-rw-r--r--src/plugin/plugin_manager.cpp934
-rw-r--r--src/plugin/plugin_manager.h23
-rw-r--r--src/session/session.cpp55
-rw-r--r--src/stellar/CMakeLists.txt6
-rw-r--r--src/stellar/main.cpp7
-rw-r--r--src/stellar/stellar.cpp18
-rw-r--r--src/stellar/stellar_priv.h8
17 files changed, 1201 insertions, 61 deletions
diff --git a/deps/CMakeLists.txt b/deps/CMakeLists.txt
index 3c9947a..8e10935 100644
--- a/deps/CMakeLists.txt
+++ b/deps/CMakeLists.txt
@@ -2,4 +2,5 @@ add_subdirectory(timeout)
add_subdirectory(dablooms)
add_subdirectory(toml)
add_subdirectory(rbtree)
-add_subdirectory(interval_tree) \ No newline at end of file
+add_subdirectory(interval_tree)
+add_subdirectory(bitmap) \ No newline at end of file
diff --git a/deps/bitmap/CMakeLists.txt b/deps/bitmap/CMakeLists.txt
new file mode 100644
index 0000000..4eefd63
--- /dev/null
+++ b/deps/bitmap/CMakeLists.txt
@@ -0,0 +1,3 @@
+set(CMAKE_C_FLAGS "-std=c99")
+add_definitions(-fPIC)
+add_library(bitmap STATIC bitmap.c) \ No newline at end of file
diff --git a/deps/bitmap/bitmap.c b/deps/bitmap/bitmap.c
new file mode 100644
index 0000000..59fb4b7
--- /dev/null
+++ b/deps/bitmap/bitmap.c
@@ -0,0 +1,59 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+struct bitmap {
+ unsigned char *data;
+ int width;
+ int height;
+};
+
+struct bitmap * bitmap_new(int width, int height, int value) {
+ struct bitmap *bmp = (struct bitmap *)malloc(sizeof(struct bitmap));
+ bmp->width = width;
+ bmp->height = height;
+ int size = (width * height + 7) / 8; // Calculate total bytes needed
+ bmp->data = (unsigned char *)calloc(size,1 );
+ memset(bmp->data, value ? 0xFF : 0x00, size);
+ return bmp;
+}
+
+int bitmap_set(struct bitmap *bmp, int x, int y, int value) {
+ if (x < 0 || y < 0 || x >= bmp->width || y >= bmp->height) {
+ return -1; // Return error code if coordinates are out of bounds
+ }
+ int idx = y * bmp->width + x;
+ if (value)
+ bmp->data[idx / 8] |= (1 << (idx % 8));
+ else
+ bmp->data[idx / 8] &= ~(1 << (idx % 8));
+ return 0;
+}
+
+int bitmap_get(struct bitmap *bmp, int x, int y) {
+ if (x < 0 || y < 0 || x >= bmp->width || y >= bmp->height) {
+ return -1; // Return error code if coordinates are out of bounds
+ }
+ int idx = y * bmp->width + x;
+ return (bmp->data[idx / 8] & (1 << (idx % 8))) != 0;
+}
+
+void bitmap_free(struct bitmap *bmp) {
+ if(bmp)
+ {
+ if(bmp->data)free(bmp->data);
+ free(bmp);
+ }
+}
+
+
+
+
+int test_bitmap() {
+ struct bitmap *bmp = bitmap_new(10, 5, 1); // Create a 10x5 bitmap
+ if (bitmap_set(bmp, 2, 2, 1) == 0) { // Set bit at position (2,2)
+ printf("Bit at (2,2): %d\n", bitmap_get(bmp, 2, 2)); // Get bit at position (2,2)
+ }
+ bitmap_free(bmp);
+ return 0;
+}
diff --git a/deps/bitmap/bitmap.h b/deps/bitmap/bitmap.h
new file mode 100644
index 0000000..210ea35
--- /dev/null
+++ b/deps/bitmap/bitmap.h
@@ -0,0 +1,5 @@
+struct bitmap;
+struct bitmap * bitmap_new(int width, int height, int value);
+int bitmap_set(struct bitmap *bmp, int x, int y, int value);
+int bitmap_get(struct bitmap *bmp, int x, int y);
+void bitmap_free(struct bitmap *bmp); \ No newline at end of file
diff --git a/include/stellar/session.h b/include/stellar/session.h
index db027a1..87c7612 100644
--- a/include/stellar/session.h
+++ b/include/stellar/session.h
@@ -121,10 +121,18 @@ enum session_timestamp
};
struct session;
+#define SESSION_SEEN_C2S_FLOW (1 << 0)
+#define SESSION_SEEN_S2C_FLOW (1 << 1)
+int session_is_symmetric(struct session *sess, unsigned char *flag);
int session_has_duplicate_traffic(const struct session *sess);
enum session_type session_get_type(const struct session *sess);
enum session_state session_get_state(const struct session *sess);
+
+enum session_state session_get_current_state(const struct session *sess);
+const struct packet *session_get0_current_packet(struct session *sess);
+const char *session_get0_current_payload(struct session *sess, size_t *payload_len);
+
enum closing_reason session_get_closing_reason(const struct session *sess);
enum session_direction session_get_direction(const struct session *sess);
enum flow_direction session_get_current_flow_direction(const struct session *sess);
@@ -139,6 +147,8 @@ uint64_t session_get_id(const struct session *sess);
uint64_t session_get_timestamp(const struct session *sess, enum session_timestamp type);
uint64_t session_get_stat(const struct session *sess, enum flow_direction dir, enum session_stat stat);
+const char *session_get0_readable_addr(struct session *sess);
+
#ifdef __cplusplus
}
#endif
diff --git a/include/stellar/session_exdata.h b/include/stellar/session_exdata.h
new file mode 100644
index 0000000..a10139e
--- /dev/null
+++ b/include/stellar/session_exdata.h
@@ -0,0 +1,17 @@
+#pragma once
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+#include "stellar.h"
+
+typedef void session_exdata_free(struct session *sess, int idx, void *ex_ptr, void *arg);
+int stellar_session_exdata_new_index(struct stellar *st, const char *name, session_exdata_free *free_func,void *arg);
+int session_exdata_set(struct session *sess, int idx, void *ex_ptr);
+void *session_exdata_get(struct session *sess, int idx);
+
+#ifdef __cplusplus
+}
+#endif \ No newline at end of file
diff --git a/include/stellar/session_mq.h b/include/stellar/session_mq.h
new file mode 100644
index 0000000..2ad9f9f
--- /dev/null
+++ b/include/stellar/session_mq.h
@@ -0,0 +1,33 @@
+#pragma once
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+#include "stellar.h"
+
+//session mq
+typedef void msg_free_cb_func(void *msg, void *msg_free_arg);
+typedef void on_msg_cb_func(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env);
+
+//return topic_id
+int stellar_session_mq_create_topic(struct stellar *st, const char *topic_name, msg_free_cb_func *msg_free_cb, void *msg_free_arg);
+
+int stellar_session_mq_get_topic_id(struct stellar *st, const char *topic_name);
+
+int stellar_session_mq_update_topic(struct stellar *st, int topic_id, msg_free_cb_func *msg_free_cb, void *msg_free_arg);
+
+int stellar_session_mq_destroy_topic(struct stellar *st, int topic_id);
+
+//return 0 if success, otherwise return -1.
+int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_msg_cb_func *plugin_on_msg_cb, int plugin_id);
+
+int session_mq_publish_message(struct session *sess, int topic_id, void *msg);
+
+int session_mq_ignore_message(struct session *sess, int topic_id, int plugin_id);
+int session_mq_unignore_message(struct session *sess, int topic_id, int plugin_id);
+
+#ifdef __cplusplus
+}
+#endif \ No newline at end of file
diff --git a/include/stellar/stellar.h b/include/stellar/stellar.h
index e02076d..8ef9c2b 100644
--- a/include/stellar/stellar.h
+++ b/include/stellar/stellar.h
@@ -9,6 +9,48 @@ extern "C"
#include <stdint.h>
#include "stellar/session.h"
+#include "session.h"
+
+struct session;
+struct stellar;
+
+//return plugin_env
+typedef void *plugin_on_load_func(struct stellar *st);
+typedef void plugin_on_unload_func(void *plugin_env);
+
+//return per_session_ctx
+typedef void *session_ctx_new_func(struct session *sess, void *plugin_env);
+typedef void session_ctx_free_func(struct session *sess, void *session_ctx, void *plugin_env);
+
+#define TOPIC_TCP "TCP"
+#define TOPIC_TCP_STREAM "TCP_STREAM"
+#define TOPIC_UDP "UDP"
+#define TOPIC_EGRESS "EGRESS"
+#define TOPIC_CONTROL_PACKET "CONTROL_PACKET"
+
+//return session plugin_id
+int stellar_session_plugin_register(struct stellar *st,
+ session_ctx_new_func session_ctx_new,
+ session_ctx_free_func session_ctx_free,
+ void *plugin_env);
+
+void stellar_session_plugin_dettach_current_session(struct session *sess);
+
+
+
+struct packet;
+typedef void plugin_on_packet_func(struct packet *pkt, unsigned char ip_protocol, void *plugin_env);
+
+//return packet plugin_id
+int stellar_packet_plugin_register(struct stellar *st, unsigned char ip_protocol, plugin_on_packet_func on_packet, void *plugin_env);
+
+
+//return polling work result, 0: no work, 1: work
+typedef int plugin_on_polling_func(void *plugin_env);
+
+//return polling plugin_id
+int stellar_polling_plugin_register(struct stellar *st, plugin_on_polling_func on_polling, void *plugin_env);
+
uint16_t stellar_get_current_thread_index();
// return inject packet length, return 0 if failed
diff --git a/include/stellar/utils.h b/include/stellar/utils.h
new file mode 100644
index 0000000..fb1dc5b
--- /dev/null
+++ b/include/stellar/utils.h
@@ -0,0 +1,36 @@
+#pragma once
+
+#include <stdlib.h> //calloc
+#include <stddef.h> //NULL
+
+#define ALLOC(type, number) ((type *)calloc(sizeof(type), number))
+#define CALLOC(type, number) ((type *)calloc(sizeof(type), number))
+
+#define REALLOC(type, ptr, number) ((type *)realloc(ptr, (number) * sizeof(type)))
+
+#define FREE(p) {free(p); p = NULL;}
+
+#define TRUE 1
+#define FALSE 0
+
+#ifndef MAX
+#define MAX(a, b) (((a) > (b)) ? (a) : (b))
+#endif
+
+#ifndef MIN
+#define MIN(a, b) (((a) < (b)) ? (a) : (b))
+#endif
+
+#ifndef offsetof
+#define offsetof(TYPE, MEMBER) ((size_t) &((TYPE *)0)->MEMBER)
+#endif
+
+#ifndef container_of
+#define container_of(ptr, type, member) ({ \
+ const typeof( ((type *)0)->member ) *__mptr = (ptr); \
+ (type *)( (char *)__mptr - offsetof(type,member) );})
+#endif
+
+#ifndef __unused
+#define __unused __attribute__((__unused__))
+#endif
diff --git a/src/plugin/CMakeLists.txt b/src/plugin/CMakeLists.txt
index 3dbcff5..5ad4c6d 100644
--- a/src/plugin/CMakeLists.txt
+++ b/src/plugin/CMakeLists.txt
@@ -1,3 +1,4 @@
add_library(plugin_manager plugin_manager.cpp)
target_include_directories(plugin_manager PUBLIC ${CMAKE_CURRENT_LIST_DIR})
-target_link_libraries(plugin_manager session_manager core) \ No newline at end of file
+target_include_directories(plugin_manager PUBLIC ${CMAKE_SOURCE_DIR}/deps/)
+target_link_libraries(plugin_manager bitmap toml session_manager core ${CMAKE_DL_LIBS}) \ No newline at end of file
diff --git a/src/plugin/plugin_manager.cpp b/src/plugin/plugin_manager.cpp
index df2e920..86373eb 100644
--- a/src/plugin/plugin_manager.cpp
+++ b/src/plugin/plugin_manager.cpp
@@ -3,80 +3,940 @@
#include "session_priv.h"
#include "stellar_priv.h"
-struct plugin_manager
+
+#include "plugin_manager.h"
+
+#include "stellar/utils.h"
+#include "stellar/session.h"
+#include "stellar/session_exdata.h"
+#include "stellar/session_mq.h"
+#include "tcp_reassembly.h"
+
+extern "C"
+{
+ #include "uthash/utlist.h"
+ #include "uthash/utarray.h"
+ #include "bitmap/bitmap.h"
+}
+
+struct plugin_manager_schema
+{
+ struct stellar *st;
+ UT_array *session_exdata_schema_array;
+ UT_array *plugin_load_specs_array;
+ UT_array *session_mq_schema_array;
+ UT_array *registered_session_plugin_array;
+ UT_array *registered_packet_plugin_array;
+ UT_array *registered_polling_plugin_array;
+ int topic_num;
+ int subscriber_num;
+ int tcp_topic_id;
+ int udp_topic_id;
+ int tcp_stream_topic_id;
+ int egress_topic_id;
+ int control_packet_topic_id;
+};
+
+
+struct session_exdata_schema
+{
+ char *name;
+ session_exdata_free *free_func;
+ void *free_arg;
+ int idx;
+};
+
+struct session_message
{
- // TODO
+ int topic_id;
+ void *msg_data;
+ struct session_message *next, *prev;
};
-void *plugin_manager_new_ctx(struct session *sess)
+typedef struct session_mq_subscriber
+{
+ int topic_subscriber_idx;
+ int session_plugin_id;
+ on_msg_cb_func *msg_cb;
+ struct session_mq_subscriber *next, *prev;
+}session_mq_subscribers;
+
+struct session_mq_topic_schema
+{
+ char *topic_name;
+ msg_free_cb_func *free_cb;
+ void *free_cb_arg;
+ int topic_id;
+ int subscriber_cnt;
+ struct session_mq_subscriber *subscribers;
+};
+
+enum plugin_ctx_state
+{ INIT, ACTIVE, EXIT };
+
+struct session_plugin_ctx_runtime
+{
+ enum plugin_ctx_state state;
+ int session_plugin_id;
+ void *plugin_ctx;
+};
+
+struct plugin_exdata
+{
+ void *exdata;
+};
+
+struct plugin_manager_runtime
+{
+ struct plugin_manager_schema *plug_mgr;
+ struct session *sess;
+ struct session_message *pending_mq;// message list
+ struct session_message *delivered_mq;// message list
+ struct bitmap *session_mq_status; //N * M bits, N topic, M subscriber
+ struct plugin_exdata *plugin_exdata_array;
+ struct session_plugin_ctx_runtime *plugin_ctx_array;//N plugins TODO: call alloc and free
+ int current_session_plugin_id;
+};
+
+struct registered_packet_plugin_schema
+{
+ char ip_protocol;
+ plugin_on_packet_func *on_packet;
+ void *plugin_env;
+};
+
+struct registered_polling_plugin_schema
+{
+ plugin_on_polling_func *on_polling;
+ void *plugin_env;
+};
+
+struct session_mq_subscriber_info
+{
+ int topic_id;
+ int subscriber_idx;
+};
+
+struct registered_session_plugin_schema
+{
+ session_ctx_new_func *on_ctx_new;
+ session_ctx_free_func *on_ctx_free;
+ void *plugin_env;
+ UT_array *registed_session_mq_subscriber_info;
+};
+
+#define PACKET_PULGIN_ID_BASE 0x10000
+#define POLLING_PULGIN_ID_BASE 0x20000
+
+/*******************************
+ * PLUGIN MANAGER INIT & EXIT *
+ *******************************/
+
+#include <dlfcn.h>
+#include "toml/toml.h"
+
+struct plugin_specific
+{
+ char plugin_name[256];
+ plugin_on_load_func *load_cb;
+ plugin_on_unload_func *unload_cb;
+ void *plugin_ctx;
+};
+
+
+thread_local struct session *per_thread_scratch_sess;
+
+inline static void plugin_manager_scratch_session_set(struct session *sess)
+{
+ per_thread_scratch_sess = sess;
+}
+
+inline static struct session *plugin_manager_scratch_session_get()
+{
+ return per_thread_scratch_sess;
+}
+
+inline struct plugin_manager_schema * stellar_plugin_manager_schema_get(struct stellar *st)
+{
+ return st->st_rt->plug_mgr;
+}
+
+inline int stellar_plugin_manager_schema_set(struct stellar *st, struct plugin_manager_schema *pm)
+{
+ if(st->st_rt->plug_mgr)return -1;
+ st->st_rt->plug_mgr=pm;
+ return 0;
+}
+
+UT_icd plugin_specs_icd = {sizeof(struct plugin_specific), NULL, NULL, NULL};
+
+static struct plugin_specific *plugin_specs_load(const char *toml_conf_path, int *spec_num)
+{
+ *spec_num = 0;
+ FILE* fp = fopen(toml_conf_path, "r");
+ if(fp==NULL)return NULL;
+ char errbuf[256];
+ toml_table_t* conf = toml_parse_file(fp, errbuf, sizeof(errbuf));
+ fclose(fp);
+ if (!conf) {
+ fprintf(stderr, "Error parsing toml: %s\n", errbuf);
+ return NULL;
+ }
+ toml_array_t* plugin_array = toml_array_in(conf, "plugin");
+ if(plugin_array==NULL)return NULL;
+ *spec_num = toml_array_nelem(plugin_array);
+ struct plugin_specific* plugins = ALLOC(struct plugin_specific, *spec_num);
+
+ for (int i = 0; i < *spec_num; i++) {
+ toml_table_t* plugin = toml_table_at(plugin_array, i);
+
+ const char *path_raw = toml_raw_in(plugin, "path");
+ const char *init_func_name_raw = toml_raw_in(plugin, "init");
+ const char *exit_func_name_raw = toml_raw_in(plugin, "exit");
+ char *path = NULL;
+ char *init_func_name = NULL;
+ char *exit_func_name = NULL;
+ if (toml_rtos(path_raw, &path) || toml_rtos(init_func_name_raw, &init_func_name) ||
+ toml_rtos(exit_func_name_raw, &exit_func_name))
+ {
+ goto PLUGIN_SPEC_LOAD_ERROR;
+ }
+ void* handle = dlopen(path, RTLD_NOW|RTLD_LAZY|RTLD_GLOBAL);
+ if (!handle) {
+ fprintf(stderr, "Error loading plugin %s: %s\n", path, dlerror());
+ goto PLUGIN_SPEC_LOAD_ERROR;
+ }
+
+ plugins[i].load_cb = (plugin_on_load_func *) dlsym(handle, init_func_name);
+ if (!plugins[i].load_cb) {
+ fprintf(stderr, "Could not load init function %s: %s\n", init_func_name, dlerror());
+ }
+
+ plugins[i].unload_cb = (plugin_on_unload_func *) dlsym(handle, exit_func_name);
+ if (!plugins[i].unload_cb) {
+ fprintf(stderr, "Could not load exit function %s: %s\n", exit_func_name, dlerror());
+ }
+ FREE(path);
+ FREE(init_func_name);
+ FREE(exit_func_name);
+ }
+ toml_free(conf);
+ return plugins;
+PLUGIN_SPEC_LOAD_ERROR:
+ toml_free(conf);
+ FREE(plugins);
+ return NULL;
+}
+
+#include "session_priv.h"
+static void tcp_stream_msg_free_fn(void *msg, void *msg_free_arg)
+{
+ struct session *cur_sess = plugin_manager_scratch_session_get();
+ if(msg && cur_sess)session_free_tcp_segment(cur_sess, (struct tcp_segment *)msg);
+}
+
+struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char *plugin_spec_file_path)
+{
+ int spec_num;
+ struct plugin_specific *specs = plugin_specs_load(plugin_spec_file_path, &spec_num);
+ if(spec_num < 0)
+ {
+ return NULL;
+ }
+ struct plugin_manager_schema *pm = ALLOC(struct plugin_manager_schema, 1);
+ if(spec_num > 0)
+ {
+ utarray_new(pm->plugin_load_specs_array,&plugin_specs_icd);
+ utarray_reserve(pm->plugin_load_specs_array, spec_num);
+ }
+
+ pm->st = st;
+ stellar_plugin_manager_schema_set(st, pm);
+
+
+ pm->tcp_topic_id=stellar_session_mq_create_topic(st, TOPIC_TCP, NULL, NULL);
+ pm->tcp_stream_topic_id=stellar_session_mq_create_topic(st, TOPIC_TCP_STREAM, tcp_stream_msg_free_fn, NULL);
+ pm->udp_topic_id=stellar_session_mq_create_topic(st, TOPIC_UDP, NULL, NULL);
+ pm->egress_topic_id=stellar_session_mq_create_topic(st, TOPIC_EGRESS, NULL, NULL);
+ pm->control_packet_topic_id=stellar_session_mq_create_topic(st, TOPIC_CONTROL_PACKET, NULL, NULL);
+
+ for(int i = 0; i < spec_num; i++)
+ {
+ if (specs[i].load_cb != NULL)
+ {
+ specs[i].plugin_ctx=specs[i].load_cb(st);
+ utarray_push_back(pm->plugin_load_specs_array, &specs[i]);
+ }
+ }
+ FREE(specs);
+ return pm;
+}
+
+void plugin_manager_exit(struct plugin_manager_schema *plug_mgr)
+{
+ struct plugin_specific *p=NULL;
+ if (plug_mgr->plugin_load_specs_array)
+ {
+ while ((p = (struct plugin_specific *)utarray_next(plug_mgr->plugin_load_specs_array, p)))
+ {
+ if (p->unload_cb)
+ p->unload_cb(p->plugin_ctx);
+ }
+ utarray_free(plug_mgr->plugin_load_specs_array);
+ }
+ if(plug_mgr->session_mq_schema_array)
+ {
+ for(unsigned int i = 0; i < utarray_len(plug_mgr->session_mq_schema_array); i++)
+ {
+ stellar_session_mq_destroy_topic(plug_mgr->st, i);
+ }
+ utarray_free(plug_mgr->session_mq_schema_array);
+ }
+ if(plug_mgr->session_exdata_schema_array)utarray_free(plug_mgr->session_exdata_schema_array);
+ if(plug_mgr->registered_packet_plugin_array)utarray_free(plug_mgr->registered_packet_plugin_array);
+ if(plug_mgr->registered_polling_plugin_array)utarray_free(plug_mgr->registered_polling_plugin_array);
+ if(plug_mgr->registered_session_plugin_array)
+ {
+ struct registered_session_plugin_schema *s = NULL;
+ while ((s = (struct registered_session_plugin_schema *)utarray_next(plug_mgr->registered_session_plugin_array, s)))
+ {
+ if(s->registed_session_mq_subscriber_info)utarray_free(s->registed_session_mq_subscriber_info);
+ }
+ utarray_free(plug_mgr->registered_session_plugin_array);
+ }
+ FREE(plug_mgr);
+ return;
+}
+
+
+/*******************************
+ * SESSION EXDATA *
+ *******************************/
+
+static void session_exdata_met_copy(void *_dst, const void *_src)
+{
+ struct session_exdata_schema *dst = (struct session_exdata_schema *)_dst, *src = (struct session_exdata_schema *)_src;
+ dst->free_func = src->free_func;
+ dst->free_arg = src->free_arg;
+ dst->idx = src->idx;
+ dst->name = src->name ? strdup(src->name) : NULL;
+}
+
+static void session_exdata_met_dtor(void *_elt)
+{
+ struct session_exdata_schema *elt = (struct session_exdata_schema *)_elt;
+ if (elt->name)
+ FREE(elt->name);
+}
+
+UT_icd session_exdata_meta_icd = {sizeof(struct session_exdata_schema), NULL, session_exdata_met_copy, session_exdata_met_dtor};
+
+
+int stellar_session_exdata_new_index(struct stellar *st, const char *name, session_exdata_free *free_func,void *free_arg)
+{
+ struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
+ if(plug_mgr->session_exdata_schema_array == NULL)
+ {
+ utarray_new(plug_mgr->session_exdata_schema_array, &session_exdata_meta_icd);
+ }
+ if(plug_mgr->session_exdata_schema_array == NULL)return -1;
+ unsigned int len = utarray_len(plug_mgr->session_exdata_schema_array);
+ struct session_exdata_schema *t_schema;
+ for(unsigned int i = 0; i < len; i++)
+ {
+ t_schema = (struct session_exdata_schema *)utarray_eltptr(plug_mgr->session_exdata_schema_array, i);
+ if(strcmp(t_schema->name, name) == 0)
+ {
+ return t_schema->idx;
+ }
+ }
+ struct session_exdata_schema new_schema;
+ memset(&new_schema, 0, sizeof(struct session_exdata_schema));
+ new_schema.free_func=free_func;
+ new_schema.name=(char *)name;
+ new_schema.idx=len;
+ new_schema.free_arg=free_arg;
+ utarray_push_back(plug_mgr->session_exdata_schema_array, &new_schema);
+ return new_schema.idx;
+}
+
+int session_exdata_set(struct session *sess, int idx, void *ex_ptr)
+{
+ struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
+ if(plug_mgr_rt == NULL)return -1;
+ if(plug_mgr_rt->plug_mgr->session_exdata_schema_array == NULL)return -1;
+ unsigned int len=utarray_len(plug_mgr_rt->plug_mgr->session_exdata_schema_array);
+ if(len < (unsigned int)idx)return -1;
+ if(plug_mgr_rt->plugin_exdata_array==NULL)return -1;
+ (plug_mgr_rt->plugin_exdata_array+idx)->exdata=ex_ptr;
+ return 0;
+}
+
+void *session_exdata_get(struct session *sess, int idx)
+{
+ struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
+ if(plug_mgr_rt == NULL)return NULL;
+ if(plug_mgr_rt->plug_mgr->session_exdata_schema_array==NULL)return NULL;
+ unsigned int len = utarray_len(plug_mgr_rt->plug_mgr->session_exdata_schema_array);
+ if(len < (unsigned int)idx)return NULL;
+ return (plug_mgr_rt->plugin_exdata_array+idx)->exdata;
+}
+
+/*******************************
+ * SESSION MQ *
+ *******************************/
+
+
+
+static void session_mq_topic_schema_copy(void *_dst, const void *_src)
+{
+ struct session_mq_topic_schema *dst = (struct session_mq_topic_schema *)_dst,
+ *src = (struct session_mq_topic_schema *)_src;
+ dst->subscribers = src->subscribers;
+ dst->free_cb = src->free_cb;
+ dst->free_cb_arg = src->free_cb_arg;
+ dst->topic_id = src->topic_id;
+ dst->subscriber_cnt = src->subscriber_cnt;
+ dst->topic_name = src->topic_name ? strdup(src->topic_name) : NULL;
+}
+
+static void session_mq_topic_schema_dtor(void *_elt)
+{
+ struct session_mq_topic_schema *elt = (struct session_mq_topic_schema *)_elt;
+ if (elt->topic_name)
+ FREE(elt->topic_name);
+ // FREE(elt); // free the item
+}
+
+UT_icd session_mq_topic_schema_icd = {sizeof(struct session_mq_topic_schema), NULL, session_mq_topic_schema_copy, session_mq_topic_schema_dtor};
+
+void session_mq_free(struct session_message *head)
+{
+ struct session_message *elt, *tmp;
+ DL_FOREACH_SAFE(head, elt, tmp)
+ {
+ DL_DELETE(head, elt);
+ FREE(elt);
+ }
+ FREE(head);
+}
+
+int stellar_session_mq_get_topic_id(struct stellar *st, const char *topic_name)
+{
+ struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);;
+ if(topic_name == NULL || plug_mgr == NULL || plug_mgr->session_mq_schema_array == NULL)return -1;
+ unsigned int len = utarray_len(plug_mgr->session_mq_schema_array);
+ struct session_mq_topic_schema *t_schema;
+ for(unsigned int i = 0; i < len; i++)
+ {
+ t_schema = (struct session_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, i);
+ if(strcmp(t_schema->topic_name, topic_name) == 0)
+ {
+ return i;
+ }
+ }
+ return -1;
+}
+
+int stellar_session_mq_update_topic(struct stellar *st, int topic_id, msg_free_cb_func *msg_free_cb, void *msg_free_arg)
{
- // TODO
- return sess;
+ struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
+ if(plug_mgr->session_mq_schema_array == NULL)return -1;
+ unsigned int len = utarray_len(plug_mgr->session_mq_schema_array);
+ if(len < (unsigned int)topic_id)return -1;
+ struct session_mq_topic_schema *t_schema = (struct session_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, (unsigned int)topic_id);
+ if(t_schema == NULL)return -1;
+ t_schema->free_cb=msg_free_cb;
+ t_schema->free_cb_arg=msg_free_arg;
+ return 0;
}
-void plugin_manager_free_ctx(void *ctx)
+int stellar_session_mq_create_topic(struct stellar *st, const char *topic_name, msg_free_cb_func *msg_free_cb, void *msg_free_arg)
{
- // TODO
- // struct session *sess = (struct session *)ctx;
- // char buff[4096] = {0};
- // session_to_json(sess, buff, sizeof(buff));
- // PLUGIN_MANAGER_LOG_DEBUG("=> SESSION : %s", buff);
+ struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
+ if(plug_mgr->session_mq_schema_array == NULL)
+ {
+ utarray_new(plug_mgr->session_mq_schema_array, &session_mq_topic_schema_icd);
+ }
+ unsigned int len = utarray_len(plug_mgr->session_mq_schema_array);
+ if(stellar_session_mq_get_topic_id(st, topic_name) >= 0)
+ {
+ return -1;
+ }
+ struct session_mq_topic_schema t_schema;
+ memset(&t_schema, 0, sizeof(struct session_mq_topic_schema));
+ t_schema.free_cb=msg_free_cb;
+ t_schema.topic_name=(char *)topic_name;
+ t_schema.topic_id=len;//topid_id equals arrary index
+ t_schema.free_cb_arg=msg_free_arg;
+ t_schema.subscribers=NULL;
+ t_schema.subscriber_cnt=0;
+ utarray_push_back(plug_mgr->session_mq_schema_array, &t_schema);
+ plug_mgr->topic_num+=1;
+ return t_schema.topic_id;
}
-struct plugin_manager *plugin_manager_new(void)
+int stellar_session_mq_destroy_topic(struct stellar *st, int topic_id)
{
- // TODO
- static struct plugin_manager mgr;
- return &mgr;
+ struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
+ if(plug_mgr->session_mq_schema_array==NULL)return 0;
+ unsigned int len = utarray_len(plug_mgr->session_mq_schema_array);
+ if (len <= (unsigned int)topic_id)
+ return -1;
+ struct session_mq_topic_schema *topic =
+ (struct session_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, (unsigned int)topic_id);
+ struct session_mq_subscriber *sub_elt, *sub_tmp;
+
+ if (topic)
+ {
+ DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
+ {
+ DL_DELETE(topic->subscribers, sub_elt);
+ FREE(sub_elt);
+ }
+ }
+ return 0; // success
}
-void plugin_manager_free(struct plugin_manager *mgr)
+int session_mq_publish_message(struct session *sess, int topic_id, void *data)
{
- // TODO
+ struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
+ if(plug_mgr_rt==NULL || topic_id < 0)return -1;
+ if(plug_mgr_rt->plug_mgr->session_mq_schema_array==NULL)return -1;
+ unsigned int len = utarray_len(plug_mgr_rt->plug_mgr->session_mq_schema_array);
+ if (len <= (unsigned int)topic_id)return -1;
+ struct session_message *msg= ALLOC(struct session_message,1);
+ msg->topic_id = topic_id;
+ msg->msg_data = data;
+ DL_APPEND(plug_mgr_rt->pending_mq, msg);
+ return 0;
}
-void plugin_manager_dispatch_session(struct plugin_manager *mgr, struct session *sess, struct packet *pkt)
+static int session_mq_set_message_status(struct session *sess, int topic_id, int plugin_id, int bit_value)
{
- // TODO
- // current implementation only for testing
+ if(bit_value!=0 && bit_value!=1)return -1;
+ if(plugin_id >= PACKET_PULGIN_ID_BASE)return -1;// ignore packet plugin
+ if(topic_id < 0 || plugin_id < 0)return -1;
+ struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
+ if(plug_mgr_rt==NULL)return -1;
+ if(topic_id >= plug_mgr_rt->plug_mgr->topic_num)return -1;// topic_id out of range
+ struct session_mq_topic_schema *topic = (struct session_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->session_mq_schema_array, (unsigned int)topic_id);
+ if(topic==NULL)return -1;
- struct tcp_segment *seg;
- enum session_state state = session_get_state(sess);
+ struct registered_session_plugin_schema *session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)plugin_id);
+ if(session_plugin_schema==NULL)return -1;
+
+ unsigned int plugin_subscriber_num = utarray_len(session_plugin_schema->registed_session_mq_subscriber_info);
+ if(plug_mgr_rt->session_mq_status)
+ {
+ for(unsigned int i=0; i < plugin_subscriber_num; i++)
+ {
+ struct session_mq_subscriber_info *session_plugin_sub_info = (struct session_mq_subscriber_info *)utarray_eltptr(session_plugin_schema->registed_session_mq_subscriber_info, i);
+ if(topic_id==session_plugin_sub_info->topic_id)
+ {
+ bitmap_set(plug_mgr_rt->session_mq_status, topic_id, session_plugin_sub_info->subscriber_idx, bit_value);
+ }
+ }
+ return 0;
+ }
+ return -1;
+}
+
+
+int session_mq_ignore_message(struct session *sess, int topic_id, int plugin_id)
+{
+ return session_mq_set_message_status(sess, topic_id, plugin_id, 0);
+
+}
+
+int session_mq_unignore_message(struct session *sess, int topic_id, int plugin_id)
+{
+ return session_mq_set_message_status(sess, topic_id, plugin_id, 1);
+}
+
+UT_icd session_mq_subscriber_info_icd = {sizeof(struct session_mq_subscriber_info), NULL, NULL, NULL};
+
+int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_msg_cb_func *plugin_on_msg_cb, int plugin_id)
+{
+ if(plugin_id >= PACKET_PULGIN_ID_BASE)return -1;// ignore packet plugin
+ struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
+ if(plug_mgr->session_mq_schema_array==NULL)return -1;
+ unsigned int len = utarray_len(plug_mgr->session_mq_schema_array);
+ if (len <= (unsigned int)topic_id)return -1;
+
+ struct registered_session_plugin_schema *session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr->registered_session_plugin_array, (unsigned)plugin_id);
+ if(session_plugin_schema==NULL)return -1;
+
+ struct session_mq_topic_schema *topic = (struct session_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, (unsigned int)topic_id);
+ if(topic==NULL)return -1;
+
+ if(session_plugin_schema->registed_session_mq_subscriber_info==NULL)
+ {
+ utarray_new(session_plugin_schema->registed_session_mq_subscriber_info, &session_mq_subscriber_info_icd);
+ }
+
+ // if plugin already subscribe current topic, return 0
+ struct session_mq_subscriber_info *p=NULL;
+ while( (p=(struct session_mq_subscriber_info *)utarray_next(session_plugin_schema->registed_session_mq_subscriber_info,p)))
+ {
+ if(p->topic_id==topic_id)
+ return 0;
+ };
+
+ struct session_mq_subscriber *new_subscriber = ALLOC(struct session_mq_subscriber,1);
+ new_subscriber->topic_subscriber_idx = topic->subscriber_cnt;
+ new_subscriber->session_plugin_id = plugin_id;
+ new_subscriber->msg_cb = plugin_on_msg_cb;
+ DL_APPEND(topic->subscribers, new_subscriber);
+
+ struct session_mq_subscriber_info sub_info;
+ memset(&sub_info, 0, sizeof(struct session_mq_subscriber_info));
+ sub_info.topic_id=topic_id;
+ sub_info.subscriber_idx=topic->subscriber_cnt;
+ utarray_push_back(session_plugin_schema->registed_session_mq_subscriber_info, &sub_info);
+ topic->subscriber_cnt+=1;
+ plug_mgr->subscriber_num+=1;
+ return 0;
+}
+
+static void plugin_manager_session_message_dispatch(struct session *sess)
+{
+ struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
+ if(plug_mgr_rt==NULL)return;
+
+ struct session_message *mq_elt=NULL, *mq_tmp=NULL;
+ struct session_mq_subscriber *sub_elt, *sub_tmp;
+ struct session_mq_topic_schema *topic;
+ struct registered_session_plugin_schema *session_plugin_schema;
+ struct session_plugin_ctx_runtime *plugin_ctx_rt;
+ while (plug_mgr_rt->pending_mq != NULL)
+ {
+ DL_FOREACH_SAFE(plug_mgr_rt->pending_mq, mq_elt, mq_tmp)
+ {
+ topic = (struct session_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->session_mq_schema_array,
+ (unsigned int)(mq_elt->topic_id));
+ if (topic)
+ {
+ int cur_sub_idx = 0;
+ DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
+ {
+ if (bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->topic_id, cur_sub_idx) != 0)
+ {
+ plugin_ctx_rt=(plug_mgr_rt->plugin_ctx_array+sub_elt->session_plugin_id);
+ session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)sub_elt->session_plugin_id);
+ if(plugin_ctx_rt->state==INIT)
+ {
+ if(session_plugin_schema->on_ctx_new)
+ {
+ plugin_ctx_rt->plugin_ctx=session_plugin_schema->on_ctx_new(sess, session_plugin_schema->plugin_env);
+ plugin_ctx_rt->state=ACTIVE;
+ }
+ }
+ if(sub_elt->msg_cb)sub_elt->msg_cb(sess, mq_elt->topic_id, mq_elt->msg_data, plugin_ctx_rt->plugin_ctx,
+ session_plugin_schema->plugin_env);
+ }
+ cur_sub_idx++;
+ }
+ if (topic->free_cb)
+ {
+ topic->free_cb(mq_elt->msg_data, topic->free_cb_arg);
+ }
+ }
+ DL_DELETE(plug_mgr_rt->pending_mq, mq_elt);
+ DL_APPEND(plug_mgr_rt->delivered_mq, mq_elt);// move to delivered message list
+ }
+ }
+ return;
+}
+
+/*******************************
+ * PLUGIN MANAGER SESSION RUNTIME *
+ *******************************/
+
+static struct plugin_exdata *session_exdata_runtime_new(struct plugin_manager_schema *plug_mgr)
+{
+ struct plugin_exdata *exdata_rt = NULL;
+ if(plug_mgr->session_exdata_schema_array==NULL)return NULL;
+ unsigned int len = utarray_len(plug_mgr->session_exdata_schema_array);
+ if(len > 0)
+ {
+ exdata_rt=ALLOC(struct plugin_exdata, len);
+ }
+ return exdata_rt;
+}
+
+static void session_exdata_runtime_free(struct plugin_manager_schema *plug_mgr, struct session *sess, struct plugin_exdata *exdata_rt)
+{
+ if(exdata_rt==NULL)return;
+ if(plug_mgr->session_exdata_schema_array==NULL)return;
+ unsigned int len=utarray_len(plug_mgr->session_exdata_schema_array);
+ for (unsigned int i = 0; i < len; i++)
+ {
+ void *exdata = (exdata_rt + i)->exdata;
+ struct session_exdata_schema *schema = (struct session_exdata_schema *)utarray_eltptr(plug_mgr->session_exdata_schema_array, i);
+ if (exdata)
+ {
+ if (schema->free_func)
+ {
+ schema->free_func(sess, i, exdata, schema->free_arg);
+ }
+ }
+ }
+}
+
+struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_manager_schema *plug_mgr, struct session *sess)
+{
+ if(plug_mgr->registered_session_plugin_array==NULL)return NULL;
+ struct plugin_manager_runtime *rt = ALLOC(struct plugin_manager_runtime, 1);
+ rt->plug_mgr = plug_mgr;
+ rt->sess = sess;
+ rt->pending_mq = NULL;
+ rt->delivered_mq = NULL;
+ rt->session_mq_status=bitmap_new(plug_mgr->topic_num, plug_mgr->subscriber_num, 1);
+ rt->plugin_exdata_array = (struct plugin_exdata *)session_exdata_runtime_new(plug_mgr);
+ rt->plugin_ctx_array = ALLOC(struct session_plugin_ctx_runtime, utarray_len(plug_mgr->registered_session_plugin_array));
+ return rt;
+
+}
+
+void plugin_manager_session_runtime_free(struct plugin_manager_runtime *rt)
+{
+ if(rt==NULL)return;
+ if(rt->pending_mq != NULL)
+ {
+ session_mq_free(rt->pending_mq);
+ rt->pending_mq=NULL;
+ }
+ if(rt->delivered_mq != NULL)
+ {
+ session_mq_free(rt->delivered_mq);
+ rt->delivered_mq=NULL;
+ }
+ if(rt->session_mq_status != NULL)
+ {
+ bitmap_free(rt->session_mq_status);
+ }
+ unsigned int len = utarray_len(rt->plug_mgr->registered_session_plugin_array);
+ for(unsigned int i = 0; i < len; i++)
+ {
+ struct session_plugin_ctx_runtime *plugin_ctx_rt=(rt->plugin_ctx_array+i);
+ struct registered_session_plugin_schema *session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(rt->plug_mgr->registered_session_plugin_array, i);
+ if(session_plugin_schema->on_ctx_free && plugin_ctx_rt->state==ACTIVE)
+ {
+ session_plugin_schema->on_ctx_free(rt->sess, plugin_ctx_rt->plugin_ctx, session_plugin_schema->plugin_env);
+ }
+ }
+ FREE(rt->plugin_ctx_array);
+
+ session_exdata_runtime_free(rt->plug_mgr, rt->sess, rt->plugin_exdata_array);
+ FREE(rt->plugin_exdata_array);
+ FREE(rt);
+}
+
+
+/*********************************************
+ * PLUGIN MANAGER PACKET PLUGIN *
+ *********************************************/
+
+
+UT_icd registered_packet_plugin_array_icd = {sizeof(struct registered_packet_plugin_schema), NULL, NULL, NULL};
+
+int stellar_packet_plugin_register(struct stellar *st, unsigned char ip_protocol, plugin_on_packet_func on_packet, void *plugin_env)
+{
+ struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
+ if(plug_mgr->registered_packet_plugin_array == NULL)
+ {
+ utarray_new(plug_mgr->registered_packet_plugin_array, &registered_packet_plugin_array_icd);
+ }
+ struct registered_packet_plugin_schema packet_plugin_schema;
+ memset(&packet_plugin_schema, 0, sizeof(packet_plugin_schema));
+ packet_plugin_schema.ip_protocol = ip_protocol;
+ packet_plugin_schema.on_packet = on_packet;
+ packet_plugin_schema.plugin_env = plugin_env;
+ utarray_push_back(plug_mgr->registered_packet_plugin_array, &packet_plugin_schema);
+ return (PACKET_PULGIN_ID_BASE+utarray_len(plug_mgr->registered_packet_plugin_array));// return packet plugin_id
+}
+
+void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, struct packet *pkt)
+{
+ if(plug_mgr == NULL || pkt == NULL)return;
+ if(plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return;
+ struct registered_packet_plugin_schema *p=NULL;
+ //unsigned char ip_proto=packet_get_layers(pkt); // FIXME get ip_proto
+ unsigned char ip_proto=0;
+ while ((p = (struct registered_packet_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, p)))
+ {
+ if(p->ip_protocol == ip_proto && p->on_packet)
+ {
+ p->on_packet(pkt, ip_proto, p->plugin_env);
+ }
+ }
+ return;
+}
+
+/*********************************************
+ * PLUGIN MANAGER POLLING PLUGIN *
+ *********************************************/
+
+
+UT_icd registered_polling_plugin_array_icd = {sizeof(struct registered_polling_plugin_schema), NULL, NULL, NULL};
+
+int stellar_polling_plugin_register(struct stellar *st, plugin_on_polling_func on_polling, void *plugin_env)
+{
+ struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
+ if(plug_mgr->registered_polling_plugin_array == NULL)
+ {
+ utarray_new(plug_mgr->registered_polling_plugin_array, &registered_polling_plugin_array_icd);
+ }
+ struct registered_polling_plugin_schema polling_plugin_schema;
+ memset(&polling_plugin_schema, 0, sizeof(polling_plugin_schema));
+ polling_plugin_schema.on_polling = on_polling;
+ polling_plugin_schema.plugin_env = plugin_env;
+ utarray_push_back(plug_mgr->registered_polling_plugin_array, &polling_plugin_schema);
+ return (POLLING_PULGIN_ID_BASE+utarray_len(plug_mgr->registered_polling_plugin_array));// return polling plugin_id
+}
+
+int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr)
+{
+ if(plug_mgr->registered_polling_plugin_array == NULL)return 0;
+ struct registered_polling_plugin_schema *p=NULL;
+ int polling_state=0;
+ while ((p = (struct registered_polling_plugin_schema *)utarray_next(plug_mgr->registered_polling_plugin_array, p)))
+ {
+ if(p->on_polling)
+ {
+ if(p->on_polling(p->plugin_env)==1)
+ {
+ polling_state=1;
+ }
+ }
+ }
+ return polling_state;
+}
+
+/*********************************************
+ * PLUGIN MANAGER SESSION PLUGIN *
+ *********************************************/
+
+
+
+UT_icd registered_session_plugin_schema_icd = {sizeof(struct registered_session_plugin_schema), NULL, NULL, NULL};
+
+
+int stellar_session_plugin_register(struct stellar *st,
+ session_ctx_new_func session_ctx_new,
+ session_ctx_free_func session_ctx_free,
+ void *plugin_env)
+{
+ struct plugin_manager_schema *plug_mgr = stellar_plugin_manager_schema_get(st);
+ if(plug_mgr->registered_session_plugin_array == NULL)
+ {
+ utarray_new(plug_mgr->registered_session_plugin_array, &registered_session_plugin_schema_icd);
+ }
+ struct registered_session_plugin_schema session_plugin_schema;
+ memset(&session_plugin_schema, 0, sizeof(struct registered_session_plugin_schema));
+ session_plugin_schema.on_ctx_new = session_ctx_new;
+ session_plugin_schema.on_ctx_free = session_ctx_free;
+ session_plugin_schema.plugin_env = plugin_env;
+ utarray_push_back(plug_mgr->registered_session_plugin_array, &session_plugin_schema);
+ return (utarray_len(plug_mgr->registered_session_plugin_array)-1);// return session plugin_id, equals to session plugin arrary index
+}
+
+void plugin_manager_on_session_ingress(struct session *sess,const struct packet *pkt)
+{
+ if(sess == NULL || pkt == NULL)return;
+ struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
+ if(plug_mgr_rt==NULL)return;
+ plugin_manager_scratch_session_set(sess);
+#if 0
+ switch (packet_get_type(pkt))
+ {
+ case TCP:
+ topic_id=plug_mgr_rt->plug_mgr->tcp_topic_id;
+ break;
+ case TCP_STREAM:
+ topic_id=plug_mgr_rt->plug_mgr->tcp_stream_topic_id;
+ break;
+ case UDP:
+ topic_id=plug_mgr_rt->plug_mgr->udp_topic_id;
+ break;
+ case CONTROL:
+ topic_id=plug_mgr_rt->plug_mgr->control_packet_topic_id;
+ break;
+ default:
+ break;
+ }
+#endif
+ struct tcp_segment *seg;
enum session_type type = session_get_type(sess);
- PLUGIN_MANAGER_LOG_DEBUG("=> thread [%d] plugin dispatch session: %u %s %s %s", stellar_get_current_thread_index(), session_get_id(sess), session_get_tuple6_str(sess), session_type_to_str(type), session_state_to_str(state));
- // session_print(sess);
if (packet_is_ctrl(pkt))
{
- // trigger ctrl msg with (mgr, sess, pkt)
- // dispatch_plugin()
+ session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->control_packet_topic_id ,(void *)pkt);
}
else
{
switch (type)
{
case SESSION_TYPE_TCP:
- // trigger TCP msg with (mgr, sess, pkt)
- // dispatch_plugin()
- while ((seg = session_get_tcp_segment(sess)) != NULL)
+ session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->tcp_topic_id ,(void *)pkt);
+ if((seg = session_get_tcp_segment(sess)) != NULL)
{
- // trigger TCP stream msg with (mgr, sess, seg->data, seg->len)
- // dispatch_plugin()
- session_free_tcp_segment(sess, seg);
+ session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->tcp_stream_topic_id ,(void *)seg);
+ //session_free_tcp_segment(sess, seg);
}
break;
case SESSION_TYPE_UDP:
- // trigger UDP msg with (mgr, sess, pkt)
- // dispatch_plugin()
+ session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->udp_topic_id ,(void *)pkt);
break;
default:
assert(0);
break;
}
}
+ //TODO: check TCP topic active subscirber num, if 0, return disable assembler state, to reduce tcp reassemble overhead
+ plugin_manager_session_message_dispatch(sess);
+ plugin_manager_scratch_session_set(NULL);
+ return;
}
-void plugin_manager_dispatch_packet(struct plugin_manager *mgr, struct packet *pkt)
+void plugin_manager_on_session_egress(struct session *sess,const struct packet *pkt)
{
- // TODO
-} \ No newline at end of file
+ if(sess == NULL || pkt == NULL)return;
+ struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
+ if(plug_mgr_rt==NULL)return;
+ plugin_manager_scratch_session_set(sess);
+ session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->egress_topic_id ,(void *)pkt);
+ plugin_manager_session_message_dispatch(sess);
+ session_mq_free(plug_mgr_rt->delivered_mq);
+ plug_mgr_rt->delivered_mq=NULL;
+ plugin_manager_scratch_session_set(NULL);
+ return;
+}
+
+void stellar_session_plugin_dettach_current_session(struct session *sess)
+{
+ struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
+ struct registered_session_plugin_schema *session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)plug_mgr_rt->current_session_plugin_id);
+ if(session_plugin_schema==NULL)return;
+
+ unsigned int plugin_subscriber_num = utarray_len(session_plugin_schema->registed_session_mq_subscriber_info);
+ //TODO: maybe no need to clear session_mq_status, check plugin_ctx before message dispatch
+ if(plug_mgr_rt->session_mq_status)
+ {
+ for(unsigned int i=0; i < plugin_subscriber_num; i++)
+ {
+ struct session_mq_subscriber_info *session_plugin_sub_info = (struct session_mq_subscriber_info *)utarray_eltptr(session_plugin_schema->registed_session_mq_subscriber_info, i);
+ bitmap_set(plug_mgr_rt->session_mq_status, session_plugin_sub_info->topic_id, session_plugin_sub_info->subscriber_idx, 0);
+ }
+ }
+
+ if(session_plugin_schema->on_ctx_free)
+ {
+ session_plugin_schema->on_ctx_free(sess, (plug_mgr_rt->plugin_ctx_array+plug_mgr_rt->current_session_plugin_id)->plugin_ctx, session_plugin_schema->plugin_env);
+ }
+ (plug_mgr_rt->plugin_ctx_array+plug_mgr_rt->current_session_plugin_id)->plugin_ctx=NULL;
+ (plug_mgr_rt->plugin_ctx_array+plug_mgr_rt->current_session_plugin_id)->state=EXIT;
+ return;
+}
+
diff --git a/src/plugin/plugin_manager.h b/src/plugin/plugin_manager.h
index 18eb038..ae7577d 100644
--- a/src/plugin/plugin_manager.h
+++ b/src/plugin/plugin_manager.h
@@ -11,16 +11,23 @@ extern "C"
#define PLUGIN_MANAGER_LOG_ERROR(format, ...) LOG_ERROR("plugin manager", format, ##__VA_ARGS__)
#define PLUGIN_MANAGER_LOG_DEBUG(format, ...) LOG_DEBUG("plugin manager", format, ##__VA_ARGS__)
-// per session context
-void *plugin_manager_new_ctx(struct session *sess);
-void plugin_manager_free_ctx(void *ctx);
+struct plugin_manager_schema;
+struct plugin_manager_runtime;
-struct plugin_manager;
-struct plugin_manager *plugin_manager_new(void);
-void plugin_manager_free(struct plugin_manager *mgr);
+struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char *plugin_spec_file_path);
+void plugin_manager_exit(struct plugin_manager_schema *plug_mgr);
-void plugin_manager_dispatch_session(struct plugin_manager *mgr, struct session *sess, struct packet *pkt);
-void plugin_manager_dispatch_packet(struct plugin_manager *mgr, struct packet *pkt);
+void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, struct packet *pkt);
+
+//return polling work state, 0: idle, 1: working
+int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr);
+
+//publish and dispatch session msg(msg, pkt) on session_mq
+void plugin_manager_on_session_ingress(struct session *sess,const struct packet *pkt);
+void plugin_manager_on_session_egress(struct session *sess,const struct packet *pkt);
+
+struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_manager_schema *plug_mgr, struct session *sess);
+void plugin_manager_session_runtime_free(struct plugin_manager_runtime *plug_mgr_rt);
#ifdef __cplusplus
}
diff --git a/src/session/session.cpp b/src/session/session.cpp
index 542dba1..d57b843 100644
--- a/src/session/session.cpp
+++ b/src/session/session.cpp
@@ -1,5 +1,7 @@
+#include "stellar/session.h"
#include <assert.h>
+#include "packet_priv.h"
#include "session_priv.h"
#include "tcp_utils.h"
#include "tcp_reassembly.h"
@@ -65,6 +67,11 @@ const char *session_get_tuple6_str(const struct session *sess)
return sess->tuple_str;
}
+const char *session_get0_readable_addr(struct session *sess)
+{
+ return sess->tuple_str;
+}
+
void session_set_direction(struct session *sess, enum session_direction dir)
{
sess->sess_dir = dir;
@@ -95,6 +102,11 @@ enum session_state session_get_state(const struct session *sess)
return sess->state;
}
+enum session_state session_get_current_state(const struct session *sess)
+{
+ return sess->state;
+}
+
void session_set_type(struct session *sess, enum session_type type)
{
sess->type = type;
@@ -195,6 +207,49 @@ const struct packet *session_get_current_packet(const struct session *sess)
return sess->curr_pkt;
}
+const inline struct packet *session_get0_current_packet(struct session *sess)
+{
+ return sess->curr_pkt;
+}
+
+const char *session_get0_current_payload(struct session *sess, size_t *payload_len)
+{
+ const struct packet *pkt=session_get_current_packet(sess);
+ if(pkt)
+ {
+ const struct packet_layer *pkt_layer=packet_get_innermost_layer(pkt, LAYER_TYPE_ALL);
+ if(pkt_layer)
+ {
+ *payload_len=pkt_layer->pld_len;
+ return pkt_layer->pld_ptr;
+ }
+ }
+ *payload_len=0;
+ return NULL;
+}
+
+int session_is_symmetric(struct session *sess, unsigned char *flag)
+{
+ int is_symmetric=0;
+ if (sess->first_pkt[FLOW_DIRECTION_C2S] && sess->first_pkt[FLOW_DIRECTION_S2C])
+ {
+ if (flag)
+ *flag = (SESSION_SEEN_C2S_FLOW | SESSION_SEEN_S2C_FLOW);
+ is_symmetric = 1;
+ }
+ else if (sess->first_pkt[FLOW_DIRECTION_C2S])
+ {
+ if (flag)
+ *flag = SESSION_SEEN_C2S_FLOW;
+ }
+ else if (sess->first_pkt[FLOW_DIRECTION_S2C])
+ {
+ if (flag)
+ *flag = SESSION_SEEN_S2C_FLOW;
+ }
+ return is_symmetric;
+}
+
void session_set_user_data(struct session *sess, void *user_data)
{
sess->user_data = user_data;
diff --git a/src/stellar/CMakeLists.txt b/src/stellar/CMakeLists.txt
index a8ddb9e..61e1587 100644
--- a/src/stellar/CMakeLists.txt
+++ b/src/stellar/CMakeLists.txt
@@ -2,5 +2,7 @@ add_library(core config.cpp stat.cpp stellar.cpp inject.cpp)
target_link_libraries(core timestamp plugin_manager session_manager ip_reassembly packet_io pthread fieldstat4 toml)
add_executable(stellar main.cpp)
-target_link_libraries(stellar core plugin_manager)
-install(TARGETS stellar RUNTIME DESTINATION bin COMPONENT Program) \ No newline at end of file
+target_link_libraries(stellar core)
+target_link_libraries(stellar "-rdynamic")
+install(TARGETS stellar RUNTIME DESTINATION bin COMPONENT Program)
+
diff --git a/src/stellar/main.cpp b/src/stellar/main.cpp
index f16edc3..4511c86 100644
--- a/src/stellar/main.cpp
+++ b/src/stellar/main.cpp
@@ -65,6 +65,8 @@ static int all_session_have_freed(void)
int main(int argc, char **argv)
{
+ struct stellar st={runtime};
+
timestamp_update();
signal(SIGINT, signal_handler);
@@ -99,8 +101,7 @@ int main(int argc, char **argv)
STELLAR_LOG_ERROR("unable to create stellar stat");
goto error_out;
}
-
- runtime->plug_mgr = plugin_manager_new();
+ runtime->plug_mgr = plugin_manager_init(&st, "./stellar_plugin/spec.toml");
if (runtime->plug_mgr == NULL)
{
STELLAR_LOG_ERROR("unable to create plugin manager");
@@ -148,7 +149,7 @@ error_out:
stellar_thread_join(runtime, config);
stellar_thread_clean(runtime, config);
packet_io_free(runtime->packet_io);
- plugin_manager_free(runtime->plug_mgr);
+ plugin_manager_exit(runtime->plug_mgr);
stellar_stat_free(runtime->stat);
STELLAR_LOG_STATE("stellar exit !!!\n");
log_free();
diff --git a/src/stellar/stellar.cpp b/src/stellar/stellar.cpp
index 219fa22..333defd 100644
--- a/src/stellar/stellar.cpp
+++ b/src/stellar/stellar.cpp
@@ -50,7 +50,7 @@ static inline void free_evicted_sessions(struct session_manager *sess_mgr, uint6
if (sess)
{
plugin_ctx = session_get_user_data(sess);
- plugin_manager_free_ctx(plugin_ctx);
+ plugin_manager_session_runtime_free((struct plugin_manager_runtime*)plugin_ctx);
session_manager_free_session(sess_mgr, sess);
}
else
@@ -70,7 +70,7 @@ static inline void free_expired_sessions(struct session_manager *sess_mgr, uint6
if (sess)
{
plugin_ctx = session_get_user_data(sess);
- plugin_manager_free_ctx(plugin_ctx);
+ plugin_manager_session_runtime_free((struct plugin_manager_runtime*)plugin_ctx);
session_manager_free_session(sess_mgr, sess);
}
else
@@ -101,7 +101,7 @@ static void *work_thread(void *arg)
struct packet packets[RX_BURST_MAX];
struct session *sess = NULL;
struct packet_io *packet_io = runtime->packet_io;
- struct plugin_manager *plug_mgr = runtime->plug_mgr;
+ struct plugin_manager_schema *plug_mgr = runtime->plug_mgr;
struct stellar_thread *thread = (struct stellar_thread *)arg;
struct ip_reassembly *ip_reass = thread->ip_mgr;
struct session_manager *sess_mgr = thread->sess_mgr;
@@ -136,7 +136,7 @@ static void *work_thread(void *arg)
defraged_pkt = NULL;
pkt = &packets[i];
- plugin_manager_dispatch_packet(plug_mgr, pkt);
+ plugin_manager_on_packet(plug_mgr, pkt);
if (packet_is_fragment(pkt))
{
defraged_pkt = ip_reassembly_packet(ip_reass, pkt, now);
@@ -147,7 +147,7 @@ static void *work_thread(void *arg)
else
{
pkt = defraged_pkt;
- plugin_manager_dispatch_packet(plug_mgr, pkt);
+ plugin_manager_on_packet(plug_mgr, pkt);
}
}
@@ -159,7 +159,7 @@ static void *work_thread(void *arg)
{
goto fast_path;
}
- plugin_ctx = plugin_manager_new_ctx(sess);
+ plugin_ctx = plugin_manager_session_runtime_new(runtime->plug_mgr, sess);
session_set_user_data(sess, plugin_ctx);
}
else
@@ -173,9 +173,10 @@ static void *work_thread(void *arg)
{
packet_set_session_id(pkt, session_get_id(sess));
}
- plugin_manager_dispatch_session(plug_mgr, sess, pkt);
+ plugin_manager_on_session_ingress(sess, pkt);
fast_path:
+ plugin_manager_on_session_egress(sess, pkt);
update_session_stat(sess, pkt);
if (packet_get_action(pkt) == PACKET_ACTION_DROP)
{
@@ -218,7 +219,8 @@ static void *work_thread(void *arg)
ip_reassembly_expire(ip_reass, now);
// TODO
- // plugin_manager_cron();
+ plugin_manager_on_polling(runtime->plug_mgr);
+ // session_manager_cron();
// poll_non_packet_events();
if (nr_recv == 0)
diff --git a/src/stellar/stellar_priv.h b/src/stellar/stellar_priv.h
index 8a2f9b8..3f6faf4 100644
--- a/src/stellar/stellar_priv.h
+++ b/src/stellar/stellar_priv.h
@@ -30,10 +30,16 @@ struct stellar_runtime
uint64_t stat_last_output_ts;
struct stellar_stat *stat;
struct packet_io *packet_io;
- struct plugin_manager *plug_mgr;
+ struct plugin_manager_schema *plug_mgr;
struct stellar_thread threads[MAX_THREAD_NUM];
};
+//FIXME rename stellar_runtime to stellar
+struct stellar
+{
+ struct stellar_runtime *st_rt;
+};
+
extern struct stellar_runtime *runtime;
extern struct stellar_config *config;