summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryangwei <[email protected]>2023-08-09 20:28:18 +0800
committeryangwei <[email protected]>2023-08-10 10:36:33 +0800
commitcfa01ceb646ead1dfcd39b4cd30dbdb276f16c13 (patch)
tree0ac36904d23026803dce35e78e20f360672b8e9b
parenta4af96c6bcd226fec61bbfb72f6843e1a5f9f054 (diff)
WIP
-rw-r--r--CMakeLists.txt8
-rw-r--r--sdk/include/message.h2
-rw-r--r--sdk/include/session.h2
-rw-r--r--sdk/include/session_mq.h4
-rw-r--r--sdk/include/subsystem.h6
-rw-r--r--sdk/include/utils.h1
-rw-r--r--src/adapter/CMakeLists.txt3
-rw-r--r--src/adapter/adapter.c56
-rw-r--r--src/adapter/adapter.h2
-rw-r--r--src/adapter/session.c9
-rw-r--r--src/adapter/session_manager.c50
-rw-r--r--src/adapter/session_manager.h18
-rw-r--r--src/adapter/session_mq.c171
-rw-r--r--src/adapter/session_mq_internal.h8
-rw-r--r--src/adapter/subsystem_manager.c18
-rw-r--r--src/adapter/subsystem_manager.h31
16 files changed, 361 insertions, 28 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 9dc150a..964d8ff 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -28,11 +28,13 @@ add_custom_target("install-program" COMMAND ${CMAKE_COMMAND} ARGS -DCOMPONENT=Pr
add_custom_target("install-profile" COMMAND ${CMAKE_COMMAND} ARGS -DCOMPONENT=Profile -P cmake_install.cmake)
include_directories(${CMAKE_SOURCE_DIR})
+include_directories(${CMAKE_SOURCE_DIR}/deps)
include_directories(${CMAKE_SOURCE_DIR}/sdk/include)
-add_subdirectory(vendor)
+#add_subdirectory(vendor)
+add_subdirectory(src/adapter)
add_subdirectory(deps/toml)
add_subdirectory(deps/utable)
-enable_testing()
-add_subdirectory(test)
+#enable_testing()
+#add_subdirectory(test)
diff --git a/sdk/include/message.h b/sdk/include/message.h
index c4209c9..c59a0b3 100644
--- a/sdk/include/message.h
+++ b/sdk/include/message.h
@@ -1,3 +1,5 @@
+#pragma once
+
#include <stddef.h>
#define MESSAGE_MAGIC 0x12345678
diff --git a/sdk/include/session.h b/sdk/include/session.h
index b17ced0..8fc16a6 100644
--- a/sdk/include/session.h
+++ b/sdk/include/session.h
@@ -21,7 +21,7 @@ enum session_state
SESSION_STATE_OPENING = 1 << 1,
SESSION_STATE_ACTIVE = 1 << 2,
SESSION_STATE_CLOSING = 1 << 3,
- SESSION_STATE_ALL = (1 << 1 | 1 << 2 | 1 << 3),
+ SESSION_STATE_ALL = (1 << 1 | 1 << 2 | 1 << 3),
};
struct session;
diff --git a/sdk/include/session_mq.h b/sdk/include/session_mq.h
index 20ba7ac..37e9153 100644
--- a/sdk/include/session_mq.h
+++ b/sdk/include/session_mq.h
@@ -4,8 +4,8 @@
struct session;
-typedef void session_mq_free_callback(const struct session *session, const char* topic_name, void *data, const size_t data_len);
-typedef int session_mq_read_callback(const struct session *session, const char* topic_name, void *data, const size_t data_len);
+typedef void session_mq_free_callback(const struct session *session, const char* topic_name, void *data, const size_t data_len, void *cb_arg);
+typedef int session_mq_read_callback(const struct session *session, const char* topic_name, void *data, const size_t data_len, void *cb_arg);
int session_mq_topic_create(const char *topic_name, session_mq_free_callback *free_cb, void *cb_arg);
diff --git a/sdk/include/subsystem.h b/sdk/include/subsystem.h
index 67fb09a..eff6d7d 100644
--- a/sdk/include/subsystem.h
+++ b/sdk/include/subsystem.h
@@ -1,4 +1,6 @@
//subsystem manager
+#pragma once
+
#include "session.h"
typedef int subsystem_init_callback(void);
@@ -8,4 +10,6 @@ typedef void subsystem_entry_callback(struct session *session, enum session_stat
struct subsystem_descriptor;
struct subsystem_descriptor *subsystem_get0_descriptor(enum session_type type, subsystem_entry_callback *entry_cb);
void *subsystem_get_session_ctx(struct session *session, struct subsystem_descriptor *desc);
-void subsystem_dettach_session(struct session *session, struct subsystem_descriptor *desc); \ No newline at end of file
+
+void subsystem_dettach_session(struct session *session, struct subsystem_descriptor *desc);
+void subsystem_attach_session(struct session *session, struct subsystem_descriptor *desc); \ No newline at end of file
diff --git a/sdk/include/utils.h b/sdk/include/utils.h
index 13eb7ba..7beafbb 100644
--- a/sdk/include/utils.h
+++ b/sdk/include/utils.h
@@ -1,6 +1,7 @@
#pragma once
#include <stdlib.h> //calloc
+#include <stddef.h> //NULL
#define CALLOC(type, number) ((type *)calloc(sizeof(type), number))
diff --git a/src/adapter/CMakeLists.txt b/src/adapter/CMakeLists.txt
new file mode 100644
index 0000000..1140fae
--- /dev/null
+++ b/src/adapter/CMakeLists.txt
@@ -0,0 +1,3 @@
+set(CMAKE_C_FLAGS "-std=c99")
+add_definitions(-fPIC)
+add_library(adapter STATIC adapter.c subsystem_manager.c session_manager.c session_mq.c) \ No newline at end of file
diff --git a/src/adapter/adapter.c b/src/adapter/adapter.c
index 6202369..5330a34 100644
--- a/src/adapter/adapter.c
+++ b/src/adapter/adapter.c
@@ -1,26 +1,60 @@
-#include "session.h"
#include "adapter.h"
+#include "session_manager.h"
-struct session
+#include "stream.h"
+
+struct adapter
{
};
-struct subsystem_descriptor
+struct adapter *adapter_init(void)
{
+ return NULL;
+}
+void adapter_exit(struct adapter *adapter)
+{
+ return;
};
-struct subsystem_descriptor *subsystem_get0_descriptor(enum session_type type, subsystem_entry_callback *entry_cb)
+
+struct streaminfo;
+//streaminfo open
+uint8_t adapter_session_open(struct streaminfo *stream, struct session **session, int thread_id)
{
- return NULL;
+ *session=session_new(stream, SESSION_TYPE_TCP, SESSION_STATE_OPENING, stream->threadnum);
+ session_dispatch(*session, stream->threadnum);
+ return 0;
+}
+
+uint8_t adapter_session_active(struct streaminfo *stream, struct session **session, int thread_id)
+{
+ if(*session==NULL)
+ {
+ *session=session_new(stream, SESSION_TYPE_TCP, SESSION_STATE_ACTIVE, stream->threadnum);
+ }
+ (*session)->state=SESSION_STATE_ACTIVE;
+ session_dispatch(*session, stream->threadnum);
+ return 0;
}
-void *subsystem_get_session_ctx(struct session *session, struct subsystem_descriptor *desc)
+
+//streaminfo close, or firewall active close streaminfo
+uint8_t adapter_session_close(struct streaminfo *stream, struct session **session, int thread_id)
{
- return NULL;
-};
+ session_dispatch(*session, stream->threadnum);
+ if(*session)
+ {
+ session_free(*session, stream->threadnum);
+ }
+ return 0;
+}
-void subsystem_dettach_session(struct session *session, struct subsystem_descriptor *desc)
+uint8_t session_direction_get(struct session *session)
{
- return;
-}; \ No newline at end of file
+ return 0;
+}
+uint8_t session_current_direction_get(struct session *session)
+{
+ return 0;
+} \ No newline at end of file
diff --git a/src/adapter/adapter.h b/src/adapter/adapter.h
index fb643ef..3938d81 100644
--- a/src/adapter/adapter.h
+++ b/src/adapter/adapter.h
@@ -1,6 +1,6 @@
#include "session.h"
#include "session_mq.h"
-#include "subsystem.h"
+#include "subsystem_manager.h"
#include <stdint.h>
diff --git a/src/adapter/session.c b/src/adapter/session.c
deleted file mode 100644
index 83c81c5..0000000
--- a/src/adapter/session.c
+++ /dev/null
@@ -1,9 +0,0 @@
-#include "session.h"
-
-struct session
-{
-
-};
-
-struct session *session_new(void);
-void session_free(struct session *session); \ No newline at end of file
diff --git a/src/adapter/session_manager.c b/src/adapter/session_manager.c
new file mode 100644
index 0000000..b5936c0
--- /dev/null
+++ b/src/adapter/session_manager.c
@@ -0,0 +1,50 @@
+#include "utils.h"
+#include "session_manager.h"
+#include "subsystem_manager.h"
+#include "session_mq_internal.h"
+
+#include "uthash/uthash.h"
+
+struct streaminfo;
+struct session *session_new(struct streaminfo* stream, enum session_type type, enum session_state state, int thread_id)
+{
+ if(stream == NULL)return NULL;
+ struct session *session = CALLOC(struct session, 1);
+ session->stream = stream;
+ session->type = type;
+ session->state = state;
+ session->subsystem_contexts = NULL;
+ session->mq = session_mq_new();
+ return session;
+
+}
+
+void session_free(struct session *session, int thread_id)
+{
+ if(session==NULL)return;
+
+ if(session->subsystem_contexts != NULL)
+ {
+ FREE(session->subsystem_contexts);
+ }
+ if(session->mq != NULL)
+ {
+ session_mq_free(session->mq);
+ }
+}
+
+void session_dispatch(struct session *session, int thread_id)
+{
+ if(session==NULL)return;
+ int subsystem_num = sizeof(global_subsystem_schema) / sizeof(global_subsystem_schema[0]);
+ for(int i = 0; i < subsystem_num; i++)
+ {
+ if ((global_subsystem_schema[i].interest_state & session->state) && (global_subsystem_schema[i].entry_cb != NULL))
+ {
+ global_subsystem_schema[i].entry_cb(session, session->state, thread_id, session->subsystem_contexts[i]);
+ }
+ }
+ session_mq_dequeue(session->mq, session);
+ return;
+}
+
diff --git a/src/adapter/session_manager.h b/src/adapter/session_manager.h
new file mode 100644
index 0000000..15fc676
--- /dev/null
+++ b/src/adapter/session_manager.h
@@ -0,0 +1,18 @@
+#pragma once
+
+#include "session.h"
+
+struct session
+{
+ struct streaminfo *stream;
+ enum session_type type;
+ enum session_state state;
+ void **subsystem_contexts;
+ struct session_mq *mq;
+};
+
+struct streaminfo;
+struct session *session_new(struct streaminfo* stream, enum session_type type, enum session_state state, int thread_id);
+void session_free(struct session *session, int thread_id);
+
+void session_dispatch(struct session *session, int thread_id); \ No newline at end of file
diff --git a/src/adapter/session_mq.c b/src/adapter/session_mq.c
new file mode 100644
index 0000000..6d1908f
--- /dev/null
+++ b/src/adapter/session_mq.c
@@ -0,0 +1,171 @@
+
+#include "utils.h"
+#include "session_mq_internal.h"
+
+#include <sys/queue.h>
+#include "uthash/uthash.h"
+
+struct session_mq_node
+{
+ const char *topic_name;
+ void *message;
+ size_t message_len;
+ STAILQ_ENTRY(session_mq_node) entries;
+};
+
+struct session_mq
+{
+ STAILQ_HEAD(, session_mq_node) head;
+};
+
+
+struct subscriber
+{
+ session_mq_read_callback *read_cb;
+ void *cb_arg;
+ STAILQ_ENTRY(subscriber) entries;
+};
+
+struct session_mq_topic_schema
+{
+ const char *topic_name;
+ session_mq_free_callback *free_cb;
+ void *cb_arg;
+ STAILQ_HEAD(, subscriber) subscribers;
+ UT_hash_handle hh;
+};
+
+static struct session_mq_topic_schema *global_session_mq_schema = NULL;
+
+struct session_mq *session_mq_new(void)
+{
+ struct session_mq *mq = CALLOC(struct session_mq, 1);
+ STAILQ_INIT(&mq->head);
+ return mq;
+}
+
+void session_mq_free(struct session_mq *mq)
+{
+ struct session_mq_node *node;
+ while ((node = STAILQ_FIRST(&mq->head)) != NULL)
+ {
+ STAILQ_REMOVE_HEAD(&mq->head, entries);
+ free(node);
+ }
+}
+
+static struct session_mq_topic_schema *session_mq_topic_schema(const char *topic_name)
+{
+ struct session_mq_topic_schema *schema;
+ HASH_FIND_STR(global_session_mq_schema, topic_name, schema);
+ return schema;
+};
+
+void session_mq_dequeue(struct session_mq *mq, struct session *session)
+{
+ struct session_mq_node *node;
+ struct subscriber *sub;
+ while (!STAILQ_EMPTY(&mq->head))
+ {
+ node = STAILQ_FIRST(&mq->head);
+ STAILQ_REMOVE_HEAD(&mq->head, entries);
+ struct session_mq_topic_schema *topic= session_mq_topic_schema(node->topic_name);
+ if(topic != NULL)
+ {
+ STAILQ_FOREACH(sub, &topic->subscribers, entries)
+ {
+ sub->read_cb(session, node->topic_name, node->message, node->message_len, sub->cb_arg);
+ }
+ if(topic->free_cb != NULL)
+ {
+ topic->free_cb(session, node->topic_name, node->message, node->message_len, topic->cb_arg);
+ }
+ }
+ FREE(node);
+ }
+ return;
+};
+
+int session_mq_topic_create(const char *topic_name, session_mq_free_callback *free_cb, void *cb_arg) {
+ struct session_mq_topic_schema *topic;
+ // 检查topic_name是否已存在
+ HASH_FIND_STR(global_session_mq_schema, topic_name, topic);
+ if (topic != NULL && topic->free_cb != NULL)
+ {
+ return -1; // 已存在
+ }
+ if(topic == NULL)
+ {
+ topic = CALLOC(struct session_mq_topic_schema,1);
+ HASH_ADD_KEYPTR(hh, global_session_mq_schema, topic->topic_name, strlen(topic->topic_name), topic);
+ }
+ if(topic->topic_name)
+ {
+ topic->topic_name = strdup(topic_name); // 复制字符串,确保独立存储
+ }
+ topic->free_cb = free_cb;
+ if(STAILQ_EMPTY(&topic->subscribers))
+ {
+ STAILQ_INIT(&topic->subscribers);
+ }
+ return 0; // 成功
+}
+
+
+int session_mq_topic_destroy(const char *topic_name) {
+ struct session_mq_topic_schema *topic;
+
+ // 在哈希表中查找指定的主题名
+ HASH_FIND_STR(global_session_mq_schema, topic_name, topic);
+ if (topic == NULL)
+ {
+ return -1; // 没有找到
+ }
+ // 从哈希表中删除
+ HASH_DEL(global_session_mq_schema, topic);
+ // 释放存储的主题名
+ free((void*)topic->topic_name);
+
+ struct subscriber *sub;
+ while ((sub = STAILQ_FIRST(&topic->subscribers)) != NULL)
+ {
+ STAILQ_REMOVE_HEAD(&topic->subscribers, entries);
+ free(sub);
+ }
+ // 释放结构体
+ free(topic);
+ return 0; // 成功
+}
+
+
+int session_mq_send_msg(struct session *session, const char *topic_name, const void *data, const size_t data_len)
+{
+ struct session_mq_topic_schema *topic;
+ HASH_FIND_STR(global_session_mq_schema, topic_name, topic);
+ if (topic == NULL)
+ {
+ return -1; // 没有找到
+ }
+ struct session_mq_node *node= CALLOC(struct session_mq_node,1);
+ node->topic_name = topic->topic_name;
+ node->message = CALLOC(char, data_len);
+ memcpy(node->message, data, data_len);
+ node->message_len=data_len;
+ return 0;
+}
+
+int session_mq_topic_subscribe(const char *topic_name, session_mq_read_callback *read_cb, void *cb_arg)
+{
+ struct session_mq_topic_schema *topic;
+ session_mq_topic_create(topic_name, NULL, NULL);
+ HASH_FIND_STR(global_session_mq_schema, topic_name, topic);
+ if (topic == NULL)
+ {
+ return -1; // topic 创建失败
+ }
+ struct subscriber *new_subscriber = CALLOC(struct subscriber,1);
+ new_subscriber->read_cb = read_cb;
+ new_subscriber->cb_arg = cb_arg;
+ STAILQ_INSERT_TAIL(&topic->subscribers, new_subscriber, entries);
+ return 0;
+} \ No newline at end of file
diff --git a/src/adapter/session_mq_internal.h b/src/adapter/session_mq_internal.h
new file mode 100644
index 0000000..9b99d5d
--- /dev/null
+++ b/src/adapter/session_mq_internal.h
@@ -0,0 +1,8 @@
+#include "session_mq.h"
+#include "session_manager.h"
+
+struct session_mq;
+
+struct session_mq *session_mq_new();
+void session_mq_free(struct session_mq *mq);
+void session_mq_dequeue(struct session_mq *mq, struct session *session); \ No newline at end of file
diff --git a/src/adapter/subsystem_manager.c b/src/adapter/subsystem_manager.c
new file mode 100644
index 0000000..fdf3112
--- /dev/null
+++ b/src/adapter/subsystem_manager.c
@@ -0,0 +1,18 @@
+#include "subsystem_manager.h"
+
+#include "utils.h"
+#include "session.h"
+
+struct subsystem_descriptor *subsystem_get0_descriptor(enum session_type type, subsystem_entry_callback *entry_cb)
+{
+ return NULL;
+}
+void *subsystem_get_session_ctx(struct session *session, struct subsystem_descriptor *desc)
+{
+ return NULL;
+};
+
+void subsystem_dettach_session(struct session *session, struct subsystem_descriptor *desc)
+{
+ return;
+}; \ No newline at end of file
diff --git a/src/adapter/subsystem_manager.h b/src/adapter/subsystem_manager.h
new file mode 100644
index 0000000..53edd50
--- /dev/null
+++ b/src/adapter/subsystem_manager.h
@@ -0,0 +1,31 @@
+#pragma once
+
+#include "utils.h"
+#include "subsystem.h"
+
+struct subsystem_descriptor
+{
+ enum session_type type;
+ int interest_state;
+ subsystem_init_callback *init_cb;
+ subsystem_exit_callback *exit_cb;
+ subsystem_entry_callback *entry_cb;
+};
+
+struct subsystem_descriptor global_subsystem_schema[] =
+{
+ {
+ .type=SESSION_TYPE_UDP,
+ .init_cb=NULL,
+ .exit_cb=NULL,
+ .entry_cb=NULL,
+ .interest_state=SESSION_STATE_ALL,
+ },
+ {
+ .type=SESSION_TYPE_UDP,
+ .init_cb=NULL,
+ .exit_cb=NULL,
+ .entry_cb=NULL,
+ .interest_state=SESSION_STATE_ALL,
+ },
+}; \ No newline at end of file