summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryangwei <[email protected]>2023-08-17 21:54:41 +0800
committeryangwei <[email protected]>2023-08-23 12:25:28 +0800
commitf85c57d2002f11fac34cfbaff3af0058c0e7757f (patch)
tree297373e93fc7c9c3c0ada31116659ecc3a8d500d
parent8fffa5090139f72ae53fcba6b346fda389eef2ee (diff)
✨ feat(redefine sdk/include):
-rw-r--r--CMakeLists.txt6
-rw-r--r--examples/.gitkeep (renamed from sdk/example/.gitkeep)0
-rw-r--r--examples/sapp_plugin/CMakeLists.txt6
-rw-r--r--examples/sapp_plugin/simple_loader.inf13
-rw-r--r--examples/sapp_plugin/simple_sapp_entry.c154
-rw-r--r--examples/sapp_plugin/simple_stellar_plugin.c151
-rw-r--r--examples/sapp_plugin/simple_stellar_plugin.h11
-rw-r--r--include/stellar/session.h66
-rw-r--r--include/stellar/session_exdata.h10
-rw-r--r--include/stellar/session_mq.h24
-rw-r--r--include/stellar/stellar.h24
-rw-r--r--include/stellar/utils.h (renamed from sdk/include/utils.h)0
-rw-r--r--sdk/include/plugin_spec.h8
-rw-r--r--sdk/include/session.h66
-rw-r--r--src/adapter/CMakeLists.txt2
-rw-r--r--src/adapter/adapter.c188
-rw-r--r--src/adapter/adapter.h30
-rw-r--r--src/adapter/session_manager.c561
-rw-r--r--src/adapter/session_manager.h19
-rw-r--r--src/adapter/stellar.c68
-rw-r--r--src/adapter/stellar_internal.h13
-rw-r--r--src/adapter/test/CMakeLists.txt5
-rw-r--r--src/adapter/test/test_loader.c149
-rw-r--r--src/adapter/test/test_loader.inf13
-rw-r--r--src/adapter/test/test_plugin.c127
-rw-r--r--src/adapter/test/test_plugin.h14
-rw-r--r--src/stat_policy/stat_policy.h0
27 files changed, 938 insertions, 790 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 3fb510d..cf7d68d 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -68,13 +68,13 @@ add_custom_target("install-profile" COMMAND ${CMAKE_COMMAND} ARGS -DCOMPONENT=Pr
include_directories(${CMAKE_SOURCE_DIR})
include_directories(${CMAKE_SOURCE_DIR}/deps)
-include_directories(${CMAKE_SOURCE_DIR}/sdk/include)
+include_directories(${CMAKE_SOURCE_DIR}/include)
#add_subdirectory(vendor)
-add_subdirectory(deps/toml)
+#add_subdirectory(deps/toml)
#add_subdirectory(deps/utable)
add_subdirectory(src/adapter)
-add_subdirectory(src/adapter/test)
+add_subdirectory(examples/sapp_plugin)
#enable_testing()
#add_subdirectory(test)
diff --git a/sdk/example/.gitkeep b/examples/.gitkeep
index e69de29..e69de29 100644
--- a/sdk/example/.gitkeep
+++ b/examples/.gitkeep
diff --git a/examples/sapp_plugin/CMakeLists.txt b/examples/sapp_plugin/CMakeLists.txt
new file mode 100644
index 0000000..fc524a0
--- /dev/null
+++ b/examples/sapp_plugin/CMakeLists.txt
@@ -0,0 +1,6 @@
+add_definitions(-fPIC)
+
+add_library(simple_loader SHARED simple_sapp_entry.c simple_stellar_plugin.c)
+target_link_libraries(simple_loader adapter)
+set_target_properties(simple_loader PROPERTIES PREFIX "")
+include_directories(${CMAKE_SOURCE_DIR}/src) \ No newline at end of file
diff --git a/examples/sapp_plugin/simple_loader.inf b/examples/sapp_plugin/simple_loader.inf
new file mode 100644
index 0000000..142b3ec
--- /dev/null
+++ b/examples/sapp_plugin/simple_loader.inf
@@ -0,0 +1,13 @@
+[PLUGINFO]
+PLUGNAME=simple_loader
+SO_PATH=./plug/business/simple_loader/simple_loader.so
+INIT_FUNC=LOADER_INIT
+DESTROY_FUNC=LOADER_EXIT
+
+[TCP_ALL]
+FUNC_FLAG=ALL
+FUNC_NAME=loader_tcpall_stream_entry
+
+[UDP]
+FUNC_FLAG=ALL
+FUNC_NAME=loader_udp_stream_entry \ No newline at end of file
diff --git a/examples/sapp_plugin/simple_sapp_entry.c b/examples/sapp_plugin/simple_sapp_entry.c
new file mode 100644
index 0000000..aee7576
--- /dev/null
+++ b/examples/sapp_plugin/simple_sapp_entry.c
@@ -0,0 +1,154 @@
+#include "stellar/session.h"
+#include "stellar/session_mq.h"
+#include "stellar/utils.h"
+
+#include "simple_stellar_plugin.h"
+
+#include "adapter/adapter.h"
+#include "adapter/session_manager.h"
+
+#include <MESA/stream.h>
+#include <stdio.h>
+
+struct plugin_specific g_plugin_schema[] =
+{
+ {
+ .init_cb = simple_stellar_event_plugin_init,
+ .exit_cb = simple_stellar_event_plugin_exit,
+ },
+ {
+ .init_cb = simple_stellar_mq_plugin_init,
+ .exit_cb = simple_stellar_mq_plugin_exit,
+ },
+};
+
+struct simple_stream_ctx
+{
+ uint32_t c2s_pkts;
+ uint32_t c2s_bytes;
+ uint32_t s2c_pkts;
+ uint32_t s2c_bytes;
+ struct session *sess;
+};
+
+static void session_mq_topic_free(void *data, void *cb_arg)
+{
+ FREE(data);
+ return;
+}
+
+static int session_mq_loader_read(struct session *sess, int topic_id, const void *data, void *cb_arg)
+{
+ struct simple_stream_ctx *ctx =(struct simple_stream_ctx *)data;
+ printf("loader_read_message(topic:%d)-----------%20s", topic_id, session_get0_readable_addr(sess));
+ printf("server-pkt=%u, server-count=%u, client-pkt=%u, client-count=%u, ",
+ ctx->c2s_pkts, ctx->c2s_bytes,
+ ctx->s2c_pkts, ctx->s2c_bytes);
+ printf("total-pkt=%u, ", ctx->c2s_pkts+ctx->s2c_pkts);
+ printf("total-count=%u\n", ctx->c2s_bytes+ctx->s2c_bytes);
+ return 0;
+}
+
+void *g_stellar=NULL;
+int g_topic_id=-1;
+int LOADER_INIT()
+{
+ g_stellar = stellar_init(g_plugin_schema, 2);
+ if(g_stellar==NULL)return -1;
+ int t_topic_id=session_mq_get_topic_id(g_stellar, "SIMPLE_MQ_TOPIC");
+ if(t_topic_id >= 0)
+ {
+ session_mq_update_topic(g_stellar, t_topic_id, session_mq_topic_free, NULL);
+ g_topic_id=t_topic_id;
+ }
+ else
+ {
+ g_topic_id=session_mq_create_topic(g_stellar, "SIMPLE_MQ_TOPIC", session_mq_topic_free, NULL);
+ }
+ session_mq_subscribe_topic(g_stellar , g_topic_id, session_mq_loader_read, NULL);
+ return 0;
+}
+
+void LOADER_EXIT(void)
+{
+
+ session_mq_destroy_topic(g_stellar, g_topic_id);
+ stellar_exit(g_stellar);
+ return;
+}
+
+
+static void print_stream_ctx(struct streaminfo *pstream, struct simple_stream_ctx *ctx)
+{
+ printf("stream-----------%20s: ", printaddr(&(pstream->addr), pstream->threadnum));
+ printf("server-pkt=%u, server-count=%u, client-pkt=%u, client-count=%u, ",
+ ctx->c2s_pkts, ctx->c2s_bytes,
+ ctx->s2c_pkts, ctx->s2c_bytes);
+ printf("total-pkt=%u, ", ctx->c2s_pkts+ctx->s2c_pkts);
+ printf("total-count=%u\n", ctx->c2s_bytes+ctx->s2c_bytes);
+ return;
+}
+
+static struct simple_stream_ctx *stream_ctx_dup(const struct simple_stream_ctx *origin)
+{
+ struct simple_stream_ctx *ctx=CALLOC(struct simple_stream_ctx,1);
+ memcpy(ctx, origin, sizeof(struct simple_stream_ctx));
+ return ctx;
+}
+
+static void loader_transfer_stream_entry(struct streaminfo *pstream, UCHAR state, void **pme, int thread_seq,void *a_packet)
+{
+ struct simple_stream_ctx *ctx=(struct simple_stream_ctx *)*pme;
+ struct tcpdetail *pdetail=(struct tcpdetail *)pstream->pdetail;
+ if(*pme==NULL)
+ {
+ *pme=CALLOC(struct simple_stream_ctx,1);
+ ctx=(struct simple_stream_ctx *)*pme;
+ }
+
+ if(a_packet!=NULL)
+ {
+ if(DIR_C2S == pstream->curdir){
+ ctx->c2s_bytes += pdetail->datalen;
+ ctx->c2s_pkts++;
+ }else{
+ ctx->s2c_bytes += pdetail->datalen;
+ ctx->s2c_pkts++;
+ }
+ }
+ struct simple_stream_ctx *msg;
+ switch (state)
+ {
+ case OP_STATE_PENDING:
+ ctx->sess=adapter_session_open(g_stellar, pstream, a_packet);
+ break;
+ case OP_STATE_DATA:
+ msg = stream_ctx_dup((const struct simple_stream_ctx *)*pme);
+ if(session_mq_publish_message(ctx->sess, g_topic_id, msg) < 0)
+ {
+ FREE(msg);
+ }
+ adapter_session_active(pstream, ctx->sess, a_packet);
+ break;
+ case OP_STATE_CLOSE:
+ adapter_session_close(pstream, ctx->sess, a_packet);
+ print_stream_ctx(pstream, ctx);
+ FREE(*pme);
+ break;
+ default:
+ break;
+ }
+ return;
+}
+
+char loader_udp_stream_entry(struct streaminfo *pstream,void **pme, int thread_seq,void *a_packet)
+{
+ loader_transfer_stream_entry(pstream, pstream->opstate, pme, thread_seq, a_packet);
+ return APP_STATE_GIVEME;
+}
+
+char loader_tcpall_stream_entry(struct streaminfo *pstream,void **pme, int thread_seq,void *a_packet)
+{
+ loader_transfer_stream_entry(pstream, pstream->pktstate, pme, thread_seq, a_packet);
+ return APP_STATE_GIVEME;
+}
diff --git a/examples/sapp_plugin/simple_stellar_plugin.c b/examples/sapp_plugin/simple_stellar_plugin.c
new file mode 100644
index 0000000..7084ceb
--- /dev/null
+++ b/examples/sapp_plugin/simple_stellar_plugin.c
@@ -0,0 +1,151 @@
+#include "simple_stellar_plugin.h"
+
+#include "stellar/stellar.h"
+#include "stellar/utils.h"
+#include "stellar/session_exdata.h"
+#include "stellar/session_mq.h"
+
+#include <stdio.h>
+
+struct simple_stellar_plugin_ctx
+{
+ int plugin_id;
+ int exdata_idx;
+ struct stellar *st;
+};
+
+struct mq_message_stat
+{
+ uint32_t c2s_pkts;
+ uint32_t c2s_bytes;
+ uint32_t s2c_pkts;
+ uint32_t s2c_bytes;
+};
+
+extern int simple_mq_plugin_entry(struct session *sess, int events, const struct packet *pkt, void *cb_arg);
+
+static int session_mq_plugin_sub_fn(struct session *sess, int topic_id, const void *data, void *cb_arg)
+{
+ struct mq_message_stat *ctx =(struct mq_message_stat *)data;
+ struct simple_stellar_plugin_ctx *plugin_ctx = (struct simple_stellar_plugin_ctx *)cb_arg;
+ printf("%s(topic:%d->plug:%d)-----------%20s: ", __FUNCTION__, topic_id, plugin_ctx->plugin_id, session_get0_readable_addr(sess));
+ printf("server-pkt=%u, server-count=%u, client-pkt=%u, client-count=%u, ",
+ ctx->c2s_pkts, ctx->c2s_bytes,
+ ctx->s2c_pkts, ctx->s2c_bytes);
+ printf("total-pkt=%u, ", ctx->c2s_pkts+ctx->s2c_pkts);
+ printf("total-count=%u\n", ctx->c2s_bytes+ctx->s2c_bytes);
+
+ struct session_event *i_ev=session_get_intrinsic_event(sess, plugin_ctx->plugin_id);
+ session_event_assign(i_ev, plugin_ctx->st, sess, (SESS_EV_TCP|SESS_EV_UDP|SESS_EV_OPENING|SESS_EV_PACKET|SESS_EV_CLOSING), simple_mq_plugin_entry, plugin_ctx);
+ printf("%s(plug:%d)session_event_assign-----------%20s\n", __FUNCTION__, plugin_ctx->plugin_id, session_get0_readable_addr(sess));
+
+ return 0;
+}
+
+static void print_session_ctx(struct session *sess, struct mq_message_stat *ctx, int plugin_id)
+{
+ printf("%s(plug:%d)-----------%20s: ", __FUNCTION__, plugin_id, session_get0_readable_addr(sess));
+ printf("server-pkt=%u, server-count=%u, client-pkt=%u, client-count=%u, ",
+ ctx->c2s_pkts, ctx->c2s_bytes,
+ ctx->s2c_pkts, ctx->s2c_bytes);
+ printf("total-pkt=%u, ", ctx->c2s_pkts+ctx->s2c_pkts);
+ printf("total-count=%u\n", ctx->c2s_bytes+ctx->s2c_bytes);
+ return;
+}
+
+int simple_event_plugin_entry(struct session *sess, int events, const struct packet *pkt, void *cb_arg)
+{
+ if(cb_arg == NULL)return -1;
+ struct simple_stellar_plugin_ctx *plugin_ctx=(struct simple_stellar_plugin_ctx *)cb_arg;
+ struct mq_message_stat *mg_stat = (struct mq_message_stat *)session_get_ex_data(sess, plugin_ctx->exdata_idx);
+ if (mg_stat == NULL)
+ {
+ mg_stat = CALLOC(struct mq_message_stat, 1);
+ session_set_ex_data(sess, plugin_ctx->exdata_idx, mg_stat);
+ }
+
+ if (pkt)
+ {
+ size_t payload_len = 0;
+ session_get0_current_payload(sess, &payload_len);
+ int dir = session_get_direction(sess);
+ if (dir==SESSION_DIRECTION_IN)
+ {
+ mg_stat->c2s_bytes += payload_len;
+ mg_stat->c2s_pkts += 1;
+ }
+ if (dir==SESSION_DIRECTION_OUT)
+ {
+ mg_stat->s2c_bytes += payload_len;
+ mg_stat->s2c_pkts += 1;
+ }
+ }
+ if (mg_stat != NULL && (events & SESS_EV_CLOSING))
+ {
+ print_session_ctx(sess, mg_stat, plugin_ctx->plugin_id);
+ }
+ return 0;
+}
+
+int simple_mq_plugin_entry(struct session *sess, int events, const struct packet *pkt, void *cb_arg)
+{
+ if(cb_arg == NULL)return -1;
+ struct simple_stellar_plugin_ctx *plugin_ctx=(struct simple_stellar_plugin_ctx *)cb_arg;
+ struct session_event *i_ev=session_get_intrinsic_event(sess, plugin_ctx->plugin_id);
+ session_event_assign(i_ev, plugin_ctx->st, sess, 0, simple_mq_plugin_entry, plugin_ctx);
+ printf("%s(plug:%d)session_event_assign-----------%20s: \n", __FUNCTION__, plugin_ctx->plugin_id, session_get0_readable_addr(sess));
+ return 0;
+}
+
+static void simple_exdata_free(struct session *sess, int idx, void *ex_ptr, void *arg)
+{
+ if(ex_ptr)
+ {
+ FREE(ex_ptr);
+ }
+ return;
+}
+
+void *simple_stellar_event_plugin_init(struct stellar *st)
+{
+ struct simple_stellar_plugin_ctx *ctx = CALLOC(struct simple_stellar_plugin_ctx, 1);
+ ctx->st = st;
+ ctx->exdata_idx = stellar_session_get_ex_new_index(st, "SIMPLE_EVENT_PLUGIN", simple_exdata_free, NULL);
+ int plugin_id=stellar_plugin_register(st, (SESS_EV_TCP|SESS_EV_UDP|SESS_EV_OPENING|SESS_EV_PACKET|SESS_EV_CLOSING), simple_event_plugin_entry, ctx);
+ if(plugin_id >= 0)
+ {
+ ctx->plugin_id=plugin_id;
+ }
+ return ctx;
+}
+
+void simple_stellar_event_plugin_exit(void *ctx)
+{
+ if(ctx)FREE(ctx);
+ return;
+}
+
+void *simple_stellar_mq_plugin_init(struct stellar *st)
+{
+ struct simple_stellar_plugin_ctx *ctx = CALLOC(struct simple_stellar_plugin_ctx, 1);
+ ctx->st = st;
+ ctx->exdata_idx = stellar_session_get_ex_new_index(st, "SIMPLE_MQ_PLUGIN", simple_exdata_free, NULL);
+ int topic_id=session_mq_get_topic_id(st, "SIMPLE_MQ_TOPIC");
+ if(topic_id < 0)
+ {
+ topic_id=session_mq_create_topic(st, "SIMPLE_MQ_TOPIC", NULL, NULL);
+ }
+ session_mq_subscribe_topic(st, topic_id, session_mq_plugin_sub_fn, ctx);
+ int plugin_id=stellar_plugin_register(st, 0, simple_mq_plugin_entry, ctx);
+ if(plugin_id >= 0)
+ {
+ ctx->plugin_id=plugin_id;
+ }
+ return ctx;
+}
+
+void simple_stellar_mq_plugin_exit(void *ctx)
+{
+ if(ctx)FREE(ctx);
+ return;
+} \ No newline at end of file
diff --git a/examples/sapp_plugin/simple_stellar_plugin.h b/examples/sapp_plugin/simple_stellar_plugin.h
new file mode 100644
index 0000000..1e7e7ba
--- /dev/null
+++ b/examples/sapp_plugin/simple_stellar_plugin.h
@@ -0,0 +1,11 @@
+#pragma once
+
+#include "stellar/stellar.h"
+#include "stellar/session.h"
+
+void *simple_stellar_event_plugin_init(struct stellar *st);
+void simple_stellar_event_plugin_exit(void *plugin_ctx);
+
+void *simple_stellar_mq_plugin_init(struct stellar *st);
+void simple_stellar_mq_plugin_exit(void *plugin_ctx);
+
diff --git a/include/stellar/session.h b/include/stellar/session.h
new file mode 100644
index 0000000..75772b3
--- /dev/null
+++ b/include/stellar/session.h
@@ -0,0 +1,66 @@
+#pragma once
+
+#include <stdint.h>
+#include <stddef.h>
+
+#include "stellar.h"
+
+enum session_type
+{
+ SESSION_TYPE_TCP,
+ SESSION_TYPE_UDP,
+ __SESSION_TYPE_MAX,
+};
+
+enum session_state
+{
+ SESSION_STATE_INVALID = 0,
+ SESSION_STATE_OPENING = 1 ,
+ SESSION_STATE_ACTIVE = 2,
+ SESSION_STATE_CLOSING = 3,
+ __SESSION_STATE_MAX,
+};
+
+
+struct session;
+
+#define SESSION_SEEN_C2S_FLOW (1 >> 1)
+#define SESSION_SEEN_S2C_FLOW (1 >> 2)
+int session_is_symmetric(const struct session *sess, unsigned char *flag);
+
+#define SESSION_DIRECTION_IN 0
+#define SESSION_DIRECTION_OUT 1
+int session_get_direction(const struct session *sess);
+
+const char *session_get0_readable_addr(struct session *sess);
+const char *session_get0_current_payload(struct session *sess, size_t *payload_len);
+
+struct packet;
+const struct packet *session_get0_current_packet(struct session *sess);
+
+#define PACKET_DIRECTION_C2S 0
+#define PACKET_DIRECTION_S2C 1
+int packet_get_direction(const struct packet *pkt);
+const char *packet_get0_data(const struct packet *pkt, size_t *data_len);
+
+#define SESS_EV_OPENING 1<<1
+#define SESS_EV_PACKET 1<<2
+#define SESS_EV_CLOSING 1<<3
+
+#define SESS_EV_TCP 1<<4
+#define SESS_EV_UDP 1<<5
+
+typedef int session_event_cb_func(struct session *sess, int events, const struct packet *pkt, void *cb_arg);
+
+struct session_event;
+//return plugin_id
+int stellar_plugin_register(struct stellar *st, int events, session_event_cb_func *cb, void *cb_arg);// register intrinsic event
+struct session_event *session_get_intrinsic_event(struct session *sess, int plugin_id);
+
+#include <sys/time.h>
+
+struct session_event *session_event_new(struct stellar *st, struct session *sess, int events, session_event_cb_func *cb, void *cb_arg);
+int session_event_add(struct session_event *ev, const struct timeval *timeout);
+int session_event_del(struct session_event *ev);
+int session_event_assign(struct session_event *ev, struct stellar *st, struct session *sess, int events, session_event_cb_func *cb, void *cb_arg);
+void session_event_free(struct session_event *ev); \ No newline at end of file
diff --git a/include/stellar/session_exdata.h b/include/stellar/session_exdata.h
new file mode 100644
index 0000000..b1febc2
--- /dev/null
+++ b/include/stellar/session_exdata.h
@@ -0,0 +1,10 @@
+#pragma once
+
+#include "session.h"
+#include "stellar.h"
+
+typedef void session_ex_free(struct session *sess, int idx, void *ex_ptr, void *arg);
+int stellar_session_get_ex_new_index(struct stellar *st, const char *name, session_ex_free *free_func,void *arg);
+int session_set_ex_data(struct session *sess, int idx, void *ex_ptr);
+void *session_get_ex_data(struct session *sess, int idx);
+
diff --git a/include/stellar/session_mq.h b/include/stellar/session_mq.h
new file mode 100644
index 0000000..72c5af4
--- /dev/null
+++ b/include/stellar/session_mq.h
@@ -0,0 +1,24 @@
+#pragma once
+
+#include "session.h"
+#include "stellar.h"
+
+#include <sys/time.h>
+
+//session mq
+typedef void msg_free_cb_func(void *msg, void *cb_arg);
+typedef int on_msg_cb_func(struct session *sess, int topic_id, const void *msg, void *cb_arg);
+
+//return topic_id
+int session_mq_create_topic(struct stellar *st, const char *topic_name, msg_free_cb_func *free_cb, void *cb_arg);
+int session_mq_get_topic_id(struct stellar *st, const char *topic_name);
+
+int session_mq_update_topic(struct stellar *st, int topic_id, msg_free_cb_func *free_cb, void *cb_arg);
+
+int session_mq_destroy_topic(struct stellar *st, int topic_id);
+
+//return sub_id >=0 if success, otherwise return -1.
+int session_mq_subscribe_topic(struct stellar *st, int topic_id, on_msg_cb_func *sub_cb, void *cb_arg);
+
+int session_mq_publish_message(struct session *sess, int topic_id, void *msg);
+int session_mq_ignore_message(struct session *sess, int topic_id, int sub_id); \ No newline at end of file
diff --git a/include/stellar/stellar.h b/include/stellar/stellar.h
new file mode 100644
index 0000000..f7cd441
--- /dev/null
+++ b/include/stellar/stellar.h
@@ -0,0 +1,24 @@
+#pragma once
+
+#include <sys/time.h>
+
+struct stellar;
+
+int stellar_get_worker_thread_num(struct stellar *st);
+int stellar_get_current_thread_id(struct stellar *st);
+
+typedef void *plugin_init_callback(struct stellar *st);
+typedef void plugin_exit_callback(void *plugin_ctx);
+
+typedef int stellar_periodic_cb_func(struct stellar *st, void *cb_arg);
+
+int stellar_worker_thread_periodic_add(struct stellar *st, stellar_periodic_cb_func *periodic_cb, void *cb_arg, const struct timeval *interval);
+
+struct plugin_specific
+{
+ plugin_init_callback *init_cb;
+ plugin_exit_callback *exit_cb;
+};
+
+struct stellar *stellar_init(struct plugin_specific specs[] , int specs_num);
+void stellar_exit(struct stellar *); \ No newline at end of file
diff --git a/sdk/include/utils.h b/include/stellar/utils.h
index 7beafbb..7beafbb 100644
--- a/sdk/include/utils.h
+++ b/include/stellar/utils.h
diff --git a/sdk/include/plugin_spec.h b/sdk/include/plugin_spec.h
deleted file mode 100644
index e516c20..0000000
--- a/sdk/include/plugin_spec.h
+++ /dev/null
@@ -1,8 +0,0 @@
-#pragma once
-
-#include "session.h"
-
-typedef int plugin_init_callback(plugin_id_t plugin_id);
-typedef void plugin_exit_callback(void);
-typedef void plugin_session_callback(struct session *session, enum session_state state, int thread_id);
-
diff --git a/sdk/include/session.h b/sdk/include/session.h
deleted file mode 100644
index c251f03..0000000
--- a/sdk/include/session.h
+++ /dev/null
@@ -1,66 +0,0 @@
-#pragma once
-
-#include <stdint.h>
-#include <stddef.h>
-
-enum session_type
-{
- SESSION_TYPE_CUSTOM,
- SESSION_TYPE_TCP,
- SESSION_TYPE_UDP,
- SESSION_TYPE_MAX,
-};
-
-enum session_state
-{
- SESSION_STATE_INVALID = 0,
- SESSION_STATE_OPENING = 1 ,
- SESSION_STATE_ACTIVE = 2,
- SESSION_STATE_CLOSING = 3,
- __SESSION_STATE_MAX,
-};
-
-
-struct session;
-
-#define SESSION_SEEN_C2S_FLOW (1 >> 1)
-#define SESSION_SEEN_S2C_FLOW (1 >> 2)
-int session_is_symmetric(const struct session *sess, unsigned char *flag);
-
-#define SESSION_DIRECTION_IN 0
-#define SESSION_DIRECTION_OUT 1
-int session_get_direction(const struct session *sess);
-
-const char *session_get_readable_addr(struct session *sess);
-const char *session_get_current_payload(struct session *sess, size_t *payload_len);
-
-struct packet;
-const struct packet *session_get_current_packet(struct session *sess);
-
-#define PACKET_DIRECTION_C2S 0
-#define PACKET_DIRECTION_S2C 1
-int packet_get_direction(struct packet *pkt);
-const char *packet_get_data(const struct packet *pkt, size_t *data_len);
-
-typedef int plugin_id_t;
-//session mq
-typedef void msg_free_cb_func(void *msg, void *cb_arg);
-typedef int on_msg_cb_func(struct session *sess, const char *topic_name, const void *msg, void *cb_arg);
-
-int session_mq_create_topic(const char *topic_name, msg_free_cb_func *free_cb, void *cb_arg);
-int session_mq_destroy_topic(const char *topic_name);
-int session_publish_message(struct session *sess, const char *topic_name, void *msg);
-int session_ignore_message(struct session *sess, const char *topic_name, int sub_id);
-
-//return sub_id >=0 if success, otherwise return -1.
-int session_mq_subscribe_topic(const char *topic_name, on_msg_cb_func *sub_cb, void *cb_arg);
-
-
-typedef void session_ex_free(struct session *sess, int idx, void *ex_ptr, void *arg);
-int session_get_ex_new_index(const char *name, session_ex_free *free_func,void *arg); // Only support in INIT stage
-int session_set_ex_data(struct session *sess, int idx, void *ex_ptr);
-void *session_get_ex_data(struct session *sess, int idx);
-
-void session_dettach_plugin(struct session *sess, plugin_id_t self);
-void session_attach_plugin(struct session *sess, plugin_id_t self);
-
diff --git a/src/adapter/CMakeLists.txt b/src/adapter/CMakeLists.txt
index 89923ed..d31f587 100644
--- a/src/adapter/CMakeLists.txt
+++ b/src/adapter/CMakeLists.txt
@@ -1,3 +1,3 @@
set(CMAKE_C_FLAGS "-std=c99")
add_definitions(-fPIC)
-add_library(adapter STATIC adapter.c session_manager.c) \ No newline at end of file
+add_library(adapter STATIC stellar.c adapter.c session_manager.c) \ No newline at end of file
diff --git a/src/adapter/adapter.c b/src/adapter/adapter.c
index 0114c97..be284c0 100644
--- a/src/adapter/adapter.c
+++ b/src/adapter/adapter.c
@@ -1,5 +1,9 @@
-#include "session.h"
-#include "utils.h"
+#include "stellar/session.h"
+#include "stellar/stellar.h"
+#include "stellar/utils.h"
+#include "stellar/session_exdata.h"
+
+#include "stellar_internal.h"
#include "adapter.h"
#include "session_manager.h"
@@ -7,112 +11,17 @@
#include <MESA/stream.h>
-UT_array *global_plugin_specs=0;
-int global_plugin_runtime_exdata_idx=-1;
-
-UT_icd plugin_specs_icd = {sizeof(struct plugin_specific), NULL, NULL, NULL};
-static void exdata_plugin_rt_free(struct session *sess, int idx, void *ex_ptr, void *arg)
+inline void adapter_session_poll(struct session *sess)
{
- if(ex_ptr)
- {
- FREE(ex_ptr);
- }
- return;
+ session_defer_loop(sess);
}
-int adapter_init(struct plugin_specific specs[] , int spec_num)
-{
- utarray_new(global_plugin_specs,&plugin_specs_icd);
- utarray_reserve(global_plugin_specs, spec_num);
- int init_cb_ret = 0;
- global_plugin_runtime_exdata_idx=session_get_ex_new_index("__SYSTEM__", exdata_plugin_rt_free, NULL);
- for(int i = 0; i < spec_num; i++)
- {
- if (specs[i].init_cb != NULL)
- {
- init_cb_ret=specs[i].init_cb(i);
- if(init_cb_ret < 0)
- {
- goto ADAPTER_INIT_ERROR;
- }
- utarray_push_back(global_plugin_specs, &specs[i]);
- }
- }
- return 0;
-ADAPTER_INIT_ERROR:
- adapter_exit();
- return -1;
-}
-void adapter_exit()
-{
- struct plugin_specific *p=NULL;
- while ((p = (struct plugin_specific *)utarray_next(global_plugin_specs, p)))
- {
- if(p->exit_cb != NULL)
- {
- p->exit_cb();
- }
- }
- session_manager_cleanup();
- utarray_free(global_plugin_specs);
- return;
-};
-
-
-struct plugin_runtime_exdata
-{
- plugin_session_callback *session_entry_cb;
- int session_interest_state;
-};
-
-static struct plugin_runtime_exdata* adapter_new_plugin_runtime(enum session_type type, UT_array *plugin_specs)
-{
- struct plugin_runtime_exdata *plugin_rt = NULL;
- unsigned int len = utarray_len(plugin_specs);
- if (len > 0)
- {
- plugin_rt=CALLOC(struct plugin_runtime_exdata, len);
- for (unsigned int i = 0; i < len; i++)
- {
- struct plugin_specific *spec = (struct plugin_specific *)utarray_eltptr(plugin_specs, i);
- struct plugin_runtime_exdata *exdata = plugin_rt+i;
- if (type == SESSION_TYPE_TCP)
- {
- exdata->session_entry_cb = spec->tcp_session_entry_cb;
- exdata->session_interest_state = spec->tcp_session_interest_state;
- }
- if (type == SESSION_TYPE_UDP)
- {
- exdata->session_entry_cb = spec->tcp_session_entry_cb;
- exdata->session_interest_state = spec->tcp_session_interest_state;
- }
- }
- }
- return plugin_rt;
-}
-
-static void adapter_dispatch_session(int plugin_rt_exdata_idx, struct session *sess, enum session_state state, int thread_id)
-{
- struct plugin_runtime_exdata *plugin_rt = (struct plugin_runtime_exdata *)session_get_ex_data(sess, plugin_rt_exdata_idx);
- struct plugin_runtime_exdata *p = NULL;
- unsigned int len = utarray_len(global_plugin_specs);
- for(unsigned i = 0; i < len; i++)
- {
- p = (plugin_rt+i);
- if(p->session_entry_cb != NULL && (p->session_interest_state & state))
- {
- p->session_entry_cb(sess, state, thread_id);
- }
- }
- session_mq_loop(sess);
- return;
-}
struct streaminfo;
//streaminfo open
-void adapter_session_open(struct streaminfo *stream, struct session **sess, int thread_id)
+struct session *adapter_session_open(struct stellar *st, struct streaminfo *stream, void *a_packet)
{
enum session_type type;
if(stream->type==STREAM_TYPE_TCP)
@@ -125,84 +34,64 @@ void adapter_session_open(struct streaminfo *stream, struct session **sess, int
}
else
{
- return;
+ return NULL;
}
- *sess=session_new(stream, type, thread_id);
- struct plugin_runtime_exdata*plugin_rt=adapter_new_plugin_runtime(type, global_plugin_specs);
- session_set_ex_data(*sess, global_plugin_runtime_exdata_idx, plugin_rt);
- adapter_dispatch_session(global_plugin_runtime_exdata_idx, *sess, SESSION_STATE_OPENING, thread_id);
- return;
+ struct session *sess=session_new(st, stream, type, stream->threadnum);
+ session_dispatch(sess, SESSION_STATE_OPENING, a_packet);
+ return sess;
}
-void adapter_session_active(struct streaminfo *stream, struct session **sess, int thread_id)
+void adapter_session_active(struct streaminfo *stream, struct session *sess, void *a_packet)
{
- if (*sess)
+ if (sess)
{
- session_set_state(*sess, SESSION_STATE_ACTIVE, thread_id);
- adapter_dispatch_session(global_plugin_runtime_exdata_idx, *sess, SESSION_STATE_ACTIVE, thread_id);
+ session_dispatch( sess, SESSION_STATE_ACTIVE, a_packet);
}
return;
}
//streaminfo close, or firewall active close streaminfo
-void adapter_session_close(struct streaminfo *stream, struct session **sess, int thread_id)
-{
- if(*sess)
- {
- session_set_state(*sess, SESSION_STATE_CLOSING, thread_id);
- adapter_dispatch_session(global_plugin_runtime_exdata_idx, *sess, SESSION_STATE_CLOSING, thread_id);
- session_free(*sess, stream->threadnum);
- }
- return;
-}
-
-void session_dettach_plugin(struct session *sess, plugin_id_t self)
+void adapter_session_close(struct streaminfo *stream, struct session *sess, void *a_packet)
{
- if(sess==NULL)return;
- struct plugin_runtime_exdata *plugin_rt=(struct plugin_runtime_exdata *)session_get_ex_data(sess, global_plugin_runtime_exdata_idx);
- struct plugin_runtime_exdata *p=(plugin_rt+(unsigned int)self);
- if(p)
+ if(sess)
{
- p->session_interest_state=SESSION_STATE_INVALID;
+ session_dispatch(sess, SESSION_STATE_CLOSING, a_packet);
+ session_free(sess);
}
return;
}
-void session_attach_plugin(struct session *sess, plugin_id_t self)
-{
- if(sess==NULL)return;
- struct plugin_runtime_exdata *plugin_rt=(struct plugin_runtime_exdata *)session_get_ex_data(sess, global_plugin_runtime_exdata_idx);
- struct plugin_runtime_exdata *p=(plugin_rt+(unsigned int)self);
- if(p)
- {
- p->session_interest_state=__SESSION_STATE_MAX;
- }
- return;
-}
-
-const char* session_get_readable_addr(struct session *sess)
+const char* session_get0_readable_addr(struct session *sess)
{
return printaddr(&sess->stream->addr, sess->stream->threadnum);
}
-const char *session_get_current_payload(struct session *sess, size_t *payload_len)
+const char *session_get0_current_payload(struct session *sess, size_t *payload_len)
{
*payload_len = (size_t)sess->stream->ptcpdetail->datalen;
return (const char*)sess->stream->ptcpdetail->pdata;
}
-const struct packet *session_get_current_packet(struct session *sess)
+const struct packet *session_get0_current_packet(struct session *sess)
{
return (const struct packet *)get_rawpkt_from_streaminfo(sess->stream);
}
-const char *packet_get_data(const struct packet *pkt, size_t *data_len)
+int packet_get_direction(const struct packet *pkt)
+{
+ // TODO:
+ return -1;
+}
+
+const char *packet_get0_data(const struct packet *pkt, size_t *data_len)
{
+ // TODO:
return NULL;
}
int session_get_direction(const struct session *sess)
{
+ // TODO: check route dir
if(sess->stream)
{
if(sess->stream->curdir==DIR_C2S)
@@ -237,4 +126,15 @@ int session_is_symmetric(const struct session *sess, unsigned char *flag)
}
}
return is_symmetric;
-} \ No newline at end of file
+}
+
+int stellar_get_worker_thread_num(struct stellar *st)
+{
+ return get_thread_count();
+}
+
+int stellar_get_current_thread_id(struct stellar *st)
+{
+ // TODO:
+ return 0;
+} \ No newline at end of file
diff --git a/src/adapter/adapter.h b/src/adapter/adapter.h
index 55fadbd..da902f9 100644
--- a/src/adapter/adapter.h
+++ b/src/adapter/adapter.h
@@ -1,30 +1,14 @@
-#include "session.h"
-#include "plugin_spec.h"
-
-#include <stdint.h>
-
-struct plugin_specific
-{
- plugin_init_callback *init_cb;
- plugin_exit_callback *exit_cb;
- plugin_session_callback *udp_session_entry_cb;
- plugin_session_callback *tcp_session_entry_cb;
- int udp_session_interest_state;
- int tcp_session_interest_state;
-
-};
-
-int adapter_init(struct plugin_specific specs[] , int specs_num);
-void adapter_exit();
-
-
+#pragma once
+#include "stellar/session.h"
struct streaminfo;
//streaminfo open
-void adapter_session_open(struct streaminfo *stream, struct session **session, int thread_id);
-void adapter_session_active(struct streaminfo *stream, struct session **session, int thread_id);
+struct session *adapter_session_open(struct stellar *st, struct streaminfo *stream, void *a_packet);
+void adapter_session_active(struct streaminfo *stream, struct session *session, void *a_packet);
//streaminfo close, or firewall active close streaminfo
-void adapter_session_close(struct streaminfo *stream, struct session **session, int thread_id); \ No newline at end of file
+void adapter_session_close(struct streaminfo *stream, struct session *session, void *a_packet);
+
+void adapter_session_poll(struct session *sess); \ No newline at end of file
diff --git a/src/adapter/session_manager.c b/src/adapter/session_manager.c
index 96d9f2b..2701ac1 100644
--- a/src/adapter/session_manager.c
+++ b/src/adapter/session_manager.c
@@ -1,11 +1,15 @@
-#include "utils.h"
+#include "stellar/session.h"
+#include "stellar/stellar.h"
+#include "stellar/utils.h"
+#include "stellar/session_exdata.h"
+#include "stellar/session_mq.h"
#include "session_manager.h"
+#include "stellar_internal.h"
-#include "uthash/uthash.h"
#include "uthash/utarray.h"
-#include <sys/queue.h>
+#include "uthash/utlist.h"
-struct session_exdata_meta
+struct session_exdata_schema
{
char *name;
session_ex_free *free_func;
@@ -13,47 +17,48 @@ struct session_exdata_meta
int idx;
};
-static void session_exdata_met_copy(void *_dst, const void *_src)
+static void session_exdata_met_copy(void *_dst, const void *_src)
{
- struct session_exdata_meta *dst = (struct session_exdata_meta*)_dst, *src = (struct session_exdata_meta*)_src;
- dst->free_func = src->free_func;
- dst->arg = src->arg;
- dst->idx = src->idx;
- dst->name = src->name ? strdup(src->name) : NULL;
+ struct session_exdata_schema *dst = (struct session_exdata_schema *)_dst, *src = (struct session_exdata_schema *)_src;
+ dst->free_func = src->free_func;
+ dst->arg = src->arg;
+ dst->idx = src->idx;
+ dst->name = src->name ? strdup(src->name) : NULL;
}
-static void session_exdata_met_dtor(void *_elt)
+static void session_exdata_met_dtor(void *_elt)
{
- struct session_exdata_meta *elt = (struct session_exdata_meta*)_elt;
- if (elt->name) FREE(elt->name);
+ struct session_exdata_schema *elt = (struct session_exdata_schema *)_elt;
+ if (elt->name)
+ FREE(elt->name);
}
-UT_icd session_exdata_meta_icd = {sizeof(struct session_exdata_meta), NULL, session_exdata_met_copy, session_exdata_met_dtor};
-UT_array *global_session_exdata_metas = NULL;
+UT_icd session_exdata_meta_icd = {sizeof(struct session_exdata_schema), NULL, session_exdata_met_copy, session_exdata_met_dtor};
-int session_get_ex_new_index(const char *name, session_ex_free *free_func,void *arg)
+
+int stellar_session_get_ex_new_index(struct stellar *st, const char *name, session_ex_free *free_func,void *arg)
{
- if(global_session_exdata_metas == NULL)
+ if(st->session_exdata_schema_array == NULL)
{
- utarray_new(global_session_exdata_metas, &session_exdata_meta_icd);
+ utarray_new(st->session_exdata_schema_array, &session_exdata_meta_icd);
}
- unsigned int len = utarray_len(global_session_exdata_metas);
- struct session_exdata_meta *t_meta;
+ unsigned int len = utarray_len(st->session_exdata_schema_array);
+ struct session_exdata_schema *t_schema;
for(unsigned int i = 0; i < len; i++)
{
- t_meta = (struct session_exdata_meta *)utarray_eltptr(global_session_exdata_metas, i);
- if(strcmp(t_meta->name, name) == 0)
+ t_schema = (struct session_exdata_schema *)utarray_eltptr(st->session_exdata_schema_array, i);
+ if(strcmp(t_schema->name, name) == 0)
{
- return t_meta->idx;
+ return t_schema->idx;
}
}
- struct session_exdata_meta meta;
- meta.free_func=free_func;
- meta.name=(char *)name;
- meta.idx=len;
- meta.arg=arg;
- utarray_push_back(global_session_exdata_metas, &meta);
- return meta.idx;
+ struct session_exdata_schema new_schema;
+ new_schema.free_func=free_func;
+ new_schema.name=(char *)name;
+ new_schema.idx=len;
+ new_schema.arg=arg;
+ utarray_push_back(st->session_exdata_schema_array, &new_schema);
+ return new_schema.idx;
}
struct session_exdata_runtime
@@ -64,15 +69,15 @@ struct session_exdata_runtime
int session_set_ex_data(struct session *sess, int idx, void *ex_ptr)
{
if(sess->ex_data_rt == NULL)return -1;
- unsigned int len=utarray_len(global_session_exdata_metas);
+ unsigned int len=utarray_len(sess->st->session_exdata_schema_array);
if(len < (unsigned int)idx)return -1;
struct session_exdata_runtime* exdata = (struct session_exdata_runtime*)(sess->ex_data_rt+idx);
- struct session_exdata_meta* meta = (struct session_exdata_meta*)utarray_eltptr(global_session_exdata_metas, (unsigned int)idx);
+ struct session_exdata_schema* schema = (struct session_exdata_schema*)utarray_eltptr(sess->st->session_exdata_schema_array, (unsigned int)idx);
if(exdata)
{
- if(meta->free_func && exdata->data)
+ if(schema->free_func && exdata->data)
{
- meta->free_func(sess, idx, exdata->data, meta->arg);
+ schema->free_func(sess, idx, exdata->data, schema->arg);
}
exdata->data = ex_ptr;
}
@@ -82,248 +87,179 @@ int session_set_ex_data(struct session *sess, int idx, void *ex_ptr)
void *session_get_ex_data(struct session *sess, int idx)
{
if(sess->ex_data_rt == NULL)return NULL;
- unsigned int len = utarray_len(global_session_exdata_metas);
+ unsigned int len = utarray_len(sess->st->session_exdata_schema_array);
if(len < (unsigned int)idx)return NULL;
struct session_exdata_runtime* exdata = (struct session_exdata_runtime*)(sess->ex_data_rt+idx);
return exdata->data;
}
-struct session_mq_node
+typedef struct session_mq
{
- const char *topic_name;
+ int topic_id;
void *message;
size_t message_len;
- STAILQ_ENTRY(session_mq_node) entries;
-};
-
-struct session_mq
-{
- STAILQ_HEAD(session_mq_head, session_mq_node) head;
-};
+ struct session_mq *next, *prev;
+}session_mq;
-
-struct subscriber
+typedef struct session_mq_subscriber
{
on_msg_cb_func *sub_cb;
void *cb_arg;
- STAILQ_ENTRY(subscriber) entries;
-};
+ struct session_mq_subscriber *next, *prev;
+}session_mq_subscribers;
struct session_mq_topic_schema
{
char *topic_name;
msg_free_cb_func *free_cb;
void *cb_arg;
- STAILQ_HEAD(sub_q_head, subscriber) subscribers;
- UT_hash_handle hh;
+ int topic_id;
+ int sub_cnt;
+ struct session_mq_subscriber *subscribers;
};
-static struct session_mq_topic_schema *global_session_mq_schema = NULL;
-struct session_mq *session_mq_new(void)
+static void session_mq_topic_schema_copy(void *_dst, const void *_src)
{
- struct session_mq *mq = CALLOC(struct session_mq, 1);
- STAILQ_INIT(&mq->head);
- return mq;
+ struct session_mq_topic_schema *dst = (struct session_mq_topic_schema *)_dst,
+ *src = (struct session_mq_topic_schema *)_src;
+ dst->subscribers = src->subscribers;
+ dst->free_cb = src->free_cb;
+ dst->cb_arg = src->cb_arg;
+ dst->topic_id = src->topic_id;
+ dst->sub_cnt = src->sub_cnt;
+ dst->topic_name = src->topic_name ? strdup(src->topic_name) : NULL;
}
-void session_mq_free(struct session_mq *mq)
+static void session_mq_topic_schema_dtor(void *_elt)
{
- struct session_mq_node *node;
- while ((node = STAILQ_FIRST(&mq->head)) != NULL)
- {
- STAILQ_REMOVE_HEAD(&mq->head, entries);
- FREE(node);
- }
- FREE(mq);
+ struct session_mq_topic_schema *elt = (struct session_mq_topic_schema *)_elt;
+ if (elt->topic_name)
+ FREE(elt->topic_name);
+ // FREE(elt); // free the item
}
-static struct session_mq_topic_schema *session_mq_topic2schema(struct session_mq_topic_schema *topic_schema, const char *topic_name)
+UT_icd session_mq_topic_schema_icd = {sizeof(struct session_mq_topic_schema), NULL, session_mq_topic_schema_copy, session_mq_topic_schema_dtor};
+
+
+void session_mq_free(struct session_mq *head)
{
- struct session_mq_topic_schema *schema;
- HASH_FIND_STR(topic_schema, topic_name, schema);
- return schema;
-};
+ struct session_mq *elt, *tmp;
+ DL_FOREACH_SAFE(head, elt, tmp)
+ {
+ DL_DELETE(head, elt);
+ free(elt);
+ }
+ free(head);
+}
+
-static void session_mq_cleanup(struct session_mq_topic_schema *schema)
+
+int session_mq_get_topic_id(struct stellar *st, const char *topic_name)
{
- struct session_mq_topic_schema *current, *tmp;
- HASH_ITER(hh, schema, current, tmp)
+ if(topic_name == NULL || st == NULL || st->session_mq_schema_array == NULL)return -1;
+ unsigned int len = utarray_len(st->session_mq_schema_array);
+ struct session_mq_topic_schema *t_schema;
+ for(unsigned int i = 0; i < len; i++)
{
- HASH_DEL(schema, current); // delete frome global_session_mq_schema
- // free topic_name
- if (current->topic_name)
+ t_schema = (struct session_mq_topic_schema *)utarray_eltptr(st->session_mq_schema_array, i);
+ if(strcmp(t_schema->topic_name, topic_name) == 0)
{
- FREE(current->topic_name);
+ return i;
}
-
- // cleanup STAILQ
- struct subscriber *sub;
- while (!STAILQ_EMPTY(&current->subscribers))
- {
- sub = STAILQ_FIRST(&current->subscribers);
- STAILQ_REMOVE_HEAD(&current->subscribers, entries);
- FREE(sub); // free subscriber
- }
- FREE(current); // free the item
- }
- return;
+ }
+ return -1;
}
-void session_manager_cleanup()
+int session_mq_update_topic(struct stellar *st, int topic_id, msg_free_cb_func *free_cb, void *cb_arg)
{
- session_mq_cleanup(global_session_mq_schema);
- utarray_free(global_session_exdata_metas);
+ if(st->session_mq_schema_array == NULL)return -1;
+ unsigned int len = utarray_len(st->session_mq_schema_array);
+ if(len < (unsigned int)topic_id)return -1;
+ struct session_mq_topic_schema *t_schema = (struct session_mq_topic_schema *)utarray_eltptr(st->session_mq_schema_array, (unsigned int)topic_id);
+ if(t_schema == NULL)return -1;
+ t_schema->free_cb=free_cb;
+ t_schema->cb_arg=cb_arg;
+ return 0;
}
-void session_mq_loop(struct session *sess)
+int session_mq_create_topic(struct stellar *st, const char *topic_name, msg_free_cb_func *free_cb, void *cb_arg)
{
- struct session_mq_node *node;
- struct subscriber *sub;
- struct session_mq *mq = sess->mq;
- while (!STAILQ_EMPTY(&mq->head))
+ if(st->session_mq_schema_array == NULL)
{
- node = STAILQ_FIRST(&mq->head);
- STAILQ_REMOVE_HEAD(&mq->head, entries);
- struct session_mq_topic_schema *topic= session_mq_topic2schema(global_session_mq_schema, node->topic_name);
- if(topic != NULL)
- {
- STAILQ_FOREACH(sub, &topic->subscribers, entries)
- {
- sub->sub_cb(sess, node->topic_name, node->message, sub->cb_arg);
- }
- if(topic->free_cb != NULL)
- {
- topic->free_cb(node->message, topic->cb_arg);
- }
- }
- FREE(node);
+ utarray_new(st->session_mq_schema_array, &session_mq_topic_schema_icd);
}
- return;
-};
-
-int session_mq_create_topic(const char *topic_name, msg_free_cb_func *free_cb, void *cb_arg)
-{
- struct session_mq_topic_schema *topic;
- // check if topic_name exist
- HASH_FIND_STR(global_session_mq_schema, topic_name, topic);
- //if found and free_cb is not NULL, it's a exist producer
- if (topic != NULL && topic->free_cb != NULL)
+ unsigned int len = utarray_len(st->session_mq_schema_array);
+ if(session_mq_get_topic_id(st, topic_name) >= 0)
{
- return -1; // exist
+ return -1;
}
- // if topic not found, create new topic, called by consumer or producer
- if (topic == NULL)
- {
- topic = CALLOC(struct session_mq_topic_schema, 1);
- topic->topic_name = strdup(topic_name); // 复制字符串,确保独立存储
- HASH_ADD_KEYPTR(hh, global_session_mq_schema, topic->topic_name, strlen(topic->topic_name), topic);
- if (STAILQ_EMPTY(&topic->subscribers))
- {
- STAILQ_INIT(&topic->subscribers);
- }
- }
- //if found but free_cb is NULL, it's a consumer's placeholder, now filled by producer
- if (free_cb != NULL)
- {
- topic->free_cb = free_cb;
- topic->cb_arg = cb_arg;
- }
- return 0; // success
+ struct session_mq_topic_schema t_schema;
+ t_schema.free_cb=free_cb;
+ t_schema.topic_name=(char *)topic_name;
+ t_schema.topic_id=len;
+ t_schema.cb_arg=cb_arg;
+ t_schema.subscribers=NULL;
+ t_schema.sub_cnt=0;
+ utarray_push_back(st->session_mq_schema_array, &t_schema);
+ return t_schema.topic_id;
}
-int session_mq_destroy_topic(const char *topic_name) {
- struct session_mq_topic_schema *topic;
-
- // find topic in global schma
- HASH_FIND_STR(global_session_mq_schema, topic_name, topic);
- if (topic == NULL)
- {
- return -1; // not found
+int session_mq_destroy_topic(struct stellar *st, int topic_id)
+{
+ unsigned int len = utarray_len(st->session_mq_schema_array);
+ if (len <= (unsigned int)topic_id)
+ return -1;
+ struct session_mq_topic_schema *topic =
+ (struct session_mq_topic_schema *)utarray_eltptr(st->session_mq_schema_array, (unsigned int)topic_id);
+ struct session_mq_subscriber *sub_elt, *sub_tmp;
+
+ if (topic)
+ {
+ DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
+ {
+ DL_DELETE(topic->subscribers, sub_elt);
+ free(sub_elt);
+ }
}
- // delete topic from global schema
- HASH_DEL(global_session_mq_schema, topic);
- // free topic name
- FREE(topic->topic_name);
-
- struct subscriber *sub;
- while ((sub = STAILQ_FIRST(&topic->subscribers)) != NULL)
- {
- STAILQ_REMOVE_HEAD(&topic->subscribers, entries);
- FREE(sub);
- }
- // free topic itself
- FREE(topic);
return 0; // success
}
-int session_publish_message(struct session *sess, const char *topic_name, void *data)
+
+int session_mq_publish_message(struct session *sess, int topic_id, void *data)
{
- struct session_mq_topic_schema *topic;
- HASH_FIND_STR(global_session_mq_schema, topic_name, topic);
- if (topic == NULL)
- {
- return -1;
- }
- struct session_mq *mq=sess->mq;
- struct session_mq_node *node= CALLOC(struct session_mq_node,1);
- node->topic_name = topic->topic_name;
+ unsigned int len = utarray_len(sess->st->session_mq_schema_array);
+ if (len <= (unsigned int)topic_id)return -1;
+ struct session_mq *node= CALLOC(struct session_mq,1);
+ node->topic_id = topic_id;
node->message = data;
- STAILQ_INSERT_TAIL(&mq->head, node, entries);
+ DL_APPEND(sess->mq, node);
return 0;
}
-int session_mq_subscribe_topic(const char *topic_name, on_msg_cb_func *sub_cb, void *cb_arg)
+int session_mq_ignore_message(struct session *sess, int topic_id, int sub_id)
{
- struct session_mq_topic_schema *topic;
- session_mq_create_topic(topic_name, NULL, NULL);
- HASH_FIND_STR(global_session_mq_schema, topic_name, topic);
- if (topic == NULL)
- {
- return -1; // topic create failed
- }
- struct subscriber *new_subscriber = CALLOC(struct subscriber,1);
- new_subscriber->sub_cb = sub_cb;
- new_subscriber->cb_arg = cb_arg;
- STAILQ_INSERT_TAIL(&topic->subscribers, new_subscriber, entries);
- return 0;
+ // TODO: implement
+ return -1;
}
-/*
-int session_mq_unsubscribe_topic(const char *topic_name, msg_consume_cb_func *read_cb, void *cb_arg)
-{
- struct session_mq_topic_schema *topic;
- int unsubscribe_count = 0;
- HASH_FIND_STR(global_session_mq_schema, topic_name, topic);
- if (topic == NULL)
- {
- return -1; // opic create failed
- }
- struct subscriber *sub, *tmp_sub;
- sub = STAILQ_FIRST(&topic->subscribers);
- while (sub)
- {
- if (sub->read_cb == read_cb && sub->cb_arg == cb_arg)
- {
- tmp_sub = STAILQ_NEXT(sub, entries);
- STAILQ_REMOVE(&topic->subscribers, sub, subscriber, entries);
- FREE(sub);
- unsubscribe_count++;
- sub = tmp_sub;
- }
- else
- {
- sub = STAILQ_NEXT(sub, entries); // next sub
- }
- }
- return unsubscribe_count;
+int session_mq_subscribe_topic(struct stellar *st, int topic_id, on_msg_cb_func *sub_cb, void *cb_arg)
+{
+ unsigned int len = utarray_len(st->session_mq_schema_array);
+ if (len <= (unsigned int)topic_id)return -1;
+ struct session_mq_topic_schema *topic = (struct session_mq_topic_schema *)utarray_eltptr(st->session_mq_schema_array, (unsigned int)topic_id);
+ struct session_mq_subscriber *new_subscriber = CALLOC(struct session_mq_subscriber,1);
+ new_subscriber->sub_cb = sub_cb;
+ new_subscriber->cb_arg = cb_arg;
+ DL_APPEND(topic->subscribers, new_subscriber);
+ topic->sub_cnt+=1;
+ return topic->sub_cnt;
}
-*/
-static struct session_exdata_runtime *session_new_exdata_rt()
+static struct session_exdata_runtime *session_new_exdata_rt(struct stellar *st)
{
struct session_exdata_runtime *exdata_rt = NULL;
- unsigned int len = utarray_len(global_session_exdata_metas);
+ unsigned int len = utarray_len(st->session_exdata_schema_array);
if(len > 0)
{
exdata_rt=CALLOC(struct session_exdata_runtime, len);
@@ -331,53 +267,222 @@ static struct session_exdata_runtime *session_new_exdata_rt()
return exdata_rt;
}
-static void session_free_exdata_rt(struct session* sess, struct session_exdata_runtime *exdata_rt)
+static void session_free_exdata_runtime(struct session* sess, struct session_exdata_runtime *exdata_rt)
{
if(exdata_rt==NULL)return;
- unsigned int len=utarray_len(global_session_exdata_metas);
+ unsigned int len=utarray_len(sess->st->session_exdata_schema_array);
for (unsigned int i = 0; i < len; i++)
{
struct session_exdata_runtime *exdata = (struct session_exdata_runtime *)(exdata_rt + i);
- struct session_exdata_meta *meta = (struct session_exdata_meta *)utarray_eltptr(global_session_exdata_metas, i);
+ struct session_exdata_schema *schema = (struct session_exdata_schema *)utarray_eltptr(sess->st->session_exdata_schema_array, i);
if (exdata)
{
- if (meta->free_func && exdata->data)
+ if (schema->free_func && exdata->data)
{
- meta->free_func(sess, i, exdata->data, meta->arg);
+ schema->free_func(sess, i, exdata->data, schema->arg);
}
}
}
}
+
+typedef struct session_event
+{
+ struct stellar *st;
+ struct session *sess;
+ session_event_cb_func *sesion_event_cb;
+ struct timeval timeout;
+ void *cb_arg;
+ int events;
+ unsigned short is_delete;
+ unsigned short is_intrinsic;
+ struct session_event *next, *prev;
+}session_event;
+
+void session_dispatch(struct session *sess, enum session_state state, struct packet *pkt)
+{
+ struct session_event *intrinsic_events = (struct session_event *)session_get_ex_data(sess, sess->st->intrinsic_session_event_exdata_idx);
+ struct session_event *p = NULL;
+ unsigned int len = utarray_len(sess->st->plugin_specs_array);
+ sess->state = state;
+ int events=(1<<state|(sess->type==SESSION_TYPE_TCP?SESS_EV_TCP:SESS_EV_UDP));// TODO: upgrade
+ for(unsigned i = 0; i < len; i++)
+ {
+ p = (intrinsic_events+i);
+ if(p->sesion_event_cb != NULL && (p->events & events) && (p->is_delete == 0))
+ {
+ p->sesion_event_cb(sess, events, pkt, p->cb_arg);
+ }
+ }
+
+ session_defer_loop(sess);
+ return;
+}
+
+static struct session_event* session_intrinsic_events_init(struct stellar *st, struct session *sess)
+{
+ struct session_event *intrinsic_events = CALLOC(struct session_event, utarray_len(st->intrinsic_session_event_schema_array));
+ for(unsigned i = 0; i < utarray_len(st->intrinsic_session_event_schema_array); i++)
+ {
+ intrinsic_events[i] = *(struct session_event *)utarray_eltptr(st->intrinsic_session_event_schema_array, i);
+ }
+ return intrinsic_events;
+}
+
struct streaminfo;
-struct session *session_new(struct streaminfo* stream, enum session_type type, int thread_id)
+struct session *session_new(struct stellar *st, struct streaminfo* stream, enum session_type type, int thread_id)
{
if(stream == NULL)return NULL;
if(type!=SESSION_TYPE_TCP && type!=SESSION_TYPE_UDP)return NULL;
struct session *sess = CALLOC(struct session, 1);
+ sess->st = st;
sess->stream = stream;
sess->type = type;
sess->state = SESSION_STATE_OPENING;
- sess->mq = session_mq_new();
- sess->ex_data_rt = session_new_exdata_rt();
+ sess->mq = NULL;
+ sess->ex_data_rt = session_new_exdata_rt(st);
+ sess->ev_list=NULL;
+ struct session_event*intrinsic_events=session_intrinsic_events_init(st, sess);
+ session_set_ex_data(sess, st->intrinsic_session_event_exdata_idx, intrinsic_events);
return sess;
}
-void session_free(struct session *sess, int thread_id)
+void session_free(struct session *sess)
{
if(sess==NULL)return;
if(sess->mq != NULL)
{
session_mq_free(sess->mq);
}
- session_free_exdata_rt(sess, sess->ex_data_rt);
+ session_free_exdata_runtime(sess, sess->ex_data_rt);
FREE(sess->ex_data_rt);
FREE(sess);
}
-void session_set_state(struct session *session, enum session_state state, int thread_id)
+UT_icd sesion_event_icd = {sizeof(struct session_event), NULL, NULL, NULL};
+
+struct session_event *session_event_new(struct stellar *st, struct session *sess, int events, session_event_cb_func *cb, void *cb_arg)
+{
+ struct session_event *event = CALLOC(struct session_event, 1);
+ event->events = events;
+ event->sesion_event_cb = cb;
+ event->cb_arg = cb_arg;
+ event->sess = sess;
+ event->st = st;
+ return event;
+}
+
+void session_event_free(struct session_event *ev)
+{
+ if(ev==NULL)return;
+ if(ev->is_intrinsic)
+ {
+ ev->is_delete=1;
+ }
+ else
+ {
+ FREE(ev);
+ }
+ return;
+}
+
+int session_event_add(struct session_event *ev, const struct timeval *timeout)
+{
+ // TODO
+ if(timeout)
+ {
+ ev->timeout = *timeout;
+ }
+ // TODO: if is_intrinsic, dup or move to ev_list
+ DL_APPEND(ev->sess->ev_list, ev);
+ return 0;
+}
+
+
+int session_event_assign(struct session_event *ev, struct stellar *st, struct session *sess, int events, session_event_cb_func *cb, void *cb_arg)
+{
+ if(ev==NULL)return -1;
+ ev->events = events;
+ ev->sesion_event_cb = cb;
+ ev->cb_arg = cb_arg;
+ ev->sess = sess;
+ ev->st = st;
+ return 0;
+}
+
+int session_event_del(struct session_event *ev)
+{
+ // TODO:
+ ev->is_delete=1;
+ if(ev->is_intrinsic==0 && (ev->prev != NULL || ev->next != NULL))
+ {
+ DL_DELETE(ev->sess->ev_list, ev);
+ ev->prev = NULL;
+ ev->next = NULL;
+ }
+ return 0;
+}
+
+int stellar_plugin_register(struct stellar *st, int events, session_event_cb_func *cb, void *cb_arg)
+{
+ struct session_event event={.is_intrinsic=1, .events=events, .sesion_event_cb=cb, .cb_arg=cb_arg, .st=st, .sess=NULL, .is_delete=0, .timeout={0,0}, .next=NULL, .prev=NULL};
+ if(st->intrinsic_session_event_schema_array == NULL)
+ {
+ utarray_new(st->intrinsic_session_event_schema_array, &sesion_event_icd);
+ }
+ unsigned int len = utarray_len(st->intrinsic_session_event_schema_array);
+ utarray_push_back(st->intrinsic_session_event_schema_array, &event);
+ return len;
+}
+
+struct session_event *session_get_intrinsic_event(struct session *sess, int plugin_id)
+{
+ unsigned int len = utarray_len(sess->st->intrinsic_session_event_schema_array);
+ if(plugin_id < 0 || plugin_id >= (int)len)return NULL;
+ struct session_event*intrinsic_events=session_get_ex_data(sess, sess->st->intrinsic_session_event_exdata_idx);
+ return (struct session_event *)(intrinsic_events+plugin_id);
+}
+
+void session_defer_loop(struct session *sess)
+{
+
+ struct session_event *ev=NULL, *ev_tmp=NULL;
+ int events=(1<<sess->state|(sess->type==SESSION_TYPE_TCP?SESS_EV_TCP:SESS_EV_UDP));
+ const struct packet *pkt= session_get0_current_packet(sess);
+ DL_FOREACH_SAFE(sess->ev_list, ev, ev_tmp)
+ {
+ if(ev->sesion_event_cb != NULL && (ev->events & events) && (ev->is_delete == 0))
+ {
+ ev->sesion_event_cb(sess, events, pkt, ev->cb_arg);
+ }
+ }
+
+ struct session_mq *mq_elt=NULL, *mq_tmp=NULL;
+ struct session_mq_subscriber *sub_elt, *sub_tmp;
+ struct session_mq_topic_schema *topic;
+ DL_FOREACH_SAFE(sess->mq, mq_elt, mq_tmp)
+ {
+ topic=(struct session_mq_topic_schema *)utarray_eltptr(sess->st->session_mq_schema_array,(unsigned int)(mq_elt->topic_id));
+ if(topic)
+ {
+ DL_FOREACH_SAFE(topic->subscribers, sub_elt, sub_tmp)
+ {
+ sub_elt->sub_cb(sess, mq_elt->topic_id, mq_elt->message, sub_elt->cb_arg);
+ }
+ if(topic->free_cb)
+ {
+ topic->free_cb(mq_elt->message, topic->cb_arg);
+ }
+ }
+ DL_DELETE(sess->mq, mq_elt);
+ free(mq_elt);
+ }
+ return;
+}
+
+int stellar_worker_thread_periodic_add(struct stellar *st, stellar_periodic_cb_func *periodic_cb, void *cb_arg, const struct timeval *interval)
{
- if(session == NULL)return;
- session->state = state;
+ // TODO:
+ return -1;
} \ No newline at end of file
diff --git a/src/adapter/session_manager.h b/src/adapter/session_manager.h
index 9e2a984..e0b8f5b 100644
--- a/src/adapter/session_manager.h
+++ b/src/adapter/session_manager.h
@@ -1,27 +1,28 @@
#pragma once
-#include "session.h"
-#include "plugin_spec.h"
+#include "stellar/session.h"
+#include "stellar/stellar.h"
#include "uthash/utarray.h"
struct streaminfo;
struct session_exdata_runtime;
+struct session_mq;
struct session
{
enum session_type type;
enum session_state state;
struct streaminfo *stream;
- struct session_mq *mq;
+ struct stellar *st;
struct session_exdata_runtime *ex_data_rt;
+ struct session_mq *mq;
+ struct session_event *ev_list;
};
-struct session *session_new(struct streaminfo* stream, enum session_type type, int thread_id);
-void session_free(struct session *session, int thread_id);
-
-void session_set_state(struct session *session, enum session_state state, int thread_id);
+struct session *session_new(struct stellar *st, struct streaminfo* stream, enum session_type type, int thread_id);
+void session_free(struct session *sess);
-void session_mq_loop(struct session *session);
+void session_defer_loop(struct session *sess);
-void session_manager_cleanup(); \ No newline at end of file
+void session_dispatch(struct session *sess, enum session_state state, struct packet *pkt); \ No newline at end of file
diff --git a/src/adapter/stellar.c b/src/adapter/stellar.c
new file mode 100644
index 0000000..ee28f33
--- /dev/null
+++ b/src/adapter/stellar.c
@@ -0,0 +1,68 @@
+#include "stellar_internal.h"
+#include "session_manager.h"
+
+#include "stellar/utils.h"
+#include "stellar/session_exdata.h"
+
+
+struct plugin_specific_inernal
+{
+ struct plugin_specific specs;
+ void *plugin_ctx;
+};
+
+UT_icd plugin_specs_icd = {sizeof(struct plugin_specific_inernal), NULL, NULL, NULL};
+
+static void exdata_plugin_rt_free(struct session *sess, int idx, void *ex_ptr, void *arg)
+{
+ if(ex_ptr)
+ {
+ FREE(ex_ptr);
+ }
+ return;
+}
+
+
+struct stellar *stellar_init(struct plugin_specific specs[] , int spec_num)
+{
+ struct stellar *st = CALLOC(struct stellar, 1);
+ struct plugin_specific_inernal spec_internal={};
+ utarray_new(st->plugin_specs_array,&plugin_specs_icd);
+ utarray_reserve(st->plugin_specs_array, spec_num);
+ st->intrinsic_session_event_exdata_idx=stellar_session_get_ex_new_index(st, "__STELLAR__", exdata_plugin_rt_free, NULL);
+ for(int i = 0; i < spec_num; i++)
+ {
+ if (specs[i].init_cb != NULL)
+ {
+ spec_internal.plugin_ctx=specs[i].init_cb(st);
+ if(spec_internal.plugin_ctx == NULL)
+ {
+ goto ADAPTER_INIT_ERROR;
+ }
+ spec_internal.specs=specs[i];
+ utarray_push_back(st->plugin_specs_array, &spec_internal);
+ }
+ }
+ return st;
+ADAPTER_INIT_ERROR:
+ stellar_exit(st);
+ return NULL;
+}
+
+void stellar_exit(struct stellar *st)
+{
+ struct plugin_specific_inernal *p=NULL;
+ while ((p = (struct plugin_specific_inernal *)utarray_next(st->plugin_specs_array, p)))
+ {
+ if(p->specs.exit_cb != NULL)
+ {
+ p->specs.exit_cb(p->plugin_ctx);
+ }
+ }
+ utarray_free(st->session_mq_schema_array);
+ utarray_free(st->session_exdata_schema_array);
+ utarray_free(st->intrinsic_session_event_schema_array);
+ utarray_free(st->plugin_specs_array);
+ FREE(st);
+ return;
+}; \ No newline at end of file
diff --git a/src/adapter/stellar_internal.h b/src/adapter/stellar_internal.h
new file mode 100644
index 0000000..f528e62
--- /dev/null
+++ b/src/adapter/stellar_internal.h
@@ -0,0 +1,13 @@
+#pragma once
+#include "stellar/stellar.h"
+
+#include "uthash/utarray.h"
+
+struct stellar
+{
+ int intrinsic_session_event_exdata_idx;
+ UT_array *intrinsic_session_event_schema_array;
+ UT_array *plugin_specs_array;
+ UT_array *session_mq_schema_array;
+ UT_array *session_exdata_schema_array;
+}; \ No newline at end of file
diff --git a/src/adapter/test/CMakeLists.txt b/src/adapter/test/CMakeLists.txt
deleted file mode 100644
index 930ad08..0000000
--- a/src/adapter/test/CMakeLists.txt
+++ /dev/null
@@ -1,5 +0,0 @@
-add_definitions(-fPIC)
-
-add_library(test_loader SHARED test_loader.c test_plugin.c)
-target_link_libraries(test_loader adapter)
-set_target_properties(test_loader PROPERTIES PREFIX "")
diff --git a/src/adapter/test/test_loader.c b/src/adapter/test/test_loader.c
deleted file mode 100644
index ec993de..0000000
--- a/src/adapter/test/test_loader.c
+++ /dev/null
@@ -1,149 +0,0 @@
-#include "session.h"
-#include "utils.h"
-#include "test_plugin.h"
-#include "../adapter.h"
-#include "../session_manager.h"
-
-#include <MESA/stream.h>
-#include <stdio.h>
-
-struct plugin_specific g_plugin_schema[] =
-{
- {
- .init_cb = test_entry_plugin_init,
- .exit_cb = test_entry_plugin_exit,
- .udp_session_entry_cb=test_entry_plugin_entry,
- .tcp_session_entry_cb=test_entry_plugin_entry,
- .udp_session_interest_state=__SESSION_STATE_MAX,
- .tcp_session_interest_state=__SESSION_STATE_MAX,
- },
- {
- .init_cb = test_mq_plugin_init,
- .exit_cb = test_mq_plugin_exit,
- .udp_session_entry_cb=test_mq_plugin_entry,
- .tcp_session_entry_cb=test_mq_plugin_entry,
- .udp_session_interest_state=__SESSION_STATE_MAX,
- .tcp_session_interest_state=__SESSION_STATE_MAX,
- },
-};
-
-struct test_stream_ctx
-{
- uint32_t c2s_pkts;
- uint32_t c2s_bytes;
- uint32_t s2c_pkts;
- uint32_t s2c_bytes;
- struct session *sess;
-};
-
-static void test_mq_topic_free(void *data, void *cb_arg)
-{
- FREE(data);
- return;
-}
-
-int test_mq_loader_read(struct session *sess, const char* topic_name, const void *data, void *cb_arg)
-{
- struct test_stream_ctx *ctx =(struct test_stream_ctx *)data;
- printf("loader_read_message(%s)-----------%20s", topic_name, session_get_readable_addr(sess));
- printf("server-pkt=%u, server-count=%u, client-pkt=%u, client-count=%u, ",
- ctx->c2s_pkts, ctx->c2s_bytes,
- ctx->s2c_pkts, ctx->s2c_bytes);
- printf("total-pkt=%u, ", ctx->c2s_pkts+ctx->s2c_pkts);
- printf("total-count=%u\n", ctx->c2s_bytes+ctx->s2c_bytes);
- return 0;
-}
-
-int TEST_INIT()
-{
- int ret = adapter_init(g_plugin_schema, 2);
- if(ret != 0)return -1;
-
- session_mq_create_topic("test_mq_topic", test_mq_topic_free, NULL);
- session_mq_subscribe_topic("test_mq_topic", test_mq_loader_read, NULL);
- return 0;
-}
-
-void TEST_EXIT(void)
-{
-
- session_mq_destroy_topic("test_mq_topic");
- adapter_exit();
- return;
-}
-
-
-static void print_stream_ctx(struct streaminfo *pstream, struct test_stream_ctx *ctx)
-{
- printf("stream-----------%20s: ", printaddr(&(pstream->addr), pstream->threadnum));
- printf("server-pkt=%u, server-count=%u, client-pkt=%u, client-count=%u, ",
- ctx->c2s_pkts, ctx->c2s_bytes,
- ctx->s2c_pkts, ctx->s2c_bytes);
- printf("total-pkt=%u, ", ctx->c2s_pkts+ctx->s2c_pkts);
- printf("total-count=%u\n", ctx->c2s_bytes+ctx->s2c_bytes);
- return;
-}
-
-static struct test_stream_ctx *stream_ctx_dup(const struct test_stream_ctx *origin)
-{
- struct test_stream_ctx *ctx=CALLOC(struct test_stream_ctx,1);
- memcpy(ctx, origin, sizeof(struct test_stream_ctx));
- return ctx;
-}
-
-static void loader_transfer_stream_entry(struct streaminfo *pstream, UCHAR state, void **pme, int thread_seq,void *a_packet)
-{
- struct test_stream_ctx *ctx=(struct test_stream_ctx *)*pme;
- struct tcpdetail *pdetail=(struct tcpdetail *)pstream->pdetail;
- if(*pme==NULL)
- {
- *pme=CALLOC(struct test_stream_ctx,1);
- ctx=(struct test_stream_ctx *)*pme;
- }
-
- if(a_packet!=NULL)
- {
- if(DIR_C2S == pstream->curdir){
- ctx->c2s_bytes += pdetail->datalen;
- ctx->c2s_pkts++;
- }else{
- ctx->s2c_bytes += pdetail->datalen;
- ctx->s2c_pkts++;
- }
- }
- struct test_stream_ctx *msg;
- switch (state)
- {
- case OP_STATE_PENDING:
- adapter_session_open(pstream, &ctx->sess, thread_seq);
- break;
- case OP_STATE_DATA:
- msg = stream_ctx_dup((const struct test_stream_ctx *)*pme);
- if(session_publish_message(ctx->sess, "test_mq_topic", msg) < 0)
- {
- FREE(msg);
- }
- adapter_session_active(pstream, &ctx->sess, thread_seq);
- break;
- case OP_STATE_CLOSE:
- adapter_session_close(pstream, &ctx->sess, thread_seq);
- print_stream_ctx(pstream, ctx);
- FREE(*pme);
- break;
- default:
- break;
- }
- return;
-}
-
-char test_udp_stream_entry(struct streaminfo *pstream,void **pme, int thread_seq,void *a_packet)
-{
- loader_transfer_stream_entry(pstream, pstream->opstate, pme, thread_seq, a_packet);
- return APP_STATE_GIVEME;
-}
-
-char test_tcpall_stream_entry(struct streaminfo *pstream,void **pme, int thread_seq,void *a_packet)
-{
- loader_transfer_stream_entry(pstream, pstream->pktstate, pme, thread_seq, a_packet);
- return APP_STATE_GIVEME;
-}
diff --git a/src/adapter/test/test_loader.inf b/src/adapter/test/test_loader.inf
deleted file mode 100644
index 47b2fe8..0000000
--- a/src/adapter/test/test_loader.inf
+++ /dev/null
@@ -1,13 +0,0 @@
-[PLUGINFO]
-PLUGNAME=test_app
-SO_PATH=./plug/business/test_loader/test_loader.so
-INIT_FUNC=TEST_INIT
-DESTROY_FUNC=TEST_EXIT
-
-[TCP_ALL]
-FUNC_FLAG=ALL
-FUNC_NAME=test_tcpall_stream_entry
-
-[UDP]
-FUNC_FLAG=ALL
-FUNC_NAME=test_udp_stream_entry \ No newline at end of file
diff --git a/src/adapter/test/test_plugin.c b/src/adapter/test/test_plugin.c
deleted file mode 100644
index 4dcb9d5..0000000
--- a/src/adapter/test/test_plugin.c
+++ /dev/null
@@ -1,127 +0,0 @@
-#include "plugin_spec.h"
-#include "session.h"
-#include "utils.h"
-#include "test_plugin.h"
-
-#include <stdio.h>
-
-int g_test_entry_plugin_id = 0;
-int g_test_mq_plugin_id = 0;
-
-
-int g_test_entry_exdata_idx = 0;
-int g_test_mq_plugin_exdata_idx = 0;
-
-struct test_session_message
-{
- uint32_t c2s_pkts;
- uint32_t c2s_bytes;
- uint32_t s2c_pkts;
- uint32_t s2c_bytes;
-};
-
-static void test_exdata_free(struct session *sess, int idx, void *ex_ptr, void *arg)
-{
- if(ex_ptr)
- {
- FREE(ex_ptr);
- }
- return;
-}
-
-int test_entry_plugin_init(int plugin_id)
-{
- g_test_entry_plugin_id = plugin_id;
- g_test_entry_exdata_idx=session_get_ex_new_index("TEST_ENTRY", test_exdata_free, NULL);
- return 0;
-}
-void test_entry_plugin_exit(void)
-{
- return;
-}
-
-static int test_session_mq_plugin_read(struct session *sess, const char* topic_name, const void *data, void *cb_arg)
-{
- struct test_session_message *ctx =(struct test_session_message *)data;
- printf("plugin_read_message(%s)-----------%20s", topic_name, session_get_readable_addr(sess));
- printf("server-pkt=%u, server-count=%u, client-pkt=%u, client-count=%u, ",
- ctx->c2s_pkts, ctx->c2s_bytes,
- ctx->s2c_pkts, ctx->s2c_bytes);
- printf("total-pkt=%u, ", ctx->c2s_pkts+ctx->s2c_pkts);
- printf("total-count=%u\n", ctx->c2s_bytes+ctx->s2c_bytes);
- printf("session(%d)-----------%20s: attach\n", g_test_mq_plugin_id, session_get_readable_addr(sess));
- session_attach_plugin(sess, g_test_mq_plugin_id);
- return 0;
-}
-
-int test_mq_plugin_init(int plugin_id)
-{
- g_test_mq_plugin_id = plugin_id;
- g_test_mq_plugin_exdata_idx=session_get_ex_new_index("TEST_MQ", test_exdata_free, NULL);
- session_mq_subscribe_topic("test_mq_topic", test_session_mq_plugin_read, NULL);
- return 0;
-}
-void test_mq_plugin_exit(void)
-{
- return;
-}
-
-
-
-static void print_session_ctx(struct session *sess, struct test_session_message *ctx, int plugin_id)
-{
- printf("session(%d)-----------%20s: ", plugin_id, session_get_readable_addr(sess));
- printf("server-pkt=%u, server-count=%u, client-pkt=%u, client-count=%u, ",
- ctx->c2s_pkts, ctx->c2s_bytes,
- ctx->s2c_pkts, ctx->s2c_bytes);
- printf("total-pkt=%u, ", ctx->c2s_pkts+ctx->s2c_pkts);
- printf("total-count=%u\n", ctx->c2s_bytes+ctx->s2c_bytes);
- return;
-}
-
-void test_entry_plugin_entry(struct session *sess, enum session_state state, int thread_id)
-{
- struct test_session_message *test_ctx = (struct test_session_message *)session_get_ex_data(sess, g_test_entry_exdata_idx);
- if (test_ctx == NULL)
- {
- test_ctx = CALLOC(struct test_session_message, 1);
- session_set_ex_data(sess, g_test_entry_exdata_idx, test_ctx);
- }
- size_t payload_len = 0;
- session_get_current_payload(sess, &payload_len);
- const struct packet *pkt = session_get_current_packet(sess);
- if (pkt)
- {
- int dir = session_get_direction(sess);
- if (dir==SESSION_DIRECTION_IN)
- {
- test_ctx->c2s_bytes += payload_len;
- test_ctx->c2s_pkts += 1;
- }
- if (dir==SESSION_DIRECTION_OUT)
- {
- test_ctx->s2c_bytes += payload_len;
- test_ctx->s2c_pkts += 1;
- }
- }
- switch (state)
- {
- case SESSION_STATE_CLOSING:
- if (test_ctx != NULL)
- {
- print_session_ctx(sess, test_ctx, g_test_entry_plugin_id);
- }
- break;
- default:
- break;
- }
- return;
-}
-
-void test_mq_plugin_entry(struct session *sess, enum session_state state, int thread_id)
-{
- printf("session(%d)-----------%20s: dettach\n", g_test_mq_plugin_id, session_get_readable_addr(sess));
- session_dettach_plugin(sess, g_test_mq_plugin_id);
- return;
-}
-
diff --git a/src/adapter/test/test_plugin.h b/src/adapter/test/test_plugin.h
deleted file mode 100644
index ac5e0b7..0000000
--- a/src/adapter/test/test_plugin.h
+++ /dev/null
@@ -1,14 +0,0 @@
-#pragma once
-
-#include "plugin_spec.h"
-
-int test_entry_plugin_init(int plugin_id);
-void test_entry_plugin_exit(void);
-void test_entry_plugin_entry(struct session *session, enum session_state state, int thread_id);
-
-
-int test_mq_plugin_init(int plugin_id);
-void test_mq_plugin_exit(void);
-void test_mq_plugin_entry(struct session *session, enum session_state state, int thread_id);
-
-
diff --git a/src/stat_policy/stat_policy.h b/src/stat_policy/stat_policy.h
deleted file mode 100644
index e69de29..0000000
--- a/src/stat_policy/stat_policy.h
+++ /dev/null