diff options
| author | yangwei <[email protected]> | 2023-08-17 21:54:41 +0800 |
|---|---|---|
| committer | yangwei <[email protected]> | 2023-08-23 12:25:28 +0800 |
| commit | f85c57d2002f11fac34cfbaff3af0058c0e7757f (patch) | |
| tree | 297373e93fc7c9c3c0ada31116659ecc3a8d500d | |
| parent | 8fffa5090139f72ae53fcba6b346fda389eef2ee (diff) | |
✨ feat(redefine sdk/include):
27 files changed, 938 insertions, 790 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 3fb510d..cf7d68d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -68,13 +68,13 @@ add_custom_target("install-profile" COMMAND ${CMAKE_COMMAND} ARGS -DCOMPONENT=Pr include_directories(${CMAKE_SOURCE_DIR}) include_directories(${CMAKE_SOURCE_DIR}/deps) -include_directories(${CMAKE_SOURCE_DIR}/sdk/include) +include_directories(${CMAKE_SOURCE_DIR}/include) #add_subdirectory(vendor) -add_subdirectory(deps/toml) +#add_subdirectory(deps/toml) #add_subdirectory(deps/utable) add_subdirectory(src/adapter) -add_subdirectory(src/adapter/test) +add_subdirectory(examples/sapp_plugin) #enable_testing() #add_subdirectory(test) diff --git a/sdk/example/.gitkeep b/examples/.gitkeep index e69de29..e69de29 100644 --- a/sdk/example/.gitkeep +++ b/examples/.gitkeep diff --git a/examples/sapp_plugin/CMakeLists.txt b/examples/sapp_plugin/CMakeLists.txt new file mode 100644 index 0000000..fc524a0 --- /dev/null +++ b/examples/sapp_plugin/CMakeLists.txt @@ -0,0 +1,6 @@ +add_definitions(-fPIC) + +add_library(simple_loader SHARED simple_sapp_entry.c simple_stellar_plugin.c) +target_link_libraries(simple_loader adapter) +set_target_properties(simple_loader PROPERTIES PREFIX "") +include_directories(${CMAKE_SOURCE_DIR}/src)
\ No newline at end of file diff --git a/examples/sapp_plugin/simple_loader.inf b/examples/sapp_plugin/simple_loader.inf new file mode 100644 index 0000000..142b3ec --- /dev/null +++ b/examples/sapp_plugin/simple_loader.inf @@ -0,0 +1,13 @@ +[PLUGINFO] +PLUGNAME=simple_loader +SO_PATH=./plug/business/simple_loader/simple_loader.so +INIT_FUNC=LOADER_INIT +DESTROY_FUNC=LOADER_EXIT + +[TCP_ALL] +FUNC_FLAG=ALL +FUNC_NAME=loader_tcpall_stream_entry + +[UDP] +FUNC_FLAG=ALL +FUNC_NAME=loader_udp_stream_entry
\ No newline at end of file diff --git a/examples/sapp_plugin/simple_sapp_entry.c b/examples/sapp_plugin/simple_sapp_entry.c new file mode 100644 index 0000000..aee7576 --- /dev/null +++ b/examples/sapp_plugin/simple_sapp_entry.c @@ -0,0 +1,154 @@ +#include "stellar/session.h" +#include "stellar/session_mq.h" +#include "stellar/utils.h" + +#include "simple_stellar_plugin.h" + +#include "adapter/adapter.h" +#include "adapter/session_manager.h" + +#include <MESA/stream.h> +#include <stdio.h> + +struct plugin_specific g_plugin_schema[] = +{ + { + .init_cb = simple_stellar_event_plugin_init, + .exit_cb = simple_stellar_event_plugin_exit, + }, + { + .init_cb = simple_stellar_mq_plugin_init, + .exit_cb = simple_stellar_mq_plugin_exit, + }, +}; + +struct simple_stream_ctx +{ + uint32_t c2s_pkts; + uint32_t c2s_bytes; + uint32_t s2c_pkts; + uint32_t s2c_bytes; + struct session *sess; +}; + +static void session_mq_topic_free(void *data, void *cb_arg) +{ + FREE(data); + return; +} + +static int session_mq_loader_read(struct session *sess, int topic_id, const void *data, void *cb_arg) +{ + struct simple_stream_ctx *ctx =(struct simple_stream_ctx *)data; + printf("loader_read_message(topic:%d)-----------%20s", topic_id, session_get0_readable_addr(sess)); + printf("server-pkt=%u, server-count=%u, client-pkt=%u, client-count=%u, ", + ctx->c2s_pkts, ctx->c2s_bytes, + ctx->s2c_pkts, ctx->s2c_bytes); + printf("total-pkt=%u, ", ctx->c2s_pkts+ctx->s2c_pkts); + printf("total-count=%u\n", ctx->c2s_bytes+ctx->s2c_bytes); + return 0; +} + +void *g_stellar=NULL; +int g_topic_id=-1; +int LOADER_INIT() +{ + g_stellar = stellar_init(g_plugin_schema, 2); + if(g_stellar==NULL)return -1; + int t_topic_id=session_mq_get_topic_id(g_stellar, "SIMPLE_MQ_TOPIC"); + if(t_topic_id >= 0) + { + session_mq_update_topic(g_stellar, t_topic_id, session_mq_topic_free, NULL); + g_topic_id=t_topic_id; + } + else + { + g_topic_id=session_mq_create_topic(g_stellar, "SIMPLE_MQ_TOPIC", session_mq_topic_free, NULL); + } + session_mq_subscribe_topic(g_stellar , g_topic_id, session_mq_loader_read, NULL); + return 0; +} + +void LOADER_EXIT(void) +{ + + session_mq_destroy_topic(g_stellar, g_topic_id); + stellar_exit(g_stellar); + return; +} + + +static void print_stream_ctx(struct streaminfo *pstream, struct simple_stream_ctx *ctx) +{ + printf("stream-----------%20s: ", printaddr(&(pstream->addr), pstream->threadnum)); + printf("server-pkt=%u, server-count=%u, client-pkt=%u, client-count=%u, ", + ctx->c2s_pkts, ctx->c2s_bytes, + ctx->s2c_pkts, ctx->s2c_bytes); + printf("total-pkt=%u, ", ctx->c2s_pkts+ctx->s2c_pkts); + printf("total-count=%u\n", ctx->c2s_bytes+ctx->s2c_bytes); + return; +} + +static struct simple_stream_ctx *stream_ctx_dup(const struct simple_stream_ctx *origin) +{ + struct simple_stream_ctx *ctx=CALLOC(struct simple_stream_ctx,1); + memcpy(ctx, origin, sizeof(struct simple_stream_ctx)); + return ctx; +} + +static void loader_transfer_stream_entry(struct streaminfo *pstream, UCHAR state, void **pme, int thread_seq,void *a_packet) +{ + struct simple_stream_ctx *ctx=(struct simple_stream_ctx *)*pme; + struct tcpdetail *pdetail=(struct tcpdetail *)pstream->pdetail; + if(*pme==NULL) + { + *pme=CALLOC(struct simple_stream_ctx,1); + ctx=(struct simple_stream_ctx *)*pme; + } + + if(a_packet!=NULL) + { + if(DIR_C2S == pstream->curdir){ + ctx->c2s_bytes += pdetail->datalen; + ctx->c2s_pkts++; + }else{ + ctx->s2c_bytes += pdetail->datalen; + ctx->s2c_pkts++; + } + } + struct simple_stream_ctx *msg; + switch (state) + { + case OP_STATE_PENDING: + ctx->sess=adapter_session_open(g_stellar, pstream, a_packet); + break; + case OP_STATE_DATA: + msg = stream_ctx_dup((const struct simple_stream_ctx *)*pme); + if(session_mq_publish_message(ctx->sess, g_topic_id, msg) < 0) + { + FREE(msg); + } + adapter_session_active(pstream, ctx->sess, a_packet); + break; + case OP_STATE_CLOSE: + adapter_session_close(pstream, ctx->sess, a_packet); + print_stream_ctx(pstream, ctx); + FREE(*pme); + break; + default: + break; + } + return; +} + +char loader_udp_stream_entry(struct streaminfo *pstream,void **pme, int thread_seq,void *a_packet) +{ + loader_transfer_stream_entry(pstream, pstream->opstate, pme, thread_seq, a_packet); + return APP_STATE_GIVEME; +} + +char loader_tcpall_stream_entry(struct streaminfo *pstream,void **pme, int thread_seq,void *a_packet) +{ + loader_transfer_stream_entry(pstream, pstream->pktstate, pme, thread_seq, a_packet); + return APP_STATE_GIVEME; +} diff --git a/examples/sapp_plugin/simple_stellar_plugin.c b/examples/sapp_plugin/simple_stellar_plugin.c new file mode 100644 index 0000000..7084ceb --- /dev/null +++ b/examples/sapp_plugin/simple_stellar_plugin.c @@ -0,0 +1,151 @@ +#include "simple_stellar_plugin.h" + +#include "stellar/stellar.h" +#include "stellar/utils.h" +#include "stellar/session_exdata.h" +#include "stellar/session_mq.h" + +#include <stdio.h> + +struct simple_stellar_plugin_ctx +{ + int plugin_id; + int exdata_idx; + struct stellar *st; +}; + +struct mq_message_stat +{ + uint32_t c2s_pkts; + uint32_t c2s_bytes; + uint32_t s2c_pkts; + uint32_t s2c_bytes; +}; + +extern int simple_mq_plugin_entry(struct session *sess, int events, const struct packet *pkt, void *cb_arg); + +static int session_mq_plugin_sub_fn(struct session *sess, int topic_id, const void *data, void *cb_arg) +{ + struct mq_message_stat *ctx =(struct mq_message_stat *)data; + struct simple_stellar_plugin_ctx *plugin_ctx = (struct simple_stellar_plugin_ctx *)cb_arg; + printf("%s(topic:%d->plug:%d)-----------%20s: ", __FUNCTION__, topic_id, plugin_ctx->plugin_id, session_get0_readable_addr(sess)); + printf("server-pkt=%u, server-count=%u, client-pkt=%u, client-count=%u, ", + ctx->c2s_pkts, ctx->c2s_bytes, + ctx->s2c_pkts, ctx->s2c_bytes); + printf("total-pkt=%u, ", ctx->c2s_pkts+ctx->s2c_pkts); + printf("total-count=%u\n", ctx->c2s_bytes+ctx->s2c_bytes); + + struct session_event *i_ev=session_get_intrinsic_event(sess, plugin_ctx->plugin_id); + session_event_assign(i_ev, plugin_ctx->st, sess, (SESS_EV_TCP|SESS_EV_UDP|SESS_EV_OPENING|SESS_EV_PACKET|SESS_EV_CLOSING), simple_mq_plugin_entry, plugin_ctx); + printf("%s(plug:%d)session_event_assign-----------%20s\n", __FUNCTION__, plugin_ctx->plugin_id, session_get0_readable_addr(sess)); + + return 0; +} + +static void print_session_ctx(struct session *sess, struct mq_message_stat *ctx, int plugin_id) +{ + printf("%s(plug:%d)-----------%20s: ", __FUNCTION__, plugin_id, session_get0_readable_addr(sess)); + printf("server-pkt=%u, server-count=%u, client-pkt=%u, client-count=%u, ", + ctx->c2s_pkts, ctx->c2s_bytes, + ctx->s2c_pkts, ctx->s2c_bytes); + printf("total-pkt=%u, ", ctx->c2s_pkts+ctx->s2c_pkts); + printf("total-count=%u\n", ctx->c2s_bytes+ctx->s2c_bytes); + return; +} + +int simple_event_plugin_entry(struct session *sess, int events, const struct packet *pkt, void *cb_arg) +{ + if(cb_arg == NULL)return -1; + struct simple_stellar_plugin_ctx *plugin_ctx=(struct simple_stellar_plugin_ctx *)cb_arg; + struct mq_message_stat *mg_stat = (struct mq_message_stat *)session_get_ex_data(sess, plugin_ctx->exdata_idx); + if (mg_stat == NULL) + { + mg_stat = CALLOC(struct mq_message_stat, 1); + session_set_ex_data(sess, plugin_ctx->exdata_idx, mg_stat); + } + + if (pkt) + { + size_t payload_len = 0; + session_get0_current_payload(sess, &payload_len); + int dir = session_get_direction(sess); + if (dir==SESSION_DIRECTION_IN) + { + mg_stat->c2s_bytes += payload_len; + mg_stat->c2s_pkts += 1; + } + if (dir==SESSION_DIRECTION_OUT) + { + mg_stat->s2c_bytes += payload_len; + mg_stat->s2c_pkts += 1; + } + } + if (mg_stat != NULL && (events & SESS_EV_CLOSING)) + { + print_session_ctx(sess, mg_stat, plugin_ctx->plugin_id); + } + return 0; +} + +int simple_mq_plugin_entry(struct session *sess, int events, const struct packet *pkt, void *cb_arg) +{ + if(cb_arg == NULL)return -1; + struct simple_stellar_plugin_ctx *plugin_ctx=(struct simple_stellar_plugin_ctx *)cb_arg; + struct session_event *i_ev=session_get_intrinsic_event(sess, plugin_ctx->plugin_id); + session_event_assign(i_ev, plugin_ctx->st, sess, 0, simple_mq_plugin_entry, plugin_ctx); + printf("%s(plug:%d)session_event_assign-----------%20s: \n", __FUNCTION__, plugin_ctx->plugin_id, session_get0_readable_addr(sess)); + return 0; +} + +static void simple_exdata_free(struct session *sess, int idx, void *ex_ptr, void *arg) +{ + if(ex_ptr) + { + FREE(ex_ptr); + } + return; +} + +void *simple_stellar_event_plugin_init(struct stellar *st) +{ + struct simple_stellar_plugin_ctx *ctx = CALLOC(struct simple_stellar_plugin_ctx, 1); + ctx->st = st; + ctx->exdata_idx = stellar_session_get_ex_new_index(st, "SIMPLE_EVENT_PLUGIN", simple_exdata_free, NULL); + int plugin_id=stellar_plugin_register(st, (SESS_EV_TCP|SESS_EV_UDP|SESS_EV_OPENING|SESS_EV_PACKET|SESS_EV_CLOSING), simple_event_plugin_entry, ctx); + if(plugin_id >= 0) + { + ctx->plugin_id=plugin_id; + } + return ctx; +} + +void simple_stellar_event_plugin_exit(void *ctx) +{ + if(ctx)FREE(ctx); + return; +} + +void *simple_stellar_mq_plugin_init(struct stellar *st) +{ + struct simple_stellar_plugin_ctx *ctx = CALLOC(struct simple_stellar_plugin_ctx, 1); + ctx->st = st; + ctx->exdata_idx = stellar_session_get_ex_new_index(st, "SIMPLE_MQ_PLUGIN", simple_exdata_free, NULL); + int topic_id=session_mq_get_topic_id(st, "SIMPLE_MQ_TOPIC"); + if(topic_id < 0) + { + topic_id=session_mq_create_topic(st, "SIMPLE_MQ_TOPIC", NULL, NULL); + } + session_mq_subscribe_topic(st, topic_id, session_mq_plugin_sub_fn, ctx); + int plugin_id=stellar_plugin_register(st, 0, simple_mq_plugin_entry, ctx); + if(plugin_id >= 0) + { + ctx->plugin_id=plugin_id; + } + return ctx; +} + +void simple_stellar_mq_plugin_exit(void *ctx) +{ + if(ctx)FREE(ctx); + return; +}
\ No newline at end of file diff --git a/examples/sapp_plugin/simple_stellar_plugin.h b/examples/sapp_plugin/simple_stellar_plugin.h new file mode 100644 index 0000000..1e7e7ba --- /dev/null +++ b/examples/sapp_plugin/simple_stellar_plugin.h @@ -0,0 +1,11 @@ +#pragma once + +#include "stellar/stellar.h" +#include "stellar/session.h" + +void *simple_stellar_event_plugin_init(struct stellar *st); +void simple_stellar_event_plugin_exit(void *plugin_ctx); + +void *simple_stellar_mq_plugin_init(struct stellar *st); +void simple_stellar_mq_plugin_exit(void *plugin_ctx); + diff --git a/include/stellar/session.h b/include/stellar/session.h new file mode 100644 index 0000000..75772b3 --- /dev/null +++ b/include/stellar/session.h @@ -0,0 +1,66 @@ +#pragma once + +#include <stdint.h> +#include <stddef.h> + +#include "stellar.h" + +enum session_type +{ + SESSION_TYPE_TCP, + SESSION_TYPE_UDP, + __SESSION_TYPE_MAX, +}; + +enum session_state +{ + SESSION_STATE_INVALID = 0, + SESSION_STATE_OPENING = 1 , + SESSION_STATE_ACTIVE = 2, + SESSION_STATE_CLOSING = 3, + __SESSION_STATE_MAX, +}; + + +struct session; + +#define SESSION_SEEN_C2S_FLOW (1 >> 1) +#define SESSION_SEEN_S2C_FLOW (1 >> 2) +int session_is_symmetric(const struct session *sess, unsigned char *flag); + +#define SESSION_DIRECTION_IN 0 +#define SESSION_DIRECTION_OUT 1 +int session_get_direction(const struct session *sess); + +const char *session_get0_readable_addr(struct session *sess); +const char *session_get0_current_payload(struct session *sess, size_t *payload_len); + +struct packet; +const struct packet *session_get0_current_packet(struct session *sess); + +#define PACKET_DIRECTION_C2S 0 +#define PACKET_DIRECTION_S2C 1 +int packet_get_direction(const struct packet *pkt); +const char *packet_get0_data(const struct packet *pkt, size_t *data_len); + +#define SESS_EV_OPENING 1<<1 +#define SESS_EV_PACKET 1<<2 +#define SESS_EV_CLOSING 1<<3 + +#define SESS_EV_TCP 1<<4 +#define SESS_EV_UDP 1<<5 + +typedef int session_event_cb_func(struct session *sess, int events, const struct packet *pkt, void *cb_arg); + +struct session_event; +//return plugin_id +int stellar_plugin_register(struct stellar *st, int events, session_event_cb_func *cb, void *cb_arg);// register intrinsic event +struct session_event *session_get_intrinsic_event(struct session *sess, int plugin_id); + +#include <sys/time.h> + +struct session_event *session_event_new(struct stellar *st, struct session *sess, int events, session_event_cb_func *cb, void *cb_arg); +int session_event_add(struct session_event *ev, const struct timeval *timeout); +int session_event_del(struct session_event *ev); +int session_event_assign(struct session_event *ev, struct stellar *st, struct session *sess, int events, session_event_cb_func *cb, void *cb_arg); +void session_event_free(struct session_event *ev);
\ No newline at end of file diff --git a/include/stellar/session_exdata.h b/include/stellar/session_exdata.h new file mode 100644 index 0000000..b1febc2 --- /dev/null +++ b/include/stellar/session_exdata.h @@ -0,0 +1,10 @@ +#pragma once + +#include "session.h" +#include "stellar.h" + +typedef void session_ex_free(struct session *sess, int idx, void *ex_ptr, void *arg); +int stellar_session_get_ex_new_index(struct stellar *st, const char *name, session_ex_free *free_func,void *arg); +int session_set_ex_data(struct session *sess, int idx, void *ex_ptr); +void *session_get_ex_data(struct session *sess, int idx); + diff --git a/include/stellar/session_mq.h b/include/stellar/session_mq.h new file mode 100644 index 0000000..72c5af4 --- /dev/null +++ b/include/stellar/session_mq.h @@ -0,0 +1,24 @@ +#pragma once + +#include "session.h" +#include "stellar.h" + +#include <sys/time.h> + +//session mq +typedef void msg_free_cb_func(void *msg, void *cb_arg); +typedef int on_msg_cb_func(struct session *sess, int topic_id, const void *msg, void *cb_arg); + +//return topic_id +int session_mq_create_topic(struct stellar *st, const char *topic_name, msg_free_cb_func *free_cb, void *cb_arg); +int session_mq_get_topic_id(struct stellar *st, const char *topic_name); + +int session_mq_update_topic(struct stellar *st, int topic_id, msg_free_cb_func *free_cb, void *cb_arg); + +int session_mq_destroy_topic(struct stellar *st, int topic_id); + +//return sub_id >=0 if success, otherwise return -1. +int session_mq_subscribe_topic(struct stellar *st, int topic_id, on_msg_cb_func *sub_cb, void *cb_arg); + +int session_mq_publish_message(struct session *sess, int topic_id, void *msg); +int session_mq_ignore_message(struct session *sess, int topic_id, int sub_id);
\ No newline at end of file diff --git a/include/stellar/stellar.h b/include/stellar/stellar.h new file mode 100644 index 0000000..f7cd441 --- /dev/null +++ b/include/stellar/stellar.h @@ -0,0 +1,24 @@ +#pragma once + +#include <sys/time.h> + +struct stellar; + +int stellar_get_worker_thread_num(struct stellar *st); +int stellar_get_current_thread_id(struct stellar *st); + +typedef void *plugin_init_callback(struct stellar *st); +typedef void plugin_exit_callback(void *plugin_ctx); + +typedef int stellar_periodic_cb_func(struct stellar *st, void *cb_arg); + +int stellar_worker_thread_periodic_add(struct stellar *st, stellar_periodic_cb_func *periodic_cb, void *cb_arg, const struct timeval *interval); + +struct plugin_specific +{ + plugin_init_callback *init_cb; + plugin_exit_callback *exit_cb; +}; + +struct stellar *stellar_init(struct plugin_specific specs[] , int specs_num); +void stellar_exit(struct stellar *);
\ No newline at end of file diff --git a/sdk/include/utils.h b/include/stellar/utils.h index 7beafbb..7beafbb 100644 --- a/sdk/include/utils.h +++ b/include/stellar/utils.h diff --git a/sdk/include/plugin_spec.h b/sdk/include/plugin_spec.h deleted file mode 100644 index e516c20..0000000 --- a/sdk/include/plugin_spec.h +++ /dev/null @@ -1,8 +0,0 @@ -#pragma once - -#include "session.h" - -typedef int plugin_init_callback(plugin_id_t plugin_id); -typedef void plugin_exit_callback(void); -typedef void plugin_session_callback(struct session *session, enum session_state state, int thread_id); - diff --git a/sdk/include/session.h b/sdk/include/session.h deleted file mode 100644 index c251f03..0000000 --- a/sdk/include/session.h +++ /dev/null @@ -1,66 +0,0 @@ -#pragma once - -#include <stdint.h> -#include <stddef.h> - -enum session_type -{ - SESSION_TYPE_CUSTOM, - SESSION_TYPE_TCP, - SESSION_TYPE_UDP, - SESSION_TYPE_MAX, -}; - -enum session_state -{ - SESSION_STATE_INVALID = 0, - SESSION_STATE_OPENING = 1 , - SESSION_STATE_ACTIVE = 2, - SESSION_STATE_CLOSING = 3, - __SESSION_STATE_MAX, -}; - - -struct session; - -#define SESSION_SEEN_C2S_FLOW (1 >> 1) -#define SESSION_SEEN_S2C_FLOW (1 >> 2) -int session_is_symmetric(const struct session *sess, unsigned char *flag); - -#define SESSION_DIRECTION_IN 0 -#define SESSION_DIRECTION_OUT 1 -int session_get_direction(const struct session *sess); - -const char *session_get_readable_addr(struct session *sess); -const char *session_get_current_payload(struct session *sess, size_t *payload_len); - -struct packet; -const struct packet *session_get_current_packet(struct session *sess); - -#define PACKET_DIRECTION_C2S 0 -#define PACKET_DIRECTION_S2C 1 -int packet_get_direction(struct packet *pkt); -const char *packet_get_data(const struct packet *pkt, size_t *data_len); - -typedef int plugin_id_t; -//session mq -typedef void msg_free_cb_func(void *msg, void *cb_arg); -typedef int on_msg_cb_func(struct session *sess, const char *topic_name, const void *msg, void *cb_arg); - -int session_mq_create_topic(const char *topic_name, msg_free_cb_func *free_cb, void *cb_arg); -int session_mq_destroy_topic(const char *topic_name); -int session_publish_message(struct session *sess, const char *topic_name, void *msg); -int session_ignore_message(struct session *sess, const char *topic_name, int sub_id); - -//return sub_id >=0 if success, otherwise return -1. -int session_mq_subscribe_topic(const char *topic_name, on_msg_cb_func *sub_cb, void *cb_arg); - - -typedef void session_ex_free(struct session *sess, int idx, void *ex_ptr, void *arg); -int session_get_ex_new_index(const char *name, session_ex_free *free_func,void *arg); // Only support in INIT stage -int session_set_ex_data(struct session *sess, int idx, void *ex_ptr); -void *session_get_ex_data(struct session *sess, int idx); - -void session_dettach_plugin(struct session *sess, plugin_id_t self); -void session_attach_plugin(struct session *sess, plugin_id_t self); - diff --git a/src/adapter/CMakeLists.txt b/src/adapter/CMakeLists.txt index 89923ed..d31f587 100644 --- a/src/adapter/CMakeLists.txt +++ b/src/adapter/CMakeLists.txt @@ -1,3 +1,3 @@ set(CMAKE_C_FLAGS "-std=c99") add_definitions(-fPIC) -add_library(adapter STATIC adapter.c session_manager.c)
\ No newline at end of file +add_library(adapter STATIC stellar.c adapter.c session_manager.c)
\ No newline at end of file diff --git a/src/adapter/adapter.c b/src/adapter/adapter.c index 0114c97..be284c0 100644 --- a/src/adapter/adapter.c +++ b/src/adapter/adapter.c @@ -1,5 +1,9 @@ -#include "session.h" -#include "utils.h" +#include "stellar/session.h" +#include "stellar/stellar.h" +#include "stellar/utils.h" +#include "stellar/session_exdata.h" + +#include "stellar_internal.h" #include "adapter.h" #include "session_manager.h" @@ -7,112 +11,17 @@ #include <MESA/stream.h> -UT_array *global_plugin_specs=0; -int global_plugin_runtime_exdata_idx=-1; - -UT_icd plugin_specs_icd = {sizeof(struct plugin_specific), NULL, NULL, NULL}; -static void exdata_plugin_rt_free(struct session *sess, int idx, void *ex_ptr, void *arg) +inline void adapter_session_poll(struct session *sess) { - if(ex_ptr) - { - FREE(ex_ptr); - } - return; + session_defer_loop(sess); } -int adapter_init(struct plugin_specific specs[] , int spec_num) -{ - utarray_new(global_plugin_specs,&plugin_specs_icd); - utarray_reserve(global_plugin_specs, spec_num); - int init_cb_ret = 0; - global_plugin_runtime_exdata_idx=session_get_ex_new_index("__SYSTEM__", exdata_plugin_rt_free, NULL); - for(int i = 0; i < spec_num; i++) - { - if (specs[i].init_cb != NULL) - { - init_cb_ret=specs[i].init_cb(i); - if(init_cb_ret < 0) - { - goto ADAPTER_INIT_ERROR; - } - utarray_push_back(global_plugin_specs, &specs[i]); - } - } - return 0; -ADAPTER_INIT_ERROR: - adapter_exit(); - return -1; -} -void adapter_exit() -{ - struct plugin_specific *p=NULL; - while ((p = (struct plugin_specific *)utarray_next(global_plugin_specs, p))) - { - if(p->exit_cb != NULL) - { - p->exit_cb(); - } - } - session_manager_cleanup(); - utarray_free(global_plugin_specs); - return; -}; - - -struct plugin_runtime_exdata -{ - plugin_session_callback *session_entry_cb; - int session_interest_state; -}; - -static struct plugin_runtime_exdata* adapter_new_plugin_runtime(enum session_type type, UT_array *plugin_specs) -{ - struct plugin_runtime_exdata *plugin_rt = NULL; - unsigned int len = utarray_len(plugin_specs); - if (len > 0) - { - plugin_rt=CALLOC(struct plugin_runtime_exdata, len); - for (unsigned int i = 0; i < len; i++) - { - struct plugin_specific *spec = (struct plugin_specific *)utarray_eltptr(plugin_specs, i); - struct plugin_runtime_exdata *exdata = plugin_rt+i; - if (type == SESSION_TYPE_TCP) - { - exdata->session_entry_cb = spec->tcp_session_entry_cb; - exdata->session_interest_state = spec->tcp_session_interest_state; - } - if (type == SESSION_TYPE_UDP) - { - exdata->session_entry_cb = spec->tcp_session_entry_cb; - exdata->session_interest_state = spec->tcp_session_interest_state; - } - } - } - return plugin_rt; -} - -static void adapter_dispatch_session(int plugin_rt_exdata_idx, struct session *sess, enum session_state state, int thread_id) -{ - struct plugin_runtime_exdata *plugin_rt = (struct plugin_runtime_exdata *)session_get_ex_data(sess, plugin_rt_exdata_idx); - struct plugin_runtime_exdata *p = NULL; - unsigned int len = utarray_len(global_plugin_specs); - for(unsigned i = 0; i < len; i++) - { - p = (plugin_rt+i); - if(p->session_entry_cb != NULL && (p->session_interest_state & state)) - { - p->session_entry_cb(sess, state, thread_id); - } - } - session_mq_loop(sess); - return; -} struct streaminfo; //streaminfo open -void adapter_session_open(struct streaminfo *stream, struct session **sess, int thread_id) +struct session *adapter_session_open(struct stellar *st, struct streaminfo *stream, void *a_packet) { enum session_type type; if(stream->type==STREAM_TYPE_TCP) @@ -125,84 +34,64 @@ void adapter_session_open(struct streaminfo *stream, struct session **sess, int } else { - return; + return NULL; } - *sess=session_new(stream, type, thread_id); - struct plugin_runtime_exdata*plugin_rt=adapter_new_plugin_runtime(type, global_plugin_specs); - session_set_ex_data(*sess, global_plugin_runtime_exdata_idx, plugin_rt); - adapter_dispatch_session(global_plugin_runtime_exdata_idx, *sess, SESSION_STATE_OPENING, thread_id); - return; + struct session *sess=session_new(st, stream, type, stream->threadnum); + session_dispatch(sess, SESSION_STATE_OPENING, a_packet); + return sess; } -void adapter_session_active(struct streaminfo *stream, struct session **sess, int thread_id) +void adapter_session_active(struct streaminfo *stream, struct session *sess, void *a_packet) { - if (*sess) + if (sess) { - session_set_state(*sess, SESSION_STATE_ACTIVE, thread_id); - adapter_dispatch_session(global_plugin_runtime_exdata_idx, *sess, SESSION_STATE_ACTIVE, thread_id); + session_dispatch( sess, SESSION_STATE_ACTIVE, a_packet); } return; } //streaminfo close, or firewall active close streaminfo -void adapter_session_close(struct streaminfo *stream, struct session **sess, int thread_id) -{ - if(*sess) - { - session_set_state(*sess, SESSION_STATE_CLOSING, thread_id); - adapter_dispatch_session(global_plugin_runtime_exdata_idx, *sess, SESSION_STATE_CLOSING, thread_id); - session_free(*sess, stream->threadnum); - } - return; -} - -void session_dettach_plugin(struct session *sess, plugin_id_t self) +void adapter_session_close(struct streaminfo *stream, struct session *sess, void *a_packet) { - if(sess==NULL)return; - struct plugin_runtime_exdata *plugin_rt=(struct plugin_runtime_exdata *)session_get_ex_data(sess, global_plugin_runtime_exdata_idx); - struct plugin_runtime_exdata *p=(plugin_rt+(unsigned int)self); - if(p) + if(sess) { - p->session_interest_state=SESSION_STATE_INVALID; + session_dispatch(sess, SESSION_STATE_CLOSING, a_packet); + session_free(sess); } return; } -void session_attach_plugin(struct session *sess, plugin_id_t self) -{ - if(sess==NULL)return; - struct plugin_runtime_exdata *plugin_rt=(struct plugin_runtime_exdata *)session_get_ex_data(sess, global_plugin_runtime_exdata_idx); - struct plugin_runtime_exdata *p=(plugin_rt+(unsigned int)self); - if(p) - { - p->session_interest_state=__SESSION_STATE_MAX; - } - return; -} - -const char* session_get_readable_addr(struct session *sess) +const char* session_get0_readable_addr(struct session *sess) { return printaddr(&sess->stream->addr, sess->stream->threadnum); } -const char *session_get_current_payload(struct session *sess, size_t *payload_len) +const char *session_get0_current_payload(struct session *sess, size_t *payload_len) { *payload_len = (size_t)sess->stream->ptcpdetail->datalen; return (const char*)sess->stream->ptcpdetail->pdata; } -const struct packet *session_get_current_packet(struct session *sess) +const struct packet *session_get0_current_packet(struct session *sess) { return (const struct packet *)get_rawpkt_from_streaminfo(sess->stream); } -const char *packet_get_data(const struct packet *pkt, size_t *data_len) +int packet_get_direction(const struct packet *pkt) +{ + // TODO: + return -1; +} + +const char *packet_get0_data(const struct packet *pkt, size_t *data_len) { + // TODO: return NULL; } int session_get_direction(const struct session *sess) { + // TODO: check route dir if(sess->stream) { if(sess->stream->curdir==DIR_C2S) @@ -237,4 +126,15 @@ int session_is_symmetric(const struct session *sess, unsigned char *flag) } } return is_symmetric; -}
\ No newline at end of file +} + +int stellar_get_worker_thread_num(struct stellar *st) +{ + return get_thread_count(); +} + +int stellar_get_current_thread_id(struct stellar *st) +{ + // TODO: + return 0; +}
\ No newline at end of file diff --git a/src/adapter/adapter.h b/src/adapter/adapter.h index 55fadbd..da902f9 100644 --- a/src/adapter/adapter.h +++ b/src/adapter/adapter.h @@ -1,30 +1,14 @@ -#include "session.h" -#include "plugin_spec.h" - -#include <stdint.h> - -struct plugin_specific -{ - plugin_init_callback *init_cb; - plugin_exit_callback *exit_cb; - plugin_session_callback *udp_session_entry_cb; - plugin_session_callback *tcp_session_entry_cb; - int udp_session_interest_state; - int tcp_session_interest_state; - -}; - -int adapter_init(struct plugin_specific specs[] , int specs_num); -void adapter_exit(); - - +#pragma once +#include "stellar/session.h" struct streaminfo; //streaminfo open -void adapter_session_open(struct streaminfo *stream, struct session **session, int thread_id); -void adapter_session_active(struct streaminfo *stream, struct session **session, int thread_id); +struct session *adapter_session_open(struct stellar *st, struct streaminfo *stream, void *a_packet); +void adapter_session_active(struct streaminfo *stream, struct session *session, void *a_packet); //streaminfo close, or firewall active close streaminfo -void adapter_session_close(struct streaminfo *stream, struct session **session, int thread_id);
\ No newline at end of file +void adapter_session_close(struct streaminfo *stream, struct session *session, void *a_packet); + +void adapter_session_poll(struct session *sess);
\ No newline at end of file diff --git a/src/adapter/session_manager.c b/src/adapter/session_manager.c index 96d9f2b..2701ac1 100644 --- a/src/adapter/session_manager.c +++ b/src/adapter/session_manager.c @@ -1,11 +1,15 @@ -#include "utils.h" +#include "stellar/session.h" +#include "stellar/stellar.h" +#include "stellar/utils.h" +#include "stellar/session_exdata.h" +#include "stellar/session_mq.h" #include "session_manager.h" +#include "stellar_internal.h" -#include "uthash/uthash.h" #include "uthash/utarray.h" -#include <sys/queue.h> +#include "uthash/utlist.h" -struct session_exdata_meta +struct session_exdata_schema { char *name; session_ex_free *free_func; @@ -13,47 +17,48 @@ struct session_exdata_meta int idx; }; -static void session_exdata_met_copy(void *_dst, const void *_src) +static void session_exdata_met_copy(void *_dst, const void *_src) { - struct session_exdata_meta *dst = (struct session_exdata_meta*)_dst, *src = (struct session_exdata_meta*)_src; - dst->free_func = src->free_func; - dst->arg = src->arg; - dst->idx = src->idx; - dst->name = src->name ? strdup(src->name) : NULL; + struct session_exdata_schema *dst = (struct session_exdata_schema *)_dst, *src = (struct session_exdata_schema *)_src; + dst->free_func = src->free_func; + dst->arg = src->arg; + dst->idx = src->idx; + dst->name = src->name ? strdup(src->name) : NULL; } -static void session_exdata_met_dtor(void *_elt) +static void session_exdata_met_dtor(void *_elt) { - struct session_exdata_meta *elt = (struct session_exdata_meta*)_elt; - if (elt->name) FREE(elt->name); + struct session_exdata_schema *elt = (struct session_exdata_schema *)_elt; + if (elt->name) + FREE(elt->name); } -UT_icd session_exdata_meta_icd = {sizeof(struct session_exdata_meta), NULL, session_exdata_met_copy, session_exdata_met_dtor}; -UT_array *global_session_exdata_metas = NULL; +UT_icd session_exdata_meta_icd = {sizeof(struct session_exdata_schema), NULL, session_exdata_met_copy, session_exdata_met_dtor}; -int session_get_ex_new_index(const char *name, session_ex_free *free_func,void *arg) + +int stellar_session_get_ex_new_index(struct stellar *st, const char *name, session_ex_free *free_func,void *arg) { - if(global_session_exdata_metas == NULL) + if(st->session_exdata_schema_array == NULL) { - utarray_new(global_session_exdata_metas, &session_exdata_meta_icd); + utarray_new(st->session_exdata_schema_array, &session_exdata_meta_icd); } - unsigned int len = utarray_len(global_session_exdata_metas); - struct session_exdata_meta *t_meta; + unsigned int len = utarray_len(st->session_exdata_schema_array); + struct session_exdata_schema *t_schema; for(unsigned int i = 0; i < len; i++) { - t_meta = (struct session_exdata_meta *)utarray_eltptr(global_session_exdata_metas, i); - if(strcmp(t_meta->name, name) == 0) + t_schema = (struct session_exdata_schema *)utarray_eltptr(st->session_exdata_schema_array, i); + if(strcmp(t_schema->name, name) == 0) { - return t_meta->idx; + return t_schema->idx; } } - struct session_exdata_meta meta; - meta.free_func=free_func; - meta.name=(char *)name; - meta.idx=len; - meta.arg=arg; - utarray_push_back(global_session_exdata_metas, &meta); - return meta.idx; + struct session_exdata_schema new_schema; + new_schema.free_func=free_func; + new_schema.name=(char *)name; + new_schema.idx=len; + new_schema.arg=arg; + utarray_push_back(st->session_exdata_schema_array, &new_schema); + return new_schema.idx; } struct session_exdata_runtime @@ -64,15 +69,15 @@ struct session_exdata_runtime int session_set_ex_data(struct session *sess, int idx, void *ex_ptr) { if(sess->ex_data_rt == NULL)return -1; - unsigned int len=utarray_len(global_session_exdata_metas); + unsigned int len=utarray_len(sess->st->session_exdata_schema_array); if(len < (unsigned int)idx)return -1; struct session_exdata_runtime* exdata = (struct session_exdata_runtime*)(sess->ex_data_rt+idx); - struct session_exdata_meta* meta = (struct session_exdata_meta*)utarray_eltptr(global_session_exdata_metas, (unsigned int)idx); + struct session_exdata_schema* schema = (struct session_exdata_schema*)utarray_eltptr(sess->st->session_exdata_schema_array, (unsigned int)idx); if(exdata) { - if(meta->free_func && exdata->data) + if(schema->free_func && exdata->data) { - meta->free_func(sess, idx, exdata->data, meta->arg); + schema->free_func(sess, idx, exdata->data, schema->arg); } exdata->data = ex_ptr; } @@ -82,248 +87,179 @@ int session_set_ex_data(struct session *sess, int idx, void *ex_ptr) void *session_get_ex_data(struct session *sess, int idx) { if(sess->ex_data_rt == NULL)return NULL; - unsigned int len = utarray_len(global_session_exdata_metas); + unsigned int len = utarray_len(sess->st->session_exdata_schema_array); if(len < (unsigned int)idx)return NULL; struct session_exdata_runtime* exdata = (struct session_exdata_runtime*)(sess->ex_data_rt+idx); return exdata->data; } -struct session_mq_node +typedef struct session_mq { - const char *topic_name; + int topic_id; void *message; size_t message_len; - STAILQ_ENTRY(session_mq_node) entries; -}; - -struct session_mq -{ - STAILQ_HEAD(session_mq_head, session_mq_node) head; -}; + struct session_mq *next, *prev; +}session_mq; - -struct subscriber +typedef struct session_mq_subscriber { on_msg_cb_func *sub_cb; void *cb_arg; - STAILQ_ENTRY(subscriber) entries; -}; + struct session_mq_subscriber *next, *prev; +}session_mq_subscribers; struct session_mq_topic_schema { char *topic_name; msg_free_cb_func *free_cb; void *cb_arg; - STAILQ_HEAD(sub_q_head, subscriber) subscribers; - UT_hash_handle hh; + int topic_id; + int sub_cnt; + struct session_mq_subscriber *subscribers; }; -static struct session_mq_topic_schema *global_session_mq_schema = NULL; -struct session_mq *session_mq_new(void) +static void session_mq_topic_schema_copy(void *_dst, const void *_src) { - struct session_mq *mq = CALLOC(struct session_mq, 1); - STAILQ_INIT(&mq->head); - return mq; + struct session_mq_topic_schema *dst = (struct session_mq_topic_schema *)_dst, + *src = (struct session_mq_topic_schema *)_src; + dst->subscribers = src->subscribers; + dst->free_cb = src->free_cb; + dst->cb_arg = src->cb_arg; + dst->topic_id = src->topic_id; + dst->sub_cnt = src->sub_cnt; + dst->topic_name = src->topic_name ? strdup(src->topic_name) : NULL; } -void session_mq_free(struct session_mq *mq) +static void session_mq_topic_schema_dtor(void *_elt) { - struct session_mq_node *node; - while ((node = STAILQ_FIRST(&mq->head)) != NULL) - { - STAILQ_REMOVE_HEAD(&mq->head, entries); - FREE(node); - } - FREE(mq); + struct session_mq_topic_schema *elt = (struct session_mq_topic_schema *)_elt; + if (elt->topic_name) + FREE(elt->topic_name); + // FREE(elt); // free the item } -static struct session_mq_topic_schema *session_mq_topic2schema(struct session_mq_topic_schema *topic_schema, const char *topic_name) +UT_icd session_mq_topic_schema_icd = {sizeof(struct session_mq_topic_schema), NULL, session_mq_topic_schema_copy, session_mq_topic_schema_dtor}; + + +void session_mq_free(struct session_mq *head) { - struct session_mq_topic_schema *schema; - HASH_FIND_STR(topic_schema, topic_name, schema); - return schema; -}; + struct session_mq *elt, *tmp; + DL_FOREACH_SAFE(head, elt, tmp) + { + DL_DELETE(head, elt); + free(elt); + } + free(head); +} + -static void session_mq_cleanup(struct session_mq_topic_schema *schema) + +int session_mq_get_topic_id(struct stellar *st, const char *topic_name) { - struct session_mq_topic_schema *current, *tmp; - HASH_ITER(hh, schema, current, tmp) + if(topic_name == NULL || st == NULL || st->session_mq_schema_array == NULL)return -1; + unsigned int len = utarray_len(st->session_mq_schema_array); + struct session_mq_topic_schema *t_schema; + for(unsigned int i = 0; i < len; i++) { - HASH_DEL(schema, current); // delete frome global_session_mq_schema - // free topic_name - if (current->topic_name) + t_schema = (struct session_mq_topic_schema *)utarray_eltptr(st->session_mq_schema_array, i); + if(strcmp(t_schema->topic_name, topic_name) == 0) { - FREE(current->topic_name); + return i; } - - // cleanup STAILQ - struct subscriber *sub; - while (!STAILQ_EMPTY(¤t->subscribers)) - { - sub = STAILQ_FIRST(¤t->subscribers); - STAILQ_REMOVE_HEAD(¤t->subscribers, entries); - FREE(sub); // free subscriber - } - FREE(current); // free the item - } - return; + } + return -1; } -void session_manager_cleanup() +int session_mq_update_topic(struct stellar *st, int topic_id, msg_free_cb_func *free_cb, void *cb_arg) { - session_mq_cleanup(global_session_mq_schema); - utarray_free(global_session_exdata_metas); + if(st->session_mq_schema_array == NULL)return -1; + unsigned int len = utarray_len(st->session_mq_schema_array); + if(len < (unsigned int)topic_id)return -1; + struct session_mq_topic_schema *t_schema = (struct session_mq_topic_schema *)utarray_eltptr(st->session_mq_schema_array, (unsigned int)topic_id); + if(t_schema == NULL)return -1; + t_schema->free_cb=free_cb; + t_schema->cb_arg=cb_arg; + return 0; } -void session_mq_loop(struct session *sess) +int session_mq_create_topic(struct stellar *st, const char *topic_name, msg_free_cb_func *free_cb, void *cb_arg) { - struct session_mq_node *node; - struct subscriber *sub; - struct session_mq *mq = sess->mq; - while (!STAILQ_EMPTY(&mq->head)) + if(st->session_mq_schema_array == NULL) { - node = STAILQ_FIRST(&mq->head); - STAILQ_REMOVE_HEAD(&mq->head, entries); - struct session_mq_topic_schema *topic= session_mq_topic2schema(global_session_mq_schema, node->topic_name); - if(topic != NULL) - { - STAILQ_FOREACH(sub, &topic->subscribers, entries) - { - sub->sub_cb(sess, node->topic_name, node->message, sub->cb_arg); - } - if(topic->free_cb != NULL) - { - topic->free_cb(node->message, topic->cb_arg); - } - } - FREE(node); + utarray_new(st->session_mq_schema_array, &session_mq_topic_schema_icd); } - return; -}; - -int session_mq_create_topic(const char *topic_name, msg_free_cb_func *free_cb, void *cb_arg) -{ - struct session_mq_topic_schema *topic; - // check if topic_name exist - HASH_FIND_STR(global_session_mq_schema, topic_name, topic); - //if found and free_cb is not NULL, it's a exist producer - if (topic != NULL && topic->free_cb != NULL) + unsigned int len = utarray_len(st->session_mq_schema_array); + if(session_mq_get_topic_id(st, topic_name) >= 0) { - return -1; // exist + return -1; } - // if topic not found, create new topic, called by consumer or producer - if (topic == NULL) - { - topic = CALLOC(struct session_mq_topic_schema, 1); - topic->topic_name = strdup(topic_name); // 复制字符串,确保独立存储 - HASH_ADD_KEYPTR(hh, global_session_mq_schema, topic->topic_name, strlen(topic->topic_name), topic); - if (STAILQ_EMPTY(&topic->subscribers)) - { - STAILQ_INIT(&topic->subscribers); - } - } - //if found but free_cb is NULL, it's a consumer's placeholder, now filled by producer - if (free_cb != NULL) - { - topic->free_cb = free_cb; - topic->cb_arg = cb_arg; - } - return 0; // success + struct session_mq_topic_schema t_schema; + t_schema.free_cb=free_cb; + t_schema.topic_name=(char *)topic_name; + t_schema.topic_id=len; + t_schema.cb_arg=cb_arg; + t_schema.subscribers=NULL; + t_schema.sub_cnt=0; + utarray_push_back(st->session_mq_schema_array, &t_schema); + return t_schema.topic_id; } -int session_mq_destroy_topic(const char *topic_name) { - struct session_mq_topic_schema *topic; - - // find topic in global schma - HASH_FIND_STR(global_session_mq_schema, topic_name, topic); - if (topic == NULL) - { - return -1; // not found +int session_mq_destroy_topic(struct stellar *st, int topic_id) +{ + unsigned int len = utarray_len(st->session_mq_schema_array); + if (len <= (unsigned int)topic_id) + return -1; + struct session_mq_topic_schema *topic = + (struct session_mq_topic_schema *)utarray_eltptr(st->session_mq_schema_array, (unsigned int)topic_id); + struct session_mq_subscriber *sub_elt, *sub_tmp; + + if (topic) + { + DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp) + { + DL_DELETE(topic->subscribers, sub_elt); + free(sub_elt); + } } - // delete topic from global schema - HASH_DEL(global_session_mq_schema, topic); - // free topic name - FREE(topic->topic_name); - - struct subscriber *sub; - while ((sub = STAILQ_FIRST(&topic->subscribers)) != NULL) - { - STAILQ_REMOVE_HEAD(&topic->subscribers, entries); - FREE(sub); - } - // free topic itself - FREE(topic); return 0; // success } -int session_publish_message(struct session *sess, const char *topic_name, void *data) + +int session_mq_publish_message(struct session *sess, int topic_id, void *data) { - struct session_mq_topic_schema *topic; - HASH_FIND_STR(global_session_mq_schema, topic_name, topic); - if (topic == NULL) - { - return -1; - } - struct session_mq *mq=sess->mq; - struct session_mq_node *node= CALLOC(struct session_mq_node,1); - node->topic_name = topic->topic_name; + unsigned int len = utarray_len(sess->st->session_mq_schema_array); + if (len <= (unsigned int)topic_id)return -1; + struct session_mq *node= CALLOC(struct session_mq,1); + node->topic_id = topic_id; node->message = data; - STAILQ_INSERT_TAIL(&mq->head, node, entries); + DL_APPEND(sess->mq, node); return 0; } -int session_mq_subscribe_topic(const char *topic_name, on_msg_cb_func *sub_cb, void *cb_arg) +int session_mq_ignore_message(struct session *sess, int topic_id, int sub_id) { - struct session_mq_topic_schema *topic; - session_mq_create_topic(topic_name, NULL, NULL); - HASH_FIND_STR(global_session_mq_schema, topic_name, topic); - if (topic == NULL) - { - return -1; // topic create failed - } - struct subscriber *new_subscriber = CALLOC(struct subscriber,1); - new_subscriber->sub_cb = sub_cb; - new_subscriber->cb_arg = cb_arg; - STAILQ_INSERT_TAIL(&topic->subscribers, new_subscriber, entries); - return 0; + // TODO: implement + return -1; } -/* -int session_mq_unsubscribe_topic(const char *topic_name, msg_consume_cb_func *read_cb, void *cb_arg) -{ - struct session_mq_topic_schema *topic; - int unsubscribe_count = 0; - HASH_FIND_STR(global_session_mq_schema, topic_name, topic); - if (topic == NULL) - { - return -1; // opic create failed - } - struct subscriber *sub, *tmp_sub; - sub = STAILQ_FIRST(&topic->subscribers); - while (sub) - { - if (sub->read_cb == read_cb && sub->cb_arg == cb_arg) - { - tmp_sub = STAILQ_NEXT(sub, entries); - STAILQ_REMOVE(&topic->subscribers, sub, subscriber, entries); - FREE(sub); - unsubscribe_count++; - sub = tmp_sub; - } - else - { - sub = STAILQ_NEXT(sub, entries); // next sub - } - } - return unsubscribe_count; +int session_mq_subscribe_topic(struct stellar *st, int topic_id, on_msg_cb_func *sub_cb, void *cb_arg) +{ + unsigned int len = utarray_len(st->session_mq_schema_array); + if (len <= (unsigned int)topic_id)return -1; + struct session_mq_topic_schema *topic = (struct session_mq_topic_schema *)utarray_eltptr(st->session_mq_schema_array, (unsigned int)topic_id); + struct session_mq_subscriber *new_subscriber = CALLOC(struct session_mq_subscriber,1); + new_subscriber->sub_cb = sub_cb; + new_subscriber->cb_arg = cb_arg; + DL_APPEND(topic->subscribers, new_subscriber); + topic->sub_cnt+=1; + return topic->sub_cnt; } -*/ -static struct session_exdata_runtime *session_new_exdata_rt() +static struct session_exdata_runtime *session_new_exdata_rt(struct stellar *st) { struct session_exdata_runtime *exdata_rt = NULL; - unsigned int len = utarray_len(global_session_exdata_metas); + unsigned int len = utarray_len(st->session_exdata_schema_array); if(len > 0) { exdata_rt=CALLOC(struct session_exdata_runtime, len); @@ -331,53 +267,222 @@ static struct session_exdata_runtime *session_new_exdata_rt() return exdata_rt; } -static void session_free_exdata_rt(struct session* sess, struct session_exdata_runtime *exdata_rt) +static void session_free_exdata_runtime(struct session* sess, struct session_exdata_runtime *exdata_rt) { if(exdata_rt==NULL)return; - unsigned int len=utarray_len(global_session_exdata_metas); + unsigned int len=utarray_len(sess->st->session_exdata_schema_array); for (unsigned int i = 0; i < len; i++) { struct session_exdata_runtime *exdata = (struct session_exdata_runtime *)(exdata_rt + i); - struct session_exdata_meta *meta = (struct session_exdata_meta *)utarray_eltptr(global_session_exdata_metas, i); + struct session_exdata_schema *schema = (struct session_exdata_schema *)utarray_eltptr(sess->st->session_exdata_schema_array, i); if (exdata) { - if (meta->free_func && exdata->data) + if (schema->free_func && exdata->data) { - meta->free_func(sess, i, exdata->data, meta->arg); + schema->free_func(sess, i, exdata->data, schema->arg); } } } } + +typedef struct session_event +{ + struct stellar *st; + struct session *sess; + session_event_cb_func *sesion_event_cb; + struct timeval timeout; + void *cb_arg; + int events; + unsigned short is_delete; + unsigned short is_intrinsic; + struct session_event *next, *prev; +}session_event; + +void session_dispatch(struct session *sess, enum session_state state, struct packet *pkt) +{ + struct session_event *intrinsic_events = (struct session_event *)session_get_ex_data(sess, sess->st->intrinsic_session_event_exdata_idx); + struct session_event *p = NULL; + unsigned int len = utarray_len(sess->st->plugin_specs_array); + sess->state = state; + int events=(1<<state|(sess->type==SESSION_TYPE_TCP?SESS_EV_TCP:SESS_EV_UDP));// TODO: upgrade + for(unsigned i = 0; i < len; i++) + { + p = (intrinsic_events+i); + if(p->sesion_event_cb != NULL && (p->events & events) && (p->is_delete == 0)) + { + p->sesion_event_cb(sess, events, pkt, p->cb_arg); + } + } + + session_defer_loop(sess); + return; +} + +static struct session_event* session_intrinsic_events_init(struct stellar *st, struct session *sess) +{ + struct session_event *intrinsic_events = CALLOC(struct session_event, utarray_len(st->intrinsic_session_event_schema_array)); + for(unsigned i = 0; i < utarray_len(st->intrinsic_session_event_schema_array); i++) + { + intrinsic_events[i] = *(struct session_event *)utarray_eltptr(st->intrinsic_session_event_schema_array, i); + } + return intrinsic_events; +} + struct streaminfo; -struct session *session_new(struct streaminfo* stream, enum session_type type, int thread_id) +struct session *session_new(struct stellar *st, struct streaminfo* stream, enum session_type type, int thread_id) { if(stream == NULL)return NULL; if(type!=SESSION_TYPE_TCP && type!=SESSION_TYPE_UDP)return NULL; struct session *sess = CALLOC(struct session, 1); + sess->st = st; sess->stream = stream; sess->type = type; sess->state = SESSION_STATE_OPENING; - sess->mq = session_mq_new(); - sess->ex_data_rt = session_new_exdata_rt(); + sess->mq = NULL; + sess->ex_data_rt = session_new_exdata_rt(st); + sess->ev_list=NULL; + struct session_event*intrinsic_events=session_intrinsic_events_init(st, sess); + session_set_ex_data(sess, st->intrinsic_session_event_exdata_idx, intrinsic_events); return sess; } -void session_free(struct session *sess, int thread_id) +void session_free(struct session *sess) { if(sess==NULL)return; if(sess->mq != NULL) { session_mq_free(sess->mq); } - session_free_exdata_rt(sess, sess->ex_data_rt); + session_free_exdata_runtime(sess, sess->ex_data_rt); FREE(sess->ex_data_rt); FREE(sess); } -void session_set_state(struct session *session, enum session_state state, int thread_id) +UT_icd sesion_event_icd = {sizeof(struct session_event), NULL, NULL, NULL}; + +struct session_event *session_event_new(struct stellar *st, struct session *sess, int events, session_event_cb_func *cb, void *cb_arg) +{ + struct session_event *event = CALLOC(struct session_event, 1); + event->events = events; + event->sesion_event_cb = cb; + event->cb_arg = cb_arg; + event->sess = sess; + event->st = st; + return event; +} + +void session_event_free(struct session_event *ev) +{ + if(ev==NULL)return; + if(ev->is_intrinsic) + { + ev->is_delete=1; + } + else + { + FREE(ev); + } + return; +} + +int session_event_add(struct session_event *ev, const struct timeval *timeout) +{ + // TODO + if(timeout) + { + ev->timeout = *timeout; + } + // TODO: if is_intrinsic, dup or move to ev_list + DL_APPEND(ev->sess->ev_list, ev); + return 0; +} + + +int session_event_assign(struct session_event *ev, struct stellar *st, struct session *sess, int events, session_event_cb_func *cb, void *cb_arg) +{ + if(ev==NULL)return -1; + ev->events = events; + ev->sesion_event_cb = cb; + ev->cb_arg = cb_arg; + ev->sess = sess; + ev->st = st; + return 0; +} + +int session_event_del(struct session_event *ev) +{ + // TODO: + ev->is_delete=1; + if(ev->is_intrinsic==0 && (ev->prev != NULL || ev->next != NULL)) + { + DL_DELETE(ev->sess->ev_list, ev); + ev->prev = NULL; + ev->next = NULL; + } + return 0; +} + +int stellar_plugin_register(struct stellar *st, int events, session_event_cb_func *cb, void *cb_arg) +{ + struct session_event event={.is_intrinsic=1, .events=events, .sesion_event_cb=cb, .cb_arg=cb_arg, .st=st, .sess=NULL, .is_delete=0, .timeout={0,0}, .next=NULL, .prev=NULL}; + if(st->intrinsic_session_event_schema_array == NULL) + { + utarray_new(st->intrinsic_session_event_schema_array, &sesion_event_icd); + } + unsigned int len = utarray_len(st->intrinsic_session_event_schema_array); + utarray_push_back(st->intrinsic_session_event_schema_array, &event); + return len; +} + +struct session_event *session_get_intrinsic_event(struct session *sess, int plugin_id) +{ + unsigned int len = utarray_len(sess->st->intrinsic_session_event_schema_array); + if(plugin_id < 0 || plugin_id >= (int)len)return NULL; + struct session_event*intrinsic_events=session_get_ex_data(sess, sess->st->intrinsic_session_event_exdata_idx); + return (struct session_event *)(intrinsic_events+plugin_id); +} + +void session_defer_loop(struct session *sess) +{ + + struct session_event *ev=NULL, *ev_tmp=NULL; + int events=(1<<sess->state|(sess->type==SESSION_TYPE_TCP?SESS_EV_TCP:SESS_EV_UDP)); + const struct packet *pkt= session_get0_current_packet(sess); + DL_FOREACH_SAFE(sess->ev_list, ev, ev_tmp) + { + if(ev->sesion_event_cb != NULL && (ev->events & events) && (ev->is_delete == 0)) + { + ev->sesion_event_cb(sess, events, pkt, ev->cb_arg); + } + } + + struct session_mq *mq_elt=NULL, *mq_tmp=NULL; + struct session_mq_subscriber *sub_elt, *sub_tmp; + struct session_mq_topic_schema *topic; + DL_FOREACH_SAFE(sess->mq, mq_elt, mq_tmp) + { + topic=(struct session_mq_topic_schema *)utarray_eltptr(sess->st->session_mq_schema_array,(unsigned int)(mq_elt->topic_id)); + if(topic) + { + DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp) + { + sub_elt->sub_cb(sess, mq_elt->topic_id, mq_elt->message, sub_elt->cb_arg); + } + if(topic->free_cb) + { + topic->free_cb(mq_elt->message, topic->cb_arg); + } + } + DL_DELETE(sess->mq, mq_elt); + free(mq_elt); + } + return; +} + +int stellar_worker_thread_periodic_add(struct stellar *st, stellar_periodic_cb_func *periodic_cb, void *cb_arg, const struct timeval *interval) { - if(session == NULL)return; - session->state = state; + // TODO: + return -1; }
\ No newline at end of file diff --git a/src/adapter/session_manager.h b/src/adapter/session_manager.h index 9e2a984..e0b8f5b 100644 --- a/src/adapter/session_manager.h +++ b/src/adapter/session_manager.h @@ -1,27 +1,28 @@ #pragma once -#include "session.h" -#include "plugin_spec.h" +#include "stellar/session.h" +#include "stellar/stellar.h" #include "uthash/utarray.h" struct streaminfo; struct session_exdata_runtime; +struct session_mq; struct session { enum session_type type; enum session_state state; struct streaminfo *stream; - struct session_mq *mq; + struct stellar *st; struct session_exdata_runtime *ex_data_rt; + struct session_mq *mq; + struct session_event *ev_list; }; -struct session *session_new(struct streaminfo* stream, enum session_type type, int thread_id); -void session_free(struct session *session, int thread_id); - -void session_set_state(struct session *session, enum session_state state, int thread_id); +struct session *session_new(struct stellar *st, struct streaminfo* stream, enum session_type type, int thread_id); +void session_free(struct session *sess); -void session_mq_loop(struct session *session); +void session_defer_loop(struct session *sess); -void session_manager_cleanup();
\ No newline at end of file +void session_dispatch(struct session *sess, enum session_state state, struct packet *pkt);
\ No newline at end of file diff --git a/src/adapter/stellar.c b/src/adapter/stellar.c new file mode 100644 index 0000000..ee28f33 --- /dev/null +++ b/src/adapter/stellar.c @@ -0,0 +1,68 @@ +#include "stellar_internal.h" +#include "session_manager.h" + +#include "stellar/utils.h" +#include "stellar/session_exdata.h" + + +struct plugin_specific_inernal +{ + struct plugin_specific specs; + void *plugin_ctx; +}; + +UT_icd plugin_specs_icd = {sizeof(struct plugin_specific_inernal), NULL, NULL, NULL}; + +static void exdata_plugin_rt_free(struct session *sess, int idx, void *ex_ptr, void *arg) +{ + if(ex_ptr) + { + FREE(ex_ptr); + } + return; +} + + +struct stellar *stellar_init(struct plugin_specific specs[] , int spec_num) +{ + struct stellar *st = CALLOC(struct stellar, 1); + struct plugin_specific_inernal spec_internal={}; + utarray_new(st->plugin_specs_array,&plugin_specs_icd); + utarray_reserve(st->plugin_specs_array, spec_num); + st->intrinsic_session_event_exdata_idx=stellar_session_get_ex_new_index(st, "__STELLAR__", exdata_plugin_rt_free, NULL); + for(int i = 0; i < spec_num; i++) + { + if (specs[i].init_cb != NULL) + { + spec_internal.plugin_ctx=specs[i].init_cb(st); + if(spec_internal.plugin_ctx == NULL) + { + goto ADAPTER_INIT_ERROR; + } + spec_internal.specs=specs[i]; + utarray_push_back(st->plugin_specs_array, &spec_internal); + } + } + return st; +ADAPTER_INIT_ERROR: + stellar_exit(st); + return NULL; +} + +void stellar_exit(struct stellar *st) +{ + struct plugin_specific_inernal *p=NULL; + while ((p = (struct plugin_specific_inernal *)utarray_next(st->plugin_specs_array, p))) + { + if(p->specs.exit_cb != NULL) + { + p->specs.exit_cb(p->plugin_ctx); + } + } + utarray_free(st->session_mq_schema_array); + utarray_free(st->session_exdata_schema_array); + utarray_free(st->intrinsic_session_event_schema_array); + utarray_free(st->plugin_specs_array); + FREE(st); + return; +};
\ No newline at end of file diff --git a/src/adapter/stellar_internal.h b/src/adapter/stellar_internal.h new file mode 100644 index 0000000..f528e62 --- /dev/null +++ b/src/adapter/stellar_internal.h @@ -0,0 +1,13 @@ +#pragma once +#include "stellar/stellar.h" + +#include "uthash/utarray.h" + +struct stellar +{ + int intrinsic_session_event_exdata_idx; + UT_array *intrinsic_session_event_schema_array; + UT_array *plugin_specs_array; + UT_array *session_mq_schema_array; + UT_array *session_exdata_schema_array; +};
\ No newline at end of file diff --git a/src/adapter/test/CMakeLists.txt b/src/adapter/test/CMakeLists.txt deleted file mode 100644 index 930ad08..0000000 --- a/src/adapter/test/CMakeLists.txt +++ /dev/null @@ -1,5 +0,0 @@ -add_definitions(-fPIC) - -add_library(test_loader SHARED test_loader.c test_plugin.c) -target_link_libraries(test_loader adapter) -set_target_properties(test_loader PROPERTIES PREFIX "") diff --git a/src/adapter/test/test_loader.c b/src/adapter/test/test_loader.c deleted file mode 100644 index ec993de..0000000 --- a/src/adapter/test/test_loader.c +++ /dev/null @@ -1,149 +0,0 @@ -#include "session.h" -#include "utils.h" -#include "test_plugin.h" -#include "../adapter.h" -#include "../session_manager.h" - -#include <MESA/stream.h> -#include <stdio.h> - -struct plugin_specific g_plugin_schema[] = -{ - { - .init_cb = test_entry_plugin_init, - .exit_cb = test_entry_plugin_exit, - .udp_session_entry_cb=test_entry_plugin_entry, - .tcp_session_entry_cb=test_entry_plugin_entry, - .udp_session_interest_state=__SESSION_STATE_MAX, - .tcp_session_interest_state=__SESSION_STATE_MAX, - }, - { - .init_cb = test_mq_plugin_init, - .exit_cb = test_mq_plugin_exit, - .udp_session_entry_cb=test_mq_plugin_entry, - .tcp_session_entry_cb=test_mq_plugin_entry, - .udp_session_interest_state=__SESSION_STATE_MAX, - .tcp_session_interest_state=__SESSION_STATE_MAX, - }, -}; - -struct test_stream_ctx -{ - uint32_t c2s_pkts; - uint32_t c2s_bytes; - uint32_t s2c_pkts; - uint32_t s2c_bytes; - struct session *sess; -}; - -static void test_mq_topic_free(void *data, void *cb_arg) -{ - FREE(data); - return; -} - -int test_mq_loader_read(struct session *sess, const char* topic_name, const void *data, void *cb_arg) -{ - struct test_stream_ctx *ctx =(struct test_stream_ctx *)data; - printf("loader_read_message(%s)-----------%20s", topic_name, session_get_readable_addr(sess)); - printf("server-pkt=%u, server-count=%u, client-pkt=%u, client-count=%u, ", - ctx->c2s_pkts, ctx->c2s_bytes, - ctx->s2c_pkts, ctx->s2c_bytes); - printf("total-pkt=%u, ", ctx->c2s_pkts+ctx->s2c_pkts); - printf("total-count=%u\n", ctx->c2s_bytes+ctx->s2c_bytes); - return 0; -} - -int TEST_INIT() -{ - int ret = adapter_init(g_plugin_schema, 2); - if(ret != 0)return -1; - - session_mq_create_topic("test_mq_topic", test_mq_topic_free, NULL); - session_mq_subscribe_topic("test_mq_topic", test_mq_loader_read, NULL); - return 0; -} - -void TEST_EXIT(void) -{ - - session_mq_destroy_topic("test_mq_topic"); - adapter_exit(); - return; -} - - -static void print_stream_ctx(struct streaminfo *pstream, struct test_stream_ctx *ctx) -{ - printf("stream-----------%20s: ", printaddr(&(pstream->addr), pstream->threadnum)); - printf("server-pkt=%u, server-count=%u, client-pkt=%u, client-count=%u, ", - ctx->c2s_pkts, ctx->c2s_bytes, - ctx->s2c_pkts, ctx->s2c_bytes); - printf("total-pkt=%u, ", ctx->c2s_pkts+ctx->s2c_pkts); - printf("total-count=%u\n", ctx->c2s_bytes+ctx->s2c_bytes); - return; -} - -static struct test_stream_ctx *stream_ctx_dup(const struct test_stream_ctx *origin) -{ - struct test_stream_ctx *ctx=CALLOC(struct test_stream_ctx,1); - memcpy(ctx, origin, sizeof(struct test_stream_ctx)); - return ctx; -} - -static void loader_transfer_stream_entry(struct streaminfo *pstream, UCHAR state, void **pme, int thread_seq,void *a_packet) -{ - struct test_stream_ctx *ctx=(struct test_stream_ctx *)*pme; - struct tcpdetail *pdetail=(struct tcpdetail *)pstream->pdetail; - if(*pme==NULL) - { - *pme=CALLOC(struct test_stream_ctx,1); - ctx=(struct test_stream_ctx *)*pme; - } - - if(a_packet!=NULL) - { - if(DIR_C2S == pstream->curdir){ - ctx->c2s_bytes += pdetail->datalen; - ctx->c2s_pkts++; - }else{ - ctx->s2c_bytes += pdetail->datalen; - ctx->s2c_pkts++; - } - } - struct test_stream_ctx *msg; - switch (state) - { - case OP_STATE_PENDING: - adapter_session_open(pstream, &ctx->sess, thread_seq); - break; - case OP_STATE_DATA: - msg = stream_ctx_dup((const struct test_stream_ctx *)*pme); - if(session_publish_message(ctx->sess, "test_mq_topic", msg) < 0) - { - FREE(msg); - } - adapter_session_active(pstream, &ctx->sess, thread_seq); - break; - case OP_STATE_CLOSE: - adapter_session_close(pstream, &ctx->sess, thread_seq); - print_stream_ctx(pstream, ctx); - FREE(*pme); - break; - default: - break; - } - return; -} - -char test_udp_stream_entry(struct streaminfo *pstream,void **pme, int thread_seq,void *a_packet) -{ - loader_transfer_stream_entry(pstream, pstream->opstate, pme, thread_seq, a_packet); - return APP_STATE_GIVEME; -} - -char test_tcpall_stream_entry(struct streaminfo *pstream,void **pme, int thread_seq,void *a_packet) -{ - loader_transfer_stream_entry(pstream, pstream->pktstate, pme, thread_seq, a_packet); - return APP_STATE_GIVEME; -} diff --git a/src/adapter/test/test_loader.inf b/src/adapter/test/test_loader.inf deleted file mode 100644 index 47b2fe8..0000000 --- a/src/adapter/test/test_loader.inf +++ /dev/null @@ -1,13 +0,0 @@ -[PLUGINFO] -PLUGNAME=test_app -SO_PATH=./plug/business/test_loader/test_loader.so -INIT_FUNC=TEST_INIT -DESTROY_FUNC=TEST_EXIT - -[TCP_ALL] -FUNC_FLAG=ALL -FUNC_NAME=test_tcpall_stream_entry - -[UDP] -FUNC_FLAG=ALL -FUNC_NAME=test_udp_stream_entry
\ No newline at end of file diff --git a/src/adapter/test/test_plugin.c b/src/adapter/test/test_plugin.c deleted file mode 100644 index 4dcb9d5..0000000 --- a/src/adapter/test/test_plugin.c +++ /dev/null @@ -1,127 +0,0 @@ -#include "plugin_spec.h" -#include "session.h" -#include "utils.h" -#include "test_plugin.h" - -#include <stdio.h> - -int g_test_entry_plugin_id = 0; -int g_test_mq_plugin_id = 0; - - -int g_test_entry_exdata_idx = 0; -int g_test_mq_plugin_exdata_idx = 0; - -struct test_session_message -{ - uint32_t c2s_pkts; - uint32_t c2s_bytes; - uint32_t s2c_pkts; - uint32_t s2c_bytes; -}; - -static void test_exdata_free(struct session *sess, int idx, void *ex_ptr, void *arg) -{ - if(ex_ptr) - { - FREE(ex_ptr); - } - return; -} - -int test_entry_plugin_init(int plugin_id) -{ - g_test_entry_plugin_id = plugin_id; - g_test_entry_exdata_idx=session_get_ex_new_index("TEST_ENTRY", test_exdata_free, NULL); - return 0; -} -void test_entry_plugin_exit(void) -{ - return; -} - -static int test_session_mq_plugin_read(struct session *sess, const char* topic_name, const void *data, void *cb_arg) -{ - struct test_session_message *ctx =(struct test_session_message *)data; - printf("plugin_read_message(%s)-----------%20s", topic_name, session_get_readable_addr(sess)); - printf("server-pkt=%u, server-count=%u, client-pkt=%u, client-count=%u, ", - ctx->c2s_pkts, ctx->c2s_bytes, - ctx->s2c_pkts, ctx->s2c_bytes); - printf("total-pkt=%u, ", ctx->c2s_pkts+ctx->s2c_pkts); - printf("total-count=%u\n", ctx->c2s_bytes+ctx->s2c_bytes); - printf("session(%d)-----------%20s: attach\n", g_test_mq_plugin_id, session_get_readable_addr(sess)); - session_attach_plugin(sess, g_test_mq_plugin_id); - return 0; -} - -int test_mq_plugin_init(int plugin_id) -{ - g_test_mq_plugin_id = plugin_id; - g_test_mq_plugin_exdata_idx=session_get_ex_new_index("TEST_MQ", test_exdata_free, NULL); - session_mq_subscribe_topic("test_mq_topic", test_session_mq_plugin_read, NULL); - return 0; -} -void test_mq_plugin_exit(void) -{ - return; -} - - - -static void print_session_ctx(struct session *sess, struct test_session_message *ctx, int plugin_id) -{ - printf("session(%d)-----------%20s: ", plugin_id, session_get_readable_addr(sess)); - printf("server-pkt=%u, server-count=%u, client-pkt=%u, client-count=%u, ", - ctx->c2s_pkts, ctx->c2s_bytes, - ctx->s2c_pkts, ctx->s2c_bytes); - printf("total-pkt=%u, ", ctx->c2s_pkts+ctx->s2c_pkts); - printf("total-count=%u\n", ctx->c2s_bytes+ctx->s2c_bytes); - return; -} - -void test_entry_plugin_entry(struct session *sess, enum session_state state, int thread_id) -{ - struct test_session_message *test_ctx = (struct test_session_message *)session_get_ex_data(sess, g_test_entry_exdata_idx); - if (test_ctx == NULL) - { - test_ctx = CALLOC(struct test_session_message, 1); - session_set_ex_data(sess, g_test_entry_exdata_idx, test_ctx); - } - size_t payload_len = 0; - session_get_current_payload(sess, &payload_len); - const struct packet *pkt = session_get_current_packet(sess); - if (pkt) - { - int dir = session_get_direction(sess); - if (dir==SESSION_DIRECTION_IN) - { - test_ctx->c2s_bytes += payload_len; - test_ctx->c2s_pkts += 1; - } - if (dir==SESSION_DIRECTION_OUT) - { - test_ctx->s2c_bytes += payload_len; - test_ctx->s2c_pkts += 1; - } - } - switch (state) - { - case SESSION_STATE_CLOSING: - if (test_ctx != NULL) - { - print_session_ctx(sess, test_ctx, g_test_entry_plugin_id); - } - break; - default: - break; - } - return; -} - -void test_mq_plugin_entry(struct session *sess, enum session_state state, int thread_id) -{ - printf("session(%d)-----------%20s: dettach\n", g_test_mq_plugin_id, session_get_readable_addr(sess)); - session_dettach_plugin(sess, g_test_mq_plugin_id); - return; -} - diff --git a/src/adapter/test/test_plugin.h b/src/adapter/test/test_plugin.h deleted file mode 100644 index ac5e0b7..0000000 --- a/src/adapter/test/test_plugin.h +++ /dev/null @@ -1,14 +0,0 @@ -#pragma once - -#include "plugin_spec.h" - -int test_entry_plugin_init(int plugin_id); -void test_entry_plugin_exit(void); -void test_entry_plugin_entry(struct session *session, enum session_state state, int thread_id); - - -int test_mq_plugin_init(int plugin_id); -void test_mq_plugin_exit(void); -void test_mq_plugin_entry(struct session *session, enum session_state state, int thread_id); - - diff --git a/src/stat_policy/stat_policy.h b/src/stat_policy/stat_policy.h deleted file mode 100644 index e69de29..0000000 --- a/src/stat_policy/stat_policy.h +++ /dev/null |
