summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author李仁杰 <[email protected]>2020-08-13 19:20:12 +0800
committer李仁杰 <[email protected]>2020-08-13 19:20:12 +0800
commit7946812e0e4d1d307fe8df707e0875154504d11d (patch)
tree002eca4799782b6687c93e3f676ec73c9f857eb0
parent5307549fa36657b88fa9e52f655d805b9922fb9d (diff)
Update lirenjie_vxlan_sapp_20200803.c 20200813,合并PUSH_BUFFER_COUNT个消息
-rw-r--r--lirenjie_vxlan_sapp_20200803.c84
1 files changed, 57 insertions, 27 deletions
diff --git a/lirenjie_vxlan_sapp_20200803.c b/lirenjie_vxlan_sapp_20200803.c
index 57270fa..82976d7 100644
--- a/lirenjie_vxlan_sapp_20200803.c
+++ b/lirenjie_vxlan_sapp_20200803.c
@@ -19,8 +19,8 @@
#include "stream.h"
#include "rdkafka.h"
#include "libGSJ.h"
-//#include "cJSON.h"
-
+#include "cJSON.h"
+#include "MESA_list_queue.h"
int version_20190827_1605;
static char DEFAULT_RETURN_VALUE = (APP_STATE_GIVEME | APP_STATE_DROPPKT);
@@ -34,6 +34,7 @@ extern int g_business_plug_type;
#define ENABLE_COUNT_THREAD 1 //是否启用统计功能.added by lrj at 20200803
#define ENABLE_GSJ_THREAD 1 //是否启用GSJ
+#define ENABLE_TIMER 0
#define MAX_LOG_INFO_LEN 256
#define MAX_VPN_ID_NUM 512
@@ -92,6 +93,9 @@ pthread_t count;
//timer thread
pthread_t timer;
int time_out[32] = {0}; //1:time out,push data
+int push_buffer_count[32] = {0};
+char *thread_push_buffer[32];
+#define PUSH_BUFFER_COUNT 10
struct traffic_info
{
@@ -582,7 +586,7 @@ static void get_ip_detail(struct streaminfo *pstream, struct traffic_info *tinfo
}
}
-static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *pstream)
+static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *pstream, int thread_seq)
{
char protocol[8];
char vxlan_sip_ip_str[INET_ADDRSTRLEN];
@@ -632,7 +636,7 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps
"\"vx_udp_header_src_port\":%u,\"vx_udp_header_dst_port\":%u,\"vx_vlan_id\":%u,"
"\"ipv4_src_ip\":\"%s\",\"ipv4_dst_ip\":\"%s\",\"ipv4_identification\":%d,\"ipv4_fragment_offset\":%u,"
"\"ipv6_src_ip\":\"\",\"ipv6_dst_ip\":\"\",\"ipv6_bus_type\":\"\",\"ipv6_flow_flag\":\"\",\"ipv6_load_length\":0,"
- "\"ipv6_next_msg_head\":0,\"ipv6_limit\":0,\"flow_type\":%d,\"sendto_gdev_ip\":\"%s\"}",
+ "\"ipv6_next_msg_head\":0,\"ipv6_limit\":0,\"flow_type\":%d,\"sendto_gdev_ip\":\"%s\"}-",
tinfo->PROTO_TYPE, protocol, entrance_id, tinfo->C2S_pkt_num, tinfo->S2C_pkt_num, tinfo->C2S_bytes, tinfo->S2C_bytes,
tinfo->stat_time.tv_sec * 1000 + tinfo->stat_time.tv_usec / 1000, tinfo->vx_type, vxlan_sip_ip_str, vxlan_dip_ip_str,
tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->vxlan_vpn_id,
@@ -642,7 +646,7 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps
// MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info);
// push_data_to_kafka(info,strlen(info));
#if ENABLE_GSJ_THREAD
- push_data_to_GSJ(info, strlen(info));
+ // push_data_to_GSJ(info, strlen(info));
#endif
break;
case ADDR_TYPE_IPV6:
@@ -652,7 +656,7 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps
"\"vx_udp_header_src_port\":%u,\"vx_udp_header_dst_port\":%u,\"vx_vlan_id\":%u,"
"\"ipv4_src_ip\":\"\",\"ipv4_dst_ip\":\"\",\"ipv4_identification\":0,\"ipv4_fragment_offset\":0,"
"\"ipv6_src_ip\":\"%s\",\"ipv6_dst_ip\":\"%s\",\"ipv6_bus_type\":\"0x%02X\",\"ipv6_flow_flag\":\"0x%05X\",\"ipv6_load_length\":%d,"
- "\"ipv6_next_msg_head\":%d,\"ipv6_limit\":%d,\"flow_type\":%d,\"sendto_gdev_ip\":\"%s\"}",
+ "\"ipv6_next_msg_head\":%d,\"ipv6_limit\":%d,\"flow_type\":%d,\"sendto_gdev_ip\":\"%s\"}-",
tinfo->PROTO_TYPE, protocol, entrance_id, tinfo->C2S_pkt_num, tinfo->S2C_pkt_num, tinfo->C2S_bytes, tinfo->S2C_bytes,
tinfo->stat_time.tv_sec * 1000 + tinfo->stat_time.tv_usec / 1000, tinfo->vx_type, vxlan_sip_ip_str, vxlan_dip_ip_str,
tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->vxlan_vpn_id,
@@ -661,7 +665,7 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps
// MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info);
// push_data_to_kafka(info,strlen(info));
#if ENABLE_GSJ_THREAD
- push_data_to_GSJ(info, strlen(info));
+ // push_data_to_GSJ(info, strlen(info));
#endif
break;
case ADDR_TYPE_ARP:
@@ -671,7 +675,7 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps
"\"vx_udp_header_src_port\":%d,\"vx_udp_header_dst_port\":%d,\"vx_vlan_id\":%d,"
"\"ipv4_src_ip\":\"\",\"ipv4_dst_ip\":\"\",\"ipv4_identification\":0,\"ipv4_fragment_offset\":0,"
"\"ipv6_src_ip\":\"\",\"ipv6_dst_ip\":\"\",\"ipv6_bus_type\":\"\",\"ipv6_flow_flag\":\"\",\"ipv6_load_length\":0,"
- "\"ipv6_next_msg_head\":0,\"ipv6_limit\":0,\"flow_type\":%d,\"sendto_gdev_ip\":\"%s\"}",
+ "\"ipv6_next_msg_head\":0,\"ipv6_limit\":0,\"flow_type\":%d,\"sendto_gdev_ip\":\"%s\"}-",
tinfo->PROTO_TYPE, protocol, entrance_id, tinfo->C2S_pkt_num, tinfo->S2C_pkt_num, tinfo->C2S_bytes, tinfo->S2C_bytes,
tinfo->stat_time.tv_sec * 1000 + tinfo->stat_time.tv_usec / 1000, tinfo->vx_type, vxlan_sip_ip_str, vxlan_dip_ip_str,
tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->vxlan_vpn_id,
@@ -679,12 +683,26 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps
// MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info);
// push_data_to_kafka(info,strlen(info));
#if ENABLE_GSJ_THREAD
- push_data_to_GSJ(info, strlen(info));
+ // push_data_to_GSJ(info, strlen(info));
#endif
break;
default:
break;
}
+
+#if ENABLE_GSJ_THREAD
+ if(push_buffer_count[thread_seq] == PUSH_BUFFER_COUNT){
+ memset(thread_push_buffer[thread_seq]+strlen(thread_push_buffer[thread_seq])-1, 0, 1);
+ push_data_to_GSJ(thread_push_buffer[thread_seq], strlen(thread_push_buffer[thread_seq]));
+
+ push_buffer_count[thread_seq] = 0;
+ memset(thread_push_buffer[thread_seq], 0, PUSH_BUFFER_COUNT*MAX_TRAFFIC_INFO_LEN);
+ }
+ else{
+ push_buffer_count[thread_seq]++;
+ memcpy(thread_push_buffer[thread_seq]+strlen(thread_push_buffer[thread_seq]), info, strlen(info));
+ }
+#endif
}
static int tcp_flow_id = -1;
char TCP_ENTRY_ALL(struct streaminfo *pstream, void **pme, int thread_seq, const void *raw_pkt)
@@ -777,12 +795,12 @@ char TCP_ENTRY_ALL(struct streaminfo *pstream, void **pme, int thread_seq, const
// tinfo->stat_time = time(0);
gettimeofday(&tinfo->stat_time, NULL);
- print_traffic_info(tinfo, pstream);
+ print_traffic_info(tinfo, pstream, thread_seq);
free(tinfo);
return DEFAULT_RETURN_VALUE;
}
-
+#if ENABLE_TIMER
/*每隔N秒发一次 */
if (pstream->pktstate == OP_STATE_DATA)
{
@@ -813,7 +831,7 @@ char TCP_ENTRY_ALL(struct streaminfo *pstream, void **pme, int thread_seq, const
close_stream_pkt_num[thread_seq] += tinfo->S2C_pkt_num;
push_count[thread_seq]++;
#endif
- print_traffic_info(tinfo, pstream);
+ print_traffic_info(tinfo, pstream, thread_seq);
if (tcp_flow_id != -1)
{
@@ -826,7 +844,7 @@ char TCP_ENTRY_ALL(struct streaminfo *pstream, void **pme, int thread_seq, const
time_out[thread_seq] = 0;
}
}
-
+#endif
return DEFAULT_RETURN_VALUE;
error_drop:
@@ -973,7 +991,7 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
// tinfo->stat_time = time(0);
gettimeofday(&tinfo->stat_time, NULL);
- print_traffic_info(tinfo, pstream);
+ print_traffic_info(tinfo, pstream, thread_seq);
free(tinfo);
// free(tinfo_new);
// free(time_old);
@@ -981,7 +999,7 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
// printf("\n");
return DEFAULT_RETURN_VALUE;
}
-
+#if ENABLE_TIMER
/*每隔N秒发一次 */
if (pstream->opstate == OP_STATE_DATA)
{
@@ -1019,7 +1037,7 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
close_stream_pkt_num[thread_seq] += tinfo->S2C_pkt_num;
push_count[thread_seq]++;
#endif
- print_traffic_info(tinfo, pstream);
+ print_traffic_info(tinfo, pstream, thread_seq);
if (tcp_flow_id != -1)
{
@@ -1035,7 +1053,7 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
}
// free(time_new);
}
-
+#endif
return DEFAULT_RETURN_VALUE;
error_drop:
@@ -1214,7 +1232,7 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
// tinfo->stat_time = time(0);
gettimeofday(&tinfo->stat_time, NULL);
- print_traffic_info(tinfo, pstream);
+ print_traffic_info(tinfo, pstream, thread_seq);
free(tinfo);
// free(tinfo_new);
// free(time_old);
@@ -1222,7 +1240,7 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
// printf("\n");
return DEFAULT_RETURN_VALUE;
}
-
+#if ENABLE_TIMER
/*每隔N秒发一次 */
if (pstream->opstate == OP_STATE_DATA)
{
@@ -1261,7 +1279,7 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
push_count[thread_seq]++;
#endif
- print_traffic_info(tinfo, pstream);
+ print_traffic_info(tinfo, pstream, thread_seq);
if (udp_flow_id != -1)
{
struct udp_flow_stat *tflow = (struct udp_flow_stat *)project_req_get_struct(pstream, udp_flow_id);
@@ -1277,7 +1295,7 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
// free(time_new);
}
-
+#endif
return DEFAULT_RETURN_VALUE;
error_drop:
@@ -1318,7 +1336,7 @@ void count_work()
all_pkt, close_pkt, all_push_count, all_error_count);
}
}
-
+#if ENABLE_TIMER
void timer_work()
{
while (1)
@@ -1327,14 +1345,14 @@ void timer_work()
int i = 0;
for (i; i < 32; i++)
{
- if (!time_out[i])
- {
- time_out[i] = 1;
- }
+ // if (!time_out[i])
+ // {
+ time_out[i] = 1;
+ // }
}
}
}
-
+#endif
int CHAR_INIT()
{
int demo_plugid = 51;
@@ -1447,6 +1465,7 @@ int CHAR_INIT()
}
#endif
+#if ENABLE_TIMER
int timer_thread;
timer_thread = pthread_create(&timer, NULL, (void *)timer_work, NULL);
if (timer_thread == 0)
@@ -1458,6 +1477,12 @@ int CHAR_INIT()
{
MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "timer thread start failed\n");
}
+#endif
+ //发送缓冲区
+ int i=0;
+ for(i; i<32; i++){
+ thread_push_buffer[i] = (char *)calloc(1,PUSH_BUFFER_COUNT*MAX_TRAFFIC_INFO_LEN*sizeof(char));
+ }
// 函数实现自定义 // 只要求函数返回值为插件ID //printf("INIT SUCCESS!!!\n");
return demo_plugid;
@@ -1475,5 +1500,10 @@ void LRJ_APP_DESTROY()
}
MESA_destroy_runtime_log_handle(runtime_log_handler);
MESA_destroy_runtime_log_handle(kafka_log_handler);
+ //free thread_push_buffer
+ int i=0;
+ for(i; i<32; i++){
+ free(thread_push_buffer[i]);
+ }
printf("TEST_APP_DESTORY out...\n");
}