summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author李仁杰 <[email protected]>2020-09-09 11:08:34 +0800
committer李仁杰 <[email protected]>2020-09-09 11:08:34 +0800
commit5361249e7e11bb240894e55d0c1d1205abc564e0 (patch)
tree1c8122fdb20375e4533c10dfc617ab8fefe537ac
parent2d45f7205f9ae06a3fae47c2c4d7c0e99d5d4821 (diff)
Update lirenjie_vxlan_sapp_20200803.c 修改为单包统计二元组信息,修复了二元组中有乱码的情况(考虑了hash表的key长度)。
-rw-r--r--lirenjie_vxlan_sapp_20200803.c140
1 files changed, 88 insertions, 52 deletions
diff --git a/lirenjie_vxlan_sapp_20200803.c b/lirenjie_vxlan_sapp_20200803.c
index fbea892..1f843d9 100644
--- a/lirenjie_vxlan_sapp_20200803.c
+++ b/lirenjie_vxlan_sapp_20200803.c
@@ -54,7 +54,7 @@ static unsigned int vpn_id_drop[MAX_VPN_ID_NUM] = {0}; /* 0 do not drop, 1 drop
#define IDENTIFY_LOCAL_IP_SUBNET_MASK (0xFFFFFF00) /* 一个局点N台前端机, 用IP段识别是否前端机IP, 用于区别流量时回流还是回注 */
-static unsigned int entrance_id; /* 局点ID 读配置文件./conf/lrj_vxlan_sapp.conf 默认�?*/
+static unsigned int entrance_id; /* 局点ID 读配置文件./conf/lrj_vxlan_sapp.conf 默认为0*/
UINT64 all_stream_pkt_num[64] = {0};
UINT64 close_stream_pkt_num[64] = {0};
@@ -161,6 +161,13 @@ static int kafka_stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opa
return 1;
}
+static int32_t partitioner(const rd_kafka_topic_t *rkt, const void *keydata, size_t keylen, int32_t partition_cnt, void *rkt_opaque, void *msg_opaque)
+{
+ int low = 1;
+ int high = partition_cnt - 1;
+ return (low + (rand() % ((high - low) + 1)));
+}
+
static int init_kafka(int partition_, char *brokers_, char *topic_traffic_stat, char *topic_sum)
{
char tmp[16];
@@ -179,15 +186,20 @@ static int init_kafka(int partition_, char *brokers_, char *topic_traffic_stat,
rd_kafka_conf_set_stats_cb(conf, kafka_stats_cb);
/* Quick termination */
snprintf(tmp, sizeof(tmp), "%i", SIGIO);
+ rd_kafka_conf_set(conf, "api.version.request", "false", errstr, sizeof(errstr));
+ rd_kafka_conf_set(conf, "broker.version.fallback", "0.9.0.1", errstr, sizeof(errstr));
rd_kafka_conf_set(conf, "statistics.interval.ms", "60", errstr, sizeof(errstr));
rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);
- rd_kafka_conf_set(conf, "queue.buffering.max.messages", "2000000", errstr, sizeof(errstr));
+ rd_kafka_conf_set(conf, "queue.buffering.max.messages", "5000000", errstr, sizeof(errstr));
rd_kafka_conf_set(conf, "topic.metadata.refresh.interval.ms", "600000", errstr, sizeof(errstr));
rd_kafka_conf_set(conf, "request.required.acks", "0", errstr, sizeof(errstr));
/*topic configuration*/
topic_conf_traffic_stat = rd_kafka_topic_conf_new();
topic_conf_sum = rd_kafka_topic_conf_new();
+ rd_kafka_topic_conf_set_partitioner_cb(topic_conf_traffic_stat, partitioner);
+ rd_kafka_topic_conf_set_partitioner_cb(topic_conf_sum, partitioner);
+
kafka_producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, (size_t)sizeof(errstr));
if (kafka_producer == NULL)
{
@@ -241,7 +253,8 @@ static int push_data_to_kafka(char *buffer, int buf_len, rd_kafka_topic_t *topic
rd_kafka_topic_name(topic), partition,
rd_kafka_err2str(rd_kafka_last_error()));
/* Poll to handle delivery reports */
- rd_kafka_poll(kafka_producer, 0);
+ rd_kafka_poll(kafka_producer, 1000);
+ // exit(-1);
return PUSH_DATA_FAILED;
}
/*fprintf(stderr, "%% Sent %zd bytes to topic "
@@ -249,7 +262,7 @@ static int push_data_to_kafka(char *buffer, int buf_len, rd_kafka_topic_t *topic
buf_len, rd_kafka_topic_name(rkt), partition);*/
MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name, "%% Sent %zd bytes to topic %s partition %i",
buf_len, rd_kafka_topic_name(topic), partition);
- // rd_kafka_poll(kafka_producer, 0);
+ rd_kafka_poll(kafka_producer, 0);
return PUSH_DATA_SUCCESS;
}
@@ -615,6 +628,8 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps
char protocol[8];
char vxlan_sip_ip_str[INET_ADDRSTRLEN];
char vxlan_dip_ip_str[INET_ADDRSTRLEN];
+ memset(vxlan_sip_ip_str, 0, sizeof(char) * INET_ADDRSTRLEN);
+ memset(vxlan_dip_ip_str, 0, sizeof(char) * INET_ADDRSTRLEN);
switch (tinfo->protocol)
{
@@ -634,7 +649,8 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps
sprintf(protocol, "%s", "others");
break;
}
- char info[MAX_TRAFFIC_INFO_LEN] = {0};
+ char info[MAX_TRAFFIC_INFO_LEN];
+ memset(info, 0, sizeof(char) * MAX_TRAFFIC_INFO_LEN);
inet_ntop(AF_INET, &tinfo->vx_ip_header_src_ip_net, vxlan_sip_ip_str, INET_ADDRSTRLEN);
inet_ntop(AF_INET, &tinfo->vx_ip_header_dst_ip_net, vxlan_dip_ip_str, INET_ADDRSTRLEN);
@@ -717,6 +733,9 @@ int ip_mod(struct traffic_info *tinfo)
{
char vxlan_sip_ip_str[INET_ADDRSTRLEN];
char vxlan_dip_ip_str[INET_ADDRSTRLEN];
+ memset(vxlan_sip_ip_str, 0, sizeof(char) * INET_ADDRSTRLEN);
+ memset(vxlan_dip_ip_str, 0, sizeof(char) * INET_ADDRSTRLEN);
+
inet_ntop(AF_INET, &tinfo->vx_ip_header_src_ip_net, vxlan_sip_ip_str, INET_ADDRSTRLEN);
inet_ntop(AF_INET, &tinfo->vx_ip_header_dst_ip_net, vxlan_dip_ip_str, INET_ADDRSTRLEN);
@@ -726,7 +745,7 @@ int ip_mod(struct traffic_info *tinfo)
{
return 1;
}
- if (d % 16 == 1)
+ if (d % 32 == 31)
{
return 1;
}
@@ -735,7 +754,7 @@ int ip_mod(struct traffic_info *tinfo)
{
return 1;
}
- if (d % 16 == 1)
+ if (d % 32 == 31)
{
return 1;
}
@@ -753,8 +772,8 @@ long vlanid_ip_htable_search_cb(void *data, const uchar *key, uint size, void *u
if (cnt == NULL)
{
cnt = (UINT64 *)calloc(1, sizeof(UINT64));
- // *cnt = 1;
- *cnt = tinfo->C2S_pkt_num + tinfo->S2C_pkt_num;
+ *cnt = 1;
+ // *cnt = tinfo->C2S_pkt_num + tinfo->S2C_pkt_num;
if (MESA_htable_add(vlanid_ip_htable_handle, key, size, cnt) >= 0)
{
return 1;
@@ -766,31 +785,39 @@ long vlanid_ip_htable_search_cb(void *data, const uchar *key, uint size, void *u
}
else
{
- // *cnt = *cnt + 1;
- *cnt = *cnt + tinfo->C2S_pkt_num + tinfo->S2C_pkt_num;
+ *cnt = *cnt + 1;
+ // *cnt = *cnt + tinfo->C2S_pkt_num + tinfo->S2C_pkt_num;
}
return 1;
}
void vlanid_ip_stat(struct traffic_info *tinfo)
{
- uchar vlanid_ip_key[21];
- /* FLOW_TYPE_REFLUX: vx_vlan_id/vx_ip_header_src_ip is key
- FLOW_TYPE_INJECT: vx_vlan_id/vx_ip_header_dst_ip is key
- */
- memset(vlanid_ip_key, 0, sizeof(char) * 21);
- char vxlan_sip_ip_str[INET_ADDRSTRLEN];
- char vxlan_dip_ip_str[INET_ADDRSTRLEN];
- inet_ntop(AF_INET, &tinfo->vx_ip_header_src_ip_net, vxlan_sip_ip_str, INET_ADDRSTRLEN);
- inet_ntop(AF_INET, &tinfo->vx_ip_header_dst_ip_net, vxlan_dip_ip_str, INET_ADDRSTRLEN);
+ uchar *vxlanid_ip_key;
+ char *vxlan_sip_ip_str;
+ char *vxlan_dip_ip_str;
+ vxlanid_ip_key = calloc(1, sizeof(uchar) * 21);
+ vxlan_sip_ip_str = calloc(1, sizeof(char) * INET_ADDRSTRLEN);
+ vxlan_dip_ip_str = calloc(1, sizeof(char) * INET_ADDRSTRLEN);
+
+ if (inet_ntop(AF_INET, &tinfo->vx_ip_header_src_ip_net, vxlan_sip_ip_str, INET_ADDRSTRLEN) == NULL ||
+ inet_ntop(AF_INET, &tinfo->vx_ip_header_dst_ip_net, vxlan_dip_ip_str, INET_ADDRSTRLEN) == NULL)
+ {
+ MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "inet_ntop error: sip:%u, dip:%u\n",
+ tinfo->vx_ip_header_src_ip_net, tinfo->vx_ip_header_dst_ip_net);
+ return;
+ }
+ /* FLOW_TYPE_REFLUX: vx_vlan_id/vx_ip_header_src_ip is key
+ FLOW_TYPE_INJECT: vx_vlan_id/vx_ip_header_dst_ip is key
+ */
if (flow_type == (unsigned int)FLOW_TYPE_REFLUX)
{
- snprintf(vlanid_ip_key, 21, "%d@%s", tinfo->vxlan_vpn_id, vxlan_sip_ip_str);
+ snprintf(vxlanid_ip_key, 21, "%d@%s", tinfo->vxlan_vpn_id, vxlan_sip_ip_str);
}
else if (flow_type == (unsigned int)FLOW_TYPE_INJECT)
{
- snprintf(vlanid_ip_key, 21, "%d@%s", tinfo->vxlan_vpn_id, vxlan_dip_ip_str);
+ snprintf(vxlanid_ip_key, 21, "%d@%s", tinfo->vxlan_vpn_id, vxlan_dip_ip_str);
}
else
{
@@ -799,21 +826,24 @@ void vlanid_ip_stat(struct traffic_info *tinfo)
}
hash_cb_fun_t *search_cb = vlanid_ip_htable_search_cb;
long search_cb_ret = 0;
- MESA_htable_search_cb(vlanid_ip_htable_handle, vlanid_ip_key, strlen(vlanid_ip_key), search_cb, tinfo, &search_cb_ret);
+ MESA_htable_search_cb(vlanid_ip_htable_handle, vxlanid_ip_key, strlen(vxlanid_ip_key), search_cb, tinfo, &search_cb_ret);
if (search_cb_ret == 0)
{
MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "MESA_htable_search_cb failed\n");
}
+ free(vxlanid_ip_key);
+ free(vxlan_sip_ip_str);
+ free(vxlan_dip_ip_str);
}
static int tcp_flow_id = -1;
char process_tcp_close(struct streaminfo *pstream, struct traffic_info *tinfo, int thread_seq)
{
- // if (!ip_mod(tinfo))
- // {
- // free(tinfo);
- // return DEFAULT_RETURN_VALUE;
- // }
+ if (!ip_mod(tinfo))
+ {
+ free(tinfo);
+ return DEFAULT_RETURN_VALUE;
+ }
if (tcp_flow_id != -1)
{
struct tcp_flow_stat *tflow = (struct tcp_flow_stat *)project_req_get_struct(pstream, tcp_flow_id);
@@ -833,12 +863,12 @@ char process_tcp_close(struct streaminfo *pstream, struct traffic_info *tinfo, i
close_stream_pkt_num[thread_seq] += tinfo->C2S_pkt_num;
close_stream_pkt_num[thread_seq] += tinfo->S2C_pkt_num;
#endif
- vlanid_ip_stat(tinfo);
- if (!ip_mod(tinfo))
- {
- free(tinfo);
- return DEFAULT_RETURN_VALUE;
- }
+ // vlanid_ip_stat(tinfo);
+ // if (!ip_mod(tinfo))
+ // {
+ // free(tinfo);
+ // return DEFAULT_RETURN_VALUE;
+ // }
#if ENABLE_COUNT_THREAD
push_count[thread_seq]++;
@@ -974,7 +1004,7 @@ char TCP_ENTRY_ALL(struct streaminfo *pstream, void **pme, int thread_seq, const
}
tinfo = (struct traffic_info *)(*pme);
- // vlanid_ip_stat(tinfo);
+ vlanid_ip_stat(tinfo);
if (pstream->pktstate == OP_STATE_CLOSE)
{
@@ -1061,7 +1091,7 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
}
tinfo = (struct traffic_info *)(*pme);
- // vlanid_ip_stat(tinfo);
+ vlanid_ip_stat(tinfo);
if (pstream->opstate == OP_STATE_CLOSE)
{
@@ -1182,15 +1212,15 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
}
tinfo = (struct traffic_info *)(*pme);
- // vlanid_ip_stat(tinfo);
+ vlanid_ip_stat(tinfo);
if (pstream->opstate == OP_STATE_CLOSE)
{
- // if (!ip_mod(tinfo))
- // {
- // free(tinfo);
- // return DEFAULT_RETURN_VALUE;
- // }
+ if (!ip_mod(tinfo))
+ {
+ free(tinfo);
+ return DEFAULT_RETURN_VALUE;
+ }
if (pdetail != NULL)
{
if (udp_flow_id != -1)
@@ -1220,12 +1250,12 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
close_stream_pkt_num[thread_seq] += tinfo->C2S_pkt_num;
close_stream_pkt_num[thread_seq] += tinfo->S2C_pkt_num;
#endif
- vlanid_ip_stat(tinfo);
- if (!ip_mod(tinfo))
- {
- free(tinfo);
- return DEFAULT_RETURN_VALUE;
- }
+ // vlanid_ip_stat(tinfo);
+ // if (!ip_mod(tinfo))
+ // {
+ // free(tinfo);
+ // return DEFAULT_RETURN_VALUE;
+ // }
#if ENABLE_COUNT_THREAD
push_count[thread_seq]++;
@@ -1363,19 +1393,25 @@ void timer_work()
void vlanid_ip_htable_iterate(const uchar *key, uint size, void *data, void *user)
{
- char info[64] = {0};
+ char vxlan_ip_key[21] = {0};
+ memcpy(vxlan_ip_key, key, size);
+ char *info;
+ info = calloc(1, sizeof(char) * 64);
+
if (flow_type == FLOW_TYPE_REFLUX)
{
- snprintf(info, 64, "hlsum#%s#%llu", key, *(UINT64 *)data);
+ snprintf(info, 64, "hlsum#%s#%llu", vxlan_ip_key, *(UINT64 *)data);
}
else
{
- snprintf(info, 64, "hzsum#%s#%llu", key, *(UINT64 *)data);
+ snprintf(info, 64, "hzsum#%s#%llu", vxlan_ip_key, *(UINT64 *)data);
}
push_data_to_kafka(info, strlen(info), rkt_sum);
+ MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "%s", info);
vlanid_ip_pkt_num += *(UINT64 *)data;
vlanid_ip_push_count++;
- // *(UINT64 *)data = 0;
+ *(UINT64 *)data = 0;
+ free(info);
}
void vlanid_ip_thread_work()