diff options
| author | 李仁杰 <[email protected]> | 2020-08-13 19:20:12 +0800 |
|---|---|---|
| committer | 李仁杰 <[email protected]> | 2020-08-13 19:20:12 +0800 |
| commit | 7946812e0e4d1d307fe8df707e0875154504d11d (patch) | |
| tree | 002eca4799782b6687c93e3f676ec73c9f857eb0 | |
| parent | 5307549fa36657b88fa9e52f655d805b9922fb9d (diff) | |
Update lirenjie_vxlan_sapp_20200803.c 20200813,合并PUSH_BUFFER_COUNT个消息
| -rw-r--r-- | lirenjie_vxlan_sapp_20200803.c | 84 |
1 files changed, 57 insertions, 27 deletions
diff --git a/lirenjie_vxlan_sapp_20200803.c b/lirenjie_vxlan_sapp_20200803.c index 57270fa..82976d7 100644 --- a/lirenjie_vxlan_sapp_20200803.c +++ b/lirenjie_vxlan_sapp_20200803.c @@ -19,8 +19,8 @@ #include "stream.h" #include "rdkafka.h" #include "libGSJ.h" -//#include "cJSON.h" - +#include "cJSON.h" +#include "MESA_list_queue.h" int version_20190827_1605; static char DEFAULT_RETURN_VALUE = (APP_STATE_GIVEME | APP_STATE_DROPPKT); @@ -34,6 +34,7 @@ extern int g_business_plug_type; #define ENABLE_COUNT_THREAD 1 //是否启用统计功能.added by lrj at 20200803 #define ENABLE_GSJ_THREAD 1 //是否启用GSJ +#define ENABLE_TIMER 0 #define MAX_LOG_INFO_LEN 256 #define MAX_VPN_ID_NUM 512 @@ -92,6 +93,9 @@ pthread_t count; //timer thread pthread_t timer; int time_out[32] = {0}; //1:time out,push data +int push_buffer_count[32] = {0}; +char *thread_push_buffer[32]; +#define PUSH_BUFFER_COUNT 10 struct traffic_info { @@ -582,7 +586,7 @@ static void get_ip_detail(struct streaminfo *pstream, struct traffic_info *tinfo } } -static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *pstream) +static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *pstream, int thread_seq) { char protocol[8]; char vxlan_sip_ip_str[INET_ADDRSTRLEN]; @@ -632,7 +636,7 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps "\"vx_udp_header_src_port\":%u,\"vx_udp_header_dst_port\":%u,\"vx_vlan_id\":%u," "\"ipv4_src_ip\":\"%s\",\"ipv4_dst_ip\":\"%s\",\"ipv4_identification\":%d,\"ipv4_fragment_offset\":%u," "\"ipv6_src_ip\":\"\",\"ipv6_dst_ip\":\"\",\"ipv6_bus_type\":\"\",\"ipv6_flow_flag\":\"\",\"ipv6_load_length\":0," - "\"ipv6_next_msg_head\":0,\"ipv6_limit\":0,\"flow_type\":%d,\"sendto_gdev_ip\":\"%s\"}", + "\"ipv6_next_msg_head\":0,\"ipv6_limit\":0,\"flow_type\":%d,\"sendto_gdev_ip\":\"%s\"}-", tinfo->PROTO_TYPE, protocol, entrance_id, tinfo->C2S_pkt_num, tinfo->S2C_pkt_num, tinfo->C2S_bytes, tinfo->S2C_bytes, tinfo->stat_time.tv_sec * 1000 + tinfo->stat_time.tv_usec / 1000, tinfo->vx_type, vxlan_sip_ip_str, vxlan_dip_ip_str, tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->vxlan_vpn_id, @@ -642,7 +646,7 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps // MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); // push_data_to_kafka(info,strlen(info)); #if ENABLE_GSJ_THREAD - push_data_to_GSJ(info, strlen(info)); + // push_data_to_GSJ(info, strlen(info)); #endif break; case ADDR_TYPE_IPV6: @@ -652,7 +656,7 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps "\"vx_udp_header_src_port\":%u,\"vx_udp_header_dst_port\":%u,\"vx_vlan_id\":%u," "\"ipv4_src_ip\":\"\",\"ipv4_dst_ip\":\"\",\"ipv4_identification\":0,\"ipv4_fragment_offset\":0," "\"ipv6_src_ip\":\"%s\",\"ipv6_dst_ip\":\"%s\",\"ipv6_bus_type\":\"0x%02X\",\"ipv6_flow_flag\":\"0x%05X\",\"ipv6_load_length\":%d," - "\"ipv6_next_msg_head\":%d,\"ipv6_limit\":%d,\"flow_type\":%d,\"sendto_gdev_ip\":\"%s\"}", + "\"ipv6_next_msg_head\":%d,\"ipv6_limit\":%d,\"flow_type\":%d,\"sendto_gdev_ip\":\"%s\"}-", tinfo->PROTO_TYPE, protocol, entrance_id, tinfo->C2S_pkt_num, tinfo->S2C_pkt_num, tinfo->C2S_bytes, tinfo->S2C_bytes, tinfo->stat_time.tv_sec * 1000 + tinfo->stat_time.tv_usec / 1000, tinfo->vx_type, vxlan_sip_ip_str, vxlan_dip_ip_str, tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->vxlan_vpn_id, @@ -661,7 +665,7 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps // MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); // push_data_to_kafka(info,strlen(info)); #if ENABLE_GSJ_THREAD - push_data_to_GSJ(info, strlen(info)); + // push_data_to_GSJ(info, strlen(info)); #endif break; case ADDR_TYPE_ARP: @@ -671,7 +675,7 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps "\"vx_udp_header_src_port\":%d,\"vx_udp_header_dst_port\":%d,\"vx_vlan_id\":%d," "\"ipv4_src_ip\":\"\",\"ipv4_dst_ip\":\"\",\"ipv4_identification\":0,\"ipv4_fragment_offset\":0," "\"ipv6_src_ip\":\"\",\"ipv6_dst_ip\":\"\",\"ipv6_bus_type\":\"\",\"ipv6_flow_flag\":\"\",\"ipv6_load_length\":0," - "\"ipv6_next_msg_head\":0,\"ipv6_limit\":0,\"flow_type\":%d,\"sendto_gdev_ip\":\"%s\"}", + "\"ipv6_next_msg_head\":0,\"ipv6_limit\":0,\"flow_type\":%d,\"sendto_gdev_ip\":\"%s\"}-", tinfo->PROTO_TYPE, protocol, entrance_id, tinfo->C2S_pkt_num, tinfo->S2C_pkt_num, tinfo->C2S_bytes, tinfo->S2C_bytes, tinfo->stat_time.tv_sec * 1000 + tinfo->stat_time.tv_usec / 1000, tinfo->vx_type, vxlan_sip_ip_str, vxlan_dip_ip_str, tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->vxlan_vpn_id, @@ -679,12 +683,26 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps // MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); // push_data_to_kafka(info,strlen(info)); #if ENABLE_GSJ_THREAD - push_data_to_GSJ(info, strlen(info)); + // push_data_to_GSJ(info, strlen(info)); #endif break; default: break; } + +#if ENABLE_GSJ_THREAD + if(push_buffer_count[thread_seq] == PUSH_BUFFER_COUNT){ + memset(thread_push_buffer[thread_seq]+strlen(thread_push_buffer[thread_seq])-1, 0, 1); + push_data_to_GSJ(thread_push_buffer[thread_seq], strlen(thread_push_buffer[thread_seq])); + + push_buffer_count[thread_seq] = 0; + memset(thread_push_buffer[thread_seq], 0, PUSH_BUFFER_COUNT*MAX_TRAFFIC_INFO_LEN); + } + else{ + push_buffer_count[thread_seq]++; + memcpy(thread_push_buffer[thread_seq]+strlen(thread_push_buffer[thread_seq]), info, strlen(info)); + } +#endif } static int tcp_flow_id = -1; char TCP_ENTRY_ALL(struct streaminfo *pstream, void **pme, int thread_seq, const void *raw_pkt) @@ -777,12 +795,12 @@ char TCP_ENTRY_ALL(struct streaminfo *pstream, void **pme, int thread_seq, const // tinfo->stat_time = time(0); gettimeofday(&tinfo->stat_time, NULL); - print_traffic_info(tinfo, pstream); + print_traffic_info(tinfo, pstream, thread_seq); free(tinfo); return DEFAULT_RETURN_VALUE; } - +#if ENABLE_TIMER /*每隔N秒发一次 */ if (pstream->pktstate == OP_STATE_DATA) { @@ -813,7 +831,7 @@ char TCP_ENTRY_ALL(struct streaminfo *pstream, void **pme, int thread_seq, const close_stream_pkt_num[thread_seq] += tinfo->S2C_pkt_num; push_count[thread_seq]++; #endif - print_traffic_info(tinfo, pstream); + print_traffic_info(tinfo, pstream, thread_seq); if (tcp_flow_id != -1) { @@ -826,7 +844,7 @@ char TCP_ENTRY_ALL(struct streaminfo *pstream, void **pme, int thread_seq, const time_out[thread_seq] = 0; } } - +#endif return DEFAULT_RETURN_VALUE; error_drop: @@ -973,7 +991,7 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi // tinfo->stat_time = time(0); gettimeofday(&tinfo->stat_time, NULL); - print_traffic_info(tinfo, pstream); + print_traffic_info(tinfo, pstream, thread_seq); free(tinfo); // free(tinfo_new); // free(time_old); @@ -981,7 +999,7 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi // printf("\n"); return DEFAULT_RETURN_VALUE; } - +#if ENABLE_TIMER /*每隔N秒发一次 */ if (pstream->opstate == OP_STATE_DATA) { @@ -1019,7 +1037,7 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi close_stream_pkt_num[thread_seq] += tinfo->S2C_pkt_num; push_count[thread_seq]++; #endif - print_traffic_info(tinfo, pstream); + print_traffic_info(tinfo, pstream, thread_seq); if (tcp_flow_id != -1) { @@ -1035,7 +1053,7 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi } // free(time_new); } - +#endif return DEFAULT_RETURN_VALUE; error_drop: @@ -1214,7 +1232,7 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi // tinfo->stat_time = time(0); gettimeofday(&tinfo->stat_time, NULL); - print_traffic_info(tinfo, pstream); + print_traffic_info(tinfo, pstream, thread_seq); free(tinfo); // free(tinfo_new); // free(time_old); @@ -1222,7 +1240,7 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi // printf("\n"); return DEFAULT_RETURN_VALUE; } - +#if ENABLE_TIMER /*每隔N秒发一次 */ if (pstream->opstate == OP_STATE_DATA) { @@ -1261,7 +1279,7 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi push_count[thread_seq]++; #endif - print_traffic_info(tinfo, pstream); + print_traffic_info(tinfo, pstream, thread_seq); if (udp_flow_id != -1) { struct udp_flow_stat *tflow = (struct udp_flow_stat *)project_req_get_struct(pstream, udp_flow_id); @@ -1277,7 +1295,7 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi // free(time_new); } - +#endif return DEFAULT_RETURN_VALUE; error_drop: @@ -1318,7 +1336,7 @@ void count_work() all_pkt, close_pkt, all_push_count, all_error_count); } } - +#if ENABLE_TIMER void timer_work() { while (1) @@ -1327,14 +1345,14 @@ void timer_work() int i = 0; for (i; i < 32; i++) { - if (!time_out[i]) - { - time_out[i] = 1; - } + // if (!time_out[i]) + // { + time_out[i] = 1; + // } } } } - +#endif int CHAR_INIT() { int demo_plugid = 51; @@ -1447,6 +1465,7 @@ int CHAR_INIT() } #endif +#if ENABLE_TIMER int timer_thread; timer_thread = pthread_create(&timer, NULL, (void *)timer_work, NULL); if (timer_thread == 0) @@ -1458,6 +1477,12 @@ int CHAR_INIT() { MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "timer thread start failed\n"); } +#endif + //发送缓冲区 + int i=0; + for(i; i<32; i++){ + thread_push_buffer[i] = (char *)calloc(1,PUSH_BUFFER_COUNT*MAX_TRAFFIC_INFO_LEN*sizeof(char)); + } // 函数实现自定义 // 只要求函数返回值为插件ID //printf("INIT SUCCESS!!!\n"); return demo_plugid; @@ -1475,5 +1500,10 @@ void LRJ_APP_DESTROY() } MESA_destroy_runtime_log_handle(runtime_log_handler); MESA_destroy_runtime_log_handle(kafka_log_handler); + //free thread_push_buffer + int i=0; + for(i; i<32; i++){ + free(thread_push_buffer[i]); + } printf("TEST_APP_DESTORY out...\n"); } |
