summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author李仁杰 <[email protected]>2019-07-31 20:06:52 +0800
committer李仁杰 <[email protected]>2019-07-31 20:06:52 +0800
commit87d252769423a1bbd013f711454014d3b4807ea9 (patch)
tree273ccc31be24fd8c1dbd4396d8c7280460b6d0a2
parentac3109b014ec4182e24da4edf0f1a87624d936e5 (diff)
Update lirenjie_vxlan_sapp.c,kafka初始化添加参数,修改函数push_data_to_kafka(功能待测)
-rw-r--r--lirenjie_vxlan_sapp.c372
1 files changed, 270 insertions, 102 deletions
diff --git a/lirenjie_vxlan_sapp.c b/lirenjie_vxlan_sapp.c
index 0a93fbc..695bbc4 100644
--- a/lirenjie_vxlan_sapp.c
+++ b/lirenjie_vxlan_sapp.c
@@ -12,39 +12,62 @@
#include <signal.h>
#include <netinet/ip.h>
#include <netinet/ip6.h>
+#include <netinet/udp.h>
#include "MESA_handle_logger.h"
+#include "MESA_prof_load.h"
#include "stream.h"
-#include "gdev_keepalive.h"
#include "rdkafka.h"
+
+int version_20190730;
+
+#define DEFAULT_RETURN_VALUE (APP_STATE_GIVEME | APP_STATE_DROPPKT)
+
+extern unsigned char vxlan_sport_map_to_service_id(unsigned short sport_host_order);
+extern unsigned char vxlan_id_map_to_service_id(int vxlan_id_host_order);
+extern int platform_register_action_judge(char (*action_cb_fun)(int net_conn_mode, char plug_action));
+extern int g_business_plug_type;
+
#define MAX_LOG_INFO_LEN 256
#define MAX_TRAFFIC_INFO_LEN 1024
-const char *module_name = "lirenjie_vxlan";
-const char *tuple_log_path = "./log/lirenjie_vxlan/ip_tuple.log";
-const char *kafka_log_path = "./log/lirenjie_vxlan/kafka.log";
-const char *gdev_conf_path = "./conf/gdev.conf";
-const char *entrance_id_path = "./conf/lrj_vxlan_sapp.conf";
-void *runtime_log_handler;
-void *kafka_log_handler;
-char vx_ip_header_dst_ip[INET_ADDRSTRLEN]; // conf/gdev.conf sendto_gdev_ip
-unsigned int entrance_id; /* 局点ID 读配置文件 ./conf/lrj_vxlan_sapp.conf 默认为0*/
-unsigned char flow_type; /* 回流0/回注1 读配置文件 ./conf/lrj_vxlan_sapp.conf 默认为0*/
+static const char *module_name = "lirenjie_vxlan";
+static const char *tuple_log_path = "./log/lirenjie_vxlan/ip_tuple.log";
+static const char *kafka_log_path = "./log/lirenjie_vxlan/kafka.log";
+static const char *gdev_conf_path = "./conf/gdev.conf";
+static const char *entrance_id_path = "./conf/lrj_vxlan_sapp.conf";
+static void *runtime_log_handler;
+static void *kafka_log_handler;
+//char vx_ip_header_dst_ip[INET_ADDRSTRLEN]; // conf/gdev.conf sendto_gdev_ip
+static unsigned int sapp_keepalive_reflux_ip_net; /* 本端业务保活、回流IP */
+
+#define IDENTIFY_LOCAL_IP_SUBNET_MASK (0xFFFFFF00) /* 一个局点N台前端机, 用IP段识别是否前端机IP, 用于区别流量时回流还是回注 */
+
+static unsigned int entrance_id; /* 局点ID 读配置文件 ./conf/lrj_vxlan_sapp.conf 默认为0*/
+
+enum flow_type_t{
+ FLOW_TYPE_REFLUX = 0, /* 回流 */
+ FLOW_TYPE_INJECT = 1, /* 回注 */
+};
+static unsigned int flow_type; /* 回流0/回注1 读配置文件 ./conf/lrj_vxlan_sapp.conf 默认为0*/
/* kafka */
-const int PRODUCER_INIT_FAILED = -1;
-const int PRODUCER_INIT_SUCCESS = 0;
-const int PUSH_DATA_FAILED = -1;
-const int PUSH_DATA_SUCCESS = 0;
-int partition;
+static const int PRODUCER_INIT_FAILED = -1;
+static const int PRODUCER_INIT_SUCCESS = 0;
+static const int PUSH_DATA_FAILED = -1;
+static const int PUSH_DATA_SUCCESS = 0;
+static int partition;
//rd
-rd_kafka_t *kafka_producer;
-rd_kafka_conf_t *conf;
+static rd_kafka_t *kafka_producer;
+static rd_kafka_conf_t *conf;
// topic
-rd_kafka_topic_t *rkt;
-rd_kafka_topic_conf_t *topic_conf;
+static rd_kafka_topic_t *rkt;
+static rd_kafka_topic_conf_t *topic_conf;
// char errstr[512]={0};
-char *brokers = "10.172.208.1:9092,10.172.208.2:9092,10.172.208.2:9092,10.172.208.4:9092,10.172.208.5:9092";
-char *topic = "G_BACK_TRAFFIC_STATISTIC";
+//static char *brokers = "10.172.208.1:9092,10.172.208.2:9092,10.172.208.2:9092,10.172.208.4:9092,10.172.208.5:9092";
+//static char *brokers = "10.208.133.126:9092,10.208.133.133:9092,10.208.133.135:9092,10.208.133.141:9092";
+char brokers[128];
+
+static char *topic = "G_BACK_TRAFFIC_STATISTIC_new";
struct traffic_info
{
@@ -57,11 +80,13 @@ struct traffic_info
//struct tm *systime; // date YYYY-MM-DD %04d-%02d-%02d systime->tm_year + 1900, systime->tm_mon + 1, systime->tm_mday
// time_t stat_time; // 秒级
struct timeval stat_time; //微妙级
- char vx_ip_header_src_ip[INET_ADDRSTRLEN];
+ unsigned int vx_ip_header_src_ip_net;
+ unsigned int vx_ip_header_dst_ip_net;
unsigned short vx_UDP_header_src_port;
unsigned short vx_UDP_header_dst_port;
struct layer_addr addr;
- unsigned char service_id; /* Vlan ID或特定的标签值对应的VPN号 */
+ //unsigned char service_id; /* Vlan ID或特定的标签值对应的VPN号 */
+ int vxlan_vpn_id;
unsigned short vx_type; //IPv4=0x0800 IPv6=0x86DD Arp=0x0806
/* ipv4 src_ip dst_ip identification fragment_offset*/
char ipv4_sip[INET_ADDRSTRLEN];
@@ -90,10 +115,10 @@ static void logger(const rd_kafka_t *rk, int level, const char *fac, const char
level, fac, rk ? rd_kafka_name(rk) : NULL, buf);
}
-int init_kafka(int partition_, char *brokers_, char *topic_)
+static int init_kafka(int partition_, char *brokers_, char *topic_)
{
char tmp[16];
- char errstr[512];
+ char errstr[1024];
partition = partition_;
/* Kafka configuration */
conf = rd_kafka_conf_new();
@@ -102,13 +127,16 @@ int init_kafka(int partition_, char *brokers_, char *topic_)
/* Quick termination */
snprintf(tmp, sizeof(tmp), "%i", SIGIO);
rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);
+ rd_kafka_conf_set(conf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr));
+ rd_kafka_conf_set(conf, "topic.metadata.refresh.interval.ms", "600000",kafka_errstr, sizeof(kafka_errstr));
+ rd_kafka_conf_set(conf, "request.required.acks", "1", kafka_errstr, sizeof(kafka_errstr));
/*topic configuration*/
topic_conf = rd_kafka_topic_conf_new();
if (conf == NULL)
{
//fprintf(stderr, "***** Failed to create new conf *******\n");
- MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"***** Failed to create new conf *******");
+ MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name,"***** Failed to create new conf *******");
return PRODUCER_INIT_FAILED;
}
kafka_producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, (size_t)sizeof(errstr));
@@ -116,8 +144,8 @@ int init_kafka(int partition_, char *brokers_, char *topic_)
{
/*fprintf(stderr, "***** kafka_producer is null *******\n");
fprintf(stderr, "*****Failed to create new producer: %s*******\n", errstr);*/
- MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"***** kafka_producer is null, *******");
- MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"*****Failed to create new producer: %s*******\n", errstr);
+ MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name,"***** kafka_producer is null, *******");
+ MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name,"*****Failed to create new producer: %s*******\n", errstr);
return PRODUCER_INIT_FAILED;
}
@@ -127,7 +155,7 @@ int init_kafka(int partition_, char *brokers_, char *topic_)
if (rd_kafka_brokers_add(kafka_producer, brokers_) == 0)
{
//fprintf(stderr, "****** No valid brokers specified********\n");
- MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"****** No valid brokers specified********");
+ MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name,"****** No valid brokers specified********");
return PRODUCER_INIT_FAILED;
}
/* Create topic */
@@ -136,13 +164,13 @@ int init_kafka(int partition_, char *brokers_, char *topic_)
return PRODUCER_INIT_SUCCESS;
}
-void kafka_destroy()
+static void kafka_destroy()
{
rd_kafka_topic_destroy(rkt);
rd_kafka_destroy(kafka_producer);
}
-int push_data_to_kafka(char *buffer, int buf_len)
+static int push_data_to_kafka(char *buffer, int buf_len)
{
int ret;
if (buffer == NULL)
@@ -150,6 +178,7 @@ int push_data_to_kafka(char *buffer, int buf_len)
return 0;
}
ret = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL);
+ //ret = rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL);
if (ret == -1)
{
/*fprintf(stderr,
@@ -161,17 +190,19 @@ int push_data_to_kafka(char *buffer, int buf_len)
rd_kafka_topic_name(rkt), partition,
rd_kafka_err2str(rd_kafka_last_error()));
/* Poll to handle delivery reports */
- rd_kafka_poll(kafka_producer, 0);
+ //rd_kafka_poll(kafka_producer, 0);
+ return PUSH_DATA_FAILED;
}
/*fprintf(stderr, "%% Sent %zd bytes to topic "
"%s partition %i\n",
buf_len, rd_kafka_topic_name(rkt), partition);*/
MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"%% Sent %zd bytes to topic %s partition %i",
buf_len, rd_kafka_topic_name(rkt), partition);
- rd_kafka_poll(kafka_producer, 0);
+ //rd_kafka_poll(kafka_producer, 0);
return PUSH_DATA_SUCCESS;
}
+#if 0
unsigned char get_service_id(struct streaminfo *pstream)
{
int ret;
@@ -236,8 +267,9 @@ unsigned char get_service_id(struct streaminfo *pstream)
return service_id;
}
+#endif
-unsigned short get_proto_type(struct streaminfo *pstream)
+static unsigned short get_proto_type(struct streaminfo *pstream)
{
if (pstream->addr.addrtype == ADDR_TYPE_IPV4)
{
@@ -256,7 +288,24 @@ unsigned short get_proto_type(struct streaminfo *pstream)
}
}
-unsigned char get_service_id_from_vxlanid(struct streaminfo *pstream)
+static int get_vpnid_from_stream(struct streaminfo *pstream)
+{
+ int vpn_id_net_order;
+ int ret;
+
+ ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_VXLAN_VPNID, &vpn_id_net_order);
+ if (ret >= 0)
+ {
+ return ntohl(vpn_id_net_order);
+ }
+ else
+ {
+ return 0;
+ }
+}
+
+#if 0
+static unsigned char get_service_id_from_vxlanid(struct streaminfo *pstream)
{
int vxlan_id; /* 由vxlan_id获取当前包所属业务号 vxlan_id_map_to_service_id*/
int ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_VXLAN_ID, &vxlan_id);
@@ -272,7 +321,8 @@ unsigned char get_service_id_from_vxlanid(struct streaminfo *pstream)
}
}
-unsigned char get_service_id_from_sport(struct streaminfo *pstream)
+
+static unsigned char get_service_id_from_sport(struct streaminfo *pstream)
{
int ret;
unsigned short vxlan_sport; /* 由源端口获取当前包所属业务号 vxlan_sport_map_to_service_id*/
@@ -291,22 +341,58 @@ unsigned char get_service_id_from_sport(struct streaminfo *pstream)
return 0;
}
}
+#endif
+
-void get_vx_ip_header_src_ip(struct streaminfo *pstream, struct traffic_info *tinfo)
+static int is_same_sub_net(unsigned int packet_gdev_ip_net, unsigned int local_gdev_ip_net)
{
- int gdev_ip;
- int ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_GDEV_IP, &gdev_ip);
+ if((ntohl(packet_gdev_ip_net) & IDENTIFY_LOCAL_IP_SUBNET_MASK) ==
+ (ntohl(local_gdev_ip_net) & IDENTIFY_LOCAL_IP_SUBNET_MASK)){
+ return 1;
+ }
+
+ return 0;
+}
+
+static int get_vxlan_ip_addr(struct streaminfo *pstream, struct traffic_info *tinfo)
+{
+ int gdev_ip_net;
+ int ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_GDEV_IP, &gdev_ip_net);
if (ret >= 0)
{
- inet_ntop(AF_INET, &gdev_ip, tinfo->vx_ip_header_src_ip, INET_ADDRSTRLEN);
+ if(is_same_sub_net(gdev_ip_net, sapp_keepalive_reflux_ip_net)){
+ flow_type = FLOW_TYPE_INJECT; /* 从驱动获取的GDEV IP(vxlan->srcip), 和本机IP在一个网段, 说明是回注包 */
+ }else{
+ flow_type = FLOW_TYPE_REFLUX; /* 从驱动获取的GDEV IP(vxlan->srcip), 和本机IP不在一个网段, 说明是回流包 */
+ }
+#if 0
+ if(gdev_ip_net == sapp_keepalive_reflux_ip_net){ /* add by lijia 20190611 */
+ MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, "vxlan src and dst ip is equal!");
+ ret = -1;
+ }
+#endif
+
+ if(FLOW_TYPE_REFLUX == flow_type){
+ tinfo->vx_ip_header_src_ip_net = gdev_ip_net;
+ tinfo->vx_ip_header_dst_ip_net = sapp_keepalive_reflux_ip_net;
+ ret = 0;
+ }else{
+ tinfo->vx_ip_header_src_ip_net = sapp_keepalive_reflux_ip_net;
+ tinfo->vx_ip_header_dst_ip_net = gdev_ip_net;
+ ret = 0;
+ }
}
else
{
- memset(tinfo->vx_ip_header_src_ip, 0, INET_ADDRSTRLEN);
+ tinfo->vx_ip_header_src_ip_net = 0;
+ tinfo->vx_ip_header_dst_ip_net = 0;
+ ret = -1;
}
+
+ return ret;
}
-unsigned char get_vx_UDP_header_src_port(struct streaminfo *pstream)
+static unsigned short get_vx_UDP_header_src_port(struct streaminfo *pstream)
{
unsigned short vxlan_sport;
int ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_VXLAN_SPORT, &vxlan_sport);
@@ -321,7 +407,8 @@ unsigned char get_vx_UDP_header_src_port(struct streaminfo *pstream)
}
/* 获取pstream中的四元组 */
-void get_tuple4(struct streaminfo *pstream, unsigned char service_id)
+#if 0
+static void get_tuple4(struct streaminfo *pstream, unsigned char service_id)
{
// printf("%s\n", addr_type_to_string((pstream->addr).addrtype));
if (pstream->addr.addrtype == ADDR_TYPE_IPV4)
@@ -380,8 +467,9 @@ void get_tuple4(struct streaminfo *pstream, unsigned char service_id)
{
}
}
+#endif
-void get_ip_detail(struct streaminfo *pstream, struct traffic_info *tinfo, const void *rawpkt)
+static void get_ip_detail(struct streaminfo *pstream, struct traffic_info *tinfo, const void *rawpkt)
{
// printf("%s\n", addr_type_to_string((pstream->addr).addrtype));
if (pstream->addr.addrtype == ADDR_TYPE_IPV4)
@@ -393,7 +481,7 @@ void get_ip_detail(struct streaminfo *pstream, struct traffic_info *tinfo, const
struct ip *ip_hdr = (struct ip *)rawpkt;
tinfo->ipv4_id = ntohs(ip_hdr->ip_id);
tinfo->ipv4_off = ntohs(ip_hdr->ip_off);
-
+#if 0
char info[MAX_LOG_INFO_LEN] = {0};
if (tinfo->service_id > 0)
{
@@ -408,6 +496,8 @@ void get_ip_detail(struct streaminfo *pstream, struct traffic_info *tinfo, const
tinfo->ipv4_dip, ntohs(tuple4_v4->dest));
}
// MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info);
+#endif
+
}
else if (pstream->addr.addrtype == ADDR_TYPE_IPV6)
{
@@ -421,6 +511,7 @@ void get_ip_detail(struct streaminfo *pstream, struct traffic_info *tinfo, const
tinfo->ipv6_load_length = ntohs(ip6_head->ip6_plen);
tinfo->ipv6_next_msg_head = ip6_head->ip6_nxt;
tinfo->ipv6_limit = ip6_head->ip6_hlim;
+#if 0
char info[MAX_LOG_INFO_LEN] = {0};
if (tinfo->service_id > 0)
@@ -435,6 +526,7 @@ void get_ip_detail(struct streaminfo *pstream, struct traffic_info *tinfo, const
}
// MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info);
// push_data_to_kafka(info,sizeof(info));
+#endif
}
else if (pstream->addr.addrtype == ADDR_TYPE_ARP)
{
@@ -442,26 +534,29 @@ void get_ip_detail(struct streaminfo *pstream, struct traffic_info *tinfo, const
}
}
-void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *pstream)
+static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *pstream)
{
char protocol[8];
+ char vxlan_sip_ip_str[INET_ADDRSTRLEN];
+ char vxlan_dip_ip_str[INET_ADDRSTRLEN];
+
switch (tinfo->protocol)
{
- case 1:
- sprintf(protocol, "%s", "IPv4_TCP");
- break;
- case 2:
- sprintf(protocol, "%s", "IPv4_UDP");
- break;
- case 3:
- sprintf(protocol, "%s", "IPv6_TCP");
- break;
- case 4:
- sprintf(protocol, "%s", "IPv6_UDP");
- break;
- default:
- sprintf(protocol, "%s", "others");
- break;
+ case 1:
+ sprintf(protocol, "%s", "IPv4_TCP");
+ break;
+ case 2:
+ sprintf(protocol, "%s", "IPv4_UDP");
+ break;
+ case 3:
+ sprintf(protocol, "%s", "IPv6_TCP");
+ break;
+ case 4:
+ sprintf(protocol, "%s", "IPv6_UDP");
+ break;
+ default:
+ sprintf(protocol, "%s", "others");
+ break;
}
/*
printf("\"proto_type\":%d,protocol:%s, entrance_id:%d, c2s_pkt_num:%d, s2c_pkt_num:%d, c2s_byte_len:%d, s2c_byte_len:%d, ",
@@ -476,6 +571,9 @@ void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *pstream)
tinfo->ipv6_sip,tinfo->ipv6_dip,tinfo->ipv6_bus_type,tinfo->ipv6_flow_flag,tinfo->ipv6_load_length,tinfo->ipv6_next_msg_head,tinfo->ipv6_limit);
*/
char info[MAX_TRAFFIC_INFO_LEN] = {0};
+
+ inet_ntop(AF_INET, &tinfo->vx_ip_header_src_ip_net, vxlan_sip_ip_str, INET_ADDRSTRLEN);
+ inet_ntop(AF_INET, &tinfo->vx_ip_header_dst_ip_net, vxlan_dip_ip_str, INET_ADDRSTRLEN);
switch (tinfo->addr.addrtype)
{
@@ -483,14 +581,16 @@ void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *pstream)
snprintf(info, MAX_TRAFFIC_INFO_LEN,
"{\"proto_type\":%d,\"protocol\":\"%s\",\"entrance_id\":%d,\"c2s_pkt_num\":%d,\"s2c_pkt_num\":%d,\"c2s_byte_len\":%d,\"s2c_byte_len\":%d,"
"\"stat_time\":%ld,\"vx_type\":\"0x%04X\",\"vx_ip_header_src_ip\":\"%s\",\"vx_ip_header_dst_ip\":\"%s\","
- "\"vx_udp_header_src_port\":%d,\"vx_udp_header_dst_port\":%d,\"vx_vlan_id\":%d,"
- "\"ipv4_src_ip\":\"%s\",\"ipv4_dst_ip\":\"%s\",\"ipv4_identification\":%d,\"ipv4_fragment_offset\":%d,"
+ "\"vx_udp_header_src_port\":%u,\"vx_udp_header_dst_port\":%u,\"vx_vlan_id\":%u,"
+ "\"ipv4_src_ip\":\"%s\",\"ipv4_dst_ip\":\"%s\",\"ipv4_identification\":%d,\"ipv4_fragment_offset\":%u,"
"\"ipv6_src_ip\":\"\",\"ipv6_dst_ip\":\"\",\"ipv6_bus_type\":\"\",\"ipv6_flow_flag\":\"\",\"ipv6_load_length\":0,"
"\"ipv6_next_msg_head\":0,\"ipv6_limit\":0,\"flow_type\":%d}",
tinfo->PROTO_TYPE,protocol, entrance_id, tinfo->C2S_pkt_num, tinfo->S2C_pkt_num, tinfo->C2S_bytes, tinfo->S2C_bytes,
- tinfo->stat_time.tv_sec*1000 + tinfo->stat_time.tv_usec/1000, tinfo->vx_type, tinfo->vx_ip_header_src_ip, vx_ip_header_dst_ip,
- tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->service_id,
- tinfo->ipv4_sip, tinfo->ipv4_dip, tinfo->ipv4_id, tinfo->ipv4_off,flow_type);
+ tinfo->stat_time.tv_sec*1000 + tinfo->stat_time.tv_usec/1000, tinfo->vx_type, vxlan_sip_ip_str, vxlan_dip_ip_str,
+ tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->vxlan_vpn_id,
+ tinfo->ipv4_sip, tinfo->ipv4_dip, tinfo->ipv4_id, tinfo->ipv4_off,
+ //ipv6 stat is NULL,
+ flow_type);
MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info);
push_data_to_kafka(info,strlen(info));
break;
@@ -498,13 +598,13 @@ void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *pstream)
snprintf(info, MAX_TRAFFIC_INFO_LEN,
"{\"proto_type\":%d,\"protocol\":\"%s\",\"entrance_id\":%d,\"c2s_pkt_num\":%d,\"s2c_pkt_num\":%d,\"c2s_byte_len\":%d,\"s2c_byte_len\":%d,"
"\"stat_time\":%ld,\"vx_type\":\"0x%04X\",\"vx_ip_header_src_ip\":\"%s\",\"vx_ip_header_dst_ip\":\"%s\","
- "\"vx_udp_header_src_port\":%d,\"vx_udp_header_dst_port\":%d,\"vx_vlan_id\":%d,"
+ "\"vx_udp_header_src_port\":%u,\"vx_udp_header_dst_port\":%u,\"vx_vlan_id\":%u,"
"\"ipv4_src_ip\":\"\",\"ipv4_dst_ip\":\"\",\"ipv4_identification\":0,\"ipv4_fragment_offset\":0,"
"\"ipv6_src_ip\":\"%s\",\"ipv6_dst_ip\":\"%s\",\"ipv6_bus_type\":\"0x%02X\",\"ipv6_flow_flag\":\"0x%05X\",\"ipv6_load_length\":%d,"
"\"ipv6_next_msg_head\":%d,\"ipv6_limit\":%d,\"flow_type\":%d}",
tinfo->PROTO_TYPE,protocol, entrance_id, tinfo->C2S_pkt_num, tinfo->S2C_pkt_num, tinfo->C2S_bytes, tinfo->S2C_bytes,
- tinfo->stat_time.tv_sec*1000 + tinfo->stat_time.tv_usec/1000, tinfo->vx_type, tinfo->vx_ip_header_src_ip, vx_ip_header_dst_ip,
- tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->service_id,
+ tinfo->stat_time.tv_sec*1000 + tinfo->stat_time.tv_usec/1000, tinfo->vx_type, vxlan_sip_ip_str, vxlan_dip_ip_str,
+ tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->vxlan_vpn_id,
tinfo->ipv6_sip, tinfo->ipv6_dip, tinfo->ipv6_bus_type, tinfo->ipv6_flow_flag, tinfo->ipv6_load_length,
tinfo->ipv6_next_msg_head, tinfo->ipv6_limit, flow_type);
MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info);
@@ -519,8 +619,9 @@ void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *pstream)
"\"ipv6_src_ip\":\"\",\"ipv6_dst_ip\":\"\",\"ipv6_bus_type\":\"\",\"ipv6_flow_flag\":\"\",\"ipv6_load_length\":0,"
"\"ipv6_next_msg_head\":0,\"ipv6_limit\":0,\"flow_type\":%d}",
tinfo->PROTO_TYPE,protocol, entrance_id, tinfo->C2S_pkt_num, tinfo->S2C_pkt_num, tinfo->C2S_bytes, tinfo->S2C_bytes,
- tinfo->stat_time.tv_sec*1000 + tinfo->stat_time.tv_usec/1000, tinfo->vx_type, tinfo->vx_ip_header_src_ip, vx_ip_header_dst_ip,
- tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->service_id,flow_type);
+ tinfo->stat_time.tv_sec*1000 + tinfo->stat_time.tv_usec/1000, tinfo->vx_type, vxlan_sip_ip_str, vxlan_dip_ip_str,
+ tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->vxlan_vpn_id,
+ flow_type);
MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info);
push_data_to_kafka(info,strlen(info));
break;
@@ -531,8 +632,8 @@ void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *pstream)
char TCP_ENTRY_ALL(struct streaminfo *pstream, void **pme, int thread_seq, const void *raw_pkt)
{
- printf("TCP_ENTRY_ALL SUCCESS!!!\n");
- return APP_STATE_GIVEME;
+ //printf("TCP_ENTRY_ALL SUCCESS!!!\n");
+ return DEFAULT_RETURN_VALUE;
}
static int tcp_flow_id = -1;
@@ -554,32 +655,35 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
{
tinfo = (struct traffic_info *)calloc(1, sizeof(struct traffic_info));
//tinfo->service_id = get_service_id_from_sport(pstream); //获取vx_lan_id字段,具体方法待定
- tinfo->service_id = get_service_id_from_vxlanid(pstream);
+ //tinfo->service_id = get_service_id_from_vxlanid(pstream);
+ tinfo->vxlan_vpn_id = get_vpnid_from_stream(pstream);
/* PROTOCOL */
switch (pstream->addr.addrtype)
{
- case ADDR_TYPE_IPV4:
- tinfo->protocol = 1; // IPV4_TCP
- tinfo->vx_type = 0x0800;
- break;
- case ADDR_TYPE_IPV6:
- tinfo->protocol = 3; // IPV6_TCP
- tinfo->vx_type = 0x86DD;
- break;
- case ADDR_TYPE_ARP:
- tinfo->protocol = 0;
- tinfo->vx_type = 0x0806;
- break;
- default:
- tinfo->protocol = 0;
- tinfo->vx_type = 0;
- break;
+ case ADDR_TYPE_IPV4:
+ tinfo->protocol = 1; // IPV4_TCP
+ tinfo->vx_type = 0x0800;
+ break;
+ case ADDR_TYPE_IPV6:
+ tinfo->protocol = 3; // IPV6_TCP
+ tinfo->vx_type = 0x86DD;
+ break;
+ case ADDR_TYPE_ARP:
+ tinfo->protocol = 0;
+ tinfo->vx_type = 0x0806;
+ break;
+ default:
+ tinfo->protocol = 0;
+ tinfo->vx_type = 0;
+ break;
}
/* vx_UDP_header_src_port dst_port*/
tinfo->vx_UDP_header_src_port = get_vx_UDP_header_src_port(pstream);
tinfo->vx_UDP_header_dst_port = 4789;
/* vx_ip_header_src_ip dst_ip*/
- get_vx_ip_header_src_ip(pstream, tinfo);
+ if(get_vxlan_ip_addr(pstream, tinfo) < 0){
+ goto error_drop;
+ }
/* IPv4、IPv6头部信息 */
get_ip_detail(pstream, tinfo, raw_pkt);
/* 应用层协议类型 用目的端口表示 */
@@ -603,7 +707,7 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
}
}
*/
- if (pstream->opstate == OP_STATE_CLOSE && tinfo->service_id > 0)
+ if (pstream->opstate == OP_STATE_CLOSE && tinfo->vxlan_vpn_id > 0)
{
//printf("TCP_ENTRY SUCCESS!!!\n");
/* 获取包数字节数 */
@@ -633,15 +737,48 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
free(tinfo);
// printf("\n");
}
- return APP_STATE_GIVEME;
+ return DEFAULT_RETURN_VALUE;
+
+error_drop:
+ free(tinfo);
+ return APP_STATE_DROPME | APP_STATE_DROPPKT;
}
+/*
+ add by lijia 20190604.
+*/
+static inline int is_gdev_keepalive_pkt(const struct ip *iphdr)
+{
+ const struct udphdr *udh;
+
+ if(NULL == iphdr){
+ return 0;
+ }
+
+ if(iphdr->ip_p != 17){
+ return 0;
+ }
+
+ udh = (struct udphdr *)((char *)iphdr + iphdr->ip_hl*4);
+
+ if(udh->dest == ntohs(3784)){
+ return 1;
+ }
+
+ return 0;
+}
+
+
static int udp_flow_id = -1;
char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const void *raw_pkt)
{
struct udpdetail *pdetail = (struct udpdetail *)pstream->pdetail;
struct traffic_info *tinfo;
+ if(is_gdev_keepalive_pkt((const struct ip *)raw_pkt) != 0){ //add by lijia 20190604, drop BFD keepalive packet.
+ return APP_STATE_DROPME;
+ }
+
if (-1 == udp_flow_id)
{
udp_flow_id = project_customer_register(PROJECT_REQ_UDP_FLOW, "struct");
@@ -655,7 +792,9 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
{
tinfo = (struct traffic_info *)calloc(1, sizeof(struct traffic_info));
//tinfo->service_id = get_service_id_from_sport(pstream); //获取vx_lan_id字段,具体方法待定
- tinfo->service_id = get_service_id_from_vxlanid(pstream);
+ //tinfo->service_id = get_service_id_from_vxlanid(pstream);
+ tinfo->vxlan_vpn_id = get_vpnid_from_stream(pstream);
+
/* PROTOCOL */
switch (pstream->addr.addrtype)
{
@@ -680,7 +819,9 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
tinfo->vx_UDP_header_src_port = get_vx_UDP_header_src_port(pstream);
tinfo->vx_UDP_header_dst_port = 4789;
/* vx_ip_header_src_ip dst_ip*/
- get_vx_ip_header_src_ip(pstream, tinfo);
+ if(get_vxlan_ip_addr(pstream, tinfo) < 0){
+ goto error_drop;
+ }
/* IPv4、IPv6头部信息 */
get_ip_detail(pstream, tinfo, raw_pkt);
/* 应用层协议类型 用目的端口表示 */
@@ -690,7 +831,7 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
}
tinfo = (struct traffic_info *)(*pme);
- if (pstream->opstate == OP_STATE_CLOSE && tinfo->service_id > 0)
+ if (pstream->opstate == OP_STATE_CLOSE && tinfo->vxlan_vpn_id > 0)
{
//printf("UDP_ENTRY SUCCESS!!!\n");
@@ -728,7 +869,16 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
free(tinfo);
// printf("\n");
}
- return APP_STATE_GIVEME;
+ return DEFAULT_RETURN_VALUE;
+
+error_drop:
+ free(tinfo);
+ return APP_STATE_DROPME | APP_STATE_DROPPKT;
+}
+
+static char return_action_cb_fun(int net_conn_mode, char plug_action)
+{
+ return 0; //所有包默认都DROP
}
int CHAR_INIT()
@@ -736,8 +886,14 @@ int CHAR_INIT()
int demo_plugid = 51;
runtime_log_handler = NULL;
kafka_log_handler = NULL;
- runtime_log_handler = MESA_create_runtime_log_handle(tuple_log_path, RLOG_LV_INFO);
- kafka_log_handler = MESA_create_runtime_log_handle(kafka_log_path, RLOG_LV_INFO);
+ char str_tmp[128];
+
+ int log_level = 30;
+
+ MESA_load_profile_int_def(entrance_id_path, "LOG", "log_level", &log_level, 30);
+
+ runtime_log_handler = MESA_create_runtime_log_handle(tuple_log_path, log_level);
+ kafka_log_handler = MESA_create_runtime_log_handle(kafka_log_path, log_level);
if (runtime_log_handler == NULL || kafka_log_handler == NULL)
{
/* code */
@@ -747,9 +903,17 @@ int CHAR_INIT()
/* ENTRANCE_ID */
MESA_load_profile_uint_def(entrance_id_path, "SETTING", "ENTRANCE_ID", &entrance_id, 0);
/* FLOW_TYPE */
- MESA_load_profile_uint_def(entrance_id_path, "SETTING", "FLOW_TYPE", &flow_type, 0);
+ MESA_load_profile_uint_def(entrance_id_path, "SETTING", "FLOW_TYPE", &flow_type, (int)FLOW_TYPE_REFLUX);
+ /* Brokers */
+ MESA_load_profile_string_def(entrance_id_path, "SETTING", "BROKERS", brokers, sizeof(brokers),"#");
/* vx_ip_header_dst_ip */
- MESA_load_profile_string_def(gdev_conf_path, "Module", "sendto_gdev_ip", vx_ip_header_dst_ip, INET_ADDRSTRLEN, "0.0.0.0");
+ MESA_load_profile_string_def(gdev_conf_path, "Module", "sendto_gdev_ip", str_tmp, sizeof(str_tmp), "#");
+ if('#' == str_tmp[0]){
+ MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name,"can't get %s->sendto_gdev_ip!!", gdev_conf_path);
+ return -1;
+ }
+ inet_pton(AF_INET, str_tmp, &sapp_keepalive_reflux_ip_net);
+
/* kafka初始化 */
if (init_kafka(0, brokers, topic) != PRODUCER_INIT_SUCCESS)
{
@@ -757,6 +921,9 @@ int CHAR_INIT()
return -1;
}
+ platform_register_action_judge(return_action_cb_fun);
+ g_business_plug_type = 0; /* 非常规办法, 这是串联插件总控的内部变量, 0:JC; 1:GK, 0默认丢弃所有包 */
+
// 函数实现自定义
// 只要求函数返回值为插件ID;
//printf("INIT SUCCESS!!!\n");
@@ -777,3 +944,4 @@ void LRJ_APP_DESTROY()
MESA_destroy_runtime_log_handle(kafka_log_handler);
printf("TEST_APP_DESTORY out...\n");
}
+