summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author李仁杰 <[email protected]>2020-08-05 10:36:35 +0800
committer李仁杰 <[email protected]>2020-08-05 10:36:35 +0800
commitb5449af4e1795eb0601bd9b22b881b9c3a095ccf (patch)
treec1461d8bfcba89333c5f5062daa152a9d067a6b1
parent89c5b1184efb62e0d390cdc83ff7f90bdc3cd1ac (diff)
Update lirenjie_vxlan_sapp_20200803.c 新建线程进行计时
-rw-r--r--lirenjie_vxlan_sapp_20200803.c257
1 files changed, 161 insertions, 96 deletions
diff --git a/lirenjie_vxlan_sapp_20200803.c b/lirenjie_vxlan_sapp_20200803.c
index e7c85b3..afcc1fe 100644
--- a/lirenjie_vxlan_sapp_20200803.c
+++ b/lirenjie_vxlan_sapp_20200803.c
@@ -31,7 +31,9 @@ extern unsigned char vxlan_id_map_to_service_id(int vxlan_id_host_order);
extern int platform_register_action_judge(char (*action_cb_fun)(int net_conn_mode, char plug_action));
extern int g_business_plug_type;
-#define ENABLE_COUNT_THREAD 0 //是否启用统计功能.added by lrj at 20200803
+#define ENABLE_COUNT_THREAD 1 //是否启用统计功能.added by lrj at 20200803
+#define ENABLE_GSJ_THREAD 1 //是否启用GSJ
+
#define MAX_LOG_INFO_LEN 256
#define MAX_VPN_ID_NUM 512
#define MAX_TRAFFIC_INFO_LEN 1024
@@ -84,7 +86,11 @@ static char *topic = "G_BACK_TRAFFIC_STATISTIC_new";
//GSJ thread
pthread_t GSJ_Work;
+//count thread
pthread_t count;
+//timer thread
+pthread_t timer;
+int time_out = 0; //1:time out,push data
struct traffic_info
{
@@ -634,7 +640,9 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps
flow_type, sendto_gdev_ip);
// MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info);
// push_data_to_kafka(info,strlen(info));
+#if ENABLE_GSJ_THREAD
push_data_to_GSJ(info, strlen(info));
+#endif
break;
case ADDR_TYPE_IPV6:
snprintf(info, MAX_TRAFFIC_INFO_LEN,
@@ -651,7 +659,9 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps
tinfo->ipv6_next_msg_head, tinfo->ipv6_limit, flow_type, sendto_gdev_ip);
// MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info);
// push_data_to_kafka(info,strlen(info));
+#if ENABLE_GSJ_THREAD
push_data_to_GSJ(info, strlen(info));
+#endif
break;
case ADDR_TYPE_ARP:
snprintf(info, MAX_TRAFFIC_INFO_LEN,
@@ -667,7 +677,9 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps
flow_type, sendto_gdev_ip);
// MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info);
// push_data_to_kafka(info,strlen(info));
+#if ENABLE_GSJ_THREAD
push_data_to_GSJ(info, strlen(info));
+#endif
break;
default:
break;
@@ -685,11 +697,13 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
{
struct tcpdetail *raw_pdetail = (struct tcpdetail *)pstream->pdetail;
struct traffic_info *tinfo;
- time_t *time_old;
- struct traffic_info *tinfo_new;
+ // time_t *time_old;
+ // struct traffic_info *tinfo_new;
+
#if ENABLE_COUNT_THREAD
all_stream_pkt_num[thread_seq]++;
#endif
+
if (-1 == tcp_flow_id)
{
tcp_flow_id = project_customer_register("tcp_flow_stat", "struct");
@@ -702,9 +716,9 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
if (pstream->opstate == OP_STATE_PENDING)
{
- *pme = (void *)calloc(3, sizeof(void *));
- time_old = (time_t *)calloc(1, sizeof(time_t));
- tinfo_new = (struct traffic_info *)calloc(1, sizeof(struct traffic_info));
+ // *pme = (void *)calloc(2, sizeof(void *));
+ // time_old = (time_t *)calloc(1, sizeof(time_t));
+ // tinfo_new = (struct traffic_info *)calloc(1, sizeof(struct traffic_info));
tinfo = (struct traffic_info *)calloc(1, sizeof(struct traffic_info));
//tinfo->service_id = get_service_id_from_sport(pstream); //获取vx_lan_id字段,具体方法待定 //tinfo->service_id = get_service_id_from_vxlanid(pstream);
@@ -742,16 +756,17 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
/* 应用层协议类型 用目的端口表示*/
tinfo->PROTO_TYPE = get_proto_type(pstream);
- *tinfo_new = *tinfo;
-
- ((void **)*pme)[0] = tinfo;
- ((void **)*pme)[1] = tinfo_new;
- time(time_old);
- ((void **)*pme)[2] = time_old;
+ // *tinfo_new = *tinfo;
+ *pme = tinfo;
+ // ((void **)*pme)[0] = tinfo;
+ // ((void **)*pme)[1] = tinfo_new;
+ // time(time_old);
+ // ((void **)*pme)[2] = time_old;
}
- tinfo = (struct traffic_info *)(((void **)*pme)[0]);
- tinfo_new = (struct traffic_info *)(((void **)*pme)[1]);
- time_old = (time_t *)(((void **)*pme)[2]);
+ tinfo = (struct traffic_info *)(*pme);
+ // tinfo = (struct traffic_info *)(((void **)*pme)[0]);
+ // tinfo_new = (struct traffic_info *)(((void **)*pme)[1]);
+ // time_old = (time_t *)(((void **)*pme)[2]);
/* 自己统计包数字节数*/ /*
if(raw_pdetail->datalen > 0)
{
@@ -791,10 +806,10 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
if (tcp_flow_id != -1)
{
struct tcp_flow_stat *tflow = (struct tcp_flow_stat *)project_req_get_struct(pstream, tcp_flow_id);
- tinfo->C2S_pkt_num = tflow->C2S_all_pkt - tinfo_new->C2S_pkt_num;
- tinfo->S2C_pkt_num = tflow->S2C_all_pkt - tinfo_new->S2C_pkt_num;
- tinfo->C2S_bytes = tflow->C2S_all_byte - tinfo_new->C2S_bytes;
- tinfo->S2C_bytes = tflow->S2C_all_byte - tinfo_new->S2C_bytes;
+ tinfo->C2S_pkt_num = tflow->C2S_all_pkt - tinfo->C2S_pkt_num;
+ tinfo->S2C_pkt_num = tflow->S2C_all_pkt - tinfo->S2C_pkt_num;
+ tinfo->C2S_bytes = tflow->C2S_all_byte - tinfo->C2S_bytes;
+ tinfo->S2C_bytes = tflow->S2C_all_byte - tinfo->S2C_bytes;
}
else
{
@@ -816,9 +831,9 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
print_traffic_info(tinfo, pstream);
free(tinfo);
- free(tinfo_new);
- free(time_old);
- free(*pme);
+ // free(tinfo_new);
+ // free(time_old);
+ // free(*pme);
// printf("\n");
return DEFAULT_RETURN_VALUE;
}
@@ -826,48 +841,54 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
/*每隔N秒发一次 */
if (pstream->opstate == OP_STATE_DATA)
{
- time_t *time_new;
- time_new = (time_t *)calloc(1, sizeof(time_t));
- time(time_new);
- if (*time_new - *time_old >= 120)
+ // time_t *time_new;
+ // time_new = (time_t *)calloc(1, sizeof(time_t));
+ // time(time_new);
+ // if (*time_new - *time_old >= 120)
+ if (time_out)
{
if (tcp_flow_id != -1)
{
struct tcp_flow_stat *tflow = (struct tcp_flow_stat *)project_req_get_struct(pstream, tcp_flow_id);
- tinfo_new->C2S_pkt_num = tflow->C2S_all_pkt - tinfo_new->C2S_pkt_num;
- tinfo_new->S2C_pkt_num = tflow->S2C_all_pkt - tinfo_new->S2C_pkt_num;
- tinfo_new->C2S_bytes = tflow->C2S_all_byte - tinfo_new->C2S_bytes;
- tinfo_new->S2C_bytes = tflow->S2C_all_byte - tinfo_new->S2C_bytes;
+ tinfo->C2S_pkt_num = tflow->C2S_all_pkt - tinfo->C2S_pkt_num;
+ tinfo->S2C_pkt_num = tflow->S2C_all_pkt - tinfo->S2C_pkt_num;
+ tinfo->C2S_bytes = tflow->C2S_all_byte - tinfo->C2S_bytes;
+ tinfo->S2C_bytes = tflow->S2C_all_byte - tinfo->S2C_bytes;
}
else
{
- tinfo_new->C2S_pkt_num = 0;
- tinfo_new->S2C_pkt_num = 0;
- tinfo_new->C2S_bytes = 0;
- tinfo_new->S2C_bytes = 0;
+ tinfo->C2S_pkt_num = 0;
+ tinfo->S2C_pkt_num = 0;
+ tinfo->C2S_bytes = 0;
+ tinfo->S2C_bytes = 0;
}
/* layer_addr */
- tinfo_new->addr = pstream->addr;
+ tinfo->addr = pstream->addr;
+ // tinfo_new->addr = pstream->addr;
/* STAT_TIME */
// tinfo->stat_time = time(0);
- gettimeofday(&tinfo_new->stat_time, NULL);
+ // gettimeofday(&tinfo_new->stat_time, NULL);
+ gettimeofday(&tinfo->stat_time, NULL);
- print_traffic_info(tinfo_new, pstream);
+#if ENABLE_COUNT_THREAD
+ close_stream_pkt_num[thread_seq] += tinfo->C2S_pkt_num;
+ close_stream_pkt_num[thread_seq] += tinfo->S2C_pkt_num;
+ push_count[thread_seq]++;
+#endif
+ print_traffic_info(tinfo, pstream);
if (tcp_flow_id != -1)
{
struct tcp_flow_stat *tflow = (struct tcp_flow_stat *)project_req_get_struct(pstream, tcp_flow_id);
- tinfo_new->C2S_pkt_num = tflow->C2S_all_pkt;
- tinfo_new->S2C_pkt_num = tflow->S2C_all_pkt;
- tinfo_new->C2S_bytes = tflow->C2S_all_byte;
- tinfo_new->S2C_bytes = tflow->S2C_all_byte;
+ tinfo->C2S_pkt_num = tflow->C2S_all_pkt;
+ tinfo->S2C_pkt_num = tflow->S2C_all_pkt;
+ tinfo->C2S_bytes = tflow->C2S_all_byte;
+ tinfo->S2C_bytes = tflow->S2C_all_byte;
}
-
// free(time_new);
-
- time(time_old);
+ // time(time_old);
}
- free(time_new);
+ // free(time_new);
}
return DEFAULT_RETURN_VALUE;
@@ -875,11 +896,11 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
error_drop:
#if ENABLE_COUNT_THREAD
error_count[thread_seq]++;
-#endif
+#endif
free(tinfo);
- free(tinfo_new);
- free(time_old);
- free(*pme);
+ // free(tinfo_new);
+ // free(time_old);
+ // free(*pme);
return APP_STATE_DROPME | APP_STATE_DROPPKT;
}
@@ -915,11 +936,13 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
{
struct udpdetail *pdetail = (struct udpdetail *)pstream->pdetail;
struct traffic_info *tinfo;
- time_t *time_old;
- struct traffic_info *tinfo_new;
+ // time_t *time_old;
+ // struct traffic_info *tinfo_new;
+
#if ENABLE_COUNT_THREAD
all_stream_pkt_num[thread_seq]++;
#endif
+
if (is_gdev_keepalive_pkt((const struct ip *)raw_pkt) != 0)
{ //add by lijia 20190604, drop BFD keepalive packet.
return APP_STATE_DROPME;
@@ -937,9 +960,9 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
if (pstream->opstate == OP_STATE_PENDING)
{
- *pme = (void *)calloc(3, sizeof(void *));
- time_old = (time_t *)calloc(1, sizeof(time_t));
- tinfo_new = (struct traffic_info *)calloc(1, sizeof(struct traffic_info));
+ // *pme = (void *)calloc(2, sizeof(void *));
+ // time_old = (time_t *)calloc(1, sizeof(time_t));
+ // tinfo_new = (struct traffic_info *)calloc(1, sizeof(struct traffic_info));
tinfo = (struct traffic_info *)calloc(1, sizeof(struct traffic_info));
//tinfo->service_id = get_service_id_from_sport(pstream); //获取vx_lan_id字段,具体方法待定 //tinfo->service_id = get_service_id_from_vxlanid(pstream);
@@ -978,16 +1001,17 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
/* 应用层协议类型 用目的端口表示*/
tinfo->PROTO_TYPE = get_proto_type(pstream);
- *tinfo_new = *tinfo;
-
- ((void **)*pme)[0] = tinfo;
- ((void **)*pme)[1] = tinfo_new;
- time(time_old);
- ((void **)*pme)[2] = time_old;
+ // *tinfo_new = *tinfo;
+ *pme = tinfo;
+ // ((void **)*pme)[0] = tinfo;
+ // ((void **)*pme)[1] = tinfo_new;
+ // time(time_old);
+ // ((void **)*pme)[2] = time_old;
}
- tinfo = (struct traffic_info *)(((void **)*pme)[0]);
- tinfo_new = (struct traffic_info *)(((void **)*pme)[1]);
- time_old = (time_t *)(((void **)*pme)[2]);
+ tinfo = (struct traffic_info *)(*pme);
+ // tinfo = (struct traffic_info *)(((void **)*pme)[0]);
+ // tinfo_new = (struct traffic_info *)(((void **)*pme)[1]);
+ // time_old = (time_t *)(((void **)*pme)[2]);
/* if (pstream->opstate == OP_STATE_CLOSE && tinfo->vxlan_vpn_id > 0) */
// if (pstream->opstate == OP_STATE_CLOSE && !vpn_id_drop[tinfo->vxlan_vpn_id])
@@ -1014,10 +1038,10 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
if (udp_flow_id != -1)
{
struct udp_flow_stat *tflow = (struct udp_flow_stat *)project_req_get_struct(pstream, udp_flow_id);
- tinfo->C2S_pkt_num = tflow->C2S_pkt - tinfo_new->C2S_pkt_num;
- tinfo->S2C_pkt_num = tflow->S2C_pkt - tinfo_new->S2C_pkt_num;
- tinfo->C2S_bytes = tflow->C2S_byte - tinfo_new->C2S_bytes;
- tinfo->S2C_bytes = tflow->S2C_byte - tinfo_new->S2C_bytes;
+ tinfo->C2S_pkt_num = tflow->C2S_pkt - tinfo->C2S_pkt_num;
+ tinfo->S2C_pkt_num = tflow->S2C_pkt - tinfo->S2C_pkt_num;
+ tinfo->C2S_bytes = tflow->C2S_byte - tinfo->C2S_bytes;
+ tinfo->S2C_bytes = tflow->S2C_byte - tinfo->S2C_bytes;
}
else
{
@@ -1047,9 +1071,9 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
print_traffic_info(tinfo, pstream);
free(tinfo);
- free(tinfo_new);
- free(time_old);
- free(*pme);
+ // free(tinfo_new);
+ // free(time_old);
+ // free(*pme);
// printf("\n");
return DEFAULT_RETURN_VALUE;
}
@@ -1057,47 +1081,57 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
/*每隔N秒发一次 */
if (pstream->opstate == OP_STATE_DATA)
{
- time_t *time_new;
- time_new = (time_t *)calloc(1, sizeof(time_t));
- time(time_new);
- if (*time_new - *time_old >= 120)
+ // time_t *time_new;
+ // time_new = (time_t *)calloc(1, sizeof(time_t));
+ // time(time_new);
+ // if (*time_new - *time_old >= 120)
+ if (time_out)
{
if (udp_flow_id != -1)
{
struct udp_flow_stat *tflow = (struct udp_flow_stat *)project_req_get_struct(pstream, udp_flow_id);
- tinfo_new->C2S_pkt_num = tflow->C2S_pkt - tinfo_new->C2S_pkt_num;
- tinfo_new->S2C_pkt_num = tflow->S2C_pkt - tinfo_new->S2C_pkt_num;
- tinfo_new->C2S_bytes = tflow->C2S_byte - tinfo_new->C2S_bytes;
- tinfo_new->S2C_bytes = tflow->S2C_byte - tinfo_new->S2C_bytes;
+ tinfo->C2S_pkt_num = tflow->C2S_pkt - tinfo->C2S_pkt_num;
+ tinfo->S2C_pkt_num = tflow->S2C_pkt - tinfo->S2C_pkt_num;
+ tinfo->C2S_bytes = tflow->C2S_byte - tinfo->C2S_bytes;
+ tinfo->S2C_bytes = tflow->S2C_byte - tinfo->S2C_bytes;
}
else
{
- tinfo_new->C2S_pkt_num = 0;
- tinfo_new->S2C_pkt_num = 0;
- tinfo_new->C2S_bytes = 0;
- tinfo_new->S2C_bytes = 0;
+ tinfo->C2S_pkt_num = 0;
+ tinfo->S2C_pkt_num = 0;
+ tinfo->C2S_bytes = 0;
+ tinfo->S2C_bytes = 0;
}
/* layer_addr */
- tinfo_new->addr = pstream->addr;
+ tinfo->addr = pstream->addr;
+ // tinfo_new->addr = pstream->addr;
/* STAT_TIME */
// tinfo->stat_time = time(0);
- gettimeofday(&tinfo_new->stat_time, NULL);
+ // gettimeofday(&tinfo_new->stat_time, NULL);
+ gettimeofday(&tinfo->stat_time, NULL);
+
+#if ENABLE_COUNT_THREAD
+ close_stream_pkt_num[thread_seq] += tinfo->C2S_pkt_num;
+ close_stream_pkt_num[thread_seq] += tinfo->S2C_pkt_num;
+ push_count[thread_seq]++;
+#endif
- print_traffic_info(tinfo_new, pstream);
+ print_traffic_info(tinfo, pstream);
if (udp_flow_id != -1)
{
struct udp_flow_stat *tflow = (struct udp_flow_stat *)project_req_get_struct(pstream, udp_flow_id);
- tinfo_new->C2S_pkt_num = tflow->C2S_pkt;
- tinfo_new->S2C_pkt_num = tflow->S2C_pkt;
- tinfo_new->C2S_bytes = tflow->C2S_byte;
- tinfo_new->S2C_bytes = tflow->S2C_byte;
+ tinfo->C2S_pkt_num = tflow->C2S_pkt;
+ tinfo->S2C_pkt_num = tflow->S2C_pkt;
+ tinfo->C2S_bytes = tflow->C2S_byte;
+ tinfo->S2C_bytes = tflow->S2C_byte;
}
// free(time_new);
- time(time_old);
+ // time(time_old);
}
- free(time_new);
+
+ // free(time_new);
}
return DEFAULT_RETURN_VALUE;
@@ -1107,9 +1141,9 @@ error_drop:
error_count[thread_seq]++;
#endif
free(tinfo);
- free(tinfo_new);
- free(time_old);
- free(*pme);
+ // free(tinfo_new);
+ // free(time_old);
+ // free(*pme);
return APP_STATE_DROPME | APP_STATE_DROPPKT;
}
@@ -1122,7 +1156,7 @@ void count_work()
{
while (1)
{
- sleep(60);
+ sleep(180);
int i = 0;
UINT64 all_pkt = 0;
UINT64 close_pkt = 0;
@@ -1141,6 +1175,22 @@ void count_work()
}
}
+void timer_work()
+{
+ while (1)
+ {
+ sleep(120);
+ if (!time_out)
+ {
+ time_out = 1;
+ }
+ else
+ {
+ time_out = 0;
+ }
+ }
+}
+
int CHAR_INIT()
{
int demo_plugid = 51;
@@ -1224,7 +1274,7 @@ int CHAR_INIT()
// MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"kafka init failed!!!");
// return -1;
// }
-
+#if ENABLE_GSJ_THREAD
//GSJ Work thread start
int ret_thread;
ret_thread = pthread_create(&GSJ_Work, NULL, (void *)Work, NULL);
@@ -1237,6 +1287,8 @@ int CHAR_INIT()
{
MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "GSJ thread start failed\n");
}
+#endif
+
#if ENABLE_COUNT_THREAD
int count_thread;
count_thread = pthread_create(&count, NULL, (void *)count_work, NULL);
@@ -1250,6 +1302,19 @@ int CHAR_INIT()
MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "count thread start failed\n");
}
#endif
+
+ int timer_thread;
+ timer_thread = pthread_create(&timer, NULL, (void *)timer_work, NULL);
+ if (timer_thread == 0)
+ {
+ MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "timer thread start success\n");
+ pthread_detach(timer);
+ }
+ else
+ {
+ MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "timer thread start failed\n");
+ }
+
// 函数实现自定义 // 只要求函数返回值为插件ID //printf("INIT SUCCESS!!!\n");
return demo_plugid;
}