diff options
| author | yangwei <[email protected]> | 2024-08-06 20:37:59 +0800 |
|---|---|---|
| committer | luwenpeng <[email protected]> | 2024-08-12 15:45:50 +0800 |
| commit | 6786372449b70caaaf6824e1ed0e59800659b2d6 (patch) | |
| tree | 869aaa46825ce60d224338f8040f1a1b35f5df00 | |
| parent | ee695957205fceb7990d123f6cccf7de3e58c12c (diff) | |
✨ feat(plugin manager integration): packet and session exdata&mq
27 files changed, 3437 insertions, 507 deletions
diff --git a/deps/bitmap/bitmap.c b/deps/bitmap/bitmap.c index 59fb4b7..068bc3f 100644 --- a/deps/bitmap/bitmap.c +++ b/deps/bitmap/bitmap.c @@ -46,8 +46,21 @@ void bitmap_free(struct bitmap *bmp) { } } - - +int bitmap_is_all_zero(struct bitmap *bmp, int x, int y, int length) { + if (x < 0 || y < 0 || x >= bmp->width || y >= bmp->height) { + return -1; // Return error code if coordinates are out of bounds + } + int idx = y * bmp->width + x; + if (idx + length > bmp->width * bmp->height) { + return -1; // Return error if range exceeds bitmap bounds + } + for (int i = 0; i < length; i++) { + if (bmp->data[(idx + i) / 8] & (1 << ((idx + i) % 8))) { + return 0; // Return 0 if any bit is not zero + } + } + return 1; // Return 1 if all bits are zero +} int test_bitmap() { struct bitmap *bmp = bitmap_new(10, 5, 1); // Create a 10x5 bitmap diff --git a/deps/bitmap/bitmap.h b/deps/bitmap/bitmap.h index 210ea35..5fbf469 100644 --- a/deps/bitmap/bitmap.h +++ b/deps/bitmap/bitmap.h @@ -1,5 +1,9 @@ +#pragma once + struct bitmap; struct bitmap * bitmap_new(int width, int height, int value); int bitmap_set(struct bitmap *bmp, int x, int y, int value); int bitmap_get(struct bitmap *bmp, int x, int y); -void bitmap_free(struct bitmap *bmp);
\ No newline at end of file +void bitmap_free(struct bitmap *bmp); + +int bitmap_is_all_zero(struct bitmap *bmp, int x, int y, int length); diff --git a/include/CMakeLists.txt b/include/CMakeLists.txt index 2e5b316..12742bd 100644 --- a/include/CMakeLists.txt +++ b/include/CMakeLists.txt @@ -4,5 +4,5 @@ install(FILES stellar/tunnel.h DESTINATION include/stellar/ COMPONENT LIBRARIES) install(FILES stellar/packet.h DESTINATION include/stellar/ COMPONENT LIBRARIES) install(FILES stellar/session.h DESTINATION include/stellar/ COMPONENT LIBRARIES) install(FILES stellar/stellar.h DESTINATION include/stellar/ COMPONENT LIBRARIES) -install(FILES stellar/session_mq.h DESTINATION include/stellar/ COMPONENT LIBRARIES) -install(FILES stellar/session_exdata.h DESTINATION include/stellar/ COMPONENT LIBRARIES)
\ No newline at end of file +install(FILES stellar/stellar_mq.h DESTINATION include/stellar/ COMPONENT LIBRARIES) +install(FILES stellar/stellar_exdata.h DESTINATION include/stellar/ COMPONENT LIBRARIES)
\ No newline at end of file diff --git a/include/stellar/session_exdata.h b/include/stellar/session_exdata.h deleted file mode 100644 index a10139e..0000000 --- a/include/stellar/session_exdata.h +++ /dev/null @@ -1,17 +0,0 @@ -#pragma once - -#ifdef __cplusplus -extern "C" -{ -#endif - -#include "stellar.h" - -typedef void session_exdata_free(struct session *sess, int idx, void *ex_ptr, void *arg); -int stellar_session_exdata_new_index(struct stellar *st, const char *name, session_exdata_free *free_func,void *arg); -int session_exdata_set(struct session *sess, int idx, void *ex_ptr); -void *session_exdata_get(struct session *sess, int idx); - -#ifdef __cplusplus -} -#endif
\ No newline at end of file diff --git a/include/stellar/session_mq.h b/include/stellar/session_mq.h deleted file mode 100644 index 2ad9f9f..0000000 --- a/include/stellar/session_mq.h +++ /dev/null @@ -1,33 +0,0 @@ -#pragma once - -#ifdef __cplusplus -extern "C" -{ -#endif - -#include "stellar.h" - -//session mq -typedef void msg_free_cb_func(void *msg, void *msg_free_arg); -typedef void on_msg_cb_func(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env); - -//return topic_id -int stellar_session_mq_create_topic(struct stellar *st, const char *topic_name, msg_free_cb_func *msg_free_cb, void *msg_free_arg); - -int stellar_session_mq_get_topic_id(struct stellar *st, const char *topic_name); - -int stellar_session_mq_update_topic(struct stellar *st, int topic_id, msg_free_cb_func *msg_free_cb, void *msg_free_arg); - -int stellar_session_mq_destroy_topic(struct stellar *st, int topic_id); - -//return 0 if success, otherwise return -1. -int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_msg_cb_func *plugin_on_msg_cb, int plugin_id); - -int session_mq_publish_message(struct session *sess, int topic_id, void *msg); - -int session_mq_ignore_message(struct session *sess, int topic_id, int plugin_id); -int session_mq_unignore_message(struct session *sess, int topic_id, int plugin_id); - -#ifdef __cplusplus -} -#endif
\ No newline at end of file diff --git a/include/stellar/stellar.h b/include/stellar/stellar.h index 3237455..9af384c 100644 --- a/include/stellar/stellar.h +++ b/include/stellar/stellar.h @@ -6,7 +6,6 @@ extern "C" #endif #include <stdint.h> -#include "stellar/session.h" struct session; struct stellar; @@ -48,6 +47,9 @@ typedef int plugin_on_polling_func(void *plugin_env); //return polling plugin_id int stellar_polling_plugin_register(struct stellar *st, plugin_on_polling_func on_polling, void *plugin_env); +void stellar_emit_datapath_telemetry(struct packet *pkt, const char * module, const char *str); + +int stellar_get_worker_thread_num(struct stellar *st); uint16_t stellar_get_current_thread_index(); // only send user crafted packet, can't send packet which come from network void stellar_send_crafted_packet(struct stellar *st, struct packet *pkt); diff --git a/include/stellar/stellar_exdata.h b/include/stellar/stellar_exdata.h new file mode 100644 index 0000000..605cb12 --- /dev/null +++ b/include/stellar/stellar_exdata.h @@ -0,0 +1,23 @@ +#pragma once + +#include "stellar.h" + +#ifdef __cplusplus +extern "C" +{ +#endif + +typedef void stellar_exdata_free(int idx, void *ex_ptr, void *arg); +int stellar_exdata_new_index(struct stellar *st, const char *name, stellar_exdata_free *free_func,void *arg); + +//packet exdata api +int packet_exdata_set(struct packet *pkt, int idx, void *ex_ptr); +void *packet_exdata_get(struct packet *pkt, int idx); + +//session exdata api +int session_exdata_set(struct session *sess, int idx, void *ex_ptr); +void *session_exdata_get(struct session *sess, int idx); + +#ifdef __cplusplus +} +#endif
\ No newline at end of file diff --git a/include/stellar/stellar_mq.h b/include/stellar/stellar_mq.h new file mode 100644 index 0000000..9267868 --- /dev/null +++ b/include/stellar/stellar_mq.h @@ -0,0 +1,51 @@ +#pragma once +#include "stellar.h" + +#ifdef __cplusplus +extern "C" +{ +#endif + +//topic api +typedef void stellar_msg_free_cb_func(void *msg, void *msg_free_arg); + +//return topic_id +int stellar_mq_create_topic(struct stellar *st, const char *topic_name, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg); +int stellar_mq_get_topic_id(struct stellar *st, const char *topic_name); +int stellar_mq_update_topic(struct stellar *st, int topic_id, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg); +int stellar_mq_destroy_topic(struct stellar *st, int topic_id); + + +enum stellar_mq_priority +{ + STELLAR_MQ_PRIORITY_LOW, + STELLAR_MQ_PRIORITY_NORMAL, + STELLAR_MQ_PRIORITY_HIGH, + STELLAR_MQ_PRIORITY_MAX, +}; + +//session mq api +typedef void on_session_msg_cb_func(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env); + +//return 0 if success, otherwise return -1. +int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_session_msg_cb_func *plugin_on_msg_cb, int plugin_id); +int session_mq_publish_message(struct session *sess, int topic_id, void *msg); +int session_mq_publish_message_with_priority(struct session *sess, int topic_id, void *msg, enum stellar_mq_priority priority); + +int session_mq_ignore_message(struct session *sess, int topic_id, int plugin_id); +int session_mq_unignore_message(struct session *sess, int topic_id, int plugin_id); + +int session_mq_topic_is_active(struct session *sess, int topic_id); + + +//packet mq api + +typedef void on_packet_msg_cb_func(struct packet *pkt, int topic_id, const void *msg, void *plugin_env); +//return 0 if success, otherwise return -1. +int stellar_packet_mq_subscribe(struct stellar *st, int topic_id, on_packet_msg_cb_func *plugin_on_msg_cb, int plugin_id); //packet plugin only +int packet_mq_publish_message(struct packet *pkt, int topic_id, void *msg); +int packet_mq_publish_message_with_priority(struct packet *pkt, int topic_id, void *msg, enum stellar_mq_priority priority); + +#ifdef __cplusplus +} +#endif
\ No newline at end of file diff --git a/include/stellar/utils.h b/include/stellar/utils.h index fb1dc5b..d6ed97e 100644 --- a/include/stellar/utils.h +++ b/include/stellar/utils.h @@ -31,6 +31,14 @@ (type *)( (char *)__mptr - offsetof(type,member) );}) #endif +#ifndef likely +#define likely(x) __builtin_expect((x),1) +#endif /* likely */ + +#ifndef unlikely +#define unlikely(x) __builtin_expect((x),0) +#endif /* unlikely */ + #ifndef __unused #define __unused __attribute__((__unused__)) #endif diff --git a/src/id_generator/CMakeLists.txt b/src/id_generator/CMakeLists.txt index b18e1d8..4f05afe 100644 --- a/src/id_generator/CMakeLists.txt +++ b/src/id_generator/CMakeLists.txt @@ -1,4 +1,5 @@ add_library(id_generator id_generator.cpp) target_include_directories(id_generator PUBLIC ${CMAKE_CURRENT_LIST_DIR}) +target_include_directories(id_generator PUBLIC ${CMAKE_SOURCE_DIR}/include/stellar) target_include_directories(id_generator PUBLIC ${CMAKE_SOURCE_DIR}/src/stellar) -target_link_libraries(id_generator log stellar_core)
\ No newline at end of file +target_link_libraries(id_generator PRIVATE stellar_core log)
\ No newline at end of file diff --git a/src/id_generator/id_generator.cpp b/src/id_generator/id_generator.cpp index ec6e245..2c7e9f8 100644 --- a/src/id_generator/id_generator.cpp +++ b/src/id_generator/id_generator.cpp @@ -3,7 +3,7 @@ #include "log.h" #include "macro.h" -#include "stellar_core.h" +#include "stellar.h" #include "id_generator.h" #define ID_GENERATOR_LOG_ERROR(format, ...) LOG_ERROR("id generator", format, ##__VA_ARGS__) @@ -66,7 +66,7 @@ uint64_t id_generator_alloc(uint64_t now_sec) #define MAX_ID_PER_THREAD (32768) #define MAX_ID_BASE_TIME (268435456L) - uint64_t thr_idx = stellar_get_current_thread_index(); + uint64_t thr_idx = (uint16_t)stellar_get_current_thread_index(); uint64_t global_id = 0; uint64_t id_per_thread = (global_id_generator.thread_volatile[thr_idx]++) % MAX_ID_PER_THREAD; diff --git a/src/packet/packet_def.h b/src/packet/packet_def.h index 9c4b977..cc65e19 100644 --- a/src/packet/packet_def.h +++ b/src/packet/packet_def.h @@ -44,6 +44,7 @@ struct raw_layer struct packet { + void * user_data; struct raw_layer layers[PACKET_MAX_LAYERS]; struct raw_layer *frag_layer; // fragment layer int8_t layers_used; diff --git a/src/packet/packet_utils.cpp b/src/packet/packet_utils.cpp index b2be075..873b224 100644 --- a/src/packet/packet_utils.cpp +++ b/src/packet/packet_utils.cpp @@ -546,3 +546,13 @@ void layer_convert(const struct raw_layer *in, struct layer *out) out->hdr_len = in->hdr_len; out->hdr.raw = (char *)in->hdr_ptr; } + +void packet_set_user_data(struct packet *pkt, void *data) +{ + pkt->user_data=data; +} + +void *packet_get_user_data(struct packet *pkt) +{ + return pkt->user_data; +}
\ No newline at end of file diff --git a/src/packet/packet_utils.h b/src/packet/packet_utils.h index bdafbc8..fcad090 100644 --- a/src/packet/packet_utils.h +++ b/src/packet/packet_utils.h @@ -36,6 +36,9 @@ void packet_set_action(struct packet *pkt, enum packet_action action); enum packet_action packet_get_action(const struct packet *pkt); +void *packet_get_user_data(struct packet *pkt); +void packet_set_user_data(struct packet *pkt, void *data); + /****************************************************************************** * tuple uitls ******************************************************************************/ diff --git a/src/plugin/CMakeLists.txt b/src/plugin/CMakeLists.txt index 23c254b..3c08c28 100644 --- a/src/plugin/CMakeLists.txt +++ b/src/plugin/CMakeLists.txt @@ -1,4 +1,8 @@ add_library(plugin_manager plugin_manager.cpp) target_include_directories(plugin_manager PUBLIC ${CMAKE_CURRENT_LIST_DIR}) +target_include_directories(plugin_manager PUBLIC ${CMAKE_SOURCE_DIR}/include/) +target_include_directories(plugin_manager PUBLIC ${CMAKE_SOURCE_DIR}/src/) target_include_directories(plugin_manager PUBLIC ${CMAKE_SOURCE_DIR}/deps/) -target_link_libraries(plugin_manager bitmap toml session_manager stellar_core ${CMAKE_DL_LIBS})
\ No newline at end of file +target_link_libraries(plugin_manager bitmap toml ${CMAKE_DL_LIBS}) + +add_subdirectory(test)
\ No newline at end of file diff --git a/src/plugin/plugin_manager.cpp b/src/plugin/plugin_manager.cpp index 0a31e5b..96e1cc0 100644 --- a/src/plugin/plugin_manager.cpp +++ b/src/plugin/plugin_manager.cpp @@ -1,145 +1,17 @@ -#include <assert.h> -#include "plugin_manager.h" -#include "session_utils.h" -#include "packet_utils.h" -#include "stellar_core.h" +#include "plugin_manager_interna.h" +#include "stellar/session.h" #include "stellar/utils.h" -#include "stellar/session_exdata.h" -#include "stellar/session_mq.h" -#include "tcp_reassembly.h" - -extern "C" -{ - #include "uthash/utlist.h" - #include "uthash/utarray.h" - #include "bitmap/bitmap.h" -} - -struct plugin_manager_schema -{ - struct stellar *st; - UT_array *session_exdata_schema_array; - UT_array *plugin_load_specs_array; - UT_array *session_mq_schema_array; - UT_array *registered_session_plugin_array; - UT_array *registered_packet_plugin_array; - UT_array *registered_polling_plugin_array; - int topic_num; - int subscriber_num; - int tcp_topic_id; - int udp_topic_id; - int tcp_stream_topic_id; - int egress_topic_id; - int control_packet_topic_id; -}; - - -struct session_exdata_schema -{ - char *name; - session_exdata_free *free_func; - void *free_arg; - int idx; -}; - -struct session_message -{ - int topic_id; - void *msg_data; - struct session_message *next, *prev; -}; - -typedef struct session_mq_subscriber -{ - int topic_subscriber_idx; - int session_plugin_id; - on_msg_cb_func *msg_cb; - struct session_mq_subscriber *next, *prev; -}session_mq_subscribers; - -struct session_mq_topic_schema -{ - char *topic_name; - msg_free_cb_func *free_cb; - void *free_cb_arg; - int topic_id; - int subscriber_cnt; - struct session_mq_subscriber *subscribers; -}; - -enum plugin_ctx_state -{ INIT, ACTIVE, EXIT }; - -struct session_plugin_ctx_runtime -{ - enum plugin_ctx_state state; - int session_plugin_id; - void *plugin_ctx; -}; - -struct plugin_exdata -{ - void *exdata; -}; - -struct plugin_manager_runtime -{ - struct plugin_manager_schema *plug_mgr; - struct session *sess; - struct session_message *pending_mq;// message list - struct session_message *delivered_mq;// message list - struct bitmap *session_mq_status; //N * M bits, N topic, M subscriber - struct plugin_exdata *plugin_exdata_array; - struct session_plugin_ctx_runtime *plugin_ctx_array;//N plugins TODO: call alloc and free - int current_session_plugin_id; -}; - -struct registered_packet_plugin_schema -{ - char ip_protocol; - plugin_on_packet_func *on_packet; - void *plugin_env; -}; - -struct registered_polling_plugin_schema -{ - plugin_on_polling_func *on_polling; - void *plugin_env; -}; - -struct session_mq_subscriber_info -{ - int topic_id; - int subscriber_idx; -}; - -struct registered_session_plugin_schema -{ - session_ctx_new_func *on_ctx_new; - session_ctx_free_func *on_ctx_free; - void *plugin_env; - UT_array *registed_session_mq_subscriber_info; -}; - -#define PACKET_PULGIN_ID_BASE 0x10000 -#define POLLING_PULGIN_ID_BASE 0x20000 - -/******************************* - * PLUGIN MANAGER INIT & EXIT * - *******************************/ - -#include <dlfcn.h> #include "toml/toml.h" +#include "uthash/utlist.h" -struct plugin_specific -{ - char plugin_name[256]; - plugin_on_load_func *load_cb; - plugin_on_unload_func *unload_cb; - void *plugin_ctx; -}; +#include "stellar/stellar_core.h" +#include "session/session_utils.h" +#include "tuple/tuple.h" +#include "packet/packet_utils.h" +UT_icd plugin_specs_icd = {sizeof(struct plugin_specific), NULL, NULL, NULL}; +// TODO: set scratch_sess to per_thread_data thread_local struct session *per_thread_scratch_sess; inline static void plugin_manager_scratch_session_set(struct session *sess) @@ -152,7 +24,6 @@ inline static struct session *plugin_manager_scratch_session_get() return per_thread_scratch_sess; } -UT_icd plugin_specs_icd = {sizeof(struct plugin_specific), NULL, NULL, NULL}; static struct plugin_specific *plugin_specs_load(const char *toml_conf_path, int *spec_num) { @@ -169,7 +40,7 @@ static struct plugin_specific *plugin_specs_load(const char *toml_conf_path, int toml_array_t* plugin_array = toml_array_in(conf, "plugin"); if(plugin_array==NULL)return NULL; *spec_num = toml_array_nelem(plugin_array); - struct plugin_specific* plugins = ALLOC(struct plugin_specific, *spec_num); + struct plugin_specific* plugins = CALLOC(struct plugin_specific, *spec_num); for (int i = 0; i < *spec_num; i++) { toml_table_t* plugin = toml_table_at(plugin_array, i); @@ -212,6 +83,28 @@ PLUGIN_SPEC_LOAD_ERROR: return NULL; } +static struct plugin_manger_per_thread_data *plugin_manager_per_thread_data_new(struct stellar *st) +{ + if(st == NULL)return NULL; + int thread_num=stellar_get_worker_thread_num(st); + struct plugin_manger_per_thread_data *per_thread_data = CALLOC(struct plugin_manger_per_thread_data, thread_num); + return per_thread_data; +} + +static void plugin_manager_per_thread_data_free(struct plugin_manger_per_thread_data *per_thread_data, struct stellar *st) +{ + if(per_thread_data == NULL || st == NULL)return; + int thread_num=stellar_get_worker_thread_num(st); + struct plugin_manger_per_thread_data *p_data; + for (int i = 0; i < thread_num; i++) + { + p_data=per_thread_data+i; + if(p_data->per_thread_pkt_exdata_array.exdata_array)FREE(p_data->per_thread_pkt_exdata_array.exdata_array); + } + FREE(per_thread_data); + return; +} + static void tcp_stream_msg_free_fn(void *msg, void *msg_free_arg) { struct session *cur_sess = plugin_manager_scratch_session_get(); @@ -226,39 +119,39 @@ struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char { return NULL; } - struct plugin_manager_schema *pm = ALLOC(struct plugin_manager_schema, 1); + struct plugin_manager_schema *plug_mgr = CALLOC(struct plugin_manager_schema, 1); + plug_mgr->max_message_dispatch=MAX_MSG_PER_DISPATCH; if(spec_num > 0) { - utarray_new(pm->plugin_load_specs_array,&plugin_specs_icd); - utarray_reserve(pm->plugin_load_specs_array, spec_num); + utarray_new(plug_mgr->plugin_load_specs_array,&plugin_specs_icd); + utarray_reserve(plug_mgr->plugin_load_specs_array, spec_num); } - pm->st = st; - stellar_set_plugin_manger(st, pm); - pm->tcp_topic_id=stellar_session_mq_create_topic(st, TOPIC_TCP, NULL, NULL); - pm->tcp_stream_topic_id=stellar_session_mq_create_topic(st, TOPIC_TCP_STREAM, tcp_stream_msg_free_fn, NULL); - pm->udp_topic_id=stellar_session_mq_create_topic(st, TOPIC_UDP, NULL, NULL); - pm->egress_topic_id=stellar_session_mq_create_topic(st, TOPIC_EGRESS, NULL, NULL); - pm->control_packet_topic_id=stellar_session_mq_create_topic(st, TOPIC_CONTROL_PACKET, NULL, NULL); + plug_mgr->st = st; + stellar_set_plugin_manger(st, plug_mgr); + + + plug_mgr->tcp_topic_id=stellar_mq_create_topic(st, TOPIC_TCP, NULL, NULL); + plug_mgr->tcp_stream_topic_id=stellar_mq_create_topic(st, TOPIC_TCP_STREAM, tcp_stream_msg_free_fn, NULL); + plug_mgr->udp_topic_id=stellar_mq_create_topic(st, TOPIC_UDP, NULL, NULL); + plug_mgr->egress_topic_id=stellar_mq_create_topic(st, TOPIC_EGRESS, NULL, NULL); + plug_mgr->control_packet_topic_id=stellar_mq_create_topic(st, TOPIC_CONTROL_PACKET, NULL, NULL); for(int i = 0; i < spec_num; i++) { if (specs[i].load_cb != NULL) { specs[i].plugin_ctx=specs[i].load_cb(st); - utarray_push_back(pm->plugin_load_specs_array, &specs[i]); + utarray_push_back(plug_mgr->plugin_load_specs_array, &specs[i]); } } FREE(specs); - return pm; + plug_mgr->per_thread_data = plugin_manager_per_thread_data_new(st); + return plug_mgr; } void plugin_manager_exit(struct plugin_manager_schema *plug_mgr) { - if (plug_mgr == NULL) - { - return; - } struct plugin_specific *p=NULL; if (plug_mgr->plugin_load_specs_array) { @@ -269,17 +162,25 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr) } utarray_free(plug_mgr->plugin_load_specs_array); } - if(plug_mgr->session_mq_schema_array) + if(plug_mgr->stellar_mq_schema_array) { - for(unsigned int i = 0; i < utarray_len(plug_mgr->session_mq_schema_array); i++) + for(unsigned int i = 0; i < utarray_len(plug_mgr->stellar_mq_schema_array); i++) { - stellar_session_mq_destroy_topic(plug_mgr->st, i); + stellar_mq_destroy_topic( plug_mgr->st, i); } - utarray_free(plug_mgr->session_mq_schema_array); + utarray_free(plug_mgr->stellar_mq_schema_array); } - if(plug_mgr->session_exdata_schema_array)utarray_free(plug_mgr->session_exdata_schema_array); - if(plug_mgr->registered_packet_plugin_array)utarray_free(plug_mgr->registered_packet_plugin_array); + if(plug_mgr->stellar_exdata_schema_array)utarray_free(plug_mgr->stellar_exdata_schema_array); if(plug_mgr->registered_polling_plugin_array)utarray_free(plug_mgr->registered_polling_plugin_array); + if(plug_mgr->registered_packet_plugin_array) + { + struct registered_packet_plugin_schema *s = NULL; + while ((s = (struct registered_packet_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, s))) + { + if(s->registed_packet_mq_subscriber_info)utarray_free(s->registed_packet_mq_subscriber_info); + } + utarray_free(plug_mgr->registered_packet_plugin_array); + } if(plug_mgr->registered_session_plugin_array) { struct registered_session_plugin_schema *s = NULL; @@ -289,94 +190,162 @@ void plugin_manager_exit(struct plugin_manager_schema *plug_mgr) } utarray_free(plug_mgr->registered_session_plugin_array); } + plugin_manager_per_thread_data_free(plug_mgr->per_thread_data, plug_mgr->st); FREE(plug_mgr); return; } - /******************************* - * SESSION EXDATA * + * STELLAR EXDATA * *******************************/ -static void session_exdata_met_copy(void *_dst, const void *_src) +static void stellar_exdata_met_copy(void *_dst, const void *_src) { - struct session_exdata_schema *dst = (struct session_exdata_schema *)_dst, *src = (struct session_exdata_schema *)_src; + struct stellar_exdata_schema *dst = (struct stellar_exdata_schema *)_dst, *src = (struct stellar_exdata_schema *)_src; dst->free_func = src->free_func; dst->free_arg = src->free_arg; dst->idx = src->idx; dst->name = src->name ? strdup(src->name) : NULL; } -static void session_exdata_met_dtor(void *_elt) +static void stellar_exdata_met_dtor(void *_elt) { - struct session_exdata_schema *elt = (struct session_exdata_schema *)_elt; + struct stellar_exdata_schema *elt = (struct stellar_exdata_schema *)_elt; if (elt->name) FREE(elt->name); } -UT_icd session_exdata_meta_icd = {sizeof(struct session_exdata_schema), NULL, session_exdata_met_copy, session_exdata_met_dtor}; +UT_icd stellar_exdata_meta_icd = {sizeof(struct stellar_exdata_schema), NULL, stellar_exdata_met_copy, stellar_exdata_met_dtor}; - -int stellar_session_exdata_new_index(struct stellar *st, const char *name, session_exdata_free *free_func,void *free_arg) +int stellar_exdata_new_index(struct stellar *st, const char *name, stellar_exdata_free *free_func,void *free_arg) { + if(st==NULL || name==NULL)return -1; struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st); - if(plug_mgr->session_exdata_schema_array == NULL) + if(plug_mgr->stellar_exdata_schema_array == NULL) { - utarray_new(plug_mgr->session_exdata_schema_array, &session_exdata_meta_icd); + utarray_new(plug_mgr->stellar_exdata_schema_array, &stellar_exdata_meta_icd); } - if(plug_mgr->session_exdata_schema_array == NULL)return -1; - unsigned int len = utarray_len(plug_mgr->session_exdata_schema_array); - struct session_exdata_schema *t_schema; + if(plug_mgr->stellar_exdata_schema_array == NULL)return -1; + unsigned int len = utarray_len(plug_mgr->stellar_exdata_schema_array); + struct stellar_exdata_schema *t_schema; for(unsigned int i = 0; i < len; i++) { - t_schema = (struct session_exdata_schema *)utarray_eltptr(plug_mgr->session_exdata_schema_array, i); + t_schema = (struct stellar_exdata_schema *)utarray_eltptr(plug_mgr->stellar_exdata_schema_array, i); if(strcmp(t_schema->name, name) == 0) { + t_schema->free_func=free_func; + t_schema->free_arg=free_arg; return t_schema->idx; } } - struct session_exdata_schema new_schema; - memset(&new_schema, 0, sizeof(struct session_exdata_schema)); + struct stellar_exdata_schema new_schema; + memset(&new_schema, 0, sizeof(struct stellar_exdata_schema)); new_schema.free_func=free_func; new_schema.name=(char *)name; new_schema.idx=len; new_schema.free_arg=free_arg; - utarray_push_back(plug_mgr->session_exdata_schema_array, &new_schema); + utarray_push_back(plug_mgr->stellar_exdata_schema_array, &new_schema); return new_schema.idx; } -int session_exdata_set(struct session *sess, int idx, void *ex_ptr) +int stellar_exdata_set(UT_array *exdata_schema, struct stellar_exdata *exdata_array, int idx, void *ex_ptr) { - struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess); - if(plug_mgr_rt == NULL)return -1; - if(plug_mgr_rt->plug_mgr->session_exdata_schema_array == NULL)return -1; - unsigned int len=utarray_len(plug_mgr_rt->plug_mgr->session_exdata_schema_array); + if(exdata_schema == NULL|| exdata_array == NULL)return -1; + unsigned int len=utarray_len(exdata_schema); if(len < (unsigned int)idx)return -1; - if(plug_mgr_rt->plugin_exdata_array==NULL)return -1; - (plug_mgr_rt->plugin_exdata_array+idx)->exdata=ex_ptr; + if((exdata_array+idx)->state == EXIT)return -1; + (exdata_array+idx)->exdata=ex_ptr; return 0; } -void *session_exdata_get(struct session *sess, int idx) +void *stellar_exdata_get(UT_array *exdata_schema, struct stellar_exdata *exdata_array, int idx) { - struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess); - if(plug_mgr_rt == NULL)return NULL; - if(plug_mgr_rt->plug_mgr->session_exdata_schema_array==NULL)return NULL; - unsigned int len = utarray_len(plug_mgr_rt->plug_mgr->session_exdata_schema_array); + if(exdata_schema == NULL|| exdata_array == NULL)return NULL; + unsigned int len = utarray_len(exdata_schema); if(len < (unsigned int)idx)return NULL; - return (plug_mgr_rt->plugin_exdata_array+idx)->exdata; + if((exdata_array+idx)->state == EXIT)return NULL; + return (exdata_array+idx)->exdata; } /******************************* - * SESSION MQ * + * PACKET EXDATA * + *******************************/ +static struct stellar_exdata *per_thread_packet_exdata_arrary_get(struct plugin_manager_schema *plug_mgr) +{ + if(plug_mgr==NULL || plug_mgr->stellar_exdata_schema_array == NULL)return NULL; + int tid=stellar_get_current_thread_index(); + if(plug_mgr->per_thread_data[tid].per_thread_pkt_exdata_array.exdata_array == NULL) + { + unsigned int len = utarray_len(plug_mgr->stellar_exdata_schema_array); + plug_mgr->per_thread_data[tid].per_thread_pkt_exdata_array.exdata_array = CALLOC(struct stellar_exdata, len); + } + return plug_mgr->per_thread_data[tid].per_thread_pkt_exdata_array.exdata_array; +} + +static void per_thread_packet_exdata_arrary_clean(struct plugin_manager_schema *plug_mgr) +{ + if(plug_mgr==NULL || plug_mgr->stellar_exdata_schema_array == NULL)return; + unsigned int len=utarray_len(plug_mgr->stellar_exdata_schema_array); + struct stellar_exdata *per_thread_pkt_exdata_arrary = per_thread_packet_exdata_arrary_get(plug_mgr); + if(per_thread_pkt_exdata_arrary == NULL)return; + for (unsigned int i = 0; i < len; i++) + { + void *exdata = (per_thread_pkt_exdata_arrary + i)->exdata; + (per_thread_pkt_exdata_arrary + i)->state=EXIT; + struct stellar_exdata_schema *schema = (struct stellar_exdata_schema *)utarray_eltptr(plug_mgr->stellar_exdata_schema_array, i); + if (exdata) + { + if (schema->free_func) + { + schema->free_func(i, exdata, schema->free_arg); + } + (per_thread_pkt_exdata_arrary + i)->exdata=NULL; + } + (per_thread_pkt_exdata_arrary + i)->state=INIT; + } +} + +int packet_exdata_set(struct packet *pkt, int idx, void *ex_ptr) +{ + if(pkt == NULL)return -1; + struct plugin_manager_schema *plug_mgr = (struct plugin_manager_schema *)packet_get_user_data(pkt); + return stellar_exdata_set(plug_mgr->stellar_exdata_schema_array, per_thread_packet_exdata_arrary_get(plug_mgr), idx, ex_ptr); +} + +void *packet_exdata_get(struct packet *pkt, int idx) +{ + if(pkt == NULL)return NULL; + struct plugin_manager_schema *plug_mgr = (struct plugin_manager_schema *)packet_get_user_data(pkt); + return stellar_exdata_get( plug_mgr->stellar_exdata_schema_array, per_thread_packet_exdata_arrary_get(plug_mgr), idx); +} + +/******************************* + * SESSION EXDATA * *******************************/ +int session_exdata_set(struct session *sess, int idx, void *ex_ptr) +{ + struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess); + if(plug_mgr_rt == NULL)return -1; + if(plug_mgr_rt->plug_mgr->stellar_exdata_schema_array == NULL)return -1; + return stellar_exdata_set(plug_mgr_rt->plug_mgr->stellar_exdata_schema_array, plug_mgr_rt->sess_exdata_array, idx, ex_ptr); +} +void *session_exdata_get(struct session *sess, int idx) +{ + struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess); + if(plug_mgr_rt == NULL)return NULL; + if(plug_mgr_rt->plug_mgr->stellar_exdata_schema_array==NULL)return NULL; + return stellar_exdata_get(plug_mgr_rt->plug_mgr->stellar_exdata_schema_array, plug_mgr_rt->sess_exdata_array, idx); +} -static void session_mq_topic_schema_copy(void *_dst, const void *_src) +/******************************* + * STELLAR MQ * + *******************************/ +static void stellar_mq_topic_schema_copy(void *_dst, const void *_src) { - struct session_mq_topic_schema *dst = (struct session_mq_topic_schema *)_dst, - *src = (struct session_mq_topic_schema *)_src; + struct stellar_mq_topic_schema *dst = (struct stellar_mq_topic_schema *)_dst, + *src = (struct stellar_mq_topic_schema *)_src; dst->subscribers = src->subscribers; dst->free_cb = src->free_cb; dst->free_cb_arg = src->free_cb_arg; @@ -385,36 +354,27 @@ static void session_mq_topic_schema_copy(void *_dst, const void *_src) dst->topic_name = src->topic_name ? strdup(src->topic_name) : NULL; } -static void session_mq_topic_schema_dtor(void *_elt) +static void stellar_mq_topic_schema_dtor(void *_elt) { - struct session_mq_topic_schema *elt = (struct session_mq_topic_schema *)_elt; + struct stellar_mq_topic_schema *elt = (struct stellar_mq_topic_schema *)_elt; if (elt->topic_name) FREE(elt->topic_name); // FREE(elt); // free the item } -UT_icd session_mq_topic_schema_icd = {sizeof(struct session_mq_topic_schema), NULL, session_mq_topic_schema_copy, session_mq_topic_schema_dtor}; +UT_icd stellar_mq_topic_schema_icd = {sizeof(struct stellar_mq_topic_schema), NULL, stellar_mq_topic_schema_copy, stellar_mq_topic_schema_dtor}; -void session_mq_free(struct session_message *head) +int stellar_mq_get_topic_id(struct stellar *st, const char *topic_name) { - struct session_message *elt, *tmp; - DL_FOREACH_SAFE(head, elt, tmp) - { - DL_DELETE(head, elt); - FREE(elt); - } - FREE(head); -} + struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st); + UT_array *mq_schema_array=plug_mgr->stellar_mq_schema_array; -int stellar_session_mq_get_topic_id(struct stellar *st, const char *topic_name) -{ - struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st);; - if(topic_name == NULL || plug_mgr == NULL || plug_mgr->session_mq_schema_array == NULL)return -1; - unsigned int len = utarray_len(plug_mgr->session_mq_schema_array); - struct session_mq_topic_schema *t_schema; + if(topic_name == NULL || mq_schema_array == NULL )return -1; + unsigned int len = utarray_len(mq_schema_array); + struct stellar_mq_topic_schema *t_schema; for(unsigned int i = 0; i < len; i++) { - t_schema = (struct session_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, i); + t_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array, i); if(strcmp(t_schema->topic_name, topic_name) == 0) { return i; @@ -423,78 +383,338 @@ int stellar_session_mq_get_topic_id(struct stellar *st, const char *topic_name) return -1; } -int stellar_session_mq_update_topic(struct stellar *st, int topic_id, msg_free_cb_func *msg_free_cb, void *msg_free_arg) +int stellar_mq_update_topic(struct stellar *st, int topic_id, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg) { struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st); - if(plug_mgr->session_mq_schema_array == NULL)return -1; - unsigned int len = utarray_len(plug_mgr->session_mq_schema_array); + UT_array *mq_schema_array=plug_mgr->stellar_mq_schema_array; + if(mq_schema_array == NULL)return -1; + unsigned int len = utarray_len(mq_schema_array); if(len < (unsigned int)topic_id)return -1; - struct session_mq_topic_schema *t_schema = (struct session_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, (unsigned int)topic_id); + struct stellar_mq_topic_schema *t_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array, (unsigned int)topic_id); if(t_schema == NULL)return -1; t_schema->free_cb=msg_free_cb; t_schema->free_cb_arg=msg_free_arg; return 0; } -int stellar_session_mq_create_topic(struct stellar *st, const char *topic_name, msg_free_cb_func *msg_free_cb, void *msg_free_arg) +int stellar_mq_create_topic(struct stellar *st, const char *topic_name, stellar_msg_free_cb_func *msg_free_cb, void *msg_free_arg) { struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st); - if(plug_mgr->session_mq_schema_array == NULL) + if(plug_mgr->stellar_mq_schema_array == NULL) { - utarray_new(plug_mgr->session_mq_schema_array, &session_mq_topic_schema_icd); + utarray_new(plug_mgr->stellar_mq_schema_array, &stellar_mq_topic_schema_icd); } - unsigned int len = utarray_len(plug_mgr->session_mq_schema_array); - if(stellar_session_mq_get_topic_id(st, topic_name) >= 0) + unsigned int len = utarray_len(plug_mgr->stellar_mq_schema_array); + if(stellar_mq_get_topic_id(st, topic_name) >= 0) { return -1; } - struct session_mq_topic_schema t_schema; - memset(&t_schema, 0, sizeof(struct session_mq_topic_schema)); + struct stellar_mq_topic_schema t_schema; + memset(&t_schema, 0, sizeof(struct stellar_mq_topic_schema)); t_schema.free_cb=msg_free_cb; t_schema.topic_name=(char *)topic_name; t_schema.topic_id=len;//topid_id equals arrary index t_schema.free_cb_arg=msg_free_arg; t_schema.subscribers=NULL; t_schema.subscriber_cnt=0; - utarray_push_back(plug_mgr->session_mq_schema_array, &t_schema); - plug_mgr->topic_num+=1; + utarray_push_back(plug_mgr->stellar_mq_schema_array, &t_schema); + plug_mgr->stellar_mq_topic_num+=1; return t_schema.topic_id; } -int stellar_session_mq_destroy_topic(struct stellar *st, int topic_id) +int stellar_mq_destroy_topic(struct stellar *st, int topic_id) { struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st); - if(plug_mgr->session_mq_schema_array==NULL)return 0; - unsigned int len = utarray_len(plug_mgr->session_mq_schema_array); + if(plug_mgr->stellar_mq_schema_array==NULL)return -1; + unsigned int len = utarray_len(plug_mgr->stellar_mq_schema_array); if (len <= (unsigned int)topic_id) return -1; - struct session_mq_topic_schema *topic = - (struct session_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, (unsigned int)topic_id); - struct session_mq_subscriber *sub_elt, *sub_tmp; + struct stellar_mq_topic_schema *topic = + (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id); + struct stellar_mq_subscriber *sub_elt, *sub_tmp; + + if(topic == NULL)return -1; + + if (topic->is_destroyed == 1)return 0; + + DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp) + { + DL_DELETE(topic->subscribers, sub_elt); + FREE(sub_elt); + } + topic->is_destroyed = 1; + plug_mgr->stellar_mq_topic_num-=1; + return 1; // success +} + +static int stellar_mq_publish_message(enum stellar_topic_type type, int topic_id, void *data, UT_array *stellar_mq_schema, struct stellar_message *priority_mq[], enum stellar_mq_priority priority) +{ + if(stellar_mq_schema==NULL || topic_id < 0)return -1; + unsigned int len = utarray_len(stellar_mq_schema); + if (len <= (unsigned int)topic_id)return -1; + struct stellar_message *msg= CALLOC(struct stellar_message,1); + msg->header.topic_id = topic_id; + msg->header.type=type; + msg->header.priority = priority; + msg->body = data; + DL_APPEND(priority_mq[priority], msg); + return 0; +} + +UT_icd stellar_mq_subscriber_info_icd = {sizeof(struct stellar_mq_subscriber_info), NULL, NULL, NULL}; + +static int stellar_mq_subscribe(struct plugin_manager_schema *plug_mgr, int topic_id, void *plugin_on_msg_cb, int plugin_idx, UT_array *registed_mq_subscriber_info) +{ + if(plug_mgr == NULL || plug_mgr->stellar_mq_schema_array==NULL || registed_mq_subscriber_info == NULL)return -1; + + unsigned int len = utarray_len(plug_mgr->stellar_mq_schema_array); + if (len <= (unsigned int)topic_id)return -1; + + struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id); + if(topic==NULL)return -1; + + // if plugin already subscribe current topic, return 0 + struct stellar_mq_subscriber_info *p=NULL; + while( (p=(struct stellar_mq_subscriber_info *)utarray_next(registed_mq_subscriber_info,p))) + { + if(p->topic_id==topic_id) + { + struct stellar_mq_subscriber *tmp_subscriber=topic->subscribers; + int cnt=0; + while(tmp_subscriber) + { + if(cnt==p->subscriber_idx) + { + tmp_subscriber->msg_cb=plugin_on_msg_cb; + return 0; + } + cnt++; + tmp_subscriber=tmp_subscriber->next; + } + } + }; + + struct stellar_mq_subscriber *new_subscriber = CALLOC(struct stellar_mq_subscriber,1); + new_subscriber->topic_subscriber_idx = topic->subscriber_cnt; + new_subscriber->plugin_idx = plugin_idx; + new_subscriber->msg_cb = plugin_on_msg_cb; + DL_APPEND(topic->subscribers, new_subscriber); + + struct stellar_mq_subscriber_info sub_info; + memset(&sub_info, 0, sizeof(struct stellar_mq_subscriber_info)); + sub_info.topic_id=topic_id; + sub_info.subscriber_idx=topic->subscriber_cnt; + utarray_push_back(registed_mq_subscriber_info, &sub_info); + topic->subscriber_cnt+=1; + plug_mgr->session_topic_subscriber_num+=1; + return 0; +} + +static void stellar_mq_dispatch_one_session_message(struct session *sess, struct stellar_message *mq_elt) +{ + struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess); + struct stellar_mq_subscriber *sub_elt, *sub_tmp; + struct registered_session_plugin_schema *session_plugin_schema; + struct session_plugin_ctx_runtime *plugin_ctx_rt; + struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->stellar_mq_schema_array, + (unsigned int)(mq_elt->header.topic_id)); if (topic) { + int cur_sub_idx = 0; DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp) { - DL_DELETE(topic->subscribers, sub_elt); - FREE(sub_elt); + plug_mgr_rt->current_session_plugin_id = sub_elt->plugin_idx; + if (bitmap_get(plug_mgr_rt->session_mq_status, cur_sub_idx, mq_elt->header.topic_id) != 0) + { + plugin_ctx_rt = (plug_mgr_rt->plugin_ctx_array + sub_elt->plugin_idx); + session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr( + plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)sub_elt->plugin_idx); + if (session_plugin_schema) + { + if (plugin_ctx_rt->state == INIT) + { + if (session_plugin_schema->on_ctx_new) + { + plugin_ctx_rt->plugin_ctx = + session_plugin_schema->on_ctx_new(sess, session_plugin_schema->plugin_env); + if (plugin_ctx_rt->state == EXIT && session_plugin_schema->on_ctx_free) + { + session_plugin_schema->on_ctx_free(sess, plugin_ctx_rt->plugin_ctx, + session_plugin_schema->plugin_env); + plugin_ctx_rt->plugin_ctx = NULL; + } + else + { + plugin_ctx_rt->state = ACTIVE; + } + } + } + if (sub_elt->sess_msg_cb && + bitmap_get(plug_mgr_rt->session_mq_status, cur_sub_idx, mq_elt->header.topic_id) != + 0) // ctx_new maybe call detach, need check again + { + sub_elt->sess_msg_cb(sess, mq_elt->header.topic_id, mq_elt->body, plugin_ctx_rt->plugin_ctx, + session_plugin_schema->plugin_env); + } + } + } + cur_sub_idx++; } + if (cur_sub_idx == 0) + bitmap_set(plug_mgr_rt->session_topic_status, 0, mq_elt->header.topic_id, 0); } - return 0; // success +} + +static void stellar_mq_dispatch_one_packet_message(struct packet *pkt, struct stellar_message *mq_elt) +{ + struct plugin_manager_schema *plug_mgr = (struct plugin_manager_schema *)packet_get_user_data(pkt); + struct stellar_mq_subscriber *sub_elt, *sub_tmp; + struct registered_packet_plugin_schema *packet_plugin_schema; + struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, + (unsigned int)(mq_elt->header.topic_id)); + if (topic) + { + DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp) + { + if (sub_elt->pkt_msg_cb) + { + packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr( + plug_mgr->registered_packet_plugin_array, (unsigned int)sub_elt->plugin_idx); + if (packet_plugin_schema) + { + sub_elt->pkt_msg_cb(pkt, mq_elt->header.topic_id, mq_elt->body, packet_plugin_schema->plugin_env); + } + } + } + } +} + +static void stellar_mq_dispatch(struct stellar_message *priority_mq[], struct stellar_message ** dealth_letter_queue, struct session *sess, struct packet *pkt) +{ + struct stellar_message *mq_elt=NULL, *mq_tmp=NULL; + int cur_priority = STELLAR_MQ_PRIORITY_HIGH; + while(cur_priority >= STELLAR_MQ_PRIORITY_LOW) + { + if(priority_mq[cur_priority]==NULL) + { + cur_priority--; + continue; + } + DL_FOREACH_SAFE(priority_mq[cur_priority], mq_elt, mq_tmp) + { + if(mq_elt->header.type==ON_SESSION_TOPIC)stellar_mq_dispatch_one_session_message(sess, mq_elt); + if(mq_elt->header.type==ON_PACKET_TOPIC)stellar_mq_dispatch_one_packet_message(pkt, mq_elt); + DL_DELETE(priority_mq[mq_elt->header.priority], mq_elt); + DL_APPEND(*dealth_letter_queue, mq_elt); // move to dlq list + + cur_priority=STELLAR_MQ_PRIORITY_HIGH; + break; + } + } + return; +} + +static void stellar_mq_free(struct stellar_message **head, UT_array *mq_schema_array) +{ + struct stellar_message *mq_elt, *tmp; + struct stellar_mq_topic_schema *topic; + DL_FOREACH_SAFE(*head, mq_elt, tmp) + { + topic = (struct stellar_mq_topic_schema *)utarray_eltptr(mq_schema_array, + (unsigned int)(mq_elt->header.topic_id)); + if (topic && topic->free_cb) + { + topic->free_cb(mq_elt->body, topic->free_cb_arg); + } + DL_DELETE(*head, mq_elt); + FREE(mq_elt); + } +} + +/******************************* + * PACKET MQ * + *******************************/ + +//return 0 if success, otherwise return -1. +int stellar_packet_mq_subscribe(struct stellar *st, int topic_id, on_packet_msg_cb_func *plugin_on_msg_cb, int plugin_id) +{ + if(plugin_id < PACKET_PULGIN_ID_BASE || plugin_id >= POLLING_PULGIN_ID_BASE)return -1;// ignore session or polling plugin + int plugin_idx=plugin_id-PACKET_PULGIN_ID_BASE; + + struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st); + if(plug_mgr == NULL || plug_mgr->registered_packet_plugin_array == NULL)return -1; + + struct registered_packet_plugin_schema *packet_plugin_schema = (struct registered_packet_plugin_schema *)utarray_eltptr(plug_mgr->registered_packet_plugin_array, (unsigned)plugin_idx); + if(packet_plugin_schema==NULL)return -1; + + if(packet_plugin_schema->registed_packet_mq_subscriber_info==NULL) + { + utarray_new(packet_plugin_schema->registed_packet_mq_subscriber_info, &stellar_mq_subscriber_info_icd); + } + + return stellar_mq_subscribe(plug_mgr,topic_id, (void *)plugin_on_msg_cb, plugin_idx, packet_plugin_schema->registed_packet_mq_subscriber_info); +} + +int packet_mq_publish_message_with_priority(struct packet *pkt, int topic_id, void *data, enum stellar_mq_priority priority) +{ + struct plugin_manager_schema *plug_mgr = (struct plugin_manager_schema *)packet_get_user_data(pkt); + int tid = stellar_get_current_thread_index(); + if(plug_mgr->per_thread_data[tid].pub_packet_msg_cnt == -1)return -1; + if(plug_mgr->per_thread_data[tid].pub_packet_msg_cnt >= plug_mgr->max_message_dispatch)return -1; + if(stellar_mq_publish_message(ON_PACKET_TOPIC ,topic_id, data, plug_mgr->stellar_mq_schema_array, plug_mgr->per_thread_data[tid].priority_mq,priority)==0) + { + plug_mgr->per_thread_data[tid].pub_packet_msg_cnt+=1; + return 0; + } + return -1; +} + +int packet_mq_publish_message(struct packet *pkt, int topic_id, void *data) +{ + return packet_mq_publish_message_with_priority(pkt, topic_id, data, STELLAR_MQ_PRIORITY_NORMAL); +} + +/******************************* + * SESSION MQ * + *******************************/ + +int session_mq_publish_message_with_priority(struct session *sess, int topic_id, void *data, enum stellar_mq_priority priority) +{ + struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess); + assert(plug_mgr_rt); + if(plug_mgr_rt->session_mq_status==NULL)return -1;//runtime free stage , mq_status alaway null, ignore publish message + if(plug_mgr_rt->pub_session_msg_cnt == -1)return -1; + if(plug_mgr_rt->pub_session_msg_cnt >= plug_mgr_rt->plug_mgr->max_message_dispatch)return -1; + int tid = stellar_get_current_thread_index(); + if(stellar_mq_publish_message(ON_SESSION_TOPIC ,topic_id, data, plug_mgr_rt->plug_mgr->stellar_mq_schema_array,plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq,priority)==0) + { + plug_mgr_rt->pub_session_msg_cnt+=1; + return 0; + } + return -1; } int session_mq_publish_message(struct session *sess, int topic_id, void *data) { - struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess); - if(plug_mgr_rt==NULL || topic_id < 0)return -1; - if(plug_mgr_rt->plug_mgr->session_mq_schema_array==NULL)return -1; - unsigned int len = utarray_len(plug_mgr_rt->plug_mgr->session_mq_schema_array); - if (len <= (unsigned int)topic_id)return -1; - struct session_message *msg= ALLOC(struct session_message,1); - msg->topic_id = topic_id; - msg->msg_data = data; - DL_APPEND(plug_mgr_rt->pending_mq, msg); - return 0; + return session_mq_publish_message_with_priority(sess, topic_id, data, STELLAR_MQ_PRIORITY_NORMAL); +} + +static void session_mq_update_topic_status(struct plugin_manager_runtime *plug_mgr_rt, struct stellar_mq_topic_schema *topic) +{ + //update topic status + switch (bitmap_is_all_zero(plug_mgr_rt->session_mq_status, 0, topic->topic_id, topic->subscriber_cnt)) + { + case 1: + bitmap_set(plug_mgr_rt->session_topic_status, 0, topic->topic_id, 0); + break; + case 0: + bitmap_set(plug_mgr_rt->session_topic_status, 0, topic->topic_id, 1); + break; + default: + break; + } + return; } static int session_mq_set_message_status(struct session *sess, int topic_id, int plugin_id, int bit_value) @@ -502,10 +722,10 @@ static int session_mq_set_message_status(struct session *sess, int topic_id, int if(bit_value!=0 && bit_value!=1)return -1; if(plugin_id >= PACKET_PULGIN_ID_BASE)return -1;// ignore packet plugin if(topic_id < 0 || plugin_id < 0)return -1; - struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess); + struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess); if(plug_mgr_rt==NULL)return -1; - if(topic_id >= plug_mgr_rt->plug_mgr->topic_num)return -1;// topic_id out of range - struct session_mq_topic_schema *topic = (struct session_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->session_mq_schema_array, (unsigned int)topic_id); + if(topic_id >= plug_mgr_rt->plug_mgr->stellar_mq_topic_num)return -1;// topic_id out of range + struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id); if(topic==NULL)return -1; struct registered_session_plugin_schema *session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)plugin_id); @@ -516,18 +736,18 @@ static int session_mq_set_message_status(struct session *sess, int topic_id, int { for(unsigned int i=0; i < plugin_subscriber_num; i++) { - struct session_mq_subscriber_info *session_plugin_sub_info = (struct session_mq_subscriber_info *)utarray_eltptr(session_plugin_schema->registed_session_mq_subscriber_info, i); + struct stellar_mq_subscriber_info *session_plugin_sub_info = (struct stellar_mq_subscriber_info *)utarray_eltptr(session_plugin_schema->registed_session_mq_subscriber_info, i); if(topic_id==session_plugin_sub_info->topic_id) { - bitmap_set(plug_mgr_rt->session_mq_status, topic_id, session_plugin_sub_info->subscriber_idx, bit_value); + bitmap_set(plug_mgr_rt->session_mq_status, session_plugin_sub_info->subscriber_idx, topic_id, bit_value); } } + session_mq_update_topic_status(plug_mgr_rt, topic); return 0; } return -1; } - int session_mq_ignore_message(struct session *sess, int topic_id, int plugin_id) { return session_mq_set_message_status(sess, topic_id, plugin_id, 0); @@ -539,131 +759,63 @@ int session_mq_unignore_message(struct session *sess, int topic_id, int plugin_i return session_mq_set_message_status(sess, topic_id, plugin_id, 1); } -UT_icd session_mq_subscriber_info_icd = {sizeof(struct session_mq_subscriber_info), NULL, NULL, NULL}; - -int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_msg_cb_func *plugin_on_msg_cb, int plugin_id) +int stellar_session_mq_subscribe(struct stellar *st, int topic_id, on_session_msg_cb_func *plugin_on_msg_cb, int plugin_id) { - if(plugin_id >= PACKET_PULGIN_ID_BASE)return -1;// ignore packet plugin + if(plugin_id >= PACKET_PULGIN_ID_BASE || plugin_on_msg_cb == NULL)return -1;// ignore packet plugin struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st); - if(plug_mgr->session_mq_schema_array==NULL)return -1; - unsigned int len = utarray_len(plug_mgr->session_mq_schema_array); - if (len <= (unsigned int)topic_id)return -1; - - struct registered_session_plugin_schema *session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr->registered_session_plugin_array, (unsigned)plugin_id); - if(session_plugin_schema==NULL)return -1; - struct session_mq_topic_schema *topic = (struct session_mq_topic_schema *)utarray_eltptr(plug_mgr->session_mq_schema_array, (unsigned int)topic_id); - if(topic==NULL)return -1; + if(plug_mgr == NULL || plug_mgr->registered_session_plugin_array == NULL)return -1; + struct registered_session_plugin_schema *session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr->registered_session_plugin_array, (unsigned)plugin_id); + if(session_plugin_schema==NULL)return -1; if(session_plugin_schema->registed_session_mq_subscriber_info==NULL) { - utarray_new(session_plugin_schema->registed_session_mq_subscriber_info, &session_mq_subscriber_info_icd); - } - - // if plugin already subscribe current topic, return 0 - struct session_mq_subscriber_info *p=NULL; - while( (p=(struct session_mq_subscriber_info *)utarray_next(session_plugin_schema->registed_session_mq_subscriber_info,p))) - { - if(p->topic_id==topic_id) - return 0; - }; - - struct session_mq_subscriber *new_subscriber = ALLOC(struct session_mq_subscriber,1); - new_subscriber->topic_subscriber_idx = topic->subscriber_cnt; - new_subscriber->session_plugin_id = plugin_id; - new_subscriber->msg_cb = plugin_on_msg_cb; - DL_APPEND(topic->subscribers, new_subscriber); - - struct session_mq_subscriber_info sub_info; - memset(&sub_info, 0, sizeof(struct session_mq_subscriber_info)); - sub_info.topic_id=topic_id; - sub_info.subscriber_idx=topic->subscriber_cnt; - utarray_push_back(session_plugin_schema->registed_session_mq_subscriber_info, &sub_info); - topic->subscriber_cnt+=1; - plug_mgr->subscriber_num+=1; - return 0; + utarray_new(session_plugin_schema->registed_session_mq_subscriber_info, &stellar_mq_subscriber_info_icd); + } + //session plugin id equals to plugin idx + return stellar_mq_subscribe(plug_mgr,topic_id, (void *)plugin_on_msg_cb, plugin_id, session_plugin_schema->registed_session_mq_subscriber_info); } -static void plugin_manager_session_message_dispatch(struct session *sess) +int session_mq_topic_is_active(struct session *sess, int topic_id) { - struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess); - if(plug_mgr_rt==NULL)return; - - struct session_message *mq_elt=NULL, *mq_tmp=NULL; - struct session_mq_subscriber *sub_elt, *sub_tmp; - struct session_mq_topic_schema *topic; - struct registered_session_plugin_schema *session_plugin_schema; - struct session_plugin_ctx_runtime *plugin_ctx_rt; - while (plug_mgr_rt->pending_mq != NULL) - { - DL_FOREACH_SAFE(plug_mgr_rt->pending_mq, mq_elt, mq_tmp) - { - topic = (struct session_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->session_mq_schema_array, - (unsigned int)(mq_elt->topic_id)); - if (topic) - { - int cur_sub_idx = 0; - DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp) - { - if (bitmap_get(plug_mgr_rt->session_mq_status, mq_elt->topic_id, cur_sub_idx) != 0) - { - plugin_ctx_rt=(plug_mgr_rt->plugin_ctx_array+sub_elt->session_plugin_id); - session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)sub_elt->session_plugin_id); - if(plugin_ctx_rt->state==INIT) - { - if(session_plugin_schema->on_ctx_new) - { - plugin_ctx_rt->plugin_ctx=session_plugin_schema->on_ctx_new(sess, session_plugin_schema->plugin_env); - plugin_ctx_rt->state=ACTIVE; - } - } - if(sub_elt->msg_cb)sub_elt->msg_cb(sess, mq_elt->topic_id, mq_elt->msg_data, plugin_ctx_rt->plugin_ctx, - session_plugin_schema->plugin_env); - } - cur_sub_idx++; - } - if (topic->free_cb) - { - topic->free_cb(mq_elt->msg_data, topic->free_cb_arg); - } - } - DL_DELETE(plug_mgr_rt->pending_mq, mq_elt); - DL_APPEND(plug_mgr_rt->delivered_mq, mq_elt);// move to delivered message list - } - } - return; + struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess); + assert(plug_mgr_rt); + if(plug_mgr_rt->session_topic_status==NULL)return -1;//runtime free stage , mq_status alaway null, ignore publish message + if(topic_id >= plug_mgr_rt->plug_mgr->stellar_mq_topic_num)return -1;// topic_id out of range + if(bitmap_get(plug_mgr_rt->session_topic_status, 0, topic_id) == 0)return 0; + return 1; } /******************************* * PLUGIN MANAGER SESSION RUNTIME * *******************************/ - -static struct plugin_exdata *session_exdata_runtime_new(struct plugin_manager_schema *plug_mgr) +static struct stellar_exdata *session_exdata_runtime_new(struct plugin_manager_schema *plug_mgr) { - struct plugin_exdata *exdata_rt = NULL; - if(plug_mgr->session_exdata_schema_array==NULL)return NULL; - unsigned int len = utarray_len(plug_mgr->session_exdata_schema_array); + struct stellar_exdata *exdata_rt = NULL; + if(plug_mgr->stellar_exdata_schema_array==NULL)return NULL; + unsigned int len = utarray_len(plug_mgr->stellar_exdata_schema_array); if(len > 0) { - exdata_rt=ALLOC(struct plugin_exdata, len); + exdata_rt=CALLOC(struct stellar_exdata, len); } return exdata_rt; } -static void session_exdata_runtime_free(struct plugin_manager_schema *plug_mgr, struct session *sess, struct plugin_exdata *exdata_rt) +static void session_exdata_runtime_free(struct plugin_manager_schema *plug_mgr, struct stellar_exdata *exdata_rt) { if(exdata_rt==NULL)return; - if(plug_mgr->session_exdata_schema_array==NULL)return; - unsigned int len=utarray_len(plug_mgr->session_exdata_schema_array); + if(plug_mgr->stellar_exdata_schema_array==NULL)return; + unsigned int len=utarray_len(plug_mgr->stellar_exdata_schema_array); for (unsigned int i = 0; i < len; i++) { void *exdata = (exdata_rt + i)->exdata; - struct session_exdata_schema *schema = (struct session_exdata_schema *)utarray_eltptr(plug_mgr->session_exdata_schema_array, i); + (exdata_rt + i)->state=EXIT; + struct stellar_exdata_schema *schema = (struct stellar_exdata_schema *)utarray_eltptr(plug_mgr->stellar_exdata_schema_array, i); if (exdata) { if (schema->free_func) { - schema->free_func(sess, i, exdata, schema->free_arg); + schema->free_func(i, exdata, schema->free_arg); } } } @@ -671,15 +823,14 @@ static void session_exdata_runtime_free(struct plugin_manager_schema *plug_mgr, struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_manager_schema *plug_mgr, struct session *sess) { - if(plug_mgr->registered_session_plugin_array==NULL)return NULL; - struct plugin_manager_runtime *rt = ALLOC(struct plugin_manager_runtime, 1); + struct plugin_manager_runtime *rt = CALLOC(struct plugin_manager_runtime, 1); rt->plug_mgr = plug_mgr; rt->sess = sess; - rt->pending_mq = NULL; - rt->delivered_mq = NULL; - rt->session_mq_status=bitmap_new(plug_mgr->topic_num, plug_mgr->subscriber_num, 1); - rt->plugin_exdata_array = (struct plugin_exdata *)session_exdata_runtime_new(plug_mgr); - rt->plugin_ctx_array = ALLOC(struct session_plugin_ctx_runtime, utarray_len(plug_mgr->registered_session_plugin_array)); + rt->session_mq_status=bitmap_new(plug_mgr->session_topic_subscriber_num, plug_mgr->stellar_mq_topic_num, 1); + rt->session_topic_status=bitmap_new(1, plug_mgr->stellar_mq_topic_num, 1); + rt->sess_exdata_array = (struct stellar_exdata *)session_exdata_runtime_new(plug_mgr); + if(plug_mgr->registered_session_plugin_array) + rt->plugin_ctx_array = CALLOC(struct session_plugin_ctx_runtime, utarray_len(plug_mgr->registered_session_plugin_array)); return rt; } @@ -687,34 +838,35 @@ struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_ void plugin_manager_session_runtime_free(struct plugin_manager_runtime *rt) { if(rt==NULL)return; - if(rt->pending_mq != NULL) - { - session_mq_free(rt->pending_mq); - rt->pending_mq=NULL; - } - if(rt->delivered_mq != NULL) - { - session_mq_free(rt->delivered_mq); - rt->delivered_mq=NULL; - } + if(rt->session_mq_status != NULL) { bitmap_free(rt->session_mq_status); + rt->session_mq_status=NULL; } - unsigned int len = utarray_len(rt->plug_mgr->registered_session_plugin_array); - for(unsigned int i = 0; i < len; i++) + if(rt->session_topic_status != NULL) { - struct session_plugin_ctx_runtime *plugin_ctx_rt=(rt->plugin_ctx_array+i); - struct registered_session_plugin_schema *session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(rt->plug_mgr->registered_session_plugin_array, i); - if(session_plugin_schema->on_ctx_free && plugin_ctx_rt->state==ACTIVE) - { - session_plugin_schema->on_ctx_free(rt->sess, plugin_ctx_rt->plugin_ctx, session_plugin_schema->plugin_env); - } + bitmap_free(rt->session_topic_status); + rt->session_topic_status=NULL; } - FREE(rt->plugin_ctx_array); - - session_exdata_runtime_free(rt->plug_mgr, rt->sess, rt->plugin_exdata_array); - FREE(rt->plugin_exdata_array); + if (rt->plug_mgr->registered_session_plugin_array) + { + unsigned int len = utarray_len(rt->plug_mgr->registered_session_plugin_array); + for (unsigned int i = 0; i < len; i++) + { + struct session_plugin_ctx_runtime *plugin_ctx_rt = (rt->plugin_ctx_array + i); + struct registered_session_plugin_schema *session_plugin_schema = + (struct registered_session_plugin_schema *)utarray_eltptr(rt->plug_mgr->registered_session_plugin_array, i); + if (session_plugin_schema->on_ctx_free && plugin_ctx_rt->state == ACTIVE) + { + session_plugin_schema->on_ctx_free(rt->sess, plugin_ctx_rt->plugin_ctx, + session_plugin_schema->plugin_env); + } + } + FREE(rt->plugin_ctx_array); + } + session_exdata_runtime_free(rt->plug_mgr, rt->sess_exdata_array); + FREE(rt->sess_exdata_array); FREE(rt); } @@ -722,11 +874,9 @@ void plugin_manager_session_runtime_free(struct plugin_manager_runtime *rt) /********************************************* * PLUGIN MANAGER PACKET PLUGIN * *********************************************/ - - UT_icd registered_packet_plugin_array_icd = {sizeof(struct registered_packet_plugin_schema), NULL, NULL, NULL}; -int stellar_packet_plugin_register(struct stellar *st, unsigned char ip_protocol, plugin_on_packet_func on_packet, void *plugin_env) +int stellar_packet_plugin_register(struct stellar *st, unsigned char ip_proto, plugin_on_packet_func on_packet_cb, void *plugin_env) { struct plugin_manager_schema *plug_mgr = stellar_get_plugin_manager(st); if(plug_mgr->registered_packet_plugin_array == NULL) @@ -735,20 +885,26 @@ int stellar_packet_plugin_register(struct stellar *st, unsigned char ip_protocol } struct registered_packet_plugin_schema packet_plugin_schema; memset(&packet_plugin_schema, 0, sizeof(packet_plugin_schema)); - packet_plugin_schema.ip_protocol = ip_protocol; - packet_plugin_schema.on_packet = on_packet; + packet_plugin_schema.ip_protocol = ip_proto; + packet_plugin_schema.on_packet = on_packet_cb; packet_plugin_schema.plugin_env = plugin_env; utarray_push_back(plug_mgr->registered_packet_plugin_array, &packet_plugin_schema); - return (PACKET_PULGIN_ID_BASE+utarray_len(plug_mgr->registered_packet_plugin_array));// return packet plugin_id + return (PACKET_PULGIN_ID_BASE+utarray_len(plug_mgr->registered_packet_plugin_array)-1);// return packet plugin_id, equals to packet plugin arrary index + PACKET_PULGIN_ID_BASE } -void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, struct packet *pkt) +void plugin_manager_on_packet_ingress(struct plugin_manager_schema *plug_mgr, struct packet *pkt) { - if(plug_mgr == NULL || pkt == NULL)return; if(plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return; struct registered_packet_plugin_schema *p=NULL; - //unsigned char ip_proto=packet_get_layers(pkt); // FIXME get ip_proto - unsigned char ip_proto=0; + + //TODO: get innermost layer ip protocol by packet api + struct tuple6 t6; + packet_get_innermost_tuple6(pkt, &t6); + unsigned char ip_proto=t6.ip_proto; + + int tid=stellar_get_current_thread_index(); + //TODO : provide public api to reset pub_msg_cnt + plug_mgr->per_thread_data[tid].pub_packet_msg_cnt=0;//reset pub_msg_cnt while ((p = (struct registered_packet_plugin_schema *)utarray_next(plug_mgr->registered_packet_plugin_array, p))) { if(p->ip_protocol == ip_proto && p->on_packet) @@ -756,14 +912,24 @@ void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, struct pac p->on_packet(pkt, ip_proto, p->plugin_env); } } + stellar_mq_dispatch(plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr->per_thread_data[tid].dealth_letter_queue, NULL, pkt); return; } +void plugin_manager_on_packet_egress(struct plugin_manager_schema *plug_mgr, struct packet *pkt) +{ + if(plug_mgr->registered_packet_plugin_array == NULL || pkt == NULL)return; + int tid=stellar_get_current_thread_index(); + stellar_mq_dispatch(plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr->per_thread_data[tid].dealth_letter_queue, NULL, pkt); + plug_mgr->per_thread_data[tid].pub_packet_msg_cnt=-1;//disable packet message publish + stellar_mq_free(&plug_mgr->per_thread_data[tid].dealth_letter_queue, + plug_mgr->stellar_mq_schema_array); + per_thread_packet_exdata_arrary_clean(plug_mgr); +} + /********************************************* * PLUGIN MANAGER POLLING PLUGIN * *********************************************/ - - UT_icd registered_polling_plugin_array_icd = {sizeof(struct registered_polling_plugin_schema), NULL, NULL, NULL}; int stellar_polling_plugin_register(struct stellar *st, plugin_on_polling_func on_polling, void *plugin_env) @@ -778,7 +944,7 @@ int stellar_polling_plugin_register(struct stellar *st, plugin_on_polling_func o polling_plugin_schema.on_polling = on_polling; polling_plugin_schema.plugin_env = plugin_env; utarray_push_back(plug_mgr->registered_polling_plugin_array, &polling_plugin_schema); - return (POLLING_PULGIN_ID_BASE+utarray_len(plug_mgr->registered_polling_plugin_array));// return polling plugin_id + return (POLLING_PULGIN_ID_BASE+utarray_len(plug_mgr->registered_polling_plugin_array)-1);// return polling plugin_id, equals to polling plugin arrary index + POLLING_PULGIN_ID_BASE } int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr) @@ -802,12 +968,8 @@ int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr) /********************************************* * PLUGIN MANAGER SESSION PLUGIN * *********************************************/ - - - UT_icd registered_session_plugin_schema_icd = {sizeof(struct registered_session_plugin_schema), NULL, NULL, NULL}; - int stellar_session_plugin_register(struct stellar *st, session_ctx_new_func session_ctx_new, session_ctx_free_func session_ctx_free, @@ -827,13 +989,13 @@ int stellar_session_plugin_register(struct stellar *st, return (utarray_len(plug_mgr->registered_session_plugin_array)-1);// return session plugin_id, equals to session plugin arrary index } -void plugin_manager_on_session_ingress(struct session *sess,const struct packet *pkt) +void plugin_manager_on_session_ingress(struct session *sess, struct packet *pkt) { - if(sess == NULL || pkt == NULL)return; - struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess); + struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess); if(plug_mgr_rt==NULL)return; - plugin_manager_scratch_session_set(sess); -#if 0 +#if 0 + int topic_id = -1; + //FIXME: get topic and tcp data by stellar api switch (packet_get_type(pkt)) { case TCP: @@ -850,29 +1012,37 @@ void plugin_manager_on_session_ingress(struct session *sess,const struct packet break; default: break; - } + } + + plug_mgr_rt->pub_session_msg_cnt=0; + session_mq_publish_message_with_priority(sess, topic_id ,(void *)pkt, STELLAR_MQ_PRIORITY_HIGH); + int tid=stellar_get_current_thread_index(); + stellar_mq_dispatch(plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, sess, pkt); #endif + + plug_mgr_rt->pub_session_msg_cnt=0; + plugin_manager_scratch_session_set(sess); struct tcp_segment *seg; enum session_type type = session_get_type(sess); if (packet_is_ctrl(pkt)) { - session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->control_packet_topic_id ,(void *)pkt); + session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->control_packet_topic_id ,(void *)pkt, STELLAR_MQ_PRIORITY_HIGH); } else { switch (type) { case SESSION_TYPE_TCP: - session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->tcp_topic_id ,(void *)pkt); + session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_topic_id ,(void *)pkt, STELLAR_MQ_PRIORITY_HIGH); if((seg = session_get_tcp_segment(sess)) != NULL) { - session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->tcp_stream_topic_id ,(void *)seg); + session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_stream_topic_id ,(void *)seg, STELLAR_MQ_PRIORITY_HIGH); //session_free_tcp_segment(sess, seg); } break; case SESSION_TYPE_UDP: - session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->udp_topic_id ,(void *)pkt); + session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->udp_topic_id ,(void *)pkt, STELLAR_MQ_PRIORITY_HIGH); break; default: assert(0); @@ -880,48 +1050,79 @@ void plugin_manager_on_session_ingress(struct session *sess,const struct packet } } //TODO: check TCP topic active subscirber num, if 0, return disable assembler state, to reduce tcp reassemble overhead - plugin_manager_session_message_dispatch(sess); + int tid=stellar_get_current_thread_index(); + stellar_mq_dispatch(plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, sess, pkt); plugin_manager_scratch_session_set(NULL); + + + return; } -void plugin_manager_on_session_egress(struct session *sess,const struct packet *pkt) +void plugin_manager_on_session_egress(struct session *sess, struct packet *pkt) { - if(sess == NULL || pkt == NULL)return; - struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess); + struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess); if(plug_mgr_rt==NULL)return; plugin_manager_scratch_session_set(sess); - session_mq_publish_message(sess, plug_mgr_rt->plug_mgr->egress_topic_id ,(void *)pkt); - plugin_manager_session_message_dispatch(sess); - session_mq_free(plug_mgr_rt->delivered_mq); - plug_mgr_rt->delivered_mq=NULL; + session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->egress_topic_id ,pkt, STELLAR_MQ_PRIORITY_HIGH); + int tid=stellar_get_current_thread_index(); + stellar_mq_dispatch(plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, sess, pkt); + plug_mgr_rt->pub_session_msg_cnt=-1;//disable session message publish + stellar_mq_free(&plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, plug_mgr_rt->plug_mgr->stellar_mq_schema_array); plugin_manager_scratch_session_set(NULL); + return; +} + +void plugin_manager_on_session_closing(struct session *sess) +{ + struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess); + if(plug_mgr_rt==NULL)return; + plug_mgr_rt->pub_session_msg_cnt=0;// reset pub_msg_cnt + switch (session_get_type(sess)) + { + case SESSION_TYPE_TCP: + session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_topic_id ,NULL, STELLAR_MQ_PRIORITY_HIGH); + session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->tcp_stream_topic_id , NULL, STELLAR_MQ_PRIORITY_HIGH); + break; + case SESSION_TYPE_UDP: + session_mq_publish_message_with_priority(sess, plug_mgr_rt->plug_mgr->udp_topic_id ,NULL, STELLAR_MQ_PRIORITY_HIGH); + break; + default: + break; + } + int tid=stellar_get_current_thread_index(); + stellar_mq_dispatch(plug_mgr_rt->plug_mgr->per_thread_data[tid].priority_mq, &plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, sess, NULL); + plug_mgr_rt->pub_session_msg_cnt=-1;//disable session message publish + stellar_mq_free(&plug_mgr_rt->plug_mgr->per_thread_data[tid].dealth_letter_queue, plug_mgr_rt->plug_mgr->stellar_mq_schema_array); return; } void stellar_session_plugin_dettach_current_session(struct session *sess) { - struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess); + struct plugin_manager_runtime *plug_mgr_rt = (struct plugin_manager_runtime *)session_get_user_data(sess); struct registered_session_plugin_schema *session_plugin_schema = (struct registered_session_plugin_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->registered_session_plugin_array, (unsigned int)plug_mgr_rt->current_session_plugin_id); if(session_plugin_schema==NULL)return; - + struct stellar_mq_topic_schema *topic=NULL; unsigned int plugin_subscriber_num = utarray_len(session_plugin_schema->registed_session_mq_subscriber_info); - //TODO: maybe no need to clear session_mq_status, check plugin_ctx before message dispatch + //Won't Do: maybe no need to clear session_mq_status, check plugin_ctx before message dispatch + //allow plugin register with null ctx_new and ctx_free if(plug_mgr_rt->session_mq_status) { for(unsigned int i=0; i < plugin_subscriber_num; i++) { - struct session_mq_subscriber_info *session_plugin_sub_info = (struct session_mq_subscriber_info *)utarray_eltptr(session_plugin_schema->registed_session_mq_subscriber_info, i); - bitmap_set(plug_mgr_rt->session_mq_status, session_plugin_sub_info->topic_id, session_plugin_sub_info->subscriber_idx, 0); + struct stellar_mq_subscriber_info *session_plugin_sub_info = (struct stellar_mq_subscriber_info *)utarray_eltptr(session_plugin_schema->registed_session_mq_subscriber_info, i); + bitmap_set(plug_mgr_rt->session_mq_status, session_plugin_sub_info->subscriber_idx,session_plugin_sub_info->topic_id, 0); + topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr_rt->plug_mgr->stellar_mq_schema_array, (unsigned int)session_plugin_sub_info->topic_id); + session_mq_update_topic_status(plug_mgr_rt, topic); } } - - if(session_plugin_schema->on_ctx_free) + //dettach in ctx INIT, do not call on_ctx_free immidiately + if(plug_mgr_rt->plugin_ctx_array[plug_mgr_rt->current_session_plugin_id].state != INIT && (session_plugin_schema->on_ctx_free)) { - session_plugin_schema->on_ctx_free(sess, (plug_mgr_rt->plugin_ctx_array+plug_mgr_rt->current_session_plugin_id)->plugin_ctx, session_plugin_schema->plugin_env); + session_plugin_schema->on_ctx_free(sess, plug_mgr_rt->plugin_ctx_array[plug_mgr_rt->current_session_plugin_id].plugin_ctx, session_plugin_schema->plugin_env); + plug_mgr_rt->plugin_ctx_array[plug_mgr_rt->current_session_plugin_id].plugin_ctx=NULL; } - (plug_mgr_rt->plugin_ctx_array+plug_mgr_rt->current_session_plugin_id)->plugin_ctx=NULL; - (plug_mgr_rt->plugin_ctx_array+plug_mgr_rt->current_session_plugin_id)->state=EXIT; - return; -} + plug_mgr_rt->plugin_ctx_array[plug_mgr_rt->current_session_plugin_id].state=EXIT; + return; +}
\ No newline at end of file diff --git a/src/plugin/plugin_manager.h b/src/plugin/plugin_manager.h index 236a1ff..1de5878 100644 --- a/src/plugin/plugin_manager.h +++ b/src/plugin/plugin_manager.h @@ -1,5 +1,7 @@ #pragma once +#include "stellar/stellar.h" + #ifdef __cplusplus extern "C" { @@ -11,18 +13,19 @@ struct plugin_manager_runtime; struct plugin_manager_schema *plugin_manager_init(struct stellar *st, const char *plugin_spec_file_path); void plugin_manager_exit(struct plugin_manager_schema *plug_mgr); -void plugin_manager_on_packet(struct plugin_manager_schema *plug_mgr, struct packet *pkt); - +void plugin_manager_on_packet_ingress(struct plugin_manager_schema *plug_mgr, struct packet *pkt); +void plugin_manager_on_packet_egress(struct plugin_manager_schema *plug_mgr, struct packet *pkt); //return polling work state, 0: idle, 1: working int plugin_manager_on_polling(struct plugin_manager_schema *plug_mgr); //publish and dispatch session msg(msg, pkt) on session_mq -void plugin_manager_on_session_ingress(struct session *sess,const struct packet *pkt); -void plugin_manager_on_session_egress(struct session *sess,const struct packet *pkt); +void plugin_manager_on_session_ingress(struct session *sess,struct packet *pkt); +void plugin_manager_on_session_egress(struct session *sess,struct packet *pkt); +void plugin_manager_on_session_closing(struct session *sess); struct plugin_manager_runtime *plugin_manager_session_runtime_new(struct plugin_manager_schema *plug_mgr, struct session *sess); void plugin_manager_session_runtime_free(struct plugin_manager_runtime *plug_mgr_rt); #ifdef __cplusplus } -#endif +#endif
\ No newline at end of file diff --git a/src/plugin/plugin_manager_interna.h b/src/plugin/plugin_manager_interna.h new file mode 100644 index 0000000..07e462d --- /dev/null +++ b/src/plugin/plugin_manager_interna.h @@ -0,0 +1,192 @@ +#pragma once + +#ifdef __cplusplus +extern "C" +{ +#endif + +#include "plugin_manager.h" + +#include "stellar/stellar.h" +#include "stellar/stellar_mq.h" +#include "stellar/stellar_exdata.h" + +#include "bitmap/bitmap.h" +#include "uthash/utarray.h" + +struct per_thread_exdata_array +{ + struct stellar_exdata *exdata_array; +}; + +struct stellar_message; + +struct plugin_manger_per_thread_data +{ + struct per_thread_exdata_array per_thread_pkt_exdata_array; + struct stellar_message *priority_mq[STELLAR_MQ_PRIORITY_MAX];// message list + struct stellar_message *dealth_letter_queue;// dlq list + long long pub_packet_msg_cnt; +}; + + + +struct plugin_manager_schema +{ + struct stellar *st; + UT_array *plugin_load_specs_array; + UT_array *stellar_exdata_schema_array; + UT_array *stellar_mq_schema_array; + UT_array *registered_session_plugin_array; + UT_array *registered_packet_plugin_array; + UT_array *registered_polling_plugin_array; + int stellar_mq_topic_num; + int packet_topic_subscriber_num; + int session_topic_subscriber_num; + int tcp_topic_id; + int tcp_stream_topic_id; + int udp_topic_id; + int egress_topic_id; + int control_packet_topic_id; + int max_message_dispatch; + struct plugin_manger_per_thread_data *per_thread_data; +}__attribute__((aligned(sizeof(void*)))); + +enum plugin_exdata_state +{ INIT, ACTIVE, EXIT }; + +struct stellar_exdata +{ + void *exdata; + enum plugin_exdata_state state; +}; + + + +struct stellar_exdata_schema +{ + char *name; + stellar_exdata_free *free_func; + + void *free_arg; + int idx; +}__attribute__((aligned(sizeof(void*)))); + + +enum stellar_topic_type +{ + ON_SESSION_TOPIC, + ON_PACKET_TOPIC, +}; + +struct stellar_message +{ + struct + { + int topic_id; + enum stellar_topic_type type; + enum stellar_mq_priority priority; + } header; + void *body; + struct stellar_message *next, *prev; +} __attribute__((aligned(sizeof(void *)))); + +typedef struct stellar_mq_subscriber +{ + int topic_subscriber_idx; + int plugin_idx; + union + { + on_session_msg_cb_func *sess_msg_cb; + on_packet_msg_cb_func *pkt_msg_cb; + void *msg_cb; + }; + struct stellar_mq_subscriber *next, *prev; +}stellar_mq_subscriber __attribute__((aligned(sizeof(void*)))); + + +struct stellar_mq_topic_schema +{ + char *topic_name; + void *free_cb_arg; + int topic_id; + int subscriber_cnt; + int is_destroyed; + stellar_msg_free_cb_func *free_cb; + struct stellar_mq_subscriber *subscribers; +}__attribute__((aligned(sizeof(void*)))); + + + +struct session_plugin_ctx_runtime +{ + enum plugin_exdata_state state; + int session_plugin_id; + void *plugin_ctx; +}__attribute__((aligned(sizeof(void*)))); + + + +struct plugin_manager_runtime +{ + struct plugin_manager_schema *plug_mgr; + struct session *sess; + struct bitmap *session_mq_status; //N * M bits, N topic, M subscriber + struct bitmap *session_topic_status; //N bits, N topic + struct stellar_exdata *sess_exdata_array; + struct session_plugin_ctx_runtime *plugin_ctx_array;//N plugins TODO: call alloc and free + int current_session_plugin_id; + int pub_session_msg_cnt; +}__attribute__((aligned(sizeof(void*)))); + +struct registered_packet_plugin_schema +{ + char ip_protocol; + plugin_on_packet_func *on_packet; + void *plugin_env; + UT_array *registed_packet_mq_subscriber_info; +}__attribute__((aligned(sizeof(void*)))); + +struct registered_polling_plugin_schema +{ + plugin_on_polling_func *on_polling; + void *plugin_env; +}__attribute__((aligned(sizeof(void*)))); + +struct stellar_mq_subscriber_info +{ + int topic_id; + int subscriber_idx; +}__attribute__((aligned(sizeof(void*)))); + +struct registered_session_plugin_schema +{ + session_ctx_new_func *on_ctx_new; + session_ctx_free_func *on_ctx_free; + void *plugin_env; + UT_array *registed_session_mq_subscriber_info; +}__attribute__((aligned(sizeof(void*)))); + +#define SESSION_PULGIN_ID_BASE 0x00000 +#define PACKET_PULGIN_ID_BASE 0x10000 +#define POLLING_PULGIN_ID_BASE 0x20000 + +/******************************* + * PLUGIN MANAGER INIT & EXIT * + *******************************/ + +#define MAX_MSG_PER_DISPATCH 128 + +#include <dlfcn.h> + +struct plugin_specific +{ + char plugin_name[256]; + plugin_on_load_func *load_cb; + plugin_on_unload_func *unload_cb; + void *plugin_ctx; +}__attribute__((aligned(sizeof(void*)))); + +#ifdef __cplusplus +} +#endif
\ No newline at end of file diff --git a/src/plugin/test/CMakeLists.txt b/src/plugin/test/CMakeLists.txt new file mode 100644 index 0000000..7698640 --- /dev/null +++ b/src/plugin/test/CMakeLists.txt @@ -0,0 +1,17 @@ +add_executable(gtest_plugin_manager +plugin_manager_gtest_main.cpp +) + +include_directories(${CMAKE_SOURCE_DIR}/src/plugin/) + +target_link_libraries( + gtest_plugin_manager + plugin_manager + dl + "-rdynamic" + gtest + gmock +) + +include(GoogleTest) +gtest_discover_tests(gtest_plugin_manager)
\ No newline at end of file diff --git a/src/plugin/test/plugin_manager_gtest_main.cpp b/src/plugin/test/plugin_manager_gtest_main.cpp new file mode 100644 index 0000000..44cb3d4 --- /dev/null +++ b/src/plugin/test/plugin_manager_gtest_main.cpp @@ -0,0 +1,2312 @@ +#pragma GCC diagnostic ignored "-Wunused-parameter" + +#include <gtest/gtest.h> +#include "stellar/utils.h" + +#include "plugin_manager_gtest_mock.h" + +#define STELLAR_INTRINSIC_TOPIC_NUM 5 +#define TOPIC_NAME_MAX 512 + +void whitebox_test_plugin_manager_intrisic_metadata(struct stellar *st, struct plugin_manager_schema *plug_mgr) +{ + SCOPED_TRACE("whitebox test intrisic metadata"); + + EXPECT_TRUE(plug_mgr!=NULL); + + EXPECT_EQ(plug_mgr->st, st); + + //load spec null + EXPECT_TRUE(plug_mgr->plugin_load_specs_array==NULL); + + //session exdata schema null + EXPECT_TRUE(plug_mgr->stellar_exdata_schema_array==NULL); + + //session mq schema not null + EXPECT_TRUE(plug_mgr->stellar_mq_schema_array!=NULL); + + //registered plugin array null + EXPECT_TRUE(plug_mgr->registered_polling_plugin_array==NULL); + EXPECT_TRUE(plug_mgr->registered_packet_plugin_array==NULL); + EXPECT_TRUE(plug_mgr->registered_session_plugin_array==NULL); + + int intrinsic_topic_num=utarray_len(plug_mgr->stellar_mq_schema_array); + EXPECT_EQ(plug_mgr->stellar_mq_topic_num, intrinsic_topic_num);//TCP,UDP,TCP_STREAM,EGRESS,CONTROL + + struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)plug_mgr->tcp_topic_id); + EXPECT_STREQ(topic->topic_name, TOPIC_TCP); + + topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)plug_mgr->tcp_stream_topic_id); + EXPECT_STREQ(topic->topic_name, TOPIC_TCP_STREAM); + + topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)plug_mgr->udp_topic_id); + EXPECT_STREQ(topic->topic_name, TOPIC_UDP); + + topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)plug_mgr->egress_topic_id); + EXPECT_STREQ(topic->topic_name, TOPIC_EGRESS); + + topic = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)plug_mgr->control_packet_topic_id); + EXPECT_STREQ(topic->topic_name, TOPIC_CONTROL_PACKET); + + //intrinsic topic + EXPECT_GE(stellar_mq_get_topic_id(st, TOPIC_TCP), 0); + EXPECT_GE(stellar_mq_get_topic_id(st, TOPIC_TCP_STREAM), 0); + EXPECT_GE(stellar_mq_get_topic_id(st, TOPIC_UDP), 0); + EXPECT_GE(stellar_mq_get_topic_id(st, TOPIC_EGRESS), 0); + EXPECT_GE(stellar_mq_get_topic_id(st, TOPIC_CONTROL_PACKET), 0); + + EXPECT_TRUE(plug_mgr->per_thread_data!=NULL); + int thread_num=stellar_get_worker_thread_num(st); + for(int i=0; i<thread_num; i++) + { + EXPECT_TRUE(plug_mgr->per_thread_data[i].per_thread_pkt_exdata_array.exdata_array==NULL); + EXPECT_TRUE(plug_mgr->per_thread_data[i].dealth_letter_queue==NULL); + for(int j=0; j<STELLAR_MQ_PRIORITY_MAX; j++) + EXPECT_TRUE(plug_mgr->per_thread_data[i].priority_mq[j]==NULL); + } +} + +/*********************************** + * TEST PLUGIN MANAGER INIT & EXIT * + ***********************************/ + +//TODO: test plugin_specs_load + +TEST(plugin_manager_init, init_with_null_toml) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + plugin_manager_exit(plug_mgr); +} + +/****************************************** + * TEST PLUGIN MANAGER PACKET PLUGIN INIT * + ******************************************/ + +static void test_mock_packet_exdata_free(int idx, void *ex_ptr, void *arg){} + +static void test_mock_overwrite_packet_exdata_free(int idx, void *ex_ptr, void *arg){} + +TEST(plugin_manager_init, packet_exdata_new_index_overwrite) { + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + const char *exdata_name="PACKET_EXDATA"; + int exdata_idx=stellar_exdata_new_index(&st,exdata_name, test_mock_packet_exdata_free, &st); + EXPECT_GE(exdata_idx, 0); + int overwrite_idx=stellar_exdata_new_index(&st,exdata_name, test_mock_overwrite_packet_exdata_free, plug_mgr); + EXPECT_GE(overwrite_idx, 0); + EXPECT_EQ(overwrite_idx, exdata_idx); + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + struct stellar_exdata_schema *exdata_schema = (struct stellar_exdata_schema *)utarray_eltptr( + plug_mgr->stellar_exdata_schema_array, (unsigned int)exdata_idx); + EXPECT_EQ(exdata_schema->free_func, (void *)test_mock_overwrite_packet_exdata_free); + EXPECT_EQ(exdata_schema->free_arg, plug_mgr); + EXPECT_EQ(exdata_schema->idx, exdata_idx); + EXPECT_STREQ(exdata_schema->name, exdata_name); + + int exdata_num = utarray_len(plug_mgr->stellar_exdata_schema_array); + EXPECT_EQ(exdata_num, 1); + } + + plugin_manager_exit(plug_mgr); +} + +void test_mock_packet_msg_free(void *msg, void *msg_free_arg){} +void test_mock_overwrite_packet_msg_free(void *msg, void *msg_free_arg){} + +TEST(plugin_manager_init, packet_mq_topic_create_and_update) { + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + const char *topic_name="PACKET_TOPIC"; + + EXPECT_EQ(stellar_mq_get_topic_id(&st, topic_name), -1); // illegal topic_name + + int topic_id = stellar_mq_create_topic(&st, topic_name, test_mock_packet_msg_free, &st); + EXPECT_GE(topic_id, 0); + struct stellar_mq_topic_schema *topic_schema = NULL; + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + topic_schema = + (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id); + EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_packet_msg_free); + EXPECT_EQ(topic_schema->free_cb_arg, &st); + EXPECT_EQ(topic_schema->topic_id, topic_id); + EXPECT_STREQ(topic_schema->topic_name, topic_name); + } + + EXPECT_EQ(stellar_mq_get_topic_id(&st, topic_name), topic_id); + EXPECT_EQ(stellar_mq_create_topic(&st, topic_name, test_mock_overwrite_packet_msg_free, plug_mgr), + -1); // duplicate create, return error + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + topic_schema = + (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id); + EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_packet_msg_free); + EXPECT_EQ(topic_schema->free_cb_arg, &st); + EXPECT_EQ(topic_schema->topic_id, topic_id); + EXPECT_STREQ(topic_schema->topic_name, topic_name); + } + + EXPECT_EQ(stellar_mq_update_topic(&st, topic_id, test_mock_overwrite_packet_msg_free, plug_mgr), 0); + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + topic_schema = + (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id); + EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_overwrite_packet_msg_free); + EXPECT_EQ(topic_schema->free_cb_arg, plug_mgr); + EXPECT_EQ(topic_schema->topic_id, topic_id); + EXPECT_STREQ(topic_schema->topic_name, topic_name); + EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), 1+STELLAR_INTRINSIC_TOPIC_NUM); + } + + EXPECT_EQ(stellar_mq_destroy_topic(&st, 10), -1); // illgeal topic_id + + EXPECT_EQ(stellar_mq_destroy_topic(&st, topic_id), 1); + EXPECT_EQ(stellar_mq_destroy_topic(&st, topic_id), 0); // duplicate destroy, return 0; + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), 1+STELLAR_INTRINSIC_TOPIC_NUM); // destory won't delete the topic schema + } + plugin_manager_exit(plug_mgr); +} + +void test_mock_on_packet_msg(struct packet *pkt, int topic_id, const void *msg, void *plugin_env){} + +void test_mock_overwrite_on_packet_msg(struct packet *pkt, int topic_id, const void *msg, void *plugin_env){} + +TEST(plugin_manager_init, packet_mq_subscribe) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + + const char *topic_name="PACKET_TOPIC"; + + int topic_id=stellar_mq_create_topic(&st, topic_name, test_mock_packet_msg_free, &st); + EXPECT_GE(topic_id, 0); + + EXPECT_EQ(stellar_packet_mq_subscribe(&st, topic_id, test_mock_on_packet_msg, 10+PACKET_PULGIN_ID_BASE),-1);//illgeal plugin_id + EXPECT_EQ(stellar_packet_mq_subscribe(&st, 10, test_mock_on_packet_msg, 10+PACKET_PULGIN_ID_BASE),-1);//illgeal topic_id & plugin_id + + int plugin_id=stellar_packet_plugin_register(&st, 6, NULL, &st); + EXPECT_GE(plugin_id, PACKET_PULGIN_ID_BASE); + + EXPECT_EQ(stellar_packet_mq_subscribe(&st, topic_id, test_mock_on_packet_msg, plugin_id),0); + EXPECT_EQ(stellar_packet_mq_subscribe(&st, topic_id, test_mock_overwrite_on_packet_msg, plugin_id),0);//duplicate subscribe, return 0, won't overwrite + struct stellar_mq_topic_schema *topic_schema; + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + topic_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id); + EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_packet_msg_free); + EXPECT_EQ(topic_schema->free_cb_arg, &st); + EXPECT_EQ(topic_schema->topic_id, topic_id); + EXPECT_STREQ(topic_schema->topic_name, topic_name); + } + + EXPECT_EQ(topic_schema->subscriber_cnt, 1); + EXPECT_EQ(topic_schema->subscribers->pkt_msg_cb, (void *)test_mock_overwrite_on_packet_msg); + + plugin_manager_exit(plug_mgr); +} + + +/******************************************* + * TEST PLUGIN MANAGER PACKET PLUGIN RUNTIME* + *******************************************/ + +#define PACKET_PROTO_PLUGIN_NUM 128 +#define PACKET_EXDATA_NUM 2 +#define PACKET_TOPIC_NUM 2 +#define PACKET_MQ_SUB_NUM 2 +struct packet_plugin_env +{ + struct plugin_manager_schema *plug_mgr; + int basic_on_packet_called; + int proto_filter_plugin_id[PACKET_PROTO_PLUGIN_NUM]; + int proto_filter_plugin_called[PACKET_PROTO_PLUGIN_NUM]; + int exdata_set_on_packet_called; + int exdata_get_on_packet_called; + unsigned int packet_exdata_idx[PACKET_EXDATA_NUM]; + int exdata_free_called[PACKET_EXDATA_NUM]; + unsigned int packet_topic_id[PACKET_TOPIC_NUM]; + unsigned int packet_mq_sub_plugin_id[PACKET_MQ_SUB_NUM]; + int msg_pub_cnt; + int msg_sub_cnt; + int msg_free_cnt; +}; + +static void test_basic_on_packet(struct packet *pkt, unsigned char ip_protocol, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + EXPECT_EQ(pkt->ip_proto, ip_protocol); + EXPECT_EQ(pkt->st, env->plug_mgr->st); + EXPECT_EQ(packet_exdata_set(pkt, 2, pkt), -1);// illegal set + EXPECT_EQ(packet_exdata_get(pkt, 2), nullptr);// illegal get + env->basic_on_packet_called+=1; + return; +} + +TEST(plugin_manager, packet_plugin_illegal_exdata) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + unsigned char ip_proto=6; + struct packet_plugin_env env; + memset(&env, 0, sizeof(struct packet_plugin_env)); + env.plug_mgr=plug_mgr; + int plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_basic_on_packet, &env); + EXPECT_GE(plugin_id, PACKET_PULGIN_ID_BASE); + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + int packet_plugin_num = utarray_len(plug_mgr->registered_packet_plugin_array); + EXPECT_EQ(packet_plugin_num, 1); + } + + struct packet pkt={&st, IPv4, ip_proto}; + plugin_manager_on_packet_ingress(plug_mgr, &pkt); + plugin_manager_on_packet_egress(plug_mgr, &pkt); + + plugin_manager_exit(plug_mgr); + + EXPECT_EQ(env.basic_on_packet_called, 1); +} + +static void test_proto_filter_on_packet(struct packet *pkt, unsigned char ip_protocol, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + EXPECT_EQ(pkt->ip_proto, ip_protocol); + EXPECT_EQ(pkt->st, env->plug_mgr->st); + EXPECT_EQ(packet_exdata_set(pkt, 2, pkt), -1);// illegal set + EXPECT_EQ(packet_exdata_get(pkt, 2), nullptr);// illegal get + env->proto_filter_plugin_called[ip_protocol]+=1; + return; +} + +TEST(plugin_manager, packet_plugins_with_proto_filter) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + struct packet_plugin_env env; + memset(&env, 0, sizeof(struct packet_plugin_env)); + env.plug_mgr=plug_mgr; + + int proto_filter_plugin_num=(int)(sizeof(env.proto_filter_plugin_id) / sizeof(env.proto_filter_plugin_id[0])); + for (int i = 0; i < proto_filter_plugin_num; i++) + { + env.proto_filter_plugin_id[i] = stellar_packet_plugin_register(&st, i, test_proto_filter_on_packet, &env); + EXPECT_GE(env.proto_filter_plugin_id[i], PACKET_PULGIN_ID_BASE); + + + } + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), proto_filter_plugin_num); + } + + struct packet pkt={&st, IPv4, 0}; + + int N_packet=10; + for (int j = 0; j < N_packet; j++) + { + for (int i = 0; i < proto_filter_plugin_num; i++) + { + pkt.ip_proto = i; + plugin_manager_on_packet_ingress(plug_mgr, &pkt); + plugin_manager_on_packet_egress(plug_mgr, &pkt); + } + } + plugin_manager_exit(plug_mgr); + + for (int i = 0; i < proto_filter_plugin_num; i++) + { + EXPECT_EQ(env.proto_filter_plugin_called[i], N_packet); + } +} + + +struct test_exdata +{ + struct packet *pkt; + struct session *sess; + long long value; +}; + +static void test_exdata_set_on_packet(struct packet *pkt, unsigned char ip_protocol, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + EXPECT_EQ(pkt->ip_proto, ip_protocol); + EXPECT_EQ(pkt->st, env->plug_mgr->st); + env->exdata_set_on_packet_called+=1; + + int exdata_idx_len=(int)(sizeof(env->packet_exdata_idx) / sizeof(env->packet_exdata_idx[0])); + + for(int i=0; i<exdata_idx_len; i++) + { + struct test_exdata *exdata_val=CALLOC(struct test_exdata , 1); + exdata_val->value=i; + exdata_val->pkt=pkt; + EXPECT_EQ(packet_exdata_set(pkt, env->packet_exdata_idx[i], exdata_val), 0); + } + return; +} + +static void test_exdata_get_on_packet(struct packet *pkt, unsigned char ip_protocol, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + EXPECT_EQ(pkt->ip_proto, ip_protocol); + EXPECT_EQ(pkt->st, env->plug_mgr->st); + + int exdata_idx_len=(int)(sizeof(env->packet_exdata_idx) / sizeof(env->packet_exdata_idx[0])); + + for(int i=0; i<exdata_idx_len; i++) + { + struct test_exdata *exdata_val=(struct test_exdata *)packet_exdata_get(pkt, env->packet_exdata_idx[i]); + EXPECT_EQ(exdata_val->value, i); + } + env->exdata_get_on_packet_called+=1; + return; +} + +static void test_packet_exdata_free(int idx, void *ex_ptr, void *arg) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)arg; + struct test_exdata *exdata_val=(struct test_exdata *)ex_ptr; + EXPECT_EQ(env->packet_exdata_idx[idx], idx); + EXPECT_EQ(exdata_val->value, idx); + + EXPECT_EQ(packet_exdata_get(exdata_val->pkt, idx), nullptr);// illegal get in exdata_free callback + EXPECT_EQ(packet_exdata_set(exdata_val->pkt, idx, exdata_val->pkt), -1);// illegal set in exdata_free callback + + FREE(ex_ptr); + env->exdata_free_called[idx]+=1; + + return; +} + + +TEST(plugin_manager, packet_plugins_share_exdata) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + unsigned char ip_proto=6; + struct packet_plugin_env env; + memset(&env, 0, sizeof(struct packet_plugin_env)); + env.plug_mgr=plug_mgr; + + char exdata_name[PACKET_EXDATA_NUM][TOPIC_NAME_MAX]; + int exdata_idx_len=(int)(sizeof(env.packet_exdata_idx) / sizeof(env.packet_exdata_idx[0])); + for(int i=0; i<exdata_idx_len; i++) + { + sprintf(exdata_name[i], "PACKET_EXDATA_%d", i); + env.packet_exdata_idx[i]=stellar_exdata_new_index(&st, exdata_name[i], test_packet_exdata_free, &env); + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + struct stellar_exdata_schema *exdata_schema = (struct stellar_exdata_schema *)utarray_eltptr( + plug_mgr->stellar_exdata_schema_array, env.packet_exdata_idx[i]); + + EXPECT_EQ(exdata_schema->free_func, (void *)test_packet_exdata_free); + EXPECT_EQ(exdata_schema->free_arg, &env); + EXPECT_EQ(exdata_schema->idx, env.packet_exdata_idx[i]); + EXPECT_STREQ(exdata_schema->name, exdata_name[i]); + } + } + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + EXPECT_EQ(utarray_len(plug_mgr->stellar_exdata_schema_array), exdata_idx_len); + } + + int exdata_set_plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_exdata_set_on_packet, &env); + EXPECT_GE(exdata_set_plugin_id, PACKET_PULGIN_ID_BASE); + + int exdata_get_plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_exdata_get_on_packet, &env); + EXPECT_GE(exdata_get_plugin_id, PACKET_PULGIN_ID_BASE); + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), 2); // Fix plugin number + } + + struct packet pkt={&st, IPv4, ip_proto}; + + int N_packet=10; + + for(int i=0; i < N_packet; i++) + { + plugin_manager_on_packet_ingress(plug_mgr, &pkt); + plugin_manager_on_packet_egress(plug_mgr, &pkt); + } + plugin_manager_exit(plug_mgr); + + EXPECT_EQ(env.exdata_set_on_packet_called, N_packet); + EXPECT_EQ(env.exdata_get_on_packet_called, N_packet); + + for(int i=0; i < exdata_idx_len; i++) + { + EXPECT_EQ(env.exdata_free_called[i], N_packet); + } +} + +static void test_packet_msg_free_cb_func(void *msg, void *msg_free_arg) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)msg_free_arg; + env->msg_free_cnt+=1; + return; +} + +static void test_mq_on_packet_msg(struct packet *pkt, int topic_id, const void *msg, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + EXPECT_EQ(pkt->st, env->plug_mgr->st); + env->msg_sub_cnt+=1; + return; +} + +static void test_mq_pub_on_packet(struct packet *pkt, unsigned char ip_protocol, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + EXPECT_EQ(pkt->ip_proto, ip_protocol); + EXPECT_EQ(pkt->st, env->plug_mgr->st); + int topic_id_num=(int)(sizeof(env->packet_topic_id) / sizeof(env->packet_topic_id[0])); + for(int i=0; i<topic_id_num; i++) + { + EXPECT_EQ(packet_mq_publish_message(pkt, env->packet_topic_id[i], pkt), 0); + env->msg_pub_cnt+=1; + } + return; +} + +TEST(plugin_manager, packet_plugins_mq_pub_sub) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + unsigned char ip_proto=6; + struct packet_plugin_env env; + memset(&env, 0, sizeof(struct packet_plugin_env)); + env.plug_mgr=plug_mgr; + char topic_name[PACKET_TOPIC_NUM][TOPIC_NAME_MAX]; + + int topic_id_num=(int)(sizeof(env.packet_topic_id) / sizeof(env.packet_topic_id[0])); + + for(int i=0; i<topic_id_num; i++) + { + sprintf(topic_name[i], "PACKET_TOPIC_%d", i); + env.packet_topic_id[i]=stellar_mq_create_topic(&st, topic_name[i], test_packet_msg_free_cb_func, &env); + EXPECT_GE(env.packet_topic_id[i], 0); + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr( + plug_mgr->stellar_mq_schema_array, env.packet_topic_id[i]); + EXPECT_EQ(topic->free_cb, test_packet_msg_free_cb_func); + EXPECT_EQ(topic->free_cb_arg, &env); + EXPECT_EQ(topic->topic_id, env.packet_topic_id[i]); + EXPECT_STREQ(topic->topic_name, topic_name[i]); + } + } + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), topic_id_num+STELLAR_INTRINSIC_TOPIC_NUM); + } + + int pub_plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_mq_pub_on_packet, &env); + EXPECT_GE(pub_plugin_id, PACKET_PULGIN_ID_BASE); + + int topic_sub_num=(int)(sizeof(env.packet_mq_sub_plugin_id) / sizeof(env.packet_mq_sub_plugin_id[0])); + + for (int i = 0; i < topic_sub_num; i++) + { + env.packet_mq_sub_plugin_id[i] = stellar_packet_plugin_register(&st, ip_proto, NULL, &env);// empty on_packet is ok + EXPECT_GE(env.packet_mq_sub_plugin_id[i], PACKET_PULGIN_ID_BASE); + for(int j = 0; j < topic_id_num; j++) + { + EXPECT_EQ(stellar_packet_mq_subscribe(&st, env.packet_topic_id[j], test_mq_on_packet_msg, env.packet_mq_sub_plugin_id[i]), 0); + } + } + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), topic_sub_num+1); + } + + struct packet pkt={&st, IPv4, ip_proto}; + + int N_packet=10; + for (int i = 0; i < N_packet; i++) + { + plugin_manager_on_packet_ingress(plug_mgr, &pkt); + plugin_manager_on_packet_egress(plug_mgr, &pkt); + } + + plugin_manager_exit(plug_mgr); + EXPECT_EQ(N_packet*topic_id_num, env.msg_pub_cnt); + EXPECT_EQ(env.msg_free_cnt, env.msg_pub_cnt); + EXPECT_EQ(env.msg_sub_cnt, env.msg_pub_cnt*topic_sub_num); +} + +static void overlimit_packet_msg_free_cb_func(void *msg, void *msg_free_arg) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)msg_free_arg; + env->msg_free_cnt+=1; + FREE(msg); + return; +} + +static void overlimit_sub_on_packet_msg(struct packet *pkt, int topic_id, const void *msg, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + EXPECT_EQ(pkt->st, env->plug_mgr->st); + env->msg_sub_cnt+=1; + return; +} + +static void overlimit_pub_on_packet(struct packet *pkt, unsigned char ip_protocol, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + EXPECT_EQ(pkt->ip_proto, ip_protocol); + EXPECT_EQ(pkt->st, env->plug_mgr->st); + int topic_id_num=(int)(sizeof(env->packet_topic_id) / sizeof(env->packet_topic_id[0])); + int cnt=0; + int *msg; + for(int i=0; i<topic_id_num; i++) + { + for(int j=0; j < MAX_MSG_PER_DISPATCH; j++) + { + msg=CALLOC(int, 1); + *msg=cnt; + int pub_ret=packet_mq_publish_message(pkt, env->packet_topic_id[i], msg); + if(cnt < MAX_MSG_PER_DISPATCH) + { + ASSERT_EQ(pub_ret, 0); + env->msg_pub_cnt+=1; + } + else + { + ASSERT_EQ(pub_ret, -1); + } + if(pub_ret!=0)FREE(msg); + cnt+=1; + } + } + return; +} + +TEST(plugin_manager, packet_plugins_pub_overlimit) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + unsigned char ip_proto=6; + struct packet_plugin_env env; + memset(&env, 0, sizeof(struct packet_plugin_env)); + env.plug_mgr=plug_mgr; + char topic_name[PACKET_TOPIC_NUM][TOPIC_NAME_MAX]; + + int topic_id_num=(int)(sizeof(env.packet_topic_id) / sizeof(env.packet_topic_id[0])); + + for(int i=0; i<topic_id_num; i++) + { + sprintf(topic_name[i], "PACKET_TOPIC_%d", i); + env.packet_topic_id[i]=stellar_mq_create_topic(&st, topic_name[i], overlimit_packet_msg_free_cb_func, &env); + EXPECT_GE(env.packet_topic_id[i], 0); + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + struct stellar_mq_topic_schema *topic = (struct stellar_mq_topic_schema *)utarray_eltptr( + plug_mgr->stellar_mq_schema_array, env.packet_topic_id[i]); + EXPECT_EQ(topic->free_cb, overlimit_packet_msg_free_cb_func); + EXPECT_EQ(topic->free_cb_arg, &env); + EXPECT_EQ(topic->topic_id, env.packet_topic_id[i]); + EXPECT_STREQ(topic->topic_name, topic_name[i]); + } + } + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), topic_id_num+STELLAR_INTRINSIC_TOPIC_NUM); + } + + int pub_plugin_id=stellar_packet_plugin_register(&st, ip_proto, overlimit_pub_on_packet, &env); + EXPECT_GE(pub_plugin_id, PACKET_PULGIN_ID_BASE); + + int topic_sub_num=(int)(sizeof(env.packet_mq_sub_plugin_id) / sizeof(env.packet_mq_sub_plugin_id[0])); + + for (int i = 0; i < topic_sub_num; i++) + { + env.packet_mq_sub_plugin_id[i] = stellar_packet_plugin_register(&st, ip_proto, NULL, &env);// empty on_packet is ok + EXPECT_GE(env.packet_mq_sub_plugin_id[i], PACKET_PULGIN_ID_BASE); + for(int j = 0; j < topic_id_num; j++) + { + EXPECT_EQ(stellar_packet_mq_subscribe(&st, env.packet_topic_id[j], overlimit_sub_on_packet_msg, env.packet_mq_sub_plugin_id[i]), 0); + } + } + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + EXPECT_EQ(utarray_len(plug_mgr->registered_packet_plugin_array), topic_sub_num+1); + } + + struct packet pkt={&st, IPv4, ip_proto}; + + int N_packet=10; + for (int i = 0; i < N_packet; i++) + { + plugin_manager_on_packet_ingress(plug_mgr, &pkt); + plugin_manager_on_packet_egress(plug_mgr, &pkt); + } + + plugin_manager_exit(plug_mgr); + EXPECT_EQ(N_packet*MAX_MSG_PER_DISPATCH, env.msg_pub_cnt); + EXPECT_EQ(env.msg_free_cnt, env.msg_pub_cnt); + EXPECT_EQ(env.msg_sub_cnt, env.msg_pub_cnt*topic_sub_num); +} + + + +static void test_exdata_free_pub_msg_exdata_free(int idx, void *ex_ptr, void *arg) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)arg; + EXPECT_EQ(env->packet_exdata_idx[idx], idx); + env->exdata_free_called[idx]+=1; + EXPECT_EQ(packet_mq_publish_message((struct packet *)ex_ptr, env->packet_topic_id[0], (struct packet *)ex_ptr), -1);// publish message in packet exdata_free is illegal + env->msg_pub_cnt+=1; + return; +} + +static void test_exdata_free_pub_msg_free( void *msg, void *msg_free_arg) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)msg_free_arg; + env->msg_free_cnt+=1; + EXPECT_EQ(packet_mq_publish_message((struct packet *)msg, env->packet_topic_id[0], msg), -1 );// publish message in packet msg_free is illegal + return; +} + +static void test_exdata_free_pub_msg_on_packet(struct packet *pkt, unsigned char ip_protocol, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_TRUE(env!=NULL); + EXPECT_EQ(pkt->ip_proto, ip_protocol); + EXPECT_EQ(pkt->st, env->plug_mgr->st); + EXPECT_EQ(packet_exdata_set(pkt, env->packet_exdata_idx[0], pkt), 0); + env->basic_on_packet_called+=1; + return; +} + +static void test_exdata_free_pub_msg_on_packet_msg(struct packet *pkt, int topic_id, const void *msg, void *plugin_env) +{ + struct packet_plugin_env *env = (struct packet_plugin_env *)plugin_env; + EXPECT_EQ(topic_id, env->packet_topic_id[0]); + env->msg_sub_cnt+=1; +} + +TEST(plugin_manager, packet_plugin_exdata_free_pub_msg) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + unsigned char ip_proto=6; + struct packet_plugin_env env; + memset(&env, 0, sizeof(struct packet_plugin_env)); + env.plug_mgr=plug_mgr; + int plugin_id=stellar_packet_plugin_register(&st, ip_proto, test_exdata_free_pub_msg_on_packet, &env); + EXPECT_GE(plugin_id, PACKET_PULGIN_ID_BASE); + + env.packet_exdata_idx[0]=stellar_exdata_new_index(&st, "PACKET_EXDATA", test_exdata_free_pub_msg_exdata_free, &env); + env.packet_topic_id[0]=stellar_mq_create_topic(&st, "PACKET_TOPIC", test_exdata_free_pub_msg_free, &env); + + EXPECT_EQ(stellar_packet_mq_subscribe(&st, env.packet_topic_id[0], test_exdata_free_pub_msg_on_packet_msg, plugin_id),0); + + + struct packet pkt={&st, IPv4, ip_proto}; + plugin_manager_on_packet_ingress(plug_mgr, &pkt); + plugin_manager_on_packet_egress(plug_mgr, &pkt); + + plugin_manager_exit(plug_mgr); + + EXPECT_EQ(env.basic_on_packet_called, 1); + EXPECT_EQ(env.msg_pub_cnt, 1); + EXPECT_EQ(env.msg_sub_cnt, 0); + EXPECT_EQ(env.msg_free_cnt, 0); + EXPECT_EQ(env.exdata_free_called[0], 1); +} + +/********************************************** + * TEST PLUGIN MANAGER ON SESSION PLUGIN INIT * + **********************************************/ + +static void test_mock_session_exdata_free(int idx, void *ex_ptr, void *arg){} +static void test_mock_overwrite_session_exdata_free(int idx, void *ex_ptr, void *arg){} + +TEST(plugin_manager_init, session_exdata_new_index_overwrite) { + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + const char *exdata_name="SESSION_EXDATA"; + int exdata_idx=stellar_exdata_new_index(&st,exdata_name, test_mock_session_exdata_free, &st); + EXPECT_GE(exdata_idx, 0); + int overwrite_idx=stellar_exdata_new_index(&st,exdata_name, test_mock_overwrite_session_exdata_free, plug_mgr); + EXPECT_GE(overwrite_idx, 0); + EXPECT_EQ(overwrite_idx, exdata_idx); + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + struct stellar_exdata_schema *exdata_schema = (struct stellar_exdata_schema *)utarray_eltptr( + plug_mgr->stellar_exdata_schema_array, (unsigned int)exdata_idx); + EXPECT_EQ(exdata_schema->free_func, (void *)test_mock_overwrite_session_exdata_free); + EXPECT_EQ(exdata_schema->free_arg, plug_mgr); + EXPECT_EQ(exdata_schema->idx, exdata_idx); + EXPECT_STREQ(exdata_schema->name, exdata_name); + + int exdata_num = utarray_len(plug_mgr->stellar_exdata_schema_array); + EXPECT_EQ(exdata_num, 1); + } + plugin_manager_exit(plug_mgr); +} + +void test_mock_session_msg_free(void *msg, void *msg_free_arg){} +void test_mock_overwrite_session_msg_free(void *msg, void *msg_free_arg){} + +TEST(plugin_manager_init, session_mq_topic_create_and_update) { + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + const char *topic_name="SESSION_TOPIC"; + + EXPECT_EQ(stellar_mq_get_topic_id(&st, topic_name), -1);// illegal topic_name + + int topic_id=stellar_mq_create_topic(&st, topic_name, test_mock_session_msg_free, &st); + EXPECT_GE(topic_id, 0); + struct stellar_mq_topic_schema *topic_schema; + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + topic_schema = + (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id); + EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_session_msg_free); + EXPECT_EQ(topic_schema->free_cb_arg, &st); + EXPECT_EQ(topic_schema->topic_id, topic_id); + EXPECT_STREQ(topic_schema->topic_name, topic_name); + } + + EXPECT_EQ(stellar_mq_get_topic_id(&st, topic_name), topic_id); + EXPECT_EQ(stellar_mq_create_topic(&st, topic_name, test_mock_overwrite_session_msg_free, plug_mgr), -1); // duplicate create, return error + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + topic_schema = + (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id); + EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_session_msg_free); + EXPECT_EQ(topic_schema->free_cb_arg, &st); + EXPECT_EQ(topic_schema->topic_id, topic_id); + EXPECT_STREQ(topic_schema->topic_name, topic_name); + } + + EXPECT_EQ(stellar_mq_update_topic(&st, topic_id, test_mock_overwrite_session_msg_free, plug_mgr), 0); + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + topic_schema = + (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array, (unsigned int)topic_id); + EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_overwrite_session_msg_free); + EXPECT_EQ(topic_schema->free_cb_arg, plug_mgr); + EXPECT_EQ(topic_schema->topic_id, topic_id); + EXPECT_STREQ(topic_schema->topic_name, topic_name); + + EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), STELLAR_INTRINSIC_TOPIC_NUM+1); // 5 intrinsic topic + 1 created topic + } + + EXPECT_EQ(stellar_mq_destroy_topic(&st, 10), -1);// illgeal session topic_id + + EXPECT_EQ(stellar_mq_destroy_topic(&st, topic_id), 1); + EXPECT_EQ(stellar_mq_destroy_topic(&st, topic_id), 0);//duplicate destroy, return 0; + + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + EXPECT_EQ(utarray_len(plug_mgr->stellar_mq_schema_array), STELLAR_INTRINSIC_TOPIC_NUM+1);//destory won't delete the topic schema + } + EXPECT_EQ(plug_mgr->stellar_mq_topic_num, STELLAR_INTRINSIC_TOPIC_NUM);//intrinsic topic number + + plugin_manager_exit(plug_mgr); +} + +void test_mock_on_session_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env){} +void test_mock_overwrite_on_session_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env){} + + +TEST(plugin_manager_init, session_mq_subscribe_overwrite) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + const char *topic_name="SESSION_TOPIC"; + + int topic_id=stellar_mq_create_topic(&st, topic_name, test_mock_session_msg_free, &st); + EXPECT_GE(topic_id, 0); + + EXPECT_EQ(stellar_session_mq_subscribe(&st, topic_id, test_mock_on_session_msg, 10),-1);//illgeal plugin_id + EXPECT_EQ(stellar_session_mq_subscribe(&st, 10, test_mock_on_session_msg, 10),-1);//illgeal topic_id & plugin_id + + int plugin_id=stellar_session_plugin_register(&st, NULL, NULL, &st); + EXPECT_GE(plugin_id, 0); + + EXPECT_EQ(stellar_session_mq_subscribe(&st, topic_id, test_mock_on_session_msg, plugin_id),0); + EXPECT_EQ(stellar_session_mq_subscribe(&st, topic_id, test_mock_overwrite_on_session_msg, plugin_id),0);//duplicate subscribe, return 0, won't overwrite + + struct stellar_mq_topic_schema *topic_schema; + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + topic_schema = (struct stellar_mq_topic_schema *)utarray_eltptr(plug_mgr->stellar_mq_schema_array,(unsigned int)topic_id); + EXPECT_EQ(topic_schema->free_cb, (void *)test_mock_session_msg_free); + EXPECT_EQ(topic_schema->free_cb_arg, &st); + EXPECT_EQ(topic_schema->topic_id, topic_id); + EXPECT_STREQ(topic_schema->topic_name, topic_name); + } + + EXPECT_EQ(topic_schema->subscriber_cnt, 1); + EXPECT_EQ(topic_schema->subscribers->sess_msg_cb, (void *)test_mock_overwrite_on_session_msg); + + plugin_manager_exit(plug_mgr); +} +/********************************************** + * TEST PLUGIN MANAGER ON SESSION PLUGIN RUNTIME * + **********************************************/ + +struct session_plugin_env +{ + struct plugin_manager_schema *plug_mgr; + int N_session; + int N_per_session_pkt_cnt; + int intrinsc_tcp_topic_id; + int intrinsc_egress_topic_id; + int basic_exdata_idx; + int basic_exdata_free_called; + int basic_on_session_ingress_called; + int basic_on_session_egress_called; + int basic_ctx_new_called; + int basic_ctx_free_called; + int test_mq_pub_plugin_id; + int test_mq_sub_plugin_id; + int test_mq_pub_called; + int test_mq_sub_called; + int test_mq_free_called; + int test_mq_topic_id; + int plugin_id_1; + int plugin_id_2; + int plugin_id_1_called; + int plugin_id_2_called; +}; + +TEST(plugin_manager, no_plugin_register_runtime) { + + struct stellar st={0}; + +// init stage + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + +// init plugin + +// prepare packet and session + + struct session_plugin_env env; + memset(&env, 0, sizeof(struct session_plugin_env)); + env.plug_mgr=plug_mgr; + env.N_per_session_pkt_cnt=10; + env.N_session=10; + + struct packet pkt={&st, TCP, 6}; + + struct session sess[env.N_session]; + memset(&sess, 0, sizeof(sess)); + +// pesudo running stage + for(int i=0; i < env.N_session; i++) + { + sess[i].plug_mgr_rt=plugin_manager_session_runtime_new(plug_mgr, &sess[i]); + sess[i].type=SESSION_TYPE_TCP; + } + + for (int j = 0; j < env.N_per_session_pkt_cnt; j++) + { + plugin_manager_on_packet_ingress(plug_mgr, &pkt); + + for (int i = 0; i < env.N_session; i++) + { + sess[i].sess_pkt_cnt+=1; + plugin_manager_on_session_ingress(&sess[i], &pkt); + plugin_manager_on_session_egress(&sess[i], &pkt); + } + + plugin_manager_on_packet_egress(plug_mgr, &pkt); + + } + + for(int i=0; i < env.N_session; i++) + { + plugin_manager_on_session_closing(&sess[i]); + plugin_manager_session_runtime_free(sess[i].plug_mgr_rt); + } + +//exit stage + plugin_manager_exit(plug_mgr); +} + + +struct test_basic_ctx +{ + int called; +}; + +static void *test_basic_session_ctx_new(struct session *sess, void *plugin_env) +{ + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + env->basic_ctx_new_called+=1; + struct test_basic_ctx *ctx=CALLOC(struct test_basic_ctx, 1); + return ctx; +} + +static void test_basic_session_ctx_free(struct session *sess, void *session_ctx, void *plugin_env) +{ + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + env->basic_ctx_free_called+=1; + struct test_basic_ctx *ctx=(struct test_basic_ctx *)session_ctx; + EXPECT_EQ(ctx->called, env->N_per_session_pkt_cnt*2);//ingress + egress + closing + FREE(ctx); + return; +} + +static void test_basic_on_session_ingress(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env) +{ + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + struct test_basic_ctx *ctx=(struct test_basic_ctx *)per_session_ctx; + EXPECT_TRUE(env!=NULL); + EXPECT_TRUE(ctx!=NULL); + EXPECT_EQ(sess->plug_mgr_rt->plug_mgr, env->plug_mgr); + EXPECT_EQ(session_exdata_set(sess, 2, sess), -1);// illegal set + EXPECT_EQ(session_exdata_get(sess, 2), nullptr);// illegal get + EXPECT_EQ(session_exdata_set(sess, env->basic_exdata_idx, sess), 0); + if(msg) + { + env->basic_on_session_ingress_called+=1; + ctx->called+=1; + } + return; +} + +static void test_basic_on_session_egress(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env) +{ + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + struct test_basic_ctx *ctx=(struct test_basic_ctx *)per_session_ctx; + EXPECT_TRUE(env!=NULL); + EXPECT_TRUE(ctx!=NULL); + EXPECT_EQ(sess->plug_mgr_rt->plug_mgr, env->plug_mgr); + EXPECT_EQ(session_exdata_set(sess, 2, sess), -1);// illegal set + EXPECT_EQ(session_exdata_get(sess, 2), nullptr);// illegal get + EXPECT_TRUE(msg!=NULL); + EXPECT_EQ(session_exdata_get(sess, env->basic_exdata_idx), sess); + env->basic_on_session_egress_called+=1; + ctx->called+=1; + return; +} + +static void test_basic_session_exdata_free(int idx, void *ex_ptr, void *arg) +{ + struct session_plugin_env *env = (struct session_plugin_env *)arg; + EXPECT_EQ(env->basic_exdata_idx, idx); + EXPECT_EQ(session_exdata_get((struct session *)ex_ptr, idx), nullptr);// illegal get in exdata_free callback + EXPECT_EQ(session_exdata_set((struct session *)ex_ptr, idx, arg), -1);// illegal set in exdata_free callback + env->basic_exdata_free_called+=1; +} + +TEST(plugin_manager, session_plugin_on_intrinsic_ingress_egress) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + unsigned char ip_proto=6; + struct session_plugin_env env; + memset(&env, 0, sizeof(struct session_plugin_env)); + env.plug_mgr=plug_mgr; + env.N_per_session_pkt_cnt=10; + env.N_session=1; + + int plugin_id=stellar_session_plugin_register(&st, test_basic_session_ctx_new, test_basic_session_ctx_free, &env); + EXPECT_GE(plugin_id, 0); + + env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP); + EXPECT_GE(env.intrinsc_tcp_topic_id, 0); + EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_basic_on_session_ingress, plugin_id), 0); + + env.intrinsc_egress_topic_id=stellar_mq_get_topic_id(&st, TOPIC_EGRESS); + EXPECT_GE(env.intrinsc_egress_topic_id, 0); + EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_egress_topic_id, test_basic_on_session_ingress, plugin_id), 0);// Intentional error + EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_egress_topic_id, test_basic_on_session_egress, plugin_id), 0); + + env.basic_exdata_idx=stellar_exdata_new_index(&st, "SESSION_EXDATA", test_basic_session_exdata_free,&env); + EXPECT_GE(env.basic_exdata_idx, 0); + + struct packet pkt={&st, TCP, ip_proto}; + + + struct session sess[env.N_session]; + + for(int i=0; i < env.N_session; i++) + { + sess[i].plug_mgr_rt=plugin_manager_session_runtime_new(plug_mgr, &sess[i]); + sess[i].type=SESSION_TYPE_TCP; + } + + for (int j = 0; j < env.N_per_session_pkt_cnt; j++) + { + plugin_manager_on_packet_ingress(plug_mgr, &pkt); + + for (int i = 0; i < env.N_session; i++) + { + plugin_manager_on_session_ingress(&sess[i], &pkt); + plugin_manager_on_session_egress(&sess[i], &pkt); + } + + plugin_manager_on_packet_egress(plug_mgr, &pkt); + } + + for(int i=0; i < env.N_session; i++) + { + plugin_manager_on_session_closing(&sess[i]); + plugin_manager_session_runtime_free(sess[i].plug_mgr_rt); + } + + plugin_manager_exit(plug_mgr); + + EXPECT_TRUE(env.basic_on_session_ingress_called == env.basic_on_session_egress_called && env.basic_on_session_ingress_called == env.N_session*env.N_per_session_pkt_cnt); + EXPECT_TRUE(env.basic_ctx_new_called == env.basic_ctx_free_called && env.basic_ctx_new_called == env.N_session); + EXPECT_EQ(env.basic_exdata_free_called, env.N_session); + +} + +struct test_session_mq_ctx +{ + int called; +}; + +static void *test_mq_pub_session_ctx_new(struct session *sess, void *plugin_env) +{ + struct test_basic_ctx *ctx=CALLOC(struct test_basic_ctx, 1); + return ctx; +} + +static void test_mq_pub_session_ctx_free(struct session *sess, void *session_ctx, void *plugin_env) +{ + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + struct test_basic_ctx *ctx=(struct test_basic_ctx *)session_ctx; + EXPECT_EQ(ctx->called, env->N_per_session_pkt_cnt); + FREE(ctx); + return; +} + +static void *test_mq_sub_session_ctx_new(struct session *sess, void *plugin_env) +{ + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + struct test_basic_ctx *ctx=CALLOC(struct test_basic_ctx, 1); + EXPECT_EQ(session_mq_ignore_message(sess, env->intrinsc_tcp_topic_id, env->test_mq_sub_plugin_id), 0); + return ctx; +} + +static void test_mq_sub_session_ctx_free(struct session *sess, void *session_ctx, void *plugin_env) +{ + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + struct test_basic_ctx *ctx=(struct test_basic_ctx *)session_ctx; + EXPECT_EQ(ctx->called, env->N_per_session_pkt_cnt); + FREE(ctx); + return; +} + +static void test_mq_pub_on_session(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env) +{ + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + struct test_basic_ctx *ctx=(struct test_basic_ctx *)per_session_ctx; + EXPECT_TRUE(env!=NULL); + EXPECT_TRUE(ctx!=NULL); + EXPECT_EQ(sess->plug_mgr_rt->plug_mgr, env->plug_mgr); + if (msg) + { + env->test_mq_pub_called += 1; + ctx->called += 1; + int *pub_msg = (int *)CALLOC(int, 1); + *pub_msg = env->test_mq_pub_called; + EXPECT_EQ(session_mq_publish_message(sess, env->test_mq_topic_id, pub_msg), 0); + } + return; +} + +static void test_mq_on_sub_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env) +{ + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + struct test_basic_ctx *ctx=(struct test_basic_ctx *)per_session_ctx; + EXPECT_TRUE(env!=NULL); + EXPECT_TRUE(ctx!=NULL); + EXPECT_EQ(sess->plug_mgr_rt->plug_mgr, env->plug_mgr); + EXPECT_EQ(*(int *)msg, env->test_mq_pub_called); + env->test_mq_sub_called+=1; + ctx->called+=1; + return; +} + +static void test_session_msg_free(void *msg, void *msg_free_arg) +{ + struct session_plugin_env *env = (struct session_plugin_env *)msg_free_arg; + if(msg) + { + EXPECT_EQ(env->test_mq_pub_called, *(int *)msg); + env->test_mq_free_called+=1; + FREE(msg); + } + return; +} + +TEST(plugin_manager, session_plugin_ignore_on_ctx_new_sub_other_msg) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + unsigned char ip_proto=6; + struct session_plugin_env env; + memset(&env, 0, sizeof(struct session_plugin_env)); + env.plug_mgr=plug_mgr; + env.N_per_session_pkt_cnt=10; + env.N_session=10; + + env.test_mq_pub_plugin_id=stellar_session_plugin_register(&st, test_mq_pub_session_ctx_new, test_mq_pub_session_ctx_free, &env); + EXPECT_GE(env.test_mq_pub_plugin_id, 0); + + env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP); + EXPECT_GE(env.intrinsc_tcp_topic_id, 0); + EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_mq_pub_on_session, env.test_mq_pub_plugin_id), 0); + + env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_MQ_TOPIC", test_session_msg_free, &env); + EXPECT_GE(env.test_mq_topic_id, 0); + + env.test_mq_sub_plugin_id=stellar_session_plugin_register(&st, test_mq_sub_session_ctx_new, test_mq_sub_session_ctx_free, &env); + EXPECT_GE(env.test_mq_sub_plugin_id, 0); + EXPECT_EQ(stellar_session_mq_subscribe(&st, env.test_mq_topic_id, test_mq_on_sub_msg, env.test_mq_sub_plugin_id), 0); + + struct packet pkt={&st, TCP, ip_proto}; + + + struct session sess[env.N_session]; + + for(int i=0; i < env.N_session; i++) + { + sess[i].plug_mgr_rt=plugin_manager_session_runtime_new(plug_mgr, &sess[i]); + sess[i].type=SESSION_TYPE_TCP; + } + + for (int j = 0; j < env.N_per_session_pkt_cnt; j++) + { + plugin_manager_on_packet_ingress(plug_mgr, &pkt); + + for (int i = 0; i < env.N_session; i++) + { + plugin_manager_on_session_ingress(&sess[i], &pkt); + plugin_manager_on_session_egress(&sess[i], &pkt); + } + + plugin_manager_on_packet_egress(plug_mgr, &pkt); + + } + + for(int i=0; i < env.N_session; i++) + { + plugin_manager_on_session_closing(&sess[i]); + plugin_manager_session_runtime_free(sess[i].plug_mgr_rt); + } + + plugin_manager_exit(plug_mgr); + + EXPECT_EQ(env.test_mq_free_called, env.test_mq_pub_called); + EXPECT_EQ(env.test_mq_sub_called, env.N_session*env.N_per_session_pkt_cnt); +} + +struct test_overlimit_session_mq_ctx +{ + int pkt_cnt; + int pub_cnt; + int sub_cnt; +}; + +static void *test_overlimit_pub_session_ctx_new(struct session *sess, void *plugin_env) +{ + struct test_overlimit_session_mq_ctx *ctx=CALLOC(struct test_overlimit_session_mq_ctx, 1); + return ctx; +} + +static void test_overlimit_pub_session_ctx_free(struct session *sess, void *session_ctx, void *plugin_env) +{ + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + struct test_overlimit_session_mq_ctx *ctx=(struct test_overlimit_session_mq_ctx *)session_ctx; + EXPECT_EQ(ctx->pkt_cnt, env->N_per_session_pkt_cnt); + FREE(ctx); + return; +} + +static void *test_overlimit_sub_session_ctx_new(struct session *sess, void *plugin_env) +{ + struct test_overlimit_session_mq_ctx *ctx=CALLOC(struct test_overlimit_session_mq_ctx, 1); + return ctx; +} + +static void test_overlimit_sub_session_ctx_free(struct session *sess, void *session_ctx, void *plugin_env) +{ + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + struct test_overlimit_session_mq_ctx *ctx=(struct test_overlimit_session_mq_ctx *)session_ctx; + EXPECT_EQ(ctx->sub_cnt, (env->N_per_session_pkt_cnt*(MAX_MSG_PER_DISPATCH-1))); //minus intrinsic msg + FREE(ctx); + return; +} + +struct test_overlimit_msg +{ + struct session *sess; + int called; +}; + +static void test_overlimit_pub_on_session(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env) +{ + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + struct test_overlimit_session_mq_ctx *ctx=(struct test_overlimit_session_mq_ctx *)per_session_ctx; + EXPECT_TRUE(env!=NULL); + EXPECT_TRUE(ctx!=NULL); + EXPECT_EQ(sess->plug_mgr_rt->plug_mgr, env->plug_mgr); + struct test_overlimit_msg *pub_msg; + if (msg) + { + env->test_mq_pub_called += 1; + ctx->pkt_cnt += 1; + for(int i=0; i < MAX_MSG_PER_DISPATCH*2; i++) + { + pub_msg = CALLOC(struct test_overlimit_msg, 1); + pub_msg->called = env->test_mq_pub_called; + pub_msg->sess=sess; + if(i<(MAX_MSG_PER_DISPATCH-1))// minus intrinsic msg + { + EXPECT_EQ(session_mq_publish_message(sess, env->test_mq_topic_id, pub_msg), 0); + ctx->pub_cnt+=1; + } + else + { + EXPECT_EQ(session_mq_publish_message(sess, env->test_mq_topic_id, pub_msg), -1); + FREE(pub_msg); + } + } + } + return; +} + +static void test_overlimit_on_sub_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env) +{ + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + struct test_overlimit_msg *recv_msg=(struct test_overlimit_msg *)msg; + struct test_overlimit_session_mq_ctx *ctx=(struct test_overlimit_session_mq_ctx *)per_session_ctx; + EXPECT_TRUE(env!=NULL); + EXPECT_TRUE(ctx!=NULL); + EXPECT_EQ(sess->plug_mgr_rt->plug_mgr, env->plug_mgr); + EXPECT_EQ(recv_msg->called, env->test_mq_pub_called); + env->test_mq_sub_called+=1; + ctx->sub_cnt+=1; + return; +} + +static void test_overlimit_session_msg_free(void *msg, void *msg_free_arg) +{ + struct session_plugin_env *env = (struct session_plugin_env *)msg_free_arg; + struct test_overlimit_msg *recv_msg=(struct test_overlimit_msg *)msg; + if(recv_msg) + { + EXPECT_EQ(recv_msg->sess->plug_mgr_rt->plug_mgr, env->plug_mgr); + EXPECT_EQ(session_mq_publish_message(recv_msg->sess, env->test_mq_topic_id, msg), -1);// illegal publish when msg_free + EXPECT_EQ(env->test_mq_pub_called, recv_msg->called); + env->test_mq_free_called+=1; + FREE(msg); + } + return; +} + +TEST(plugin_manager, session_plugin_pub_msg_overlimt) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + unsigned char ip_proto=6; + struct session_plugin_env env; + memset(&env, 0, sizeof(struct session_plugin_env)); + env.plug_mgr=plug_mgr; + env.N_per_session_pkt_cnt=10; + env.N_session=10; + + env.test_mq_pub_plugin_id=stellar_session_plugin_register(&st, test_overlimit_pub_session_ctx_new, test_overlimit_pub_session_ctx_free, &env); + EXPECT_GE(env.test_mq_pub_plugin_id, 0); + + env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP); + EXPECT_GE(env.intrinsc_tcp_topic_id, 0); + EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_overlimit_pub_on_session, env.test_mq_pub_plugin_id), 0); + + env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_MQ_TOPIC", test_overlimit_session_msg_free, &env); + EXPECT_GE(env.test_mq_topic_id, 0); + + env.test_mq_sub_plugin_id=stellar_session_plugin_register(&st, test_overlimit_sub_session_ctx_new, test_overlimit_sub_session_ctx_free, &env); + EXPECT_GE(env.test_mq_sub_plugin_id, 0); + EXPECT_EQ(stellar_session_mq_subscribe(&st, env.test_mq_topic_id, test_overlimit_on_sub_msg, env.test_mq_sub_plugin_id), 0); + + struct packet pkt={&st, TCP, ip_proto}; + + + struct session sess[env.N_session]; + + for(int i=0; i < env.N_session; i++) + { + sess[i].plug_mgr_rt=plugin_manager_session_runtime_new(plug_mgr, &sess[i]); + sess[i].type=SESSION_TYPE_TCP; + } + + for (int j = 0; j < env.N_per_session_pkt_cnt; j++) + { + plugin_manager_on_packet_ingress(plug_mgr, &pkt); + + for (int i = 0; i < env.N_session; i++) + { + plugin_manager_on_session_ingress(&sess[i], &pkt); + plugin_manager_on_session_egress(&sess[i], &pkt); + } + + plugin_manager_on_packet_egress(plug_mgr, &pkt); + } + + for(int i=0; i < env.N_session; i++) + { + plugin_manager_on_session_closing(&sess[i]); + plugin_manager_session_runtime_free(sess[i].plug_mgr_rt); + } + + plugin_manager_exit(plug_mgr); + + EXPECT_EQ(env.test_mq_pub_called,env.N_per_session_pkt_cnt*env.N_session); + EXPECT_EQ(env.test_mq_free_called, env.N_session*env.N_per_session_pkt_cnt*(MAX_MSG_PER_DISPATCH-1)); + EXPECT_EQ(env.test_mq_sub_called, env.N_session*env.N_per_session_pkt_cnt*(MAX_MSG_PER_DISPATCH-1)); + +} + + +static void test_dettach_msg_free(void *msg, void *msg_free_arg) +{ + struct session_plugin_env *env = (struct session_plugin_env *)msg_free_arg; + env->test_mq_free_called+=1; + return; +} + +static void *test_dettach_session_ctx_new(struct session *sess, void *plugin_env) +{ + struct test_basic_ctx *ctx=CALLOC(struct test_basic_ctx, 1); + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + + EXPECT_EQ(session_mq_publish_message(sess, env->test_mq_topic_id, plugin_env), 0);// publish success, but won't be delivered to currnet plugin + + stellar_session_plugin_dettach_current_session(sess); + ctx->called+=1; + + EXPECT_EQ(session_mq_publish_message(sess, env->test_mq_topic_id, plugin_env), 0);// publish success, but won't be delivered to currnet plugin + + return ctx; +} + +static void test_dettach_session_ctx_free(struct session *sess, void *session_ctx, void *plugin_env) +{ + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + env->basic_ctx_free_called+=1; + struct test_basic_ctx *ctx=(struct test_basic_ctx *)session_ctx; + EXPECT_EQ(sess->sess_pkt_cnt, 1);// first packet ingress, call ctx_free immediately + EXPECT_EQ(ctx->called, 1); + + EXPECT_EQ(session_mq_publish_message(sess, env->test_mq_topic_id, plugin_env), 0);// publish success, but won't be delivered to currnet plugin + + FREE(ctx); +} + +static void test_dettach_on_session(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env) +{ + struct test_basic_ctx *ctx=(struct test_basic_ctx *)per_session_ctx; + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + + EXPECT_EQ(topic_id, env->intrinsc_tcp_topic_id); + + ctx->called+=1; +} + +TEST(plugin_manager, session_plugin_on_ctx_new_then_dettach) { + + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + unsigned char ip_proto=6; + struct session_plugin_env env; + memset(&env, 0, sizeof(struct session_plugin_env)); + env.plug_mgr=plug_mgr; + env.N_per_session_pkt_cnt=10; + env.N_session=10; + + + int plugin_id=stellar_session_plugin_register(&st, test_dettach_session_ctx_new, test_dettach_session_ctx_free, &env); + EXPECT_GE(plugin_id,0); + + env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP); + EXPECT_GE(env.intrinsc_tcp_topic_id, 0); + EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_dettach_on_session, plugin_id), 0); + + env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_MQ_TOPIC", test_dettach_msg_free, &env); + EXPECT_GE(env.test_mq_topic_id, 0); + + EXPECT_EQ(stellar_session_mq_subscribe(&st, env.test_mq_topic_id, test_dettach_on_session, plugin_id), 0); + + struct packet pkt={&st, TCP, ip_proto}; + + + struct session sess[env.N_session]; + memset(&sess, 0, sizeof(sess)); + + for(int i=0; i < env.N_session; i++) + { + sess[i].plug_mgr_rt=plugin_manager_session_runtime_new(plug_mgr, &sess[i]); + sess[i].type=SESSION_TYPE_TCP; + } + + for (int j = 0; j < env.N_per_session_pkt_cnt; j++) + { + plugin_manager_on_packet_ingress(plug_mgr, &pkt); + + for (int i = 0; i < env.N_session; i++) + { + sess[i].sess_pkt_cnt+=1; + plugin_manager_on_session_ingress(&sess[i], &pkt); + plugin_manager_on_session_egress(&sess[i], &pkt); + } + + plugin_manager_on_packet_egress(plug_mgr, &pkt); + } + + for(int i=0; i < env.N_session; i++) + { + plugin_manager_on_session_closing(&sess[i]); + plugin_manager_session_runtime_free(sess[i].plug_mgr_rt); + } + + plugin_manager_exit(plug_mgr); + + EXPECT_EQ(env.basic_ctx_free_called,env.N_session); + EXPECT_EQ(env.test_mq_free_called,env.N_session*3); +} + + + +static void *test_invalid_pub_msg_session_ctx_new(struct session *sess, void *plugin_env) +{ + struct test_basic_ctx *ctx=CALLOC(struct test_basic_ctx, 1); + return ctx; +} + +static void test_invalid_pub_msg_session_ctx_free(struct session *sess, void *session_ctx, void *plugin_env) +{ + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + env->basic_ctx_free_called+=1; + struct test_basic_ctx *ctx=(struct test_basic_ctx *)session_ctx; + EXPECT_EQ(ctx->called,(env->N_per_session_pkt_cnt+1)); + + EXPECT_EQ(session_mq_publish_message(sess, env->test_mq_topic_id, ctx), -1);// illegal publish when session closing + + FREE(ctx); +} + +static void test_invalid_pub_msg_on_session(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env) +{ + struct test_basic_ctx *ctx=(struct test_basic_ctx *)per_session_ctx; + ctx->called+=1; +} + +TEST(plugin_manager, session_plugin_pub_on_ctx_free) { + + struct stellar st={0}; + struct session_plugin_env env; + +// pesudo init stage + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + +// plugin manager register plugin + + int plugin_id=stellar_session_plugin_register(&st, test_invalid_pub_msg_session_ctx_new, test_invalid_pub_msg_session_ctx_free, &env); + EXPECT_GE(plugin_id,0); + + env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP); + EXPECT_GE(env.intrinsc_tcp_topic_id, 0); + EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_invalid_pub_msg_on_session, plugin_id), 0); + + env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_MQ_TOPIC", NULL, &env); + EXPECT_GE(env.test_mq_topic_id, 0); + +// pesudo packet and session + + memset(&env, 0, sizeof(struct session_plugin_env)); + env.plug_mgr=plug_mgr; + env.N_per_session_pkt_cnt=10; + env.N_session=10; + + struct packet pkt={&st, TCP, 6}; + + struct session sess[env.N_session]; + memset(&sess, 0, sizeof(sess)); + +// pesudo running stage + for(int i=0; i < env.N_session; i++) + { + sess[i].plug_mgr_rt=plugin_manager_session_runtime_new(plug_mgr, &sess[i]); + sess[i].type=SESSION_TYPE_TCP; + } + + for (int j = 0; j < env.N_per_session_pkt_cnt; j++) + { + plugin_manager_on_packet_ingress(plug_mgr, &pkt); + + for (int i = 0; i < env.N_session; i++) + { + sess[i].sess_pkt_cnt+=1; + plugin_manager_on_session_ingress(&sess[i], &pkt); + plugin_manager_on_session_egress(&sess[i], &pkt); + } + + plugin_manager_on_packet_egress(plug_mgr, &pkt); + } + + for(int i=0; i < env.N_session; i++) + { + plugin_manager_on_session_closing(&sess[i]); + plugin_manager_session_runtime_free(sess[i].plug_mgr_rt); + } + +// pesudo exit stage + plugin_manager_exit(plug_mgr); + + EXPECT_EQ(env.basic_ctx_free_called,env.N_session); +} + + +struct test_session_closing_ctx +{ + int pkt_called; + int session_free_called; + int userdefine_on_msg_called; +}; + + +static void *test_session_closing_ctx_new(struct session *sess, void *plugin_env) +{ + struct test_session_closing_ctx *ctx=CALLOC(struct test_session_closing_ctx, 1); + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + env->basic_ctx_new_called+=1; + return ctx; +} + +static void test_session_closing_ctx_free(struct session *sess, void *session_ctx, void *plugin_env) +{ + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + env->basic_ctx_free_called+=1; + struct test_session_closing_ctx *ctx=(struct test_session_closing_ctx *)session_ctx; + EXPECT_EQ(ctx->pkt_called,env->N_per_session_pkt_cnt); + EXPECT_EQ(ctx->session_free_called,1); + FREE(ctx); +} + +static void test_session_closing_on_intrisic_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env) +{ + struct test_session_closing_ctx *ctx=(struct test_session_closing_ctx *)per_session_ctx; + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + if(msg)ctx->pkt_called+=1; + if(sess->state==SESSION_STATE_CLOSING) + { + EXPECT_EQ(msg,nullptr); + ctx->session_free_called+=1; + session_mq_publish_message(sess, env->test_mq_topic_id, env); + env->test_mq_pub_called+=1; + } +} + +static void test_session_closing_on_userdefine_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env) +{ + struct test_session_closing_ctx *ctx=(struct test_session_closing_ctx *)per_session_ctx; + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + ctx->userdefine_on_msg_called+=1; + EXPECT_EQ(msg, plugin_env); + EXPECT_EQ(sess->state,SESSION_STATE_CLOSING); + env->test_mq_sub_called+=1; +} + +TEST(plugin_manager, session_plugin_pub_msg_on_closing) { + + struct stellar st={0}; + struct session_plugin_env env; + memset(&env, 0, sizeof(struct session_plugin_env)); + +// pesudo init stage + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + +// plugin manager register plugin + + int plugin_id=stellar_session_plugin_register(&st, test_session_closing_ctx_new, test_session_closing_ctx_free, &env); + EXPECT_GE(plugin_id,0); + + env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP); + EXPECT_GE(env.intrinsc_tcp_topic_id, 0); + EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_closing_on_intrisic_msg, plugin_id), 0); + + env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_CLOSING_TOPIC", NULL, &env); + EXPECT_GE(env.test_mq_topic_id, 0); + EXPECT_EQ(stellar_session_mq_subscribe(&st, env.test_mq_topic_id, test_session_closing_on_userdefine_msg, plugin_id), 0); + +// pesudo packet and session + + env.plug_mgr=plug_mgr; + env.N_per_session_pkt_cnt=10; + env.N_session=10; + + struct packet pkt={&st, TCP, 6}; + + struct session sess[env.N_session]; + memset(&sess, 0, sizeof(sess)); + +// pesudo running stage + for(int i=0; i < env.N_session; i++) + { + sess[i].state=SESSION_STATE_OPENING; + sess[i].plug_mgr_rt=plugin_manager_session_runtime_new(plug_mgr, &sess[i]); + sess[i].type=SESSION_TYPE_TCP; + } + + for (int j = 0; j < env.N_per_session_pkt_cnt; j++) + { + plugin_manager_on_packet_ingress(plug_mgr, &pkt); + + for (int i = 0; i < env.N_session; i++) + { + sess[i].sess_pkt_cnt+=1; + sess[i].state=SESSION_STATE_ACTIVE; + plugin_manager_on_session_ingress(&sess[i], &pkt); + plugin_manager_on_session_egress(&sess[i], &pkt); + } + + plugin_manager_on_packet_egress(plug_mgr, &pkt); + } + + for(int i=0; i < env.N_session; i++) + { + sess[i].state=SESSION_STATE_CLOSING; + plugin_manager_on_session_closing(&sess[i]); + plugin_manager_session_runtime_free(sess[i].plug_mgr_rt); + } + +// pesudo exit stage + plugin_manager_exit(plug_mgr); + + EXPECT_EQ(env.basic_ctx_new_called,env.N_session); + EXPECT_EQ(env.basic_ctx_free_called,env.N_session); + EXPECT_EQ(env.test_mq_pub_called,env.N_session); + EXPECT_EQ(env.test_mq_sub_called,env.N_session); +} + +struct test_session_called_ctx +{ + int called; +}; + +static void *test_session_called_ctx_new(struct session *sess, void *plugin_env) +{ + struct test_session_called_ctx *ctx=CALLOC(struct test_session_called_ctx, 1); + return ctx; +} + +static void test_session_called_ctx_free(struct session *sess, void *session_ctx, void *plugin_env) +{ + FREE(session_ctx); +} + +//test session topic is active +static void test_session_mq_topic_is_active_plugin_1_on_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env) +{ + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + struct test_session_called_ctx *ctx=(struct test_session_called_ctx *)per_session_ctx; + env->plugin_id_1_called+=1; + ctx->called+=1; + EXPECT_EQ(env->plugin_id_1, sess->plug_mgr_rt->current_session_plugin_id); + EXPECT_EQ(env->intrinsc_tcp_topic_id, topic_id); + + session_mq_ignore_message(sess, topic_id, env->plugin_id_1); + EXPECT_EQ(session_mq_topic_is_active(sess, topic_id), 1); + return; +} + +static void test_session_mq_topic_is_active_plugin_2_on_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env) +{ + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + struct test_session_called_ctx *ctx=(struct test_session_called_ctx *)per_session_ctx; + env->plugin_id_2_called+=1; + ctx->called+=1; + EXPECT_EQ(env->plugin_id_2, sess->plug_mgr_rt->current_session_plugin_id); + EXPECT_EQ(env->intrinsc_tcp_topic_id, topic_id); + + EXPECT_EQ(session_mq_topic_is_active(sess, topic_id), 1); + + if(ctx->called > env->N_per_session_pkt_cnt/2) + { + session_mq_ignore_message(sess, topic_id, env->plugin_id_2); + EXPECT_EQ(session_mq_topic_is_active(sess, topic_id), 0); + } + return; +} + +TEST(plugin_manager, test_session_mq_topic_is_active) { + + struct stellar st={0}; + struct session_plugin_env env; + memset(&env, 0, sizeof(struct session_plugin_env)); + +// pesudo init stage + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + +// plugin manager register plugin + + int plugin_id_1=stellar_session_plugin_register(&st, test_session_called_ctx_new, test_session_called_ctx_free, &env); + EXPECT_GE(plugin_id_1,0); + + int plugin_id_2=stellar_session_plugin_register(&st, test_session_called_ctx_new, test_session_called_ctx_free, &env); + EXPECT_GE(plugin_id_2,0); + + env.plugin_id_1=plugin_id_1; + env.plugin_id_2=plugin_id_2; + + env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP); + EXPECT_GE(env.intrinsc_tcp_topic_id, 0); + EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_topic_is_active_plugin_1_on_msg, plugin_id_1), 0); + EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_topic_is_active_plugin_2_on_msg, plugin_id_2), 0); + +// pesudo packet and session + + env.plug_mgr=plug_mgr; + env.N_per_session_pkt_cnt=10; + env.N_session=10; + + struct packet pkt={&st, TCP, 6}; + + struct session sess[env.N_session]; + memset(&sess, 0, sizeof(sess)); + +// pesudo running stage + for(int i=0; i < env.N_session; i++) + { + sess[i].state=SESSION_STATE_OPENING; + sess[i].plug_mgr_rt=plugin_manager_session_runtime_new(plug_mgr, &sess[i]); + sess[i].type=SESSION_TYPE_TCP; + } + + for (int j = 0; j < env.N_per_session_pkt_cnt; j++) + { + plugin_manager_on_packet_ingress(plug_mgr, &pkt); + + for (int i = 0; i < env.N_session; i++) + { + sess[i].sess_pkt_cnt+=1; + sess[i].state=SESSION_STATE_ACTIVE; + plugin_manager_on_session_ingress(&sess[i], &pkt); + plugin_manager_on_session_egress(&sess[i], &pkt); + } + + plugin_manager_on_packet_egress(plug_mgr, &pkt); + } + + for(int i=0; i < env.N_session; i++) + { + sess[i].state=SESSION_STATE_CLOSING; + plugin_manager_on_session_closing(&sess[i]); + plugin_manager_session_runtime_free(sess[i].plug_mgr_rt); + } + +// pesudo exit stage + plugin_manager_exit(plug_mgr); + + EXPECT_EQ(env.plugin_id_1_called,env.N_session*1);// per session called once, then ignore + EXPECT_EQ(env.plugin_id_2_called,env.N_session*(env.N_per_session_pkt_cnt/2+1));// per session called one half, then ignore +} + +//test dettach session +static void test_session_dettach_plugin_1_on_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env) +{ + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + struct test_session_called_ctx *ctx=(struct test_session_called_ctx *)per_session_ctx; + env->plugin_id_1_called+=1; + ctx->called+=1; + EXPECT_EQ(env->plugin_id_1, sess->plug_mgr_rt->current_session_plugin_id); + EXPECT_EQ(env->intrinsc_tcp_topic_id, topic_id); + stellar_session_plugin_dettach_current_session(sess); + EXPECT_EQ(session_mq_topic_is_active(sess, topic_id), 1); + return; +} + +static void test_session_dettach_plugin_2_on_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env) +{ + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + struct test_session_called_ctx *ctx=(struct test_session_called_ctx *)per_session_ctx; + env->plugin_id_2_called+=1; + ctx->called+=1; + EXPECT_EQ(env->plugin_id_2, sess->plug_mgr_rt->current_session_plugin_id); + EXPECT_EQ(env->intrinsc_tcp_topic_id, topic_id); + + EXPECT_EQ(session_mq_topic_is_active(sess, topic_id), 1); + + if(ctx->called > env->N_per_session_pkt_cnt/2) + { + stellar_session_plugin_dettach_current_session(sess); + EXPECT_EQ(session_mq_topic_is_active(sess, topic_id), 0); + } + + return; +} + +TEST(plugin_manager, test_session_dettach) { + + struct stellar st={0}; + struct session_plugin_env env; + memset(&env, 0, sizeof(struct session_plugin_env)); + +// pesudo init stage + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + +// plugin manager register plugin + + int plugin_id_1=stellar_session_plugin_register(&st, test_session_called_ctx_new, test_session_called_ctx_free, &env); + EXPECT_GE(plugin_id_1,0); + + int plugin_id_2=stellar_session_plugin_register(&st, test_session_called_ctx_new, test_session_called_ctx_free, &env); + EXPECT_GE(plugin_id_2,0); + + env.plugin_id_1=plugin_id_1; + env.plugin_id_2=plugin_id_2; + + env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP); + EXPECT_GE(env.intrinsc_tcp_topic_id, 0); + EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_dettach_plugin_1_on_msg, plugin_id_1), 0); + EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_dettach_plugin_2_on_msg, plugin_id_2), 0); + +// pesudo packet and session + + env.plug_mgr=plug_mgr; + env.N_per_session_pkt_cnt=10; + env.N_session=10; + + struct packet pkt={&st, TCP, 6}; + + struct session sess[env.N_session]; + memset(&sess, 0, sizeof(sess)); + +// pesudo running stage + for(int i=0; i < env.N_session; i++) + { + sess[i].state=SESSION_STATE_OPENING; + sess[i].plug_mgr_rt=plugin_manager_session_runtime_new(plug_mgr, &sess[i]); + sess[i].type=SESSION_TYPE_TCP; + } + + for (int j = 0; j < env.N_per_session_pkt_cnt; j++) + { + plugin_manager_on_packet_ingress(plug_mgr, &pkt); + + for (int i = 0; i < env.N_session; i++) + { + sess[i].sess_pkt_cnt+=1; + sess[i].state=SESSION_STATE_ACTIVE; + plugin_manager_on_session_ingress(&sess[i], &pkt); + plugin_manager_on_session_egress(&sess[i], &pkt); + } + + plugin_manager_on_packet_egress(plug_mgr, &pkt); + } + + for(int i=0; i < env.N_session; i++) + { + sess[i].state=SESSION_STATE_CLOSING; + plugin_manager_on_session_closing(&sess[i]); + plugin_manager_session_runtime_free(sess[i].plug_mgr_rt); + } + +// pesudo exit stage + plugin_manager_exit(plug_mgr); + + EXPECT_EQ(env.plugin_id_1_called,env.N_session*1);// per session called once, then ignore + EXPECT_EQ(env.plugin_id_2_called,env.N_session*(env.N_per_session_pkt_cnt/2+1));// per session called one half, then ignore + +} + + +//test dettach session +static void test_session_mq_priority_plugin_1_on_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env) +{ + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + struct test_session_called_ctx *ctx=(struct test_session_called_ctx *)per_session_ctx; + env->plugin_id_1_called+=1; + ctx->called+=1; + EXPECT_EQ(env->plugin_id_1, sess->plug_mgr_rt->current_session_plugin_id); + EXPECT_EQ(session_mq_topic_is_active(sess, topic_id), 1); + if(topic_id == env->intrinsc_tcp_topic_id) + { + EXPECT_EQ(ctx->called%3, 1);// intrinsc msg has high priority + EXPECT_EQ(session_mq_publish_message_with_priority(sess, env->test_mq_topic_id, (void *)(long)env->plugin_id_1, STELLAR_MQ_PRIORITY_LOW), 0); + } + if(topic_id == env->test_mq_topic_id) + { + if(ctx->called%3 == 2) + { + EXPECT_EQ((int)(long)msg, env->plugin_id_2); + } + if(ctx->called%3 == 0) + { + EXPECT_EQ((int )(long)msg, env->plugin_id_1); + } + } + return; +} + +static void test_session_mq_priority_plugin_2_on_msg(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env) +{ + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + struct test_session_called_ctx *ctx=(struct test_session_called_ctx *)per_session_ctx; + env->plugin_id_2_called+=1; + ctx->called+=1; + EXPECT_EQ(env->plugin_id_2, sess->plug_mgr_rt->current_session_plugin_id); + EXPECT_EQ(session_mq_topic_is_active(sess, topic_id), 1); + + if(topic_id == env->intrinsc_tcp_topic_id) + { + EXPECT_EQ(ctx->called%3, 1); + // publish msg has normal priority + EXPECT_EQ(session_mq_publish_message(sess, env->test_mq_topic_id, (void *)(long)env->plugin_id_2), 0); + } + if(topic_id == env->test_mq_topic_id) + { + if(ctx->called%3 == 2) + { + EXPECT_EQ((int)(long)msg, env->plugin_id_2); + } + if(ctx->called%3 == 0) + { + EXPECT_EQ((int)(long)msg, env->plugin_id_1); + } + } + return; +} + +TEST(plugin_manager, test_session_mq_priority) { + + struct stellar st={0}; + struct session_plugin_env env; + memset(&env, 0, sizeof(struct session_plugin_env)); + +// pesudo init stage + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + +// plugin manager register plugin + + int plugin_id_1=stellar_session_plugin_register(&st, test_session_called_ctx_new, test_session_called_ctx_free, &env); + EXPECT_GE(plugin_id_1,0); + + int plugin_id_2=stellar_session_plugin_register(&st, test_session_called_ctx_new, test_session_called_ctx_free, &env); + EXPECT_GE(plugin_id_2,0); + + env.plugin_id_1=plugin_id_1; + env.plugin_id_2=plugin_id_2; + + env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP); + EXPECT_GE(env.intrinsc_tcp_topic_id, 0); + EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_priority_plugin_1_on_msg, plugin_id_1), 0); + EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_mq_priority_plugin_2_on_msg, plugin_id_2), 0); + + env.test_mq_topic_id=stellar_mq_create_topic(&st, "SESSION_PRIORITY_TOPIC", NULL, &env); + EXPECT_GE(env.test_mq_topic_id, 0); + EXPECT_EQ(stellar_session_mq_subscribe(&st, env.test_mq_topic_id, test_session_mq_priority_plugin_1_on_msg, plugin_id_1), 0); + EXPECT_EQ(stellar_session_mq_subscribe(&st, env.test_mq_topic_id, test_session_mq_priority_plugin_2_on_msg, plugin_id_2), 0); + +// pesudo packet and session + + env.plug_mgr=plug_mgr; + env.N_per_session_pkt_cnt=10; + env.N_session=10; + + struct packet pkt={&st, TCP, 6}; + + struct session sess[env.N_session]; + memset(&sess, 0, sizeof(sess)); + +// pesudo running stage + for(int i=0; i < env.N_session; i++) + { + sess[i].state=SESSION_STATE_OPENING; + sess[i].plug_mgr_rt=plugin_manager_session_runtime_new(plug_mgr, &sess[i]); + sess[i].type=SESSION_TYPE_TCP; + } + + for (int j = 0; j < env.N_per_session_pkt_cnt; j++) + { + plugin_manager_on_packet_ingress(plug_mgr, &pkt); + + for (int i = 0; i < env.N_session; i++) + { + sess[i].sess_pkt_cnt+=1; + sess[i].state=SESSION_STATE_ACTIVE; + plugin_manager_on_session_ingress(&sess[i], &pkt); + plugin_manager_on_session_egress(&sess[i], &pkt); + } + + plugin_manager_on_packet_egress(plug_mgr, &pkt); + } + + for(int i=0; i < env.N_session; i++) + { + sess[i].state=SESSION_STATE_CLOSING; + plugin_manager_on_session_closing(&sess[i]); + plugin_manager_session_runtime_free(sess[i].plug_mgr_rt); + } + +// pesudo exit stage + plugin_manager_exit(plug_mgr); + + // each session publish TCP TOPIC per_session_pkt_cnt+1, and SESSION_PRIORITY_TOPIC 2*(msg per_session_pkt_cnt+1) + EXPECT_EQ(env.plugin_id_1_called,env.N_session*((env.N_per_session_pkt_cnt+1)*3)); + EXPECT_EQ(env.plugin_id_2_called,env.N_session*((env.N_per_session_pkt_cnt+1)*3)); + +} + +void test_session_exdata_free_pub_msg_exdata_free(int idx, void *ex_ptr, void *arg) +{ + struct session_plugin_env *env = (struct session_plugin_env *)arg; + EXPECT_EQ(session_mq_publish_message((struct session *)ex_ptr, env->intrinsc_tcp_topic_id, arg), -1); + env->basic_exdata_free_called+=1; +} + +static void test_session_exdata_free_pub_msg_on_session(struct session *sess, int topic_id, const void *msg, void *per_session_ctx, void *plugin_env) +{ + struct session_plugin_env *env = (struct session_plugin_env *)plugin_env; + EXPECT_EQ(session_exdata_set(sess, env->basic_exdata_idx, sess), 0); + if(msg)env->plugin_id_1_called+=1; +} + +TEST(plugin_manager, session_exdata_free_pub_msg) { + + struct stellar st={0}; + struct session_plugin_env env; + +// pesudo init stage + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + +// plugin manager register plugin + + env.plugin_id_1=stellar_session_plugin_register(&st, NULL, NULL, &env); + EXPECT_GE(env.plugin_id_1,0); + + env.intrinsc_tcp_topic_id=stellar_mq_get_topic_id(&st, TOPIC_TCP); + EXPECT_GE(env.intrinsc_tcp_topic_id, 0); + EXPECT_EQ(stellar_session_mq_subscribe(&st, env.intrinsc_tcp_topic_id, test_session_exdata_free_pub_msg_on_session, env.plugin_id_1), 0); + + env.basic_exdata_idx=stellar_exdata_new_index(&st, "BASIC_EXDATA", test_session_exdata_free_pub_msg_exdata_free, &env) ; + EXPECT_GE(env.basic_exdata_idx, 0); + +// pesudo packet and session + + memset(&env, 0, sizeof(struct session_plugin_env)); + env.plug_mgr=plug_mgr; + env.N_per_session_pkt_cnt=10; + env.N_session=10; + + struct packet pkt={&st, TCP, 6}; + + struct session sess[env.N_session]; + memset(&sess, 0, sizeof(sess)); + +// pesudo running stage + for(int i=0; i < env.N_session; i++) + { + sess[i].plug_mgr_rt=plugin_manager_session_runtime_new(plug_mgr, &sess[i]); + sess[i].type=SESSION_TYPE_TCP; + } + + for (int j = 0; j < env.N_per_session_pkt_cnt; j++) + { + plugin_manager_on_packet_ingress(plug_mgr, &pkt); + + for (int i = 0; i < env.N_session; i++) + { + sess[i].sess_pkt_cnt+=1; + plugin_manager_on_session_ingress(&sess[i], &pkt); + plugin_manager_on_session_egress(&sess[i], &pkt); + } + + plugin_manager_on_packet_egress(plug_mgr, &pkt); + } + + for(int i=0; i < env.N_session; i++) + { + plugin_manager_on_session_closing(&sess[i]); + plugin_manager_session_runtime_free(sess[i].plug_mgr_rt); + } + +// pesudo exit stage + plugin_manager_exit(plug_mgr); + + EXPECT_EQ(env.basic_exdata_free_called,env.N_session); + EXPECT_EQ(env.plugin_id_1_called,env.N_session*env.N_per_session_pkt_cnt); +} + + +/********************************************** + * TEST PLUGIN MANAGER ON POLLING PLUGIN INIT * + **********************************************/ + +int test_plugin_on_polling_func(void *plugin_env) +{ + return 1; +} + +TEST(plugin_manager_init, polling_plugin_register) { + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); + st.plug_mgr=plug_mgr; + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + + int plugin_id = stellar_polling_plugin_register(&st, test_plugin_on_polling_func, &st); + { + SCOPED_TRACE("White-box test, check stellar internal schema"); + EXPECT_TRUE(plugin_id>=POLLING_PULGIN_ID_BASE); + struct registered_polling_plugin_schema *schema = (struct registered_polling_plugin_schema *)utarray_eltptr( + plug_mgr->registered_polling_plugin_array, (unsigned int)(plugin_id-POLLING_PULGIN_ID_BASE)); + EXPECT_EQ(schema->on_polling, (void *)test_plugin_on_polling_func); + EXPECT_EQ(schema->plugin_env, &st); + EXPECT_EQ(utarray_len(plug_mgr->registered_polling_plugin_array), 1); + } + + plugin_manager_exit(plug_mgr); +} + +/********************************************** + * TEST PLUGIN MANAGER ON POLLING PLUGIN RUNTIME * + **********************************************/ + +struct polling_plugin_env +{ + struct plugin_manager_schema *plug_mgr; + int N_polling; + int return0_polling_called; + int return1_polling_called; +}; + +int return1_plugin_on_polling(void *plugin_env) +{ + struct polling_plugin_env *env = (struct polling_plugin_env *)plugin_env; + env->return1_polling_called+=1; + return 1; +} + +int return0_plugin_on_polling(void *plugin_env) +{ + struct polling_plugin_env *env = (struct polling_plugin_env *)plugin_env; + env->return0_polling_called+=1; + return 0; +} + +TEST(plugin_manager, basic_polling_plugins) { + +// pesudo init stage + struct stellar st={0}; + struct plugin_manager_schema *plug_mgr = plugin_manager_init(&st, NULL); + st.plug_mgr=plug_mgr; + struct polling_plugin_env env; + memset(&env, 0, sizeof(struct polling_plugin_env)); + env.plug_mgr=plug_mgr; + whitebox_test_plugin_manager_intrisic_metadata(&st, plug_mgr); + +// plugin manager register plugin + int plugin_id = stellar_polling_plugin_register(&st, return0_plugin_on_polling, &env); + EXPECT_TRUE(plugin_id>=0); + plugin_id = stellar_polling_plugin_register(&st, return1_plugin_on_polling, &env); + EXPECT_TRUE(plugin_id>=0); + +// pesudo runtime stage + + env.plug_mgr=plug_mgr; + env.N_polling=10; + + for(int i=0; i<env.N_polling; i++) + { + EXPECT_EQ(plugin_manager_on_polling(plug_mgr), 1); + } + +// pesudo exit stage + plugin_manager_exit(plug_mgr); + + EXPECT_EQ(env.return0_polling_called,env.N_polling); + EXPECT_EQ(env.return1_polling_called,env.N_polling); + +} + + +/********************************************** + * GTEST MAIN * + **********************************************/ + +int main(int argc, char ** argv) +{ + int ret=0; + ::testing::InitGoogleTest(&argc, argv); + ret=RUN_ALL_TESTS(); + return ret; +}
\ No newline at end of file diff --git a/src/plugin/test/plugin_manager_gtest_mock.h b/src/plugin/test/plugin_manager_gtest_mock.h new file mode 100644 index 0000000..b25fb63 --- /dev/null +++ b/src/plugin/test/plugin_manager_gtest_mock.h @@ -0,0 +1,116 @@ +#pragma once + +#ifdef __cplusplus +extern "C" +{ +#endif + +#include "../plugin_manager_interna.h" + +#include "stellar/session.h" +#include "tuple/tuple.h" + +//mock stellar +struct stellar +{ + struct plugin_manager_schema *plug_mgr; +}; + +enum packet_type +{ + UNKNOWN, + IPv4, + IPv6, + UDP, + TCP, + TCP_STREAM, + CONTROL, +}; + +struct packet +{ + struct stellar *st; + enum packet_type type; + unsigned char ip_proto; +}; + + +struct session +{ + struct plugin_manager_runtime *plug_mgr_rt; + enum session_type type; + enum session_state state; + int sess_pkt_cnt; +}; + +struct plugin_manager_schema * stellar_get_plugin_manager(struct stellar *st) +{ + return st->plug_mgr; +} + +int stellar_set_plugin_manger(struct stellar *st, struct plugin_manager_schema *pm) +{ + st->plug_mgr=pm; + return 0; +} + +int stellar_get_worker_thread_num(struct stellar *st) +{ + return 16; +} + +uint16_t stellar_get_current_thread_index() +{ + return 0; +} + +unsigned char packet_get_ip_protocol(struct packet *pkt) +{ + return pkt->ip_proto; +} + +enum session_type session_get_type(const struct session *sess) +{ + return sess->type; + +} + +void session_set_user_data(struct session *sess, void *user_data) +{ + sess->plug_mgr_rt = (struct plugin_manager_runtime *)user_data; +} + +void *session_get_user_data(const struct session *sess) +{ + return sess->plug_mgr_rt; +} + +void *packet_get_user_data(const struct packet *pkt) +{ + return pkt->st->plug_mgr; +} + +int packet_get_innermost_tuple6(const struct packet *pkt, struct tuple6 *tuple) +{ + tuple->ip_proto = pkt->ip_proto; + return 0; +} + +uint8_t packet_is_ctrl(const struct packet *pkt) +{ + return 0; +} + +struct tcp_segment *session_get_tcp_segment(struct session *sess) +{ + return NULL; +} + +void session_free_tcp_segment(struct session *sess, struct tcp_segment *seg) +{ + return; +} + +#ifdef __cplusplus +} +#endif
\ No newline at end of file diff --git a/src/stellar/CMakeLists.txt b/src/stellar/CMakeLists.txt index 1185726..bfd8f65 100644 --- a/src/stellar/CMakeLists.txt +++ b/src/stellar/CMakeLists.txt @@ -1,16 +1,17 @@ set(SOURCE stellar_config.cpp stellar_stat.cpp stellar_core.cpp) -set(LIBRARY times plugin_manager session_manager ip_reassembly packet_io pthread fieldstat4 toml) +set(LIBRARY times session_manager plugin_manager ip_reassembly packet_io packet pthread fieldstat4 toml) add_library(stellar_core STATIC ${SOURCE}) -target_link_libraries(stellar_core ${LIBRARY}) +target_link_libraries(stellar_core PRIVATE ${LIBRARY}) add_library(stellar_devel SHARED ${SOURCE}) -target_link_libraries(stellar_devel ${LIBRARY}) +#target_link_libraries(stellar_devel ${LIBRARY}) set_target_properties(stellar_devel PROPERTIES LINK_FLAGS "-Wl,--version-script=${CMAKE_CURRENT_LIST_DIR}/version.map") +target_link_libraries(stellar_devel PRIVATE -Wl,--whole-archive ${LIBRARY} -Wl,--no-whole-archive) add_executable(stellar main.cpp) -target_link_libraries(stellar stellar_core) -target_link_libraries(stellar "-rdynamic") +target_link_libraries(stellar PRIVATE -Wl,--whole-archive stellar_core ${LIBRARY} -Wl,--no-whole-archive) +target_link_libraries(stellar PRIVATE "-rdynamic") set_target_properties(stellar PROPERTIES LINK_FLAGS "-Wl,--version-script=${CMAKE_CURRENT_LIST_DIR}/version.map") install(TARGETS stellar RUNTIME DESTINATION bin COMPONENT PROGRAM) diff --git a/src/stellar/stellar_core.cpp b/src/stellar/stellar_core.cpp index 7af49d9..f1ed79a 100644 --- a/src/stellar/stellar_core.cpp +++ b/src/stellar/stellar_core.cpp @@ -101,6 +101,7 @@ static inline void free_evicted_sessions(struct session_manager *sess_mgr, uint6 if (sess) { plugin_ctx = session_get_user_data(sess); + plugin_manager_on_session_closing(sess); plugin_manager_session_runtime_free((struct plugin_manager_runtime *)plugin_ctx); session_manager_free_session(sess_mgr, sess); } @@ -121,6 +122,7 @@ static inline void free_expired_sessions(struct session_manager *sess_mgr, uint6 if (sess) { plugin_ctx = session_get_user_data(sess); + plugin_manager_on_session_closing(sess); plugin_manager_session_runtime_free((struct plugin_manager_runtime *)plugin_ctx); session_manager_free_session(sess_mgr, sess); } @@ -164,6 +166,11 @@ static void *work_thread(void *arg) memset(packets, 0, sizeof(packets)); + for(int i=0; i<RX_BURST_MAX; i++) + { + packet_set_user_data(&packets[i], (void *)plug_mgr); + } + snprintf(thd_name, sizeof(thd_name), "stellar:%d", thr_idx); prctl(PR_SET_NAME, (unsigned long long)thd_name, NULL, NULL, NULL); @@ -200,7 +207,7 @@ static void *work_thread(void *arg) defraged_pkt = NULL; pkt = &packets[i]; - plugin_manager_on_packet(plug_mgr, pkt); + plugin_manager_on_packet_ingress(plug_mgr, pkt); if (packet_is_fragment(pkt)) { defraged_pkt = ip_reassembly_packet(ip_reass, pkt, now_ms); @@ -211,10 +218,12 @@ static void *work_thread(void *arg) else { pkt = defraged_pkt; - plugin_manager_on_packet(plug_mgr, pkt); + plugin_manager_on_packet_ingress(plug_mgr, pkt); + plugin_manager_on_packet_egress(plug_mgr, pkt); } } + pkt = &packets[i]; sess = session_manager_lookup_session_by_packet(sess_mgr, pkt); if (sess == NULL) { @@ -241,6 +250,8 @@ static void *work_thread(void *arg) fast_path: plugin_manager_on_session_egress(sess, pkt); + plugin_manager_on_packet_egress(plug_mgr, pkt); + if (sess && session_get_current_state(sess) == SESSION_STATE_DISCARD) { packet_set_action(pkt, PACKET_ACTION_DROP); @@ -567,4 +578,9 @@ void stellar_send_crafted_packet(struct stellar *st, struct packet *pkt) { packet_io_inject(packet_io, thr_idx, pkt, 1); } +} + +int stellar_get_worker_thread_num(struct stellar *st) +{ + return st->config.pkt_io_opts.nr_threads; }
\ No newline at end of file diff --git a/src/stellar/stellar_core.h b/src/stellar/stellar_core.h index 2291203..788ef1d 100644 --- a/src/stellar/stellar_core.h +++ b/src/stellar/stellar_core.h @@ -15,7 +15,6 @@ void stellar_set_plugin_manger(struct stellar *st, struct plugin_manager_schema // only send user crafted packet, can't send packet which come from network void stellar_send_crafted_packet(struct stellar *st, struct packet *pkt); -int stellar_run(int argc, char **argv); #ifdef __cplusplus } diff --git a/src/stellar/version.map b/src/stellar/version.map index 6581205..a46e8f1 100644 --- a/src/stellar/version.map +++ b/src/stellar/version.map @@ -26,10 +26,10 @@ global: session_exdata_set; session_exdata_get; - stellar_session_mq_create_topic; - stellar_session_mq_get_topic_id; - stellar_session_mq_update_topic; - stellar_session_mq_destroy_topic; + stellar_mq_create_topic; + stellar_mq_get_topic_id; + stellar_mq_update_topic; + stellar_mq_destroy_topic; stellar_session_mq_subscribe; session_mq_publish_message; session_mq_ignore_message; diff --git a/test/packet_inject/CMakeLists.txt b/test/packet_inject/CMakeLists.txt index 61fc165..3bfb3c2 100644 --- a/test/packet_inject/CMakeLists.txt +++ b/test/packet_inject/CMakeLists.txt @@ -1,14 +1,15 @@ # build libpacket_inject.so add_library(packet_inject SHARED packet_inject.cpp) -target_link_libraries(packet_inject stellar_devel toml) +target_link_libraries(packet_inject toml) target_include_directories(packet_inject PUBLIC ${CMAKE_SOURCE_DIR}/include/) set_target_properties(packet_inject PROPERTIES LINK_FLAGS "-Wl,--version-script=${CMAKE_CURRENT_LIST_DIR}/version.map") # build gtest function(packet_inject_add_case EXEC_NAME) add_executable(${EXEC_NAME} ${EXEC_NAME}.cpp) - target_link_libraries(${EXEC_NAME} "-rdynamic") - target_link_libraries(${EXEC_NAME} stellar_devel gtest) + target_include_directories(${EXEC_NAME} PUBLIC ${CMAKE_SOURCE_DIR}/include/) + target_link_libraries(${EXEC_NAME} PRIVATE "-rdynamic") + target_link_libraries(${EXEC_NAME} PRIVATE stellar_devel gtest) gtest_discover_tests(${EXEC_NAME}) endfunction() diff --git a/test/packet_inject/packet_inject.cpp b/test/packet_inject/packet_inject.cpp index 193cbd5..0ffdc61 100644 --- a/test/packet_inject/packet_inject.cpp +++ b/test/packet_inject/packet_inject.cpp @@ -5,8 +5,10 @@ #include <arpa/inet.h> #include "toml.h" +#include "stellar/stellar.h" #include "stellar/layer.h" -#include "stellar/session_mq.h" +#include "stellar/session.h" +#include "stellar/stellar_mq.h" #define LOG_ERR(fmt, ...) printf("ERROR [packet inject] " fmt, ##__VA_ARGS__) #define LOG_INFO(fmt, ...) printf("INFO [packet inject] " fmt, ##__VA_ARGS__) @@ -545,8 +547,8 @@ extern "C" ctx->st = st; ctx->sess_plug_id = stellar_session_plugin_register(st, on_sess_new, on_sess_free, ctx); - ctx->tcp_topic_id = stellar_session_mq_get_topic_id(st, TOPIC_TCP); - ctx->udp_topic_id = stellar_session_mq_get_topic_id(st, TOPIC_UDP); + ctx->tcp_topic_id = stellar_mq_get_topic_id(st, TOPIC_TCP); + ctx->udp_topic_id = stellar_mq_get_topic_id(st, TOPIC_UDP); stellar_session_mq_subscribe(st, ctx->tcp_topic_id, on_sess_msg, ctx->sess_plug_id); stellar_session_mq_subscribe(st, ctx->udp_topic_id, on_sess_msg, ctx->sess_plug_id); |
