diff options
| author | yangwei <[email protected]> | 2023-08-09 20:28:18 +0800 |
|---|---|---|
| committer | yangwei <[email protected]> | 2023-08-10 10:36:33 +0800 |
| commit | cfa01ceb646ead1dfcd39b4cd30dbdb276f16c13 (patch) | |
| tree | 0ac36904d23026803dce35e78e20f360672b8e9b | |
| parent | a4af96c6bcd226fec61bbfb72f6843e1a5f9f054 (diff) | |
WIP
| -rw-r--r-- | CMakeLists.txt | 8 | ||||
| -rw-r--r-- | sdk/include/message.h | 2 | ||||
| -rw-r--r-- | sdk/include/session.h | 2 | ||||
| -rw-r--r-- | sdk/include/session_mq.h | 4 | ||||
| -rw-r--r-- | sdk/include/subsystem.h | 6 | ||||
| -rw-r--r-- | sdk/include/utils.h | 1 | ||||
| -rw-r--r-- | src/adapter/CMakeLists.txt | 3 | ||||
| -rw-r--r-- | src/adapter/adapter.c | 56 | ||||
| -rw-r--r-- | src/adapter/adapter.h | 2 | ||||
| -rw-r--r-- | src/adapter/session.c | 9 | ||||
| -rw-r--r-- | src/adapter/session_manager.c | 50 | ||||
| -rw-r--r-- | src/adapter/session_manager.h | 18 | ||||
| -rw-r--r-- | src/adapter/session_mq.c | 171 | ||||
| -rw-r--r-- | src/adapter/session_mq_internal.h | 8 | ||||
| -rw-r--r-- | src/adapter/subsystem_manager.c | 18 | ||||
| -rw-r--r-- | src/adapter/subsystem_manager.h | 31 |
16 files changed, 361 insertions, 28 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 9dc150a..964d8ff 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -28,11 +28,13 @@ add_custom_target("install-program" COMMAND ${CMAKE_COMMAND} ARGS -DCOMPONENT=Pr add_custom_target("install-profile" COMMAND ${CMAKE_COMMAND} ARGS -DCOMPONENT=Profile -P cmake_install.cmake) include_directories(${CMAKE_SOURCE_DIR}) +include_directories(${CMAKE_SOURCE_DIR}/deps) include_directories(${CMAKE_SOURCE_DIR}/sdk/include) -add_subdirectory(vendor) +#add_subdirectory(vendor) +add_subdirectory(src/adapter) add_subdirectory(deps/toml) add_subdirectory(deps/utable) -enable_testing() -add_subdirectory(test) +#enable_testing() +#add_subdirectory(test) diff --git a/sdk/include/message.h b/sdk/include/message.h index c4209c9..c59a0b3 100644 --- a/sdk/include/message.h +++ b/sdk/include/message.h @@ -1,3 +1,5 @@ +#pragma once + #include <stddef.h> #define MESSAGE_MAGIC 0x12345678 diff --git a/sdk/include/session.h b/sdk/include/session.h index b17ced0..8fc16a6 100644 --- a/sdk/include/session.h +++ b/sdk/include/session.h @@ -21,7 +21,7 @@ enum session_state SESSION_STATE_OPENING = 1 << 1, SESSION_STATE_ACTIVE = 1 << 2, SESSION_STATE_CLOSING = 1 << 3, - SESSION_STATE_ALL = (1 << 1 | 1 << 2 | 1 << 3), + SESSION_STATE_ALL = (1 << 1 | 1 << 2 | 1 << 3), }; struct session; diff --git a/sdk/include/session_mq.h b/sdk/include/session_mq.h index 20ba7ac..37e9153 100644 --- a/sdk/include/session_mq.h +++ b/sdk/include/session_mq.h @@ -4,8 +4,8 @@ struct session; -typedef void session_mq_free_callback(const struct session *session, const char* topic_name, void *data, const size_t data_len); -typedef int session_mq_read_callback(const struct session *session, const char* topic_name, void *data, const size_t data_len); +typedef void session_mq_free_callback(const struct session *session, const char* topic_name, void *data, const size_t data_len, void *cb_arg); +typedef int session_mq_read_callback(const struct session *session, const char* topic_name, void *data, const size_t data_len, void *cb_arg); int session_mq_topic_create(const char *topic_name, session_mq_free_callback *free_cb, void *cb_arg); diff --git a/sdk/include/subsystem.h b/sdk/include/subsystem.h index 67fb09a..eff6d7d 100644 --- a/sdk/include/subsystem.h +++ b/sdk/include/subsystem.h @@ -1,4 +1,6 @@ //subsystem manager +#pragma once + #include "session.h" typedef int subsystem_init_callback(void); @@ -8,4 +10,6 @@ typedef void subsystem_entry_callback(struct session *session, enum session_stat struct subsystem_descriptor; struct subsystem_descriptor *subsystem_get0_descriptor(enum session_type type, subsystem_entry_callback *entry_cb); void *subsystem_get_session_ctx(struct session *session, struct subsystem_descriptor *desc); -void subsystem_dettach_session(struct session *session, struct subsystem_descriptor *desc);
\ No newline at end of file + +void subsystem_dettach_session(struct session *session, struct subsystem_descriptor *desc); +void subsystem_attach_session(struct session *session, struct subsystem_descriptor *desc);
\ No newline at end of file diff --git a/sdk/include/utils.h b/sdk/include/utils.h index 13eb7ba..7beafbb 100644 --- a/sdk/include/utils.h +++ b/sdk/include/utils.h @@ -1,6 +1,7 @@ #pragma once #include <stdlib.h> //calloc +#include <stddef.h> //NULL #define CALLOC(type, number) ((type *)calloc(sizeof(type), number)) diff --git a/src/adapter/CMakeLists.txt b/src/adapter/CMakeLists.txt new file mode 100644 index 0000000..1140fae --- /dev/null +++ b/src/adapter/CMakeLists.txt @@ -0,0 +1,3 @@ +set(CMAKE_C_FLAGS "-std=c99") +add_definitions(-fPIC) +add_library(adapter STATIC adapter.c subsystem_manager.c session_manager.c session_mq.c)
\ No newline at end of file diff --git a/src/adapter/adapter.c b/src/adapter/adapter.c index 6202369..5330a34 100644 --- a/src/adapter/adapter.c +++ b/src/adapter/adapter.c @@ -1,26 +1,60 @@ -#include "session.h" #include "adapter.h" +#include "session_manager.h" -struct session +#include "stream.h" + +struct adapter { }; -struct subsystem_descriptor +struct adapter *adapter_init(void) { + return NULL; +} +void adapter_exit(struct adapter *adapter) +{ + return; }; -struct subsystem_descriptor *subsystem_get0_descriptor(enum session_type type, subsystem_entry_callback *entry_cb) + +struct streaminfo; +//streaminfo open +uint8_t adapter_session_open(struct streaminfo *stream, struct session **session, int thread_id) { - return NULL; + *session=session_new(stream, SESSION_TYPE_TCP, SESSION_STATE_OPENING, stream->threadnum); + session_dispatch(*session, stream->threadnum); + return 0; +} + +uint8_t adapter_session_active(struct streaminfo *stream, struct session **session, int thread_id) +{ + if(*session==NULL) + { + *session=session_new(stream, SESSION_TYPE_TCP, SESSION_STATE_ACTIVE, stream->threadnum); + } + (*session)->state=SESSION_STATE_ACTIVE; + session_dispatch(*session, stream->threadnum); + return 0; } -void *subsystem_get_session_ctx(struct session *session, struct subsystem_descriptor *desc) + +//streaminfo close, or firewall active close streaminfo +uint8_t adapter_session_close(struct streaminfo *stream, struct session **session, int thread_id) { - return NULL; -}; + session_dispatch(*session, stream->threadnum); + if(*session) + { + session_free(*session, stream->threadnum); + } + return 0; +} -void subsystem_dettach_session(struct session *session, struct subsystem_descriptor *desc) +uint8_t session_direction_get(struct session *session) { - return; -};
\ No newline at end of file + return 0; +} +uint8_t session_current_direction_get(struct session *session) +{ + return 0; +}
\ No newline at end of file diff --git a/src/adapter/adapter.h b/src/adapter/adapter.h index fb643ef..3938d81 100644 --- a/src/adapter/adapter.h +++ b/src/adapter/adapter.h @@ -1,6 +1,6 @@ #include "session.h" #include "session_mq.h" -#include "subsystem.h" +#include "subsystem_manager.h" #include <stdint.h> diff --git a/src/adapter/session.c b/src/adapter/session.c deleted file mode 100644 index 83c81c5..0000000 --- a/src/adapter/session.c +++ /dev/null @@ -1,9 +0,0 @@ -#include "session.h" - -struct session -{ - -}; - -struct session *session_new(void); -void session_free(struct session *session);
\ No newline at end of file diff --git a/src/adapter/session_manager.c b/src/adapter/session_manager.c new file mode 100644 index 0000000..b5936c0 --- /dev/null +++ b/src/adapter/session_manager.c @@ -0,0 +1,50 @@ +#include "utils.h" +#include "session_manager.h" +#include "subsystem_manager.h" +#include "session_mq_internal.h" + +#include "uthash/uthash.h" + +struct streaminfo; +struct session *session_new(struct streaminfo* stream, enum session_type type, enum session_state state, int thread_id) +{ + if(stream == NULL)return NULL; + struct session *session = CALLOC(struct session, 1); + session->stream = stream; + session->type = type; + session->state = state; + session->subsystem_contexts = NULL; + session->mq = session_mq_new(); + return session; + +} + +void session_free(struct session *session, int thread_id) +{ + if(session==NULL)return; + + if(session->subsystem_contexts != NULL) + { + FREE(session->subsystem_contexts); + } + if(session->mq != NULL) + { + session_mq_free(session->mq); + } +} + +void session_dispatch(struct session *session, int thread_id) +{ + if(session==NULL)return; + int subsystem_num = sizeof(global_subsystem_schema) / sizeof(global_subsystem_schema[0]); + for(int i = 0; i < subsystem_num; i++) + { + if ((global_subsystem_schema[i].interest_state & session->state) && (global_subsystem_schema[i].entry_cb != NULL)) + { + global_subsystem_schema[i].entry_cb(session, session->state, thread_id, session->subsystem_contexts[i]); + } + } + session_mq_dequeue(session->mq, session); + return; +} + diff --git a/src/adapter/session_manager.h b/src/adapter/session_manager.h new file mode 100644 index 0000000..15fc676 --- /dev/null +++ b/src/adapter/session_manager.h @@ -0,0 +1,18 @@ +#pragma once + +#include "session.h" + +struct session +{ + struct streaminfo *stream; + enum session_type type; + enum session_state state; + void **subsystem_contexts; + struct session_mq *mq; +}; + +struct streaminfo; +struct session *session_new(struct streaminfo* stream, enum session_type type, enum session_state state, int thread_id); +void session_free(struct session *session, int thread_id); + +void session_dispatch(struct session *session, int thread_id);
\ No newline at end of file diff --git a/src/adapter/session_mq.c b/src/adapter/session_mq.c new file mode 100644 index 0000000..6d1908f --- /dev/null +++ b/src/adapter/session_mq.c @@ -0,0 +1,171 @@ + +#include "utils.h" +#include "session_mq_internal.h" + +#include <sys/queue.h> +#include "uthash/uthash.h" + +struct session_mq_node +{ + const char *topic_name; + void *message; + size_t message_len; + STAILQ_ENTRY(session_mq_node) entries; +}; + +struct session_mq +{ + STAILQ_HEAD(, session_mq_node) head; +}; + + +struct subscriber +{ + session_mq_read_callback *read_cb; + void *cb_arg; + STAILQ_ENTRY(subscriber) entries; +}; + +struct session_mq_topic_schema +{ + const char *topic_name; + session_mq_free_callback *free_cb; + void *cb_arg; + STAILQ_HEAD(, subscriber) subscribers; + UT_hash_handle hh; +}; + +static struct session_mq_topic_schema *global_session_mq_schema = NULL; + +struct session_mq *session_mq_new(void) +{ + struct session_mq *mq = CALLOC(struct session_mq, 1); + STAILQ_INIT(&mq->head); + return mq; +} + +void session_mq_free(struct session_mq *mq) +{ + struct session_mq_node *node; + while ((node = STAILQ_FIRST(&mq->head)) != NULL) + { + STAILQ_REMOVE_HEAD(&mq->head, entries); + free(node); + } +} + +static struct session_mq_topic_schema *session_mq_topic_schema(const char *topic_name) +{ + struct session_mq_topic_schema *schema; + HASH_FIND_STR(global_session_mq_schema, topic_name, schema); + return schema; +}; + +void session_mq_dequeue(struct session_mq *mq, struct session *session) +{ + struct session_mq_node *node; + struct subscriber *sub; + while (!STAILQ_EMPTY(&mq->head)) + { + node = STAILQ_FIRST(&mq->head); + STAILQ_REMOVE_HEAD(&mq->head, entries); + struct session_mq_topic_schema *topic= session_mq_topic_schema(node->topic_name); + if(topic != NULL) + { + STAILQ_FOREACH(sub, &topic->subscribers, entries) + { + sub->read_cb(session, node->topic_name, node->message, node->message_len, sub->cb_arg); + } + if(topic->free_cb != NULL) + { + topic->free_cb(session, node->topic_name, node->message, node->message_len, topic->cb_arg); + } + } + FREE(node); + } + return; +}; + +int session_mq_topic_create(const char *topic_name, session_mq_free_callback *free_cb, void *cb_arg) { + struct session_mq_topic_schema *topic; + // 检查topic_name是否已存在 + HASH_FIND_STR(global_session_mq_schema, topic_name, topic); + if (topic != NULL && topic->free_cb != NULL) + { + return -1; // 已存在 + } + if(topic == NULL) + { + topic = CALLOC(struct session_mq_topic_schema,1); + HASH_ADD_KEYPTR(hh, global_session_mq_schema, topic->topic_name, strlen(topic->topic_name), topic); + } + if(topic->topic_name) + { + topic->topic_name = strdup(topic_name); // 复制字符串,确保独立存储 + } + topic->free_cb = free_cb; + if(STAILQ_EMPTY(&topic->subscribers)) + { + STAILQ_INIT(&topic->subscribers); + } + return 0; // 成功 +} + + +int session_mq_topic_destroy(const char *topic_name) { + struct session_mq_topic_schema *topic; + + // 在哈希表中查找指定的主题名 + HASH_FIND_STR(global_session_mq_schema, topic_name, topic); + if (topic == NULL) + { + return -1; // 没有找到 + } + // 从哈希表中删除 + HASH_DEL(global_session_mq_schema, topic); + // 释放存储的主题名 + free((void*)topic->topic_name); + + struct subscriber *sub; + while ((sub = STAILQ_FIRST(&topic->subscribers)) != NULL) + { + STAILQ_REMOVE_HEAD(&topic->subscribers, entries); + free(sub); + } + // 释放结构体 + free(topic); + return 0; // 成功 +} + + +int session_mq_send_msg(struct session *session, const char *topic_name, const void *data, const size_t data_len) +{ + struct session_mq_topic_schema *topic; + HASH_FIND_STR(global_session_mq_schema, topic_name, topic); + if (topic == NULL) + { + return -1; // 没有找到 + } + struct session_mq_node *node= CALLOC(struct session_mq_node,1); + node->topic_name = topic->topic_name; + node->message = CALLOC(char, data_len); + memcpy(node->message, data, data_len); + node->message_len=data_len; + return 0; +} + +int session_mq_topic_subscribe(const char *topic_name, session_mq_read_callback *read_cb, void *cb_arg) +{ + struct session_mq_topic_schema *topic; + session_mq_topic_create(topic_name, NULL, NULL); + HASH_FIND_STR(global_session_mq_schema, topic_name, topic); + if (topic == NULL) + { + return -1; // topic 创建失败 + } + struct subscriber *new_subscriber = CALLOC(struct subscriber,1); + new_subscriber->read_cb = read_cb; + new_subscriber->cb_arg = cb_arg; + STAILQ_INSERT_TAIL(&topic->subscribers, new_subscriber, entries); + return 0; +}
\ No newline at end of file diff --git a/src/adapter/session_mq_internal.h b/src/adapter/session_mq_internal.h new file mode 100644 index 0000000..9b99d5d --- /dev/null +++ b/src/adapter/session_mq_internal.h @@ -0,0 +1,8 @@ +#include "session_mq.h" +#include "session_manager.h" + +struct session_mq; + +struct session_mq *session_mq_new(); +void session_mq_free(struct session_mq *mq); +void session_mq_dequeue(struct session_mq *mq, struct session *session);
\ No newline at end of file diff --git a/src/adapter/subsystem_manager.c b/src/adapter/subsystem_manager.c new file mode 100644 index 0000000..fdf3112 --- /dev/null +++ b/src/adapter/subsystem_manager.c @@ -0,0 +1,18 @@ +#include "subsystem_manager.h" + +#include "utils.h" +#include "session.h" + +struct subsystem_descriptor *subsystem_get0_descriptor(enum session_type type, subsystem_entry_callback *entry_cb) +{ + return NULL; +} +void *subsystem_get_session_ctx(struct session *session, struct subsystem_descriptor *desc) +{ + return NULL; +}; + +void subsystem_dettach_session(struct session *session, struct subsystem_descriptor *desc) +{ + return; +};
\ No newline at end of file diff --git a/src/adapter/subsystem_manager.h b/src/adapter/subsystem_manager.h new file mode 100644 index 0000000..53edd50 --- /dev/null +++ b/src/adapter/subsystem_manager.h @@ -0,0 +1,31 @@ +#pragma once + +#include "utils.h" +#include "subsystem.h" + +struct subsystem_descriptor +{ + enum session_type type; + int interest_state; + subsystem_init_callback *init_cb; + subsystem_exit_callback *exit_cb; + subsystem_entry_callback *entry_cb; +}; + +struct subsystem_descriptor global_subsystem_schema[] = +{ + { + .type=SESSION_TYPE_UDP, + .init_cb=NULL, + .exit_cb=NULL, + .entry_cb=NULL, + .interest_state=SESSION_STATE_ALL, + }, + { + .type=SESSION_TYPE_UDP, + .init_cb=NULL, + .exit_cb=NULL, + .entry_cb=NULL, + .interest_state=SESSION_STATE_ALL, + }, +};
\ No newline at end of file |
