diff options
| author | 李仁杰 <[email protected]> | 2019-05-09 13:00:28 +0800 |
|---|---|---|
| committer | 李仁杰 <[email protected]> | 2019-05-09 13:00:28 +0800 |
| commit | 89d2a94f8560277fc2a8b32d6430b9731b2043d6 (patch) | |
| tree | d07feea5234b7bfac0bbb0b732245ff689b0d2c3 | |
| parent | 4e1cef039cd01513b4c552bb4a7f9f8f3bba1144 (diff) | |
源码文件
| -rw-r--r-- | lirenjie_vxlan_sapp.c | 779 |
1 files changed, 779 insertions, 0 deletions
diff --git a/lirenjie_vxlan_sapp.c b/lirenjie_vxlan_sapp.c new file mode 100644 index 0000000..0a93fbc --- /dev/null +++ b/lirenjie_vxlan_sapp.c @@ -0,0 +1,779 @@ +#include <stdlib.h> +#include <stdio.h> +#include <netinet/ip.h> +#include <netinet/tcp.h> +#include <netinet/in.h> +#include <netinet/udp.h> +#include <assert.h> +#include <sys/time.h> +#include <time.h> +#include <arpa/inet.h> +#include <syslog.h> +#include <signal.h> +#include <netinet/ip.h> +#include <netinet/ip6.h> +#include "MESA_handle_logger.h" +#include "stream.h" +#include "gdev_keepalive.h" +#include "rdkafka.h" + +#define MAX_LOG_INFO_LEN 256 +#define MAX_TRAFFIC_INFO_LEN 1024 +const char *module_name = "lirenjie_vxlan"; +const char *tuple_log_path = "./log/lirenjie_vxlan/ip_tuple.log"; +const char *kafka_log_path = "./log/lirenjie_vxlan/kafka.log"; +const char *gdev_conf_path = "./conf/gdev.conf"; +const char *entrance_id_path = "./conf/lrj_vxlan_sapp.conf"; +void *runtime_log_handler; +void *kafka_log_handler; +char vx_ip_header_dst_ip[INET_ADDRSTRLEN]; // conf/gdev.conf sendto_gdev_ip +unsigned int entrance_id; /* 局点ID 读配置文件 ./conf/lrj_vxlan_sapp.conf 默认为0*/ +unsigned char flow_type; /* 回流0/回注1 读配置文件 ./conf/lrj_vxlan_sapp.conf 默认为0*/ + +/* kafka */ +const int PRODUCER_INIT_FAILED = -1; +const int PRODUCER_INIT_SUCCESS = 0; +const int PUSH_DATA_FAILED = -1; +const int PUSH_DATA_SUCCESS = 0; +int partition; +//rd +rd_kafka_t *kafka_producer; +rd_kafka_conf_t *conf; +// topic +rd_kafka_topic_t *rkt; +rd_kafka_topic_conf_t *topic_conf; +// char errstr[512]={0}; +char *brokers = "10.172.208.1:9092,10.172.208.2:9092,10.172.208.2:9092,10.172.208.4:9092,10.172.208.5:9092"; +char *topic = "G_BACK_TRAFFIC_STATISTIC"; + +struct traffic_info +{ + unsigned char protocol; //IPv4_TCP 1 IPv4_UDP 2 IPv6_TCP 3 IPv6_UDP 4 其他 0 + unsigned short PROTO_TYPE; //应用层协议类型,用目的端口来表示 + UINT32 C2S_pkt_num; /* C2S, you should better use stream_project.h : struct udp_flow_stat */ + UINT32 S2C_pkt_num; /* S2C, you should better use stream_project.h : struct udp_flow_stat */ + UINT32 C2S_bytes; /* C2S, you should better use stream_project.h : struct udp_flow_stat */ + UINT32 S2C_bytes; /* S2C, you should better use stream_project.h : struct udp_flow_stat */ + //struct tm *systime; // date YYYY-MM-DD %04d-%02d-%02d systime->tm_year + 1900, systime->tm_mon + 1, systime->tm_mday + // time_t stat_time; // 秒级 + struct timeval stat_time; //微妙级 + char vx_ip_header_src_ip[INET_ADDRSTRLEN]; + unsigned short vx_UDP_header_src_port; + unsigned short vx_UDP_header_dst_port; + struct layer_addr addr; + unsigned char service_id; /* Vlan ID或特定的标签值对应的VPN号 */ + unsigned short vx_type; //IPv4=0x0800 IPv6=0x86DD Arp=0x0806 + /* ipv4 src_ip dst_ip identification fragment_offset*/ + char ipv4_sip[INET_ADDRSTRLEN]; + char ipv4_dip[INET_ADDRSTRLEN]; + unsigned short ipv4_id; + unsigned short ipv4_off; + /* ipv6 src_ip dst_ip bus_type flow_flag load_length next_msg_head limit*/ + char ipv6_sip[INET6_ADDRSTRLEN]; + char ipv6_dip[INET6_ADDRSTRLEN]; + unsigned char ipv6_bus_type; + unsigned int ipv6_flow_flag; + unsigned short ipv6_load_length; + unsigned char ipv6_next_msg_head; + unsigned char ipv6_limit; +}; + +static void logger(const rd_kafka_t *rk, int level, const char *fac, const char *buf) +{ + struct timeval tv; + gettimeofday(&tv, NULL); + /*fprintf(stderr, "%u.%03u RDKAFKA-%i-%s: %s: %s\n", + (int)tv.tv_sec, (int)(tv.tv_usec / 1000), + level, fac, rk ? rd_kafka_name(rk) : NULL, buf);*/ + MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name, "%u.%03u RDKAFKA-%i-%s: %s: %s", + (int)tv.tv_sec, (int)(tv.tv_usec / 1000), + level, fac, rk ? rd_kafka_name(rk) : NULL, buf); +} + +int init_kafka(int partition_, char *brokers_, char *topic_) +{ + char tmp[16]; + char errstr[512]; + partition = partition_; + /* Kafka configuration */ + conf = rd_kafka_conf_new(); + //set logger :register log function + rd_kafka_conf_set_log_cb(conf, logger); + /* Quick termination */ + snprintf(tmp, sizeof(tmp), "%i", SIGIO); + rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0); + /*topic configuration*/ + topic_conf = rd_kafka_topic_conf_new(); + + if (conf == NULL) + { + //fprintf(stderr, "***** Failed to create new conf *******\n"); + MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"***** Failed to create new conf *******"); + return PRODUCER_INIT_FAILED; + } + kafka_producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, (size_t)sizeof(errstr)); + if (kafka_producer == NULL) + { + /*fprintf(stderr, "***** kafka_producer is null *******\n"); + fprintf(stderr, "*****Failed to create new producer: %s*******\n", errstr);*/ + MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"***** kafka_producer is null, *******"); + MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"*****Failed to create new producer: %s*******\n", errstr); + return PRODUCER_INIT_FAILED; + } + + rd_kafka_set_log_level(kafka_producer, LOG_DEBUG); + + /* Add brokers */ + if (rd_kafka_brokers_add(kafka_producer, brokers_) == 0) + { + //fprintf(stderr, "****** No valid brokers specified********\n"); + MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"****** No valid brokers specified********"); + return PRODUCER_INIT_FAILED; + } + /* Create topic */ + rkt = rd_kafka_topic_new(kafka_producer, topic_, topic_conf); + + return PRODUCER_INIT_SUCCESS; +} + +void kafka_destroy() +{ + rd_kafka_topic_destroy(rkt); + rd_kafka_destroy(kafka_producer); +} + +int push_data_to_kafka(char *buffer, int buf_len) +{ + int ret; + if (buffer == NULL) + { + return 0; + } + ret = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL); + if (ret == -1) + { + /*fprintf(stderr, + "%% Failed to produce to topic %s " + "partition %i: %s\n", + rd_kafka_topic_name(rkt), partition, + rd_kafka_err2str(rd_kafka_last_error()));*/ + MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"%% Failed to produce to topic %s partition %i: %s", + rd_kafka_topic_name(rkt), partition, + rd_kafka_err2str(rd_kafka_last_error())); + /* Poll to handle delivery reports */ + rd_kafka_poll(kafka_producer, 0); + } + /*fprintf(stderr, "%% Sent %zd bytes to topic " + "%s partition %i\n", + buf_len, rd_kafka_topic_name(rkt), partition);*/ + MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"%% Sent %zd bytes to topic %s partition %i", + buf_len, rd_kafka_topic_name(rkt), partition); + rd_kafka_poll(kafka_producer, 0); + return PUSH_DATA_SUCCESS; +} + +unsigned char get_service_id(struct streaminfo *pstream) +{ + int ret; + int gdev_ip; + int vxlan_id; /* 由vxlan_id获取当前包所属业务号 vxlan_id_map_to_service_id*/ + unsigned short vxlan_sport; /* 由源端口获取当前包所属业务号 vxlan_sport_map_to_service_id*/ + unsigned char service_id; + + /* 获取vxlan_info结构体 */ + struct vxlan_info *vxlan; + int opt_val_len = sizeof(struct vxlan_info); + ret = MESA_get_stream_opt(pstream, MSO_STREAM_VXLAN_INFO, vxlan, &opt_val_len); + if (ret > 0) + { + printf("[debug], encap_type:%s, entrance_id:%s, dev_id:%s, link_id:%s, link_dir:%s,inner_smac:%s, inner_dmac:%s,inner_smac_hex:%s, inner_dmac_hex:%s ", + vxlan->encap_type, entrance_id, vxlan->dev_id, vxlan->link_id, vxlan->link_dir, + vxlan->inner_smac, vxlan->inner_dmac, vxlan->inner_smac_hex, vxlan->inner_dmac_hex); + } + else + { + printf("[error], MESA_get_stream_opt get VXLAN_INFO error\n"); + } + + /* get ip */ + ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_GDEV_IP, &gdev_ip); + if (ret >= 0) + { + char tmp_ip_str[32]; + inet_ntop(AF_INET, &gdev_ip, tmp_ip_str, 32); + printf("[debug], get_rawpkt_options get gdev-ip:%s\n", tmp_ip_str); + } + else + { + printf("[error], get_rawpkt_options get gdev-ip error\n"); + } + /* get vxlan_id */ + ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_VXLAN_ID, &vxlan_id); + if (ret >= 0) + { + printf("[debug], test_get_rawpkt_options get vlan-id:%d\n", vxlan_id); + + service_id = vxlan_id_map_to_service_id(ntohl(vxlan_id)); + printf("service id from vxlan_id: %u\n", service_id); + } + else + { + printf("[error], test_get_rawpkt_options get vlan-id error\n"); + } + /* get vxlan_port */ + ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_VXLAN_SPORT, &vxlan_sport); + if (ret >= 0) + { + printf("[debug], test_get_rawpkt_options get sport:%u\n", ntohs(vxlan_sport)); + + service_id = vxlan_sport_map_to_service_id(ntohs(vxlan_sport)); + printf("service id from sport: %u\n", service_id); + } + else + { + printf("[error], test_get_rawpkt_options get sport error\n"); + } + + return service_id; +} + +unsigned short get_proto_type(struct streaminfo *pstream) +{ + if (pstream->addr.addrtype == ADDR_TYPE_IPV4) + { + struct stream_tuple4_v4 *tuple4_v4 = (struct stream_tuple4_v4 *)(pstream->addr.paddr); + return ntohs(tuple4_v4->dest); + } + else if (pstream->addr.addrtype == ADDR_TYPE_IPV6) + { + /* ipv6 */ + struct stream_tuple4_v6 *tuple4_v6 = (struct stream_tuple4_v6 *)(pstream->addr.paddr); + return ntohs(tuple4_v6->dest); + } + else + { + return 0; + } +} + +unsigned char get_service_id_from_vxlanid(struct streaminfo *pstream) +{ + int vxlan_id; /* 由vxlan_id获取当前包所属业务号 vxlan_id_map_to_service_id*/ + int ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_VXLAN_ID, &vxlan_id); + unsigned char service_id; + if (ret >= 0) + { + service_id = vxlan_id_map_to_service_id(ntohl(vxlan_id)); + return service_id; + } + else + { + return 0; + } +} + +unsigned char get_service_id_from_sport(struct streaminfo *pstream) +{ + int ret; + unsigned short vxlan_sport; /* 由源端口获取当前包所属业务号 vxlan_sport_map_to_service_id*/ + unsigned char service_id; + ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_VXLAN_SPORT, &vxlan_sport); + if (ret >= 0) + { + // printf("[debug], test_get_rawpkt_options get sport:%u\n", ntohs(vxlan_sport)); + service_id = vxlan_sport_map_to_service_id(ntohs(vxlan_sport)); + // printf("service id from sport: %u\n", service_id); + return service_id; + } + else + { + // printf("[error], test_get_rawpkt_options get sport error\n"); + return 0; + } +} + +void get_vx_ip_header_src_ip(struct streaminfo *pstream, struct traffic_info *tinfo) +{ + int gdev_ip; + int ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_GDEV_IP, &gdev_ip); + if (ret >= 0) + { + inet_ntop(AF_INET, &gdev_ip, tinfo->vx_ip_header_src_ip, INET_ADDRSTRLEN); + } + else + { + memset(tinfo->vx_ip_header_src_ip, 0, INET_ADDRSTRLEN); + } +} + +unsigned char get_vx_UDP_header_src_port(struct streaminfo *pstream) +{ + unsigned short vxlan_sport; + int ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_VXLAN_SPORT, &vxlan_sport); + if (ret >= 0) + { + return ntohs(vxlan_sport); + } + else + { + return 0; + } +} + +/* 获取pstream中的四元组 */ +void get_tuple4(struct streaminfo *pstream, unsigned char service_id) +{ + // printf("%s\n", addr_type_to_string((pstream->addr).addrtype)); + if (pstream->addr.addrtype == ADDR_TYPE_IPV4) + { + /* ipv4 */ + char sip[INET_ADDRSTRLEN]; + char dip[INET_ADDRSTRLEN]; + struct stream_tuple4_v4 *tuple4_v4 = (struct stream_tuple4_v4 *)(pstream->addr.paddr); + inet_ntop(AF_INET, &(tuple4_v4->saddr), sip, INET_ADDRSTRLEN); + inet_ntop(AF_INET, &(tuple4_v4->daddr), dip, INET_ADDRSTRLEN); + // printf("--->%s:%d -> %s:%d\n", sip, ntohs(tuple4_v4->source), dip, ntohs(tuple4_v4->dest)); + + char info[MAX_LOG_INFO_LEN] = {0}; + if (service_id > 0) + { + snprintf(info, MAX_LOG_INFO_LEN, "%s:%d -> %s:%d service_id:%u\n", sip, ntohs(tuple4_v4->source), + dip, ntohs(tuple4_v4->dest), service_id); + printf(info); + } + else + { + snprintf(info, MAX_LOG_INFO_LEN, "%s:%d -> %s:%d\n", sip, ntohs(tuple4_v4->source), + dip, ntohs(tuple4_v4->dest)); + } + MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); + // push_data_to_kafka(info,sizeof(info)); + } + else if (pstream->addr.addrtype == ADDR_TYPE_IPV6) + { + /* ipv6 */ + char sip[INET6_ADDRSTRLEN]; + char dip[INET6_ADDRSTRLEN]; + struct stream_tuple4_v6 *tuple4_v6 = (struct stream_tuple4_v6 *)(pstream->addr.paddr); + inet_ntop(AF_INET6, &(tuple4_v6->saddr), sip, INET6_ADDRSTRLEN); + inet_ntop(AF_INET6, &(tuple4_v6->daddr), dip, INET6_ADDRSTRLEN); + printf("--->%s:%d -> %s:%d\n", sip, ntohs(tuple4_v6->source), dip, ntohs(tuple4_v6->dest)); + + /* 获取业务号 */ + // unsigned char service_id = get_service_id(pstream); + + char info[MAX_LOG_INFO_LEN] = {0}; + if (service_id > 0) + { + snprintf(info, MAX_LOG_INFO_LEN, "%s:%d -> %s:%d service_id:%u\n", sip, ntohs(tuple4_v6->source), + dip, ntohs(tuple4_v6->dest), service_id); + } + else + { + snprintf(info, MAX_LOG_INFO_LEN, "%s:%d -> %s:%d\n", sip, ntohs(tuple4_v6->source), + dip, ntohs(tuple4_v6->dest)); + } + MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); + // push_data_to_kafka(info,sizeof(info)); + } + else if (pstream->addr.addrtype == ADDR_TYPE_ARP) + { + } +} + +void get_ip_detail(struct streaminfo *pstream, struct traffic_info *tinfo, const void *rawpkt) +{ + // printf("%s\n", addr_type_to_string((pstream->addr).addrtype)); + if (pstream->addr.addrtype == ADDR_TYPE_IPV4) + { + /* ipv4 src_ip dst_ip identification fragment_offset*/ + struct stream_tuple4_v4 *tuple4_v4 = (struct stream_tuple4_v4 *)(pstream->addr.paddr); + inet_ntop(AF_INET, &(tuple4_v4->saddr), tinfo->ipv4_sip, INET_ADDRSTRLEN); + inet_ntop(AF_INET, &(tuple4_v4->daddr), tinfo->ipv4_dip, INET_ADDRSTRLEN); + struct ip *ip_hdr = (struct ip *)rawpkt; + tinfo->ipv4_id = ntohs(ip_hdr->ip_id); + tinfo->ipv4_off = ntohs(ip_hdr->ip_off); + + char info[MAX_LOG_INFO_LEN] = {0}; + if (tinfo->service_id > 0) + { + snprintf(info, MAX_LOG_INFO_LEN, "%s:%d -> %s:%d service_id:%u\n", tinfo->ipv4_sip, ntohs(tuple4_v4->source), + tinfo->ipv4_dip, ntohs(tuple4_v4->dest), tinfo->service_id); + // printf(info); + // push_data_to_kafka(info,sizeof(info)); + } + else + { + snprintf(info, MAX_LOG_INFO_LEN, "%s:%d -> %s:%d\n", tinfo->ipv4_sip, ntohs(tuple4_v4->source), + tinfo->ipv4_dip, ntohs(tuple4_v4->dest)); + } + // MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); + } + else if (pstream->addr.addrtype == ADDR_TYPE_IPV6) + { + /* ipv6 src_ip dst_ip bus_type flow_flag load_length next_msg_head limit*/ + struct stream_tuple4_v6 *tuple4_v6 = (struct stream_tuple4_v6 *)(pstream->addr.paddr); + inet_ntop(AF_INET6, &(tuple4_v6->saddr), tinfo->ipv6_sip, INET6_ADDRSTRLEN); + inet_ntop(AF_INET6, &(tuple4_v6->daddr), tinfo->ipv6_dip, INET6_ADDRSTRLEN); + struct ip6_hdr *ip6_head = (struct ip6_hdr *)rawpkt; + tinfo->ipv6_bus_type = ntohl(ip6_head->ip6_flow) & 0x0FF00000; + tinfo->ipv6_flow_flag = ntohl(ip6_head->ip6_flow) & 0x000FFFFF; + tinfo->ipv6_load_length = ntohs(ip6_head->ip6_plen); + tinfo->ipv6_next_msg_head = ip6_head->ip6_nxt; + tinfo->ipv6_limit = ip6_head->ip6_hlim; + + char info[MAX_LOG_INFO_LEN] = {0}; + if (tinfo->service_id > 0) + { + snprintf(info, MAX_LOG_INFO_LEN, "%s:%d -> %s:%d service_id:%u\n", tinfo->ipv6_sip, ntohs(tuple4_v6->source), + tinfo->ipv6_dip, ntohs(tuple4_v6->dest), tinfo->service_id); + } + else + { + snprintf(info, MAX_LOG_INFO_LEN, "%s:%d -> %s:%d\n", tinfo->ipv6_sip, ntohs(tuple4_v6->source), + tinfo->ipv6_dip, ntohs(tuple4_v6->dest)); + } + // MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); + // push_data_to_kafka(info,sizeof(info)); + } + else if (pstream->addr.addrtype == ADDR_TYPE_ARP) + { + return; + } +} + +void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *pstream) +{ + char protocol[8]; + switch (tinfo->protocol) + { + case 1: + sprintf(protocol, "%s", "IPv4_TCP"); + break; + case 2: + sprintf(protocol, "%s", "IPv4_UDP"); + break; + case 3: + sprintf(protocol, "%s", "IPv6_TCP"); + break; + case 4: + sprintf(protocol, "%s", "IPv6_UDP"); + break; + default: + sprintf(protocol, "%s", "others"); + break; + } + /* + printf("\"proto_type\":%d,protocol:%s, entrance_id:%d, c2s_pkt_num:%d, s2c_pkt_num:%d, c2s_byte_len:%d, s2c_byte_len:%d, ", + tinfo->PROTO_TYPE,protocol, entrance_id, tinfo->C2S_pkt_num,tinfo->S2C_pkt_num,tinfo->C2S_bytes,tinfo->S2C_bytes); + printf("stat_time:%ld, vx_type:0x%04X, vx_ip_header_src_ip:%s, vx_ip_header_dst_ip:%s, ", + tinfo->stat_time, tinfo->vx_type, tinfo->vx_ip_header_src_ip,vx_ip_header_dst_ip); + printf("vx_UDP_header_src_port:%d, vx_UDP_header_dst_port=%d, vx_vlan_id:%d, ", + tinfo->vx_UDP_header_src_port,tinfo->vx_UDP_header_dst_port,tinfo->service_id); + printf("ipv4_src_ip:%s, ipv4_dst_ip:%s, ipv4_identification:%d, ipv4_fragment_offset:%d, ", + tinfo->ipv4_sip,tinfo->ipv4_dip,tinfo->ipv4_id,tinfo->ipv4_off); + printf("ipv6_src_ip:%s, ipv6_dst_ip:%s, ipv6_bus_type:%s, ipv6_flow_flag:%d, ipv6_load_length:%d, ipv6_next_msg_head:%d, ipv6_limit:%d ", + tinfo->ipv6_sip,tinfo->ipv6_dip,tinfo->ipv6_bus_type,tinfo->ipv6_flow_flag,tinfo->ipv6_load_length,tinfo->ipv6_next_msg_head,tinfo->ipv6_limit); + */ + char info[MAX_TRAFFIC_INFO_LEN] = {0}; + + switch (tinfo->addr.addrtype) + { + case ADDR_TYPE_IPV4: + snprintf(info, MAX_TRAFFIC_INFO_LEN, + "{\"proto_type\":%d,\"protocol\":\"%s\",\"entrance_id\":%d,\"c2s_pkt_num\":%d,\"s2c_pkt_num\":%d,\"c2s_byte_len\":%d,\"s2c_byte_len\":%d," + "\"stat_time\":%ld,\"vx_type\":\"0x%04X\",\"vx_ip_header_src_ip\":\"%s\",\"vx_ip_header_dst_ip\":\"%s\"," + "\"vx_udp_header_src_port\":%d,\"vx_udp_header_dst_port\":%d,\"vx_vlan_id\":%d," + "\"ipv4_src_ip\":\"%s\",\"ipv4_dst_ip\":\"%s\",\"ipv4_identification\":%d,\"ipv4_fragment_offset\":%d," + "\"ipv6_src_ip\":\"\",\"ipv6_dst_ip\":\"\",\"ipv6_bus_type\":\"\",\"ipv6_flow_flag\":\"\",\"ipv6_load_length\":0," + "\"ipv6_next_msg_head\":0,\"ipv6_limit\":0,\"flow_type\":%d}", + tinfo->PROTO_TYPE,protocol, entrance_id, tinfo->C2S_pkt_num, tinfo->S2C_pkt_num, tinfo->C2S_bytes, tinfo->S2C_bytes, + tinfo->stat_time.tv_sec*1000 + tinfo->stat_time.tv_usec/1000, tinfo->vx_type, tinfo->vx_ip_header_src_ip, vx_ip_header_dst_ip, + tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->service_id, + tinfo->ipv4_sip, tinfo->ipv4_dip, tinfo->ipv4_id, tinfo->ipv4_off,flow_type); + MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); + push_data_to_kafka(info,strlen(info)); + break; + case ADDR_TYPE_IPV6: + snprintf(info, MAX_TRAFFIC_INFO_LEN, + "{\"proto_type\":%d,\"protocol\":\"%s\",\"entrance_id\":%d,\"c2s_pkt_num\":%d,\"s2c_pkt_num\":%d,\"c2s_byte_len\":%d,\"s2c_byte_len\":%d," + "\"stat_time\":%ld,\"vx_type\":\"0x%04X\",\"vx_ip_header_src_ip\":\"%s\",\"vx_ip_header_dst_ip\":\"%s\"," + "\"vx_udp_header_src_port\":%d,\"vx_udp_header_dst_port\":%d,\"vx_vlan_id\":%d," + "\"ipv4_src_ip\":\"\",\"ipv4_dst_ip\":\"\",\"ipv4_identification\":0,\"ipv4_fragment_offset\":0," + "\"ipv6_src_ip\":\"%s\",\"ipv6_dst_ip\":\"%s\",\"ipv6_bus_type\":\"0x%02X\",\"ipv6_flow_flag\":\"0x%05X\",\"ipv6_load_length\":%d," + "\"ipv6_next_msg_head\":%d,\"ipv6_limit\":%d,\"flow_type\":%d}", + tinfo->PROTO_TYPE,protocol, entrance_id, tinfo->C2S_pkt_num, tinfo->S2C_pkt_num, tinfo->C2S_bytes, tinfo->S2C_bytes, + tinfo->stat_time.tv_sec*1000 + tinfo->stat_time.tv_usec/1000, tinfo->vx_type, tinfo->vx_ip_header_src_ip, vx_ip_header_dst_ip, + tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->service_id, + tinfo->ipv6_sip, tinfo->ipv6_dip, tinfo->ipv6_bus_type, tinfo->ipv6_flow_flag, tinfo->ipv6_load_length, + tinfo->ipv6_next_msg_head, tinfo->ipv6_limit, flow_type); + MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); + push_data_to_kafka(info,strlen(info)); + break; + case ADDR_TYPE_ARP: + snprintf(info, MAX_TRAFFIC_INFO_LEN, + "{\"proto_type\":%d,\"protocol\":\"%s\",\"entrance_id\":%d,\"c2s_pkt_num\":%d,\"s2c_pkt_num\":%d,\"c2s_byte_len\":%d,\"s2c_byte_len\":%d," + "\"stat_time\":%ld,\"vx_type\":\"0x%04X\",\"vx_ip_header_src_ip\":\"%s\",\"vx_ip_header_dst_ip\":\"%s\"," + "\"vx_udp_header_src_port\":%d,\"vx_udp_header_dst_port\":%d,\"vx_vlan_id\":%d," + "\"ipv4_src_ip\":\"\",\"ipv4_dst_ip\":\"\",\"ipv4_identification\":0,\"ipv4_fragment_offset\":0," + "\"ipv6_src_ip\":\"\",\"ipv6_dst_ip\":\"\",\"ipv6_bus_type\":\"\",\"ipv6_flow_flag\":\"\",\"ipv6_load_length\":0," + "\"ipv6_next_msg_head\":0,\"ipv6_limit\":0,\"flow_type\":%d}", + tinfo->PROTO_TYPE,protocol, entrance_id, tinfo->C2S_pkt_num, tinfo->S2C_pkt_num, tinfo->C2S_bytes, tinfo->S2C_bytes, + tinfo->stat_time.tv_sec*1000 + tinfo->stat_time.tv_usec/1000, tinfo->vx_type, tinfo->vx_ip_header_src_ip, vx_ip_header_dst_ip, + tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->service_id,flow_type); + MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); + push_data_to_kafka(info,strlen(info)); + break; + default: + break; + } +} + +char TCP_ENTRY_ALL(struct streaminfo *pstream, void **pme, int thread_seq, const void *raw_pkt) +{ + printf("TCP_ENTRY_ALL SUCCESS!!!\n"); + return APP_STATE_GIVEME; +} + +static int tcp_flow_id = -1; +char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const void *raw_pkt) +{ + struct tcpdetail *raw_pdetail = (struct tcpdetail *)pstream->pdetail; + struct traffic_info *tinfo; + + if (-1 == tcp_flow_id) + { + tcp_flow_id = project_customer_register("tcp_flow_stat", "struct"); + if (-1 == tcp_flow_id) + { + printf("'tcp_flow_stat' is disable, no statistics\n"); + } + } + + if (pstream->opstate == OP_STATE_PENDING) + { + tinfo = (struct traffic_info *)calloc(1, sizeof(struct traffic_info)); + //tinfo->service_id = get_service_id_from_sport(pstream); //获取vx_lan_id字段,具体方法待定 + tinfo->service_id = get_service_id_from_vxlanid(pstream); + /* PROTOCOL */ + switch (pstream->addr.addrtype) + { + case ADDR_TYPE_IPV4: + tinfo->protocol = 1; // IPV4_TCP + tinfo->vx_type = 0x0800; + break; + case ADDR_TYPE_IPV6: + tinfo->protocol = 3; // IPV6_TCP + tinfo->vx_type = 0x86DD; + break; + case ADDR_TYPE_ARP: + tinfo->protocol = 0; + tinfo->vx_type = 0x0806; + break; + default: + tinfo->protocol = 0; + tinfo->vx_type = 0; + break; + } + /* vx_UDP_header_src_port dst_port*/ + tinfo->vx_UDP_header_src_port = get_vx_UDP_header_src_port(pstream); + tinfo->vx_UDP_header_dst_port = 4789; + /* vx_ip_header_src_ip dst_ip*/ + get_vx_ip_header_src_ip(pstream, tinfo); + /* IPv4、IPv6头部信息 */ + get_ip_detail(pstream, tinfo, raw_pkt); + /* 应用层协议类型 用目的端口表示 */ + tinfo->PROTO_TYPE = get_proto_type(pstream); + + *pme = tinfo; + } + tinfo = (struct traffic_info *)(*pme); + /* 自己统计包数字节数 *//* + if(raw_pdetail->datalen > 0) + { + if(DIR_C2S == pstream->curdir) + { + tinfo->C2S_pkt_num++; + tinfo->C2S_bytes += raw_pdetail->datalen; + } + else + { + tinfo->S2C_pkt_num++; + tinfo->S2C_bytes += raw_pdetail->datalen; + } + } + */ + if (pstream->opstate == OP_STATE_CLOSE && tinfo->service_id > 0) + { + //printf("TCP_ENTRY SUCCESS!!!\n"); + /* 获取包数字节数 */ + tinfo->C2S_pkt_num = raw_pdetail->serverpktnum; + tinfo->S2C_pkt_num = raw_pdetail->clientpktnum; + tinfo->C2S_bytes = raw_pdetail->serverbytes; + tinfo->S2C_bytes = raw_pdetail->clientbytes; + + /* 另一种获取包数字节数的方法 */ + /* + if(tcp_flow_id != -1) + { + struct tcp_flow_stat *tflow = (struct tcp_flow_stat *)project_req_get_struct(pstream, tcp_flow_id); + tinfo->C2S_pkt_num = tflow->C2S_all_pkt; + tinfo->S2C_pkt_num = tflow->S2C_all_pkt; + tinfo->C2S_bytes = tflow->C2S_all_byte; + tinfo->S2C_bytes = tflow->S2C_all_byte; + } + */ + /* layer_addr */ + tinfo->addr = pstream->addr; + /* STAT_TIME */ + // tinfo->stat_time = time(0); + gettimeofday(&tinfo->stat_time,NULL); + + print_traffic_info(tinfo, pstream); + free(tinfo); + // printf("\n"); + } + return APP_STATE_GIVEME; +} + +static int udp_flow_id = -1; +char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const void *raw_pkt) +{ + struct udpdetail *pdetail = (struct udpdetail *)pstream->pdetail; + struct traffic_info *tinfo; + + if (-1 == udp_flow_id) + { + udp_flow_id = project_customer_register(PROJECT_REQ_UDP_FLOW, "struct"); + if (-1 == udp_flow_id) + { + printf("'udp_flow_stat' is disable, no statistics\n"); + } + } + + if (pstream->opstate == OP_STATE_PENDING) + { + tinfo = (struct traffic_info *)calloc(1, sizeof(struct traffic_info)); + //tinfo->service_id = get_service_id_from_sport(pstream); //获取vx_lan_id字段,具体方法待定 + tinfo->service_id = get_service_id_from_vxlanid(pstream); + /* PROTOCOL */ + switch (pstream->addr.addrtype) + { + case ADDR_TYPE_IPV4: + tinfo->protocol = 2; // IPV4_UDP + tinfo->vx_type = 0x0800; + break; + case ADDR_TYPE_IPV6: + tinfo->protocol = 4; // IPV6_UDP + tinfo->vx_type = 0x86DD; + break; + case ADDR_TYPE_ARP: + tinfo->protocol = 0; + tinfo->vx_type = 0x0806; + break; + default: + tinfo->protocol = 0; + tinfo->vx_type = 0; + break; + } + /* vx_UDP_header_src_port dst_port*/ + tinfo->vx_UDP_header_src_port = get_vx_UDP_header_src_port(pstream); + tinfo->vx_UDP_header_dst_port = 4789; + /* vx_ip_header_src_ip dst_ip*/ + get_vx_ip_header_src_ip(pstream, tinfo); + /* IPv4、IPv6头部信息 */ + get_ip_detail(pstream, tinfo, raw_pkt); + /* 应用层协议类型 用目的端口表示 */ + tinfo->PROTO_TYPE = get_proto_type(pstream); + + *pme = tinfo; + } + tinfo = (struct traffic_info *)(*pme); + + if (pstream->opstate == OP_STATE_CLOSE && tinfo->service_id > 0) + { + //printf("UDP_ENTRY SUCCESS!!!\n"); + + if (pdetail != NULL) + { + /* */ + tinfo->C2S_pkt_num = pdetail->serverpktnum; + tinfo->S2C_pkt_num = pdetail->clientpktnum; + tinfo->C2S_bytes = pdetail->serverbytes; + tinfo->S2C_bytes = pdetail->clientbytes; + /* + if(udp_flow_id != -1) + { + struct udp_flow_stat *tflow = (struct udp_flow_stat *)project_req_get_struct(pstream, udp_flow_id); + tinfo->C2S_pkt_num = tflow->C2S_pkt; + tinfo->S2C_pkt_num = tflow->S2C_pkt; + tinfo->C2S_bytes = tflow->C2S_byte; + tinfo->S2C_bytes = tflow->S2C_byte; + }*/ + } + else + { + tinfo->C2S_pkt_num = 0; + tinfo->S2C_pkt_num = 0; + tinfo->C2S_bytes = 0; + tinfo->S2C_bytes = 0; + } + /* layer_addr */ + tinfo->addr = pstream->addr; + /* STAT_TIME */ + // tinfo->stat_time = time(0); + gettimeofday(&tinfo->stat_time,NULL); + + print_traffic_info(tinfo, pstream); + free(tinfo); + // printf("\n"); + } + return APP_STATE_GIVEME; +} + +int CHAR_INIT() +{ + int demo_plugid = 51; + runtime_log_handler = NULL; + kafka_log_handler = NULL; + runtime_log_handler = MESA_create_runtime_log_handle(tuple_log_path, RLOG_LV_INFO); + kafka_log_handler = MESA_create_runtime_log_handle(kafka_log_path, RLOG_LV_INFO); + if (runtime_log_handler == NULL || kafka_log_handler == NULL) + { + /* code */ + printf("MESA_create_runtime_log_handle failed!!!"); + return -1; + } + /* ENTRANCE_ID */ + MESA_load_profile_uint_def(entrance_id_path, "SETTING", "ENTRANCE_ID", &entrance_id, 0); + /* FLOW_TYPE */ + MESA_load_profile_uint_def(entrance_id_path, "SETTING", "FLOW_TYPE", &flow_type, 0); + /* vx_ip_header_dst_ip */ + MESA_load_profile_string_def(gdev_conf_path, "Module", "sendto_gdev_ip", vx_ip_header_dst_ip, INET_ADDRSTRLEN, "0.0.0.0"); + /* kafka初始化 */ + if (init_kafka(0, brokers, topic) != PRODUCER_INIT_SUCCESS) + { + MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"kafka init failed!!!"); + return -1; + } + + // 函数实现自定义 + // 只要求函数返回值为插件ID; + //printf("INIT SUCCESS!!!\n"); + return demo_plugid; +} + +void LRJ_APP_DESTROY() +{ + MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "TEST_APP_DESTORY in...\n"); + printf("TEST_APP_DESTORY in...\n"); + kafka_destroy(); + if (runtime_log_handler == NULL) + { + printf("TEST_APP_DESTORY out...\n"); + return; + } + MESA_destroy_runtime_log_handle(runtime_log_handler); + MESA_destroy_runtime_log_handle(kafka_log_handler); + printf("TEST_APP_DESTORY out...\n"); +} |
