diff options
| author | 李仁杰 <[email protected]> | 2020-08-05 10:36:35 +0800 |
|---|---|---|
| committer | 李仁杰 <[email protected]> | 2020-08-05 10:36:35 +0800 |
| commit | b5449af4e1795eb0601bd9b22b881b9c3a095ccf (patch) | |
| tree | c1461d8bfcba89333c5f5062daa152a9d067a6b1 | |
| parent | 89c5b1184efb62e0d390cdc83ff7f90bdc3cd1ac (diff) | |
Update lirenjie_vxlan_sapp_20200803.c 新建线程进行计时
| -rw-r--r-- | lirenjie_vxlan_sapp_20200803.c | 257 |
1 files changed, 161 insertions, 96 deletions
diff --git a/lirenjie_vxlan_sapp_20200803.c b/lirenjie_vxlan_sapp_20200803.c index e7c85b3..afcc1fe 100644 --- a/lirenjie_vxlan_sapp_20200803.c +++ b/lirenjie_vxlan_sapp_20200803.c @@ -31,7 +31,9 @@ extern unsigned char vxlan_id_map_to_service_id(int vxlan_id_host_order); extern int platform_register_action_judge(char (*action_cb_fun)(int net_conn_mode, char plug_action)); extern int g_business_plug_type; -#define ENABLE_COUNT_THREAD 0 //是否启用统计功能.added by lrj at 20200803 +#define ENABLE_COUNT_THREAD 1 //是否启用统计功能.added by lrj at 20200803 +#define ENABLE_GSJ_THREAD 1 //是否启用GSJ + #define MAX_LOG_INFO_LEN 256 #define MAX_VPN_ID_NUM 512 #define MAX_TRAFFIC_INFO_LEN 1024 @@ -84,7 +86,11 @@ static char *topic = "G_BACK_TRAFFIC_STATISTIC_new"; //GSJ thread pthread_t GSJ_Work; +//count thread pthread_t count; +//timer thread +pthread_t timer; +int time_out = 0; //1:time out,push data struct traffic_info { @@ -634,7 +640,9 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps flow_type, sendto_gdev_ip); // MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); // push_data_to_kafka(info,strlen(info)); +#if ENABLE_GSJ_THREAD push_data_to_GSJ(info, strlen(info)); +#endif break; case ADDR_TYPE_IPV6: snprintf(info, MAX_TRAFFIC_INFO_LEN, @@ -651,7 +659,9 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps tinfo->ipv6_next_msg_head, tinfo->ipv6_limit, flow_type, sendto_gdev_ip); // MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); // push_data_to_kafka(info,strlen(info)); +#if ENABLE_GSJ_THREAD push_data_to_GSJ(info, strlen(info)); +#endif break; case ADDR_TYPE_ARP: snprintf(info, MAX_TRAFFIC_INFO_LEN, @@ -667,7 +677,9 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps flow_type, sendto_gdev_ip); // MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); // push_data_to_kafka(info,strlen(info)); +#if ENABLE_GSJ_THREAD push_data_to_GSJ(info, strlen(info)); +#endif break; default: break; @@ -685,11 +697,13 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi { struct tcpdetail *raw_pdetail = (struct tcpdetail *)pstream->pdetail; struct traffic_info *tinfo; - time_t *time_old; - struct traffic_info *tinfo_new; + // time_t *time_old; + // struct traffic_info *tinfo_new; + #if ENABLE_COUNT_THREAD all_stream_pkt_num[thread_seq]++; #endif + if (-1 == tcp_flow_id) { tcp_flow_id = project_customer_register("tcp_flow_stat", "struct"); @@ -702,9 +716,9 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi if (pstream->opstate == OP_STATE_PENDING) { - *pme = (void *)calloc(3, sizeof(void *)); - time_old = (time_t *)calloc(1, sizeof(time_t)); - tinfo_new = (struct traffic_info *)calloc(1, sizeof(struct traffic_info)); + // *pme = (void *)calloc(2, sizeof(void *)); + // time_old = (time_t *)calloc(1, sizeof(time_t)); + // tinfo_new = (struct traffic_info *)calloc(1, sizeof(struct traffic_info)); tinfo = (struct traffic_info *)calloc(1, sizeof(struct traffic_info)); //tinfo->service_id = get_service_id_from_sport(pstream); //获取vx_lan_id字段,具体方法待定 //tinfo->service_id = get_service_id_from_vxlanid(pstream); @@ -742,16 +756,17 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi /* 应用层协议类型 用目的端口表示*/ tinfo->PROTO_TYPE = get_proto_type(pstream); - *tinfo_new = *tinfo; - - ((void **)*pme)[0] = tinfo; - ((void **)*pme)[1] = tinfo_new; - time(time_old); - ((void **)*pme)[2] = time_old; + // *tinfo_new = *tinfo; + *pme = tinfo; + // ((void **)*pme)[0] = tinfo; + // ((void **)*pme)[1] = tinfo_new; + // time(time_old); + // ((void **)*pme)[2] = time_old; } - tinfo = (struct traffic_info *)(((void **)*pme)[0]); - tinfo_new = (struct traffic_info *)(((void **)*pme)[1]); - time_old = (time_t *)(((void **)*pme)[2]); + tinfo = (struct traffic_info *)(*pme); + // tinfo = (struct traffic_info *)(((void **)*pme)[0]); + // tinfo_new = (struct traffic_info *)(((void **)*pme)[1]); + // time_old = (time_t *)(((void **)*pme)[2]); /* 自己统计包数字节数*/ /* if(raw_pdetail->datalen > 0) { @@ -791,10 +806,10 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi if (tcp_flow_id != -1) { struct tcp_flow_stat *tflow = (struct tcp_flow_stat *)project_req_get_struct(pstream, tcp_flow_id); - tinfo->C2S_pkt_num = tflow->C2S_all_pkt - tinfo_new->C2S_pkt_num; - tinfo->S2C_pkt_num = tflow->S2C_all_pkt - tinfo_new->S2C_pkt_num; - tinfo->C2S_bytes = tflow->C2S_all_byte - tinfo_new->C2S_bytes; - tinfo->S2C_bytes = tflow->S2C_all_byte - tinfo_new->S2C_bytes; + tinfo->C2S_pkt_num = tflow->C2S_all_pkt - tinfo->C2S_pkt_num; + tinfo->S2C_pkt_num = tflow->S2C_all_pkt - tinfo->S2C_pkt_num; + tinfo->C2S_bytes = tflow->C2S_all_byte - tinfo->C2S_bytes; + tinfo->S2C_bytes = tflow->S2C_all_byte - tinfo->S2C_bytes; } else { @@ -816,9 +831,9 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi print_traffic_info(tinfo, pstream); free(tinfo); - free(tinfo_new); - free(time_old); - free(*pme); + // free(tinfo_new); + // free(time_old); + // free(*pme); // printf("\n"); return DEFAULT_RETURN_VALUE; } @@ -826,48 +841,54 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi /*每隔N秒发一次 */ if (pstream->opstate == OP_STATE_DATA) { - time_t *time_new; - time_new = (time_t *)calloc(1, sizeof(time_t)); - time(time_new); - if (*time_new - *time_old >= 120) + // time_t *time_new; + // time_new = (time_t *)calloc(1, sizeof(time_t)); + // time(time_new); + // if (*time_new - *time_old >= 120) + if (time_out) { if (tcp_flow_id != -1) { struct tcp_flow_stat *tflow = (struct tcp_flow_stat *)project_req_get_struct(pstream, tcp_flow_id); - tinfo_new->C2S_pkt_num = tflow->C2S_all_pkt - tinfo_new->C2S_pkt_num; - tinfo_new->S2C_pkt_num = tflow->S2C_all_pkt - tinfo_new->S2C_pkt_num; - tinfo_new->C2S_bytes = tflow->C2S_all_byte - tinfo_new->C2S_bytes; - tinfo_new->S2C_bytes = tflow->S2C_all_byte - tinfo_new->S2C_bytes; + tinfo->C2S_pkt_num = tflow->C2S_all_pkt - tinfo->C2S_pkt_num; + tinfo->S2C_pkt_num = tflow->S2C_all_pkt - tinfo->S2C_pkt_num; + tinfo->C2S_bytes = tflow->C2S_all_byte - tinfo->C2S_bytes; + tinfo->S2C_bytes = tflow->S2C_all_byte - tinfo->S2C_bytes; } else { - tinfo_new->C2S_pkt_num = 0; - tinfo_new->S2C_pkt_num = 0; - tinfo_new->C2S_bytes = 0; - tinfo_new->S2C_bytes = 0; + tinfo->C2S_pkt_num = 0; + tinfo->S2C_pkt_num = 0; + tinfo->C2S_bytes = 0; + tinfo->S2C_bytes = 0; } /* layer_addr */ - tinfo_new->addr = pstream->addr; + tinfo->addr = pstream->addr; + // tinfo_new->addr = pstream->addr; /* STAT_TIME */ // tinfo->stat_time = time(0); - gettimeofday(&tinfo_new->stat_time, NULL); + // gettimeofday(&tinfo_new->stat_time, NULL); + gettimeofday(&tinfo->stat_time, NULL); - print_traffic_info(tinfo_new, pstream); +#if ENABLE_COUNT_THREAD + close_stream_pkt_num[thread_seq] += tinfo->C2S_pkt_num; + close_stream_pkt_num[thread_seq] += tinfo->S2C_pkt_num; + push_count[thread_seq]++; +#endif + print_traffic_info(tinfo, pstream); if (tcp_flow_id != -1) { struct tcp_flow_stat *tflow = (struct tcp_flow_stat *)project_req_get_struct(pstream, tcp_flow_id); - tinfo_new->C2S_pkt_num = tflow->C2S_all_pkt; - tinfo_new->S2C_pkt_num = tflow->S2C_all_pkt; - tinfo_new->C2S_bytes = tflow->C2S_all_byte; - tinfo_new->S2C_bytes = tflow->S2C_all_byte; + tinfo->C2S_pkt_num = tflow->C2S_all_pkt; + tinfo->S2C_pkt_num = tflow->S2C_all_pkt; + tinfo->C2S_bytes = tflow->C2S_all_byte; + tinfo->S2C_bytes = tflow->S2C_all_byte; } - // free(time_new); - - time(time_old); + // time(time_old); } - free(time_new); + // free(time_new); } return DEFAULT_RETURN_VALUE; @@ -875,11 +896,11 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi error_drop: #if ENABLE_COUNT_THREAD error_count[thread_seq]++; -#endif +#endif free(tinfo); - free(tinfo_new); - free(time_old); - free(*pme); + // free(tinfo_new); + // free(time_old); + // free(*pme); return APP_STATE_DROPME | APP_STATE_DROPPKT; } @@ -915,11 +936,13 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi { struct udpdetail *pdetail = (struct udpdetail *)pstream->pdetail; struct traffic_info *tinfo; - time_t *time_old; - struct traffic_info *tinfo_new; + // time_t *time_old; + // struct traffic_info *tinfo_new; + #if ENABLE_COUNT_THREAD all_stream_pkt_num[thread_seq]++; #endif + if (is_gdev_keepalive_pkt((const struct ip *)raw_pkt) != 0) { //add by lijia 20190604, drop BFD keepalive packet. return APP_STATE_DROPME; @@ -937,9 +960,9 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi if (pstream->opstate == OP_STATE_PENDING) { - *pme = (void *)calloc(3, sizeof(void *)); - time_old = (time_t *)calloc(1, sizeof(time_t)); - tinfo_new = (struct traffic_info *)calloc(1, sizeof(struct traffic_info)); + // *pme = (void *)calloc(2, sizeof(void *)); + // time_old = (time_t *)calloc(1, sizeof(time_t)); + // tinfo_new = (struct traffic_info *)calloc(1, sizeof(struct traffic_info)); tinfo = (struct traffic_info *)calloc(1, sizeof(struct traffic_info)); //tinfo->service_id = get_service_id_from_sport(pstream); //获取vx_lan_id字段,具体方法待定 //tinfo->service_id = get_service_id_from_vxlanid(pstream); @@ -978,16 +1001,17 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi /* 应用层协议类型 用目的端口表示*/ tinfo->PROTO_TYPE = get_proto_type(pstream); - *tinfo_new = *tinfo; - - ((void **)*pme)[0] = tinfo; - ((void **)*pme)[1] = tinfo_new; - time(time_old); - ((void **)*pme)[2] = time_old; + // *tinfo_new = *tinfo; + *pme = tinfo; + // ((void **)*pme)[0] = tinfo; + // ((void **)*pme)[1] = tinfo_new; + // time(time_old); + // ((void **)*pme)[2] = time_old; } - tinfo = (struct traffic_info *)(((void **)*pme)[0]); - tinfo_new = (struct traffic_info *)(((void **)*pme)[1]); - time_old = (time_t *)(((void **)*pme)[2]); + tinfo = (struct traffic_info *)(*pme); + // tinfo = (struct traffic_info *)(((void **)*pme)[0]); + // tinfo_new = (struct traffic_info *)(((void **)*pme)[1]); + // time_old = (time_t *)(((void **)*pme)[2]); /* if (pstream->opstate == OP_STATE_CLOSE && tinfo->vxlan_vpn_id > 0) */ // if (pstream->opstate == OP_STATE_CLOSE && !vpn_id_drop[tinfo->vxlan_vpn_id]) @@ -1014,10 +1038,10 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi if (udp_flow_id != -1) { struct udp_flow_stat *tflow = (struct udp_flow_stat *)project_req_get_struct(pstream, udp_flow_id); - tinfo->C2S_pkt_num = tflow->C2S_pkt - tinfo_new->C2S_pkt_num; - tinfo->S2C_pkt_num = tflow->S2C_pkt - tinfo_new->S2C_pkt_num; - tinfo->C2S_bytes = tflow->C2S_byte - tinfo_new->C2S_bytes; - tinfo->S2C_bytes = tflow->S2C_byte - tinfo_new->S2C_bytes; + tinfo->C2S_pkt_num = tflow->C2S_pkt - tinfo->C2S_pkt_num; + tinfo->S2C_pkt_num = tflow->S2C_pkt - tinfo->S2C_pkt_num; + tinfo->C2S_bytes = tflow->C2S_byte - tinfo->C2S_bytes; + tinfo->S2C_bytes = tflow->S2C_byte - tinfo->S2C_bytes; } else { @@ -1047,9 +1071,9 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi print_traffic_info(tinfo, pstream); free(tinfo); - free(tinfo_new); - free(time_old); - free(*pme); + // free(tinfo_new); + // free(time_old); + // free(*pme); // printf("\n"); return DEFAULT_RETURN_VALUE; } @@ -1057,47 +1081,57 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi /*每隔N秒发一次 */ if (pstream->opstate == OP_STATE_DATA) { - time_t *time_new; - time_new = (time_t *)calloc(1, sizeof(time_t)); - time(time_new); - if (*time_new - *time_old >= 120) + // time_t *time_new; + // time_new = (time_t *)calloc(1, sizeof(time_t)); + // time(time_new); + // if (*time_new - *time_old >= 120) + if (time_out) { if (udp_flow_id != -1) { struct udp_flow_stat *tflow = (struct udp_flow_stat *)project_req_get_struct(pstream, udp_flow_id); - tinfo_new->C2S_pkt_num = tflow->C2S_pkt - tinfo_new->C2S_pkt_num; - tinfo_new->S2C_pkt_num = tflow->S2C_pkt - tinfo_new->S2C_pkt_num; - tinfo_new->C2S_bytes = tflow->C2S_byte - tinfo_new->C2S_bytes; - tinfo_new->S2C_bytes = tflow->S2C_byte - tinfo_new->S2C_bytes; + tinfo->C2S_pkt_num = tflow->C2S_pkt - tinfo->C2S_pkt_num; + tinfo->S2C_pkt_num = tflow->S2C_pkt - tinfo->S2C_pkt_num; + tinfo->C2S_bytes = tflow->C2S_byte - tinfo->C2S_bytes; + tinfo->S2C_bytes = tflow->S2C_byte - tinfo->S2C_bytes; } else { - tinfo_new->C2S_pkt_num = 0; - tinfo_new->S2C_pkt_num = 0; - tinfo_new->C2S_bytes = 0; - tinfo_new->S2C_bytes = 0; + tinfo->C2S_pkt_num = 0; + tinfo->S2C_pkt_num = 0; + tinfo->C2S_bytes = 0; + tinfo->S2C_bytes = 0; } /* layer_addr */ - tinfo_new->addr = pstream->addr; + tinfo->addr = pstream->addr; + // tinfo_new->addr = pstream->addr; /* STAT_TIME */ // tinfo->stat_time = time(0); - gettimeofday(&tinfo_new->stat_time, NULL); + // gettimeofday(&tinfo_new->stat_time, NULL); + gettimeofday(&tinfo->stat_time, NULL); + +#if ENABLE_COUNT_THREAD + close_stream_pkt_num[thread_seq] += tinfo->C2S_pkt_num; + close_stream_pkt_num[thread_seq] += tinfo->S2C_pkt_num; + push_count[thread_seq]++; +#endif - print_traffic_info(tinfo_new, pstream); + print_traffic_info(tinfo, pstream); if (udp_flow_id != -1) { struct udp_flow_stat *tflow = (struct udp_flow_stat *)project_req_get_struct(pstream, udp_flow_id); - tinfo_new->C2S_pkt_num = tflow->C2S_pkt; - tinfo_new->S2C_pkt_num = tflow->S2C_pkt; - tinfo_new->C2S_bytes = tflow->C2S_byte; - tinfo_new->S2C_bytes = tflow->S2C_byte; + tinfo->C2S_pkt_num = tflow->C2S_pkt; + tinfo->S2C_pkt_num = tflow->S2C_pkt; + tinfo->C2S_bytes = tflow->C2S_byte; + tinfo->S2C_bytes = tflow->S2C_byte; } // free(time_new); - time(time_old); + // time(time_old); } - free(time_new); + + // free(time_new); } return DEFAULT_RETURN_VALUE; @@ -1107,9 +1141,9 @@ error_drop: error_count[thread_seq]++; #endif free(tinfo); - free(tinfo_new); - free(time_old); - free(*pme); + // free(tinfo_new); + // free(time_old); + // free(*pme); return APP_STATE_DROPME | APP_STATE_DROPPKT; } @@ -1122,7 +1156,7 @@ void count_work() { while (1) { - sleep(60); + sleep(180); int i = 0; UINT64 all_pkt = 0; UINT64 close_pkt = 0; @@ -1141,6 +1175,22 @@ void count_work() } } +void timer_work() +{ + while (1) + { + sleep(120); + if (!time_out) + { + time_out = 1; + } + else + { + time_out = 0; + } + } +} + int CHAR_INIT() { int demo_plugid = 51; @@ -1224,7 +1274,7 @@ int CHAR_INIT() // MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"kafka init failed!!!"); // return -1; // } - +#if ENABLE_GSJ_THREAD //GSJ Work thread start int ret_thread; ret_thread = pthread_create(&GSJ_Work, NULL, (void *)Work, NULL); @@ -1237,6 +1287,8 @@ int CHAR_INIT() { MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "GSJ thread start failed\n"); } +#endif + #if ENABLE_COUNT_THREAD int count_thread; count_thread = pthread_create(&count, NULL, (void *)count_work, NULL); @@ -1250,6 +1302,19 @@ int CHAR_INIT() MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "count thread start failed\n"); } #endif + + int timer_thread; + timer_thread = pthread_create(&timer, NULL, (void *)timer_work, NULL); + if (timer_thread == 0) + { + MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "timer thread start success\n"); + pthread_detach(timer); + } + else + { + MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "timer thread start failed\n"); + } + // 函数实现自定义 // 只要求函数返回值为插件ID //printf("INIT SUCCESS!!!\n"); return demo_plugid; } |
