summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryangwei <[email protected]>2023-08-09 22:29:30 +0800
committeryangwei <[email protected]>2023-08-10 10:36:33 +0800
commitff4fd95a9d435b8e6a9709e47417b8a35436277f (patch)
treecfbe1304b761c80c42cc366853d3f7532e70a648
parentcc863db7a96998bf0555dfa9e9c7885e97405fb5 (diff)
WIP
-rw-r--r--CMakeLists.txt25
-rw-r--r--sdk/include/subsystem.h2
-rw-r--r--sdk/include/utils.h2
-rw-r--r--src/adapter/adapter.c10
-rw-r--r--src/adapter/session_mq.c44
5 files changed, 56 insertions, 27 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 964d8ff..2d1e724 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -20,6 +20,31 @@ endif()
option(ENABLE_PIC "Generate position independent code (necessary for shared libraries)" TRUE)
option(ENABLE_WARNING_ALL "Enable all optional warnings which are desirable for normal code" TRUE)
+find_program(CMAKE_CXX_CPPCHECK NAMES cppcheck)
+if (CMAKE_CXX_CPPCHECK)
+ list(
+ APPEND CMAKE_CXX_CPPCHECK
+ "--enable=all"
+ "--error-exitcode=1"
+ "--suppress=unusedFunction"
+ "--suppress=missingInclude"
+ "--suppress=uselessAssignmentPtrArg"
+ "--suppress=unmatchedSuppression"
+ "--suppress=variableScope"
+ "--suppress=unreadVariable"
+ "--suppress=cstyleCast"
+ "--suppress=memleakOnRealloc"
+ "--suppress=constParameter"
+ "--suppress=uselessAssignmentArg"
+ "--suppress=uninitvar"
+ "--suppress=unusedStructMember"
+ "--suppress=unreachableCode"
+ )
+ set(CMAKE_C_CPPCHECK ${CMAKE_CXX_CPPCHECK})
+else()
+ message(FATAL_ERROR "Could not find the program cppcheck.")
+endif()
+
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wall")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall")
set(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -lelf")
diff --git a/sdk/include/subsystem.h b/sdk/include/subsystem.h
index b6f1807..3597b5a 100644
--- a/sdk/include/subsystem.h
+++ b/sdk/include/subsystem.h
@@ -8,7 +8,7 @@ typedef void subsystem_exit_callback(void);
typedef void subsystem_entry_callback(struct session *session, enum session_state state, int thread_id, void **ctx);
struct subsystem_schema;
-struct subsystem_schema *subsystem_get0_descriptor(enum session_type type, subsystem_entry_callback *entry_cb);
+struct subsystem_schema *subsystem_get0_schema(enum session_type type, subsystem_entry_callback *entry_cb);
void *subsystem_session_ctx_get(struct session *session, struct subsystem_schema *schema);
void subsystem_dettach_session(struct session *session, struct subsystem_schema *schema);
diff --git a/sdk/include/utils.h b/sdk/include/utils.h
index 685b58c..7beafbb 100644
--- a/sdk/include/utils.h
+++ b/sdk/include/utils.h
@@ -5,7 +5,7 @@
#define CALLOC(type, number) ((type *)calloc(sizeof(type), number))
-#define FREE(p) {free(p);(p)=NULL;}
+#define FREE(p) {free(p);p=NULL;}
#define TRUE 1
#define FALSE 0
diff --git a/src/adapter/adapter.c b/src/adapter/adapter.c
index db809c2..3d0816d 100644
--- a/src/adapter/adapter.c
+++ b/src/adapter/adapter.c
@@ -57,8 +57,8 @@ uint8_t adapter_session_open(struct adapter *adapter, struct streaminfo *stream,
{
return -1;
}
- *session=session_new(stream, type, SESSION_STATE_OPENING, adapter->subsystem_num, stream->threadnum);
- session_dispatch(adapter->desc, adapter->subsystem_num, *session, type, SESSION_STATE_OPENING, stream->threadnum);
+ *session=session_new(stream, type, SESSION_STATE_OPENING, adapter->subsystem_num, thread_id);
+ session_dispatch(adapter->desc, adapter->subsystem_num, *session, type, SESSION_STATE_OPENING, thread_id);
return 0;
}
@@ -79,10 +79,10 @@ uint8_t adapter_session_active(struct adapter *adapter, struct streaminfo *strea
}
if(*session==NULL)
{
- *session=session_new(stream, type, SESSION_STATE_ACTIVE, adapter->subsystem_num,stream->threadnum);
+ *session=session_new(stream, type, SESSION_STATE_ACTIVE, adapter->subsystem_num,thread_id);
}
(*session)->state=SESSION_STATE_ACTIVE;
- session_dispatch(adapter->desc, adapter->subsystem_num, *session, type, SESSION_STATE_ACTIVE, stream->threadnum);
+ session_dispatch(adapter->desc, adapter->subsystem_num, *session, type, SESSION_STATE_ACTIVE, thread_id);
return 0;
}
@@ -102,7 +102,7 @@ uint8_t adapter_session_close(struct adapter *adapter, struct streaminfo *stream
{
return -1;
}
- session_dispatch(adapter->desc, adapter->subsystem_num, *session, type, SESSION_STATE_CLOSING, stream->threadnum);
+ session_dispatch(adapter->desc, adapter->subsystem_num, *session, type, SESSION_STATE_CLOSING, thread_id);
if(*session)
{
session_free(*session, type, adapter->subsystem_num, stream->threadnum);
diff --git a/src/adapter/session_mq.c b/src/adapter/session_mq.c
index 6d1908f..1e133c0 100644
--- a/src/adapter/session_mq.c
+++ b/src/adapter/session_mq.c
@@ -15,7 +15,7 @@ struct session_mq_node
struct session_mq
{
- STAILQ_HEAD(, session_mq_node) head;
+ STAILQ_HEAD(session_mq_head, session_mq_node) head;
};
@@ -31,7 +31,7 @@ struct session_mq_topic_schema
const char *topic_name;
session_mq_free_callback *free_cb;
void *cb_arg;
- STAILQ_HEAD(, subscriber) subscribers;
+ STAILQ_HEAD(sub_q_head, subscriber) subscribers;
UT_hash_handle hh;
};
@@ -86,32 +86,36 @@ void session_mq_dequeue(struct session_mq *mq, struct session *session)
return;
};
-int session_mq_topic_create(const char *topic_name, session_mq_free_callback *free_cb, void *cb_arg) {
- struct session_mq_topic_schema *topic;
- // 检查topic_name是否已存在
- HASH_FIND_STR(global_session_mq_schema, topic_name, topic);
- if (topic != NULL && topic->free_cb != NULL)
- {
- return -1; // 已存在
- }
- if(topic == NULL)
+int session_mq_topic_create(const char *topic_name, session_mq_free_callback *free_cb, void *cb_arg)
+{
+ struct session_mq_topic_schema *topic;
+ // 检查topic_name是否已存在
+ HASH_FIND_STR(global_session_mq_schema, topic_name, topic);
+ // 如果找到现有主题,并且free_cb也不为空,则认为生产者已创建此主题
+ if (topic != NULL && topic->free_cb != NULL)
{
- topic = CALLOC(struct session_mq_topic_schema,1);
- HASH_ADD_KEYPTR(hh, global_session_mq_schema, topic->topic_name, strlen(topic->topic_name), topic);
+ return -1; // 已存在
}
- if(topic->topic_name)
+ // 如果主题不存在,则创建新主题,由消费者或生产者调用
+ if (topic == NULL)
{
- topic->topic_name = strdup(topic_name); // 复制字符串,确保独立存储
+ topic = CALLOC(struct session_mq_topic_schema, 1);
+ topic->topic_name = strdup(topic_name); // 复制字符串,确保独立存储
+ HASH_ADD_KEYPTR(hh, global_session_mq_schema, topic->topic_name, strlen(topic->topic_name), topic);
+ if (STAILQ_EMPTY(&topic->subscribers))
+ {
+ STAILQ_INIT(&topic->subscribers);
+ }
}
- topic->free_cb = free_cb;
- if(STAILQ_EMPTY(&topic->subscribers))
+ // 如果找到现有主题但free_cb为空,则为消费者占的坑,现在由生产者来填充
+ if (free_cb != NULL)
{
- STAILQ_INIT(&topic->subscribers);
+ topic->free_cb = free_cb;
+ topic->cb_arg = cb_arg;
}
- return 0; // 成功
+ return 0; // 成功
}
-
int session_mq_topic_destroy(const char *topic_name) {
struct session_mq_topic_schema *topic;