diff options
| author | 李佳 <[email protected]> | 2021-04-26 02:08:18 +0000 |
|---|---|---|
| committer | 李佳 <[email protected]> | 2021-04-26 02:08:18 +0000 |
| commit | 6c467abc5c68c6ed5caab660761c2b525a0deae4 (patch) | |
| tree | d84b2267905d2b2e530eb9612604f42a38c125e8 | |
| parent | 91bc8e3880ecf5c7f5a18a1392c7109c9babe0f8 (diff) | |
Feature v4.2 stream bridgev4.2.33
| -rw-r--r-- | CMakeLists.txt | 1 | ||||
| -rw-r--r-- | include/private/project_requirement.h | 4 | ||||
| -rw-r--r-- | include/private/stream_internal.h | 5 | ||||
| -rw-r--r-- | include/public/stream.h | 3 | ||||
| -rw-r--r-- | include/public/stream_inc/stream_bridge.h | 100 | ||||
| -rw-r--r-- | include/public/stream_inc/stream_project.h | 8 | ||||
| -rw-r--r-- | src/dealpkt/deal_tcp.c | 7 | ||||
| -rw-r--r-- | src/dealpkt/deal_udp.c | 16 | ||||
| -rw-r--r-- | src/dealpkt/stream_manage.c | 4 | ||||
| -rw-r--r-- | src/project/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | src/project/project_requirement.c | 116 | ||||
| -rw-r--r-- | src/project/raw_ip_frag_list.c | 4 | ||||
| -rw-r--r-- | src/project/stream_bridge.c | 286 | ||||
| -rw-r--r-- | test/test_app_sapp.c | 8 |
14 files changed, 484 insertions, 80 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 6e77057..b4e73ea 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -268,6 +268,7 @@ install(FILES ${PROJECT_SOURCE_DIR}/include/public/stream_inc/stream_proxy.h DES install(FILES ${PROJECT_SOURCE_DIR}/include/public/stream_inc/stream_rawpkt.h DESTINATION /opt/MESA/include/MESA/stream_inc COMPONENT HEADER) install(FILES ${PROJECT_SOURCE_DIR}/include/public/stream_inc/stream_tunnel.h DESTINATION /opt/MESA/include/MESA/stream_inc COMPONENT HEADER) install(FILES ${PROJECT_SOURCE_DIR}/include/public/stream_inc/sapp_inject.h DESTINATION /opt/MESA/include/MESA/stream_inc COMPONENT HEADER) +install(FILES ${PROJECT_SOURCE_DIR}/include/public/stream_inc/stream_bridge.h DESTINATION /opt/MESA/include/MESA/stream_inc COMPONENT HEADER) install(FILES ${CMAKE_BINARY_DIR}/src/entry/libsapp_dev.a DESTINATION . COMPONENT HEADER) diff --git a/include/private/project_requirement.h b/include/private/project_requirement.h index 44a17da..7c0db8a 100644 --- a/include/private/project_requirement.h +++ b/include/private/project_requirement.h @@ -10,6 +10,10 @@ extern "C" {
#endif
+#define RAW_FRAG_V4_MODULE_NAME "ipv4_frag_list"
+#define RAW_FRAG_V6_MODULE_NAME "ipv6_frag_list"
+
+
void *project_requirement_create(int thread_num);
void project_requirement_destroy(int thread_num, void *pproject);
int raw_ip_frag_list_stream_attach(struct streaminfo *stream);
diff --git a/include/private/stream_internal.h b/include/private/stream_internal.h index 7953013..2927400 100644 --- a/include/private/stream_internal.h +++ b/include/private/stream_internal.h @@ -191,6 +191,8 @@ struct streaminfo_private char has_duplicate_pkt; /* ��ǰ�������й��ظ����ݰ�, ����������Ӧ��ÿ������� */ char __pad3[6]; /* ===8 bytes=== */ + + void *stream_bridge; /* 20210425 lijia add, ����project����, ֧��ͬ�����첽����ģʽ, ԭ��project�����첽ģʽ */ }; struct tcpdetail_private @@ -391,6 +393,9 @@ int sapp_identify_broad_multicast_init(void); int plugin_manage_init(void); void *plugin_get_plug_entry(const char *plug_name, const char *plug_entry_type); int libsapp_setup_env(int argc, char *argv[]); +void *stream_bridge_create_per_stream(int thread_num); +void stream_bridge_destroy_per_stream(const struct streaminfo *stream, void *pbridge); + #ifdef __cplusplus } diff --git a/include/public/stream.h b/include/public/stream.h index b332ff6..c380340 100644 --- a/include/public/stream.h +++ b/include/public/stream.h @@ -4,6 +4,7 @@ #include "stream_inc/stream_base.h"
#include "stream_inc/stream_proxy.h"
#include "stream_inc/stream_project.h"
+#include "stream_inc/stream_bridge.h"
#include "stream_inc/stream_inject.h"
#include "stream_inc/stream_control.h"
#include "stream_inc/stream_entry.h"
@@ -12,7 +13,7 @@ #include "stream_inc/sapp_inject.h"
-#define STREAM_H_VERSION (20191118)
+#define STREAM_H_VERSION (20210425)
#endif
diff --git a/include/public/stream_inc/stream_bridge.h b/include/public/stream_inc/stream_bridge.h new file mode 100644 index 0000000..3164854 --- /dev/null +++ b/include/public/stream_inc/stream_bridge.h @@ -0,0 +1,100 @@ +#ifndef _SAPP_STREAM_BRIDGE_H_ +#define _SAPP_STREAM_BRIDGE_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include <errno.h> + + +/***************************************** stream bridge API ************************************************* + + stream_bridge_xxx接口用于在不同插件之间, 创建一个通道, 支持同步和异步两种方式互相通信, 支持多对多的关系. + + 同步模式采用callback注册方式, 避免直接使用extern声明函数, 动态、灵活、可扩展. + 异步模式是在streaminfo中保留一份数据, 功能同之前的project_req_xxx系列接口. + + 假设插件plug A,X,Y,Z都注册同一个bridge name:"bridge_demo", bridge_id=1, + + plug A使用异步模式在streaminfo, bridge=1的空间存储了一份数据, receiver X,Y,Z可以在streaminfo生存周期内取数据. + ------------------- + | streaminfo | plug X + |-----------------| / + plug A-->|bridge1 -> data1 |/--plug Y + |-----------------|\ + |bridge2 -> data2 | \ plug Z + |-----------------| + |bridge3 -> data3 | + ------------------- + + + plugA发送一条同步消息, receiver X,Y,Z都可以依次收到. + + -------------------- plug X + | / + plug A ->| bridge_id = 1 /--plug Y + | \ + | \ plug Z + -------------------- + +*******************************************************************************************************************/ + + +/* + bridge_name: 全局唯一, 与之前的project使用不同的命名空间, 可以重名, + 但是bridge_id与project_req_id不能通用!!! + + rw_mode类似fopen函数的参数, 只支持两个模式: + "r": 只读,bridge_name不存在则返回错误; + "w": bridge_name不存在则创建一个新的bridge, + 注意, 如果其他插件已经创建了同名bridge, 再次用"w"模式打开, 不能像fopen以截断模式重新打开一个文件, 其实bridge_id是一样的! + 建议先用"r"模式验证是否有无同名bridge; + + 返回值是bridge_id, + >=0 : success + <0 : error +*/ +int stream_bridge_build(const char *bridge_name, const char *rw_mode); + + +typedef void stream_bridge_free_cb_t(const struct streaminfo *stream, int bridge_id, void *data); +typedef int stream_bridge_sync_cb_t(const struct streaminfo *stream, int bridge_id, void *data); + + +/* + 注意: free函数最多只能有一个, 后面会覆盖前面的函数指针, 多次重复注册只保留最后一个! +*/ +int stream_bridge_register_data_free_cb(int bridge_id, stream_bridge_free_cb_t *free_cb_fun); + +/* sync回调函数可以是多个, 调用stream_bridge_sync_data_put()时, 会调用所有注册的callback函数 */ +int stream_bridge_register_data_sync_cb(int bridge_id, stream_bridge_sync_cb_t *sync_cb_fun); + +/* + 返回值: + 0: 没有stream_bridge_sync_cb_t函数; + -1: error + N: 成功调用stream_bridge_sync_cb_t函数的数量; +*/ +int stream_bridge_sync_data_put(const struct streaminfo *stream, int bridge_id, void *data); + +/* 等同于之前的project_req_add_xxx, 注意如果data是malloc的内存, 要注册stream_bridge_free_cb_t, streaminfo在close时会自动free */ +int stream_bridge_async_data_put(const struct streaminfo *stream, int bridge_id, void *data); + +/* + 返回值: + 非NULL: 插件曾经调用stream_bridge_async_data_put()存储的data值. + NULL : 此时会产生一种歧义, 如果stream_bridge_async_data_put()就是存了一个空指针, 或者利用C的特性, 存了一个整数0, + 如何区别是错误还是原始数据就是0? + + 如果返回值是NULL的情况下, 插件要再判断一下errno, errno == ENODATA (61), 说明确实没有插件曾经存储过数据, 是个错误. +*/ +void *stream_bridge_async_data_get(const struct streaminfo *stream, int bridge_id); /* 等同于之前的project_get_xxx */ + + +#ifdef __cplusplus +} +#endif + +#endif + diff --git a/include/public/stream_inc/stream_project.h b/include/public/stream_inc/stream_project.h index 76fc365..1a967ec 100644 --- a/include/public/stream_inc/stream_project.h +++ b/include/public/stream_inc/stream_project.h @@ -7,7 +7,7 @@ extern "C" {
#endif
-#define STREAM_PROJECT_H_VERSION (20210224)
+#define STREAM_PROJECT_H_VERSION (20210422)
#define PROJECT_REQ_NAME_MAX_LEN (64)
@@ -96,11 +96,6 @@ int project_producer_register(const char *project_req_name, const char *project_ int project_customer_register(const char *project_req_name, const char *project_req_val_type);
-typedef int project_notify_cb_t(const struct streaminfo *stream, int project_req_id, void *a_packet, void *user_arg);
-int project_customer_register_with_notify(const char *project_req_name, const char *project_req_val_type, project_notify_cb_t callback, void *user_arg);
-int project_producer_notify(const struct streaminfo *stream, int project_req_id, void *a_packet);
-
-
/*
Function project_req_add_struct(): 'project_req_value' must be a pointer to heap memory(obtain by malloc).
@@ -156,6 +151,7 @@ unsigned long project_req_get_ulong(const struct streaminfo *stream, int project */
const void *project_req_get_struct(const struct streaminfo *stream, int project_req_id);
+
#ifdef __cplusplus
}
#endif
diff --git a/src/dealpkt/deal_tcp.c b/src/dealpkt/deal_tcp.c index dd96301..0607892 100644 --- a/src/dealpkt/deal_tcp.c +++ b/src/dealpkt/deal_tcp.c @@ -733,6 +733,7 @@ static struct streamindex *tcp_add_new_stream_bysyn(struct streamindex *pindex, /* LiJia add, ijЩ��Ŀ����Ӧ�ó�ʼ��, ���ǩ��ԭʼIP��Ƭ�б��� */ pstream_pr->pproject = project_requirement_create(pstream->threadnum); + pstream_pr->stream_bridge = stream_bridge_create_per_stream(pstream->threadnum); /* 2015-12-28 lijia add */ if(G_TCP_FLOW_STAT_PROJECT_ID != -1){ @@ -982,6 +983,7 @@ static struct streamindex *tcp_add_new_stream_bydata(struct streamindex *pindex, /* 2014-03-26 LiJia add, ijЩ��Ŀ����Ӧ�ó�ʼ��, ���ǩ��ԭʼIP��Ƭ�б��� */ pstream_pr->pproject = project_requirement_create(threadnum); + pstream_pr->stream_bridge = stream_bridge_create_per_stream(pstream->threadnum); /* 2016-07-14 lijia add */ if(G_TCP_FLOW_STAT_PROJECT_ID != -1){ @@ -1272,6 +1274,8 @@ int tcp_free_stream(struct streamindex *pindex, const void *this_ip_hdr, const v project_requirement_destroy(threadnum, pstream_pr->pproject); pstream_pr->pproject = NULL; + stream_bridge_destroy_per_stream(pstream, pstream_pr->stream_bridge); + pstream_pr->stream_bridge = NULL; if(pstream_pr->timeout > link_default_nopkt_time){ /* ���Ӷ��г�ʱʱ��, ���ټ��� */ sapp_global_mthread[threadnum].tcp_stream_special_timeout_num--; @@ -1406,6 +1410,9 @@ static int tcp_reset_stream(struct streamindex *pindex,const void *this_iphdr, project_requirement_destroy(threadnum, pstream_pr->pproject); pstream_pr->pproject = NULL; + stream_bridge_destroy_per_stream(pstream, pstream_pr->stream_bridge); + pstream_pr->stream_bridge = NULL; + /* �ָ�֮ǰ�洢��raw_pkt */ pstream_pr->raw_pkt = saved_raw_pkt; diff --git a/src/dealpkt/deal_udp.c b/src/dealpkt/deal_udp.c index 7b72603..ae4bcb5 100644 --- a/src/dealpkt/deal_udp.c +++ b/src/dealpkt/deal_udp.c @@ -204,6 +204,7 @@ static struct streamindex *udp_add_new_stream(struct streamindex *pindex, /* 2014-03-26 LiJia add, ijЩ��Ŀ����Ӧ�ó�ʼ��, ���ǩ��ԭʼIP��Ƭ�б��� */ pstream_udp_pr->pproject = project_requirement_create(threadnum); + pstream_udp_pr->stream_bridge = stream_bridge_create_per_stream(threadnum); /* 2016-07-14 lijia add */ if(G_UDP_FLOW_STAT_PROJECT_ID != -1){ @@ -264,6 +265,7 @@ static struct streamindex *udp_add_new_stream(struct streamindex *pindex, struct pstream_udp->dir=createdir; pstream_udp_pr->pproject = project_requirement_create(pstream_udp->threadnum); + pstream_udp_pr->stream_bridge = stream_bridge_create_per_stream(threadnum); return a_udp; } @@ -308,6 +310,10 @@ void udp_free_stream(struct streamindex *pindex) project_requirement_destroy(threadnum, pstream_pr->pproject); pstream_pr->pproject = NULL; + + stream_bridge_destroy_per_stream(pstream, pstream_pr->stream_bridge); + pstream_pr->stream_bridge = NULL; + if(pstream_pr->timeout > udp_reset_time){ /* ���Ӷ��г�ʱʱ��, ���ټ��� */ sapp_global_mthread[threadnum].udp_stream_special_timeout_num--; } @@ -359,7 +365,10 @@ static int udp_reset_stream_bytime(struct streamindex *pindex) stream_process_udp(pstream,NULL,NULL,NULL,&(pdetail_pr->apme),&(pstream->opstate)); project_requirement_destroy(threadnum, pstream_pr->pproject); + stream_bridge_destroy_per_stream(pstream, pstream_pr->stream_bridge); + pstream_pr->pproject = project_requirement_create(threadnum); + pstream_pr->stream_bridge = stream_bridge_create_per_stream(threadnum); /* 2015-12-28 lijia add, ������, ���������ڴ� */ if(G_UDP_FLOW_STAT_PROJECT_ID != -1){ @@ -517,6 +526,7 @@ int dealipv4udppkt(struct streamindex *pindex, const struct mesa_ip4_hdr * this_ pstream->pdetail=(void *)pdetail; pstream_pr->pproject = project_requirement_create(thread_num); + pstream_pr->stream_bridge = stream_bridge_create_per_stream(thread_num); if(G_UDP_FLOW_STAT_PROJECT_ID != -1){ pdetail_pr.flow_stat = (struct udp_flow_stat *)dictator_malloc(thread_num, sizeof(struct udp_flow_stat)); @@ -527,6 +537,9 @@ int dealipv4udppkt(struct streamindex *pindex, const struct mesa_ip4_hdr * this_ ret=stream_process_udp(pstream, (const void *)this_iphdr, (const void *)udph, raw_pkt,&tmpme,&(pstream->opstate)); project_requirement_destroy(thread_num, pstream_pr->pproject); pstream_pr->pproject = NULL; + + stream_bridge_destroy_per_stream(pstream, pstream_pr->stream_bridge); + pstream_pr->stream_bridge = NULL; } else { @@ -701,6 +714,7 @@ int dealipv6udppkt(struct streamindex *pindex,const struct mesa_ip6_hdr *a_packe pstream_pr->offset_to_ip_hdr = (char *)udph - (char *)a_packet; pstream_pr->pproject = project_requirement_create(thread_num); + pstream_pr->stream_bridge = stream_bridge_create_per_stream(thread_num); if(G_UDP_FLOW_STAT_PROJECT_ID != -1){ pdetail_pr->flow_stat = (struct udp_flow_stat *)dictator_malloc(thread_num, sizeof(struct udp_flow_stat)); @@ -711,6 +725,8 @@ int dealipv6udppkt(struct streamindex *pindex,const struct mesa_ip6_hdr *a_packe ret=stream_process_udp(pstream, (const void *)a_packet, (const void *)udph, raw_pkt,&tmpme, &pstream->opstate); project_requirement_destroy(thread_num, pstream_pr->pproject); pstream_pr->pproject = NULL; + stream_bridge_destroy_per_stream(pstream, pstream_pr->stream_bridge); + pstream_pr->stream_bridge = NULL; } else { diff --git a/src/dealpkt/stream_manage.c b/src/dealpkt/stream_manage.c index f342fd2..5250321 100644 --- a/src/dealpkt/stream_manage.c +++ b/src/dealpkt/stream_manage.c @@ -517,6 +517,10 @@ int init_stream_manage(int threadcount) //add 20140626 ��ʼ��procject�ӿ� project_requirement_global_init(); + if(stream_bridge_init() < 0){ + exit(1); + } + return 0; } diff --git a/src/project/CMakeLists.txt b/src/project/CMakeLists.txt index 822e387..dfeceab 100644 --- a/src/project/CMakeLists.txt +++ b/src/project/CMakeLists.txt @@ -10,4 +10,4 @@ include_directories(${MESA_SDK_PREFIX}/include) include_directories(${MESA_SDK_PREFIX}/include/MESA) -add_library(project STATIC project_requirement.c raw_ip_frag_list.c ) +add_library(project STATIC project_requirement.c raw_ip_frag_list.c stream_bridge.c ) diff --git a/src/project/project_requirement.c b/src/project/project_requirement.c index 97e2f2c..76e7e81 100644 --- a/src/project/project_requirement.c +++ b/src/project/project_requirement.c @@ -31,18 +31,12 @@ struct pro_req_type{ char *type_str;
};
-typedef struct{
- project_notify_cb_t *callback_fun;
- void *notify_callback_user_arg;
-}project_callback_para_t;
-
typedef struct project_req_m{
int project_req_id;
char project_req_name[PROJECT_REQ_NAME_MAX_LEN];
project_req_free_t *free_cb;
int project_req_type;
- MESA_list_t nofity_callback_fun_list;
}project_req_manage_t;
struct project_req_info{
@@ -172,57 +166,6 @@ int project_customer_register(const char *project_req_name, const char *project_ return __project_find_id_by_name(project_req_name, project_req_val_type);
}
-int project_customer_register_with_notify(const char *project_req_name, const char *project_req_val_type, project_notify_cb_t callback_fun, void *user_arg)
-{
- int project_id;
- struct MESA_list *new_list_node;
- project_callback_para_t *new_callback;
-
- if(NULL == callback_fun){
- return -1;
- }
-
- project_id = project_customer_register(project_req_name, project_req_val_type);
- if(project_id < 0){
- return -1;
- }
-
- new_list_node = (struct MESA_list *)malloc(sizeof(struct MESA_list));
- new_callback = (project_callback_para_t *)malloc(sizeof(project_callback_para_t));
- new_callback->callback_fun = callback_fun;
- new_callback->notify_callback_user_arg = user_arg;
-
- new_list_node->quiddity = new_callback;
-
- MESA_list_add(&G_PROJECT_REQ_MANAGE[project_id].nofity_callback_fun_list, new_list_node);
-
- return project_id;
-}
-
-
-int project_producer_notify(const struct streaminfo *stream, int project_req_id, void *a_packet)
-{
- MESA_list_t *list_node;
- project_callback_para_t *callback_para;
-
- if((project_req_id < 0) || (project_req_id >= PROJECT_REQ_MAX_PLUG_NUM)){
- sapp_runtime_log(30, "project_producer_notify() error, invalid project_id:%d", project_req_id);
- return -1;
- }
-
- list_node = G_PROJECT_REQ_MANAGE[project_req_id].nofity_callback_fun_list.nextele;
- while(list_node != &G_PROJECT_REQ_MANAGE[project_req_id].nofity_callback_fun_list){
- callback_para = (project_callback_para_t *)list_node->quiddity;
- if(callback_para->callback_fun){
- callback_para->callback_fun(stream, project_req_id, a_packet, callback_para->notify_callback_user_arg);
- }
- list_node = list_node->nextele;
- }
-
- return 0;
-}
-
-
int parse_project_req_conf(void)
{
@@ -287,6 +230,48 @@ int parse_project_req_conf(void) return 0;
}
+
+static int project_req_conf_is_exist(const char *check_name)
+{
+ char *save_ptr;
+ const char *delim = "\t ";
+ char line_buf[1024];
+ FILE *fp;
+ char *preq_name;
+ int line_num = 0;
+
+ fp = fopen(ABBR_PROJECT_LIST_CONF_FILE, "r");
+ if(NULL == fp){
+ sapp_log(RLOG_LV_DEBUG, 1, 1, "Open '%s' error: %s", ABBR_PROJECT_LIST_CONF_FILE, strerror(errno));
+ return 0;
+ }
+
+ while(fgets(line_buf, 1024, fp) != NULL){
+ if('#' == line_buf[0] || ' ' == line_buf[0] || '\r' == line_buf[0] || '\n' == line_buf[0]){
+ line_num++;
+ continue;
+ }
+ del_last_rn(line_buf, 1024);
+
+ preq_name = strtok_r(line_buf, delim, &save_ptr);
+ if(NULL == preq_name){
+ printf("invalid conf in %s, line %d\n", ABBR_PROJECT_LIST_CONF_FILE, line_num);
+ return 0;
+ }
+
+ if(strncasecmp(preq_name, check_name, strlen(preq_name)) == 0){
+ return 1;
+ }
+
+ while(strtok_r(NULL, delim, &save_ptr));
+ line_num++;
+ }
+
+ fclose(fp);
+ return 0;
+}
+
+
static int __project_req_add(struct streaminfo *stream, int project_req_id, long long project_req_value, int pro_req_type)
{
project_req_info_t *pinfo;
@@ -578,10 +563,6 @@ static void project_env_init() MESA_htable_destroy(G_PROJECT_NAME_HASH, NULL);
}
}
-
- for(i = 0; i < PROJECT_REQ_MAX_PLUG_NUM; i++){
- MESA_list_init_head(&G_PROJECT_REQ_MANAGE[i].nofity_callback_fun_list);
- }
return;
}
@@ -595,10 +576,17 @@ int project_requirement_global_init(void) //parse_project_req_conf();
/* ƽ̨����projectģ���ʼ��, �����Ƿ�����ȡ���������ļ����� */
- project_req_terminal_tag_init();
- raw_ip_frag_list_v4_init();
- raw_ip_frag_list_v6_init();
+ //project_req_terminal_tag_init();
+ /* projectע���Ϊ��̬, ����project_list.conf������,
+ ����������Щ���ù���Ϊ����ǰ����,���Ǽ��һ��project_list.conf�Ƿ����, ���������� */
+ if(project_req_conf_is_exist(RAW_FRAG_V4_MODULE_NAME)){
+ raw_ip_frag_list_v4_init();
+ }
+ if(project_req_conf_is_exist(RAW_FRAG_V6_MODULE_NAME)){
+ raw_ip_frag_list_v6_init();
+ }
+
tcp_flow_stat_init();
udp_flow_stat_init();
diff --git a/src/project/raw_ip_frag_list.c b/src/project/raw_ip_frag_list.c index 9dc2d45..e8aae44 100644 --- a/src/project/raw_ip_frag_list.c +++ b/src/project/raw_ip_frag_list.c @@ -9,10 +9,6 @@ extern "C" { #include "sapp_api.h"
#include "sapp_private_api.h"
-
-#define RAW_FRAG_V4_MODULE_NAME "ipv4_frag_list"
-#define RAW_FRAG_V6_MODULE_NAME "ipv6_frag_list"
-
//extern int g_packet_io_thread_num;
static int g_raw_ip_frag_list_v4_plugid = -1;
diff --git a/src/project/stream_bridge.c b/src/project/stream_bridge.c new file mode 100644 index 0000000..7d3a2a7 --- /dev/null +++ b/src/project/stream_bridge.c @@ -0,0 +1,286 @@ +#ifdef __cplusplus +extern "C" { +#endif + +#include "sapp_api.h" +#include "sapp_private_api.h" +#include "MESA_htable.h" +#include "MESA_list.h" + +#define STREAM_BRIDEG_MAX_NUM (128) + +#define STREAM_BRIDGE_MAGIC_SUCC (0x62726765) + +typedef struct{ + int magic_num; /* 表示当前bridge曾经注册成功过 */ + int bridge_id; + char bridge_name[NAME_MAX]; + stream_bridge_free_cb_t *async_data_free_cb; + MESA_list_t sync_callback_fun_list; /* 支持多个接收者 */ +}stream_bridge_manage_t; + +struct stream_bridge_item{ + long long value; + char valid_flag; /* 用于识别初始状态值为0, 和外部调用者设置了0, 两种值相同但不同的状态 */ +}__attribute__((packed, aligned(1))); +typedef struct stream_bridge_item stream_bridge_item_t; + + +static int g_stream_bridge_actual_num; +static stream_bridge_manage_t g_stream_bridge_manage[STREAM_BRIDEG_MAX_NUM]; + +static MESA_htable_handle g_stream_bridge_name2id_htable; + +static MESA_htable_handle stream_bridge_name2id_htable_create(void) +{ + MESA_htable_handle htable; + int ret, opt_int; + + htable = MESA_htable_born(); + assert(htable != NULL); + + opt_int = 0; + MESA_htable_set_opt(htable, MHO_THREAD_SAFE, &opt_int, sizeof(int)); + + opt_int = 1024 * 32; + MESA_htable_set_opt(htable, MHO_HASH_SLOT_SIZE, &opt_int, sizeof(int)); + + opt_int = 1000; + MESA_htable_set_opt(htable, MHO_HASH_MAX_ELEMENT_NUM, &opt_int, sizeof(int)); + + opt_int = 0; + MESA_htable_set_opt(htable, MHO_EXPIRE_TIME, &opt_int, sizeof(int)); + + opt_int = 0; + MESA_htable_set_opt(htable, MHO_AUTO_UPDATE_TIME, &opt_int, sizeof(int)); + + opt_int = 0; + MESA_htable_set_opt(htable, MHO_SCREEN_PRINT_CTRL, &opt_int, sizeof(int)); + + opt_int = HASH_ELIMINATE_ALGO_FIFO; + MESA_htable_set_opt(htable, MHO_ELIMIMINATE_TYPE, &opt_int, sizeof(int)); + + ret = MESA_htable_mature(htable); + if(ret < 0){ + return NULL; + } + + return htable; +} + + +static int stream_bridge_name2id(const char *bridge_name) +{ + int *bridge_id; + + bridge_id = (int *)MESA_htable_search(g_stream_bridge_name2id_htable, (const unsigned char *)bridge_name, strlen(bridge_name)); + if(NULL == bridge_id){ + return -1; + } + + return *bridge_id; +} + +static int stream_bridege_create_new_bridge(const char *bridge_name) +{ + int *bridge_id, ret; + + bridge_id = (int *)malloc(sizeof(int)); + + *bridge_id = g_stream_bridge_actual_num; + + ret = MESA_htable_add(g_stream_bridge_name2id_htable, (const unsigned char *)bridge_name, strlen(bridge_name), (void *)bridge_id); + assert(ret >= 0); + + memset(&g_stream_bridge_manage[g_stream_bridge_actual_num], 0, sizeof(stream_bridge_manage_t)); + g_stream_bridge_manage[g_stream_bridge_actual_num].magic_num = STREAM_BRIDGE_MAGIC_SUCC; + g_stream_bridge_manage[g_stream_bridge_actual_num].bridge_id = g_stream_bridge_actual_num; + strncpy(g_stream_bridge_manage[g_stream_bridge_actual_num].bridge_name,bridge_name, strlen(bridge_name)); + MESA_list_init_head(&g_stream_bridge_manage[g_stream_bridge_actual_num].sync_callback_fun_list); + + g_stream_bridge_actual_num++; + + return *bridge_id; +} + + +/* 新建一个流结构时调用 */ +void *stream_bridge_create_per_stream(int thread_num) +{ + stream_bridge_item_t *stream_bridge_array; + + if(0 == g_stream_bridge_actual_num){ + return NULL; + } + + stream_bridge_array = (stream_bridge_item_t *)dictator_malloc(thread_num, sizeof(stream_bridge_item_t) * g_stream_bridge_actual_num); + memset(stream_bridge_array, 0, sizeof(stream_bridge_item_t) * g_stream_bridge_actual_num); + + return (void *)stream_bridge_array; +} + +void stream_bridge_destroy_per_stream(const struct streaminfo *stream, void *pbridge) +{ + int i; + stream_bridge_item_t *bridge_array; + + if(NULL == pbridge){ + return; + } + + bridge_array = (stream_bridge_item_t *)(pbridge); + + for(i = 0; i < g_stream_bridge_actual_num; i++){ + if((g_stream_bridge_manage[i].async_data_free_cb) && (bridge_array[i].value != 0)){ + g_stream_bridge_manage[i].async_data_free_cb(stream, i, (void *)(bridge_array[i].value)); + } + } + + dictator_free(stream->threadnum, pbridge); + + return; +} + + +int stream_bridge_build(const char *bridge_name, const char *rw_mode) +{ + int bridge_id; + + if(strlen(bridge_name) >= NAME_MAX){ + sapp_runtime_log(RLOG_LV_FATAL, "stream_bridge_build() error, bridge name too long:%s", bridge_name); + errno = EINVAL ; + return -1; + } + + bridge_id = stream_bridge_name2id(bridge_name); + + if(bridge_id < 0){ + if(strncasecmp(rw_mode, "r", 1) == 0){ + return -1; + }else if(strncasecmp(rw_mode, "w", 1) == 0){ + bridge_id = stream_bridege_create_new_bridge(bridge_name); + }else{ + sapp_runtime_log(RLOG_LV_FATAL, "stream_bridge_build() error, unknown mode:%s", rw_mode); + return -1; + } + } + + return bridge_id; +} + + +int stream_bridge_register_data_sync_cb(int bridge_id, stream_bridge_sync_cb_t *cb_fun) +{ + MESA_list_t *new_list_node; + + if((bridge_id < 0) || (bridge_id >= STREAM_BRIDEG_MAX_NUM) || (g_stream_bridge_manage[bridge_id].magic_num != STREAM_BRIDGE_MAGIC_SUCC)){ + errno = EINVAL ; + return -1; + } + + new_list_node = (struct MESA_list *)malloc(sizeof(struct MESA_list)); + new_list_node->quiddity = cb_fun; + MESA_list_add(&g_stream_bridge_manage[bridge_id].sync_callback_fun_list, new_list_node); + + return 0; +} + +int stream_bridge_register_data_free_cb(int bridge_id, stream_bridge_free_cb_t *cb_fun) +{ + if((bridge_id >= 0) || (bridge_id >= STREAM_BRIDEG_MAX_NUM) || (g_stream_bridge_manage[bridge_id].magic_num != STREAM_BRIDGE_MAGIC_SUCC)){ + return -1; + } + + g_stream_bridge_manage[bridge_id].async_data_free_cb = cb_fun; + + return 0; +} + + +int stream_bridge_async_data_put(const struct streaminfo *stream, int bridge_id, void *data) +{ + stream_bridge_item_t *bridge_item_array; + struct streaminfo_private *stream_pr; + + if((bridge_id < 0) || (bridge_id >= STREAM_BRIDEG_MAX_NUM) || (g_stream_bridge_manage[bridge_id].magic_num != STREAM_BRIDGE_MAGIC_SUCC)){ + errno = EINVAL ; + return -1; + } + + stream_pr = (struct streaminfo_private *)stream; + bridge_item_array = (stream_bridge_item_t *)stream_pr->stream_bridge; + + bridge_item_array[bridge_id].value = (long long)data; + bridge_item_array[bridge_id].valid_flag = 1; + + return 0; +} + +void *stream_bridge_async_data_get(const struct streaminfo *stream, int bridge_id) +{ + void *result; + stream_bridge_item_t *bridge_item_array; + struct streaminfo_private *stream_pr; + + if((bridge_id < 0) || (bridge_id >= STREAM_BRIDEG_MAX_NUM) || (g_stream_bridge_manage[bridge_id].magic_num != STREAM_BRIDGE_MAGIC_SUCC)){ + errno = EINVAL ; + return NULL; + } + + stream_pr = (struct streaminfo_private *)stream; + bridge_item_array = (stream_bridge_item_t *)stream_pr->stream_bridge; + + result = (void *)bridge_item_array[bridge_id].value; + if(NULL == result){ + if(0 == bridge_item_array[bridge_id].valid_flag){ + errno = ENODATA; + } + } + + return result; + +} + + +int stream_bridge_sync_data_put(const struct streaminfo *stream, int bridge_id, void *data) +{ + stream_bridge_sync_cb_t *sync_cb_fun; + struct streaminfo_private *stream_pr; + MESA_list_t *list_node; + int callback_num = 0; + + if((bridge_id < 0) || (bridge_id >= STREAM_BRIDEG_MAX_NUM) || (g_stream_bridge_manage[bridge_id].magic_num != STREAM_BRIDGE_MAGIC_SUCC)){ + errno = EINVAL ; + return -1; + } + + if(MESA_list_is_empty(&g_stream_bridge_manage[bridge_id].sync_callback_fun_list)){ + return 0; + } + + list_node = g_stream_bridge_manage[bridge_id].sync_callback_fun_list.nextele; + while(list_node != &g_stream_bridge_manage[bridge_id].sync_callback_fun_list){ + sync_cb_fun = (stream_bridge_sync_cb_t *)list_node->quiddity; + sync_cb_fun(stream, bridge_id, data); + list_node = list_node->nextele; + callback_num++; + } + + return callback_num; + +} + + +int stream_bridge_init(void) +{ + g_stream_bridge_name2id_htable = stream_bridge_name2id_htable_create(); + assert(g_stream_bridge_name2id_htable); + + return 0; +} + +#ifdef __cplusplus +} +#endif + + diff --git a/test/test_app_sapp.c b/test/test_app_sapp.c index 8d16edc..3c54f64 100644 --- a/test/test_app_sapp.c +++ b/test/test_app_sapp.c @@ -945,8 +945,8 @@ test_set_stream_timeout(pstream, pme, thread_seq, a_packet); { printf("%17s: %s, ","tcpstream-plug", printaddr(&(pstream->addr), pstream->threadnum)); //printf("%17s: %s, ","tcpstream-plug", printaddr_r(&(pstream->addr), addr_str_buf, 1024)); - printf("opstate=%d, server pkt=%d, count=%d, client pkt=%d, count=%d\n", - pstream->opstate, + printf("final_dir=%d, server pkt=%d, count=%d, client pkt=%d, count=%d\n", + pstream->dir, raw_pdetail->serverpktnum, raw_pdetail->serverbytes, raw_pdetail->clientpktnum, raw_pdetail->clientbytes); free(plug_stat); @@ -1120,8 +1120,8 @@ char testtcpApp_allpkt(struct streaminfo *pstream,void **pme, int thread_seq,voi { printf("%20s: %s, ", "tcpallstream-plug", printaddr(&(pstream->addr), pstream->threadnum)); //printf("index=%d,state=%d ",*(int*)((char *)(pstream)-8),*(char *)((char *)(pstream)-4)); - printf("out:%d, pktstate=%d, opstate=%d, server-pkt=%u, server-count=%lu, client-pkt=%u, client-count=%lu, ", - pdetail_pr->link_state,pstream->pktstate,pstream->opstate, + printf("out:%d, final_dir=%d server-pkt=%u, server-count=%lu, client-pkt=%u, client-count=%lu, ", + pdetail_pr->link_state, pstream->dir, tcpallflow->C2S_all_pkt, tcpallflow->C2S_all_byte, tcpallflow->S2C_all_pkt, tcpallflow->S2C_all_byte); printf("total-pkt=%u, ", tcpallflow->C2S_all_pkt + tcpallflow->S2C_all_pkt); |
