summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryangwei <[email protected]>2024-08-06 20:37:59 +0800
committerluwenpeng <[email protected]>2024-08-12 15:45:50 +0800
commit6786372449b70caaaf6824e1ed0e59800659b2d6 (patch)
tree869aaa46825ce60d224338f8040f1a1b35f5df00
parentee695957205fceb7990d123f6cccf7de3e58c12c (diff)
✨ feat(plugin manager integration): packet and session exdata&mq
-rw-r--r--deps/bitmap/bitmap.c17
-rw-r--r--deps/bitmap/bitmap.h6
-rw-r--r--include/CMakeLists.txt4
-rw-r--r--include/stellar/session_exdata.h17
-rw-r--r--include/stellar/session_mq.h33
-rw-r--r--include/stellar/stellar.h4
-rw-r--r--include/stellar/stellar_exdata.h23
-rw-r--r--include/stellar/stellar_mq.h51
-rw-r--r--include/stellar/utils.h8
-rw-r--r--src/id_generator/CMakeLists.txt3
-rw-r--r--src/id_generator/id_generator.cpp4
-rw-r--r--src/packet/packet_def.h1
-rw-r--r--src/packet/packet_utils.cpp10
-rw-r--r--src/packet/packet_utils.h3
-rw-r--r--src/plugin/CMakeLists.txt6
-rw-r--r--src/plugin/plugin_manager.cpp1049
-rw-r--r--src/plugin/plugin_manager.h13
-rw-r--r--src/plugin/plugin_manager_interna.h192
-rw-r--r--src/plugin/test/CMakeLists.txt17
-rw-r--r--src/plugin/test/plugin_manager_gtest_main.cpp2312
-rw-r--r--src/plugin/test/plugin_manager_gtest_mock.h116
-rw-r--r--src/stellar/CMakeLists.txt11
-rw-r--r--src/stellar/stellar_core.cpp20
-rw-r--r--src/stellar/stellar_core.h1
-rw-r--r--src/stellar/version.map8
-rw-r--r--test/packet_inject/CMakeLists.txt7
-rw-r--r--test/packet_inject/packet_inject.cpp8
27 files changed, 3437 insertions, 507 deletions
diff --git a/deps/bitmap/bitmap.c b/deps/bitmap/bitmap.c
index 59fb4b7..068bc3f 100644
--- a/deps/bitmap/bitmap.c
+++ b/deps/bitmap/bitmap.c
@@ -46,8 +46,21 @@ void bitmap_free(struct bitmap *bmp) {
}
}
-
-
+int bitmap_is_all_zero(struct bitmap *bmp, int x, int y, int length) {
+ if (x < 0 || y < 0 || x >= bmp->width || y >= bmp->height) {
+ return -1; // Return error code if coordinates are out of bounds
+ }
+ int idx = y * bmp->width + x;
+ if (idx + length > bmp->width * bmp->height) {
+ return -1; // Return error if range exceeds bitmap bounds
+ }
+ for (int i = 0; i < length; i++) {
+ if (bmp->data[(idx + i) / 8] & (1 << ((idx + i) % 8))) {
+ return 0; // Return 0 if any bit is not zero
+ }
+ }
+ return 1; // Return 1 if all bits are zero
+}
int test_bitmap() {
struct bitmap *bmp = bitmap_new(10, 5, 1); // Create a 10x5 bitmap
diff --git a/deps/bitmap/bitmap.h b/deps/bitmap/bitmap.h
index 210ea35..5fbf469 100644
--- a/deps/bitmap/bitmap.h
+++ b/deps/bitmap/bitmap.h
@@ -1,5 +1,9 @@
+#pragma once
+
struct bitmap;
struct bitmap * bitmap_new(int width, int height, int value);
int bitmap_set(struct bitmap *bmp, int x, int y, int value);
int bitmap_get(struct bitmap *bmp, int x, int y);
-void bitmap_free(struct bitmap *bmp); \ No newline at end of file
+void bitmap_free(struct bitmap *bmp);
+
+int bitmap_is_all_zero(struct bitmap *bmp, int x, int y, int length);
diff --git a/include/CMakeLists.txt b/include/CMakeLists.txt
index 2e5b316..12742bd 100644
--- a/include/CMakeLists.txt
+++ b/include/CMakeLists.txt
@@ -4,5 +4,5 @@ install(FILES stellar/tunnel.h DESTINATION include/stellar/ COMPONENT LIBRARIES)
install(FILES stellar/packet.h DESTINATION include/stellar/ COMPONENT LIBRARIES)
install(FILES stellar/session.h DESTINATION include/stellar/ COMPONENT LIBRARIES)
install(FILES stellar/stellar.h DESTINATION include/stellar/ COMPONENT LIBRARIES)
-install(FILES stellar/session_mq.h DESTINATION include/stellar/ COMPONENT LIBRARIES)
-install(FILES stellar/session_exdata.h DESTINATION include/stellar/ COMPONENT LIBRARIES) \ No newline at end of file
+install(FILES stellar/stellar_mq.h DESTINATION include/stellar/ COMPONENT LIBRARIES)
+install(FILES stellar/stellar_exdata.h DESTINATION include/stellar/ COMPONENT LIBRARIES) \ No newline at end of file
diff --git a/include/stellar/session_exdata.h b/include/stellar/session_exdata.h
deleted file mode 100644
index a10139e..0000000
--- a/include/stellar/session_exdata.h
+++ /dev/null
@@ -1,17 +0,0 @@
-#pragma once
-
-#ifdef __cplusplus
-extern "C"
-{
-#endif
-
-#include "stellar.h"
-
-typedef void session_exdata_free(struct session *sess, int idx, void *ex_ptr, void *arg);
-int stellar_session_exdata_new_index(struct stellar *st, const char *name, session_exdata_free *free_func,void *arg);
-int session_exdata_set(struct session *sess, int idx, void *ex_ptr);
-void *session_exdata_get(struct session *sess, int idx);
-
-#ifdef __cplusplus
-}
-#endif \ No newline at end of file
diff --git a/include/stellar/session_mq.h b/include/stellar/session_mq.h
deleted file mode 100644
index 2ad9f9f..0000000
--- a/include/stellar/session_mq.h
+++ /dev/null
@@ -1,33 +0,0 @@
-#pragma once
-
-#ifdef __cplusplus
-extern "C"
-{
-#endif
-
-#include "stellar.h"
-
-//session mq
-typedef void msg_free_cb_func(void *msg, void *msg_free_arg);
-typedef void on_msg_cb_func(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env);
-
-//return topic_id
-int stellar_session_mq_create_topic(struct stellar *st, const char *topic_name, msg_free_cb_func *msg_free_cb, void *msg_free_arg);
-
-int stellar_session_mq_get_topic_id(struct stellar *st, const char *topic_name);
-
-int stellar_session_mq_update_topic(struct stellar *st, int topic_id, msg_free_cb_func *msg_free_cb, void *msg_free_arg);
-
-int stellar_session_mq_destroy_topic(struct stellar *st, int topic_id);
-
-//return 0 if success, otherwise return -1.
-int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_msg_cb_func *plugin_on_msg_cb, int plugin_id);
-
-int session_mq_publish_message(struct session *sess, int topic_id, void *msg);
-
-int session_mq_ignore_message(struct session *sess, int topic_id, int plugin_id);
-int session_mq_unignore_message(struct session *sess, int topic_id, int plugin_id);
-
-#ifdef __cplusplus
-}
-#endif \ No newline at end of file
diff --git a/include/stellar/stellar.h b/include/stellar/stellar.h
index 3237455..9af384c 100644
--- a/include/stellar/stellar.h
+++ b/include/stellar/stellar.h
@@ -6,7 +6,6 @@ extern "C"
#endif
#include <stdint.h>
-#include "stellar/session.h"
struct session;
struct stellar;
@@ -48,6 +47,9 @@ typedef int plugin_on_polling_func(void *plugin_env);
//return polling plugin_id
int stellar_polling_plugin_register(struct stellar *st, plugin_on_polling_func on_polling, void *plugin_env);
+void stellar_emit_datapath_telemetry(struct packet *pkt, const char * module, const char *str);
+
+int stellar_get_worker_thread_num(struct stellar *st);
uint16_t stellar_get_current_thread_index();
// only send user crafted packet, can't send packet which come from network
void stellar_send_crafted_packet(struct stellar *st, struct packet *pkt);
diff --git a/include/stellar/stellar_exdata.h b/include/stellar/stellar_exdata.h
new file mode 100644
index 0000000..605cb12
--- /dev/null
+++ b/include/stellar/stellar_exdata.h
@@ -0,0 +1,23 @@
+#pragma once
+
+#include "stellar.h"
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+typedef void stellar_exdata_free(int idx, void *ex_ptr, void *arg);
+int stellar_exdata_new_index(struct stellar *st, const char *name, stellar_exdata_free *free_func,void *arg);
+
+//packet exdata api
+int packet_exdata_set(struct packet *pkt, int idx, void *ex_ptr);
+void *packet_exdata_get(struct packet *pkt, int idx);
+
+//session exdata api
+int session_exdata_set(struct session *sess, int idx, void *ex_ptr);
+void *session_exdata_get(struct session *sess, int idx);
+
+#ifdef __cplusplus
+}
+#endif \ No newline at end of file
diff --git a/include/stellar/stellar_mq.h b/include/stellar/stellar_mq.h
new file mode 100644
index 0000000..9267868
--- /dev/null
+++ b/include/stellar/stellar_mq.h
@@ -0,0 +1,51 @@
+#pragma once
+#include "stellar.h"
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+//topic api
+typedef void stellar_msg_free_cb_func(void *msg, void *msg_free_arg);
+
+//return topic_id
+int stellar_mq_create_topic(struct stellar *st, const char *topic_name, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg);
+int stellar_mq_get_topic_id(struct stellar *st, const char *topic_name);
+int stellar_mq_update_topic(struct stellar *st, int topic_id, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg);
+int stellar_mq_destroy_topic(struct stellar *st, int topic_id);
+
+
+enum stellar_mq_priority
+{
+ STELLAR_MQ_PRIORITY_LOW,
+ STELLAR_MQ_PRIORITY_NORMAL,
+ STELLAR_MQ_PRIORITY_HIGH,
+ STELLAR_MQ_PRIORITY_MAX,
+};
+
+//session mq api
+typedef void on_session_msg_cb_func(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env);
+
+//return 0 if success, otherwise return -1.
+int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_session_msg_cb_func *plugin_on_msg_cb, int plugin_id);
+int session_mq_publish_message(struct session *sess, int topic_id, void *msg);
+int session_mq_publish_message_with_priority(struct session *sess, int topic_id, void *msg, enum stellar_mq_priority priority);
+
+int session_mq_ignore_message(struct session *sess, int topic_id, int plugin_id);
+int session_mq_unignore_message(struct session *sess, int topic_id, int plugin_id);
+
+int session_mq_topic_is_active(struct session *sess, int topic_id);
+
+
+//packet mq api
+
+typedef void on_packet_msg_cb_func(struct packet *pkt, int topic_id, const void *msg, void *plugin_env);
+//return 0 if success, otherwise return -1.
+int stellar_packet_mq_subscribe(struct stellar *st, int topic_id, on_packet_msg_cb_func *plugin_on_msg_cb, int plugin_id); //packet plugin only
+int packet_mq_publish_message(struct packet *pkt, int topic_id, void *msg);
+int packet_mq_publish_message_with_priority(struct packet *pkt, int topic_id, void *msg, enum stellar_mq_priority priority);
+
+#ifdef __cplusplus
+}
+#endif \ No newline at end of file
diff --git a/include/stellar/utils.h b/include/stellar/utils.h
index fb1dc5b..d6ed97e 100644
--- a/include/stellar/utils.h
+++ b/include/stellar/utils.h
@@ -31,6 +31,14 @@
(type *)( (char *)__mptr - offsetof(type,member) );})
#endif
+#ifndef likely
+#define likely(x) __builtin_expect((x),1)
+#endif /* likely */
+
+#ifndef unlikely
+#define unlikely(x) __builtin_expect((x),0)
+#endif /* unlikely */
+
#ifndef __unused
#define __unused __attribute__((__unused__))
#endif
diff --git a/src/id_generator/CMakeLists.txt b/src/id_generator/CMakeLists.txt
index b18e1d8..4f05afe 100644
--- a/src/id_generator/CMakeLists.txt
+++ b/src/id_generator/CMakeLists.txt
@@ -1,4 +1,5 @@
add_library(id_generator id_generator.cpp)
target_include_directories(id_generator PUBLIC ${CMAKE_CURRENT_LIST_DIR})
+target_include_directories(id_generator PUBLIC ${CMAKE_SOURCE_DIR}/include/stellar)
target_include_directories(id_generator PUBLIC ${CMAKE_SOURCE_DIR}/src/stellar)
-target_link_libraries(id_generator log stellar_core) \ No newline at end of file
+target_link_libraries(id_generator PRIVATE stellar_core log) \ No newline at end of file
diff --git a/src/id_generator/id_generator.cpp b/src/id_generator/id_generator.cpp
index ec6e245..2c7e9f8 100644
--- a/src/id_generator/id_generator.cpp
+++ b/src/id_generator/id_generator.cpp
@@ -3,7 +3,7 @@
#include "log.h"
#include "macro.h"
-#include "stellar_core.h"
+#include "stellar.h"
#include "id_generator.h"
#define ID_GENERATOR_LOG_ERROR(format, ...) LOG_ERROR("id generator", format, ##__VA_ARGS__)
@@ -66,7 +66,7 @@ uint64_t id_generator_alloc(uint64_t now_sec)
#define MAX_ID_PER_THREAD (32768)
#define MAX_ID_BASE_TIME (268435456L)
- uint64_t thr_idx = stellar_get_current_thread_index();
+ uint64_t thr_idx = (uint16_t)stellar_get_current_thread_index();
uint64_t global_id = 0;
uint64_t id_per_thread = (global_id_generator.thread_volatile[thr_idx]++) % MAX_ID_PER_THREAD;
diff --git a/src/packet/packet_def.h b/src/packet/packet_def.h
index 9c4b977..cc65e19 100644
--- a/src/packet/packet_def.h
+++ b/src/packet/packet_def.h
@@ -44,6 +44,7 @@ struct raw_layer
struct packet
{
+ void * user_data;
struct raw_layer layers[PACKET_MAX_LAYERS];
struct raw_layer *frag_layer; // fragment layer
int8_t layers_used;
diff --git a/src/packet/packet_utils.cpp b/src/packet/packet_utils.cpp
index b2be075..873b224 100644
--- a/src/packet/packet_utils.cpp
+++ b/src/packet/packet_utils.cpp
@@ -546,3 +546,13 @@ void layer_convert(const struct raw_layer *in, struct layer *out)
out->hdr_len = in->hdr_len;
out->hdr.raw = (char *)in->hdr_ptr;
}
+
+void packet_set_user_data(struct packet *pkt, void *data)
+{
+ pkt->user_data=data;
+}
+
+void *packet_get_user_data(struct packet *pkt)
+{
+ return pkt->user_data;
+} \ No newline at end of file
diff --git a/src/packet/packet_utils.h b/src/packet/packet_utils.h
index bdafbc8..fcad090 100644
--- a/src/packet/packet_utils.h
+++ b/src/packet/packet_utils.h
@@ -36,6 +36,9 @@ void packet_set_action(struct packet *pkt, enum packet_action action);
enum packet_action packet_get_action(const struct packet *pkt);
+void *packet_get_user_data(struct packet *pkt);
+void packet_set_user_data(struct packet *pkt, void *data);
+
/******************************************************************************
* tuple uitls
******************************************************************************/
diff --git a/src/plugin/CMakeLists.txt b/src/plugin/CMakeLists.txt
index 23c254b..3c08c28 100644
--- a/src/plugin/CMakeLists.txt
+++ b/src/plugin/CMakeLists.txt
@@ -1,4 +1,8 @@
add_library(plugin_manager plugin_manager.cpp)
target_include_directories(plugin_manager PUBLIC ${CMAKE_CURRENT_LIST_DIR})
+target_include_directories(plugin_manager PUBLIC ${CMAKE_SOURCE_DIR}/include/)
+target_include_directories(plugin_manager PUBLIC ${CMAKE_SOURCE_DIR}/src/)
target_include_directories(plugin_manager PUBLIC ${CMAKE_SOURCE_DIR}/deps/)
-target_link_libraries(plugin_manager bitmap toml session_manager stellar_core ${CMAKE_DL_LIBS}) \ No newline at end of file
+target_link_libraries(plugin_manager bitmap toml ${CMAKE_DL_LIBS})
+
+add_subdirectory(test) \ No newline at end of file
diff --git a/src/plugin/plugin_manager.cpp b/src/plugin/plugin_manager.cpp
index 0a31e5b..96e1cc0 100644
--- a/src/plugin/plugin_manager.cpp
+++ b/src/plugin/plugin_manager.cpp
@@ -1,145 +1,17 @@
-#include <assert.h>
-#include "plugin_manager.h"
-#include "session_utils.h"
-#include "packet_utils.h"
-#include "stellar_core.h"
+#include "plugin_manager_interna.h"
+#include "stellar/session.h"
#include "stellar/utils.h"
-#include "stellar/session_exdata.h"
-#include "stellar/session_mq.h"
-#include "tcp_reassembly.h"
-
-extern "C"
-{
- #include "uthash/utlist.h"
- #include "uthash/utarray.h"
- #include "bitmap/bitmap.h"
-}
-
-struct plugin_manager_schema
-{
- struct stellar *st;
- UT_array *session_exdata_schema_array;
- UT_array *plugin_load_specs_array;
- UT_array *session_mq_schema_array;
- UT_array *registered_session_plugin_array;
- UT_array *registered_packet_plugin_array;
- UT_array *registered_polling_plugin_array;
- int topic_num;
- int subscriber_num;
- int tcp_topic_id;
- int udp_topic_id;
- int tcp_stream_topic_id;
- int egress_topic_id;
- int control_packet_topic_id;
-};
-
-
-struct session_exdata_schema
-{
- char *name;
- session_exdata_free *free_func;
- void *free_arg;
- int idx;
-};
-
-struct session_message
-{
- int topic_id;
- void *msg_data;
- struct session_message *next, *prev;
-};
-
-typedef struct session_mq_subscriber
-{
- int topic_subscriber_idx;
- int session_plugin_id;
- on_msg_cb_func *msg_cb;
- struct session_mq_subscriber *next, *prev;
-}session_mq_subscribers;
-
-struct session_mq_topic_schema
-{
- char *topic_name;
- msg_free_cb_func *free_cb;
- void *free_cb_arg;
- int topic_id;
- int subscriber_cnt;
- struct session_mq_subscriber *subscribers;
-};
-
-enum plugin_ctx_state
-{ INIT, ACTIVE, EXIT };
-
-struct session_plugin_ctx_runtime
-{
- enum plugin_ctx_state state;
- int session_plugin_id;
- void *plugin_ctx;
-};
-
-struct plugin_exdata
-{
- void *exdata;
-};
-
-struct plugin_manager_runtime
-{
- struct plugin_manager_schema *plug_mgr;
- struct session *sess;
- struct session_message *pending_mq;// message list
- struct session_message *delivered_mq;// message list
- struct bitmap *session_mq_status; //N * M bits, N topic, M subscriber
- struct plugin_exdata *plugin_exdata_array;
- struct session_plugin_ctx_runtime *plugin_ctx_array;//N plugins TODO: call alloc and free
- int current_session_plugin_id;
-};
-
-struct registered_packet_plugin_schema
-{
- char ip_protocol;
- plugin_on_packet_func *on_packet;
- void *plugin_env;
-};
-
-struct registered_polling_plugin_schema
-{
- plugin_on_polling_func *on_polling;
- void *plugin_env;
-};
-
-struct session_mq_subscriber_info
-{
- int topic_id;
- int subscriber_idx;
-};
-
-struct registered_session_plugin_schema
-{
- session_ctx_new_func *on_ctx_new;
- session_ctx_free_func *on_ctx_free;
- void *plugin_env;
- UT_array *registed_session_mq_subscriber_info;
-};
-
-#define PACKET_PULGIN_ID_BASE 0x10000
-#define POLLING_PULGIN_ID_BASE 0x20000
-
-/*******************************
- * PLUGIN MANAGER INIT & EXIT *
- *******************************/
-
-#include <dlfcn.h>
#include "toml/toml.h"
+#include "uthash/utlist.h"
-struct plugin_specific
-{
- char plugin_name[256];
- plugin_on_load_func *load_cb;
- plugin_on_unload_func *unload_cb;
- void *plugin_ctx;
-};
+#include "stellar/stellar_core.h"
+#include "session/session_utils.h"
+#include "tuple/tuple.h"
+#include "packet/packet_utils.h"
+UT_icd plugin_specs_icd = {sizeof(struct plugin_specific), NULL, NULL, NULL};
+// TODO: set scratch_sess to per_thread_data
thread_local struct session *per_thread_scratch_sess;
inline static void plugin_manager_scratch_session_set(struct session *sess)
@@ -152,7 +24,6 @@ inline static struct session *plugin_manager_scratch_session_get()
return per_thread_scratch_sess;
}
-UT_icd plugin_specs_icd = {sizeof(struct plugin_specific), NULL, NULL, NULL};
static struct plugin_specific *plugin_specs_load(const char *toml_conf_path, int *spec_num)
{
@@ -169,7 +40,7 @@ static struct plugin_specific *plugin_specs_load(const char *toml_conf_path, int
toml_array_t* plugin_array = toml_array_in(conf, "plugin");
if(plugin_array==NULL)return NULL;
*spec_num = toml_array_nelem(plugin_array);
- struct plugin_specific* plugins = ALLOC(struct plugin_specific, *spec_num);
+ struct plugin_specific* plugins = CALLOC(struct plugin_specific, *spec_num);
for (int i = 0; i < *spec_num; i++) {
toml_table_t* plugin = toml_table_at(plugin_array, i);
@@ -212,6 +83,28 @@ PLUGIN_SPEC_LOAD_ERROR:
return NULL;
}
+static struct plugin_manger_per_thread_data *plugin_manager_per_thread_data_new(struct stellar *st)
+{
+ if(st == NULL)return NULL;
+ int thread_num=stellar_get_worker_thread_num(st);
+ struct plugin_manger_per_thread_data *per_thread_data = CALLOC(struct plugin_manger_per_thread_data, thread_num);
+ return per_thread_data;
+}
+
+static void plugin_manager_per_thread_data_free(struct plugin_manger_per_thread_data *per_thread_data, struct stellar *st)
+{
+ if(per_thread_data == NULL || st == NULL)return;
+ int thread_num=stellar_get_worker_thread_num(st);
+ struct plugin_manger_per_thread_data *p_data;
+ for (int i = 0; i < thread_num; i++)
+ {
+ p_data=per_thread_data+i;
+ if(p_data->per_thread_pkt_exdata_array.exdata_array)FREE(p_data->per_thread_pkt_exdata_array.exdata_array);
+ }
+ FREE(per_thread_data);
+ return;
+}
+
static void tcp_stream_msg_free_fn(void *msg, void *msg_free_arg)
{
struct session *cur_sess = plugin_manager_scratch_session_get();
@@ -226,39 +119,39 @@ struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char
{
return NULL;
}
- struct plugin_manager_schema *pm = ALLOC(struct plugin_manager_schema, 1);
+ struct plugin_manager_schema *plug_mgr = CALLOC(struct plugin_manager_schema, 1);
+ plug_mgr->max_message_dispatch=MAX_MSG_PER_DISPATCH;
if(spec_num > 0)
{
- utarray_new(pm->plugin_load_specs_array,&plugin_specs_icd);
- utarray_reserve(pm->plugin_load_specs_array, spec_num);
+ utarray_new(plug_mgr->plugin_load_specs_array,&plugin_specs_icd);
+ utarray_reserve(plug_mgr->plugin_load_specs_array, spec_num);
}
- pm->st = st;
- stellar_set_plugin_manger(st, pm);
- pm->tcp_topic_id=stellar_session_mq_create_topic(st, TOPIC_TCP, NULL, NULL);
- pm->tcp_stream_topic_id=stellar_session_mq_create_topic(st, TOPIC_TCP_STREAM, tcp_stream_msg_free_fn, NULL);
- pm->udp_topic_id=stellar_session_mq_create_topic(st, TOPIC_UDP, NULL, NULL);
- pm->egress_topic_id=stellar_session_mq_create_topic(st, TOPIC_EGRESS, NULL, NULL);
- pm->control_packet_topic_id=stellar_session_mq_create_topic(st, TOPIC_CONTROL_PACKET, NULL, NULL);
+ plug_mgr->st = st;
+ stellar_set_plugin_manger(st, plug_mgr);
+
+
+ plug_mgr->tcp_topic_id=stellar_mq_create_topic(st, TOPIC_TCP, NULL, NULL);
+ plug_mgr->tcp_stream_topic_id=stellar_mq_create_topic(st, TOPIC_TCP_STREAM, tcp_stream_msg_free_fn, NULL);
+ plug_mgr->udp_topic_id=stellar_mq_create_topic(st, TOPIC_UDP, NULL, NULL);
+ plug_mgr->egress_topic_id=stellar_mq_create_topic(st, TOPIC_EGRESS, NULL, NULL);
+ plug_mgr->control_packet_topic_id=stellar_mq_create_topic(st, TOPIC_CONTROL_PACKET, NULL, NULL);
for(int i = 0; i < spec_num; i++)
{
if (specs[i].load_cb != NULL)
{
specs[i].plugin_ctx=specs[i].load_cb(st);
- utarray_push_back(pm->plugin_load_specs_array, &specs[i]);
+ utarray_push_back(plug_mgr->plugin_load_specs_array, &specs[i]);
}
}
FREE(specs);
- return pm;
+ plug_mgr->per_thread_data = plugin_manager_per_thread_data_new(st);
+ return plug_mgr;
}
void plugin_manager_exit(struct plugin_manager_schema *plug_mgr)
{
- if (plug_mgr == NULL)
- {
- return;
- }
struct plugin_specific *p=NULL;
if (plug_mgr->plugin_load_specs_array)
{
@@ -269,17 +162,25 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr)
}
utarray_free(plug_mgr->plugin_load_specs_array);
}
- if(plug_mgr->session_mq_schema_array)
+ if(plug_mgr->stellar_mq_schema_array)
{
- for(unsigned int i = 0; i < utarray_len(plug_mgr->session_mq_schema_array); i++)
+ for(unsigned int i = 0; i < utarray_len(plug_mgr->stellar_mq_schema_array); i++)
{
- stellar_session_mq_destroy_topic(plug_mgr->st, i);
+ stellar_mq_destroy_topic( plug_mgr->st, i);
}
- utarray_free(plug_mgr->session_mq_schema_array);
+ utarray_free(plug_mgr->stellar_mq_schema_array);
}
- if(plug_mgr->session_exdata_schema_array)utarray_free(plug_mgr->session_exdata_schema_array);
- if(plug_mgr->registered_packet_plugin_array)utarray_free(plug_mgr->registered_packet_plugin_array);
+ if(plug_mgr->stellar_exdata_schema_array)utarray_free(plug_mgr->stellar_exdata_schema_array);
if(plug_mgr->registered_polling_plugin_array)utarray_free(plug_mgr->registered_polling_plugin_array);
+ if(plug_mgr->registered_packet_plugin_array)
+ {
+ struct registered_packet_plugin_schema *s = NULL;
+ while ((s = (struct registered_packet_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, s)))
+ {
+ if(s->registed_packet_mq_subscriber_info)utarray_free(s->registed_packet_mq_subscriber_info);
+ }
+ utarray_free(plug_mgr->registered_packet_plugin_array);
+ }
if(plug_mgr->registered_session_plugin_array)
{
struct registered_session_plugin_schema *s = NULL;
@@ -289,94 +190,162 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr)
}
utarray_free(plug_mgr->registered_session_plugin_array);
}
+ plugin_manager_per_thread_data_free(plug_mgr->per_thread_data, plug_mgr->st);
FREE(plug_mgr);
return;
}
-
/*******************************
- * SESSION EXDATA *
+ * STELLAR EXDATA *
*******************************/
-static void session_exdata_met_copy(void *_dst, const void *_src)
+static void stellar_exdata_met_copy(void *_dst, const void *_src)
{
- struct session_exdata_schema *dst = (struct session_exdata_schema *)_dst, *src = (struct session_exdata_schema *)_src;
+ struct stellar_exdata_schema *dst = (struct stellar_exdata_schema *)_dst, *src = (struct stellar_exdata_schema *)_src;
dst->free_func = src->free_func;
dst->free_arg = src->free_arg;
dst->idx = src->idx;
dst->name = src->name ? strdup(src->name) : NULL;
}
-static void session_exdata_met_dtor(void *_elt)
+static void stellar_exdata_met_dtor(void *_elt)
{
- struct session_exdata_schema *elt = (struct session_exdata_schema *)_elt;
+ struct stellar_exdata_schema *elt = (struct stellar_exdata_schema *)_elt;
if (elt->name)
FREE(elt->name);
}
-UT_icd session_exdata_meta_icd = {sizeof(struct session_exdata_schema), NULL, session_exdata_met_copy, session_exdata_met_dtor};
+UT_icd stellar_exdata_meta_icd = {sizeof(struct stellar_exdata_schema), NULL, stellar_exdata_met_copy, stellar_exdata_met_dtor};
-
-int stellar_session_exdata_new_index(struct stellar *st, const char *name, session_exdata_free *free_func,void *free_arg)
+int stellar_exdata_new_index(struct stellar *st, const char *name, stellar_exdata_free *free_func,void *free_arg)
{
+ if(st==NULL || name==NULL)return -1;
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
- if(plug_mgr->session_exdata_schema_array == NULL)
+ if(plug_mgr->stellar_exdata_schema_array == NULL)
{
- utarray_new(plug_mgr->session_exdata_schema_array, &session_exdata_meta_icd);
+ utarray_new(plug_mgr->stellar_exdata_schema_array, &stellar_exdata_meta_icd);
}
- if(plug_mgr->session_exdata_schema_array == NULL)return -1;
- unsigned int len = utarray_len(plug_mgr->session_exdata_schema_array);
- struct session_exdata_schema *t_schema;
+ if(plug_mgr->stellar_exdata_schema_array == NULL)return -1;
+ unsigned int len = utarray_len(plug_mgr->stellar_exdata_schema_array);
+ struct stellar_exdata_schema *t_schema;
for(unsigned int i = 0; i < len; i++)
{
- t_schema = (struct session_exdata_schema *)utarray_eltptr(plug_mgr->session_exdata_schema_array, i);
+ t_schema = (struct stellar_exdata_schema *)utarray_eltptr(plug_mgr->stellar_exdata_schema_array, i);
if(strcmp(t_schema->name, name) == 0)
{
+ t_schema->free_func=free_func;
+ t_schema->free_arg=free_arg;
return t_schema->idx;
}
}
- struct session_exdata_schema new_schema;
- memset(&new_schema, 0, sizeof(struct session_exdata_schema));
+ struct stellar_exdata_schema new_schema;
+ memset(&new_schema, 0, sizeof(struct stellar_exdata_schema));
new_schema.free_func=free_func;
new_schema.name=(char *)name;
new_schema.idx=len;
new_schema.free_arg=free_arg;
- utarray_push_back(plug_mgr->session_exdata_schema_array, &new_schema);
+ utarray_push_back(plug_mgr->stellar_exdata_schema_array, &new_schema);
return new_schema.idx;
}
-int session_exdata_set(struct session *sess, int idx, void *ex_ptr)
+int stellar_exdata_set(UT_array *exdata_schema, struct stellar_exdata *exdata_array, int idx, void *ex_ptr)
{
- struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
- if(plug_mgr_rt == NULL)return -1;
- if(plug_mgr_rt->plug_mgr->session_exdata_schema_array == NULL)return -1;
- unsigned int len=utarray_len(plug_mgr_rt->plug_mgr->session_exdata_schema_array);
+ if(exdata_schema == NULL|| exdata_array == NULL)return -1;
+ unsigned int len=utarray_len(exdata_schema);
if(len < (unsigned int)idx)return -1;
- if(plug_mgr_rt->plugin_exdata_array==NULL)return -1;
- (plug_mgr_rt->plugin_exdata_array+idx)->exdata=ex_ptr;
+ if((exdata_array+idx)->state == EXIT)return -1;
+ (exdata_array+idx)->exdata=ex_ptr;
return 0;
}
-void *session_exdata_get(struct session *sess, int idx)
+void *stellar_exdata_get(UT_array *exdata_schema, struct stellar_exdata *exdata_array, int idx)
{
- struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
- if(plug_mgr_rt == NULL)return NULL;
- if(plug_mgr_rt->plug_mgr->session_exdata_schema_array==NULL)return NULL;
- unsigned int len = utarray_len(plug_mgr_rt->plug_mgr->session_exdata_schema_array);
+ if(exdata_schema == NULL|| exdata_array == NULL)return NULL;
+ unsigned int len = utarray_len(exdata_schema);
if(len < (unsigned int)idx)return NULL;
- return (plug_mgr_rt->plugin_exdata_array+idx)->exdata;
+ if((exdata_array+idx)->state == EXIT)return NULL;
+ return (exdata_array+idx)->exdata;
}
/*******************************
- * SESSION MQ *
+ * PACKET EXDATA *
+ *******************************/
+static struct stellar_exdata *per_thread_packet_exdata_arrary_get(struct plugin_manager_schema *plug_mgr)
+{
+ if(plug_mgr==NULL || plug_mgr->stellar_exdata_schema_array == NULL)return NULL;
+ int tid=stellar_get_current_thread_index();
+ if(plug_mgr->per_thread_data[tid].per_thread_pkt_exdata_array.exdata_array == NULL)
+ {
+ unsigned int len = utarray_len(plug_mgr->stellar_exdata_schema_array);
+ plug_mgr->per_thread_data[tid].per_thread_pkt_exdata_array.exdata_array = CALLOC(struct stellar_exdata, len);
+ }
+ return plug_mgr->per_thread_data[tid].per_thread_pkt_exdata_array.exdata_array;
+}
+
+static void per_thread_packet_exdata_arrary_clean(struct plugin_manager_schema *plug_mgr)
+{
+ if(plug_mgr==NULL || plug_mgr->stellar_exdata_schema_array == NULL)return;
+ unsigned int len=utarray_len(plug_mgr->stellar_exdata_schema_array);
+ struct stellar_exdata *per_thread_pkt_exdata_arrary = per_thread_packet_exdata_arrary_get(plug_mgr);
+ if(per_thread_pkt_exdata_arrary == NULL)return;
+ for (unsigned int i = 0; i < len; i++)
+ {
+ void *exdata = (per_thread_pkt_exdata_arrary + i)->exdata;
+ (per_thread_pkt_exdata_arrary + i)->state=EXIT;
+ struct stellar_exdata_schema *schema = (struct stellar_exdata_schema *)utarray_eltptr(plug_mgr->stellar_exdata_schema_array, i);
+ if (exdata)
+ {
+ if (schema->free_func)
+ {
+ schema->free_func(i, exdata, schema->free_arg);
+ }
+ (per_thread_pkt_exdata_arrary + i)->exdata=NULL;
+ }
+ (per_thread_pkt_exdata_arrary + i)->state=INIT;
+ }
+}
+
+int packet_exdata_set(struct packet *pkt, int idx, void *ex_ptr)
+{
+ if(pkt == NULL)return -1;
+ struct plugin_manager_schema *plug_mgr = (struct plugin_manager_schema *)packet_get_user_data(pkt);
+ return stellar_exdata_set(plug_mgr->stellar_exdata_schema_array, per_thread_packet_exdata_arrary_get(plug_mgr), idx, ex_ptr);
+}
+
+void *packet_exdata_get(struct packet *pkt, int idx)
+{
+ if(pkt == NULL)return NULL;
+ struct plugin_manager_schema *plug_mgr = (struct plugin_manager_schema *)packet_get_user_data(pkt);
+ return stellar_exdata_get( plug_mgr->stellar_exdata_schema_array, per_thread_packet_exdata_arrary_get(plug_mgr), idx);
+}
+
+/*******************************
+ * SESSION EXDATA *
*******************************/
+int session_exdata_set(struct session *sess, int idx, void *ex_ptr)
+{
+ struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
+ if(plug_mgr_rt == NULL)return -1;
+ if(plug_mgr_rt->plug_mgr->stellar_exdata_schema_array == NULL)return -1;
+ return stellar_exdata_set(plug_mgr_rt->plug_mgr->stellar_exdata_schema_array, plug_mgr_rt->sess_exdata_array, idx, ex_ptr);
+}
+void *session_exdata_get(struct session *sess, int idx)
+{
+ struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
+ if(plug_mgr_rt == NULL)return NULL;
+ if(plug_mgr_rt->plug_mgr->stellar_exdata_schema_array==NULL)return NULL;
+ return stellar_exdata_get(plug_mgr_rt->plug_mgr->stellar_exdata_schema_array, plug_mgr_rt->sess_exdata_array, idx);
+}
-static void session_mq_topic_schema_copy(void *_dst, const void *_src)
+/*******************************
+ * STELLAR MQ *
+ *******************************/
+static void stellar_mq_topic_schema_copy(void *_dst, const void *_src)
{
- struct session_mq_topic_schema *dst = (struct session_mq_topic_schema *)_dst,
- *src = (struct session_mq_topic_schema *)_src;
+ struct stellar_mq_topic_schema *dst = (struct stellar_mq_topic_schema *)_dst,
+ *src = (struct stellar_mq_topic_schema *)_src;
dst->subscribers = src->subscribers;
dst->free_cb = src->free_cb;
dst->free_cb_arg = src->free_cb_arg;
@@ -385,36 +354,27 @@ static void session_mq_topic_schema_copy(void *_dst, const void *_src)
dst->topic_name = src->topic_name ? strdup(src->topic_name) : NULL;
}
-static void session_mq_topic_schema_dtor(void *_elt)
+static void stellar_mq_topic_schema_dtor(void *_elt)
{
- struct session_mq_topic_schema *elt = (struct session_mq_topic_schema *)_elt;
+ struct stellar_mq_topic_schema *elt = (struct stellar_mq_topic_schema *)_elt;
if (elt->topic_name)
FREE(elt->topic_name);
// FREE(elt); // free the item
}
-UT_icd session_mq_topic_schema_icd = {sizeof(struct session_mq_topic_schema), NULL, session_mq_topic_schema_copy, session_mq_topic_schema_dtor};
+UT_icd stellar_mq_topic_schema_icd = {sizeof(struct stellar_mq_topic_schema), NULL, stellar_mq_topic_schema_copy, stellar_mq_topic_schema_dtor};
-void session_mq_free(struct session_message *head)
+int stellar_mq_get_topic_id(struct stellar *st, const char *topic_name)
{
- struct session_message *elt, *tmp;
- DL_FOREACH_SAFE(head, elt, tmp)
- {
- DL_DELETE(head, elt);
- FREE(elt);
- }
- FREE(head);
-}
+ struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
+ UT_array *mq_schema_array=plug_mgr->stellar_mq_schema_array;
-int stellar_session_mq_get_topic_id(struct stellar *st, const char *topic_name)
-{
- struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);;
- if(topic_name == NULL || plug_mgr == NULL || plug_mgr->session_mq_schema_array == NULL)return -1;
- unsigned int len = utarray_len(plug_mgr->session_mq_schema_array);
- struct session_mq_topic_schema *t_schema;
+ if(topic_name == NULL || mq_schema_array == NULL )return -1;
+ unsigned int len = utarray_len(mq_schema_array);
+ struct stellar_mq_topic_schema *t_schema;
for(unsigned int i = 0; i < len; i++)
{
- t_schema = (struct session_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, i);
+ t_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array, i);
if(strcmp(t_schema->topic_name, topic_name) == 0)
{
return i;
@@ -423,78 +383,338 @@ int stellar_session_mq_get_topic_id(struct stellar *st, const char *topic_name)
return -1;
}
-int stellar_session_mq_update_topic(struct stellar *st, int topic_id, msg_free_cb_func *msg_free_cb, void *msg_free_arg)
+int stellar_mq_update_topic(struct stellar *st, int topic_id, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
{
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
- if(plug_mgr->session_mq_schema_array == NULL)return -1;
- unsigned int len = utarray_len(plug_mgr->session_mq_schema_array);
+ UT_array *mq_schema_array=plug_mgr->stellar_mq_schema_array;
+ if(mq_schema_array == NULL)return -1;
+ unsigned int len = utarray_len(mq_schema_array);
if(len < (unsigned int)topic_id)return -1;
- struct session_mq_topic_schema *t_schema = (struct session_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, (unsigned int)topic_id);
+ struct stellar_mq_topic_schema *t_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array, (unsigned int)topic_id);
if(t_schema == NULL)return -1;
t_schema->free_cb=msg_free_cb;
t_schema->free_cb_arg=msg_free_arg;
return 0;
}
-int stellar_session_mq_create_topic(struct stellar *st, const char *topic_name, msg_free_cb_func *msg_free_cb, void *msg_free_arg)
+int stellar_mq_create_topic(struct stellar *st, const char *topic_name, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg)
{
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
- if(plug_mgr->session_mq_schema_array == NULL)
+ if(plug_mgr->stellar_mq_schema_array == NULL)
{
- utarray_new(plug_mgr->session_mq_schema_array, &session_mq_topic_schema_icd);
+ utarray_new(plug_mgr->stellar_mq_schema_array, &stellar_mq_topic_schema_icd);
}
- unsigned int len = utarray_len(plug_mgr->session_mq_schema_array);
- if(stellar_session_mq_get_topic_id(st, topic_name) >= 0)
+ unsigned int len = utarray_len(plug_mgr->stellar_mq_schema_array);
+ if(stellar_mq_get_topic_id(st, topic_name) >= 0)
{
return -1;
}
- struct session_mq_topic_schema t_schema;
- memset(&t_schema, 0, sizeof(struct session_mq_topic_schema));
+ struct stellar_mq_topic_schema t_schema;
+ memset(&t_schema, 0, sizeof(struct stellar_mq_topic_schema));
t_schema.free_cb=msg_free_cb;
t_schema.topic_name=(char *)topic_name;
t_schema.topic_id=len;//topid_id equals arrary index
t_schema.free_cb_arg=msg_free_arg;
t_schema.subscribers=NULL;
t_schema.subscriber_cnt=0;
- utarray_push_back(plug_mgr->session_mq_schema_array, &t_schema);
- plug_mgr->topic_num+=1;
+ utarray_push_back(plug_mgr->stellar_mq_schema_array, &t_schema);
+ plug_mgr->stellar_mq_topic_num+=1;
return t_schema.topic_id;
}
-int stellar_session_mq_destroy_topic(struct stellar *st, int topic_id)
+int stellar_mq_destroy_topic(struct stellar *st, int topic_id)
{
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
- if(plug_mgr->session_mq_schema_array==NULL)return 0;
- unsigned int len = utarray_len(plug_mgr->session_mq_schema_array);
+ if(plug_mgr->stellar_mq_schema_array==NULL)return -1;
+ unsigned int len = utarray_len(plug_mgr->stellar_mq_schema_array);
if (len <= (unsigned int)topic_id)
return -1;
- struct session_mq_topic_schema *topic =
- (struct session_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, (unsigned int)topic_id);
- struct session_mq_subscriber *sub_elt, *sub_tmp;
+ struct stellar_mq_topic_schema *topic =
+ (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id);
+ struct stellar_mq_subscriber *sub_elt, *sub_tmp;
+
+ if(topic == NULL)return -1;
+
+ if (topic->is_destroyed == 1)return 0;
+
+ DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
+ {
+ DL_DELETE(topic->subscribers, sub_elt);
+ FREE(sub_elt);
+ }
+ topic->is_destroyed = 1;
+ plug_mgr->stellar_mq_topic_num-=1;
+ return 1; // success
+}
+
+static int stellar_mq_publish_message(enum stellar_topic_type type, int topic_id, void *data, UT_array *stellar_mq_schema, struct stellar_message *priority_mq[], enum stellar_mq_priority priority)
+{
+ if(stellar_mq_schema==NULL || topic_id < 0)return -1;
+ unsigned int len = utarray_len(stellar_mq_schema);
+ if (len <= (unsigned int)topic_id)return -1;
+ struct stellar_message *msg= CALLOC(struct stellar_message,1);
+ msg->header.topic_id = topic_id;
+ msg->header.type=type;
+ msg->header.priority = priority;
+ msg->body = data;
+ DL_APPEND(priority_mq[priority], msg);
+ return 0;
+}
+
+UT_icd stellar_mq_subscriber_info_icd = {sizeof(struct stellar_mq_subscriber_info), NULL, NULL, NULL};
+
+static int stellar_mq_subscribe(struct plugin_manager_schema *plug_mgr, int topic_id, void *plugin_on_msg_cb, int plugin_idx, UT_array *registed_mq_subscriber_info)
+{
+ if(plug_mgr == NULL || plug_mgr->stellar_mq_schema_array==NULL || registed_mq_subscriber_info == NULL)return -1;
+
+ unsigned int len = utarray_len(plug_mgr->stellar_mq_schema_array);
+ if (len <= (unsigned int)topic_id)return -1;
+
+ struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id);
+ if(topic==NULL)return -1;
+
+ // if plugin already subscribe current topic, return 0
+ struct stellar_mq_subscriber_info *p=NULL;
+ while( (p=(struct stellar_mq_subscriber_info *)utarray_next(registed_mq_subscriber_info,p)))
+ {
+ if(p->topic_id==topic_id)
+ {
+ struct stellar_mq_subscriber *tmp_subscriber=topic->subscribers;
+ int cnt=0;
+ while(tmp_subscriber)
+ {
+ if(cnt==p->subscriber_idx)
+ {
+ tmp_subscriber->msg_cb=plugin_on_msg_cb;
+ return 0;
+ }
+ cnt++;
+ tmp_subscriber=tmp_subscriber->next;
+ }
+ }
+ };
+
+ struct stellar_mq_subscriber *new_subscriber = CALLOC(struct stellar_mq_subscriber,1);
+ new_subscriber->topic_subscriber_idx = topic->subscriber_cnt;
+ new_subscriber->plugin_idx = plugin_idx;
+ new_subscriber->msg_cb = plugin_on_msg_cb;
+ DL_APPEND(topic->subscribers, new_subscriber);
+
+ struct stellar_mq_subscriber_info sub_info;
+ memset(&sub_info, 0, sizeof(struct stellar_mq_subscriber_info));
+ sub_info.topic_id=topic_id;
+ sub_info.subscriber_idx=topic->subscriber_cnt;
+ utarray_push_back(registed_mq_subscriber_info, &sub_info);
+ topic->subscriber_cnt+=1;
+ plug_mgr->session_topic_subscriber_num+=1;
+ return 0;
+}
+
+static void stellar_mq_dispatch_one_session_message(struct session *sess, struct stellar_message *mq_elt)
+{
+ struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
+ struct stellar_mq_subscriber *sub_elt, *sub_tmp;
+ struct registered_session_plugin_schema *session_plugin_schema;
+ struct session_plugin_ctx_runtime *plugin_ctx_rt;
+ struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->stellar_mq_schema_array,
+ (unsigned int)(mq_elt->header.topic_id));
if (topic)
{
+ int cur_sub_idx = 0;
DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
{
- DL_DELETE(topic->subscribers, sub_elt);
- FREE(sub_elt);
+ plug_mgr_rt->current_session_plugin_id = sub_elt->plugin_idx;
+ if (bitmap_get(plug_mgr_rt->session_mq_status, cur_sub_idx, mq_elt->header.topic_id) != 0)
+ {
+ plugin_ctx_rt = (plug_mgr_rt->plugin_ctx_array + sub_elt->plugin_idx);
+ session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(
+ plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)sub_elt->plugin_idx);
+ if (session_plugin_schema)
+ {
+ if (plugin_ctx_rt->state == INIT)
+ {
+ if (session_plugin_schema->on_ctx_new)
+ {
+ plugin_ctx_rt->plugin_ctx =
+ session_plugin_schema->on_ctx_new(sess, session_plugin_schema->plugin_env);
+ if (plugin_ctx_rt->state == EXIT && session_plugin_schema->on_ctx_free)
+ {
+ session_plugin_schema->on_ctx_free(sess, plugin_ctx_rt->plugin_ctx,
+ session_plugin_schema->plugin_env);
+ plugin_ctx_rt->plugin_ctx = NULL;
+ }
+ else
+ {
+ plugin_ctx_rt->state = ACTIVE;
+ }
+ }
+ }
+ if (sub_elt->sess_msg_cb &&
+ bitmap_get(plug_mgr_rt->session_mq_status, cur_sub_idx, mq_elt->header.topic_id) !=
+ 0) // ctx_new maybe call detach, need check again
+ {
+ sub_elt->sess_msg_cb(sess, mq_elt->header.topic_id, mq_elt->body, plugin_ctx_rt->plugin_ctx,
+ session_plugin_schema->plugin_env);
+ }
+ }
+ }
+ cur_sub_idx++;
}
+ if (cur_sub_idx == 0)
+ bitmap_set(plug_mgr_rt->session_topic_status, 0, mq_elt->header.topic_id, 0);
}
- return 0; // success
+}
+
+static void stellar_mq_dispatch_one_packet_message(struct packet *pkt, struct stellar_message *mq_elt)
+{
+ struct plugin_manager_schema *plug_mgr = (struct plugin_manager_schema *)packet_get_user_data(pkt);
+ struct stellar_mq_subscriber *sub_elt, *sub_tmp;
+ struct registered_packet_plugin_schema *packet_plugin_schema;
+ struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array,
+ (unsigned int)(mq_elt->header.topic_id));
+ if (topic)
+ {
+ DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
+ {
+ if (sub_elt->pkt_msg_cb)
+ {
+ packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr(
+ plug_mgr->registered_packet_plugin_array, (unsigned int)sub_elt->plugin_idx);
+ if (packet_plugin_schema)
+ {
+ sub_elt->pkt_msg_cb(pkt, mq_elt->header.topic_id, mq_elt->body, packet_plugin_schema->plugin_env);
+ }
+ }
+ }
+ }
+}
+
+static void stellar_mq_dispatch(struct stellar_message *priority_mq[], struct stellar_message ** dealth_letter_queue, struct session *sess, struct packet *pkt)
+{
+ struct stellar_message *mq_elt=NULL, *mq_tmp=NULL;
+ int cur_priority = STELLAR_MQ_PRIORITY_HIGH;
+ while(cur_priority >= STELLAR_MQ_PRIORITY_LOW)
+ {
+ if(priority_mq[cur_priority]==NULL)
+ {
+ cur_priority--;
+ continue;
+ }
+ DL_FOREACH_SAFE(priority_mq[cur_priority], mq_elt, mq_tmp)
+ {
+ if(mq_elt->header.type==ON_SESSION_TOPIC)stellar_mq_dispatch_one_session_message(sess, mq_elt);
+ if(mq_elt->header.type==ON_PACKET_TOPIC)stellar_mq_dispatch_one_packet_message(pkt, mq_elt);
+ DL_DELETE(priority_mq[mq_elt->header.priority], mq_elt);
+ DL_APPEND(*dealth_letter_queue, mq_elt); // move to dlq list
+
+ cur_priority=STELLAR_MQ_PRIORITY_HIGH;
+ break;
+ }
+ }
+ return;
+}
+
+static void stellar_mq_free(struct stellar_message **head, UT_array *mq_schema_array)
+{
+ struct stellar_message *mq_elt, *tmp;
+ struct stellar_mq_topic_schema *topic;
+ DL_FOREACH_SAFE(*head, mq_elt, tmp)
+ {
+ topic = (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array,
+ (unsigned int)(mq_elt->header.topic_id));
+ if (topic && topic->free_cb)
+ {
+ topic->free_cb(mq_elt->body, topic->free_cb_arg);
+ }
+ DL_DELETE(*head, mq_elt);
+ FREE(mq_elt);
+ }
+}
+
+/*******************************
+ * PACKET MQ *
+ *******************************/
+
+//return 0 if success, otherwise return -1.
+int stellar_packet_mq_subscribe(struct stellar *st, int topic_id, on_packet_msg_cb_func *plugin_on_msg_cb, int plugin_id)
+{
+ if(plugin_id < PACKET_PULGIN_ID_BASE || plugin_id >= POLLING_PULGIN_ID_BASE)return -1;// ignore session or polling plugin
+ int plugin_idx=plugin_id-PACKET_PULGIN_ID_BASE;
+
+ struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
+ if(plug_mgr == NULL || plug_mgr->registered_packet_plugin_array == NULL)return -1;
+
+ struct registered_packet_plugin_schema *packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr(plug_mgr->registered_packet_plugin_array, (unsigned)plugin_idx);
+ if(packet_plugin_schema==NULL)return -1;
+
+ if(packet_plugin_schema->registed_packet_mq_subscriber_info==NULL)
+ {
+ utarray_new(packet_plugin_schema->registed_packet_mq_subscriber_info, &stellar_mq_subscriber_info_icd);
+ }
+
+ return stellar_mq_subscribe(plug_mgr,topic_id, (void *)plugin_on_msg_cb, plugin_idx, packet_plugin_schema->registed_packet_mq_subscriber_info);
+}
+
+int packet_mq_publish_message_with_priority(struct packet *pkt, int topic_id, void *data, enum stellar_mq_priority priority)
+{
+ struct plugin_manager_schema *plug_mgr = (struct plugin_manager_schema *)packet_get_user_data(pkt);
+ int tid = stellar_get_current_thread_index();
+ if(plug_mgr->per_thread_data[tid].pub_packet_msg_cnt == -1)return -1;
+ if(plug_mgr->per_thread_data[tid].pub_packet_msg_cnt >= plug_mgr->max_message_dispatch)return -1;
+ if(stellar_mq_publish_message(ON_PACKET_TOPIC ,topic_id, data, plug_mgr->stellar_mq_schema_array, plug_mgr->per_thread_data[tid].priority_mq,priority)==0)
+ {
+ plug_mgr->per_thread_data[tid].pub_packet_msg_cnt+=1;
+ return 0;
+ }
+ return -1;
+}
+
+int packet_mq_publish_message(struct packet *pkt, int topic_id, void *data)
+{
+ return packet_mq_publish_message_with_priority(pkt, topic_id, data, STELLAR_MQ_PRIORITY_NORMAL);
+}
+
+/*******************************
+ * SESSION MQ *
+ *******************************/
+
+int session_mq_publish_message_with_priority(struct session *sess, int topic_id, void *data, enum stellar_mq_priority priority)
+{
+ struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
+ assert(plug_mgr_rt);
+ if(plug_mgr_rt->session_mq_status==NULL)return -1;//runtime free stage , mq_status alaway null, ignore publish message
+ if(plug_mgr_rt->pub_session_msg_cnt == -1)return -1;
+ if(plug_mgr_rt->pub_session_msg_cnt >= plug_mgr_rt->plug_mgr->max_message_dispatch)return -1;
+ int tid = stellar_get_current_thread_index();
+ if(stellar_mq_publish_message(ON_SESSION_TOPIC ,topic_id, data, plug_mgr_rt->plug_mgr->stellar_mq_schema_array,plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq,priority)==0)
+ {
+ plug_mgr_rt->pub_session_msg_cnt+=1;
+ return 0;
+ }
+ return -1;
}
int session_mq_publish_message(struct session *sess, int topic_id, void *data)
{
- struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
- if(plug_mgr_rt==NULL || topic_id < 0)return -1;
- if(plug_mgr_rt->plug_mgr->session_mq_schema_array==NULL)return -1;
- unsigned int len = utarray_len(plug_mgr_rt->plug_mgr->session_mq_schema_array);
- if (len <= (unsigned int)topic_id)return -1;
- struct session_message *msg= ALLOC(struct session_message,1);
- msg->topic_id = topic_id;
- msg->msg_data = data;
- DL_APPEND(plug_mgr_rt->pending_mq, msg);
- return 0;
+ return session_mq_publish_message_with_priority(sess, topic_id, data, STELLAR_MQ_PRIORITY_NORMAL);
+}
+
+static void session_mq_update_topic_status(struct plugin_manager_runtime *plug_mgr_rt, struct stellar_mq_topic_schema *topic)
+{
+ //update topic status
+ switch (bitmap_is_all_zero(plug_mgr_rt->session_mq_status, 0, topic->topic_id, topic->subscriber_cnt))
+ {
+ case 1:
+ bitmap_set(plug_mgr_rt->session_topic_status, 0, topic->topic_id, 0);
+ break;
+ case 0:
+ bitmap_set(plug_mgr_rt->session_topic_status, 0, topic->topic_id, 1);
+ break;
+ default:
+ break;
+ }
+ return;
}
static int session_mq_set_message_status(struct session *sess, int topic_id, int plugin_id, int bit_value)
@@ -502,10 +722,10 @@ static int session_mq_set_message_status(struct session *sess, int topic_id, int
if(bit_value!=0 && bit_value!=1)return -1;
if(plugin_id >= PACKET_PULGIN_ID_BASE)return -1;// ignore packet plugin
if(topic_id < 0 || plugin_id < 0)return -1;
- struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
+ struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
if(plug_mgr_rt==NULL)return -1;
- if(topic_id >= plug_mgr_rt->plug_mgr->topic_num)return -1;// topic_id out of range
- struct session_mq_topic_schema *topic = (struct session_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->session_mq_schema_array, (unsigned int)topic_id);
+ if(topic_id >= plug_mgr_rt->plug_mgr->stellar_mq_topic_num)return -1;// topic_id out of range
+ struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id);
if(topic==NULL)return -1;
struct registered_session_plugin_schema *session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)plugin_id);
@@ -516,18 +736,18 @@ static int session_mq_set_message_status(struct session *sess, int topic_id, int
{
for(unsigned int i=0; i < plugin_subscriber_num; i++)
{
- struct session_mq_subscriber_info *session_plugin_sub_info = (struct session_mq_subscriber_info *)utarray_eltptr(session_plugin_schema->registed_session_mq_subscriber_info, i);
+ struct stellar_mq_subscriber_info *session_plugin_sub_info = (struct stellar_mq_subscriber_info *)utarray_eltptr(session_plugin_schema->registed_session_mq_subscriber_info, i);
if(topic_id==session_plugin_sub_info->topic_id)
{
- bitmap_set(plug_mgr_rt->session_mq_status, topic_id, session_plugin_sub_info->subscriber_idx, bit_value);
+ bitmap_set(plug_mgr_rt->session_mq_status, session_plugin_sub_info->subscriber_idx, topic_id, bit_value);
}
}
+ session_mq_update_topic_status(plug_mgr_rt, topic);
return 0;
}
return -1;
}
-
int session_mq_ignore_message(struct session *sess, int topic_id, int plugin_id)
{
return session_mq_set_message_status(sess, topic_id, plugin_id, 0);
@@ -539,131 +759,63 @@ int session_mq_unignore_message(struct session *sess, int topic_id, int plugin_i
return session_mq_set_message_status(sess, topic_id, plugin_id, 1);
}
-UT_icd session_mq_subscriber_info_icd = {sizeof(struct session_mq_subscriber_info), NULL, NULL, NULL};
-
-int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_msg_cb_func *plugin_on_msg_cb, int plugin_id)
+int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_session_msg_cb_func *plugin_on_msg_cb, int plugin_id)
{
- if(plugin_id >= PACKET_PULGIN_ID_BASE)return -1;// ignore packet plugin
+ if(plugin_id >= PACKET_PULGIN_ID_BASE || plugin_on_msg_cb == NULL)return -1;// ignore packet plugin
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
- if(plug_mgr->session_mq_schema_array==NULL)return -1;
- unsigned int len = utarray_len(plug_mgr->session_mq_schema_array);
- if (len <= (unsigned int)topic_id)return -1;
-
- struct registered_session_plugin_schema *session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr->registered_session_plugin_array, (unsigned)plugin_id);
- if(session_plugin_schema==NULL)return -1;
- struct session_mq_topic_schema *topic = (struct session_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, (unsigned int)topic_id);
- if(topic==NULL)return -1;
+ if(plug_mgr == NULL || plug_mgr->registered_session_plugin_array == NULL)return -1;
+ struct registered_session_plugin_schema *session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr->registered_session_plugin_array, (unsigned)plugin_id);
+ if(session_plugin_schema==NULL)return -1;
if(session_plugin_schema->registed_session_mq_subscriber_info==NULL)
{
- utarray_new(session_plugin_schema->registed_session_mq_subscriber_info, &session_mq_subscriber_info_icd);
- }
-
- // if plugin already subscribe current topic, return 0
- struct session_mq_subscriber_info *p=NULL;
- while( (p=(struct session_mq_subscriber_info *)utarray_next(session_plugin_schema->registed_session_mq_subscriber_info,p)))
- {
- if(p->topic_id==topic_id)
- return 0;
- };
-
- struct session_mq_subscriber *new_subscriber = ALLOC(struct session_mq_subscriber,1);
- new_subscriber->topic_subscriber_idx = topic->subscriber_cnt;
- new_subscriber->session_plugin_id = plugin_id;
- new_subscriber->msg_cb = plugin_on_msg_cb;
- DL_APPEND(topic->subscribers, new_subscriber);
-
- struct session_mq_subscriber_info sub_info;
- memset(&sub_info, 0, sizeof(struct session_mq_subscriber_info));
- sub_info.topic_id=topic_id;
- sub_info.subscriber_idx=topic->subscriber_cnt;
- utarray_push_back(session_plugin_schema->registed_session_mq_subscriber_info, &sub_info);
- topic->subscriber_cnt+=1;
- plug_mgr->subscriber_num+=1;
- return 0;
+ utarray_new(session_plugin_schema->registed_session_mq_subscriber_info, &stellar_mq_subscriber_info_icd);
+ }
+ //session plugin id equals to plugin idx
+ return stellar_mq_subscribe(plug_mgr,topic_id, (void *)plugin_on_msg_cb, plugin_id, session_plugin_schema->registed_session_mq_subscriber_info);
}
-static void plugin_manager_session_message_dispatch(struct session *sess)
+int session_mq_topic_is_active(struct session *sess, int topic_id)
{
- struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
- if(plug_mgr_rt==NULL)return;
-
- struct session_message *mq_elt=NULL, *mq_tmp=NULL;
- struct session_mq_subscriber *sub_elt, *sub_tmp;
- struct session_mq_topic_schema *topic;
- struct registered_session_plugin_schema *session_plugin_schema;
- struct session_plugin_ctx_runtime *plugin_ctx_rt;
- while (plug_mgr_rt->pending_mq != NULL)
- {
- DL_FOREACH_SAFE(plug_mgr_rt->pending_mq, mq_elt, mq_tmp)
- {
- topic = (struct session_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->session_mq_schema_array,
- (unsigned int)(mq_elt->topic_id));
- if (topic)
- {
- int cur_sub_idx = 0;
- DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
- {
- if (bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->topic_id, cur_sub_idx) != 0)
- {
- plugin_ctx_rt=(plug_mgr_rt->plugin_ctx_array+sub_elt->session_plugin_id);
- session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)sub_elt->session_plugin_id);
- if(plugin_ctx_rt->state==INIT)
- {
- if(session_plugin_schema->on_ctx_new)
- {
- plugin_ctx_rt->plugin_ctx=session_plugin_schema->on_ctx_new(sess, session_plugin_schema->plugin_env);
- plugin_ctx_rt->state=ACTIVE;
- }
- }
- if(sub_elt->msg_cb)sub_elt->msg_cb(sess, mq_elt->topic_id, mq_elt->msg_data, plugin_ctx_rt->plugin_ctx,
- session_plugin_schema->plugin_env);
- }
- cur_sub_idx++;
- }
- if (topic->free_cb)
- {
- topic->free_cb(mq_elt->msg_data, topic->free_cb_arg);
- }
- }
- DL_DELETE(plug_mgr_rt->pending_mq, mq_elt);
- DL_APPEND(plug_mgr_rt->delivered_mq, mq_elt);// move to delivered message list
- }
- }
- return;
+ struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
+ assert(plug_mgr_rt);
+ if(plug_mgr_rt->session_topic_status==NULL)return -1;//runtime free stage , mq_status alaway null, ignore publish message
+ if(topic_id >= plug_mgr_rt->plug_mgr->stellar_mq_topic_num)return -1;// topic_id out of range
+ if(bitmap_get(plug_mgr_rt->session_topic_status, 0, topic_id) == 0)return 0;
+ return 1;
}
/*******************************
* PLUGIN MANAGER SESSION RUNTIME *
*******************************/
-
-static struct plugin_exdata *session_exdata_runtime_new(struct plugin_manager_schema *plug_mgr)
+static struct stellar_exdata *session_exdata_runtime_new(struct plugin_manager_schema *plug_mgr)
{
- struct plugin_exdata *exdata_rt = NULL;
- if(plug_mgr->session_exdata_schema_array==NULL)return NULL;
- unsigned int len = utarray_len(plug_mgr->session_exdata_schema_array);
+ struct stellar_exdata *exdata_rt = NULL;
+ if(plug_mgr->stellar_exdata_schema_array==NULL)return NULL;
+ unsigned int len = utarray_len(plug_mgr->stellar_exdata_schema_array);
if(len > 0)
{
- exdata_rt=ALLOC(struct plugin_exdata, len);
+ exdata_rt=CALLOC(struct stellar_exdata, len);
}
return exdata_rt;
}
-static void session_exdata_runtime_free(struct plugin_manager_schema *plug_mgr, struct session *sess, struct plugin_exdata *exdata_rt)
+static void session_exdata_runtime_free(struct plugin_manager_schema *plug_mgr, struct stellar_exdata *exdata_rt)
{
if(exdata_rt==NULL)return;
- if(plug_mgr->session_exdata_schema_array==NULL)return;
- unsigned int len=utarray_len(plug_mgr->session_exdata_schema_array);
+ if(plug_mgr->stellar_exdata_schema_array==NULL)return;
+ unsigned int len=utarray_len(plug_mgr->stellar_exdata_schema_array);
for (unsigned int i = 0; i < len; i++)
{
void *exdata = (exdata_rt + i)->exdata;
- struct session_exdata_schema *schema = (struct session_exdata_schema *)utarray_eltptr(plug_mgr->session_exdata_schema_array, i);
+ (exdata_rt + i)->state=EXIT;
+ struct stellar_exdata_schema *schema = (struct stellar_exdata_schema *)utarray_eltptr(plug_mgr->stellar_exdata_schema_array, i);
if (exdata)
{
if (schema->free_func)
{
- schema->free_func(sess, i, exdata, schema->free_arg);
+ schema->free_func(i, exdata, schema->free_arg);
}
}
}
@@ -671,15 +823,14 @@ static void session_exdata_runtime_free(struct plugin_manager_schema *plug_mgr,
struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_manager_schema *plug_mgr, struct session *sess)
{
- if(plug_mgr->registered_session_plugin_array==NULL)return NULL;
- struct plugin_manager_runtime *rt = ALLOC(struct plugin_manager_runtime, 1);
+ struct plugin_manager_runtime *rt = CALLOC(struct plugin_manager_runtime, 1);
rt->plug_mgr = plug_mgr;
rt->sess = sess;
- rt->pending_mq = NULL;
- rt->delivered_mq = NULL;
- rt->session_mq_status=bitmap_new(plug_mgr->topic_num, plug_mgr->subscriber_num, 1);
- rt->plugin_exdata_array = (struct plugin_exdata *)session_exdata_runtime_new(plug_mgr);
- rt->plugin_ctx_array = ALLOC(struct session_plugin_ctx_runtime, utarray_len(plug_mgr->registered_session_plugin_array));
+ rt->session_mq_status=bitmap_new(plug_mgr->session_topic_subscriber_num, plug_mgr->stellar_mq_topic_num, 1);
+ rt->session_topic_status=bitmap_new(1, plug_mgr->stellar_mq_topic_num, 1);
+ rt->sess_exdata_array = (struct stellar_exdata *)session_exdata_runtime_new(plug_mgr);
+ if(plug_mgr->registered_session_plugin_array)
+ rt->plugin_ctx_array = CALLOC(struct session_plugin_ctx_runtime, utarray_len(plug_mgr->registered_session_plugin_array));
return rt;
}
@@ -687,34 +838,35 @@ struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_
void plugin_manager_session_runtime_free(struct plugin_manager_runtime *rt)
{
if(rt==NULL)return;
- if(rt->pending_mq != NULL)
- {
- session_mq_free(rt->pending_mq);
- rt->pending_mq=NULL;
- }
- if(rt->delivered_mq != NULL)
- {
- session_mq_free(rt->delivered_mq);
- rt->delivered_mq=NULL;
- }
+
if(rt->session_mq_status != NULL)
{
bitmap_free(rt->session_mq_status);
+ rt->session_mq_status=NULL;
}
- unsigned int len = utarray_len(rt->plug_mgr->registered_session_plugin_array);
- for(unsigned int i = 0; i < len; i++)
+ if(rt->session_topic_status != NULL)
{
- struct session_plugin_ctx_runtime *plugin_ctx_rt=(rt->plugin_ctx_array+i);
- struct registered_session_plugin_schema *session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(rt->plug_mgr->registered_session_plugin_array, i);
- if(session_plugin_schema->on_ctx_free && plugin_ctx_rt->state==ACTIVE)
- {
- session_plugin_schema->on_ctx_free(rt->sess, plugin_ctx_rt->plugin_ctx, session_plugin_schema->plugin_env);
- }
+ bitmap_free(rt->session_topic_status);
+ rt->session_topic_status=NULL;
}
- FREE(rt->plugin_ctx_array);
-
- session_exdata_runtime_free(rt->plug_mgr, rt->sess, rt->plugin_exdata_array);
- FREE(rt->plugin_exdata_array);
+ if (rt->plug_mgr->registered_session_plugin_array)
+ {
+ unsigned int len = utarray_len(rt->plug_mgr->registered_session_plugin_array);
+ for (unsigned int i = 0; i < len; i++)
+ {
+ struct session_plugin_ctx_runtime *plugin_ctx_rt = (rt->plugin_ctx_array + i);
+ struct registered_session_plugin_schema *session_plugin_schema =
+ (struct registered_session_plugin_schema *)utarray_eltptr(rt->plug_mgr->registered_session_plugin_array, i);
+ if (session_plugin_schema->on_ctx_free && plugin_ctx_rt->state == ACTIVE)
+ {
+ session_plugin_schema->on_ctx_free(rt->sess, plugin_ctx_rt->plugin_ctx,
+ session_plugin_schema->plugin_env);
+ }
+ }
+ FREE(rt->plugin_ctx_array);
+ }
+ session_exdata_runtime_free(rt->plug_mgr, rt->sess_exdata_array);
+ FREE(rt->sess_exdata_array);
FREE(rt);
}
@@ -722,11 +874,9 @@ void plugin_manager_session_runtime_free(struct plugin_manager_runtime *rt)
/*********************************************
* PLUGIN MANAGER PACKET PLUGIN *
*********************************************/
-
-
UT_icd registered_packet_plugin_array_icd = {sizeof(struct registered_packet_plugin_schema), NULL, NULL, NULL};
-int stellar_packet_plugin_register(struct stellar *st, unsigned char ip_protocol, plugin_on_packet_func on_packet, void *plugin_env)
+int stellar_packet_plugin_register(struct stellar *st, unsigned char ip_proto, plugin_on_packet_func on_packet_cb, void *plugin_env)
{
struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);
if(plug_mgr->registered_packet_plugin_array == NULL)
@@ -735,20 +885,26 @@ int stellar_packet_plugin_register(struct stellar *st, unsigned char ip_protocol
}
struct registered_packet_plugin_schema packet_plugin_schema;
memset(&packet_plugin_schema, 0, sizeof(packet_plugin_schema));
- packet_plugin_schema.ip_protocol = ip_protocol;
- packet_plugin_schema.on_packet = on_packet;
+ packet_plugin_schema.ip_protocol = ip_proto;
+ packet_plugin_schema.on_packet = on_packet_cb;
packet_plugin_schema.plugin_env = plugin_env;
utarray_push_back(plug_mgr->registered_packet_plugin_array, &packet_plugin_schema);
- return (PACKET_PULGIN_ID_BASE+utarray_len(plug_mgr->registered_packet_plugin_array));// return packet plugin_id
+ return (PACKET_PULGIN_ID_BASE+utarray_len(plug_mgr->registered_packet_plugin_array)-1);// return packet plugin_id, equals to packet plugin arrary index + PACKET_PULGIN_ID_BASE
}
-void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, struct packet *pkt)
+void plugin_manager_on_packet_ingress(struct plugin_manager_schema *plug_mgr, struct packet *pkt)
{
- if(plug_mgr == NULL || pkt == NULL)return;
if(plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return;
struct registered_packet_plugin_schema *p=NULL;
- //unsigned char ip_proto=packet_get_layers(pkt); // FIXME get ip_proto
- unsigned char ip_proto=0;
+
+ //TODO: get innermost layer ip protocol by packet api
+ struct tuple6 t6;
+ packet_get_innermost_tuple6(pkt, &t6);
+ unsigned char ip_proto=t6.ip_proto;
+
+ int tid=stellar_get_current_thread_index();
+ //TODO : provide public api to reset pub_msg_cnt
+ plug_mgr->per_thread_data[tid].pub_packet_msg_cnt=0;//reset pub_msg_cnt
while ((p = (struct registered_packet_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, p)))
{
if(p->ip_protocol == ip_proto && p->on_packet)
@@ -756,14 +912,24 @@ void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, struct pac
p->on_packet(pkt, ip_proto, p->plugin_env);
}
}
+ stellar_mq_dispatch(plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr->per_thread_data[tid].dealth_letter_queue, NULL, pkt);
return;
}
+void plugin_manager_on_packet_egress(struct plugin_manager_schema *plug_mgr, struct packet *pkt)
+{
+ if(plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return;
+ int tid=stellar_get_current_thread_index();
+ stellar_mq_dispatch(plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr->per_thread_data[tid].dealth_letter_queue, NULL, pkt);
+ plug_mgr->per_thread_data[tid].pub_packet_msg_cnt=-1;//disable packet message publish
+ stellar_mq_free(&plug_mgr->per_thread_data[tid].dealth_letter_queue,
+ plug_mgr->stellar_mq_schema_array);
+ per_thread_packet_exdata_arrary_clean(plug_mgr);
+}
+
/*********************************************
* PLUGIN MANAGER POLLING PLUGIN *
*********************************************/
-
-
UT_icd registered_polling_plugin_array_icd = {sizeof(struct registered_polling_plugin_schema), NULL, NULL, NULL};
int stellar_polling_plugin_register(struct stellar *st, plugin_on_polling_func on_polling, void *plugin_env)
@@ -778,7 +944,7 @@ int stellar_polling_plugin_register(struct stellar *st, plugin_on_polling_func o
polling_plugin_schema.on_polling = on_polling;
polling_plugin_schema.plugin_env = plugin_env;
utarray_push_back(plug_mgr->registered_polling_plugin_array, &polling_plugin_schema);
- return (POLLING_PULGIN_ID_BASE+utarray_len(plug_mgr->registered_polling_plugin_array));// return polling plugin_id
+ return (POLLING_PULGIN_ID_BASE+utarray_len(plug_mgr->registered_polling_plugin_array)-1);// return polling plugin_id, equals to polling plugin arrary index + POLLING_PULGIN_ID_BASE
}
int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr)
@@ -802,12 +968,8 @@ int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr)
/*********************************************
* PLUGIN MANAGER SESSION PLUGIN *
*********************************************/
-
-
-
UT_icd registered_session_plugin_schema_icd = {sizeof(struct registered_session_plugin_schema), NULL, NULL, NULL};
-
int stellar_session_plugin_register(struct stellar *st,
session_ctx_new_func session_ctx_new,
session_ctx_free_func session_ctx_free,
@@ -827,13 +989,13 @@ int stellar_session_plugin_register(struct stellar *st,
return (utarray_len(plug_mgr->registered_session_plugin_array)-1);// return session plugin_id, equals to session plugin arrary index
}
-void plugin_manager_on_session_ingress(struct session *sess,const struct packet *pkt)
+void plugin_manager_on_session_ingress(struct session *sess, struct packet *pkt)
{
- if(sess == NULL || pkt == NULL)return;
- struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
+ struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
if(plug_mgr_rt==NULL)return;
- plugin_manager_scratch_session_set(sess);
-#if 0
+#if 0
+ int topic_id = -1;
+ //FIXME: get topic and tcp data by stellar api
switch (packet_get_type(pkt))
{
case TCP:
@@ -850,29 +1012,37 @@ void plugin_manager_on_session_ingress(struct session *sess,const struct packet
break;
default:
break;
- }
+ }
+
+ plug_mgr_rt->pub_session_msg_cnt=0;
+ session_mq_publish_message_with_priority(sess, topic_id ,(void *)pkt, STELLAR_MQ_PRIORITY_HIGH);
+ int tid=stellar_get_current_thread_index();
+ stellar_mq_dispatch(plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, sess, pkt);
#endif
+
+ plug_mgr_rt->pub_session_msg_cnt=0;
+ plugin_manager_scratch_session_set(sess);
struct tcp_segment *seg;
enum session_type type = session_get_type(sess);
if (packet_is_ctrl(pkt))
{
- session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->control_packet_topic_id ,(void *)pkt);
+ session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->control_packet_topic_id ,(void *)pkt, STELLAR_MQ_PRIORITY_HIGH);
}
else
{
switch (type)
{
case SESSION_TYPE_TCP:
- session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->tcp_topic_id ,(void *)pkt);
+ session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_topic_id ,(void *)pkt, STELLAR_MQ_PRIORITY_HIGH);
if((seg = session_get_tcp_segment(sess)) != NULL)
{
- session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->tcp_stream_topic_id ,(void *)seg);
+ session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_stream_topic_id ,(void *)seg, STELLAR_MQ_PRIORITY_HIGH);
//session_free_tcp_segment(sess, seg);
}
break;
case SESSION_TYPE_UDP:
- session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->udp_topic_id ,(void *)pkt);
+ session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->udp_topic_id ,(void *)pkt, STELLAR_MQ_PRIORITY_HIGH);
break;
default:
assert(0);
@@ -880,48 +1050,79 @@ void plugin_manager_on_session_ingress(struct session *sess,const struct packet
}
}
//TODO: check TCP topic active subscirber num, if 0, return disable assembler state, to reduce tcp reassemble overhead
- plugin_manager_session_message_dispatch(sess);
+ int tid=stellar_get_current_thread_index();
+ stellar_mq_dispatch(plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, sess, pkt);
plugin_manager_scratch_session_set(NULL);
+
+
+
return;
}
-void plugin_manager_on_session_egress(struct session *sess,const struct packet *pkt)
+void plugin_manager_on_session_egress(struct session *sess, struct packet *pkt)
{
- if(sess == NULL || pkt == NULL)return;
- struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
+ struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
if(plug_mgr_rt==NULL)return;
plugin_manager_scratch_session_set(sess);
- session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->egress_topic_id ,(void *)pkt);
- plugin_manager_session_message_dispatch(sess);
- session_mq_free(plug_mgr_rt->delivered_mq);
- plug_mgr_rt->delivered_mq=NULL;
+ session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->egress_topic_id ,pkt, STELLAR_MQ_PRIORITY_HIGH);
+ int tid=stellar_get_current_thread_index();
+ stellar_mq_dispatch(plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, sess, pkt);
+ plug_mgr_rt->pub_session_msg_cnt=-1;//disable session message publish
+ stellar_mq_free(&plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, plug_mgr_rt->plug_mgr->stellar_mq_schema_array);
plugin_manager_scratch_session_set(NULL);
+ return;
+}
+
+void plugin_manager_on_session_closing(struct session *sess)
+{
+ struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
+ if(plug_mgr_rt==NULL)return;
+ plug_mgr_rt->pub_session_msg_cnt=0;// reset pub_msg_cnt
+ switch (session_get_type(sess))
+ {
+ case SESSION_TYPE_TCP:
+ session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_topic_id ,NULL, STELLAR_MQ_PRIORITY_HIGH);
+ session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_stream_topic_id , NULL, STELLAR_MQ_PRIORITY_HIGH);
+ break;
+ case SESSION_TYPE_UDP:
+ session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->udp_topic_id ,NULL, STELLAR_MQ_PRIORITY_HIGH);
+ break;
+ default:
+ break;
+ }
+ int tid=stellar_get_current_thread_index();
+ stellar_mq_dispatch(plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, sess, NULL);
+ plug_mgr_rt->pub_session_msg_cnt=-1;//disable session message publish
+ stellar_mq_free(&plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, plug_mgr_rt->plug_mgr->stellar_mq_schema_array);
return;
}
void stellar_session_plugin_dettach_current_session(struct session *sess)
{
- struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
+ struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess);
struct registered_session_plugin_schema *session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)plug_mgr_rt->current_session_plugin_id);
if(session_plugin_schema==NULL)return;
-
+ struct stellar_mq_topic_schema *topic=NULL;
unsigned int plugin_subscriber_num = utarray_len(session_plugin_schema->registed_session_mq_subscriber_info);
- //TODO: maybe no need to clear session_mq_status, check plugin_ctx before message dispatch
+ //Won't Do: maybe no need to clear session_mq_status, check plugin_ctx before message dispatch
+ //allow plugin register with null ctx_new and ctx_free
if(plug_mgr_rt->session_mq_status)
{
for(unsigned int i=0; i < plugin_subscriber_num; i++)
{
- struct session_mq_subscriber_info *session_plugin_sub_info = (struct session_mq_subscriber_info *)utarray_eltptr(session_plugin_schema->registed_session_mq_subscriber_info, i);
- bitmap_set(plug_mgr_rt->session_mq_status, session_plugin_sub_info->topic_id, session_plugin_sub_info->subscriber_idx, 0);
+ struct stellar_mq_subscriber_info *session_plugin_sub_info = (struct stellar_mq_subscriber_info *)utarray_eltptr(session_plugin_schema->registed_session_mq_subscriber_info, i);
+ bitmap_set(plug_mgr_rt->session_mq_status, session_plugin_sub_info->subscriber_idx,session_plugin_sub_info->topic_id, 0);
+ topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->stellar_mq_schema_array, (unsigned int)session_plugin_sub_info->topic_id);
+ session_mq_update_topic_status(plug_mgr_rt, topic);
}
}
-
- if(session_plugin_schema->on_ctx_free)
+ //dettach in ctx INIT, do not call on_ctx_free immidiately
+ if(plug_mgr_rt->plugin_ctx_array[plug_mgr_rt->current_session_plugin_id].state != INIT && (session_plugin_schema->on_ctx_free))
{
- session_plugin_schema->on_ctx_free(sess, (plug_mgr_rt->plugin_ctx_array+plug_mgr_rt->current_session_plugin_id)->plugin_ctx, session_plugin_schema->plugin_env);
+ session_plugin_schema->on_ctx_free(sess, plug_mgr_rt->plugin_ctx_array[plug_mgr_rt->current_session_plugin_id].plugin_ctx, session_plugin_schema->plugin_env);
+ plug_mgr_rt->plugin_ctx_array[plug_mgr_rt->current_session_plugin_id].plugin_ctx=NULL;
}
- (plug_mgr_rt->plugin_ctx_array+plug_mgr_rt->current_session_plugin_id)->plugin_ctx=NULL;
- (plug_mgr_rt->plugin_ctx_array+plug_mgr_rt->current_session_plugin_id)->state=EXIT;
- return;
-}
+ plug_mgr_rt->plugin_ctx_array[plug_mgr_rt->current_session_plugin_id].state=EXIT;
+ return;
+} \ No newline at end of file
diff --git a/src/plugin/plugin_manager.h b/src/plugin/plugin_manager.h
index 236a1ff..1de5878 100644
--- a/src/plugin/plugin_manager.h
+++ b/src/plugin/plugin_manager.h
@@ -1,5 +1,7 @@
#pragma once
+#include "stellar/stellar.h"
+
#ifdef __cplusplus
extern "C"
{
@@ -11,18 +13,19 @@ struct plugin_manager_runtime;
struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char *plugin_spec_file_path);
void plugin_manager_exit(struct plugin_manager_schema *plug_mgr);
-void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, struct packet *pkt);
-
+void plugin_manager_on_packet_ingress(struct plugin_manager_schema *plug_mgr, struct packet *pkt);
+void plugin_manager_on_packet_egress(struct plugin_manager_schema *plug_mgr, struct packet *pkt);
//return polling work state, 0: idle, 1: working
int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr);
//publish and dispatch session msg(msg, pkt) on session_mq
-void plugin_manager_on_session_ingress(struct session *sess,const struct packet *pkt);
-void plugin_manager_on_session_egress(struct session *sess,const struct packet *pkt);
+void plugin_manager_on_session_ingress(struct session *sess,struct packet *pkt);
+void plugin_manager_on_session_egress(struct session *sess,struct packet *pkt);
+void plugin_manager_on_session_closing(struct session *sess);
struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_manager_schema *plug_mgr, struct session *sess);
void plugin_manager_session_runtime_free(struct plugin_manager_runtime *plug_mgr_rt);
#ifdef __cplusplus
}
-#endif
+#endif \ No newline at end of file
diff --git a/src/plugin/plugin_manager_interna.h b/src/plugin/plugin_manager_interna.h
new file mode 100644
index 0000000..07e462d
--- /dev/null
+++ b/src/plugin/plugin_manager_interna.h
@@ -0,0 +1,192 @@
+#pragma once
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+#include "plugin_manager.h"
+
+#include "stellar/stellar.h"
+#include "stellar/stellar_mq.h"
+#include "stellar/stellar_exdata.h"
+
+#include "bitmap/bitmap.h"
+#include "uthash/utarray.h"
+
+struct per_thread_exdata_array
+{
+ struct stellar_exdata *exdata_array;
+};
+
+struct stellar_message;
+
+struct plugin_manger_per_thread_data
+{
+ struct per_thread_exdata_array per_thread_pkt_exdata_array;
+ struct stellar_message *priority_mq[STELLAR_MQ_PRIORITY_MAX];// message list
+ struct stellar_message *dealth_letter_queue;// dlq list
+ long long pub_packet_msg_cnt;
+};
+
+
+
+struct plugin_manager_schema
+{
+ struct stellar *st;
+ UT_array *plugin_load_specs_array;
+ UT_array *stellar_exdata_schema_array;
+ UT_array *stellar_mq_schema_array;
+ UT_array *registered_session_plugin_array;
+ UT_array *registered_packet_plugin_array;
+ UT_array *registered_polling_plugin_array;
+ int stellar_mq_topic_num;
+ int packet_topic_subscriber_num;
+ int session_topic_subscriber_num;
+ int tcp_topic_id;
+ int tcp_stream_topic_id;
+ int udp_topic_id;
+ int egress_topic_id;
+ int control_packet_topic_id;
+ int max_message_dispatch;
+ struct plugin_manger_per_thread_data *per_thread_data;
+}__attribute__((aligned(sizeof(void*))));
+
+enum plugin_exdata_state
+{ INIT, ACTIVE, EXIT };
+
+struct stellar_exdata
+{
+ void *exdata;
+ enum plugin_exdata_state state;
+};
+
+
+
+struct stellar_exdata_schema
+{
+ char *name;
+ stellar_exdata_free *free_func;
+
+ void *free_arg;
+ int idx;
+}__attribute__((aligned(sizeof(void*))));
+
+
+enum stellar_topic_type
+{
+ ON_SESSION_TOPIC,
+ ON_PACKET_TOPIC,
+};
+
+struct stellar_message
+{
+ struct
+ {
+ int topic_id;
+ enum stellar_topic_type type;
+ enum stellar_mq_priority priority;
+ } header;
+ void *body;
+ struct stellar_message *next, *prev;
+} __attribute__((aligned(sizeof(void *))));
+
+typedef struct stellar_mq_subscriber
+{
+ int topic_subscriber_idx;
+ int plugin_idx;
+ union
+ {
+ on_session_msg_cb_func *sess_msg_cb;
+ on_packet_msg_cb_func *pkt_msg_cb;
+ void *msg_cb;
+ };
+ struct stellar_mq_subscriber *next, *prev;
+}stellar_mq_subscriber __attribute__((aligned(sizeof(void*))));
+
+
+struct stellar_mq_topic_schema
+{
+ char *topic_name;
+ void *free_cb_arg;
+ int topic_id;
+ int subscriber_cnt;
+ int is_destroyed;
+ stellar_msg_free_cb_func *free_cb;
+ struct stellar_mq_subscriber *subscribers;
+}__attribute__((aligned(sizeof(void*))));
+
+
+
+struct session_plugin_ctx_runtime
+{
+ enum plugin_exdata_state state;
+ int session_plugin_id;
+ void *plugin_ctx;
+}__attribute__((aligned(sizeof(void*))));
+
+
+
+struct plugin_manager_runtime
+{
+ struct plugin_manager_schema *plug_mgr;
+ struct session *sess;
+ struct bitmap *session_mq_status; //N * M bits, N topic, M subscriber
+ struct bitmap *session_topic_status; //N bits, N topic
+ struct stellar_exdata *sess_exdata_array;
+ struct session_plugin_ctx_runtime *plugin_ctx_array;//N plugins TODO: call alloc and free
+ int current_session_plugin_id;
+ int pub_session_msg_cnt;
+}__attribute__((aligned(sizeof(void*))));
+
+struct registered_packet_plugin_schema
+{
+ char ip_protocol;
+ plugin_on_packet_func *on_packet;
+ void *plugin_env;
+ UT_array *registed_packet_mq_subscriber_info;
+}__attribute__((aligned(sizeof(void*))));
+
+struct registered_polling_plugin_schema
+{
+ plugin_on_polling_func *on_polling;
+ void *plugin_env;
+}__attribute__((aligned(sizeof(void*))));
+
+struct stellar_mq_subscriber_info
+{
+ int topic_id;
+ int subscriber_idx;
+}__attribute__((aligned(sizeof(void*))));
+
+struct registered_session_plugin_schema
+{
+ session_ctx_new_func *on_ctx_new;
+ session_ctx_free_func *on_ctx_free;
+ void *plugin_env;
+ UT_array *registed_session_mq_subscriber_info;
+}__attribute__((aligned(sizeof(void*))));
+
+#define SESSION_PULGIN_ID_BASE 0x00000
+#define PACKET_PULGIN_ID_BASE 0x10000
+#define POLLING_PULGIN_ID_BASE 0x20000
+
+/*******************************
+ * PLUGIN MANAGER INIT & EXIT *
+ *******************************/
+
+#define MAX_MSG_PER_DISPATCH 128
+
+#include <dlfcn.h>
+
+struct plugin_specific
+{
+ char plugin_name[256];
+ plugin_on_load_func *load_cb;
+ plugin_on_unload_func *unload_cb;
+ void *plugin_ctx;
+}__attribute__((aligned(sizeof(void*))));
+
+#ifdef __cplusplus
+}
+#endif \ No newline at end of file
diff --git a/src/plugin/test/CMakeLists.txt b/src/plugin/test/CMakeLists.txt
new file mode 100644
index 0000000..7698640
--- /dev/null
+++ b/src/plugin/test/CMakeLists.txt
@@ -0,0 +1,17 @@
+add_executable(gtest_plugin_manager
+plugin_manager_gtest_main.cpp
+)
+
+include_directories(${CMAKE_SOURCE_DIR}/src/plugin/)
+
+target_link_libraries(
+ gtest_plugin_manager
+ plugin_manager
+ dl
+ "-rdynamic"
+ gtest
+ gmock
+)
+
+include(GoogleTest)
+gtest_discover_tests(gtest_plugin_manager) \ No newline at end of file
diff --git a/src/plugin/test/plugin_manager_gtest_main.cpp b/src/plugin/test/plugin_manager_gtest_main.cpp
new file mode 100644
index 0000000..44cb3d4
--- /dev/null
+++ b/src/plugin/test/plugin_manager_gtest_main.cpp
@@ -0,0 +1,2312 @@
+#pragma GCC diagnostic ignored "-Wunused-parameter"
+
+#include <gtest/gtest.h>
+#include "stellar/utils.h"
+
+#include "plugin_manager_gtest_mock.h"
+
+#define STELLAR_INTRINSIC_TOPIC_NUM 5
+#define TOPIC_NAME_MAX 512
+
+void whitebox_test_plugin_manager_intrisic_metadata(struct stellar *st, struct plugin_manager_schema *plug_mgr)
+{
+ SCOPED_TRACE("whitebox test intrisic metadata");
+
+ EXPECT_TRUE(plug_mgr!=NULL);
+
+ EXPECT_EQ(plug_mgr->st, st);
+
+ //load spec null
+ EXPECT_TRUE(plug_mgr->plugin_load_specs_array==NULL);
+
+ //session exdata schema null
+ EXPECT_TRUE(plug_mgr->stellar_exdata_schema_array==NULL);
+
+ //session mq schema not null
+ EXPECT_TRUE(plug_mgr->stellar_mq_schema_array!=NULL);
+
+ //registered plugin array null
+ EXPECT_TRUE(plug_mgr->registered_polling_plugin_array==NULL);
+ EXPECT_TRUE(plug_mgr->registered_packet_plugin_array==NULL);
+ EXPECT_TRUE(plug_mgr->registered_session_plugin_array==NULL);
+
+ int intrinsic_topic_num=utarray_len(plug_mgr->stellar_mq_schema_array);
+ EXPECT_EQ(plug_mgr->stellar_mq_topic_num, intrinsic_topic_num);//TCP,UDP,TCP_STREAM,EGRESS,CONTROL
+
+ struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)plug_mgr->tcp_topic_id);
+ EXPECT_STREQ(topic->topic_name, TOPIC_TCP);
+
+ topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)plug_mgr->tcp_stream_topic_id);
+ EXPECT_STREQ(topic->topic_name, TOPIC_TCP_STREAM);
+
+ topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)plug_mgr->udp_topic_id);
+ EXPECT_STREQ(topic->topic_name, TOPIC_UDP);
+
+ topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)plug_mgr->egress_topic_id);
+ EXPECT_STREQ(topic->topic_name, TOPIC_EGRESS);
+
+ topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)plug_mgr->control_packet_topic_id);
+ EXPECT_STREQ(topic->topic_name, TOPIC_CONTROL_PACKET);
+
+ //intrinsic topic
+ EXPECT_GE(stellar_mq_get_topic_id(st, TOPIC_TCP), 0);
+ EXPECT_GE(stellar_mq_get_topic_id(st, TOPIC_TCP_STREAM), 0);
+ EXPECT_GE(stellar_mq_get_topic_id(st, TOPIC_UDP), 0);
+ EXPECT_GE(stellar_mq_get_topic_id(st, TOPIC_EGRESS), 0);
+ EXPECT_GE(stellar_mq_get_topic_id(st, TOPIC_CONTROL_PACKET), 0);
+
+ EXPECT_TRUE(plug_mgr->per_thread_data!=NULL);
+ int thread_num=stellar_get_worker_thread_num(st);
+ for(int i=0; i<thread_num; i++)
+ {
+ EXPECT_TRUE(plug_mgr->per_thread_data[i].per_thread_pkt_exdata_array.exdata_array==NULL);
+ EXPECT_TRUE(plug_mgr->per_thread_data[i].dealth_letter_queue==NULL);
+ for(int j=0; j<STELLAR_MQ_PRIORITY_MAX; j++)
+ EXPECT_TRUE(plug_mgr->per_thread_data[i].priority_mq[j]==NULL);
+ }
+}
+
+/***********************************
+ * TEST PLUGIN MANAGER INIT & EXIT *
+ ***********************************/
+
+//TODO: test plugin_specs_load
+
+TEST(plugin_manager_init, init_with_null_toml) {
+
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+ plugin_manager_exit(plug_mgr);
+}
+
+/******************************************
+ * TEST PLUGIN MANAGER PACKET PLUGIN INIT *
+ ******************************************/
+
+static void test_mock_packet_exdata_free(int idx, void *ex_ptr, void *arg){}
+
+static void test_mock_overwrite_packet_exdata_free(int idx, void *ex_ptr, void *arg){}
+
+TEST(plugin_manager_init, packet_exdata_new_index_overwrite) {
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+ const char *exdata_name="PACKET_EXDATA";
+ int exdata_idx=stellar_exdata_new_index(&st,exdata_name, test_mock_packet_exdata_free, &st);
+ EXPECT_GE(exdata_idx, 0);
+ int overwrite_idx=stellar_exdata_new_index(&st,exdata_name, test_mock_overwrite_packet_exdata_free, plug_mgr);
+ EXPECT_GE(overwrite_idx, 0);
+ EXPECT_EQ(overwrite_idx, exdata_idx);
+
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ struct stellar_exdata_schema *exdata_schema = (struct stellar_exdata_schema *)utarray_eltptr(
+ plug_mgr->stellar_exdata_schema_array, (unsigned int)exdata_idx);
+ EXPECT_EQ(exdata_schema->free_func, (void *)test_mock_overwrite_packet_exdata_free);
+ EXPECT_EQ(exdata_schema->free_arg, plug_mgr);
+ EXPECT_EQ(exdata_schema->idx, exdata_idx);
+ EXPECT_STREQ(exdata_schema->name, exdata_name);
+
+ int exdata_num = utarray_len(plug_mgr->stellar_exdata_schema_array);
+ EXPECT_EQ(exdata_num, 1);
+ }
+
+ plugin_manager_exit(plug_mgr);
+}
+
+void test_mock_packet_msg_free(void *msg, void *msg_free_arg){}
+void test_mock_overwrite_packet_msg_free(void *msg, void *msg_free_arg){}
+
+TEST(plugin_manager_init, packet_mq_topic_create_and_update) {
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+ const char *topic_name="PACKET_TOPIC";
+
+ EXPECT_EQ(stellar_mq_get_topic_id(&st, topic_name), -1); // illegal topic_name
+
+ int topic_id = stellar_mq_create_topic(&st, topic_name, test_mock_packet_msg_free, &st);
+ EXPECT_GE(topic_id, 0);
+ struct stellar_mq_topic_schema *topic_schema = NULL;
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ topic_schema =
+ (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id);
+ EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_packet_msg_free);
+ EXPECT_EQ(topic_schema->free_cb_arg, &st);
+ EXPECT_EQ(topic_schema->topic_id, topic_id);
+ EXPECT_STREQ(topic_schema->topic_name, topic_name);
+ }
+
+ EXPECT_EQ(stellar_mq_get_topic_id(&st, topic_name), topic_id);
+ EXPECT_EQ(stellar_mq_create_topic(&st, topic_name, test_mock_overwrite_packet_msg_free, plug_mgr),
+ -1); // duplicate create, return error
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ topic_schema =
+ (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id);
+ EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_packet_msg_free);
+ EXPECT_EQ(topic_schema->free_cb_arg, &st);
+ EXPECT_EQ(topic_schema->topic_id, topic_id);
+ EXPECT_STREQ(topic_schema->topic_name, topic_name);
+ }
+
+ EXPECT_EQ(stellar_mq_update_topic(&st, topic_id, test_mock_overwrite_packet_msg_free, plug_mgr), 0);
+
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ topic_schema =
+ (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id);
+ EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_overwrite_packet_msg_free);
+ EXPECT_EQ(topic_schema->free_cb_arg, plug_mgr);
+ EXPECT_EQ(topic_schema->topic_id, topic_id);
+ EXPECT_STREQ(topic_schema->topic_name, topic_name);
+ EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), 1+STELLAR_INTRINSIC_TOPIC_NUM);
+ }
+
+ EXPECT_EQ(stellar_mq_destroy_topic(&st, 10), -1); // illgeal topic_id
+
+ EXPECT_EQ(stellar_mq_destroy_topic(&st, topic_id), 1);
+ EXPECT_EQ(stellar_mq_destroy_topic(&st, topic_id), 0); // duplicate destroy, return 0;
+
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), 1+STELLAR_INTRINSIC_TOPIC_NUM); // destory won't delete the topic schema
+ }
+ plugin_manager_exit(plug_mgr);
+}
+
+void test_mock_on_packet_msg(struct packet *pkt, int topic_id, const void *msg, void *plugin_env){}
+
+void test_mock_overwrite_on_packet_msg(struct packet *pkt, int topic_id, const void *msg, void *plugin_env){}
+
+TEST(plugin_manager_init, packet_mq_subscribe) {
+
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+
+ const char *topic_name="PACKET_TOPIC";
+
+ int topic_id=stellar_mq_create_topic(&st, topic_name, test_mock_packet_msg_free, &st);
+ EXPECT_GE(topic_id, 0);
+
+ EXPECT_EQ(stellar_packet_mq_subscribe(&st, topic_id, test_mock_on_packet_msg, 10+PACKET_PULGIN_ID_BASE),-1);//illgeal plugin_id
+ EXPECT_EQ(stellar_packet_mq_subscribe(&st, 10, test_mock_on_packet_msg, 10+PACKET_PULGIN_ID_BASE),-1);//illgeal topic_id & plugin_id
+
+ int plugin_id=stellar_packet_plugin_register(&st, 6, NULL, &st);
+ EXPECT_GE(plugin_id, PACKET_PULGIN_ID_BASE);
+
+ EXPECT_EQ(stellar_packet_mq_subscribe(&st, topic_id, test_mock_on_packet_msg, plugin_id),0);
+ EXPECT_EQ(stellar_packet_mq_subscribe(&st, topic_id, test_mock_overwrite_on_packet_msg, plugin_id),0);//duplicate subscribe, return 0, won't overwrite
+ struct stellar_mq_topic_schema *topic_schema;
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ topic_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id);
+ EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_packet_msg_free);
+ EXPECT_EQ(topic_schema->free_cb_arg, &st);
+ EXPECT_EQ(topic_schema->topic_id, topic_id);
+ EXPECT_STREQ(topic_schema->topic_name, topic_name);
+ }
+
+ EXPECT_EQ(topic_schema->subscriber_cnt, 1);
+ EXPECT_EQ(topic_schema->subscribers->pkt_msg_cb, (void *)test_mock_overwrite_on_packet_msg);
+
+ plugin_manager_exit(plug_mgr);
+}
+
+
+/*******************************************
+ * TEST PLUGIN MANAGER PACKET PLUGIN RUNTIME*
+ *******************************************/
+
+#define PACKET_PROTO_PLUGIN_NUM 128
+#define PACKET_EXDATA_NUM 2
+#define PACKET_TOPIC_NUM 2
+#define PACKET_MQ_SUB_NUM 2
+struct packet_plugin_env
+{
+ struct plugin_manager_schema *plug_mgr;
+ int basic_on_packet_called;
+ int proto_filter_plugin_id[PACKET_PROTO_PLUGIN_NUM];
+ int proto_filter_plugin_called[PACKET_PROTO_PLUGIN_NUM];
+ int exdata_set_on_packet_called;
+ int exdata_get_on_packet_called;
+ unsigned int packet_exdata_idx[PACKET_EXDATA_NUM];
+ int exdata_free_called[PACKET_EXDATA_NUM];
+ unsigned int packet_topic_id[PACKET_TOPIC_NUM];
+ unsigned int packet_mq_sub_plugin_id[PACKET_MQ_SUB_NUM];
+ int msg_pub_cnt;
+ int msg_sub_cnt;
+ int msg_free_cnt;
+};
+
+static void test_basic_on_packet(struct packet *pkt, unsigned char ip_protocol, void *plugin_env)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
+ EXPECT_TRUE(env!=NULL);
+ EXPECT_EQ(pkt->ip_proto, ip_protocol);
+ EXPECT_EQ(pkt->st, env->plug_mgr->st);
+ EXPECT_EQ(packet_exdata_set(pkt, 2, pkt), -1);// illegal set
+ EXPECT_EQ(packet_exdata_get(pkt, 2), nullptr);// illegal get
+ env->basic_on_packet_called+=1;
+ return;
+}
+
+TEST(plugin_manager, packet_plugin_illegal_exdata) {
+
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+ unsigned char ip_proto=6;
+ struct packet_plugin_env env;
+ memset(&env, 0, sizeof(struct packet_plugin_env));
+ env.plug_mgr=plug_mgr;
+ int plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_basic_on_packet, &env);
+ EXPECT_GE(plugin_id, PACKET_PULGIN_ID_BASE);
+
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ int packet_plugin_num = utarray_len(plug_mgr->registered_packet_plugin_array);
+ EXPECT_EQ(packet_plugin_num, 1);
+ }
+
+ struct packet pkt={&st, IPv4, ip_proto};
+ plugin_manager_on_packet_ingress(plug_mgr, &pkt);
+ plugin_manager_on_packet_egress(plug_mgr, &pkt);
+
+ plugin_manager_exit(plug_mgr);
+
+ EXPECT_EQ(env.basic_on_packet_called, 1);
+}
+
+static void test_proto_filter_on_packet(struct packet *pkt, unsigned char ip_protocol, void *plugin_env)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
+ EXPECT_TRUE(env!=NULL);
+ EXPECT_EQ(pkt->ip_proto, ip_protocol);
+ EXPECT_EQ(pkt->st, env->plug_mgr->st);
+ EXPECT_EQ(packet_exdata_set(pkt, 2, pkt), -1);// illegal set
+ EXPECT_EQ(packet_exdata_get(pkt, 2), nullptr);// illegal get
+ env->proto_filter_plugin_called[ip_protocol]+=1;
+ return;
+}
+
+TEST(plugin_manager, packet_plugins_with_proto_filter) {
+
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+ struct packet_plugin_env env;
+ memset(&env, 0, sizeof(struct packet_plugin_env));
+ env.plug_mgr=plug_mgr;
+
+ int proto_filter_plugin_num=(int)(sizeof(env.proto_filter_plugin_id) / sizeof(env.proto_filter_plugin_id[0]));
+ for (int i = 0; i < proto_filter_plugin_num; i++)
+ {
+ env.proto_filter_plugin_id[i] = stellar_packet_plugin_register(&st, i, test_proto_filter_on_packet, &env);
+ EXPECT_GE(env.proto_filter_plugin_id[i], PACKET_PULGIN_ID_BASE);
+
+
+ }
+
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), proto_filter_plugin_num);
+ }
+
+ struct packet pkt={&st, IPv4, 0};
+
+ int N_packet=10;
+ for (int j = 0; j < N_packet; j++)
+ {
+ for (int i = 0; i < proto_filter_plugin_num; i++)
+ {
+ pkt.ip_proto = i;
+ plugin_manager_on_packet_ingress(plug_mgr, &pkt);
+ plugin_manager_on_packet_egress(plug_mgr, &pkt);
+ }
+ }
+ plugin_manager_exit(plug_mgr);
+
+ for (int i = 0; i < proto_filter_plugin_num; i++)
+ {
+ EXPECT_EQ(env.proto_filter_plugin_called[i], N_packet);
+ }
+}
+
+
+struct test_exdata
+{
+ struct packet *pkt;
+ struct session *sess;
+ long long value;
+};
+
+static void test_exdata_set_on_packet(struct packet *pkt, unsigned char ip_protocol, void *plugin_env)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
+ EXPECT_TRUE(env!=NULL);
+ EXPECT_EQ(pkt->ip_proto, ip_protocol);
+ EXPECT_EQ(pkt->st, env->plug_mgr->st);
+ env->exdata_set_on_packet_called+=1;
+
+ int exdata_idx_len=(int)(sizeof(env->packet_exdata_idx) / sizeof(env->packet_exdata_idx[0]));
+
+ for(int i=0; i<exdata_idx_len; i++)
+ {
+ struct test_exdata *exdata_val=CALLOC(struct test_exdata , 1);
+ exdata_val->value=i;
+ exdata_val->pkt=pkt;
+ EXPECT_EQ(packet_exdata_set(pkt, env->packet_exdata_idx[i], exdata_val), 0);
+ }
+ return;
+}
+
+static void test_exdata_get_on_packet(struct packet *pkt, unsigned char ip_protocol, void *plugin_env)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
+ EXPECT_TRUE(env!=NULL);
+ EXPECT_EQ(pkt->ip_proto, ip_protocol);
+ EXPECT_EQ(pkt->st, env->plug_mgr->st);
+
+ int exdata_idx_len=(int)(sizeof(env->packet_exdata_idx) / sizeof(env->packet_exdata_idx[0]));
+
+ for(int i=0; i<exdata_idx_len; i++)
+ {
+ struct test_exdata *exdata_val=(struct test_exdata *)packet_exdata_get(pkt, env->packet_exdata_idx[i]);
+ EXPECT_EQ(exdata_val->value, i);
+ }
+ env->exdata_get_on_packet_called+=1;
+ return;
+}
+
+static void test_packet_exdata_free(int idx, void *ex_ptr, void *arg)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)arg;
+ struct test_exdata *exdata_val=(struct test_exdata *)ex_ptr;
+ EXPECT_EQ(env->packet_exdata_idx[idx], idx);
+ EXPECT_EQ(exdata_val->value, idx);
+
+ EXPECT_EQ(packet_exdata_get(exdata_val->pkt, idx), nullptr);// illegal get in exdata_free callback
+ EXPECT_EQ(packet_exdata_set(exdata_val->pkt, idx, exdata_val->pkt), -1);// illegal set in exdata_free callback
+
+ FREE(ex_ptr);
+ env->exdata_free_called[idx]+=1;
+
+ return;
+}
+
+
+TEST(plugin_manager, packet_plugins_share_exdata) {
+
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+ unsigned char ip_proto=6;
+ struct packet_plugin_env env;
+ memset(&env, 0, sizeof(struct packet_plugin_env));
+ env.plug_mgr=plug_mgr;
+
+ char exdata_name[PACKET_EXDATA_NUM][TOPIC_NAME_MAX];
+ int exdata_idx_len=(int)(sizeof(env.packet_exdata_idx) / sizeof(env.packet_exdata_idx[0]));
+ for(int i=0; i<exdata_idx_len; i++)
+ {
+ sprintf(exdata_name[i], "PACKET_EXDATA_%d", i);
+ env.packet_exdata_idx[i]=stellar_exdata_new_index(&st, exdata_name[i], test_packet_exdata_free, &env);
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ struct stellar_exdata_schema *exdata_schema = (struct stellar_exdata_schema *)utarray_eltptr(
+ plug_mgr->stellar_exdata_schema_array, env.packet_exdata_idx[i]);
+
+ EXPECT_EQ(exdata_schema->free_func, (void *)test_packet_exdata_free);
+ EXPECT_EQ(exdata_schema->free_arg, &env);
+ EXPECT_EQ(exdata_schema->idx, env.packet_exdata_idx[i]);
+ EXPECT_STREQ(exdata_schema->name, exdata_name[i]);
+ }
+ }
+
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ EXPECT_EQ(utarray_len(plug_mgr->stellar_exdata_schema_array), exdata_idx_len);
+ }
+
+ int exdata_set_plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_exdata_set_on_packet, &env);
+ EXPECT_GE(exdata_set_plugin_id, PACKET_PULGIN_ID_BASE);
+
+ int exdata_get_plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_exdata_get_on_packet, &env);
+ EXPECT_GE(exdata_get_plugin_id, PACKET_PULGIN_ID_BASE);
+
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), 2); // Fix plugin number
+ }
+
+ struct packet pkt={&st, IPv4, ip_proto};
+
+ int N_packet=10;
+
+ for(int i=0; i < N_packet; i++)
+ {
+ plugin_manager_on_packet_ingress(plug_mgr, &pkt);
+ plugin_manager_on_packet_egress(plug_mgr, &pkt);
+ }
+ plugin_manager_exit(plug_mgr);
+
+ EXPECT_EQ(env.exdata_set_on_packet_called, N_packet);
+ EXPECT_EQ(env.exdata_get_on_packet_called, N_packet);
+
+ for(int i=0; i < exdata_idx_len; i++)
+ {
+ EXPECT_EQ(env.exdata_free_called[i], N_packet);
+ }
+}
+
+static void test_packet_msg_free_cb_func(void *msg, void *msg_free_arg)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)msg_free_arg;
+ env->msg_free_cnt+=1;
+ return;
+}
+
+static void test_mq_on_packet_msg(struct packet *pkt, int topic_id, const void *msg, void *plugin_env)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
+ EXPECT_TRUE(env!=NULL);
+ EXPECT_EQ(pkt->st, env->plug_mgr->st);
+ env->msg_sub_cnt+=1;
+ return;
+}
+
+static void test_mq_pub_on_packet(struct packet *pkt, unsigned char ip_protocol, void *plugin_env)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
+ EXPECT_TRUE(env!=NULL);
+ EXPECT_EQ(pkt->ip_proto, ip_protocol);
+ EXPECT_EQ(pkt->st, env->plug_mgr->st);
+ int topic_id_num=(int)(sizeof(env->packet_topic_id) / sizeof(env->packet_topic_id[0]));
+ for(int i=0; i<topic_id_num; i++)
+ {
+ EXPECT_EQ(packet_mq_publish_message(pkt, env->packet_topic_id[i], pkt), 0);
+ env->msg_pub_cnt+=1;
+ }
+ return;
+}
+
+TEST(plugin_manager, packet_plugins_mq_pub_sub) {
+
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+ unsigned char ip_proto=6;
+ struct packet_plugin_env env;
+ memset(&env, 0, sizeof(struct packet_plugin_env));
+ env.plug_mgr=plug_mgr;
+ char topic_name[PACKET_TOPIC_NUM][TOPIC_NAME_MAX];
+
+ int topic_id_num=(int)(sizeof(env.packet_topic_id) / sizeof(env.packet_topic_id[0]));
+
+ for(int i=0; i<topic_id_num; i++)
+ {
+ sprintf(topic_name[i], "PACKET_TOPIC_%d", i);
+ env.packet_topic_id[i]=stellar_mq_create_topic(&st, topic_name[i], test_packet_msg_free_cb_func, &env);
+ EXPECT_GE(env.packet_topic_id[i], 0);
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(
+ plug_mgr->stellar_mq_schema_array, env.packet_topic_id[i]);
+ EXPECT_EQ(topic->free_cb, test_packet_msg_free_cb_func);
+ EXPECT_EQ(topic->free_cb_arg, &env);
+ EXPECT_EQ(topic->topic_id, env.packet_topic_id[i]);
+ EXPECT_STREQ(topic->topic_name, topic_name[i]);
+ }
+ }
+
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), topic_id_num+STELLAR_INTRINSIC_TOPIC_NUM);
+ }
+
+ int pub_plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_mq_pub_on_packet, &env);
+ EXPECT_GE(pub_plugin_id, PACKET_PULGIN_ID_BASE);
+
+ int topic_sub_num=(int)(sizeof(env.packet_mq_sub_plugin_id) / sizeof(env.packet_mq_sub_plugin_id[0]));
+
+ for (int i = 0; i < topic_sub_num; i++)
+ {
+ env.packet_mq_sub_plugin_id[i] = stellar_packet_plugin_register(&st, ip_proto, NULL, &env);// empty on_packet is ok
+ EXPECT_GE(env.packet_mq_sub_plugin_id[i], PACKET_PULGIN_ID_BASE);
+ for(int j = 0; j < topic_id_num; j++)
+ {
+ EXPECT_EQ(stellar_packet_mq_subscribe(&st, env.packet_topic_id[j], test_mq_on_packet_msg, env.packet_mq_sub_plugin_id[i]), 0);
+ }
+ }
+
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), topic_sub_num+1);
+ }
+
+ struct packet pkt={&st, IPv4, ip_proto};
+
+ int N_packet=10;
+ for (int i = 0; i < N_packet; i++)
+ {
+ plugin_manager_on_packet_ingress(plug_mgr, &pkt);
+ plugin_manager_on_packet_egress(plug_mgr, &pkt);
+ }
+
+ plugin_manager_exit(plug_mgr);
+ EXPECT_EQ(N_packet*topic_id_num, env.msg_pub_cnt);
+ EXPECT_EQ(env.msg_free_cnt, env.msg_pub_cnt);
+ EXPECT_EQ(env.msg_sub_cnt, env.msg_pub_cnt*topic_sub_num);
+}
+
+static void overlimit_packet_msg_free_cb_func(void *msg, void *msg_free_arg)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)msg_free_arg;
+ env->msg_free_cnt+=1;
+ FREE(msg);
+ return;
+}
+
+static void overlimit_sub_on_packet_msg(struct packet *pkt, int topic_id, const void *msg, void *plugin_env)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
+ EXPECT_TRUE(env!=NULL);
+ EXPECT_EQ(pkt->st, env->plug_mgr->st);
+ env->msg_sub_cnt+=1;
+ return;
+}
+
+static void overlimit_pub_on_packet(struct packet *pkt, unsigned char ip_protocol, void *plugin_env)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
+ EXPECT_TRUE(env!=NULL);
+ EXPECT_EQ(pkt->ip_proto, ip_protocol);
+ EXPECT_EQ(pkt->st, env->plug_mgr->st);
+ int topic_id_num=(int)(sizeof(env->packet_topic_id) / sizeof(env->packet_topic_id[0]));
+ int cnt=0;
+ int *msg;
+ for(int i=0; i<topic_id_num; i++)
+ {
+ for(int j=0; j < MAX_MSG_PER_DISPATCH; j++)
+ {
+ msg=CALLOC(int, 1);
+ *msg=cnt;
+ int pub_ret=packet_mq_publish_message(pkt, env->packet_topic_id[i], msg);
+ if(cnt < MAX_MSG_PER_DISPATCH)
+ {
+ ASSERT_EQ(pub_ret, 0);
+ env->msg_pub_cnt+=1;
+ }
+ else
+ {
+ ASSERT_EQ(pub_ret, -1);
+ }
+ if(pub_ret!=0)FREE(msg);
+ cnt+=1;
+ }
+ }
+ return;
+}
+
+TEST(plugin_manager, packet_plugins_pub_overlimit) {
+
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+ unsigned char ip_proto=6;
+ struct packet_plugin_env env;
+ memset(&env, 0, sizeof(struct packet_plugin_env));
+ env.plug_mgr=plug_mgr;
+ char topic_name[PACKET_TOPIC_NUM][TOPIC_NAME_MAX];
+
+ int topic_id_num=(int)(sizeof(env.packet_topic_id) / sizeof(env.packet_topic_id[0]));
+
+ for(int i=0; i<topic_id_num; i++)
+ {
+ sprintf(topic_name[i], "PACKET_TOPIC_%d", i);
+ env.packet_topic_id[i]=stellar_mq_create_topic(&st, topic_name[i], overlimit_packet_msg_free_cb_func, &env);
+ EXPECT_GE(env.packet_topic_id[i], 0);
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(
+ plug_mgr->stellar_mq_schema_array, env.packet_topic_id[i]);
+ EXPECT_EQ(topic->free_cb, overlimit_packet_msg_free_cb_func);
+ EXPECT_EQ(topic->free_cb_arg, &env);
+ EXPECT_EQ(topic->topic_id, env.packet_topic_id[i]);
+ EXPECT_STREQ(topic->topic_name, topic_name[i]);
+ }
+ }
+
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), topic_id_num+STELLAR_INTRINSIC_TOPIC_NUM);
+ }
+
+ int pub_plugin_id=stellar_packet_plugin_register(&st, ip_proto, overlimit_pub_on_packet, &env);
+ EXPECT_GE(pub_plugin_id, PACKET_PULGIN_ID_BASE);
+
+ int topic_sub_num=(int)(sizeof(env.packet_mq_sub_plugin_id) / sizeof(env.packet_mq_sub_plugin_id[0]));
+
+ for (int i = 0; i < topic_sub_num; i++)
+ {
+ env.packet_mq_sub_plugin_id[i] = stellar_packet_plugin_register(&st, ip_proto, NULL, &env);// empty on_packet is ok
+ EXPECT_GE(env.packet_mq_sub_plugin_id[i], PACKET_PULGIN_ID_BASE);
+ for(int j = 0; j < topic_id_num; j++)
+ {
+ EXPECT_EQ(stellar_packet_mq_subscribe(&st, env.packet_topic_id[j], overlimit_sub_on_packet_msg, env.packet_mq_sub_plugin_id[i]), 0);
+ }
+ }
+
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), topic_sub_num+1);
+ }
+
+ struct packet pkt={&st, IPv4, ip_proto};
+
+ int N_packet=10;
+ for (int i = 0; i < N_packet; i++)
+ {
+ plugin_manager_on_packet_ingress(plug_mgr, &pkt);
+ plugin_manager_on_packet_egress(plug_mgr, &pkt);
+ }
+
+ plugin_manager_exit(plug_mgr);
+ EXPECT_EQ(N_packet*MAX_MSG_PER_DISPATCH, env.msg_pub_cnt);
+ EXPECT_EQ(env.msg_free_cnt, env.msg_pub_cnt);
+ EXPECT_EQ(env.msg_sub_cnt, env.msg_pub_cnt*topic_sub_num);
+}
+
+
+
+static void test_exdata_free_pub_msg_exdata_free(int idx, void *ex_ptr, void *arg)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)arg;
+ EXPECT_EQ(env->packet_exdata_idx[idx], idx);
+ env->exdata_free_called[idx]+=1;
+ EXPECT_EQ(packet_mq_publish_message((struct packet *)ex_ptr, env->packet_topic_id[0], (struct packet *)ex_ptr), -1);// publish message in packet exdata_free is illegal
+ env->msg_pub_cnt+=1;
+ return;
+}
+
+static void test_exdata_free_pub_msg_free( void *msg, void *msg_free_arg)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)msg_free_arg;
+ env->msg_free_cnt+=1;
+ EXPECT_EQ(packet_mq_publish_message((struct packet *)msg, env->packet_topic_id[0], msg), -1 );// publish message in packet msg_free is illegal
+ return;
+}
+
+static void test_exdata_free_pub_msg_on_packet(struct packet *pkt, unsigned char ip_protocol, void *plugin_env)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
+ EXPECT_TRUE(env!=NULL);
+ EXPECT_EQ(pkt->ip_proto, ip_protocol);
+ EXPECT_EQ(pkt->st, env->plug_mgr->st);
+ EXPECT_EQ(packet_exdata_set(pkt, env->packet_exdata_idx[0], pkt), 0);
+ env->basic_on_packet_called+=1;
+ return;
+}
+
+static void test_exdata_free_pub_msg_on_packet_msg(struct packet *pkt, int topic_id, const void *msg, void *plugin_env)
+{
+ struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env;
+ EXPECT_EQ(topic_id, env->packet_topic_id[0]);
+ env->msg_sub_cnt+=1;
+}
+
+TEST(plugin_manager, packet_plugin_exdata_free_pub_msg) {
+
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+ unsigned char ip_proto=6;
+ struct packet_plugin_env env;
+ memset(&env, 0, sizeof(struct packet_plugin_env));
+ env.plug_mgr=plug_mgr;
+ int plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_exdata_free_pub_msg_on_packet, &env);
+ EXPECT_GE(plugin_id, PACKET_PULGIN_ID_BASE);
+
+ env.packet_exdata_idx[0]=stellar_exdata_new_index(&st, "PACKET_EXDATA", test_exdata_free_pub_msg_exdata_free, &env);
+ env.packet_topic_id[0]=stellar_mq_create_topic(&st, "PACKET_TOPIC", test_exdata_free_pub_msg_free, &env);
+
+ EXPECT_EQ(stellar_packet_mq_subscribe(&st, env.packet_topic_id[0], test_exdata_free_pub_msg_on_packet_msg, plugin_id),0);
+
+
+ struct packet pkt={&st, IPv4, ip_proto};
+ plugin_manager_on_packet_ingress(plug_mgr, &pkt);
+ plugin_manager_on_packet_egress(plug_mgr, &pkt);
+
+ plugin_manager_exit(plug_mgr);
+
+ EXPECT_EQ(env.basic_on_packet_called, 1);
+ EXPECT_EQ(env.msg_pub_cnt, 1);
+ EXPECT_EQ(env.msg_sub_cnt, 0);
+ EXPECT_EQ(env.msg_free_cnt, 0);
+ EXPECT_EQ(env.exdata_free_called[0], 1);
+}
+
+/**********************************************
+ * TEST PLUGIN MANAGER ON SESSION PLUGIN INIT *
+ **********************************************/
+
+static void test_mock_session_exdata_free(int idx, void *ex_ptr, void *arg){}
+static void test_mock_overwrite_session_exdata_free(int idx, void *ex_ptr, void *arg){}
+
+TEST(plugin_manager_init, session_exdata_new_index_overwrite) {
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+ const char *exdata_name="SESSION_EXDATA";
+ int exdata_idx=stellar_exdata_new_index(&st,exdata_name, test_mock_session_exdata_free, &st);
+ EXPECT_GE(exdata_idx, 0);
+ int overwrite_idx=stellar_exdata_new_index(&st,exdata_name, test_mock_overwrite_session_exdata_free, plug_mgr);
+ EXPECT_GE(overwrite_idx, 0);
+ EXPECT_EQ(overwrite_idx, exdata_idx);
+
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ struct stellar_exdata_schema *exdata_schema = (struct stellar_exdata_schema *)utarray_eltptr(
+ plug_mgr->stellar_exdata_schema_array, (unsigned int)exdata_idx);
+ EXPECT_EQ(exdata_schema->free_func, (void *)test_mock_overwrite_session_exdata_free);
+ EXPECT_EQ(exdata_schema->free_arg, plug_mgr);
+ EXPECT_EQ(exdata_schema->idx, exdata_idx);
+ EXPECT_STREQ(exdata_schema->name, exdata_name);
+
+ int exdata_num = utarray_len(plug_mgr->stellar_exdata_schema_array);
+ EXPECT_EQ(exdata_num, 1);
+ }
+ plugin_manager_exit(plug_mgr);
+}
+
+void test_mock_session_msg_free(void *msg, void *msg_free_arg){}
+void test_mock_overwrite_session_msg_free(void *msg, void *msg_free_arg){}
+
+TEST(plugin_manager_init, session_mq_topic_create_and_update) {
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+ const char *topic_name="SESSION_TOPIC";
+
+ EXPECT_EQ(stellar_mq_get_topic_id(&st, topic_name), -1);// illegal topic_name
+
+ int topic_id=stellar_mq_create_topic(&st, topic_name, test_mock_session_msg_free, &st);
+ EXPECT_GE(topic_id, 0);
+ struct stellar_mq_topic_schema *topic_schema;
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ topic_schema =
+ (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id);
+ EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_session_msg_free);
+ EXPECT_EQ(topic_schema->free_cb_arg, &st);
+ EXPECT_EQ(topic_schema->topic_id, topic_id);
+ EXPECT_STREQ(topic_schema->topic_name, topic_name);
+ }
+
+ EXPECT_EQ(stellar_mq_get_topic_id(&st, topic_name), topic_id);
+ EXPECT_EQ(stellar_mq_create_topic(&st, topic_name, test_mock_overwrite_session_msg_free, plug_mgr), -1); // duplicate create, return error
+
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ topic_schema =
+ (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id);
+ EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_session_msg_free);
+ EXPECT_EQ(topic_schema->free_cb_arg, &st);
+ EXPECT_EQ(topic_schema->topic_id, topic_id);
+ EXPECT_STREQ(topic_schema->topic_name, topic_name);
+ }
+
+ EXPECT_EQ(stellar_mq_update_topic(&st, topic_id, test_mock_overwrite_session_msg_free, plug_mgr), 0);
+
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ topic_schema =
+ (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id);
+ EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_overwrite_session_msg_free);
+ EXPECT_EQ(topic_schema->free_cb_arg, plug_mgr);
+ EXPECT_EQ(topic_schema->topic_id, topic_id);
+ EXPECT_STREQ(topic_schema->topic_name, topic_name);
+
+ EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), STELLAR_INTRINSIC_TOPIC_NUM+1); // 5 intrinsic topic + 1 created topic
+ }
+
+ EXPECT_EQ(stellar_mq_destroy_topic(&st, 10), -1);// illgeal session topic_id
+
+ EXPECT_EQ(stellar_mq_destroy_topic(&st, topic_id), 1);
+ EXPECT_EQ(stellar_mq_destroy_topic(&st, topic_id), 0);//duplicate destroy, return 0;
+
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), STELLAR_INTRINSIC_TOPIC_NUM+1);//destory won't delete the topic schema
+ }
+ EXPECT_EQ(plug_mgr->stellar_mq_topic_num, STELLAR_INTRINSIC_TOPIC_NUM);//intrinsic topic number
+
+ plugin_manager_exit(plug_mgr);
+}
+
+void test_mock_on_session_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env){}
+void test_mock_overwrite_on_session_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env){}
+
+
+TEST(plugin_manager_init, session_mq_subscribe_overwrite) {
+
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+ const char *topic_name="SESSION_TOPIC";
+
+ int topic_id=stellar_mq_create_topic(&st, topic_name, test_mock_session_msg_free, &st);
+ EXPECT_GE(topic_id, 0);
+
+ EXPECT_EQ(stellar_session_mq_subscribe(&st, topic_id, test_mock_on_session_msg, 10),-1);//illgeal plugin_id
+ EXPECT_EQ(stellar_session_mq_subscribe(&st, 10, test_mock_on_session_msg, 10),-1);//illgeal topic_id & plugin_id
+
+ int plugin_id=stellar_session_plugin_register(&st, NULL, NULL, &st);
+ EXPECT_GE(plugin_id, 0);
+
+ EXPECT_EQ(stellar_session_mq_subscribe(&st, topic_id, test_mock_on_session_msg, plugin_id),0);
+ EXPECT_EQ(stellar_session_mq_subscribe(&st, topic_id, test_mock_overwrite_on_session_msg, plugin_id),0);//duplicate subscribe, return 0, won't overwrite
+
+ struct stellar_mq_topic_schema *topic_schema;
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ topic_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array,(unsigned int)topic_id);
+ EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_session_msg_free);
+ EXPECT_EQ(topic_schema->free_cb_arg, &st);
+ EXPECT_EQ(topic_schema->topic_id, topic_id);
+ EXPECT_STREQ(topic_schema->topic_name, topic_name);
+ }
+
+ EXPECT_EQ(topic_schema->subscriber_cnt, 1);
+ EXPECT_EQ(topic_schema->subscribers->sess_msg_cb, (void *)test_mock_overwrite_on_session_msg);
+
+ plugin_manager_exit(plug_mgr);
+}
+/**********************************************
+ * TEST PLUGIN MANAGER ON SESSION PLUGIN RUNTIME *
+ **********************************************/
+
+struct session_plugin_env
+{
+ struct plugin_manager_schema *plug_mgr;
+ int N_session;
+ int N_per_session_pkt_cnt;
+ int intrinsc_tcp_topic_id;
+ int intrinsc_egress_topic_id;
+ int basic_exdata_idx;
+ int basic_exdata_free_called;
+ int basic_on_session_ingress_called;
+ int basic_on_session_egress_called;
+ int basic_ctx_new_called;
+ int basic_ctx_free_called;
+ int test_mq_pub_plugin_id;
+ int test_mq_sub_plugin_id;
+ int test_mq_pub_called;
+ int test_mq_sub_called;
+ int test_mq_free_called;
+ int test_mq_topic_id;
+ int plugin_id_1;
+ int plugin_id_2;
+ int plugin_id_1_called;
+ int plugin_id_2_called;
+};
+
+TEST(plugin_manager, no_plugin_register_runtime) {
+
+ struct stellar st={0};
+
+// init stage
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+// init plugin
+
+// prepare packet and session
+
+ struct session_plugin_env env;
+ memset(&env, 0, sizeof(struct session_plugin_env));
+ env.plug_mgr=plug_mgr;
+ env.N_per_session_pkt_cnt=10;
+ env.N_session=10;
+
+ struct packet pkt={&st, TCP, 6};
+
+ struct session sess[env.N_session];
+ memset(&sess, 0, sizeof(sess));
+
+// pesudo running stage
+ for(int i=0; i < env.N_session; i++)
+ {
+ sess[i].plug_mgr_rt=plugin_manager_session_runtime_new(plug_mgr, &sess[i]);
+ sess[i].type=SESSION_TYPE_TCP;
+ }
+
+ for (int j = 0; j < env.N_per_session_pkt_cnt; j++)
+ {
+ plugin_manager_on_packet_ingress(plug_mgr, &pkt);
+
+ for (int i = 0; i < env.N_session; i++)
+ {
+ sess[i].sess_pkt_cnt+=1;
+ plugin_manager_on_session_ingress(&sess[i], &pkt);
+ plugin_manager_on_session_egress(&sess[i], &pkt);
+ }
+
+ plugin_manager_on_packet_egress(plug_mgr, &pkt);
+
+ }
+
+ for(int i=0; i < env.N_session; i++)
+ {
+ plugin_manager_on_session_closing(&sess[i]);
+ plugin_manager_session_runtime_free(sess[i].plug_mgr_rt);
+ }
+
+//exit stage
+ plugin_manager_exit(plug_mgr);
+}
+
+
+struct test_basic_ctx
+{
+ int called;
+};
+
+static void *test_basic_session_ctx_new(struct session *sess, void *plugin_env)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ env->basic_ctx_new_called+=1;
+ struct test_basic_ctx *ctx=CALLOC(struct test_basic_ctx, 1);
+ return ctx;
+}
+
+static void test_basic_session_ctx_free(struct session *sess, void *session_ctx, void *plugin_env)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ env->basic_ctx_free_called+=1;
+ struct test_basic_ctx *ctx=(struct test_basic_ctx *)session_ctx;
+ EXPECT_EQ(ctx->called, env->N_per_session_pkt_cnt*2);//ingress + egress + closing
+ FREE(ctx);
+ return;
+}
+
+static void test_basic_on_session_ingress(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ struct test_basic_ctx *ctx=(struct test_basic_ctx *)per_session_ctx;
+ EXPECT_TRUE(env!=NULL);
+ EXPECT_TRUE(ctx!=NULL);
+ EXPECT_EQ(sess->plug_mgr_rt->plug_mgr, env->plug_mgr);
+ EXPECT_EQ(session_exdata_set(sess, 2, sess), -1);// illegal set
+ EXPECT_EQ(session_exdata_get(sess, 2), nullptr);// illegal get
+ EXPECT_EQ(session_exdata_set(sess, env->basic_exdata_idx, sess), 0);
+ if(msg)
+ {
+ env->basic_on_session_ingress_called+=1;
+ ctx->called+=1;
+ }
+ return;
+}
+
+static void test_basic_on_session_egress(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ struct test_basic_ctx *ctx=(struct test_basic_ctx *)per_session_ctx;
+ EXPECT_TRUE(env!=NULL);
+ EXPECT_TRUE(ctx!=NULL);
+ EXPECT_EQ(sess->plug_mgr_rt->plug_mgr, env->plug_mgr);
+ EXPECT_EQ(session_exdata_set(sess, 2, sess), -1);// illegal set
+ EXPECT_EQ(session_exdata_get(sess, 2), nullptr);// illegal get
+ EXPECT_TRUE(msg!=NULL);
+ EXPECT_EQ(session_exdata_get(sess, env->basic_exdata_idx), sess);
+ env->basic_on_session_egress_called+=1;
+ ctx->called+=1;
+ return;
+}
+
+static void test_basic_session_exdata_free(int idx, void *ex_ptr, void *arg)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)arg;
+ EXPECT_EQ(env->basic_exdata_idx, idx);
+ EXPECT_EQ(session_exdata_get((struct session *)ex_ptr, idx), nullptr);// illegal get in exdata_free callback
+ EXPECT_EQ(session_exdata_set((struct session *)ex_ptr, idx, arg), -1);// illegal set in exdata_free callback
+ env->basic_exdata_free_called+=1;
+}
+
+TEST(plugin_manager, session_plugin_on_intrinsic_ingress_egress) {
+
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+ unsigned char ip_proto=6;
+ struct session_plugin_env env;
+ memset(&env, 0, sizeof(struct session_plugin_env));
+ env.plug_mgr=plug_mgr;
+ env.N_per_session_pkt_cnt=10;
+ env.N_session=1;
+
+ int plugin_id=stellar_session_plugin_register(&st, test_basic_session_ctx_new, test_basic_session_ctx_free, &env);
+ EXPECT_GE(plugin_id, 0);
+
+ env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP);
+ EXPECT_GE(env.intrinsc_tcp_topic_id, 0);
+ EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_basic_on_session_ingress, plugin_id), 0);
+
+ env.intrinsc_egress_topic_id=stellar_mq_get_topic_id(&st, TOPIC_EGRESS);
+ EXPECT_GE(env.intrinsc_egress_topic_id, 0);
+ EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_egress_topic_id, test_basic_on_session_ingress, plugin_id), 0);// Intentional error
+ EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_egress_topic_id, test_basic_on_session_egress, plugin_id), 0);
+
+ env.basic_exdata_idx=stellar_exdata_new_index(&st, "SESSION_EXDATA", test_basic_session_exdata_free,&env);
+ EXPECT_GE(env.basic_exdata_idx, 0);
+
+ struct packet pkt={&st, TCP, ip_proto};
+
+
+ struct session sess[env.N_session];
+
+ for(int i=0; i < env.N_session; i++)
+ {
+ sess[i].plug_mgr_rt=plugin_manager_session_runtime_new(plug_mgr, &sess[i]);
+ sess[i].type=SESSION_TYPE_TCP;
+ }
+
+ for (int j = 0; j < env.N_per_session_pkt_cnt; j++)
+ {
+ plugin_manager_on_packet_ingress(plug_mgr, &pkt);
+
+ for (int i = 0; i < env.N_session; i++)
+ {
+ plugin_manager_on_session_ingress(&sess[i], &pkt);
+ plugin_manager_on_session_egress(&sess[i], &pkt);
+ }
+
+ plugin_manager_on_packet_egress(plug_mgr, &pkt);
+ }
+
+ for(int i=0; i < env.N_session; i++)
+ {
+ plugin_manager_on_session_closing(&sess[i]);
+ plugin_manager_session_runtime_free(sess[i].plug_mgr_rt);
+ }
+
+ plugin_manager_exit(plug_mgr);
+
+ EXPECT_TRUE(env.basic_on_session_ingress_called == env.basic_on_session_egress_called && env.basic_on_session_ingress_called == env.N_session*env.N_per_session_pkt_cnt);
+ EXPECT_TRUE(env.basic_ctx_new_called == env.basic_ctx_free_called && env.basic_ctx_new_called == env.N_session);
+ EXPECT_EQ(env.basic_exdata_free_called, env.N_session);
+
+}
+
+struct test_session_mq_ctx
+{
+ int called;
+};
+
+static void *test_mq_pub_session_ctx_new(struct session *sess, void *plugin_env)
+{
+ struct test_basic_ctx *ctx=CALLOC(struct test_basic_ctx, 1);
+ return ctx;
+}
+
+static void test_mq_pub_session_ctx_free(struct session *sess, void *session_ctx, void *plugin_env)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ struct test_basic_ctx *ctx=(struct test_basic_ctx *)session_ctx;
+ EXPECT_EQ(ctx->called, env->N_per_session_pkt_cnt);
+ FREE(ctx);
+ return;
+}
+
+static void *test_mq_sub_session_ctx_new(struct session *sess, void *plugin_env)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ struct test_basic_ctx *ctx=CALLOC(struct test_basic_ctx, 1);
+ EXPECT_EQ(session_mq_ignore_message(sess, env->intrinsc_tcp_topic_id, env->test_mq_sub_plugin_id), 0);
+ return ctx;
+}
+
+static void test_mq_sub_session_ctx_free(struct session *sess, void *session_ctx, void *plugin_env)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ struct test_basic_ctx *ctx=(struct test_basic_ctx *)session_ctx;
+ EXPECT_EQ(ctx->called, env->N_per_session_pkt_cnt);
+ FREE(ctx);
+ return;
+}
+
+static void test_mq_pub_on_session(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ struct test_basic_ctx *ctx=(struct test_basic_ctx *)per_session_ctx;
+ EXPECT_TRUE(env!=NULL);
+ EXPECT_TRUE(ctx!=NULL);
+ EXPECT_EQ(sess->plug_mgr_rt->plug_mgr, env->plug_mgr);
+ if (msg)
+ {
+ env->test_mq_pub_called += 1;
+ ctx->called += 1;
+ int *pub_msg = (int *)CALLOC(int, 1);
+ *pub_msg = env->test_mq_pub_called;
+ EXPECT_EQ(session_mq_publish_message(sess, env->test_mq_topic_id, pub_msg), 0);
+ }
+ return;
+}
+
+static void test_mq_on_sub_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ struct test_basic_ctx *ctx=(struct test_basic_ctx *)per_session_ctx;
+ EXPECT_TRUE(env!=NULL);
+ EXPECT_TRUE(ctx!=NULL);
+ EXPECT_EQ(sess->plug_mgr_rt->plug_mgr, env->plug_mgr);
+ EXPECT_EQ(*(int *)msg, env->test_mq_pub_called);
+ env->test_mq_sub_called+=1;
+ ctx->called+=1;
+ return;
+}
+
+static void test_session_msg_free(void *msg, void *msg_free_arg)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)msg_free_arg;
+ if(msg)
+ {
+ EXPECT_EQ(env->test_mq_pub_called, *(int *)msg);
+ env->test_mq_free_called+=1;
+ FREE(msg);
+ }
+ return;
+}
+
+TEST(plugin_manager, session_plugin_ignore_on_ctx_new_sub_other_msg) {
+
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+ unsigned char ip_proto=6;
+ struct session_plugin_env env;
+ memset(&env, 0, sizeof(struct session_plugin_env));
+ env.plug_mgr=plug_mgr;
+ env.N_per_session_pkt_cnt=10;
+ env.N_session=10;
+
+ env.test_mq_pub_plugin_id=stellar_session_plugin_register(&st, test_mq_pub_session_ctx_new, test_mq_pub_session_ctx_free, &env);
+ EXPECT_GE(env.test_mq_pub_plugin_id, 0);
+
+ env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP);
+ EXPECT_GE(env.intrinsc_tcp_topic_id, 0);
+ EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_mq_pub_on_session, env.test_mq_pub_plugin_id), 0);
+
+ env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_MQ_TOPIC", test_session_msg_free, &env);
+ EXPECT_GE(env.test_mq_topic_id, 0);
+
+ env.test_mq_sub_plugin_id=stellar_session_plugin_register(&st, test_mq_sub_session_ctx_new, test_mq_sub_session_ctx_free, &env);
+ EXPECT_GE(env.test_mq_sub_plugin_id, 0);
+ EXPECT_EQ(stellar_session_mq_subscribe(&st, env.test_mq_topic_id, test_mq_on_sub_msg, env.test_mq_sub_plugin_id), 0);
+
+ struct packet pkt={&st, TCP, ip_proto};
+
+
+ struct session sess[env.N_session];
+
+ for(int i=0; i < env.N_session; i++)
+ {
+ sess[i].plug_mgr_rt=plugin_manager_session_runtime_new(plug_mgr, &sess[i]);
+ sess[i].type=SESSION_TYPE_TCP;
+ }
+
+ for (int j = 0; j < env.N_per_session_pkt_cnt; j++)
+ {
+ plugin_manager_on_packet_ingress(plug_mgr, &pkt);
+
+ for (int i = 0; i < env.N_session; i++)
+ {
+ plugin_manager_on_session_ingress(&sess[i], &pkt);
+ plugin_manager_on_session_egress(&sess[i], &pkt);
+ }
+
+ plugin_manager_on_packet_egress(plug_mgr, &pkt);
+
+ }
+
+ for(int i=0; i < env.N_session; i++)
+ {
+ plugin_manager_on_session_closing(&sess[i]);
+ plugin_manager_session_runtime_free(sess[i].plug_mgr_rt);
+ }
+
+ plugin_manager_exit(plug_mgr);
+
+ EXPECT_EQ(env.test_mq_free_called, env.test_mq_pub_called);
+ EXPECT_EQ(env.test_mq_sub_called, env.N_session*env.N_per_session_pkt_cnt);
+}
+
+struct test_overlimit_session_mq_ctx
+{
+ int pkt_cnt;
+ int pub_cnt;
+ int sub_cnt;
+};
+
+static void *test_overlimit_pub_session_ctx_new(struct session *sess, void *plugin_env)
+{
+ struct test_overlimit_session_mq_ctx *ctx=CALLOC(struct test_overlimit_session_mq_ctx, 1);
+ return ctx;
+}
+
+static void test_overlimit_pub_session_ctx_free(struct session *sess, void *session_ctx, void *plugin_env)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ struct test_overlimit_session_mq_ctx *ctx=(struct test_overlimit_session_mq_ctx *)session_ctx;
+ EXPECT_EQ(ctx->pkt_cnt, env->N_per_session_pkt_cnt);
+ FREE(ctx);
+ return;
+}
+
+static void *test_overlimit_sub_session_ctx_new(struct session *sess, void *plugin_env)
+{
+ struct test_overlimit_session_mq_ctx *ctx=CALLOC(struct test_overlimit_session_mq_ctx, 1);
+ return ctx;
+}
+
+static void test_overlimit_sub_session_ctx_free(struct session *sess, void *session_ctx, void *plugin_env)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ struct test_overlimit_session_mq_ctx *ctx=(struct test_overlimit_session_mq_ctx *)session_ctx;
+ EXPECT_EQ(ctx->sub_cnt, (env->N_per_session_pkt_cnt*(MAX_MSG_PER_DISPATCH-1))); //minus intrinsic msg
+ FREE(ctx);
+ return;
+}
+
+struct test_overlimit_msg
+{
+ struct session *sess;
+ int called;
+};
+
+static void test_overlimit_pub_on_session(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ struct test_overlimit_session_mq_ctx *ctx=(struct test_overlimit_session_mq_ctx *)per_session_ctx;
+ EXPECT_TRUE(env!=NULL);
+ EXPECT_TRUE(ctx!=NULL);
+ EXPECT_EQ(sess->plug_mgr_rt->plug_mgr, env->plug_mgr);
+ struct test_overlimit_msg *pub_msg;
+ if (msg)
+ {
+ env->test_mq_pub_called += 1;
+ ctx->pkt_cnt += 1;
+ for(int i=0; i < MAX_MSG_PER_DISPATCH*2; i++)
+ {
+ pub_msg = CALLOC(struct test_overlimit_msg, 1);
+ pub_msg->called = env->test_mq_pub_called;
+ pub_msg->sess=sess;
+ if(i<(MAX_MSG_PER_DISPATCH-1))// minus intrinsic msg
+ {
+ EXPECT_EQ(session_mq_publish_message(sess, env->test_mq_topic_id, pub_msg), 0);
+ ctx->pub_cnt+=1;
+ }
+ else
+ {
+ EXPECT_EQ(session_mq_publish_message(sess, env->test_mq_topic_id, pub_msg), -1);
+ FREE(pub_msg);
+ }
+ }
+ }
+ return;
+}
+
+static void test_overlimit_on_sub_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ struct test_overlimit_msg *recv_msg=(struct test_overlimit_msg *)msg;
+ struct test_overlimit_session_mq_ctx *ctx=(struct test_overlimit_session_mq_ctx *)per_session_ctx;
+ EXPECT_TRUE(env!=NULL);
+ EXPECT_TRUE(ctx!=NULL);
+ EXPECT_EQ(sess->plug_mgr_rt->plug_mgr, env->plug_mgr);
+ EXPECT_EQ(recv_msg->called, env->test_mq_pub_called);
+ env->test_mq_sub_called+=1;
+ ctx->sub_cnt+=1;
+ return;
+}
+
+static void test_overlimit_session_msg_free(void *msg, void *msg_free_arg)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)msg_free_arg;
+ struct test_overlimit_msg *recv_msg=(struct test_overlimit_msg *)msg;
+ if(recv_msg)
+ {
+ EXPECT_EQ(recv_msg->sess->plug_mgr_rt->plug_mgr, env->plug_mgr);
+ EXPECT_EQ(session_mq_publish_message(recv_msg->sess, env->test_mq_topic_id, msg), -1);// illegal publish when msg_free
+ EXPECT_EQ(env->test_mq_pub_called, recv_msg->called);
+ env->test_mq_free_called+=1;
+ FREE(msg);
+ }
+ return;
+}
+
+TEST(plugin_manager, session_plugin_pub_msg_overlimt) {
+
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+ unsigned char ip_proto=6;
+ struct session_plugin_env env;
+ memset(&env, 0, sizeof(struct session_plugin_env));
+ env.plug_mgr=plug_mgr;
+ env.N_per_session_pkt_cnt=10;
+ env.N_session=10;
+
+ env.test_mq_pub_plugin_id=stellar_session_plugin_register(&st, test_overlimit_pub_session_ctx_new, test_overlimit_pub_session_ctx_free, &env);
+ EXPECT_GE(env.test_mq_pub_plugin_id, 0);
+
+ env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP);
+ EXPECT_GE(env.intrinsc_tcp_topic_id, 0);
+ EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_overlimit_pub_on_session, env.test_mq_pub_plugin_id), 0);
+
+ env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_MQ_TOPIC", test_overlimit_session_msg_free, &env);
+ EXPECT_GE(env.test_mq_topic_id, 0);
+
+ env.test_mq_sub_plugin_id=stellar_session_plugin_register(&st, test_overlimit_sub_session_ctx_new, test_overlimit_sub_session_ctx_free, &env);
+ EXPECT_GE(env.test_mq_sub_plugin_id, 0);
+ EXPECT_EQ(stellar_session_mq_subscribe(&st, env.test_mq_topic_id, test_overlimit_on_sub_msg, env.test_mq_sub_plugin_id), 0);
+
+ struct packet pkt={&st, TCP, ip_proto};
+
+
+ struct session sess[env.N_session];
+
+ for(int i=0; i < env.N_session; i++)
+ {
+ sess[i].plug_mgr_rt=plugin_manager_session_runtime_new(plug_mgr, &sess[i]);
+ sess[i].type=SESSION_TYPE_TCP;
+ }
+
+ for (int j = 0; j < env.N_per_session_pkt_cnt; j++)
+ {
+ plugin_manager_on_packet_ingress(plug_mgr, &pkt);
+
+ for (int i = 0; i < env.N_session; i++)
+ {
+ plugin_manager_on_session_ingress(&sess[i], &pkt);
+ plugin_manager_on_session_egress(&sess[i], &pkt);
+ }
+
+ plugin_manager_on_packet_egress(plug_mgr, &pkt);
+ }
+
+ for(int i=0; i < env.N_session; i++)
+ {
+ plugin_manager_on_session_closing(&sess[i]);
+ plugin_manager_session_runtime_free(sess[i].plug_mgr_rt);
+ }
+
+ plugin_manager_exit(plug_mgr);
+
+ EXPECT_EQ(env.test_mq_pub_called,env.N_per_session_pkt_cnt*env.N_session);
+ EXPECT_EQ(env.test_mq_free_called, env.N_session*env.N_per_session_pkt_cnt*(MAX_MSG_PER_DISPATCH-1));
+ EXPECT_EQ(env.test_mq_sub_called, env.N_session*env.N_per_session_pkt_cnt*(MAX_MSG_PER_DISPATCH-1));
+
+}
+
+
+static void test_dettach_msg_free(void *msg, void *msg_free_arg)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)msg_free_arg;
+ env->test_mq_free_called+=1;
+ return;
+}
+
+static void *test_dettach_session_ctx_new(struct session *sess, void *plugin_env)
+{
+ struct test_basic_ctx *ctx=CALLOC(struct test_basic_ctx, 1);
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+
+ EXPECT_EQ(session_mq_publish_message(sess, env->test_mq_topic_id, plugin_env), 0);// publish success, but won't be delivered to currnet plugin
+
+ stellar_session_plugin_dettach_current_session(sess);
+ ctx->called+=1;
+
+ EXPECT_EQ(session_mq_publish_message(sess, env->test_mq_topic_id, plugin_env), 0);// publish success, but won't be delivered to currnet plugin
+
+ return ctx;
+}
+
+static void test_dettach_session_ctx_free(struct session *sess, void *session_ctx, void *plugin_env)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ env->basic_ctx_free_called+=1;
+ struct test_basic_ctx *ctx=(struct test_basic_ctx *)session_ctx;
+ EXPECT_EQ(sess->sess_pkt_cnt, 1);// first packet ingress, call ctx_free immediately
+ EXPECT_EQ(ctx->called, 1);
+
+ EXPECT_EQ(session_mq_publish_message(sess, env->test_mq_topic_id, plugin_env), 0);// publish success, but won't be delivered to currnet plugin
+
+ FREE(ctx);
+}
+
+static void test_dettach_on_session(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env)
+{
+ struct test_basic_ctx *ctx=(struct test_basic_ctx *)per_session_ctx;
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+
+ EXPECT_EQ(topic_id, env->intrinsc_tcp_topic_id);
+
+ ctx->called+=1;
+}
+
+TEST(plugin_manager, session_plugin_on_ctx_new_then_dettach) {
+
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+ unsigned char ip_proto=6;
+ struct session_plugin_env env;
+ memset(&env, 0, sizeof(struct session_plugin_env));
+ env.plug_mgr=plug_mgr;
+ env.N_per_session_pkt_cnt=10;
+ env.N_session=10;
+
+
+ int plugin_id=stellar_session_plugin_register(&st, test_dettach_session_ctx_new, test_dettach_session_ctx_free, &env);
+ EXPECT_GE(plugin_id,0);
+
+ env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP);
+ EXPECT_GE(env.intrinsc_tcp_topic_id, 0);
+ EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_dettach_on_session, plugin_id), 0);
+
+ env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_MQ_TOPIC", test_dettach_msg_free, &env);
+ EXPECT_GE(env.test_mq_topic_id, 0);
+
+ EXPECT_EQ(stellar_session_mq_subscribe(&st, env.test_mq_topic_id, test_dettach_on_session, plugin_id), 0);
+
+ struct packet pkt={&st, TCP, ip_proto};
+
+
+ struct session sess[env.N_session];
+ memset(&sess, 0, sizeof(sess));
+
+ for(int i=0; i < env.N_session; i++)
+ {
+ sess[i].plug_mgr_rt=plugin_manager_session_runtime_new(plug_mgr, &sess[i]);
+ sess[i].type=SESSION_TYPE_TCP;
+ }
+
+ for (int j = 0; j < env.N_per_session_pkt_cnt; j++)
+ {
+ plugin_manager_on_packet_ingress(plug_mgr, &pkt);
+
+ for (int i = 0; i < env.N_session; i++)
+ {
+ sess[i].sess_pkt_cnt+=1;
+ plugin_manager_on_session_ingress(&sess[i], &pkt);
+ plugin_manager_on_session_egress(&sess[i], &pkt);
+ }
+
+ plugin_manager_on_packet_egress(plug_mgr, &pkt);
+ }
+
+ for(int i=0; i < env.N_session; i++)
+ {
+ plugin_manager_on_session_closing(&sess[i]);
+ plugin_manager_session_runtime_free(sess[i].plug_mgr_rt);
+ }
+
+ plugin_manager_exit(plug_mgr);
+
+ EXPECT_EQ(env.basic_ctx_free_called,env.N_session);
+ EXPECT_EQ(env.test_mq_free_called,env.N_session*3);
+}
+
+
+
+static void *test_invalid_pub_msg_session_ctx_new(struct session *sess, void *plugin_env)
+{
+ struct test_basic_ctx *ctx=CALLOC(struct test_basic_ctx, 1);
+ return ctx;
+}
+
+static void test_invalid_pub_msg_session_ctx_free(struct session *sess, void *session_ctx, void *plugin_env)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ env->basic_ctx_free_called+=1;
+ struct test_basic_ctx *ctx=(struct test_basic_ctx *)session_ctx;
+ EXPECT_EQ(ctx->called,(env->N_per_session_pkt_cnt+1));
+
+ EXPECT_EQ(session_mq_publish_message(sess, env->test_mq_topic_id, ctx), -1);// illegal publish when session closing
+
+ FREE(ctx);
+}
+
+static void test_invalid_pub_msg_on_session(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env)
+{
+ struct test_basic_ctx *ctx=(struct test_basic_ctx *)per_session_ctx;
+ ctx->called+=1;
+}
+
+TEST(plugin_manager, session_plugin_pub_on_ctx_free) {
+
+ struct stellar st={0};
+ struct session_plugin_env env;
+
+// pesudo init stage
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+// plugin manager register plugin
+
+ int plugin_id=stellar_session_plugin_register(&st, test_invalid_pub_msg_session_ctx_new, test_invalid_pub_msg_session_ctx_free, &env);
+ EXPECT_GE(plugin_id,0);
+
+ env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP);
+ EXPECT_GE(env.intrinsc_tcp_topic_id, 0);
+ EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_invalid_pub_msg_on_session, plugin_id), 0);
+
+ env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_MQ_TOPIC", NULL, &env);
+ EXPECT_GE(env.test_mq_topic_id, 0);
+
+// pesudo packet and session
+
+ memset(&env, 0, sizeof(struct session_plugin_env));
+ env.plug_mgr=plug_mgr;
+ env.N_per_session_pkt_cnt=10;
+ env.N_session=10;
+
+ struct packet pkt={&st, TCP, 6};
+
+ struct session sess[env.N_session];
+ memset(&sess, 0, sizeof(sess));
+
+// pesudo running stage
+ for(int i=0; i < env.N_session; i++)
+ {
+ sess[i].plug_mgr_rt=plugin_manager_session_runtime_new(plug_mgr, &sess[i]);
+ sess[i].type=SESSION_TYPE_TCP;
+ }
+
+ for (int j = 0; j < env.N_per_session_pkt_cnt; j++)
+ {
+ plugin_manager_on_packet_ingress(plug_mgr, &pkt);
+
+ for (int i = 0; i < env.N_session; i++)
+ {
+ sess[i].sess_pkt_cnt+=1;
+ plugin_manager_on_session_ingress(&sess[i], &pkt);
+ plugin_manager_on_session_egress(&sess[i], &pkt);
+ }
+
+ plugin_manager_on_packet_egress(plug_mgr, &pkt);
+ }
+
+ for(int i=0; i < env.N_session; i++)
+ {
+ plugin_manager_on_session_closing(&sess[i]);
+ plugin_manager_session_runtime_free(sess[i].plug_mgr_rt);
+ }
+
+// pesudo exit stage
+ plugin_manager_exit(plug_mgr);
+
+ EXPECT_EQ(env.basic_ctx_free_called,env.N_session);
+}
+
+
+struct test_session_closing_ctx
+{
+ int pkt_called;
+ int session_free_called;
+ int userdefine_on_msg_called;
+};
+
+
+static void *test_session_closing_ctx_new(struct session *sess, void *plugin_env)
+{
+ struct test_session_closing_ctx *ctx=CALLOC(struct test_session_closing_ctx, 1);
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ env->basic_ctx_new_called+=1;
+ return ctx;
+}
+
+static void test_session_closing_ctx_free(struct session *sess, void *session_ctx, void *plugin_env)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ env->basic_ctx_free_called+=1;
+ struct test_session_closing_ctx *ctx=(struct test_session_closing_ctx *)session_ctx;
+ EXPECT_EQ(ctx->pkt_called,env->N_per_session_pkt_cnt);
+ EXPECT_EQ(ctx->session_free_called,1);
+ FREE(ctx);
+}
+
+static void test_session_closing_on_intrisic_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env)
+{
+ struct test_session_closing_ctx *ctx=(struct test_session_closing_ctx *)per_session_ctx;
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ if(msg)ctx->pkt_called+=1;
+ if(sess->state==SESSION_STATE_CLOSING)
+ {
+ EXPECT_EQ(msg,nullptr);
+ ctx->session_free_called+=1;
+ session_mq_publish_message(sess, env->test_mq_topic_id, env);
+ env->test_mq_pub_called+=1;
+ }
+}
+
+static void test_session_closing_on_userdefine_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env)
+{
+ struct test_session_closing_ctx *ctx=(struct test_session_closing_ctx *)per_session_ctx;
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ ctx->userdefine_on_msg_called+=1;
+ EXPECT_EQ(msg, plugin_env);
+ EXPECT_EQ(sess->state,SESSION_STATE_CLOSING);
+ env->test_mq_sub_called+=1;
+}
+
+TEST(plugin_manager, session_plugin_pub_msg_on_closing) {
+
+ struct stellar st={0};
+ struct session_plugin_env env;
+ memset(&env, 0, sizeof(struct session_plugin_env));
+
+// pesudo init stage
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+// plugin manager register plugin
+
+ int plugin_id=stellar_session_plugin_register(&st, test_session_closing_ctx_new, test_session_closing_ctx_free, &env);
+ EXPECT_GE(plugin_id,0);
+
+ env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP);
+ EXPECT_GE(env.intrinsc_tcp_topic_id, 0);
+ EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_closing_on_intrisic_msg, plugin_id), 0);
+
+ env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_CLOSING_TOPIC", NULL, &env);
+ EXPECT_GE(env.test_mq_topic_id, 0);
+ EXPECT_EQ(stellar_session_mq_subscribe(&st, env.test_mq_topic_id, test_session_closing_on_userdefine_msg, plugin_id), 0);
+
+// pesudo packet and session
+
+ env.plug_mgr=plug_mgr;
+ env.N_per_session_pkt_cnt=10;
+ env.N_session=10;
+
+ struct packet pkt={&st, TCP, 6};
+
+ struct session sess[env.N_session];
+ memset(&sess, 0, sizeof(sess));
+
+// pesudo running stage
+ for(int i=0; i < env.N_session; i++)
+ {
+ sess[i].state=SESSION_STATE_OPENING;
+ sess[i].plug_mgr_rt=plugin_manager_session_runtime_new(plug_mgr, &sess[i]);
+ sess[i].type=SESSION_TYPE_TCP;
+ }
+
+ for (int j = 0; j < env.N_per_session_pkt_cnt; j++)
+ {
+ plugin_manager_on_packet_ingress(plug_mgr, &pkt);
+
+ for (int i = 0; i < env.N_session; i++)
+ {
+ sess[i].sess_pkt_cnt+=1;
+ sess[i].state=SESSION_STATE_ACTIVE;
+ plugin_manager_on_session_ingress(&sess[i], &pkt);
+ plugin_manager_on_session_egress(&sess[i], &pkt);
+ }
+
+ plugin_manager_on_packet_egress(plug_mgr, &pkt);
+ }
+
+ for(int i=0; i < env.N_session; i++)
+ {
+ sess[i].state=SESSION_STATE_CLOSING;
+ plugin_manager_on_session_closing(&sess[i]);
+ plugin_manager_session_runtime_free(sess[i].plug_mgr_rt);
+ }
+
+// pesudo exit stage
+ plugin_manager_exit(plug_mgr);
+
+ EXPECT_EQ(env.basic_ctx_new_called,env.N_session);
+ EXPECT_EQ(env.basic_ctx_free_called,env.N_session);
+ EXPECT_EQ(env.test_mq_pub_called,env.N_session);
+ EXPECT_EQ(env.test_mq_sub_called,env.N_session);
+}
+
+struct test_session_called_ctx
+{
+ int called;
+};
+
+static void *test_session_called_ctx_new(struct session *sess, void *plugin_env)
+{
+ struct test_session_called_ctx *ctx=CALLOC(struct test_session_called_ctx, 1);
+ return ctx;
+}
+
+static void test_session_called_ctx_free(struct session *sess, void *session_ctx, void *plugin_env)
+{
+ FREE(session_ctx);
+}
+
+//test session topic is active
+static void test_session_mq_topic_is_active_plugin_1_on_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ struct test_session_called_ctx *ctx=(struct test_session_called_ctx *)per_session_ctx;
+ env->plugin_id_1_called+=1;
+ ctx->called+=1;
+ EXPECT_EQ(env->plugin_id_1, sess->plug_mgr_rt->current_session_plugin_id);
+ EXPECT_EQ(env->intrinsc_tcp_topic_id, topic_id);
+
+ session_mq_ignore_message(sess, topic_id, env->plugin_id_1);
+ EXPECT_EQ(session_mq_topic_is_active(sess, topic_id), 1);
+ return;
+}
+
+static void test_session_mq_topic_is_active_plugin_2_on_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ struct test_session_called_ctx *ctx=(struct test_session_called_ctx *)per_session_ctx;
+ env->plugin_id_2_called+=1;
+ ctx->called+=1;
+ EXPECT_EQ(env->plugin_id_2, sess->plug_mgr_rt->current_session_plugin_id);
+ EXPECT_EQ(env->intrinsc_tcp_topic_id, topic_id);
+
+ EXPECT_EQ(session_mq_topic_is_active(sess, topic_id), 1);
+
+ if(ctx->called > env->N_per_session_pkt_cnt/2)
+ {
+ session_mq_ignore_message(sess, topic_id, env->plugin_id_2);
+ EXPECT_EQ(session_mq_topic_is_active(sess, topic_id), 0);
+ }
+ return;
+}
+
+TEST(plugin_manager, test_session_mq_topic_is_active) {
+
+ struct stellar st={0};
+ struct session_plugin_env env;
+ memset(&env, 0, sizeof(struct session_plugin_env));
+
+// pesudo init stage
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+// plugin manager register plugin
+
+ int plugin_id_1=stellar_session_plugin_register(&st, test_session_called_ctx_new, test_session_called_ctx_free, &env);
+ EXPECT_GE(plugin_id_1,0);
+
+ int plugin_id_2=stellar_session_plugin_register(&st, test_session_called_ctx_new, test_session_called_ctx_free, &env);
+ EXPECT_GE(plugin_id_2,0);
+
+ env.plugin_id_1=plugin_id_1;
+ env.plugin_id_2=plugin_id_2;
+
+ env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP);
+ EXPECT_GE(env.intrinsc_tcp_topic_id, 0);
+ EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_topic_is_active_plugin_1_on_msg, plugin_id_1), 0);
+ EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_topic_is_active_plugin_2_on_msg, plugin_id_2), 0);
+
+// pesudo packet and session
+
+ env.plug_mgr=plug_mgr;
+ env.N_per_session_pkt_cnt=10;
+ env.N_session=10;
+
+ struct packet pkt={&st, TCP, 6};
+
+ struct session sess[env.N_session];
+ memset(&sess, 0, sizeof(sess));
+
+// pesudo running stage
+ for(int i=0; i < env.N_session; i++)
+ {
+ sess[i].state=SESSION_STATE_OPENING;
+ sess[i].plug_mgr_rt=plugin_manager_session_runtime_new(plug_mgr, &sess[i]);
+ sess[i].type=SESSION_TYPE_TCP;
+ }
+
+ for (int j = 0; j < env.N_per_session_pkt_cnt; j++)
+ {
+ plugin_manager_on_packet_ingress(plug_mgr, &pkt);
+
+ for (int i = 0; i < env.N_session; i++)
+ {
+ sess[i].sess_pkt_cnt+=1;
+ sess[i].state=SESSION_STATE_ACTIVE;
+ plugin_manager_on_session_ingress(&sess[i], &pkt);
+ plugin_manager_on_session_egress(&sess[i], &pkt);
+ }
+
+ plugin_manager_on_packet_egress(plug_mgr, &pkt);
+ }
+
+ for(int i=0; i < env.N_session; i++)
+ {
+ sess[i].state=SESSION_STATE_CLOSING;
+ plugin_manager_on_session_closing(&sess[i]);
+ plugin_manager_session_runtime_free(sess[i].plug_mgr_rt);
+ }
+
+// pesudo exit stage
+ plugin_manager_exit(plug_mgr);
+
+ EXPECT_EQ(env.plugin_id_1_called,env.N_session*1);// per session called once, then ignore
+ EXPECT_EQ(env.plugin_id_2_called,env.N_session*(env.N_per_session_pkt_cnt/2+1));// per session called one half, then ignore
+}
+
+//test dettach session
+static void test_session_dettach_plugin_1_on_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ struct test_session_called_ctx *ctx=(struct test_session_called_ctx *)per_session_ctx;
+ env->plugin_id_1_called+=1;
+ ctx->called+=1;
+ EXPECT_EQ(env->plugin_id_1, sess->plug_mgr_rt->current_session_plugin_id);
+ EXPECT_EQ(env->intrinsc_tcp_topic_id, topic_id);
+ stellar_session_plugin_dettach_current_session(sess);
+ EXPECT_EQ(session_mq_topic_is_active(sess, topic_id), 1);
+ return;
+}
+
+static void test_session_dettach_plugin_2_on_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ struct test_session_called_ctx *ctx=(struct test_session_called_ctx *)per_session_ctx;
+ env->plugin_id_2_called+=1;
+ ctx->called+=1;
+ EXPECT_EQ(env->plugin_id_2, sess->plug_mgr_rt->current_session_plugin_id);
+ EXPECT_EQ(env->intrinsc_tcp_topic_id, topic_id);
+
+ EXPECT_EQ(session_mq_topic_is_active(sess, topic_id), 1);
+
+ if(ctx->called > env->N_per_session_pkt_cnt/2)
+ {
+ stellar_session_plugin_dettach_current_session(sess);
+ EXPECT_EQ(session_mq_topic_is_active(sess, topic_id), 0);
+ }
+
+ return;
+}
+
+TEST(plugin_manager, test_session_dettach) {
+
+ struct stellar st={0};
+ struct session_plugin_env env;
+ memset(&env, 0, sizeof(struct session_plugin_env));
+
+// pesudo init stage
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+// plugin manager register plugin
+
+ int plugin_id_1=stellar_session_plugin_register(&st, test_session_called_ctx_new, test_session_called_ctx_free, &env);
+ EXPECT_GE(plugin_id_1,0);
+
+ int plugin_id_2=stellar_session_plugin_register(&st, test_session_called_ctx_new, test_session_called_ctx_free, &env);
+ EXPECT_GE(plugin_id_2,0);
+
+ env.plugin_id_1=plugin_id_1;
+ env.plugin_id_2=plugin_id_2;
+
+ env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP);
+ EXPECT_GE(env.intrinsc_tcp_topic_id, 0);
+ EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_dettach_plugin_1_on_msg, plugin_id_1), 0);
+ EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_dettach_plugin_2_on_msg, plugin_id_2), 0);
+
+// pesudo packet and session
+
+ env.plug_mgr=plug_mgr;
+ env.N_per_session_pkt_cnt=10;
+ env.N_session=10;
+
+ struct packet pkt={&st, TCP, 6};
+
+ struct session sess[env.N_session];
+ memset(&sess, 0, sizeof(sess));
+
+// pesudo running stage
+ for(int i=0; i < env.N_session; i++)
+ {
+ sess[i].state=SESSION_STATE_OPENING;
+ sess[i].plug_mgr_rt=plugin_manager_session_runtime_new(plug_mgr, &sess[i]);
+ sess[i].type=SESSION_TYPE_TCP;
+ }
+
+ for (int j = 0; j < env.N_per_session_pkt_cnt; j++)
+ {
+ plugin_manager_on_packet_ingress(plug_mgr, &pkt);
+
+ for (int i = 0; i < env.N_session; i++)
+ {
+ sess[i].sess_pkt_cnt+=1;
+ sess[i].state=SESSION_STATE_ACTIVE;
+ plugin_manager_on_session_ingress(&sess[i], &pkt);
+ plugin_manager_on_session_egress(&sess[i], &pkt);
+ }
+
+ plugin_manager_on_packet_egress(plug_mgr, &pkt);
+ }
+
+ for(int i=0; i < env.N_session; i++)
+ {
+ sess[i].state=SESSION_STATE_CLOSING;
+ plugin_manager_on_session_closing(&sess[i]);
+ plugin_manager_session_runtime_free(sess[i].plug_mgr_rt);
+ }
+
+// pesudo exit stage
+ plugin_manager_exit(plug_mgr);
+
+ EXPECT_EQ(env.plugin_id_1_called,env.N_session*1);// per session called once, then ignore
+ EXPECT_EQ(env.plugin_id_2_called,env.N_session*(env.N_per_session_pkt_cnt/2+1));// per session called one half, then ignore
+
+}
+
+
+//test dettach session
+static void test_session_mq_priority_plugin_1_on_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ struct test_session_called_ctx *ctx=(struct test_session_called_ctx *)per_session_ctx;
+ env->plugin_id_1_called+=1;
+ ctx->called+=1;
+ EXPECT_EQ(env->plugin_id_1, sess->plug_mgr_rt->current_session_plugin_id);
+ EXPECT_EQ(session_mq_topic_is_active(sess, topic_id), 1);
+ if(topic_id == env->intrinsc_tcp_topic_id)
+ {
+ EXPECT_EQ(ctx->called%3, 1);// intrinsc msg has high priority
+ EXPECT_EQ(session_mq_publish_message_with_priority(sess, env->test_mq_topic_id, (void *)(long)env->plugin_id_1, STELLAR_MQ_PRIORITY_LOW), 0);
+ }
+ if(topic_id == env->test_mq_topic_id)
+ {
+ if(ctx->called%3 == 2)
+ {
+ EXPECT_EQ((int)(long)msg, env->plugin_id_2);
+ }
+ if(ctx->called%3 == 0)
+ {
+ EXPECT_EQ((int )(long)msg, env->plugin_id_1);
+ }
+ }
+ return;
+}
+
+static void test_session_mq_priority_plugin_2_on_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ struct test_session_called_ctx *ctx=(struct test_session_called_ctx *)per_session_ctx;
+ env->plugin_id_2_called+=1;
+ ctx->called+=1;
+ EXPECT_EQ(env->plugin_id_2, sess->plug_mgr_rt->current_session_plugin_id);
+ EXPECT_EQ(session_mq_topic_is_active(sess, topic_id), 1);
+
+ if(topic_id == env->intrinsc_tcp_topic_id)
+ {
+ EXPECT_EQ(ctx->called%3, 1);
+ // publish msg has normal priority
+ EXPECT_EQ(session_mq_publish_message(sess, env->test_mq_topic_id, (void *)(long)env->plugin_id_2), 0);
+ }
+ if(topic_id == env->test_mq_topic_id)
+ {
+ if(ctx->called%3 == 2)
+ {
+ EXPECT_EQ((int)(long)msg, env->plugin_id_2);
+ }
+ if(ctx->called%3 == 0)
+ {
+ EXPECT_EQ((int)(long)msg, env->plugin_id_1);
+ }
+ }
+ return;
+}
+
+TEST(plugin_manager, test_session_mq_priority) {
+
+ struct stellar st={0};
+ struct session_plugin_env env;
+ memset(&env, 0, sizeof(struct session_plugin_env));
+
+// pesudo init stage
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+// plugin manager register plugin
+
+ int plugin_id_1=stellar_session_plugin_register(&st, test_session_called_ctx_new, test_session_called_ctx_free, &env);
+ EXPECT_GE(plugin_id_1,0);
+
+ int plugin_id_2=stellar_session_plugin_register(&st, test_session_called_ctx_new, test_session_called_ctx_free, &env);
+ EXPECT_GE(plugin_id_2,0);
+
+ env.plugin_id_1=plugin_id_1;
+ env.plugin_id_2=plugin_id_2;
+
+ env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP);
+ EXPECT_GE(env.intrinsc_tcp_topic_id, 0);
+ EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_priority_plugin_1_on_msg, plugin_id_1), 0);
+ EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_priority_plugin_2_on_msg, plugin_id_2), 0);
+
+ env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_PRIORITY_TOPIC", NULL, &env);
+ EXPECT_GE(env.test_mq_topic_id, 0);
+ EXPECT_EQ(stellar_session_mq_subscribe(&st, env.test_mq_topic_id, test_session_mq_priority_plugin_1_on_msg, plugin_id_1), 0);
+ EXPECT_EQ(stellar_session_mq_subscribe(&st, env.test_mq_topic_id, test_session_mq_priority_plugin_2_on_msg, plugin_id_2), 0);
+
+// pesudo packet and session
+
+ env.plug_mgr=plug_mgr;
+ env.N_per_session_pkt_cnt=10;
+ env.N_session=10;
+
+ struct packet pkt={&st, TCP, 6};
+
+ struct session sess[env.N_session];
+ memset(&sess, 0, sizeof(sess));
+
+// pesudo running stage
+ for(int i=0; i < env.N_session; i++)
+ {
+ sess[i].state=SESSION_STATE_OPENING;
+ sess[i].plug_mgr_rt=plugin_manager_session_runtime_new(plug_mgr, &sess[i]);
+ sess[i].type=SESSION_TYPE_TCP;
+ }
+
+ for (int j = 0; j < env.N_per_session_pkt_cnt; j++)
+ {
+ plugin_manager_on_packet_ingress(plug_mgr, &pkt);
+
+ for (int i = 0; i < env.N_session; i++)
+ {
+ sess[i].sess_pkt_cnt+=1;
+ sess[i].state=SESSION_STATE_ACTIVE;
+ plugin_manager_on_session_ingress(&sess[i], &pkt);
+ plugin_manager_on_session_egress(&sess[i], &pkt);
+ }
+
+ plugin_manager_on_packet_egress(plug_mgr, &pkt);
+ }
+
+ for(int i=0; i < env.N_session; i++)
+ {
+ sess[i].state=SESSION_STATE_CLOSING;
+ plugin_manager_on_session_closing(&sess[i]);
+ plugin_manager_session_runtime_free(sess[i].plug_mgr_rt);
+ }
+
+// pesudo exit stage
+ plugin_manager_exit(plug_mgr);
+
+ // each session publish TCP TOPIC per_session_pkt_cnt+1, and SESSION_PRIORITY_TOPIC 2*(msg per_session_pkt_cnt+1)
+ EXPECT_EQ(env.plugin_id_1_called,env.N_session*((env.N_per_session_pkt_cnt+1)*3));
+ EXPECT_EQ(env.plugin_id_2_called,env.N_session*((env.N_per_session_pkt_cnt+1)*3));
+
+}
+
+void test_session_exdata_free_pub_msg_exdata_free(int idx, void *ex_ptr, void *arg)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)arg;
+ EXPECT_EQ(session_mq_publish_message((struct session *)ex_ptr, env->intrinsc_tcp_topic_id, arg), -1);
+ env->basic_exdata_free_called+=1;
+}
+
+static void test_session_exdata_free_pub_msg_on_session(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env)
+{
+ struct session_plugin_env *env = (struct session_plugin_env *)plugin_env;
+ EXPECT_EQ(session_exdata_set(sess, env->basic_exdata_idx, sess), 0);
+ if(msg)env->plugin_id_1_called+=1;
+}
+
+TEST(plugin_manager, session_exdata_free_pub_msg) {
+
+ struct stellar st={0};
+ struct session_plugin_env env;
+
+// pesudo init stage
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+// plugin manager register plugin
+
+ env.plugin_id_1=stellar_session_plugin_register(&st, NULL, NULL, &env);
+ EXPECT_GE(env.plugin_id_1,0);
+
+ env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP);
+ EXPECT_GE(env.intrinsc_tcp_topic_id, 0);
+ EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_exdata_free_pub_msg_on_session, env.plugin_id_1), 0);
+
+ env.basic_exdata_idx=stellar_exdata_new_index(&st, "BASIC_EXDATA", test_session_exdata_free_pub_msg_exdata_free, &env) ;
+ EXPECT_GE(env.basic_exdata_idx, 0);
+
+// pesudo packet and session
+
+ memset(&env, 0, sizeof(struct session_plugin_env));
+ env.plug_mgr=plug_mgr;
+ env.N_per_session_pkt_cnt=10;
+ env.N_session=10;
+
+ struct packet pkt={&st, TCP, 6};
+
+ struct session sess[env.N_session];
+ memset(&sess, 0, sizeof(sess));
+
+// pesudo running stage
+ for(int i=0; i < env.N_session; i++)
+ {
+ sess[i].plug_mgr_rt=plugin_manager_session_runtime_new(plug_mgr, &sess[i]);
+ sess[i].type=SESSION_TYPE_TCP;
+ }
+
+ for (int j = 0; j < env.N_per_session_pkt_cnt; j++)
+ {
+ plugin_manager_on_packet_ingress(plug_mgr, &pkt);
+
+ for (int i = 0; i < env.N_session; i++)
+ {
+ sess[i].sess_pkt_cnt+=1;
+ plugin_manager_on_session_ingress(&sess[i], &pkt);
+ plugin_manager_on_session_egress(&sess[i], &pkt);
+ }
+
+ plugin_manager_on_packet_egress(plug_mgr, &pkt);
+ }
+
+ for(int i=0; i < env.N_session; i++)
+ {
+ plugin_manager_on_session_closing(&sess[i]);
+ plugin_manager_session_runtime_free(sess[i].plug_mgr_rt);
+ }
+
+// pesudo exit stage
+ plugin_manager_exit(plug_mgr);
+
+ EXPECT_EQ(env.basic_exdata_free_called,env.N_session);
+ EXPECT_EQ(env.plugin_id_1_called,env.N_session*env.N_per_session_pkt_cnt);
+}
+
+
+/**********************************************
+ * TEST PLUGIN MANAGER ON POLLING PLUGIN INIT *
+ **********************************************/
+
+int test_plugin_on_polling_func(void *plugin_env)
+{
+ return 1;
+}
+
+TEST(plugin_manager_init, polling_plugin_register) {
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ st.plug_mgr=plug_mgr;
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+ int plugin_id = stellar_polling_plugin_register(&st, test_plugin_on_polling_func, &st);
+ {
+ SCOPED_TRACE("White-box test, check stellar internal schema");
+ EXPECT_TRUE(plugin_id>=POLLING_PULGIN_ID_BASE);
+ struct registered_polling_plugin_schema *schema = (struct registered_polling_plugin_schema *)utarray_eltptr(
+ plug_mgr->registered_polling_plugin_array, (unsigned int)(plugin_id-POLLING_PULGIN_ID_BASE));
+ EXPECT_EQ(schema->on_polling, (void *)test_plugin_on_polling_func);
+ EXPECT_EQ(schema->plugin_env, &st);
+ EXPECT_EQ(utarray_len(plug_mgr->registered_polling_plugin_array), 1);
+ }
+
+ plugin_manager_exit(plug_mgr);
+}
+
+/**********************************************
+ * TEST PLUGIN MANAGER ON POLLING PLUGIN RUNTIME *
+ **********************************************/
+
+struct polling_plugin_env
+{
+ struct plugin_manager_schema *plug_mgr;
+ int N_polling;
+ int return0_polling_called;
+ int return1_polling_called;
+};
+
+int return1_plugin_on_polling(void *plugin_env)
+{
+ struct polling_plugin_env *env = (struct polling_plugin_env *)plugin_env;
+ env->return1_polling_called+=1;
+ return 1;
+}
+
+int return0_plugin_on_polling(void *plugin_env)
+{
+ struct polling_plugin_env *env = (struct polling_plugin_env *)plugin_env;
+ env->return0_polling_called+=1;
+ return 0;
+}
+
+TEST(plugin_manager, basic_polling_plugins) {
+
+// pesudo init stage
+ struct stellar st={0};
+ struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL);
+ st.plug_mgr=plug_mgr;
+ struct polling_plugin_env env;
+ memset(&env, 0, sizeof(struct polling_plugin_env));
+ env.plug_mgr=plug_mgr;
+ whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr);
+
+// plugin manager register plugin
+ int plugin_id = stellar_polling_plugin_register(&st, return0_plugin_on_polling, &env);
+ EXPECT_TRUE(plugin_id>=0);
+ plugin_id = stellar_polling_plugin_register(&st, return1_plugin_on_polling, &env);
+ EXPECT_TRUE(plugin_id>=0);
+
+// pesudo runtime stage
+
+ env.plug_mgr=plug_mgr;
+ env.N_polling=10;
+
+ for(int i=0; i<env.N_polling; i++)
+ {
+ EXPECT_EQ(plugin_manager_on_polling(plug_mgr), 1);
+ }
+
+// pesudo exit stage
+ plugin_manager_exit(plug_mgr);
+
+ EXPECT_EQ(env.return0_polling_called,env.N_polling);
+ EXPECT_EQ(env.return1_polling_called,env.N_polling);
+
+}
+
+
+/**********************************************
+ * GTEST MAIN *
+ **********************************************/
+
+int main(int argc, char ** argv)
+{
+ int ret=0;
+ ::testing::InitGoogleTest(&argc, argv);
+ ret=RUN_ALL_TESTS();
+ return ret;
+} \ No newline at end of file
diff --git a/src/plugin/test/plugin_manager_gtest_mock.h b/src/plugin/test/plugin_manager_gtest_mock.h
new file mode 100644
index 0000000..b25fb63
--- /dev/null
+++ b/src/plugin/test/plugin_manager_gtest_mock.h
@@ -0,0 +1,116 @@
+#pragma once
+
+#ifdef __cplusplus
+extern "C"
+{
+#endif
+
+#include "../plugin_manager_interna.h"
+
+#include "stellar/session.h"
+#include "tuple/tuple.h"
+
+//mock stellar
+struct stellar
+{
+ struct plugin_manager_schema *plug_mgr;
+};
+
+enum packet_type
+{
+ UNKNOWN,
+ IPv4,
+ IPv6,
+ UDP,
+ TCP,
+ TCP_STREAM,
+ CONTROL,
+};
+
+struct packet
+{
+ struct stellar *st;
+ enum packet_type type;
+ unsigned char ip_proto;
+};
+
+
+struct session
+{
+ struct plugin_manager_runtime *plug_mgr_rt;
+ enum session_type type;
+ enum session_state state;
+ int sess_pkt_cnt;
+};
+
+struct plugin_manager_schema * stellar_get_plugin_manager(struct stellar *st)
+{
+ return st->plug_mgr;
+}
+
+int stellar_set_plugin_manger(struct stellar *st, struct plugin_manager_schema *pm)
+{
+ st->plug_mgr=pm;
+ return 0;
+}
+
+int stellar_get_worker_thread_num(struct stellar *st)
+{
+ return 16;
+}
+
+uint16_t stellar_get_current_thread_index()
+{
+ return 0;
+}
+
+unsigned char packet_get_ip_protocol(struct packet *pkt)
+{
+ return pkt->ip_proto;
+}
+
+enum session_type session_get_type(const struct session *sess)
+{
+ return sess->type;
+
+}
+
+void session_set_user_data(struct session *sess, void *user_data)
+{
+ sess->plug_mgr_rt = (struct plugin_manager_runtime *)user_data;
+}
+
+void *session_get_user_data(const struct session *sess)
+{
+ return sess->plug_mgr_rt;
+}
+
+void *packet_get_user_data(const struct packet *pkt)
+{
+ return pkt->st->plug_mgr;
+}
+
+int packet_get_innermost_tuple6(const struct packet *pkt, struct tuple6 *tuple)
+{
+ tuple->ip_proto = pkt->ip_proto;
+ return 0;
+}
+
+uint8_t packet_is_ctrl(const struct packet *pkt)
+{
+ return 0;
+}
+
+struct tcp_segment *session_get_tcp_segment(struct session *sess)
+{
+ return NULL;
+}
+
+void session_free_tcp_segment(struct session *sess, struct tcp_segment *seg)
+{
+ return;
+}
+
+#ifdef __cplusplus
+}
+#endif \ No newline at end of file
diff --git a/src/stellar/CMakeLists.txt b/src/stellar/CMakeLists.txt
index 1185726..bfd8f65 100644
--- a/src/stellar/CMakeLists.txt
+++ b/src/stellar/CMakeLists.txt
@@ -1,16 +1,17 @@
set(SOURCE stellar_config.cpp stellar_stat.cpp stellar_core.cpp)
-set(LIBRARY times plugin_manager session_manager ip_reassembly packet_io pthread fieldstat4 toml)
+set(LIBRARY times session_manager plugin_manager ip_reassembly packet_io packet pthread fieldstat4 toml)
add_library(stellar_core STATIC ${SOURCE})
-target_link_libraries(stellar_core ${LIBRARY})
+target_link_libraries(stellar_core PRIVATE ${LIBRARY})
add_library(stellar_devel SHARED ${SOURCE})
-target_link_libraries(stellar_devel ${LIBRARY})
+#target_link_libraries(stellar_devel ${LIBRARY})
set_target_properties(stellar_devel PROPERTIES LINK_FLAGS "-Wl,--version-script=${CMAKE_CURRENT_LIST_DIR}/version.map")
+target_link_libraries(stellar_devel PRIVATE -Wl,--whole-archive ${LIBRARY} -Wl,--no-whole-archive)
add_executable(stellar main.cpp)
-target_link_libraries(stellar stellar_core)
-target_link_libraries(stellar "-rdynamic")
+target_link_libraries(stellar PRIVATE -Wl,--whole-archive stellar_core ${LIBRARY} -Wl,--no-whole-archive)
+target_link_libraries(stellar PRIVATE "-rdynamic")
set_target_properties(stellar PROPERTIES LINK_FLAGS "-Wl,--version-script=${CMAKE_CURRENT_LIST_DIR}/version.map")
install(TARGETS stellar RUNTIME DESTINATION bin COMPONENT PROGRAM)
diff --git a/src/stellar/stellar_core.cpp b/src/stellar/stellar_core.cpp
index 7af49d9..f1ed79a 100644
--- a/src/stellar/stellar_core.cpp
+++ b/src/stellar/stellar_core.cpp
@@ -101,6 +101,7 @@ static inline void free_evicted_sessions(struct session_manager *sess_mgr, uint6
if (sess)
{
plugin_ctx = session_get_user_data(sess);
+ plugin_manager_on_session_closing(sess);
plugin_manager_session_runtime_free((struct plugin_manager_runtime *)plugin_ctx);
session_manager_free_session(sess_mgr, sess);
}
@@ -121,6 +122,7 @@ static inline void free_expired_sessions(struct session_manager *sess_mgr, uint6
if (sess)
{
plugin_ctx = session_get_user_data(sess);
+ plugin_manager_on_session_closing(sess);
plugin_manager_session_runtime_free((struct plugin_manager_runtime *)plugin_ctx);
session_manager_free_session(sess_mgr, sess);
}
@@ -164,6 +166,11 @@ static void *work_thread(void *arg)
memset(packets, 0, sizeof(packets));
+ for(int i=0; i<RX_BURST_MAX; i++)
+ {
+ packet_set_user_data(&packets[i], (void *)plug_mgr);
+ }
+
snprintf(thd_name, sizeof(thd_name), "stellar:%d", thr_idx);
prctl(PR_SET_NAME, (unsigned long long)thd_name, NULL, NULL, NULL);
@@ -200,7 +207,7 @@ static void *work_thread(void *arg)
defraged_pkt = NULL;
pkt = &packets[i];
- plugin_manager_on_packet(plug_mgr, pkt);
+ plugin_manager_on_packet_ingress(plug_mgr, pkt);
if (packet_is_fragment(pkt))
{
defraged_pkt = ip_reassembly_packet(ip_reass, pkt, now_ms);
@@ -211,10 +218,12 @@ static void *work_thread(void *arg)
else
{
pkt = defraged_pkt;
- plugin_manager_on_packet(plug_mgr, pkt);
+ plugin_manager_on_packet_ingress(plug_mgr, pkt);
+ plugin_manager_on_packet_egress(plug_mgr, pkt);
}
}
+ pkt = &packets[i];
sess = session_manager_lookup_session_by_packet(sess_mgr, pkt);
if (sess == NULL)
{
@@ -241,6 +250,8 @@ static void *work_thread(void *arg)
fast_path:
plugin_manager_on_session_egress(sess, pkt);
+ plugin_manager_on_packet_egress(plug_mgr, pkt);
+
if (sess && session_get_current_state(sess) == SESSION_STATE_DISCARD)
{
packet_set_action(pkt, PACKET_ACTION_DROP);
@@ -567,4 +578,9 @@ void stellar_send_crafted_packet(struct stellar *st, struct packet *pkt)
{
packet_io_inject(packet_io, thr_idx, pkt, 1);
}
+}
+
+int stellar_get_worker_thread_num(struct stellar *st)
+{
+ return st->config.pkt_io_opts.nr_threads;
} \ No newline at end of file
diff --git a/src/stellar/stellar_core.h b/src/stellar/stellar_core.h
index 2291203..788ef1d 100644
--- a/src/stellar/stellar_core.h
+++ b/src/stellar/stellar_core.h
@@ -15,7 +15,6 @@ void stellar_set_plugin_manger(struct stellar *st, struct plugin_manager_schema
// only send user crafted packet, can't send packet which come from network
void stellar_send_crafted_packet(struct stellar *st, struct packet *pkt);
-int stellar_run(int argc, char **argv);
#ifdef __cplusplus
}
diff --git a/src/stellar/version.map b/src/stellar/version.map
index 6581205..a46e8f1 100644
--- a/src/stellar/version.map
+++ b/src/stellar/version.map
@@ -26,10 +26,10 @@ global:
session_exdata_set;
session_exdata_get;
- stellar_session_mq_create_topic;
- stellar_session_mq_get_topic_id;
- stellar_session_mq_update_topic;
- stellar_session_mq_destroy_topic;
+ stellar_mq_create_topic;
+ stellar_mq_get_topic_id;
+ stellar_mq_update_topic;
+ stellar_mq_destroy_topic;
stellar_session_mq_subscribe;
session_mq_publish_message;
session_mq_ignore_message;
diff --git a/test/packet_inject/CMakeLists.txt b/test/packet_inject/CMakeLists.txt
index 61fc165..3bfb3c2 100644
--- a/test/packet_inject/CMakeLists.txt
+++ b/test/packet_inject/CMakeLists.txt
@@ -1,14 +1,15 @@
# build libpacket_inject.so
add_library(packet_inject SHARED packet_inject.cpp)
-target_link_libraries(packet_inject stellar_devel toml)
+target_link_libraries(packet_inject toml)
target_include_directories(packet_inject PUBLIC ${CMAKE_SOURCE_DIR}/include/)
set_target_properties(packet_inject PROPERTIES LINK_FLAGS "-Wl,--version-script=${CMAKE_CURRENT_LIST_DIR}/version.map")
# build gtest
function(packet_inject_add_case EXEC_NAME)
add_executable(${EXEC_NAME} ${EXEC_NAME}.cpp)
- target_link_libraries(${EXEC_NAME} "-rdynamic")
- target_link_libraries(${EXEC_NAME} stellar_devel gtest)
+ target_include_directories(${EXEC_NAME} PUBLIC ${CMAKE_SOURCE_DIR}/include/)
+ target_link_libraries(${EXEC_NAME} PRIVATE "-rdynamic")
+ target_link_libraries(${EXEC_NAME} PRIVATE stellar_devel gtest)
gtest_discover_tests(${EXEC_NAME})
endfunction()
diff --git a/test/packet_inject/packet_inject.cpp b/test/packet_inject/packet_inject.cpp
index 193cbd5..0ffdc61 100644
--- a/test/packet_inject/packet_inject.cpp
+++ b/test/packet_inject/packet_inject.cpp
@@ -5,8 +5,10 @@
#include <arpa/inet.h>
#include "toml.h"
+#include "stellar/stellar.h"
#include "stellar/layer.h"
-#include "stellar/session_mq.h"
+#include "stellar/session.h"
+#include "stellar/stellar_mq.h"
#define LOG_ERR(fmt, ...) printf("ERROR [packet inject] " fmt, ##__VA_ARGS__)
#define LOG_INFO(fmt, ...) printf("INFO [packet inject] " fmt, ##__VA_ARGS__)
@@ -545,8 +547,8 @@ extern "C"
ctx->st = st;
ctx->sess_plug_id = stellar_session_plugin_register(st, on_sess_new, on_sess_free, ctx);
- ctx->tcp_topic_id = stellar_session_mq_get_topic_id(st, TOPIC_TCP);
- ctx->udp_topic_id = stellar_session_mq_get_topic_id(st, TOPIC_UDP);
+ ctx->tcp_topic_id = stellar_mq_get_topic_id(st, TOPIC_TCP);
+ ctx->udp_topic_id = stellar_mq_get_topic_id(st, TOPIC_UDP);
stellar_session_mq_subscribe(st, ctx->tcp_topic_id, on_sess_msg, ctx->sess_plug_id);
stellar_session_mq_subscribe(st, ctx->udp_topic_id, on_sess_msg, ctx->sess_plug_id);