summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author李佳 <[email protected]>2021-04-26 02:08:18 +0000
committer李佳 <[email protected]>2021-04-26 02:08:18 +0000
commit6c467abc5c68c6ed5caab660761c2b525a0deae4 (patch)
treed84b2267905d2b2e530eb9612604f42a38c125e8
parent91bc8e3880ecf5c7f5a18a1392c7109c9babe0f8 (diff)
Feature v4.2 stream bridgev4.2.33
-rw-r--r--CMakeLists.txt1
-rw-r--r--include/private/project_requirement.h4
-rw-r--r--include/private/stream_internal.h5
-rw-r--r--include/public/stream.h3
-rw-r--r--include/public/stream_inc/stream_bridge.h100
-rw-r--r--include/public/stream_inc/stream_project.h8
-rw-r--r--src/dealpkt/deal_tcp.c7
-rw-r--r--src/dealpkt/deal_udp.c16
-rw-r--r--src/dealpkt/stream_manage.c4
-rw-r--r--src/project/CMakeLists.txt2
-rw-r--r--src/project/project_requirement.c116
-rw-r--r--src/project/raw_ip_frag_list.c4
-rw-r--r--src/project/stream_bridge.c286
-rw-r--r--test/test_app_sapp.c8
14 files changed, 484 insertions, 80 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 6e77057..b4e73ea 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -268,6 +268,7 @@ install(FILES ${PROJECT_SOURCE_DIR}/include/public/stream_inc/stream_proxy.h DES
install(FILES ${PROJECT_SOURCE_DIR}/include/public/stream_inc/stream_rawpkt.h DESTINATION /opt/MESA/include/MESA/stream_inc COMPONENT HEADER)
install(FILES ${PROJECT_SOURCE_DIR}/include/public/stream_inc/stream_tunnel.h DESTINATION /opt/MESA/include/MESA/stream_inc COMPONENT HEADER)
install(FILES ${PROJECT_SOURCE_DIR}/include/public/stream_inc/sapp_inject.h DESTINATION /opt/MESA/include/MESA/stream_inc COMPONENT HEADER)
+install(FILES ${PROJECT_SOURCE_DIR}/include/public/stream_inc/stream_bridge.h DESTINATION /opt/MESA/include/MESA/stream_inc COMPONENT HEADER)
install(FILES ${CMAKE_BINARY_DIR}/src/entry/libsapp_dev.a DESTINATION . COMPONENT HEADER)
diff --git a/include/private/project_requirement.h b/include/private/project_requirement.h
index 44a17da..7c0db8a 100644
--- a/include/private/project_requirement.h
+++ b/include/private/project_requirement.h
@@ -10,6 +10,10 @@
extern "C" {
#endif
+#define RAW_FRAG_V4_MODULE_NAME "ipv4_frag_list"
+#define RAW_FRAG_V6_MODULE_NAME "ipv6_frag_list"
+
+
void *project_requirement_create(int thread_num);
void project_requirement_destroy(int thread_num, void *pproject);
int raw_ip_frag_list_stream_attach(struct streaminfo *stream);
diff --git a/include/private/stream_internal.h b/include/private/stream_internal.h
index 7953013..2927400 100644
--- a/include/private/stream_internal.h
+++ b/include/private/stream_internal.h
@@ -191,6 +191,8 @@ struct streaminfo_private
char has_duplicate_pkt; /* ��ǰ�������й��ظ����ݰ�, ����������Ӧ��ÿ������� */
char __pad3[6];
/* ===8 bytes=== */
+
+ void *stream_bridge; /* 20210425 lijia add, ����project����, ֧��ͬ�����첽����ģʽ, ԭ��project�����첽ģʽ */
};
struct tcpdetail_private
@@ -391,6 +393,9 @@ int sapp_identify_broad_multicast_init(void);
int plugin_manage_init(void);
void *plugin_get_plug_entry(const char *plug_name, const char *plug_entry_type);
int libsapp_setup_env(int argc, char *argv[]);
+void *stream_bridge_create_per_stream(int thread_num);
+void stream_bridge_destroy_per_stream(const struct streaminfo *stream, void *pbridge);
+
#ifdef __cplusplus
}
diff --git a/include/public/stream.h b/include/public/stream.h
index b332ff6..c380340 100644
--- a/include/public/stream.h
+++ b/include/public/stream.h
@@ -4,6 +4,7 @@
#include "stream_inc/stream_base.h"
#include "stream_inc/stream_proxy.h"
#include "stream_inc/stream_project.h"
+#include "stream_inc/stream_bridge.h"
#include "stream_inc/stream_inject.h"
#include "stream_inc/stream_control.h"
#include "stream_inc/stream_entry.h"
@@ -12,7 +13,7 @@
#include "stream_inc/sapp_inject.h"
-#define STREAM_H_VERSION (20191118)
+#define STREAM_H_VERSION (20210425)
#endif
diff --git a/include/public/stream_inc/stream_bridge.h b/include/public/stream_inc/stream_bridge.h
new file mode 100644
index 0000000..3164854
--- /dev/null
+++ b/include/public/stream_inc/stream_bridge.h
@@ -0,0 +1,100 @@
+#ifndef _SAPP_STREAM_BRIDGE_H_
+#define _SAPP_STREAM_BRIDGE_H_
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <errno.h>
+
+
+/***************************************** stream bridge API *************************************************
+
+ stream_bridge_xxx接口用于在不同插件之间, 创建一个通道, 支持同步和异步两种方式互相通信, 支持多对多的关系.
+
+ 同步模式采用callback注册方式, 避免直接使用extern声明函数, 动态、灵活、可扩展.
+ 异步模式是在streaminfo中保留一份数据, 功能同之前的project_req_xxx系列接口.
+
+ 假设插件plug A,X,Y,Z都注册同一个bridge name:"bridge_demo", bridge_id=1,
+
+ plug A使用异步模式在streaminfo, bridge=1的空间存储了一份数据, receiver X,Y,Z可以在streaminfo生存周期内取数据.
+ -------------------
+ | streaminfo | plug X
+ |-----------------| /
+ plug A-->|bridge1 -> data1 |/--plug Y
+ |-----------------|\
+ |bridge2 -> data2 | \ plug Z
+ |-----------------|
+ |bridge3 -> data3 |
+ -------------------
+
+
+ plugA发送一条同步消息, receiver X,Y,Z都可以依次收到.
+
+ -------------------- plug X
+ | /
+ plug A ->| bridge_id = 1 /--plug Y
+ | \
+ | \ plug Z
+ --------------------
+
+*******************************************************************************************************************/
+
+
+/*
+ bridge_name: 全局唯一, 与之前的project使用不同的命名空间, 可以重名,
+ 但是bridge_id与project_req_id不能通用!!!
+
+ rw_mode类似fopen函数的参数, 只支持两个模式:
+ "r": 只读,bridge_name不存在则返回错误;
+ "w": bridge_name不存在则创建一个新的bridge,
+ 注意, 如果其他插件已经创建了同名bridge, 再次用"w"模式打开, 不能像fopen以截断模式重新打开一个文件, 其实bridge_id是一样的!
+ 建议先用"r"模式验证是否有无同名bridge;
+
+ 返回值是bridge_id,
+ >=0 : success
+ <0 : error
+*/
+int stream_bridge_build(const char *bridge_name, const char *rw_mode);
+
+
+typedef void stream_bridge_free_cb_t(const struct streaminfo *stream, int bridge_id, void *data);
+typedef int stream_bridge_sync_cb_t(const struct streaminfo *stream, int bridge_id, void *data);
+
+
+/*
+ 注意: free函数最多只能有一个, 后面会覆盖前面的函数指针, 多次重复注册只保留最后一个!
+*/
+int stream_bridge_register_data_free_cb(int bridge_id, stream_bridge_free_cb_t *free_cb_fun);
+
+/* sync回调函数可以是多个, 调用stream_bridge_sync_data_put()时, 会调用所有注册的callback函数 */
+int stream_bridge_register_data_sync_cb(int bridge_id, stream_bridge_sync_cb_t *sync_cb_fun);
+
+/*
+ 返回值:
+ 0: 没有stream_bridge_sync_cb_t函数;
+ -1: error
+ N: 成功调用stream_bridge_sync_cb_t函数的数量;
+*/
+int stream_bridge_sync_data_put(const struct streaminfo *stream, int bridge_id, void *data);
+
+/* 等同于之前的project_req_add_xxx, 注意如果data是malloc的内存, 要注册stream_bridge_free_cb_t, streaminfo在close时会自动free */
+int stream_bridge_async_data_put(const struct streaminfo *stream, int bridge_id, void *data);
+
+/*
+ 返回值:
+ 非NULL: 插件曾经调用stream_bridge_async_data_put()存储的data值.
+ NULL : 此时会产生一种歧义, 如果stream_bridge_async_data_put()就是存了一个空指针, 或者利用C的特性, 存了一个整数0,
+ 如何区别是错误还是原始数据就是0?
+
+ 如果返回值是NULL的情况下, 插件要再判断一下errno, errno == ENODATA (61), 说明确实没有插件曾经存储过数据, 是个错误.
+*/
+void *stream_bridge_async_data_get(const struct streaminfo *stream, int bridge_id); /* 等同于之前的project_get_xxx */
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
+
diff --git a/include/public/stream_inc/stream_project.h b/include/public/stream_inc/stream_project.h
index 76fc365..1a967ec 100644
--- a/include/public/stream_inc/stream_project.h
+++ b/include/public/stream_inc/stream_project.h
@@ -7,7 +7,7 @@
extern "C" {
#endif
-#define STREAM_PROJECT_H_VERSION (20210224)
+#define STREAM_PROJECT_H_VERSION (20210422)
#define PROJECT_REQ_NAME_MAX_LEN (64)
@@ -96,11 +96,6 @@ int project_producer_register(const char *project_req_name, const char *project_
int project_customer_register(const char *project_req_name, const char *project_req_val_type);
-typedef int project_notify_cb_t(const struct streaminfo *stream, int project_req_id, void *a_packet, void *user_arg);
-int project_customer_register_with_notify(const char *project_req_name, const char *project_req_val_type, project_notify_cb_t callback, void *user_arg);
-int project_producer_notify(const struct streaminfo *stream, int project_req_id, void *a_packet);
-
-
/*
Function project_req_add_struct(): 'project_req_value' must be a pointer to heap memory(obtain by malloc).
@@ -156,6 +151,7 @@ unsigned long project_req_get_ulong(const struct streaminfo *stream, int project
*/
const void *project_req_get_struct(const struct streaminfo *stream, int project_req_id);
+
#ifdef __cplusplus
}
#endif
diff --git a/src/dealpkt/deal_tcp.c b/src/dealpkt/deal_tcp.c
index dd96301..0607892 100644
--- a/src/dealpkt/deal_tcp.c
+++ b/src/dealpkt/deal_tcp.c
@@ -733,6 +733,7 @@ static struct streamindex *tcp_add_new_stream_bysyn(struct streamindex *pindex,
/* LiJia add, ijЩ��Ŀ����Ӧ�ó�ʼ��, ���ǩ��ԭʼIP��Ƭ�б��� */
pstream_pr->pproject = project_requirement_create(pstream->threadnum);
+ pstream_pr->stream_bridge = stream_bridge_create_per_stream(pstream->threadnum);
/* 2015-12-28 lijia add */
if(G_TCP_FLOW_STAT_PROJECT_ID != -1){
@@ -982,6 +983,7 @@ static struct streamindex *tcp_add_new_stream_bydata(struct streamindex *pindex,
/* 2014-03-26 LiJia add, ijЩ��Ŀ����Ӧ�ó�ʼ��, ���ǩ��ԭʼIP��Ƭ�б��� */
pstream_pr->pproject = project_requirement_create(threadnum);
+ pstream_pr->stream_bridge = stream_bridge_create_per_stream(pstream->threadnum);
/* 2016-07-14 lijia add */
if(G_TCP_FLOW_STAT_PROJECT_ID != -1){
@@ -1272,6 +1274,8 @@ int tcp_free_stream(struct streamindex *pindex, const void *this_ip_hdr, const v
project_requirement_destroy(threadnum, pstream_pr->pproject);
pstream_pr->pproject = NULL;
+ stream_bridge_destroy_per_stream(pstream, pstream_pr->stream_bridge);
+ pstream_pr->stream_bridge = NULL;
if(pstream_pr->timeout > link_default_nopkt_time){ /* ���Ӷ��г�ʱʱ��, ���ټ��� */
sapp_global_mthread[threadnum].tcp_stream_special_timeout_num--;
@@ -1406,6 +1410,9 @@ static int tcp_reset_stream(struct streamindex *pindex,const void *this_iphdr,
project_requirement_destroy(threadnum, pstream_pr->pproject);
pstream_pr->pproject = NULL;
+ stream_bridge_destroy_per_stream(pstream, pstream_pr->stream_bridge);
+ pstream_pr->stream_bridge = NULL;
+
/* �ָ�֮ǰ�洢��raw_pkt */
pstream_pr->raw_pkt = saved_raw_pkt;
diff --git a/src/dealpkt/deal_udp.c b/src/dealpkt/deal_udp.c
index 7b72603..ae4bcb5 100644
--- a/src/dealpkt/deal_udp.c
+++ b/src/dealpkt/deal_udp.c
@@ -204,6 +204,7 @@ static struct streamindex *udp_add_new_stream(struct streamindex *pindex,
/* 2014-03-26 LiJia add, ijЩ��Ŀ����Ӧ�ó�ʼ��, ���ǩ��ԭʼIP��Ƭ�б��� */
pstream_udp_pr->pproject = project_requirement_create(threadnum);
+ pstream_udp_pr->stream_bridge = stream_bridge_create_per_stream(threadnum);
/* 2016-07-14 lijia add */
if(G_UDP_FLOW_STAT_PROJECT_ID != -1){
@@ -264,6 +265,7 @@ static struct streamindex *udp_add_new_stream(struct streamindex *pindex, struct
pstream_udp->dir=createdir;
pstream_udp_pr->pproject = project_requirement_create(pstream_udp->threadnum);
+ pstream_udp_pr->stream_bridge = stream_bridge_create_per_stream(threadnum);
return a_udp;
}
@@ -308,6 +310,10 @@ void udp_free_stream(struct streamindex *pindex)
project_requirement_destroy(threadnum, pstream_pr->pproject);
pstream_pr->pproject = NULL;
+
+ stream_bridge_destroy_per_stream(pstream, pstream_pr->stream_bridge);
+ pstream_pr->stream_bridge = NULL;
+
if(pstream_pr->timeout > udp_reset_time){ /* ���Ӷ��г�ʱʱ��, ���ټ��� */
sapp_global_mthread[threadnum].udp_stream_special_timeout_num--;
}
@@ -359,7 +365,10 @@ static int udp_reset_stream_bytime(struct streamindex *pindex)
stream_process_udp(pstream,NULL,NULL,NULL,&(pdetail_pr->apme),&(pstream->opstate));
project_requirement_destroy(threadnum, pstream_pr->pproject);
+ stream_bridge_destroy_per_stream(pstream, pstream_pr->stream_bridge);
+
pstream_pr->pproject = project_requirement_create(threadnum);
+ pstream_pr->stream_bridge = stream_bridge_create_per_stream(threadnum);
/* 2015-12-28 lijia add, ������, ���������ڴ� */
if(G_UDP_FLOW_STAT_PROJECT_ID != -1){
@@ -517,6 +526,7 @@ int dealipv4udppkt(struct streamindex *pindex, const struct mesa_ip4_hdr * this_
pstream->pdetail=(void *)pdetail;
pstream_pr->pproject = project_requirement_create(thread_num);
+ pstream_pr->stream_bridge = stream_bridge_create_per_stream(thread_num);
if(G_UDP_FLOW_STAT_PROJECT_ID != -1){
pdetail_pr.flow_stat = (struct udp_flow_stat *)dictator_malloc(thread_num, sizeof(struct udp_flow_stat));
@@ -527,6 +537,9 @@ int dealipv4udppkt(struct streamindex *pindex, const struct mesa_ip4_hdr * this_
ret=stream_process_udp(pstream, (const void *)this_iphdr, (const void *)udph, raw_pkt,&tmpme,&(pstream->opstate));
project_requirement_destroy(thread_num, pstream_pr->pproject);
pstream_pr->pproject = NULL;
+
+ stream_bridge_destroy_per_stream(pstream, pstream_pr->stream_bridge);
+ pstream_pr->stream_bridge = NULL;
}
else
{
@@ -701,6 +714,7 @@ int dealipv6udppkt(struct streamindex *pindex,const struct mesa_ip6_hdr *a_packe
pstream_pr->offset_to_ip_hdr = (char *)udph - (char *)a_packet;
pstream_pr->pproject = project_requirement_create(thread_num);
+ pstream_pr->stream_bridge = stream_bridge_create_per_stream(thread_num);
if(G_UDP_FLOW_STAT_PROJECT_ID != -1){
pdetail_pr->flow_stat = (struct udp_flow_stat *)dictator_malloc(thread_num, sizeof(struct udp_flow_stat));
@@ -711,6 +725,8 @@ int dealipv6udppkt(struct streamindex *pindex,const struct mesa_ip6_hdr *a_packe
ret=stream_process_udp(pstream, (const void *)a_packet, (const void *)udph, raw_pkt,&tmpme, &pstream->opstate);
project_requirement_destroy(thread_num, pstream_pr->pproject);
pstream_pr->pproject = NULL;
+ stream_bridge_destroy_per_stream(pstream, pstream_pr->stream_bridge);
+ pstream_pr->stream_bridge = NULL;
}
else
{
diff --git a/src/dealpkt/stream_manage.c b/src/dealpkt/stream_manage.c
index f342fd2..5250321 100644
--- a/src/dealpkt/stream_manage.c
+++ b/src/dealpkt/stream_manage.c
@@ -517,6 +517,10 @@ int init_stream_manage(int threadcount)
//add 20140626 ��ʼ��procject�ӿ�
project_requirement_global_init();
+ if(stream_bridge_init() < 0){
+ exit(1);
+ }
+
return 0;
}
diff --git a/src/project/CMakeLists.txt b/src/project/CMakeLists.txt
index 822e387..dfeceab 100644
--- a/src/project/CMakeLists.txt
+++ b/src/project/CMakeLists.txt
@@ -10,4 +10,4 @@ include_directories(${MESA_SDK_PREFIX}/include)
include_directories(${MESA_SDK_PREFIX}/include/MESA)
-add_library(project STATIC project_requirement.c raw_ip_frag_list.c )
+add_library(project STATIC project_requirement.c raw_ip_frag_list.c stream_bridge.c )
diff --git a/src/project/project_requirement.c b/src/project/project_requirement.c
index 97e2f2c..76e7e81 100644
--- a/src/project/project_requirement.c
+++ b/src/project/project_requirement.c
@@ -31,18 +31,12 @@ struct pro_req_type{
char *type_str;
};
-typedef struct{
- project_notify_cb_t *callback_fun;
- void *notify_callback_user_arg;
-}project_callback_para_t;
-
typedef struct project_req_m{
int project_req_id;
char project_req_name[PROJECT_REQ_NAME_MAX_LEN];
project_req_free_t *free_cb;
int project_req_type;
- MESA_list_t nofity_callback_fun_list;
}project_req_manage_t;
struct project_req_info{
@@ -172,57 +166,6 @@ int project_customer_register(const char *project_req_name, const char *project_
return __project_find_id_by_name(project_req_name, project_req_val_type);
}
-int project_customer_register_with_notify(const char *project_req_name, const char *project_req_val_type, project_notify_cb_t callback_fun, void *user_arg)
-{
- int project_id;
- struct MESA_list *new_list_node;
- project_callback_para_t *new_callback;
-
- if(NULL == callback_fun){
- return -1;
- }
-
- project_id = project_customer_register(project_req_name, project_req_val_type);
- if(project_id < 0){
- return -1;
- }
-
- new_list_node = (struct MESA_list *)malloc(sizeof(struct MESA_list));
- new_callback = (project_callback_para_t *)malloc(sizeof(project_callback_para_t));
- new_callback->callback_fun = callback_fun;
- new_callback->notify_callback_user_arg = user_arg;
-
- new_list_node->quiddity = new_callback;
-
- MESA_list_add(&G_PROJECT_REQ_MANAGE[project_id].nofity_callback_fun_list, new_list_node);
-
- return project_id;
-}
-
-
-int project_producer_notify(const struct streaminfo *stream, int project_req_id, void *a_packet)
-{
- MESA_list_t *list_node;
- project_callback_para_t *callback_para;
-
- if((project_req_id < 0) || (project_req_id >= PROJECT_REQ_MAX_PLUG_NUM)){
- sapp_runtime_log(30, "project_producer_notify() error, invalid project_id:%d", project_req_id);
- return -1;
- }
-
- list_node = G_PROJECT_REQ_MANAGE[project_req_id].nofity_callback_fun_list.nextele;
- while(list_node != &G_PROJECT_REQ_MANAGE[project_req_id].nofity_callback_fun_list){
- callback_para = (project_callback_para_t *)list_node->quiddity;
- if(callback_para->callback_fun){
- callback_para->callback_fun(stream, project_req_id, a_packet, callback_para->notify_callback_user_arg);
- }
- list_node = list_node->nextele;
- }
-
- return 0;
-}
-
-
int parse_project_req_conf(void)
{
@@ -287,6 +230,48 @@ int parse_project_req_conf(void)
return 0;
}
+
+static int project_req_conf_is_exist(const char *check_name)
+{
+ char *save_ptr;
+ const char *delim = "\t ";
+ char line_buf[1024];
+ FILE *fp;
+ char *preq_name;
+ int line_num = 0;
+
+ fp = fopen(ABBR_PROJECT_LIST_CONF_FILE, "r");
+ if(NULL == fp){
+ sapp_log(RLOG_LV_DEBUG, 1, 1, "Open '%s' error: %s", ABBR_PROJECT_LIST_CONF_FILE, strerror(errno));
+ return 0;
+ }
+
+ while(fgets(line_buf, 1024, fp) != NULL){
+ if('#' == line_buf[0] || ' ' == line_buf[0] || '\r' == line_buf[0] || '\n' == line_buf[0]){
+ line_num++;
+ continue;
+ }
+ del_last_rn(line_buf, 1024);
+
+ preq_name = strtok_r(line_buf, delim, &save_ptr);
+ if(NULL == preq_name){
+ printf("invalid conf in %s, line %d\n", ABBR_PROJECT_LIST_CONF_FILE, line_num);
+ return 0;
+ }
+
+ if(strncasecmp(preq_name, check_name, strlen(preq_name)) == 0){
+ return 1;
+ }
+
+ while(strtok_r(NULL, delim, &save_ptr));
+ line_num++;
+ }
+
+ fclose(fp);
+ return 0;
+}
+
+
static int __project_req_add(struct streaminfo *stream, int project_req_id, long long project_req_value, int pro_req_type)
{
project_req_info_t *pinfo;
@@ -578,10 +563,6 @@ static void project_env_init()
MESA_htable_destroy(G_PROJECT_NAME_HASH, NULL);
}
}
-
- for(i = 0; i < PROJECT_REQ_MAX_PLUG_NUM; i++){
- MESA_list_init_head(&G_PROJECT_REQ_MANAGE[i].nofity_callback_fun_list);
- }
return;
}
@@ -595,10 +576,17 @@ int project_requirement_global_init(void)
//parse_project_req_conf();
/* ƽ̨����projectģ���ʼ��, �����Ƿ�����ȡ���������ļ����� */
- project_req_terminal_tag_init();
- raw_ip_frag_list_v4_init();
- raw_ip_frag_list_v6_init();
+ //project_req_terminal_tag_init();
+ /* projectע���Ϊ��̬, ����project_list.conf������,
+ ����������Щ���ù���Ϊ����ǰ����,���Ǽ��һ��project_list.conf�Ƿ����, �������򲻿��� */
+ if(project_req_conf_is_exist(RAW_FRAG_V4_MODULE_NAME)){
+ raw_ip_frag_list_v4_init();
+ }
+ if(project_req_conf_is_exist(RAW_FRAG_V6_MODULE_NAME)){
+ raw_ip_frag_list_v6_init();
+ }
+
tcp_flow_stat_init();
udp_flow_stat_init();
diff --git a/src/project/raw_ip_frag_list.c b/src/project/raw_ip_frag_list.c
index 9dc2d45..e8aae44 100644
--- a/src/project/raw_ip_frag_list.c
+++ b/src/project/raw_ip_frag_list.c
@@ -9,10 +9,6 @@ extern "C" {
#include "sapp_api.h"
#include "sapp_private_api.h"
-
-#define RAW_FRAG_V4_MODULE_NAME "ipv4_frag_list"
-#define RAW_FRAG_V6_MODULE_NAME "ipv6_frag_list"
-
//extern int g_packet_io_thread_num;
static int g_raw_ip_frag_list_v4_plugid = -1;
diff --git a/src/project/stream_bridge.c b/src/project/stream_bridge.c
new file mode 100644
index 0000000..7d3a2a7
--- /dev/null
+++ b/src/project/stream_bridge.c
@@ -0,0 +1,286 @@
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include "sapp_api.h"
+#include "sapp_private_api.h"
+#include "MESA_htable.h"
+#include "MESA_list.h"
+
+#define STREAM_BRIDEG_MAX_NUM (128)
+
+#define STREAM_BRIDGE_MAGIC_SUCC (0x62726765)
+
+typedef struct{
+ int magic_num; /* 表示当前bridge曾经注册成功过 */
+ int bridge_id;
+ char bridge_name[NAME_MAX];
+ stream_bridge_free_cb_t *async_data_free_cb;
+ MESA_list_t sync_callback_fun_list; /* 支持多个接收者 */
+}stream_bridge_manage_t;
+
+struct stream_bridge_item{
+ long long value;
+ char valid_flag; /* 用于识别初始状态值为0, 和外部调用者设置了0, 两种值相同但不同的状态 */
+}__attribute__((packed, aligned(1)));
+typedef struct stream_bridge_item stream_bridge_item_t;
+
+
+static int g_stream_bridge_actual_num;
+static stream_bridge_manage_t g_stream_bridge_manage[STREAM_BRIDEG_MAX_NUM];
+
+static MESA_htable_handle g_stream_bridge_name2id_htable;
+
+static MESA_htable_handle stream_bridge_name2id_htable_create(void)
+{
+ MESA_htable_handle htable;
+ int ret, opt_int;
+
+ htable = MESA_htable_born();
+ assert(htable != NULL);
+
+ opt_int = 0;
+ MESA_htable_set_opt(htable, MHO_THREAD_SAFE, &opt_int, sizeof(int));
+
+ opt_int = 1024 * 32;
+ MESA_htable_set_opt(htable, MHO_HASH_SLOT_SIZE, &opt_int, sizeof(int));
+
+ opt_int = 1000;
+ MESA_htable_set_opt(htable, MHO_HASH_MAX_ELEMENT_NUM, &opt_int, sizeof(int));
+
+ opt_int = 0;
+ MESA_htable_set_opt(htable, MHO_EXPIRE_TIME, &opt_int, sizeof(int));
+
+ opt_int = 0;
+ MESA_htable_set_opt(htable, MHO_AUTO_UPDATE_TIME, &opt_int, sizeof(int));
+
+ opt_int = 0;
+ MESA_htable_set_opt(htable, MHO_SCREEN_PRINT_CTRL, &opt_int, sizeof(int));
+
+ opt_int = HASH_ELIMINATE_ALGO_FIFO;
+ MESA_htable_set_opt(htable, MHO_ELIMIMINATE_TYPE, &opt_int, sizeof(int));
+
+ ret = MESA_htable_mature(htable);
+ if(ret < 0){
+ return NULL;
+ }
+
+ return htable;
+}
+
+
+static int stream_bridge_name2id(const char *bridge_name)
+{
+ int *bridge_id;
+
+ bridge_id = (int *)MESA_htable_search(g_stream_bridge_name2id_htable, (const unsigned char *)bridge_name, strlen(bridge_name));
+ if(NULL == bridge_id){
+ return -1;
+ }
+
+ return *bridge_id;
+}
+
+static int stream_bridege_create_new_bridge(const char *bridge_name)
+{
+ int *bridge_id, ret;
+
+ bridge_id = (int *)malloc(sizeof(int));
+
+ *bridge_id = g_stream_bridge_actual_num;
+
+ ret = MESA_htable_add(g_stream_bridge_name2id_htable, (const unsigned char *)bridge_name, strlen(bridge_name), (void *)bridge_id);
+ assert(ret >= 0);
+
+ memset(&g_stream_bridge_manage[g_stream_bridge_actual_num], 0, sizeof(stream_bridge_manage_t));
+ g_stream_bridge_manage[g_stream_bridge_actual_num].magic_num = STREAM_BRIDGE_MAGIC_SUCC;
+ g_stream_bridge_manage[g_stream_bridge_actual_num].bridge_id = g_stream_bridge_actual_num;
+ strncpy(g_stream_bridge_manage[g_stream_bridge_actual_num].bridge_name,bridge_name, strlen(bridge_name));
+ MESA_list_init_head(&g_stream_bridge_manage[g_stream_bridge_actual_num].sync_callback_fun_list);
+
+ g_stream_bridge_actual_num++;
+
+ return *bridge_id;
+}
+
+
+/* 新建一个流结构时调用 */
+void *stream_bridge_create_per_stream(int thread_num)
+{
+ stream_bridge_item_t *stream_bridge_array;
+
+ if(0 == g_stream_bridge_actual_num){
+ return NULL;
+ }
+
+ stream_bridge_array = (stream_bridge_item_t *)dictator_malloc(thread_num, sizeof(stream_bridge_item_t) * g_stream_bridge_actual_num);
+ memset(stream_bridge_array, 0, sizeof(stream_bridge_item_t) * g_stream_bridge_actual_num);
+
+ return (void *)stream_bridge_array;
+}
+
+void stream_bridge_destroy_per_stream(const struct streaminfo *stream, void *pbridge)
+{
+ int i;
+ stream_bridge_item_t *bridge_array;
+
+ if(NULL == pbridge){
+ return;
+ }
+
+ bridge_array = (stream_bridge_item_t *)(pbridge);
+
+ for(i = 0; i < g_stream_bridge_actual_num; i++){
+ if((g_stream_bridge_manage[i].async_data_free_cb) && (bridge_array[i].value != 0)){
+ g_stream_bridge_manage[i].async_data_free_cb(stream, i, (void *)(bridge_array[i].value));
+ }
+ }
+
+ dictator_free(stream->threadnum, pbridge);
+
+ return;
+}
+
+
+int stream_bridge_build(const char *bridge_name, const char *rw_mode)
+{
+ int bridge_id;
+
+ if(strlen(bridge_name) >= NAME_MAX){
+ sapp_runtime_log(RLOG_LV_FATAL, "stream_bridge_build() error, bridge name too long:%s", bridge_name);
+ errno = EINVAL ;
+ return -1;
+ }
+
+ bridge_id = stream_bridge_name2id(bridge_name);
+
+ if(bridge_id < 0){
+ if(strncasecmp(rw_mode, "r", 1) == 0){
+ return -1;
+ }else if(strncasecmp(rw_mode, "w", 1) == 0){
+ bridge_id = stream_bridege_create_new_bridge(bridge_name);
+ }else{
+ sapp_runtime_log(RLOG_LV_FATAL, "stream_bridge_build() error, unknown mode:%s", rw_mode);
+ return -1;
+ }
+ }
+
+ return bridge_id;
+}
+
+
+int stream_bridge_register_data_sync_cb(int bridge_id, stream_bridge_sync_cb_t *cb_fun)
+{
+ MESA_list_t *new_list_node;
+
+ if((bridge_id < 0) || (bridge_id >= STREAM_BRIDEG_MAX_NUM) || (g_stream_bridge_manage[bridge_id].magic_num != STREAM_BRIDGE_MAGIC_SUCC)){
+ errno = EINVAL ;
+ return -1;
+ }
+
+ new_list_node = (struct MESA_list *)malloc(sizeof(struct MESA_list));
+ new_list_node->quiddity = cb_fun;
+ MESA_list_add(&g_stream_bridge_manage[bridge_id].sync_callback_fun_list, new_list_node);
+
+ return 0;
+}
+
+int stream_bridge_register_data_free_cb(int bridge_id, stream_bridge_free_cb_t *cb_fun)
+{
+ if((bridge_id >= 0) || (bridge_id >= STREAM_BRIDEG_MAX_NUM) || (g_stream_bridge_manage[bridge_id].magic_num != STREAM_BRIDGE_MAGIC_SUCC)){
+ return -1;
+ }
+
+ g_stream_bridge_manage[bridge_id].async_data_free_cb = cb_fun;
+
+ return 0;
+}
+
+
+int stream_bridge_async_data_put(const struct streaminfo *stream, int bridge_id, void *data)
+{
+ stream_bridge_item_t *bridge_item_array;
+ struct streaminfo_private *stream_pr;
+
+ if((bridge_id < 0) || (bridge_id >= STREAM_BRIDEG_MAX_NUM) || (g_stream_bridge_manage[bridge_id].magic_num != STREAM_BRIDGE_MAGIC_SUCC)){
+ errno = EINVAL ;
+ return -1;
+ }
+
+ stream_pr = (struct streaminfo_private *)stream;
+ bridge_item_array = (stream_bridge_item_t *)stream_pr->stream_bridge;
+
+ bridge_item_array[bridge_id].value = (long long)data;
+ bridge_item_array[bridge_id].valid_flag = 1;
+
+ return 0;
+}
+
+void *stream_bridge_async_data_get(const struct streaminfo *stream, int bridge_id)
+{
+ void *result;
+ stream_bridge_item_t *bridge_item_array;
+ struct streaminfo_private *stream_pr;
+
+ if((bridge_id < 0) || (bridge_id >= STREAM_BRIDEG_MAX_NUM) || (g_stream_bridge_manage[bridge_id].magic_num != STREAM_BRIDGE_MAGIC_SUCC)){
+ errno = EINVAL ;
+ return NULL;
+ }
+
+ stream_pr = (struct streaminfo_private *)stream;
+ bridge_item_array = (stream_bridge_item_t *)stream_pr->stream_bridge;
+
+ result = (void *)bridge_item_array[bridge_id].value;
+ if(NULL == result){
+ if(0 == bridge_item_array[bridge_id].valid_flag){
+ errno = ENODATA;
+ }
+ }
+
+ return result;
+
+}
+
+
+int stream_bridge_sync_data_put(const struct streaminfo *stream, int bridge_id, void *data)
+{
+ stream_bridge_sync_cb_t *sync_cb_fun;
+ struct streaminfo_private *stream_pr;
+ MESA_list_t *list_node;
+ int callback_num = 0;
+
+ if((bridge_id < 0) || (bridge_id >= STREAM_BRIDEG_MAX_NUM) || (g_stream_bridge_manage[bridge_id].magic_num != STREAM_BRIDGE_MAGIC_SUCC)){
+ errno = EINVAL ;
+ return -1;
+ }
+
+ if(MESA_list_is_empty(&g_stream_bridge_manage[bridge_id].sync_callback_fun_list)){
+ return 0;
+ }
+
+ list_node = g_stream_bridge_manage[bridge_id].sync_callback_fun_list.nextele;
+ while(list_node != &g_stream_bridge_manage[bridge_id].sync_callback_fun_list){
+ sync_cb_fun = (stream_bridge_sync_cb_t *)list_node->quiddity;
+ sync_cb_fun(stream, bridge_id, data);
+ list_node = list_node->nextele;
+ callback_num++;
+ }
+
+ return callback_num;
+
+}
+
+
+int stream_bridge_init(void)
+{
+ g_stream_bridge_name2id_htable = stream_bridge_name2id_htable_create();
+ assert(g_stream_bridge_name2id_htable);
+
+ return 0;
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+
diff --git a/test/test_app_sapp.c b/test/test_app_sapp.c
index 8d16edc..3c54f64 100644
--- a/test/test_app_sapp.c
+++ b/test/test_app_sapp.c
@@ -945,8 +945,8 @@ test_set_stream_timeout(pstream, pme, thread_seq, a_packet);
{
printf("%17s: %s, ","tcpstream-plug", printaddr(&(pstream->addr), pstream->threadnum));
//printf("%17s: %s, ","tcpstream-plug", printaddr_r(&(pstream->addr), addr_str_buf, 1024));
- printf("opstate=%d, server pkt=%d, count=%d, client pkt=%d, count=%d\n",
- pstream->opstate,
+ printf("final_dir=%d, server pkt=%d, count=%d, client pkt=%d, count=%d\n",
+ pstream->dir,
raw_pdetail->serverpktnum, raw_pdetail->serverbytes,
raw_pdetail->clientpktnum, raw_pdetail->clientbytes);
free(plug_stat);
@@ -1120,8 +1120,8 @@ char testtcpApp_allpkt(struct streaminfo *pstream,void **pme, int thread_seq,voi
{
printf("%20s: %s, ", "tcpallstream-plug", printaddr(&(pstream->addr), pstream->threadnum));
//printf("index=%d,state=%d ",*(int*)((char *)(pstream)-8),*(char *)((char *)(pstream)-4));
- printf("out:%d, pktstate=%d, opstate=%d, server-pkt=%u, server-count=%lu, client-pkt=%u, client-count=%lu, ",
- pdetail_pr->link_state,pstream->pktstate,pstream->opstate,
+ printf("out:%d, final_dir=%d server-pkt=%u, server-count=%lu, client-pkt=%u, client-count=%lu, ",
+ pdetail_pr->link_state, pstream->dir,
tcpallflow->C2S_all_pkt, tcpallflow->C2S_all_byte,
tcpallflow->S2C_all_pkt, tcpallflow->S2C_all_byte);
printf("total-pkt=%u, ", tcpallflow->C2S_all_pkt + tcpallflow->S2C_all_pkt);