From 5361249e7e11bb240894e55d0c1d1205abc564e0 Mon Sep 17 00:00:00 2001 From: 李仁杰 Date: Wed, 9 Sep 2020 11:08:34 +0800 Subject: Update lirenjie_vxlan_sapp_20200803.c 修改为单包统计二元组信息,修复了二元组中有乱码的情况(考虑了hash表的key长度)。 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lirenjie_vxlan_sapp_20200803.c | 140 ++++++++++++++++++++++++++--------------- 1 file changed, 88 insertions(+), 52 deletions(-) diff --git a/lirenjie_vxlan_sapp_20200803.c b/lirenjie_vxlan_sapp_20200803.c index fbea892..1f843d9 100644 --- a/lirenjie_vxlan_sapp_20200803.c +++ b/lirenjie_vxlan_sapp_20200803.c @@ -54,7 +54,7 @@ static unsigned int vpn_id_drop[MAX_VPN_ID_NUM] = {0}; /* 0 do not drop, 1 drop #define IDENTIFY_LOCAL_IP_SUBNET_MASK (0xFFFFFF00) /* 一个局点N台前端机, 用IP段识别是否前端机IP, 用于区别流量时回流还是回注 */ -static unsigned int entrance_id; /* 局点ID 读配置文件./conf/lrj_vxlan_sapp.conf 默认�?*/ +static unsigned int entrance_id; /* 局点ID 读配置文件./conf/lrj_vxlan_sapp.conf 默认为0*/ UINT64 all_stream_pkt_num[64] = {0}; UINT64 close_stream_pkt_num[64] = {0}; @@ -161,6 +161,13 @@ static int kafka_stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opa return 1; } +static int32_t partitioner(const rd_kafka_topic_t *rkt, const void *keydata, size_t keylen, int32_t partition_cnt, void *rkt_opaque, void *msg_opaque) +{ + int low = 1; + int high = partition_cnt - 1; + return (low + (rand() % ((high - low) + 1))); +} + static int init_kafka(int partition_, char *brokers_, char *topic_traffic_stat, char *topic_sum) { char tmp[16]; @@ -179,15 +186,20 @@ static int init_kafka(int partition_, char *brokers_, char *topic_traffic_stat, rd_kafka_conf_set_stats_cb(conf, kafka_stats_cb); /* Quick termination */ snprintf(tmp, sizeof(tmp), "%i", SIGIO); + rd_kafka_conf_set(conf, "api.version.request", "false", errstr, sizeof(errstr)); + rd_kafka_conf_set(conf, "broker.version.fallback", "0.9.0.1", errstr, sizeof(errstr)); rd_kafka_conf_set(conf, "statistics.interval.ms", "60", errstr, sizeof(errstr)); rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0); - rd_kafka_conf_set(conf, "queue.buffering.max.messages", "2000000", errstr, sizeof(errstr)); + rd_kafka_conf_set(conf, "queue.buffering.max.messages", "5000000", errstr, sizeof(errstr)); rd_kafka_conf_set(conf, "topic.metadata.refresh.interval.ms", "600000", errstr, sizeof(errstr)); rd_kafka_conf_set(conf, "request.required.acks", "0", errstr, sizeof(errstr)); /*topic configuration*/ topic_conf_traffic_stat = rd_kafka_topic_conf_new(); topic_conf_sum = rd_kafka_topic_conf_new(); + rd_kafka_topic_conf_set_partitioner_cb(topic_conf_traffic_stat, partitioner); + rd_kafka_topic_conf_set_partitioner_cb(topic_conf_sum, partitioner); + kafka_producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, (size_t)sizeof(errstr)); if (kafka_producer == NULL) { @@ -241,7 +253,8 @@ static int push_data_to_kafka(char *buffer, int buf_len, rd_kafka_topic_t *topic rd_kafka_topic_name(topic), partition, rd_kafka_err2str(rd_kafka_last_error())); /* Poll to handle delivery reports */ - rd_kafka_poll(kafka_producer, 0); + rd_kafka_poll(kafka_producer, 1000); + // exit(-1); return PUSH_DATA_FAILED; } /*fprintf(stderr, "%% Sent %zd bytes to topic " @@ -249,7 +262,7 @@ static int push_data_to_kafka(char *buffer, int buf_len, rd_kafka_topic_t *topic buf_len, rd_kafka_topic_name(rkt), partition);*/ MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name, "%% Sent %zd bytes to topic %s partition %i", buf_len, rd_kafka_topic_name(topic), partition); - // rd_kafka_poll(kafka_producer, 0); + rd_kafka_poll(kafka_producer, 0); return PUSH_DATA_SUCCESS; } @@ -615,6 +628,8 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps char protocol[8]; char vxlan_sip_ip_str[INET_ADDRSTRLEN]; char vxlan_dip_ip_str[INET_ADDRSTRLEN]; + memset(vxlan_sip_ip_str, 0, sizeof(char) * INET_ADDRSTRLEN); + memset(vxlan_dip_ip_str, 0, sizeof(char) * INET_ADDRSTRLEN); switch (tinfo->protocol) { @@ -634,7 +649,8 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps sprintf(protocol, "%s", "others"); break; } - char info[MAX_TRAFFIC_INFO_LEN] = {0}; + char info[MAX_TRAFFIC_INFO_LEN]; + memset(info, 0, sizeof(char) * MAX_TRAFFIC_INFO_LEN); inet_ntop(AF_INET, &tinfo->vx_ip_header_src_ip_net, vxlan_sip_ip_str, INET_ADDRSTRLEN); inet_ntop(AF_INET, &tinfo->vx_ip_header_dst_ip_net, vxlan_dip_ip_str, INET_ADDRSTRLEN); @@ -717,6 +733,9 @@ int ip_mod(struct traffic_info *tinfo) { char vxlan_sip_ip_str[INET_ADDRSTRLEN]; char vxlan_dip_ip_str[INET_ADDRSTRLEN]; + memset(vxlan_sip_ip_str, 0, sizeof(char) * INET_ADDRSTRLEN); + memset(vxlan_dip_ip_str, 0, sizeof(char) * INET_ADDRSTRLEN); + inet_ntop(AF_INET, &tinfo->vx_ip_header_src_ip_net, vxlan_sip_ip_str, INET_ADDRSTRLEN); inet_ntop(AF_INET, &tinfo->vx_ip_header_dst_ip_net, vxlan_dip_ip_str, INET_ADDRSTRLEN); @@ -726,7 +745,7 @@ int ip_mod(struct traffic_info *tinfo) { return 1; } - if (d % 16 == 1) + if (d % 32 == 31) { return 1; } @@ -735,7 +754,7 @@ int ip_mod(struct traffic_info *tinfo) { return 1; } - if (d % 16 == 1) + if (d % 32 == 31) { return 1; } @@ -753,8 +772,8 @@ long vlanid_ip_htable_search_cb(void *data, const uchar *key, uint size, void *u if (cnt == NULL) { cnt = (UINT64 *)calloc(1, sizeof(UINT64)); - // *cnt = 1; - *cnt = tinfo->C2S_pkt_num + tinfo->S2C_pkt_num; + *cnt = 1; + // *cnt = tinfo->C2S_pkt_num + tinfo->S2C_pkt_num; if (MESA_htable_add(vlanid_ip_htable_handle, key, size, cnt) >= 0) { return 1; @@ -766,31 +785,39 @@ long vlanid_ip_htable_search_cb(void *data, const uchar *key, uint size, void *u } else { - // *cnt = *cnt + 1; - *cnt = *cnt + tinfo->C2S_pkt_num + tinfo->S2C_pkt_num; + *cnt = *cnt + 1; + // *cnt = *cnt + tinfo->C2S_pkt_num + tinfo->S2C_pkt_num; } return 1; } void vlanid_ip_stat(struct traffic_info *tinfo) { - uchar vlanid_ip_key[21]; - /* FLOW_TYPE_REFLUX: vx_vlan_id/vx_ip_header_src_ip is key - FLOW_TYPE_INJECT: vx_vlan_id/vx_ip_header_dst_ip is key - */ - memset(vlanid_ip_key, 0, sizeof(char) * 21); - char vxlan_sip_ip_str[INET_ADDRSTRLEN]; - char vxlan_dip_ip_str[INET_ADDRSTRLEN]; - inet_ntop(AF_INET, &tinfo->vx_ip_header_src_ip_net, vxlan_sip_ip_str, INET_ADDRSTRLEN); - inet_ntop(AF_INET, &tinfo->vx_ip_header_dst_ip_net, vxlan_dip_ip_str, INET_ADDRSTRLEN); + uchar *vxlanid_ip_key; + char *vxlan_sip_ip_str; + char *vxlan_dip_ip_str; + vxlanid_ip_key = calloc(1, sizeof(uchar) * 21); + vxlan_sip_ip_str = calloc(1, sizeof(char) * INET_ADDRSTRLEN); + vxlan_dip_ip_str = calloc(1, sizeof(char) * INET_ADDRSTRLEN); + + if (inet_ntop(AF_INET, &tinfo->vx_ip_header_src_ip_net, vxlan_sip_ip_str, INET_ADDRSTRLEN) == NULL || + inet_ntop(AF_INET, &tinfo->vx_ip_header_dst_ip_net, vxlan_dip_ip_str, INET_ADDRSTRLEN) == NULL) + { + MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "inet_ntop error: sip:%u, dip:%u\n", + tinfo->vx_ip_header_src_ip_net, tinfo->vx_ip_header_dst_ip_net); + return; + } + /* FLOW_TYPE_REFLUX: vx_vlan_id/vx_ip_header_src_ip is key + FLOW_TYPE_INJECT: vx_vlan_id/vx_ip_header_dst_ip is key + */ if (flow_type == (unsigned int)FLOW_TYPE_REFLUX) { - snprintf(vlanid_ip_key, 21, "%d@%s", tinfo->vxlan_vpn_id, vxlan_sip_ip_str); + snprintf(vxlanid_ip_key, 21, "%d@%s", tinfo->vxlan_vpn_id, vxlan_sip_ip_str); } else if (flow_type == (unsigned int)FLOW_TYPE_INJECT) { - snprintf(vlanid_ip_key, 21, "%d@%s", tinfo->vxlan_vpn_id, vxlan_dip_ip_str); + snprintf(vxlanid_ip_key, 21, "%d@%s", tinfo->vxlan_vpn_id, vxlan_dip_ip_str); } else { @@ -799,21 +826,24 @@ void vlanid_ip_stat(struct traffic_info *tinfo) } hash_cb_fun_t *search_cb = vlanid_ip_htable_search_cb; long search_cb_ret = 0; - MESA_htable_search_cb(vlanid_ip_htable_handle, vlanid_ip_key, strlen(vlanid_ip_key), search_cb, tinfo, &search_cb_ret); + MESA_htable_search_cb(vlanid_ip_htable_handle, vxlanid_ip_key, strlen(vxlanid_ip_key), search_cb, tinfo, &search_cb_ret); if (search_cb_ret == 0) { MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "MESA_htable_search_cb failed\n"); } + free(vxlanid_ip_key); + free(vxlan_sip_ip_str); + free(vxlan_dip_ip_str); } static int tcp_flow_id = -1; char process_tcp_close(struct streaminfo *pstream, struct traffic_info *tinfo, int thread_seq) { - // if (!ip_mod(tinfo)) - // { - // free(tinfo); - // return DEFAULT_RETURN_VALUE; - // } + if (!ip_mod(tinfo)) + { + free(tinfo); + return DEFAULT_RETURN_VALUE; + } if (tcp_flow_id != -1) { struct tcp_flow_stat *tflow = (struct tcp_flow_stat *)project_req_get_struct(pstream, tcp_flow_id); @@ -833,12 +863,12 @@ char process_tcp_close(struct streaminfo *pstream, struct traffic_info *tinfo, i close_stream_pkt_num[thread_seq] += tinfo->C2S_pkt_num; close_stream_pkt_num[thread_seq] += tinfo->S2C_pkt_num; #endif - vlanid_ip_stat(tinfo); - if (!ip_mod(tinfo)) - { - free(tinfo); - return DEFAULT_RETURN_VALUE; - } + // vlanid_ip_stat(tinfo); + // if (!ip_mod(tinfo)) + // { + // free(tinfo); + // return DEFAULT_RETURN_VALUE; + // } #if ENABLE_COUNT_THREAD push_count[thread_seq]++; @@ -974,7 +1004,7 @@ char TCP_ENTRY_ALL(struct streaminfo *pstream, void **pme, int thread_seq, const } tinfo = (struct traffic_info *)(*pme); - // vlanid_ip_stat(tinfo); + vlanid_ip_stat(tinfo); if (pstream->pktstate == OP_STATE_CLOSE) { @@ -1061,7 +1091,7 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi } tinfo = (struct traffic_info *)(*pme); - // vlanid_ip_stat(tinfo); + vlanid_ip_stat(tinfo); if (pstream->opstate == OP_STATE_CLOSE) { @@ -1182,15 +1212,15 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi } tinfo = (struct traffic_info *)(*pme); - // vlanid_ip_stat(tinfo); + vlanid_ip_stat(tinfo); if (pstream->opstate == OP_STATE_CLOSE) { - // if (!ip_mod(tinfo)) - // { - // free(tinfo); - // return DEFAULT_RETURN_VALUE; - // } + if (!ip_mod(tinfo)) + { + free(tinfo); + return DEFAULT_RETURN_VALUE; + } if (pdetail != NULL) { if (udp_flow_id != -1) @@ -1220,12 +1250,12 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi close_stream_pkt_num[thread_seq] += tinfo->C2S_pkt_num; close_stream_pkt_num[thread_seq] += tinfo->S2C_pkt_num; #endif - vlanid_ip_stat(tinfo); - if (!ip_mod(tinfo)) - { - free(tinfo); - return DEFAULT_RETURN_VALUE; - } + // vlanid_ip_stat(tinfo); + // if (!ip_mod(tinfo)) + // { + // free(tinfo); + // return DEFAULT_RETURN_VALUE; + // } #if ENABLE_COUNT_THREAD push_count[thread_seq]++; @@ -1363,19 +1393,25 @@ void timer_work() void vlanid_ip_htable_iterate(const uchar *key, uint size, void *data, void *user) { - char info[64] = {0}; + char vxlan_ip_key[21] = {0}; + memcpy(vxlan_ip_key, key, size); + char *info; + info = calloc(1, sizeof(char) * 64); + if (flow_type == FLOW_TYPE_REFLUX) { - snprintf(info, 64, "hlsum#%s#%llu", key, *(UINT64 *)data); + snprintf(info, 64, "hlsum#%s#%llu", vxlan_ip_key, *(UINT64 *)data); } else { - snprintf(info, 64, "hzsum#%s#%llu", key, *(UINT64 *)data); + snprintf(info, 64, "hzsum#%s#%llu", vxlan_ip_key, *(UINT64 *)data); } push_data_to_kafka(info, strlen(info), rkt_sum); + MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "%s", info); vlanid_ip_pkt_num += *(UINT64 *)data; vlanid_ip_push_count++; - // *(UINT64 *)data = 0; + *(UINT64 *)data = 0; + free(info); } void vlanid_ip_thread_work() -- cgit v1.2.3