diff options
| author | 李仁杰 <[email protected]> | 2019-07-31 20:06:52 +0800 |
|---|---|---|
| committer | 李仁杰 <[email protected]> | 2019-07-31 20:06:52 +0800 |
| commit | 87d252769423a1bbd013f711454014d3b4807ea9 (patch) | |
| tree | 273ccc31be24fd8c1dbd4396d8c7280460b6d0a2 | |
| parent | ac3109b014ec4182e24da4edf0f1a87624d936e5 (diff) | |
Update lirenjie_vxlan_sapp.c,kafka初始化添加参数,修改函数push_data_to_kafka(功能待测)
| -rw-r--r-- | lirenjie_vxlan_sapp.c | 372 |
1 files changed, 270 insertions, 102 deletions
diff --git a/lirenjie_vxlan_sapp.c b/lirenjie_vxlan_sapp.c index 0a93fbc..695bbc4 100644 --- a/lirenjie_vxlan_sapp.c +++ b/lirenjie_vxlan_sapp.c @@ -12,39 +12,62 @@ #include <signal.h> #include <netinet/ip.h> #include <netinet/ip6.h> +#include <netinet/udp.h> #include "MESA_handle_logger.h" +#include "MESA_prof_load.h" #include "stream.h" -#include "gdev_keepalive.h" #include "rdkafka.h" + +int version_20190730; + +#define DEFAULT_RETURN_VALUE (APP_STATE_GIVEME | APP_STATE_DROPPKT) + +extern unsigned char vxlan_sport_map_to_service_id(unsigned short sport_host_order); +extern unsigned char vxlan_id_map_to_service_id(int vxlan_id_host_order); +extern int platform_register_action_judge(char (*action_cb_fun)(int net_conn_mode, char plug_action)); +extern int g_business_plug_type; + #define MAX_LOG_INFO_LEN 256 #define MAX_TRAFFIC_INFO_LEN 1024 -const char *module_name = "lirenjie_vxlan"; -const char *tuple_log_path = "./log/lirenjie_vxlan/ip_tuple.log"; -const char *kafka_log_path = "./log/lirenjie_vxlan/kafka.log"; -const char *gdev_conf_path = "./conf/gdev.conf"; -const char *entrance_id_path = "./conf/lrj_vxlan_sapp.conf"; -void *runtime_log_handler; -void *kafka_log_handler; -char vx_ip_header_dst_ip[INET_ADDRSTRLEN]; // conf/gdev.conf sendto_gdev_ip -unsigned int entrance_id; /* 局点ID 读配置文件 ./conf/lrj_vxlan_sapp.conf 默认为0*/ -unsigned char flow_type; /* 回流0/回注1 读配置文件 ./conf/lrj_vxlan_sapp.conf 默认为0*/ +static const char *module_name = "lirenjie_vxlan"; +static const char *tuple_log_path = "./log/lirenjie_vxlan/ip_tuple.log"; +static const char *kafka_log_path = "./log/lirenjie_vxlan/kafka.log"; +static const char *gdev_conf_path = "./conf/gdev.conf"; +static const char *entrance_id_path = "./conf/lrj_vxlan_sapp.conf"; +static void *runtime_log_handler; +static void *kafka_log_handler; +//char vx_ip_header_dst_ip[INET_ADDRSTRLEN]; // conf/gdev.conf sendto_gdev_ip +static unsigned int sapp_keepalive_reflux_ip_net; /* 本端业务保活、回流IP */ + +#define IDENTIFY_LOCAL_IP_SUBNET_MASK (0xFFFFFF00) /* 一个局点N台前端机, 用IP段识别是否前端机IP, 用于区别流量时回流还是回注 */ + +static unsigned int entrance_id; /* 局点ID 读配置文件 ./conf/lrj_vxlan_sapp.conf 默认为0*/ + +enum flow_type_t{ + FLOW_TYPE_REFLUX = 0, /* 回流 */ + FLOW_TYPE_INJECT = 1, /* 回注 */ +}; +static unsigned int flow_type; /* 回流0/回注1 读配置文件 ./conf/lrj_vxlan_sapp.conf 默认为0*/ /* kafka */ -const int PRODUCER_INIT_FAILED = -1; -const int PRODUCER_INIT_SUCCESS = 0; -const int PUSH_DATA_FAILED = -1; -const int PUSH_DATA_SUCCESS = 0; -int partition; +static const int PRODUCER_INIT_FAILED = -1; +static const int PRODUCER_INIT_SUCCESS = 0; +static const int PUSH_DATA_FAILED = -1; +static const int PUSH_DATA_SUCCESS = 0; +static int partition; //rd -rd_kafka_t *kafka_producer; -rd_kafka_conf_t *conf; +static rd_kafka_t *kafka_producer; +static rd_kafka_conf_t *conf; // topic -rd_kafka_topic_t *rkt; -rd_kafka_topic_conf_t *topic_conf; +static rd_kafka_topic_t *rkt; +static rd_kafka_topic_conf_t *topic_conf; // char errstr[512]={0}; -char *brokers = "10.172.208.1:9092,10.172.208.2:9092,10.172.208.2:9092,10.172.208.4:9092,10.172.208.5:9092"; -char *topic = "G_BACK_TRAFFIC_STATISTIC"; +//static char *brokers = "10.172.208.1:9092,10.172.208.2:9092,10.172.208.2:9092,10.172.208.4:9092,10.172.208.5:9092"; +//static char *brokers = "10.208.133.126:9092,10.208.133.133:9092,10.208.133.135:9092,10.208.133.141:9092"; +char brokers[128]; + +static char *topic = "G_BACK_TRAFFIC_STATISTIC_new"; struct traffic_info { @@ -57,11 +80,13 @@ struct traffic_info //struct tm *systime; // date YYYY-MM-DD %04d-%02d-%02d systime->tm_year + 1900, systime->tm_mon + 1, systime->tm_mday // time_t stat_time; // 秒级 struct timeval stat_time; //微妙级 - char vx_ip_header_src_ip[INET_ADDRSTRLEN]; + unsigned int vx_ip_header_src_ip_net; + unsigned int vx_ip_header_dst_ip_net; unsigned short vx_UDP_header_src_port; unsigned short vx_UDP_header_dst_port; struct layer_addr addr; - unsigned char service_id; /* Vlan ID或特定的标签值对应的VPN号 */ + //unsigned char service_id; /* Vlan ID或特定的标签值对应的VPN号 */ + int vxlan_vpn_id; unsigned short vx_type; //IPv4=0x0800 IPv6=0x86DD Arp=0x0806 /* ipv4 src_ip dst_ip identification fragment_offset*/ char ipv4_sip[INET_ADDRSTRLEN]; @@ -90,10 +115,10 @@ static void logger(const rd_kafka_t *rk, int level, const char *fac, const char level, fac, rk ? rd_kafka_name(rk) : NULL, buf); } -int init_kafka(int partition_, char *brokers_, char *topic_) +static int init_kafka(int partition_, char *brokers_, char *topic_) { char tmp[16]; - char errstr[512]; + char errstr[1024]; partition = partition_; /* Kafka configuration */ conf = rd_kafka_conf_new(); @@ -102,13 +127,16 @@ int init_kafka(int partition_, char *brokers_, char *topic_) /* Quick termination */ snprintf(tmp, sizeof(tmp), "%i", SIGIO); rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0); + rd_kafka_conf_set(conf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr)); + rd_kafka_conf_set(conf, "topic.metadata.refresh.interval.ms", "600000",kafka_errstr, sizeof(kafka_errstr)); + rd_kafka_conf_set(conf, "request.required.acks", "1", kafka_errstr, sizeof(kafka_errstr)); /*topic configuration*/ topic_conf = rd_kafka_topic_conf_new(); if (conf == NULL) { //fprintf(stderr, "***** Failed to create new conf *******\n"); - MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"***** Failed to create new conf *******"); + MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name,"***** Failed to create new conf *******"); return PRODUCER_INIT_FAILED; } kafka_producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, (size_t)sizeof(errstr)); @@ -116,8 +144,8 @@ int init_kafka(int partition_, char *brokers_, char *topic_) { /*fprintf(stderr, "***** kafka_producer is null *******\n"); fprintf(stderr, "*****Failed to create new producer: %s*******\n", errstr);*/ - MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"***** kafka_producer is null, *******"); - MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"*****Failed to create new producer: %s*******\n", errstr); + MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name,"***** kafka_producer is null, *******"); + MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name,"*****Failed to create new producer: %s*******\n", errstr); return PRODUCER_INIT_FAILED; } @@ -127,7 +155,7 @@ int init_kafka(int partition_, char *brokers_, char *topic_) if (rd_kafka_brokers_add(kafka_producer, brokers_) == 0) { //fprintf(stderr, "****** No valid brokers specified********\n"); - MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"****** No valid brokers specified********"); + MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name,"****** No valid brokers specified********"); return PRODUCER_INIT_FAILED; } /* Create topic */ @@ -136,13 +164,13 @@ int init_kafka(int partition_, char *brokers_, char *topic_) return PRODUCER_INIT_SUCCESS; } -void kafka_destroy() +static void kafka_destroy() { rd_kafka_topic_destroy(rkt); rd_kafka_destroy(kafka_producer); } -int push_data_to_kafka(char *buffer, int buf_len) +static int push_data_to_kafka(char *buffer, int buf_len) { int ret; if (buffer == NULL) @@ -150,6 +178,7 @@ int push_data_to_kafka(char *buffer, int buf_len) return 0; } ret = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL); + //ret = rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL); if (ret == -1) { /*fprintf(stderr, @@ -161,17 +190,19 @@ int push_data_to_kafka(char *buffer, int buf_len) rd_kafka_topic_name(rkt), partition, rd_kafka_err2str(rd_kafka_last_error())); /* Poll to handle delivery reports */ - rd_kafka_poll(kafka_producer, 0); + //rd_kafka_poll(kafka_producer, 0); + return PUSH_DATA_FAILED; } /*fprintf(stderr, "%% Sent %zd bytes to topic " "%s partition %i\n", buf_len, rd_kafka_topic_name(rkt), partition);*/ MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"%% Sent %zd bytes to topic %s partition %i", buf_len, rd_kafka_topic_name(rkt), partition); - rd_kafka_poll(kafka_producer, 0); + //rd_kafka_poll(kafka_producer, 0); return PUSH_DATA_SUCCESS; } +#if 0 unsigned char get_service_id(struct streaminfo *pstream) { int ret; @@ -236,8 +267,9 @@ unsigned char get_service_id(struct streaminfo *pstream) return service_id; } +#endif -unsigned short get_proto_type(struct streaminfo *pstream) +static unsigned short get_proto_type(struct streaminfo *pstream) { if (pstream->addr.addrtype == ADDR_TYPE_IPV4) { @@ -256,7 +288,24 @@ unsigned short get_proto_type(struct streaminfo *pstream) } } -unsigned char get_service_id_from_vxlanid(struct streaminfo *pstream) +static int get_vpnid_from_stream(struct streaminfo *pstream) +{ + int vpn_id_net_order; + int ret; + + ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_VXLAN_VPNID, &vpn_id_net_order); + if (ret >= 0) + { + return ntohl(vpn_id_net_order); + } + else + { + return 0; + } +} + +#if 0 +static unsigned char get_service_id_from_vxlanid(struct streaminfo *pstream) { int vxlan_id; /* 由vxlan_id获取当前包所属业务号 vxlan_id_map_to_service_id*/ int ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_VXLAN_ID, &vxlan_id); @@ -272,7 +321,8 @@ unsigned char get_service_id_from_vxlanid(struct streaminfo *pstream) } } -unsigned char get_service_id_from_sport(struct streaminfo *pstream) + +static unsigned char get_service_id_from_sport(struct streaminfo *pstream) { int ret; unsigned short vxlan_sport; /* 由源端口获取当前包所属业务号 vxlan_sport_map_to_service_id*/ @@ -291,22 +341,58 @@ unsigned char get_service_id_from_sport(struct streaminfo *pstream) return 0; } } +#endif + -void get_vx_ip_header_src_ip(struct streaminfo *pstream, struct traffic_info *tinfo) +static int is_same_sub_net(unsigned int packet_gdev_ip_net, unsigned int local_gdev_ip_net) { - int gdev_ip; - int ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_GDEV_IP, &gdev_ip); + if((ntohl(packet_gdev_ip_net) & IDENTIFY_LOCAL_IP_SUBNET_MASK) == + (ntohl(local_gdev_ip_net) & IDENTIFY_LOCAL_IP_SUBNET_MASK)){ + return 1; + } + + return 0; +} + +static int get_vxlan_ip_addr(struct streaminfo *pstream, struct traffic_info *tinfo) +{ + int gdev_ip_net; + int ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_GDEV_IP, &gdev_ip_net); if (ret >= 0) { - inet_ntop(AF_INET, &gdev_ip, tinfo->vx_ip_header_src_ip, INET_ADDRSTRLEN); + if(is_same_sub_net(gdev_ip_net, sapp_keepalive_reflux_ip_net)){ + flow_type = FLOW_TYPE_INJECT; /* 从驱动获取的GDEV IP(vxlan->srcip), 和本机IP在一个网段, 说明是回注包 */ + }else{ + flow_type = FLOW_TYPE_REFLUX; /* 从驱动获取的GDEV IP(vxlan->srcip), 和本机IP不在一个网段, 说明是回流包 */ + } +#if 0 + if(gdev_ip_net == sapp_keepalive_reflux_ip_net){ /* add by lijia 20190611 */ + MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, "vxlan src and dst ip is equal!"); + ret = -1; + } +#endif + + if(FLOW_TYPE_REFLUX == flow_type){ + tinfo->vx_ip_header_src_ip_net = gdev_ip_net; + tinfo->vx_ip_header_dst_ip_net = sapp_keepalive_reflux_ip_net; + ret = 0; + }else{ + tinfo->vx_ip_header_src_ip_net = sapp_keepalive_reflux_ip_net; + tinfo->vx_ip_header_dst_ip_net = gdev_ip_net; + ret = 0; + } } else { - memset(tinfo->vx_ip_header_src_ip, 0, INET_ADDRSTRLEN); + tinfo->vx_ip_header_src_ip_net = 0; + tinfo->vx_ip_header_dst_ip_net = 0; + ret = -1; } + + return ret; } -unsigned char get_vx_UDP_header_src_port(struct streaminfo *pstream) +static unsigned short get_vx_UDP_header_src_port(struct streaminfo *pstream) { unsigned short vxlan_sport; int ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_VXLAN_SPORT, &vxlan_sport); @@ -321,7 +407,8 @@ unsigned char get_vx_UDP_header_src_port(struct streaminfo *pstream) } /* 获取pstream中的四元组 */ -void get_tuple4(struct streaminfo *pstream, unsigned char service_id) +#if 0 +static void get_tuple4(struct streaminfo *pstream, unsigned char service_id) { // printf("%s\n", addr_type_to_string((pstream->addr).addrtype)); if (pstream->addr.addrtype == ADDR_TYPE_IPV4) @@ -380,8 +467,9 @@ void get_tuple4(struct streaminfo *pstream, unsigned char service_id) { } } +#endif -void get_ip_detail(struct streaminfo *pstream, struct traffic_info *tinfo, const void *rawpkt) +static void get_ip_detail(struct streaminfo *pstream, struct traffic_info *tinfo, const void *rawpkt) { // printf("%s\n", addr_type_to_string((pstream->addr).addrtype)); if (pstream->addr.addrtype == ADDR_TYPE_IPV4) @@ -393,7 +481,7 @@ void get_ip_detail(struct streaminfo *pstream, struct traffic_info *tinfo, const struct ip *ip_hdr = (struct ip *)rawpkt; tinfo->ipv4_id = ntohs(ip_hdr->ip_id); tinfo->ipv4_off = ntohs(ip_hdr->ip_off); - +#if 0 char info[MAX_LOG_INFO_LEN] = {0}; if (tinfo->service_id > 0) { @@ -408,6 +496,8 @@ void get_ip_detail(struct streaminfo *pstream, struct traffic_info *tinfo, const tinfo->ipv4_dip, ntohs(tuple4_v4->dest)); } // MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); +#endif + } else if (pstream->addr.addrtype == ADDR_TYPE_IPV6) { @@ -421,6 +511,7 @@ void get_ip_detail(struct streaminfo *pstream, struct traffic_info *tinfo, const tinfo->ipv6_load_length = ntohs(ip6_head->ip6_plen); tinfo->ipv6_next_msg_head = ip6_head->ip6_nxt; tinfo->ipv6_limit = ip6_head->ip6_hlim; +#if 0 char info[MAX_LOG_INFO_LEN] = {0}; if (tinfo->service_id > 0) @@ -435,6 +526,7 @@ void get_ip_detail(struct streaminfo *pstream, struct traffic_info *tinfo, const } // MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); // push_data_to_kafka(info,sizeof(info)); +#endif } else if (pstream->addr.addrtype == ADDR_TYPE_ARP) { @@ -442,26 +534,29 @@ void get_ip_detail(struct streaminfo *pstream, struct traffic_info *tinfo, const } } -void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *pstream) +static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *pstream) { char protocol[8]; + char vxlan_sip_ip_str[INET_ADDRSTRLEN]; + char vxlan_dip_ip_str[INET_ADDRSTRLEN]; + switch (tinfo->protocol) { - case 1: - sprintf(protocol, "%s", "IPv4_TCP"); - break; - case 2: - sprintf(protocol, "%s", "IPv4_UDP"); - break; - case 3: - sprintf(protocol, "%s", "IPv6_TCP"); - break; - case 4: - sprintf(protocol, "%s", "IPv6_UDP"); - break; - default: - sprintf(protocol, "%s", "others"); - break; + case 1: + sprintf(protocol, "%s", "IPv4_TCP"); + break; + case 2: + sprintf(protocol, "%s", "IPv4_UDP"); + break; + case 3: + sprintf(protocol, "%s", "IPv6_TCP"); + break; + case 4: + sprintf(protocol, "%s", "IPv6_UDP"); + break; + default: + sprintf(protocol, "%s", "others"); + break; } /* printf("\"proto_type\":%d,protocol:%s, entrance_id:%d, c2s_pkt_num:%d, s2c_pkt_num:%d, c2s_byte_len:%d, s2c_byte_len:%d, ", @@ -476,6 +571,9 @@ void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *pstream) tinfo->ipv6_sip,tinfo->ipv6_dip,tinfo->ipv6_bus_type,tinfo->ipv6_flow_flag,tinfo->ipv6_load_length,tinfo->ipv6_next_msg_head,tinfo->ipv6_limit); */ char info[MAX_TRAFFIC_INFO_LEN] = {0}; + + inet_ntop(AF_INET, &tinfo->vx_ip_header_src_ip_net, vxlan_sip_ip_str, INET_ADDRSTRLEN); + inet_ntop(AF_INET, &tinfo->vx_ip_header_dst_ip_net, vxlan_dip_ip_str, INET_ADDRSTRLEN); switch (tinfo->addr.addrtype) { @@ -483,14 +581,16 @@ void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *pstream) snprintf(info, MAX_TRAFFIC_INFO_LEN, "{\"proto_type\":%d,\"protocol\":\"%s\",\"entrance_id\":%d,\"c2s_pkt_num\":%d,\"s2c_pkt_num\":%d,\"c2s_byte_len\":%d,\"s2c_byte_len\":%d," "\"stat_time\":%ld,\"vx_type\":\"0x%04X\",\"vx_ip_header_src_ip\":\"%s\",\"vx_ip_header_dst_ip\":\"%s\"," - "\"vx_udp_header_src_port\":%d,\"vx_udp_header_dst_port\":%d,\"vx_vlan_id\":%d," - "\"ipv4_src_ip\":\"%s\",\"ipv4_dst_ip\":\"%s\",\"ipv4_identification\":%d,\"ipv4_fragment_offset\":%d," + "\"vx_udp_header_src_port\":%u,\"vx_udp_header_dst_port\":%u,\"vx_vlan_id\":%u," + "\"ipv4_src_ip\":\"%s\",\"ipv4_dst_ip\":\"%s\",\"ipv4_identification\":%d,\"ipv4_fragment_offset\":%u," "\"ipv6_src_ip\":\"\",\"ipv6_dst_ip\":\"\",\"ipv6_bus_type\":\"\",\"ipv6_flow_flag\":\"\",\"ipv6_load_length\":0," "\"ipv6_next_msg_head\":0,\"ipv6_limit\":0,\"flow_type\":%d}", tinfo->PROTO_TYPE,protocol, entrance_id, tinfo->C2S_pkt_num, tinfo->S2C_pkt_num, tinfo->C2S_bytes, tinfo->S2C_bytes, - tinfo->stat_time.tv_sec*1000 + tinfo->stat_time.tv_usec/1000, tinfo->vx_type, tinfo->vx_ip_header_src_ip, vx_ip_header_dst_ip, - tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->service_id, - tinfo->ipv4_sip, tinfo->ipv4_dip, tinfo->ipv4_id, tinfo->ipv4_off,flow_type); + tinfo->stat_time.tv_sec*1000 + tinfo->stat_time.tv_usec/1000, tinfo->vx_type, vxlan_sip_ip_str, vxlan_dip_ip_str, + tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->vxlan_vpn_id, + tinfo->ipv4_sip, tinfo->ipv4_dip, tinfo->ipv4_id, tinfo->ipv4_off, + //ipv6 stat is NULL, + flow_type); MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); push_data_to_kafka(info,strlen(info)); break; @@ -498,13 +598,13 @@ void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *pstream) snprintf(info, MAX_TRAFFIC_INFO_LEN, "{\"proto_type\":%d,\"protocol\":\"%s\",\"entrance_id\":%d,\"c2s_pkt_num\":%d,\"s2c_pkt_num\":%d,\"c2s_byte_len\":%d,\"s2c_byte_len\":%d," "\"stat_time\":%ld,\"vx_type\":\"0x%04X\",\"vx_ip_header_src_ip\":\"%s\",\"vx_ip_header_dst_ip\":\"%s\"," - "\"vx_udp_header_src_port\":%d,\"vx_udp_header_dst_port\":%d,\"vx_vlan_id\":%d," + "\"vx_udp_header_src_port\":%u,\"vx_udp_header_dst_port\":%u,\"vx_vlan_id\":%u," "\"ipv4_src_ip\":\"\",\"ipv4_dst_ip\":\"\",\"ipv4_identification\":0,\"ipv4_fragment_offset\":0," "\"ipv6_src_ip\":\"%s\",\"ipv6_dst_ip\":\"%s\",\"ipv6_bus_type\":\"0x%02X\",\"ipv6_flow_flag\":\"0x%05X\",\"ipv6_load_length\":%d," "\"ipv6_next_msg_head\":%d,\"ipv6_limit\":%d,\"flow_type\":%d}", tinfo->PROTO_TYPE,protocol, entrance_id, tinfo->C2S_pkt_num, tinfo->S2C_pkt_num, tinfo->C2S_bytes, tinfo->S2C_bytes, - tinfo->stat_time.tv_sec*1000 + tinfo->stat_time.tv_usec/1000, tinfo->vx_type, tinfo->vx_ip_header_src_ip, vx_ip_header_dst_ip, - tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->service_id, + tinfo->stat_time.tv_sec*1000 + tinfo->stat_time.tv_usec/1000, tinfo->vx_type, vxlan_sip_ip_str, vxlan_dip_ip_str, + tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->vxlan_vpn_id, tinfo->ipv6_sip, tinfo->ipv6_dip, tinfo->ipv6_bus_type, tinfo->ipv6_flow_flag, tinfo->ipv6_load_length, tinfo->ipv6_next_msg_head, tinfo->ipv6_limit, flow_type); MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); @@ -519,8 +619,9 @@ void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *pstream) "\"ipv6_src_ip\":\"\",\"ipv6_dst_ip\":\"\",\"ipv6_bus_type\":\"\",\"ipv6_flow_flag\":\"\",\"ipv6_load_length\":0," "\"ipv6_next_msg_head\":0,\"ipv6_limit\":0,\"flow_type\":%d}", tinfo->PROTO_TYPE,protocol, entrance_id, tinfo->C2S_pkt_num, tinfo->S2C_pkt_num, tinfo->C2S_bytes, tinfo->S2C_bytes, - tinfo->stat_time.tv_sec*1000 + tinfo->stat_time.tv_usec/1000, tinfo->vx_type, tinfo->vx_ip_header_src_ip, vx_ip_header_dst_ip, - tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->service_id,flow_type); + tinfo->stat_time.tv_sec*1000 + tinfo->stat_time.tv_usec/1000, tinfo->vx_type, vxlan_sip_ip_str, vxlan_dip_ip_str, + tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->vxlan_vpn_id, + flow_type); MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); push_data_to_kafka(info,strlen(info)); break; @@ -531,8 +632,8 @@ void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *pstream) char TCP_ENTRY_ALL(struct streaminfo *pstream, void **pme, int thread_seq, const void *raw_pkt) { - printf("TCP_ENTRY_ALL SUCCESS!!!\n"); - return APP_STATE_GIVEME; + //printf("TCP_ENTRY_ALL SUCCESS!!!\n"); + return DEFAULT_RETURN_VALUE; } static int tcp_flow_id = -1; @@ -554,32 +655,35 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi { tinfo = (struct traffic_info *)calloc(1, sizeof(struct traffic_info)); //tinfo->service_id = get_service_id_from_sport(pstream); //获取vx_lan_id字段,具体方法待定 - tinfo->service_id = get_service_id_from_vxlanid(pstream); + //tinfo->service_id = get_service_id_from_vxlanid(pstream); + tinfo->vxlan_vpn_id = get_vpnid_from_stream(pstream); /* PROTOCOL */ switch (pstream->addr.addrtype) { - case ADDR_TYPE_IPV4: - tinfo->protocol = 1; // IPV4_TCP - tinfo->vx_type = 0x0800; - break; - case ADDR_TYPE_IPV6: - tinfo->protocol = 3; // IPV6_TCP - tinfo->vx_type = 0x86DD; - break; - case ADDR_TYPE_ARP: - tinfo->protocol = 0; - tinfo->vx_type = 0x0806; - break; - default: - tinfo->protocol = 0; - tinfo->vx_type = 0; - break; + case ADDR_TYPE_IPV4: + tinfo->protocol = 1; // IPV4_TCP + tinfo->vx_type = 0x0800; + break; + case ADDR_TYPE_IPV6: + tinfo->protocol = 3; // IPV6_TCP + tinfo->vx_type = 0x86DD; + break; + case ADDR_TYPE_ARP: + tinfo->protocol = 0; + tinfo->vx_type = 0x0806; + break; + default: + tinfo->protocol = 0; + tinfo->vx_type = 0; + break; } /* vx_UDP_header_src_port dst_port*/ tinfo->vx_UDP_header_src_port = get_vx_UDP_header_src_port(pstream); tinfo->vx_UDP_header_dst_port = 4789; /* vx_ip_header_src_ip dst_ip*/ - get_vx_ip_header_src_ip(pstream, tinfo); + if(get_vxlan_ip_addr(pstream, tinfo) < 0){ + goto error_drop; + } /* IPv4、IPv6头部信息 */ get_ip_detail(pstream, tinfo, raw_pkt); /* 应用层协议类型 用目的端口表示 */ @@ -603,7 +707,7 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi } } */ - if (pstream->opstate == OP_STATE_CLOSE && tinfo->service_id > 0) + if (pstream->opstate == OP_STATE_CLOSE && tinfo->vxlan_vpn_id > 0) { //printf("TCP_ENTRY SUCCESS!!!\n"); /* 获取包数字节数 */ @@ -633,15 +737,48 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi free(tinfo); // printf("\n"); } - return APP_STATE_GIVEME; + return DEFAULT_RETURN_VALUE; + +error_drop: + free(tinfo); + return APP_STATE_DROPME | APP_STATE_DROPPKT; } +/* + add by lijia 20190604. +*/ +static inline int is_gdev_keepalive_pkt(const struct ip *iphdr) +{ + const struct udphdr *udh; + + if(NULL == iphdr){ + return 0; + } + + if(iphdr->ip_p != 17){ + return 0; + } + + udh = (struct udphdr *)((char *)iphdr + iphdr->ip_hl*4); + + if(udh->dest == ntohs(3784)){ + return 1; + } + + return 0; +} + + static int udp_flow_id = -1; char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const void *raw_pkt) { struct udpdetail *pdetail = (struct udpdetail *)pstream->pdetail; struct traffic_info *tinfo; + if(is_gdev_keepalive_pkt((const struct ip *)raw_pkt) != 0){ //add by lijia 20190604, drop BFD keepalive packet. + return APP_STATE_DROPME; + } + if (-1 == udp_flow_id) { udp_flow_id = project_customer_register(PROJECT_REQ_UDP_FLOW, "struct"); @@ -655,7 +792,9 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi { tinfo = (struct traffic_info *)calloc(1, sizeof(struct traffic_info)); //tinfo->service_id = get_service_id_from_sport(pstream); //获取vx_lan_id字段,具体方法待定 - tinfo->service_id = get_service_id_from_vxlanid(pstream); + //tinfo->service_id = get_service_id_from_vxlanid(pstream); + tinfo->vxlan_vpn_id = get_vpnid_from_stream(pstream); + /* PROTOCOL */ switch (pstream->addr.addrtype) { @@ -680,7 +819,9 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi tinfo->vx_UDP_header_src_port = get_vx_UDP_header_src_port(pstream); tinfo->vx_UDP_header_dst_port = 4789; /* vx_ip_header_src_ip dst_ip*/ - get_vx_ip_header_src_ip(pstream, tinfo); + if(get_vxlan_ip_addr(pstream, tinfo) < 0){ + goto error_drop; + } /* IPv4、IPv6头部信息 */ get_ip_detail(pstream, tinfo, raw_pkt); /* 应用层协议类型 用目的端口表示 */ @@ -690,7 +831,7 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi } tinfo = (struct traffic_info *)(*pme); - if (pstream->opstate == OP_STATE_CLOSE && tinfo->service_id > 0) + if (pstream->opstate == OP_STATE_CLOSE && tinfo->vxlan_vpn_id > 0) { //printf("UDP_ENTRY SUCCESS!!!\n"); @@ -728,7 +869,16 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi free(tinfo); // printf("\n"); } - return APP_STATE_GIVEME; + return DEFAULT_RETURN_VALUE; + +error_drop: + free(tinfo); + return APP_STATE_DROPME | APP_STATE_DROPPKT; +} + +static char return_action_cb_fun(int net_conn_mode, char plug_action) +{ + return 0; //所有包默认都DROP } int CHAR_INIT() @@ -736,8 +886,14 @@ int CHAR_INIT() int demo_plugid = 51; runtime_log_handler = NULL; kafka_log_handler = NULL; - runtime_log_handler = MESA_create_runtime_log_handle(tuple_log_path, RLOG_LV_INFO); - kafka_log_handler = MESA_create_runtime_log_handle(kafka_log_path, RLOG_LV_INFO); + char str_tmp[128]; + + int log_level = 30; + + MESA_load_profile_int_def(entrance_id_path, "LOG", "log_level", &log_level, 30); + + runtime_log_handler = MESA_create_runtime_log_handle(tuple_log_path, log_level); + kafka_log_handler = MESA_create_runtime_log_handle(kafka_log_path, log_level); if (runtime_log_handler == NULL || kafka_log_handler == NULL) { /* code */ @@ -747,9 +903,17 @@ int CHAR_INIT() /* ENTRANCE_ID */ MESA_load_profile_uint_def(entrance_id_path, "SETTING", "ENTRANCE_ID", &entrance_id, 0); /* FLOW_TYPE */ - MESA_load_profile_uint_def(entrance_id_path, "SETTING", "FLOW_TYPE", &flow_type, 0); + MESA_load_profile_uint_def(entrance_id_path, "SETTING", "FLOW_TYPE", &flow_type, (int)FLOW_TYPE_REFLUX); + /* Brokers */ + MESA_load_profile_string_def(entrance_id_path, "SETTING", "BROKERS", brokers, sizeof(brokers),"#"); /* vx_ip_header_dst_ip */ - MESA_load_profile_string_def(gdev_conf_path, "Module", "sendto_gdev_ip", vx_ip_header_dst_ip, INET_ADDRSTRLEN, "0.0.0.0"); + MESA_load_profile_string_def(gdev_conf_path, "Module", "sendto_gdev_ip", str_tmp, sizeof(str_tmp), "#"); + if('#' == str_tmp[0]){ + MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name,"can't get %s->sendto_gdev_ip!!", gdev_conf_path); + return -1; + } + inet_pton(AF_INET, str_tmp, &sapp_keepalive_reflux_ip_net); + /* kafka初始化 */ if (init_kafka(0, brokers, topic) != PRODUCER_INIT_SUCCESS) { @@ -757,6 +921,9 @@ int CHAR_INIT() return -1; } + platform_register_action_judge(return_action_cb_fun); + g_business_plug_type = 0; /* 非常规办法, 这是串联插件总控的内部变量, 0:JC; 1:GK, 0默认丢弃所有包 */ + // 函数实现自定义 // 只要求函数返回值为插件ID; //printf("INIT SUCCESS!!!\n"); @@ -777,3 +944,4 @@ void LRJ_APP_DESTROY() MESA_destroy_runtime_log_handle(kafka_log_handler); printf("TEST_APP_DESTORY out...\n"); } + |
