diff options
| author | 李仁杰 <[email protected]> | 2020-08-03 14:30:36 +0800 |
|---|---|---|
| committer | 李仁杰 <[email protected]> | 2020-08-03 14:30:36 +0800 |
| commit | 6891f64d9cb4e9e787d189ebd2e9cf7ccf7cc37d (patch) | |
| tree | bcfcab41b07342e6d81ea2f68e67deed77b06cdf | |
| parent | 08accd17495443fbc6b9e7cd14b19cbb4091160f (diff) | |
Add new file at 20200803,程序有内存错误,有待解决
| -rw-r--r-- | lirenjie_vxlan_sapp_20200803.c | 1257 |
1 files changed, 1257 insertions, 0 deletions
diff --git a/lirenjie_vxlan_sapp_20200803.c b/lirenjie_vxlan_sapp_20200803.c new file mode 100644 index 0000000..cae1ff1 --- /dev/null +++ b/lirenjie_vxlan_sapp_20200803.c @@ -0,0 +1,1257 @@ +#include <stdlib.h> +#include <stdio.h> +#include <netinet/ip.h> +#include <netinet/tcp.h> +#include <netinet/in.h> +#include <netinet/udp.h> +#include <assert.h> +#include <sys/time.h> +#include <time.h> +#include <arpa/inet.h> +#include <syslog.h> +#include <signal.h> +#include <netinet/ip.h> +#include <netinet/ip6.h> +#include <netinet/udp.h> +#include <pthread.h> +#include "MESA_handle_logger.h" +#include "MESA_prof_load.h" +#include "stream.h" +#include "rdkafka.h" +#include "libGSJ.h" + +int version_20190827_1605; + +static char DEFAULT_RETURN_VALUE = (APP_STATE_GIVEME | APP_STATE_DROPPKT); + +static int error_coredump = 0; + +extern unsigned char vxlan_sport_map_to_service_id(unsigned short sport_host_order); +extern unsigned char vxlan_id_map_to_service_id(int vxlan_id_host_order); +extern int platform_register_action_judge(char (*action_cb_fun)(int net_conn_mode, char plug_action)); +extern int g_business_plug_type; + +#define MAX_LOG_INFO_LEN 256 +#define MAX_VPN_ID_NUM 512 +#define MAX_TRAFFIC_INFO_LEN 1024 +static const char *module_name = "lirenjie_vxlan"; +static const char *tuple_log_path = "./log/lirenjie_vxlan/ip_tuple.log"; +static const char *kafka_log_path = "./log/lirenjie_vxlan/kafka.log"; +static const char *gdev_conf_path = "./conf/gdev.conf"; +static const char *entrance_id_path = "./conf/lrj_vxlan_sapp.conf"; +static void *runtime_log_handler; +static void *kafka_log_handler; +//char vx_ip_header_dst_ip[INET_ADDRSTRLEN]; // conf/gdev.conf sendto_gdev_ip +static char sendto_gdev_ip[INET_ADDRSTRLEN]; //conf/gdev.conf sendto_gdev_ip +static unsigned int sapp_keepalive_reflux_ip_net; /* 本端业务保活、回流IP */ +static unsigned int vpn_id_drop[MAX_VPN_ID_NUM] = {0}; /* 0 do not drop, 1 drop */ + +#define IDENTIFY_LOCAL_IP_SUBNET_MASK (0xFFFFFF00) /* 一个局点N台前端机, 用IP段识别是否前端机IP, 用于区别流量时回流还是回�? */ + +static unsigned int entrance_id; /* 局点ID 读配置文�?./conf/lrj_vxlan_sapp.conf 默认�?*/ + +UINT64 all_stream_pkt_num[64] = {0}; +UINT64 close_stream_pkt_num[64] = {0}; +UINT64 push_count[64] = {0}; +UINT64 error_count[64] = {0}; + +enum flow_type_t +{ + FLOW_TYPE_REFLUX = 0, /* 回流 */ + FLOW_TYPE_INJECT = 1, /* 回注 */ +}; +static unsigned int flow_type; /* 回流0/回注1 读配置文�?./conf/lrj_vxlan_sapp.conf 默认�? */ + +/* kafka */ +static const int PRODUCER_INIT_FAILED = -1; +static const int PRODUCER_INIT_SUCCESS = 0; +static const int PUSH_DATA_FAILED = -1; +static const int PUSH_DATA_SUCCESS = 0; +static int partition; +//rd +static rd_kafka_t *kafka_producer; +static rd_kafka_conf_t *conf; +// topic +static rd_kafka_topic_t *rkt; +static rd_kafka_topic_conf_t *topic_conf; +// char errstr[512]={0}; +//static char *brokers = "10.172.208.1:9092,10.172.208.2:9092,10.172.208.2:9092,10.172.208.4:9092,10.172.208.5:9092"; +//static char *brokers = "10.208.133.126:9092,10.208.133.133:9092,10.208.133.135:9092,10.208.133.141:9092"; +// char brokers[128]; + +static char *topic = "G_BACK_TRAFFIC_STATISTIC_new"; + +//GSJ thread +pthread_t GSJ_Work; +pthread_t count; + +struct traffic_info +{ + unsigned char protocol; //IPv4_TCP 1 IPv4_UDP 2 IPv6_TCP 3 IPv6_UDP 4 其他 0 + unsigned short PROTO_TYPE; //应用层协议类型,用目的端口来表示 + UINT32 C2S_pkt_num; /* C2S, you should better use stream_project.h : struct udp_flow_stat */ + UINT32 S2C_pkt_num; /* S2C, you should better use stream_project.h : struct udp_flow_stat */ + UINT64 C2S_bytes; /* C2S, you should better use stream_project.h : struct udp_flow_stat */ + UINT64 S2C_bytes; /* S2C, you should better use stream_project.h : struct udp_flow_stat */ + //struct tm *systime; // date YYYY-MM-DD %04d-%02d-%02d systime->tm_year + 1900, systime->tm_mon + 1, systime->tm_mday + // time_t stat_time; // 秒级 + struct timeval stat_time; + unsigned int vx_ip_header_src_ip_net; + unsigned int vx_ip_header_dst_ip_net; + unsigned short vx_UDP_header_src_port; + unsigned short vx_UDP_header_dst_port; + struct layer_addr addr; + //unsigned char service_id; /* Vlan ID或特定的标签值对应的VPN号 */ + int vxlan_vpn_id; + unsigned short vx_type; //IPv4=0x0800 IPv6=0x86DD Arp=0x0806 + /* ipv4 src_ip dst_ip identification fragment_offset*/ + char ipv4_sip[INET_ADDRSTRLEN]; + char ipv4_dip[INET_ADDRSTRLEN]; + unsigned short ipv4_id; + unsigned short ipv4_off; + /* ipv6 src_ip dst_ip bus_type flow_flag load_length next_msg_head limit*/ + char ipv6_sip[INET6_ADDRSTRLEN]; + char ipv6_dip[INET6_ADDRSTRLEN]; + unsigned char ipv6_bus_type; + unsigned int ipv6_flow_flag; + unsigned short ipv6_load_length; + unsigned char ipv6_next_msg_head; + unsigned char ipv6_limit; +}; + +static void logger(const rd_kafka_t *rk, int level, const char *fac, const char *buf) +{ + struct timeval tv; + gettimeofday(&tv, NULL); + /*fprintf(stderr, "%u.%03u RDKAFKA-%i-%s: %s: %s\n", + (int)tv.tv_sec, (int)(tv.tv_usec / 1000), + level, fac, rk ? rd_kafka_name(rk) : NULL, buf);*/ + MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name, "%u.%03u RDKAFKA-%i-%s: %s: %s", + (int)tv.tv_sec, (int)(tv.tv_usec / 1000), + level, fac, rk ? rd_kafka_name(rk) : NULL, buf); +} + +static int init_kafka(int partition_, char *brokers_, char *topic_) +{ + char tmp[16]; + char errstr[1024]; + partition = partition_; + /* Kafka configuration */ + conf = rd_kafka_conf_new(); + //set logger :register log function + rd_kafka_conf_set_log_cb(conf, logger); + /* Quick termination */ + snprintf(tmp, sizeof(tmp), "%i", SIGIO); + rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0); + //rd_kafka_conf_set(conf, "producer.type", "kafka.producer.AyncProducer", errstr, sizeof(errstr)); + rd_kafka_conf_set(conf, "queue.buffering.max.messages", "1000000", errstr, sizeof(errstr)); + rd_kafka_conf_set(conf, "topic.metadata.refresh.interval.ms", "600000", errstr, sizeof(errstr)); + rd_kafka_conf_set(conf, "request.required.acks", "0", errstr, sizeof(errstr)); + /*topic configuration*/ + topic_conf = rd_kafka_topic_conf_new(); + + if (conf == NULL) + { + //fprintf(stderr, "***** Failed to create new conf *******\n"); + MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name, "***** Failed to create new conf *******"); + return PRODUCER_INIT_FAILED; + } + kafka_producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, (size_t)sizeof(errstr)); + if (kafka_producer == NULL) + { + /*fprintf(stderr, "***** kafka_producer is null *******\n"); + fprintf(stderr, "*****Failed to create new producer: %s*******\n", errstr);*/ + MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name, "***** kafka_producer is null, *******"); + MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name, "*****Failed to create new producer: %s*******\n", errstr); + return PRODUCER_INIT_FAILED; + } + + rd_kafka_set_log_level(kafka_producer, LOG_DEBUG); + + /* Add brokers */ + if (rd_kafka_brokers_add(kafka_producer, brokers_) == 0) + { + //fprintf(stderr, "****** No valid brokers specified********\n"); + MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name, "****** No valid brokers specified********"); + return PRODUCER_INIT_FAILED; + } + /* Create topic */ + rkt = rd_kafka_topic_new(kafka_producer, topic_, topic_conf); + + return PRODUCER_INIT_SUCCESS; +} + +static void kafka_destroy() +{ + rd_kafka_topic_destroy(rkt); + rd_kafka_destroy(kafka_producer); +} + +static int push_data_to_kafka(char *buffer, int buf_len) +{ + int ret; + if (buffer == NULL) + { + return 0; + } + //ret = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL); + ret = rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL); + if (ret == -1) + { + /*fprintf(stderr, + "%% Failed to produce to topic %s " + "partition %i: %s\n", + rd_kafka_topic_name(rkt), partition, + rd_kafka_err2str(rd_kafka_last_error()));*/ + MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name, "%% Failed to produce to topic %s partition %i: %s", + rd_kafka_topic_name(rkt), partition, + rd_kafka_err2str(rd_kafka_last_error())); + /* Poll to handle delivery reports */ + //rd_kafka_poll(kafka_producer, 0); + return PUSH_DATA_FAILED; + } + /*fprintf(stderr, "%% Sent %zd bytes to topic " + "%s partition %i\n", + buf_len, rd_kafka_topic_name(rkt), partition);*/ + MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name, "%% Sent %zd bytes to topic %s partition %i", + buf_len, rd_kafka_topic_name(rkt), partition); + // rd_kafka_poll(kafka_producer, 0); + return PUSH_DATA_SUCCESS; +} + +static void push_data_to_GSJ(char *buffer, int buf_len) +{ + GoString value = {(const char *)buffer, buf_len}; + GetInfo(value); + // Work(); +} + +#if 0 +unsigned char get_service_id(struct streaminfo *pstream) +{ + int ret; + int gdev_ip; + int vxlan_id; /* 由vxlan_id获取当前包所属业务号 vxlan_id_map_to_service_id*/ + unsigned short vxlan_sport; /* 由源端口获取当前包所属业务号 vxlan_sport_map_to_service_id*/ + unsigned char service_id; + + /* 获取vxlan_info结构体*/ + struct vxlan_info *vxlan; + int opt_val_len = sizeof(struct vxlan_info); + ret = MESA_get_stream_opt(pstream, MSO_STREAM_VXLAN_INFO, vxlan, &opt_val_len); + if (ret > 0) + { + printf("[debug], encap_type:%s, entrance_id:%s, dev_id:%s, link_id:%s, link_dir:%s,inner_smac:%s, inner_dmac:%s,inner_smac_hex:%s, inner_dmac_hex:%s ", + vxlan->encap_type, entrance_id, vxlan->dev_id, vxlan->link_id, vxlan->link_dir, + vxlan->inner_smac, vxlan->inner_dmac, vxlan->inner_smac_hex, vxlan->inner_dmac_hex); + } + else + { + printf("[error], MESA_get_stream_opt get VXLAN_INFO error\n"); + } + + /* get ip */ + ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_GDEV_IP, &gdev_ip); + if (ret >= 0) + { + char tmp_ip_str[32]; + inet_ntop(AF_INET, &gdev_ip, tmp_ip_str, 32); + printf("[debug], get_rawpkt_options get gdev-ip:%s\n", tmp_ip_str); + } + else + { + printf("[error], get_rawpkt_options get gdev-ip error\n"); + } + /* get vxlan_id */ + ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_VXLAN_ID, &vxlan_id); + if (ret >= 0) + { + printf("[debug], test_get_rawpkt_options get vlan-id:%d\n", vxlan_id); + + service_id = vxlan_id_map_to_service_id(ntohl(vxlan_id)); + printf("service id from vxlan_id: %u\n", service_id); + } + else + { + printf("[error], test_get_rawpkt_options get vlan-id error\n"); + } + /* get vxlan_port */ + ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_VXLAN_SPORT, &vxlan_sport); + if (ret >= 0) + { + printf("[debug], test_get_rawpkt_options get sport:%u\n", ntohs(vxlan_sport)); + + service_id = vxlan_sport_map_to_service_id(ntohs(vxlan_sport)); + printf("service id from sport: %u\n", service_id); + } + else + { + printf("[error], test_get_rawpkt_options get sport error\n"); + } + + return service_id; +} +#endif + +static unsigned short get_proto_type(struct streaminfo *pstream) +{ + if (pstream->addr.addrtype == ADDR_TYPE_IPV4) + { + struct stream_tuple4_v4 *tuple4_v4 = (struct stream_tuple4_v4 *)(pstream->addr.paddr); + return ntohs(tuple4_v4->dest); + } + else if (pstream->addr.addrtype == ADDR_TYPE_IPV6) + { + /* ipv6 */ + struct stream_tuple4_v6 *tuple4_v6 = (struct stream_tuple4_v6 *)(pstream->addr.paddr); + return ntohs(tuple4_v6->dest); + } + else + { + return 0; + } +} + +static int get_vpnid_from_stream(struct streaminfo *pstream) +{ + int vpn_id_net_order; + int ret; + + ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_VXLAN_VPNID, &vpn_id_net_order); + if (ret >= 0) + { + return ntohl(vpn_id_net_order); + } + else + { + return 0; + } +} + +#if 0 +static unsigned char get_service_id_from_vxlanid(struct streaminfo *pstream) +{ + int vxlan_id; /* 由vxlan_id获取当前包所属业务号 vxlan_id_map_to_service_id*/ + int ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_VXLAN_ID, &vxlan_id); + unsigned char service_id; + if (ret >= 0) + { + service_id = vxlan_id_map_to_service_id(ntohl(vxlan_id)); + return service_id; + } + else + { + return 0; + } +} + + +static unsigned char get_service_id_from_sport(struct streaminfo *pstream) +{ + int ret; + unsigned short vxlan_sport; /* 由源端口获取当前包所属业务号 vxlan_sport_map_to_service_id*/ + unsigned char service_id; + ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_VXLAN_SPORT, &vxlan_sport); + if (ret >= 0) + { + // printf("[debug], test_get_rawpkt_options get sport:%u\n", ntohs(vxlan_sport)); + service_id = vxlan_sport_map_to_service_id(ntohs(vxlan_sport)); + // printf("service id from sport: %u\n", service_id); + return service_id; + } + else + { + // printf("[error], test_get_rawpkt_options get sport error\n"); + return 0; + } +} +#endif + +#if 0 +static int is_same_sub_net(unsigned int packet_gdev_ip_net, unsigned int local_gdev_ip_net) +{ + if((ntohl(packet_gdev_ip_net) & IDENTIFY_LOCAL_IP_SUBNET_MASK) == + (ntohl(local_gdev_ip_net) & IDENTIFY_LOCAL_IP_SUBNET_MASK)){ + return 1; + } + + return 0; +} +#endif + +static int get_vxlan_ip_addr(struct streaminfo *pstream, struct traffic_info *tinfo) +{ + int gdev_ip_net = 0, local_dev_ip = 0; + int ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_GDEV_IP, &gdev_ip_net); + if (ret >= 0) + { +#if 0 + /* 回流/回注流量已经区分,接入不同的机器 靠配置文件指定*/ + if(is_same_sub_net(gdev_ip_net, sapp_keepalive_reflux_ip_net)){ + flow_type = FLOW_TYPE_INJECT; /* 从驱动获取的GDEV IP(vxlan->srcip), 和本机IP在一个网�? 说明是回注包 */ + }else{ + flow_type = FLOW_TYPE_REFLUX; /* 从驱动获取的GDEV IP(vxlan->srcip), 和本机IP不在一个网�? 说明是回流包 */ + } + + if(gdev_ip_net == sapp_keepalive_reflux_ip_net){ /* add by lijia 20190611 */ + MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, "vxlan src and dst ip is equal!"); + ret = -1; + } +#endif + + tinfo->vx_ip_header_src_ip_net = gdev_ip_net; + ret = 0; + } + else + { + tinfo->vx_ip_header_src_ip_net = 0; + tinfo->vx_ip_header_dst_ip_net = 0; + ret = -1; + if (error_coredump) + { + assert(0); + } + } + + ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_VXLAN_LOCAL_IP, &local_dev_ip); + if (ret >= 0) + { + tinfo->vx_ip_header_dst_ip_net = local_dev_ip; + ret = 0; + } + else + { + tinfo->vx_ip_header_src_ip_net = 0; + tinfo->vx_ip_header_dst_ip_net = 0; + ret = -1; + if (error_coredump) + { + assert(0); + } + } + + return ret; +} + +static unsigned short get_vx_UDP_header_src_port(struct streaminfo *pstream) +{ + unsigned short vxlan_sport; + int ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_VXLAN_SPORT, &vxlan_sport); + if (ret >= 0) + { + return ntohs(vxlan_sport); + } + else + { + return 0; + } +} + +/* 获取pstream中的四元组 */ +#if 0 +static void get_tuple4(struct streaminfo *pstream, unsigned char service_id) +{ + // printf("%s\n", addr_type_to_string((pstream->addr).addrtype)); + if (pstream->addr.addrtype == ADDR_TYPE_IPV4) + { + /* ipv4 */ + char sip[INET_ADDRSTRLEN]; + char dip[INET_ADDRSTRLEN]; + struct stream_tuple4_v4 *tuple4_v4 = (struct stream_tuple4_v4 *)(pstream->addr.paddr); + inet_ntop(AF_INET, &(tuple4_v4->saddr), sip, INET_ADDRSTRLEN); + inet_ntop(AF_INET, &(tuple4_v4->daddr), dip, INET_ADDRSTRLEN); + // printf("--->%s:%d -> %s:%d\n", sip, ntohs(tuple4_v4->source), dip, ntohs(tuple4_v4->dest)); + + char info[MAX_LOG_INFO_LEN] = {0}; + if (service_id > 0) + { + snprintf(info, MAX_LOG_INFO_LEN, "%s:%d -> %s:%d service_id:%u\n", sip, ntohs(tuple4_v4->source), + dip, ntohs(tuple4_v4->dest), service_id); + printf(info); + } + else + { + snprintf(info, MAX_LOG_INFO_LEN, "%s:%d -> %s:%d\n", sip, ntohs(tuple4_v4->source), + dip, ntohs(tuple4_v4->dest)); + } + MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); + // push_data_to_kafka(info,sizeof(info)); + } + else if (pstream->addr.addrtype == ADDR_TYPE_IPV6) + { + /* ipv6 */ + char sip[INET6_ADDRSTRLEN]; + char dip[INET6_ADDRSTRLEN]; + struct stream_tuple4_v6 *tuple4_v6 = (struct stream_tuple4_v6 *)(pstream->addr.paddr); + inet_ntop(AF_INET6, &(tuple4_v6->saddr), sip, INET6_ADDRSTRLEN); + inet_ntop(AF_INET6, &(tuple4_v6->daddr), dip, INET6_ADDRSTRLEN); + printf("--->%s:%d -> %s:%d\n", sip, ntohs(tuple4_v6->source), dip, ntohs(tuple4_v6->dest)); + + /* 获取业务号 */ + // unsigned char service_id = get_service_id(pstream); + + char info[MAX_LOG_INFO_LEN] = {0}; + if (service_id > 0) + { + snprintf(info, MAX_LOG_INFO_LEN, "%s:%d -> %s:%d service_id:%u\n", sip, ntohs(tuple4_v6->source), + dip, ntohs(tuple4_v6->dest), service_id); + } + else + { + snprintf(info, MAX_LOG_INFO_LEN, "%s:%d -> %s:%d\n", sip, ntohs(tuple4_v6->source), + dip, ntohs(tuple4_v6->dest)); + } + MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); + // push_data_to_kafka(info,sizeof(info)); + } + else if (pstream->addr.addrtype == ADDR_TYPE_ARP) + { + } +} +#endif + +static void get_ip_detail(struct streaminfo *pstream, struct traffic_info *tinfo, const void *rawpkt) +{ + // printf("%s\n", addr_type_to_string((pstream->addr).addrtype)); + if (pstream->addr.addrtype == ADDR_TYPE_IPV4) + { + /* ipv4 src_ip dst_ip identification fragment_offset*/ + struct stream_tuple4_v4 *tuple4_v4 = (struct stream_tuple4_v4 *)(pstream->addr.paddr); + inet_ntop(AF_INET, &(tuple4_v4->saddr), tinfo->ipv4_sip, INET_ADDRSTRLEN); + inet_ntop(AF_INET, &(tuple4_v4->daddr), tinfo->ipv4_dip, INET_ADDRSTRLEN); + struct ip *ip_hdr = (struct ip *)rawpkt; + tinfo->ipv4_id = ntohs(ip_hdr->ip_id); + tinfo->ipv4_off = ntohs(ip_hdr->ip_off); +#if 0 + char info[MAX_LOG_INFO_LEN] = {0}; + if (tinfo->service_id > 0) + { + snprintf(info, MAX_LOG_INFO_LEN, "%s:%d -> %s:%d service_id:%u\n", tinfo->ipv4_sip, ntohs(tuple4_v4->source), + tinfo->ipv4_dip, ntohs(tuple4_v4->dest), tinfo->service_id); + // printf(info); + // push_data_to_kafka(info,sizeof(info)); + } + else + { + snprintf(info, MAX_LOG_INFO_LEN, "%s:%d -> %s:%d\n", tinfo->ipv4_sip, ntohs(tuple4_v4->source), + tinfo->ipv4_dip, ntohs(tuple4_v4->dest)); + } + // MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); +#endif + } + else if (pstream->addr.addrtype == ADDR_TYPE_IPV6) + { + /* ipv6 src_ip dst_ip bus_type flow_flag load_length next_msg_head limit*/ + struct stream_tuple4_v6 *tuple4_v6 = (struct stream_tuple4_v6 *)(pstream->addr.paddr); + inet_ntop(AF_INET6, &(tuple4_v6->saddr), tinfo->ipv6_sip, INET6_ADDRSTRLEN); + inet_ntop(AF_INET6, &(tuple4_v6->daddr), tinfo->ipv6_dip, INET6_ADDRSTRLEN); + struct ip6_hdr *ip6_head = (struct ip6_hdr *)rawpkt; + tinfo->ipv6_bus_type = ntohl(ip6_head->ip6_flow) & 0x0FF00000; + tinfo->ipv6_flow_flag = ntohl(ip6_head->ip6_flow) & 0x000FFFFF; + tinfo->ipv6_load_length = ntohs(ip6_head->ip6_plen); + tinfo->ipv6_next_msg_head = ip6_head->ip6_nxt; + tinfo->ipv6_limit = ip6_head->ip6_hlim; +#if 0 + + char info[MAX_LOG_INFO_LEN] = {0}; + if (tinfo->service_id > 0) + { + snprintf(info, MAX_LOG_INFO_LEN, "%s:%d -> %s:%d service_id:%u\n", tinfo->ipv6_sip, ntohs(tuple4_v6->source), + tinfo->ipv6_dip, ntohs(tuple4_v6->dest), tinfo->service_id); + } + else + { + snprintf(info, MAX_LOG_INFO_LEN, "%s:%d -> %s:%d\n", tinfo->ipv6_sip, ntohs(tuple4_v6->source), + tinfo->ipv6_dip, ntohs(tuple4_v6->dest)); + } + // MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); + // push_data_to_kafka(info,sizeof(info)); +#endif + } + else if (pstream->addr.addrtype == ADDR_TYPE_ARP) + { + return; + } +} + +static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *pstream) +{ + char protocol[8]; + char vxlan_sip_ip_str[INET_ADDRSTRLEN]; + char vxlan_dip_ip_str[INET_ADDRSTRLEN]; + + switch (tinfo->protocol) + { + case 1: + sprintf(protocol, "%s", "IPv4_TCP"); + break; + case 2: + sprintf(protocol, "%s", "IPv4_UDP"); + break; + case 3: + sprintf(protocol, "%s", "IPv6_TCP"); + break; + case 4: + sprintf(protocol, "%s", "IPv6_UDP"); + break; + default: + sprintf(protocol, "%s", "others"); + break; + } + /* + printf("\"proto_type\":%d,protocol:%s, entrance_id:%d, c2s_pkt_num:%d, s2c_pkt_num:%d, c2s_byte_len:%d, s2c_byte_len:%d, ", + tinfo->PROTO_TYPE,protocol, entrance_id, tinfo->C2S_pkt_num,tinfo->S2C_pkt_num,tinfo->C2S_bytes,tinfo->S2C_bytes); + printf("stat_time:%ld, vx_type:0x%04X, vx_ip_header_src_ip:%s, vx_ip_header_dst_ip:%s, ", + tinfo->stat_time, tinfo->vx_type, tinfo->vx_ip_header_src_ip,vx_ip_header_dst_ip); + printf("vx_UDP_header_src_port:%d, vx_UDP_header_dst_port=%d, vx_vlan_id:%d, ", + tinfo->vx_UDP_header_src_port,tinfo->vx_UDP_header_dst_port,tinfo->service_id); + printf("ipv4_src_ip:%s, ipv4_dst_ip:%s, ipv4_identification:%d, ipv4_fragment_offset:%d, ", + tinfo->ipv4_sip,tinfo->ipv4_dip,tinfo->ipv4_id,tinfo->ipv4_off); + printf("ipv6_src_ip:%s, ipv6_dst_ip:%s, ipv6_bus_type:%s, ipv6_flow_flag:%d, ipv6_load_length:%d, ipv6_next_msg_head:%d, ipv6_limit:%d ", + tinfo->ipv6_sip,tinfo->ipv6_dip,tinfo->ipv6_bus_type,tinfo->ipv6_flow_flag,tinfo->ipv6_load_length,tinfo->ipv6_next_msg_head,tinfo->ipv6_limit); + */ + char info[MAX_TRAFFIC_INFO_LEN] = {0}; + + inet_ntop(AF_INET, &tinfo->vx_ip_header_src_ip_net, vxlan_sip_ip_str, INET_ADDRSTRLEN); + inet_ntop(AF_INET, &tinfo->vx_ip_header_dst_ip_net, vxlan_dip_ip_str, INET_ADDRSTRLEN); + + switch (tinfo->addr.addrtype) + { + case ADDR_TYPE_IPV4: + snprintf(info, MAX_TRAFFIC_INFO_LEN, + "{\"proto_type\":%d,\"protocol\":\"%s\",\"entrance_id\":%d,\"c2s_pkt_num\":%d,\"s2c_pkt_num\":%d,\"c2s_byte_len\":%llu,\"s2c_byte_len\":%llu," + "\"stat_time\":%ld,\"vx_type\":\"0x%04X\",\"vx_ip_header_src_ip\":\"%s\",\"vx_ip_header_dst_ip\":\"%s\"," + "\"vx_udp_header_src_port\":%u,\"vx_udp_header_dst_port\":%u,\"vx_vlan_id\":%u," + "\"ipv4_src_ip\":\"%s\",\"ipv4_dst_ip\":\"%s\",\"ipv4_identification\":%d,\"ipv4_fragment_offset\":%u," + "\"ipv6_src_ip\":\"\",\"ipv6_dst_ip\":\"\",\"ipv6_bus_type\":\"\",\"ipv6_flow_flag\":\"\",\"ipv6_load_length\":0," + "\"ipv6_next_msg_head\":0,\"ipv6_limit\":0,\"flow_type\":%d,\"sendto_gdev_ip\":\"%s\"}", + tinfo->PROTO_TYPE, protocol, entrance_id, tinfo->C2S_pkt_num, tinfo->S2C_pkt_num, tinfo->C2S_bytes, tinfo->S2C_bytes, + tinfo->stat_time.tv_sec * 1000 + tinfo->stat_time.tv_usec / 1000, tinfo->vx_type, vxlan_sip_ip_str, vxlan_dip_ip_str, + tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->vxlan_vpn_id, + tinfo->ipv4_sip, tinfo->ipv4_dip, tinfo->ipv4_id, tinfo->ipv4_off, + //ipv6 stat is NULL, + flow_type, sendto_gdev_ip); + // MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); + // push_data_to_kafka(info,strlen(info)); + push_data_to_GSJ(info, strlen(info)); + break; + case ADDR_TYPE_IPV6: + snprintf(info, MAX_TRAFFIC_INFO_LEN, + "{\"proto_type\":%d,\"protocol\":\"%s\",\"entrance_id\":%d,\"c2s_pkt_num\":%d,\"s2c_pkt_num\":%d,\"c2s_byte_len\":%llu,\"s2c_byte_len\":%llu," + "\"stat_time\":%ld,\"vx_type\":\"0x%04X\",\"vx_ip_header_src_ip\":\"%s\",\"vx_ip_header_dst_ip\":\"%s\"," + "\"vx_udp_header_src_port\":%u,\"vx_udp_header_dst_port\":%u,\"vx_vlan_id\":%u," + "\"ipv4_src_ip\":\"\",\"ipv4_dst_ip\":\"\",\"ipv4_identification\":0,\"ipv4_fragment_offset\":0," + "\"ipv6_src_ip\":\"%s\",\"ipv6_dst_ip\":\"%s\",\"ipv6_bus_type\":\"0x%02X\",\"ipv6_flow_flag\":\"0x%05X\",\"ipv6_load_length\":%d," + "\"ipv6_next_msg_head\":%d,\"ipv6_limit\":%d,\"flow_type\":%d,\"sendto_gdev_ip\":\"%s\"}", + tinfo->PROTO_TYPE, protocol, entrance_id, tinfo->C2S_pkt_num, tinfo->S2C_pkt_num, tinfo->C2S_bytes, tinfo->S2C_bytes, + tinfo->stat_time.tv_sec * 1000 + tinfo->stat_time.tv_usec / 1000, tinfo->vx_type, vxlan_sip_ip_str, vxlan_dip_ip_str, + tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->vxlan_vpn_id, + tinfo->ipv6_sip, tinfo->ipv6_dip, tinfo->ipv6_bus_type, tinfo->ipv6_flow_flag, tinfo->ipv6_load_length, + tinfo->ipv6_next_msg_head, tinfo->ipv6_limit, flow_type, sendto_gdev_ip); + // MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); + // push_data_to_kafka(info,strlen(info)); + push_data_to_GSJ(info, strlen(info)); + break; + case ADDR_TYPE_ARP: + snprintf(info, MAX_TRAFFIC_INFO_LEN, + "{\"proto_type\":%d,\"protocol\":\"%s\",\"entrance_id\":%d,\"c2s_pkt_num\":%d,\"s2c_pkt_num\":%d,\"c2s_byte_len\":%llu,\"s2c_byte_len\":%llu," + "\"stat_time\":%ld,\"vx_type\":\"0x%04X\",\"vx_ip_header_src_ip\":\"%s\",\"vx_ip_header_dst_ip\":\"%s\"," + "\"vx_udp_header_src_port\":%d,\"vx_udp_header_dst_port\":%d,\"vx_vlan_id\":%d," + "\"ipv4_src_ip\":\"\",\"ipv4_dst_ip\":\"\",\"ipv4_identification\":0,\"ipv4_fragment_offset\":0," + "\"ipv6_src_ip\":\"\",\"ipv6_dst_ip\":\"\",\"ipv6_bus_type\":\"\",\"ipv6_flow_flag\":\"\",\"ipv6_load_length\":0," + "\"ipv6_next_msg_head\":0,\"ipv6_limit\":0,\"flow_type\":%d,\"sendto_gdev_ip\":\"%s\"}", + tinfo->PROTO_TYPE, protocol, entrance_id, tinfo->C2S_pkt_num, tinfo->S2C_pkt_num, tinfo->C2S_bytes, tinfo->S2C_bytes, + tinfo->stat_time.tv_sec * 1000 + tinfo->stat_time.tv_usec / 1000, tinfo->vx_type, vxlan_sip_ip_str, vxlan_dip_ip_str, + tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->vxlan_vpn_id, + flow_type, sendto_gdev_ip); + // MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); + // push_data_to_kafka(info,strlen(info)); + push_data_to_GSJ(info, strlen(info)); + break; + default: + break; + } +} + +char TCP_ENTRY_ALL(struct streaminfo *pstream, void **pme, int thread_seq, const void *raw_pkt) +{ + //printf("TCP_ENTRY_ALL SUCCESS!!!\n"); + return DEFAULT_RETURN_VALUE; +} + +static int tcp_flow_id = -1; +char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const void *raw_pkt) +{ + struct tcpdetail *raw_pdetail = (struct tcpdetail *)pstream->pdetail; + struct traffic_info *tinfo; + time_t *time_old; + struct traffic_info *tinfo_new; + + all_stream_pkt_num[thread_seq]++; + + if (-1 == tcp_flow_id) + { + tcp_flow_id = project_customer_register("tcp_flow_stat", "struct"); + if (-1 == tcp_flow_id) + { + // printf("'tcp_flow_stat' is disable, no statistics\n"); + MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "'tcp_flow_stat' is disable, no statistics\n"); + } + } + + if (pstream->opstate == OP_STATE_PENDING) + { + pme = (void **)calloc(3, sizeof(void *)); + time_old = (time_t *)calloc(1, sizeof(time_t)); + tinfo_new = (struct traffic_info *)calloc(1, sizeof(struct traffic_info)); + + tinfo = (struct traffic_info *)calloc(1, sizeof(struct traffic_info)); + //tinfo->service_id = get_service_id_from_sport(pstream); //获取vx_lan_id字段,具体方法待定 //tinfo->service_id = get_service_id_from_vxlanid(pstream); + tinfo->vxlan_vpn_id = get_vpnid_from_stream(pstream); + /* PROTOCOL */ + switch (pstream->addr.addrtype) + { + case ADDR_TYPE_IPV4: + tinfo->protocol = 1; // IPV4_TCP + tinfo->vx_type = 0x0800; + break; + case ADDR_TYPE_IPV6: + tinfo->protocol = 3; // IPV6_TCP + tinfo->vx_type = 0x86DD; + break; + case ADDR_TYPE_ARP: + tinfo->protocol = 0; + tinfo->vx_type = 0x0806; + break; + default: + tinfo->protocol = 0; + tinfo->vx_type = 0; + break; + } + /* vx_UDP_header_src_port dst_port*/ + tinfo->vx_UDP_header_src_port = get_vx_UDP_header_src_port(pstream); + tinfo->vx_UDP_header_dst_port = 4789; + /* vx_ip_header_src_ip dst_ip*/ + if (get_vxlan_ip_addr(pstream, tinfo) < 0) + { + goto error_drop; + } + /* IPv4、IPv6头部信息 */ + get_ip_detail(pstream, tinfo, raw_pkt); + /* 应用层协议类型 用目的端口表示*/ + tinfo->PROTO_TYPE = get_proto_type(pstream); + + *tinfo_new = *tinfo; + + pme[0] = tinfo; + pme[1] = tinfo_new; + time(time_old); + pme[2] = time_old; + } + tinfo = (struct traffic_info *)(pme[0]); + tinfo_new = (struct traffic_info *)(pme[1]); + time_old = (time_t *)(pme[2]); + /* 自己统计包数字节数*/ /* + if(raw_pdetail->datalen > 0) + { + if(DIR_C2S == pstream->curdir) + { + tinfo->C2S_pkt_num++; + tinfo->C2S_bytes += raw_pdetail->datalen; + } + else + { + tinfo->S2C_pkt_num++; + tinfo->S2C_bytes += raw_pdetail->datalen; + } + } + */ + + /* if (pstream->opstate == OP_STATE_CLOSE && tinfo->vxlan_vpn_id > 0) */ + // if (pstream->opstate == OP_STATE_CLOSE && !vpn_id_drop[tinfo->vxlan_vpn_id]) + if (pstream->opstate == OP_STATE_CLOSE) + { + //printf("TCP_ENTRY SUCCESS!!!\n"); + /* 获取包数字节数*/ + // tinfo->C2S_pkt_num = raw_pdetail->serverpktnum; + // tinfo->S2C_pkt_num = raw_pdetail->clientpktnum; + // tinfo->C2S_bytes = raw_pdetail->serverbytes; + // tinfo->S2C_bytes = raw_pdetail->clientbytes; + // if(tinfo->C2S_bytes < 0) + // { + // tinfo->C2S_bytes = 0; + // } + // if(tinfo->S2C_bytes < 0) + // { + // tinfo->S2C_bytes = 0; + // } + /* 另一种获取包数字节数的方式*/ + + if (tcp_flow_id != -1) + { + struct tcp_flow_stat *tflow = (struct tcp_flow_stat *)project_req_get_struct(pstream, tcp_flow_id); + tinfo->C2S_pkt_num = tflow->C2S_all_pkt - tinfo_new->C2S_pkt_num; + tinfo->S2C_pkt_num = tflow->S2C_all_pkt - tinfo_new->S2C_pkt_num; + tinfo->C2S_bytes = tflow->C2S_all_byte - tinfo_new->C2S_bytes; + tinfo->S2C_bytes = tflow->S2C_all_byte - tinfo_new->S2C_bytes; + } + else + { + tinfo->C2S_pkt_num = 0; + tinfo->S2C_pkt_num = 0; + tinfo->C2S_bytes = 0; + tinfo->S2C_bytes = 0; + } + + close_stream_pkt_num[thread_seq] += tinfo->C2S_pkt_num; + close_stream_pkt_num[thread_seq] += tinfo->S2C_pkt_num; + push_count[thread_seq]++; + + /* layer_addr */ + tinfo->addr = pstream->addr; + /* STAT_TIME */ + // tinfo->stat_time = time(0); + gettimeofday(&tinfo->stat_time, NULL); + + print_traffic_info(tinfo, pstream); + free(tinfo); + free(tinfo_new); + free(time_old); + free(pme); + // printf("\n"); + return DEFAULT_RETURN_VALUE; + } + + /*每隔N秒发一次 */ + if (pstream->opstate == OP_STATE_DATA) + { + time_t *time_new; + time_new = (time_t *)calloc(1, sizeof(time_t)); + time(time_new); + if (*time_new - *time_old >= 120) + { + if (tcp_flow_id != -1) + { + struct tcp_flow_stat *tflow = (struct tcp_flow_stat *)project_req_get_struct(pstream, tcp_flow_id); + tinfo_new->C2S_pkt_num = tflow->C2S_all_pkt - tinfo_new->C2S_pkt_num; + tinfo_new->S2C_pkt_num = tflow->S2C_all_pkt - tinfo_new->S2C_pkt_num; + tinfo_new->C2S_bytes = tflow->C2S_all_byte - tinfo_new->C2S_bytes; + tinfo_new->S2C_bytes = tflow->S2C_all_byte - tinfo_new->S2C_bytes; + } + else + { + tinfo_new->C2S_pkt_num = 0; + tinfo_new->S2C_pkt_num = 0; + tinfo_new->C2S_bytes = 0; + tinfo_new->S2C_bytes = 0; + } + /* layer_addr */ + tinfo_new->addr = pstream->addr; + /* STAT_TIME */ + // tinfo->stat_time = time(0); + gettimeofday(&tinfo_new->stat_time, NULL); + + print_traffic_info(tinfo_new, pstream); + + if (tcp_flow_id != -1) + { + struct tcp_flow_stat *tflow = (struct tcp_flow_stat *)project_req_get_struct(pstream, tcp_flow_id); + tinfo_new->C2S_pkt_num = tflow->C2S_all_pkt; + tinfo_new->S2C_pkt_num = tflow->S2C_all_pkt; + tinfo_new->C2S_bytes = tflow->C2S_all_byte; + tinfo_new->S2C_bytes = tflow->S2C_all_byte; + } + + free(time_new); + + time(time_old); + } + } + + return DEFAULT_RETURN_VALUE; + +error_drop: + error_count[thread_seq]++; + free(tinfo); + return APP_STATE_DROPME | APP_STATE_DROPPKT; +} + +/* + add by lijia 20190604. +*/ +static inline int is_gdev_keepalive_pkt(const struct ip *iphdr) +{ + const struct udphdr *udh; + + if (NULL == iphdr) + { + return 0; + } + + if (iphdr->ip_p != 17) + { + return 0; + } + + udh = (struct udphdr *)((char *)iphdr + iphdr->ip_hl * 4); + + if (udh->dest == ntohs(3784)) + { + return 1; + } + + return 0; +} + +static int udp_flow_id = -1; +char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const void *raw_pkt) +{ + struct udpdetail *pdetail = (struct udpdetail *)pstream->pdetail; + struct traffic_info *tinfo; + time_t *time_old; + struct traffic_info *tinfo_new; + + all_stream_pkt_num[thread_seq]++; + + if (is_gdev_keepalive_pkt((const struct ip *)raw_pkt) != 0) + { //add by lijia 20190604, drop BFD keepalive packet. + return APP_STATE_DROPME; + } + + if (-1 == udp_flow_id) + { + udp_flow_id = project_customer_register(PROJECT_REQ_UDP_FLOW, "struct"); + if (-1 == udp_flow_id) + { + // printf("'udp_flow_stat' is disable, no statistics\n"); + MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "'tcp_flow_stat' is disable, no statistics\n"); + } + } + + if (pstream->opstate == OP_STATE_PENDING) + { + pme = (void **)calloc(3, sizeof(void *)); + time_old = (time_t *)calloc(1, sizeof(time_t)); + tinfo_new = (struct traffic_info *)calloc(1, sizeof(struct traffic_info)); + + tinfo = (struct traffic_info *)calloc(1, sizeof(struct traffic_info)); + //tinfo->service_id = get_service_id_from_sport(pstream); //获取vx_lan_id字段,具体方法待定 //tinfo->service_id = get_service_id_from_vxlanid(pstream); + tinfo->vxlan_vpn_id = get_vpnid_from_stream(pstream); + + /* PROTOCOL */ + switch (pstream->addr.addrtype) + { + case ADDR_TYPE_IPV4: + tinfo->protocol = 2; // IPV4_UDP + tinfo->vx_type = 0x0800; + break; + case ADDR_TYPE_IPV6: + tinfo->protocol = 4; // IPV6_UDP + tinfo->vx_type = 0x86DD; + break; + case ADDR_TYPE_ARP: + tinfo->protocol = 0; + tinfo->vx_type = 0x0806; + break; + default: + tinfo->protocol = 0; + tinfo->vx_type = 0; + break; + } + /* vx_UDP_header_src_port dst_port*/ + tinfo->vx_UDP_header_src_port = get_vx_UDP_header_src_port(pstream); + tinfo->vx_UDP_header_dst_port = 4789; + /* vx_ip_header_src_ip dst_ip*/ + if (get_vxlan_ip_addr(pstream, tinfo) < 0) + { + goto error_drop; + } + /* IPv4、IPv6头部信息 */ + get_ip_detail(pstream, tinfo, raw_pkt); + /* 应用层协议类型 用目的端口表示*/ + tinfo->PROTO_TYPE = get_proto_type(pstream); + + *tinfo_new = *tinfo; + + pme[0] = tinfo; + pme[1] = tinfo_new; + time(time_old); + pme[2] = time_old; + } + tinfo = (struct traffic_info *)(pme[0]); + tinfo_new = (struct traffic_info *)(pme[1]); + time_old = (time_t *)(pme[2]); + + /* if (pstream->opstate == OP_STATE_CLOSE && tinfo->vxlan_vpn_id > 0) */ + // if (pstream->opstate == OP_STATE_CLOSE && !vpn_id_drop[tinfo->vxlan_vpn_id]) + if (pstream->opstate == OP_STATE_CLOSE) + { + //printf("UDP_ENTRY SUCCESS!!!\n"); + + if (pdetail != NULL) + { + /* 获取包数字节数 */ + // tinfo->C2S_pkt_num = pdetail->serverpktnum; + // tinfo->S2C_pkt_num = pdetail->clientpktnum; + // tinfo->C2S_bytes = pdetail->serverbytes; + // tinfo->S2C_bytes = pdetail->clientbytes; + // if (tinfo->C2S_bytes < 0) + // { + // tinfo->C2S_bytes = 0; + // } + // if (tinfo->S2C_bytes < 0) + // { + // tinfo->S2C_bytes = 0; + // } + + if (udp_flow_id != -1) + { + struct udp_flow_stat *tflow = (struct udp_flow_stat *)project_req_get_struct(pstream, udp_flow_id); + tinfo->C2S_pkt_num = tflow->C2S_pkt - tinfo_new->C2S_pkt_num; + tinfo->S2C_pkt_num = tflow->S2C_pkt - tinfo_new->S2C_pkt_num; + tinfo->C2S_bytes = tflow->C2S_byte - tinfo_new->C2S_bytes; + tinfo->S2C_bytes = tflow->S2C_byte - tinfo_new->S2C_bytes; + } + else + { + tinfo->C2S_pkt_num = 0; + tinfo->S2C_pkt_num = 0; + tinfo->C2S_bytes = 0; + tinfo->S2C_bytes = 0; + } + } + else + { + tinfo->C2S_pkt_num = 0; + tinfo->S2C_pkt_num = 0; + tinfo->C2S_bytes = 0; + tinfo->S2C_bytes = 0; + } + + close_stream_pkt_num[thread_seq] += tinfo->C2S_pkt_num; + close_stream_pkt_num[thread_seq] += tinfo->S2C_pkt_num; + push_count[thread_seq]++; + + /* layer_addr */ + tinfo->addr = pstream->addr; + /* STAT_TIME */ + // tinfo->stat_time = time(0); + gettimeofday(&tinfo->stat_time, NULL); + + print_traffic_info(tinfo, pstream); + free(tinfo); + free(tinfo_new); + free(time_old); + free(pme); + // printf("\n"); + return DEFAULT_RETURN_VALUE; + } + + /*每隔N秒发一次 */ + if (pstream->opstate == OP_STATE_DATA) + { + time_t *time_new; + time_new = (time_t *)calloc(1, sizeof(time_t)); + time(time_new); + if (*time_new - *time_old >= 120) + { + if (udp_flow_id != -1) + { + struct udp_flow_stat *tflow = (struct udp_flow_stat *)project_req_get_struct(pstream, udp_flow_id); + tinfo_new->C2S_pkt_num = tflow->C2S_pkt - tinfo_new->C2S_pkt_num; + tinfo_new->S2C_pkt_num = tflow->S2C_pkt - tinfo_new->S2C_pkt_num; + tinfo_new->C2S_bytes = tflow->C2S_byte - tinfo_new->C2S_bytes; + tinfo_new->S2C_bytes = tflow->S2C_byte - tinfo_new->S2C_bytes; + } + else + { + tinfo_new->C2S_pkt_num = 0; + tinfo_new->S2C_pkt_num = 0; + tinfo_new->C2S_bytes = 0; + tinfo_new->S2C_bytes = 0; + } + /* layer_addr */ + tinfo_new->addr = pstream->addr; + /* STAT_TIME */ + // tinfo->stat_time = time(0); + gettimeofday(&tinfo_new->stat_time, NULL); + + print_traffic_info(tinfo_new, pstream); + if (udp_flow_id != -1) + { + struct udp_flow_stat *tflow = (struct udp_flow_stat *)project_req_get_struct(pstream, udp_flow_id); + tinfo_new->C2S_pkt_num = tflow->C2S_pkt; + tinfo_new->S2C_pkt_num = tflow->S2C_pkt; + tinfo_new->C2S_bytes = tflow->C2S_byte; + tinfo_new->S2C_bytes = tflow->S2C_byte; + } + + free(time_new); + + time(time_old); + } + } + + return DEFAULT_RETURN_VALUE; + +error_drop: + error_count[thread_seq]++; + free(tinfo); + return APP_STATE_DROPME | APP_STATE_DROPPKT; +} + +static char return_action_cb_fun(int net_conn_mode, char plug_action) +{ + return 0; //所有包默认都DROP +} + +void count_work() +{ + while (1) + { + sleep(60); + int i = 0; + UINT64 all_pkt = 0; + UINT64 close_pkt = 0; + UINT64 all_push_count = 0; + UINT64 all_error_count = 0; + for (i; i < 32; i++) + { + all_pkt += all_stream_pkt_num[i]; + close_pkt += close_stream_pkt_num[i]; + all_push_count += push_count[i]; + all_error_count += error_count[i]; + } + MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, + module_name, "all_stream_pkt_num is %llu,close_stream_pkt_num is %llu, push_count is %llu, error_count is %llu. \n", + all_pkt, close_pkt, all_push_count, all_error_count); + } +} + +int CHAR_INIT() +{ + int demo_plugid = 51; + runtime_log_handler = NULL; + kafka_log_handler = NULL; + char str_tmp[128]; + + int log_level = 30; + + MESA_load_profile_int_def(entrance_id_path, "LOG", "log_level", &log_level, 30); + + runtime_log_handler = MESA_create_runtime_log_handle(tuple_log_path, log_level); + kafka_log_handler = MESA_create_runtime_log_handle(kafka_log_path, log_level); + if (runtime_log_handler == NULL || kafka_log_handler == NULL) + { + /* code */ + printf("MESA_create_runtime_log_handle failed!!!"); + return -1; + } + /* VPN_ID drop */ + char vpn_id[256]; + MESA_load_profile_string_def(entrance_id_path, "SETTING", "VPN_ID_DROP", vpn_id, sizeof(vpn_id), ""); + char *buff; + buff = vpn_id; + if (strlen(vpn_id) != 0) + { + char *id_str; + id_str = strsep(&buff, ","); + while (id_str != NULL) + { + int id = atoi(id_str); + if (id > 0 && id < 256) + { + vpn_id_drop[id] = 1; + } + id_str = strsep(&buff, ","); + } + } + /* ENTRANCE_ID */ + MESA_load_profile_uint_def(entrance_id_path, "SETTING", "ENTRANCE_ID", &entrance_id, 0); + /* FLOW_TYPE */ + MESA_load_profile_uint_def(entrance_id_path, "SETTING", "FLOW_TYPE", &flow_type, (int)FLOW_TYPE_REFLUX); + /* Brokers */ + // MESA_load_profile_string_def(entrance_id_path, "SETTING", "BROKERS", brokers, sizeof(brokers),"#"); + /* vx_ip_header_dst_ip */ + MESA_load_profile_string_def(gdev_conf_path, "Module", "sendto_gdev_ip", str_tmp, sizeof(str_tmp), "#"); + if ('#' == str_tmp[0]) + { + MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name, "can't get %s->sendto_gdev_ip!!", gdev_conf_path); + return -1; + } + /* load gdev ip from conf/gdev.conf*/ + MESA_load_profile_string_def(gdev_conf_path, "Module", "sendto_gdev_ip", sendto_gdev_ip, sizeof(sendto_gdev_ip), "#"); + + int ret, conf_ret_val; + ret = MESA_load_profile_int_def(entrance_id_path, "SETTING", "__DEBUG_RETURN_VALUE", &conf_ret_val, (APP_STATE_GIVEME | APP_STATE_DROPPKT)); + if (ret >= 0) + { + /* debug模式下, 临时开启回注, 用于测试 */ + if ((conf_ret_val != APP_STATE_GIVEME) && (conf_ret_val != (APP_STATE_GIVEME | APP_STATE_DROPPKT)) && (conf_ret_val != (APP_STATE_DROPME | APP_STATE_DROPPKT))) + { + printf("config __DEBUG_RETURN_VALUE invalid!"); + exit(1); + } + DEFAULT_RETURN_VALUE = (char)conf_ret_val; + g_business_plug_type = 1; + } + else + { + platform_register_action_judge(return_action_cb_fun); + g_business_plug_type = 0; /* 非常规办法, 这是串联插件总控的内部变量, 0:JC; 1:GK, 0默认丢弃所有包 */ + } + + MESA_load_profile_int_def(entrance_id_path, "SETTING", "__ERROR_COREDUMP", &error_coredump, 0); + + inet_pton(AF_INET, str_tmp, &sapp_keepalive_reflux_ip_net); + + /* kafka初始化*/ + // if (init_kafka(0, brokers, topic) != PRODUCER_INIT_SUCCESS) + // { + // MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"kafka init failed!!!"); + // return -1; + // } + + //GSJ Work thread start + int ret_thread; + ret_thread = pthread_create(&GSJ_Work, NULL, (void *)Work, NULL); + if (ret_thread == 0) + { + MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "GSJ thread start success\n"); + pthread_detach(GSJ_Work); + } + else + { + MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "GSJ thread start failed\n"); + } + + int count_thread; + count_thread = pthread_create(&count, NULL, (void *)count_work, NULL); + if (count_thread == 0) + { + MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "count thread start success\n"); + pthread_detach(count); + } + else + { + MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "count thread start failed\n"); + } + + // 函数实现自定义 // 只要求函数返回值为插件ID //printf("INIT SUCCESS!!!\n"); + return demo_plugid; +} + +void LRJ_APP_DESTROY() +{ + MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "TEST_APP_DESTORY in...\n"); + printf("TEST_APP_DESTORY in...\n"); + // kafka_destroy(); + if (runtime_log_handler == NULL) + { + printf("TEST_APP_DESTORY out...\n"); + return; + } + MESA_destroy_runtime_log_handle(runtime_log_handler); + MESA_destroy_runtime_log_handle(kafka_log_handler); + printf("TEST_APP_DESTORY out...\n"); +} |
