summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author李仁杰 <[email protected]>2019-05-09 13:00:28 +0800
committer李仁杰 <[email protected]>2019-05-09 13:00:28 +0800
commit89d2a94f8560277fc2a8b32d6430b9731b2043d6 (patch)
treed07feea5234b7bfac0bbb0b732245ff689b0d2c3
parent4e1cef039cd01513b4c552bb4a7f9f8f3bba1144 (diff)
源码文件
-rw-r--r--lirenjie_vxlan_sapp.c779
1 files changed, 779 insertions, 0 deletions
diff --git a/lirenjie_vxlan_sapp.c b/lirenjie_vxlan_sapp.c
new file mode 100644
index 0000000..0a93fbc
--- /dev/null
+++ b/lirenjie_vxlan_sapp.c
@@ -0,0 +1,779 @@
+#include <stdlib.h>
+#include <stdio.h>
+#include <netinet/ip.h>
+#include <netinet/tcp.h>
+#include <netinet/in.h>
+#include <netinet/udp.h>
+#include <assert.h>
+#include <sys/time.h>
+#include <time.h>
+#include <arpa/inet.h>
+#include <syslog.h>
+#include <signal.h>
+#include <netinet/ip.h>
+#include <netinet/ip6.h>
+#include "MESA_handle_logger.h"
+#include "stream.h"
+#include "gdev_keepalive.h"
+#include "rdkafka.h"
+
+#define MAX_LOG_INFO_LEN 256
+#define MAX_TRAFFIC_INFO_LEN 1024
+const char *module_name = "lirenjie_vxlan";
+const char *tuple_log_path = "./log/lirenjie_vxlan/ip_tuple.log";
+const char *kafka_log_path = "./log/lirenjie_vxlan/kafka.log";
+const char *gdev_conf_path = "./conf/gdev.conf";
+const char *entrance_id_path = "./conf/lrj_vxlan_sapp.conf";
+void *runtime_log_handler;
+void *kafka_log_handler;
+char vx_ip_header_dst_ip[INET_ADDRSTRLEN]; // conf/gdev.conf sendto_gdev_ip
+unsigned int entrance_id; /* 局点ID 读配置文件 ./conf/lrj_vxlan_sapp.conf 默认为0*/
+unsigned char flow_type; /* 回流0/回注1 读配置文件 ./conf/lrj_vxlan_sapp.conf 默认为0*/
+
+/* kafka */
+const int PRODUCER_INIT_FAILED = -1;
+const int PRODUCER_INIT_SUCCESS = 0;
+const int PUSH_DATA_FAILED = -1;
+const int PUSH_DATA_SUCCESS = 0;
+int partition;
+//rd
+rd_kafka_t *kafka_producer;
+rd_kafka_conf_t *conf;
+// topic
+rd_kafka_topic_t *rkt;
+rd_kafka_topic_conf_t *topic_conf;
+// char errstr[512]={0};
+char *brokers = "10.172.208.1:9092,10.172.208.2:9092,10.172.208.2:9092,10.172.208.4:9092,10.172.208.5:9092";
+char *topic = "G_BACK_TRAFFIC_STATISTIC";
+
+struct traffic_info
+{
+ unsigned char protocol; //IPv4_TCP 1 IPv4_UDP 2 IPv6_TCP 3 IPv6_UDP 4 其他 0
+ unsigned short PROTO_TYPE; //应用层协议类型,用目的端口来表示
+ UINT32 C2S_pkt_num; /* C2S, you should better use stream_project.h : struct udp_flow_stat */
+ UINT32 S2C_pkt_num; /* S2C, you should better use stream_project.h : struct udp_flow_stat */
+ UINT32 C2S_bytes; /* C2S, you should better use stream_project.h : struct udp_flow_stat */
+ UINT32 S2C_bytes; /* S2C, you should better use stream_project.h : struct udp_flow_stat */
+ //struct tm *systime; // date YYYY-MM-DD %04d-%02d-%02d systime->tm_year + 1900, systime->tm_mon + 1, systime->tm_mday
+ // time_t stat_time; // 秒级
+ struct timeval stat_time; //微妙级
+ char vx_ip_header_src_ip[INET_ADDRSTRLEN];
+ unsigned short vx_UDP_header_src_port;
+ unsigned short vx_UDP_header_dst_port;
+ struct layer_addr addr;
+ unsigned char service_id; /* Vlan ID或特定的标签值对应的VPN号 */
+ unsigned short vx_type; //IPv4=0x0800 IPv6=0x86DD Arp=0x0806
+ /* ipv4 src_ip dst_ip identification fragment_offset*/
+ char ipv4_sip[INET_ADDRSTRLEN];
+ char ipv4_dip[INET_ADDRSTRLEN];
+ unsigned short ipv4_id;
+ unsigned short ipv4_off;
+ /* ipv6 src_ip dst_ip bus_type flow_flag load_length next_msg_head limit*/
+ char ipv6_sip[INET6_ADDRSTRLEN];
+ char ipv6_dip[INET6_ADDRSTRLEN];
+ unsigned char ipv6_bus_type;
+ unsigned int ipv6_flow_flag;
+ unsigned short ipv6_load_length;
+ unsigned char ipv6_next_msg_head;
+ unsigned char ipv6_limit;
+};
+
+static void logger(const rd_kafka_t *rk, int level, const char *fac, const char *buf)
+{
+ struct timeval tv;
+ gettimeofday(&tv, NULL);
+ /*fprintf(stderr, "%u.%03u RDKAFKA-%i-%s: %s: %s\n",
+ (int)tv.tv_sec, (int)(tv.tv_usec / 1000),
+ level, fac, rk ? rd_kafka_name(rk) : NULL, buf);*/
+ MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name, "%u.%03u RDKAFKA-%i-%s: %s: %s",
+ (int)tv.tv_sec, (int)(tv.tv_usec / 1000),
+ level, fac, rk ? rd_kafka_name(rk) : NULL, buf);
+}
+
+int init_kafka(int partition_, char *brokers_, char *topic_)
+{
+ char tmp[16];
+ char errstr[512];
+ partition = partition_;
+ /* Kafka configuration */
+ conf = rd_kafka_conf_new();
+ //set logger :register log function
+ rd_kafka_conf_set_log_cb(conf, logger);
+ /* Quick termination */
+ snprintf(tmp, sizeof(tmp), "%i", SIGIO);
+ rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);
+ /*topic configuration*/
+ topic_conf = rd_kafka_topic_conf_new();
+
+ if (conf == NULL)
+ {
+ //fprintf(stderr, "***** Failed to create new conf *******\n");
+ MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"***** Failed to create new conf *******");
+ return PRODUCER_INIT_FAILED;
+ }
+ kafka_producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, (size_t)sizeof(errstr));
+ if (kafka_producer == NULL)
+ {
+ /*fprintf(stderr, "***** kafka_producer is null *******\n");
+ fprintf(stderr, "*****Failed to create new producer: %s*******\n", errstr);*/
+ MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"***** kafka_producer is null, *******");
+ MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"*****Failed to create new producer: %s*******\n", errstr);
+ return PRODUCER_INIT_FAILED;
+ }
+
+ rd_kafka_set_log_level(kafka_producer, LOG_DEBUG);
+
+ /* Add brokers */
+ if (rd_kafka_brokers_add(kafka_producer, brokers_) == 0)
+ {
+ //fprintf(stderr, "****** No valid brokers specified********\n");
+ MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"****** No valid brokers specified********");
+ return PRODUCER_INIT_FAILED;
+ }
+ /* Create topic */
+ rkt = rd_kafka_topic_new(kafka_producer, topic_, topic_conf);
+
+ return PRODUCER_INIT_SUCCESS;
+}
+
+void kafka_destroy()
+{
+ rd_kafka_topic_destroy(rkt);
+ rd_kafka_destroy(kafka_producer);
+}
+
+int push_data_to_kafka(char *buffer, int buf_len)
+{
+ int ret;
+ if (buffer == NULL)
+ {
+ return 0;
+ }
+ ret = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL);
+ if (ret == -1)
+ {
+ /*fprintf(stderr,
+ "%% Failed to produce to topic %s "
+ "partition %i: %s\n",
+ rd_kafka_topic_name(rkt), partition,
+ rd_kafka_err2str(rd_kafka_last_error()));*/
+ MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"%% Failed to produce to topic %s partition %i: %s",
+ rd_kafka_topic_name(rkt), partition,
+ rd_kafka_err2str(rd_kafka_last_error()));
+ /* Poll to handle delivery reports */
+ rd_kafka_poll(kafka_producer, 0);
+ }
+ /*fprintf(stderr, "%% Sent %zd bytes to topic "
+ "%s partition %i\n",
+ buf_len, rd_kafka_topic_name(rkt), partition);*/
+ MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"%% Sent %zd bytes to topic %s partition %i",
+ buf_len, rd_kafka_topic_name(rkt), partition);
+ rd_kafka_poll(kafka_producer, 0);
+ return PUSH_DATA_SUCCESS;
+}
+
+unsigned char get_service_id(struct streaminfo *pstream)
+{
+ int ret;
+ int gdev_ip;
+ int vxlan_id; /* 由vxlan_id获取当前包所属业务号 vxlan_id_map_to_service_id*/
+ unsigned short vxlan_sport; /* 由源端口获取当前包所属业务号 vxlan_sport_map_to_service_id*/
+ unsigned char service_id;
+
+ /* 获取vxlan_info结构体 */
+ struct vxlan_info *vxlan;
+ int opt_val_len = sizeof(struct vxlan_info);
+ ret = MESA_get_stream_opt(pstream, MSO_STREAM_VXLAN_INFO, vxlan, &opt_val_len);
+ if (ret > 0)
+ {
+ printf("[debug], encap_type:%s, entrance_id:%s, dev_id:%s, link_id:%s, link_dir:%s,inner_smac:%s, inner_dmac:%s,inner_smac_hex:%s, inner_dmac_hex:%s ",
+ vxlan->encap_type, entrance_id, vxlan->dev_id, vxlan->link_id, vxlan->link_dir,
+ vxlan->inner_smac, vxlan->inner_dmac, vxlan->inner_smac_hex, vxlan->inner_dmac_hex);
+ }
+ else
+ {
+ printf("[error], MESA_get_stream_opt get VXLAN_INFO error\n");
+ }
+
+ /* get ip */
+ ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_GDEV_IP, &gdev_ip);
+ if (ret >= 0)
+ {
+ char tmp_ip_str[32];
+ inet_ntop(AF_INET, &gdev_ip, tmp_ip_str, 32);
+ printf("[debug], get_rawpkt_options get gdev-ip:%s\n", tmp_ip_str);
+ }
+ else
+ {
+ printf("[error], get_rawpkt_options get gdev-ip error\n");
+ }
+ /* get vxlan_id */
+ ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_VXLAN_ID, &vxlan_id);
+ if (ret >= 0)
+ {
+ printf("[debug], test_get_rawpkt_options get vlan-id:%d\n", vxlan_id);
+
+ service_id = vxlan_id_map_to_service_id(ntohl(vxlan_id));
+ printf("service id from vxlan_id: %u\n", service_id);
+ }
+ else
+ {
+ printf("[error], test_get_rawpkt_options get vlan-id error\n");
+ }
+ /* get vxlan_port */
+ ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_VXLAN_SPORT, &vxlan_sport);
+ if (ret >= 0)
+ {
+ printf("[debug], test_get_rawpkt_options get sport:%u\n", ntohs(vxlan_sport));
+
+ service_id = vxlan_sport_map_to_service_id(ntohs(vxlan_sport));
+ printf("service id from sport: %u\n", service_id);
+ }
+ else
+ {
+ printf("[error], test_get_rawpkt_options get sport error\n");
+ }
+
+ return service_id;
+}
+
+unsigned short get_proto_type(struct streaminfo *pstream)
+{
+ if (pstream->addr.addrtype == ADDR_TYPE_IPV4)
+ {
+ struct stream_tuple4_v4 *tuple4_v4 = (struct stream_tuple4_v4 *)(pstream->addr.paddr);
+ return ntohs(tuple4_v4->dest);
+ }
+ else if (pstream->addr.addrtype == ADDR_TYPE_IPV6)
+ {
+ /* ipv6 */
+ struct stream_tuple4_v6 *tuple4_v6 = (struct stream_tuple4_v6 *)(pstream->addr.paddr);
+ return ntohs(tuple4_v6->dest);
+ }
+ else
+ {
+ return 0;
+ }
+}
+
+unsigned char get_service_id_from_vxlanid(struct streaminfo *pstream)
+{
+ int vxlan_id; /* 由vxlan_id获取当前包所属业务号 vxlan_id_map_to_service_id*/
+ int ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_VXLAN_ID, &vxlan_id);
+ unsigned char service_id;
+ if (ret >= 0)
+ {
+ service_id = vxlan_id_map_to_service_id(ntohl(vxlan_id));
+ return service_id;
+ }
+ else
+ {
+ return 0;
+ }
+}
+
+unsigned char get_service_id_from_sport(struct streaminfo *pstream)
+{
+ int ret;
+ unsigned short vxlan_sport; /* 由源端口获取当前包所属业务号 vxlan_sport_map_to_service_id*/
+ unsigned char service_id;
+ ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_VXLAN_SPORT, &vxlan_sport);
+ if (ret >= 0)
+ {
+ // printf("[debug], test_get_rawpkt_options get sport:%u\n", ntohs(vxlan_sport));
+ service_id = vxlan_sport_map_to_service_id(ntohs(vxlan_sport));
+ // printf("service id from sport: %u\n", service_id);
+ return service_id;
+ }
+ else
+ {
+ // printf("[error], test_get_rawpkt_options get sport error\n");
+ return 0;
+ }
+}
+
+void get_vx_ip_header_src_ip(struct streaminfo *pstream, struct traffic_info *tinfo)
+{
+ int gdev_ip;
+ int ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_GDEV_IP, &gdev_ip);
+ if (ret >= 0)
+ {
+ inet_ntop(AF_INET, &gdev_ip, tinfo->vx_ip_header_src_ip, INET_ADDRSTRLEN);
+ }
+ else
+ {
+ memset(tinfo->vx_ip_header_src_ip, 0, INET_ADDRSTRLEN);
+ }
+}
+
+unsigned char get_vx_UDP_header_src_port(struct streaminfo *pstream)
+{
+ unsigned short vxlan_sport;
+ int ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_VXLAN_SPORT, &vxlan_sport);
+ if (ret >= 0)
+ {
+ return ntohs(vxlan_sport);
+ }
+ else
+ {
+ return 0;
+ }
+}
+
+/* 获取pstream中的四元组 */
+void get_tuple4(struct streaminfo *pstream, unsigned char service_id)
+{
+ // printf("%s\n", addr_type_to_string((pstream->addr).addrtype));
+ if (pstream->addr.addrtype == ADDR_TYPE_IPV4)
+ {
+ /* ipv4 */
+ char sip[INET_ADDRSTRLEN];
+ char dip[INET_ADDRSTRLEN];
+ struct stream_tuple4_v4 *tuple4_v4 = (struct stream_tuple4_v4 *)(pstream->addr.paddr);
+ inet_ntop(AF_INET, &(tuple4_v4->saddr), sip, INET_ADDRSTRLEN);
+ inet_ntop(AF_INET, &(tuple4_v4->daddr), dip, INET_ADDRSTRLEN);
+ // printf("--->%s:%d -> %s:%d\n", sip, ntohs(tuple4_v4->source), dip, ntohs(tuple4_v4->dest));
+
+ char info[MAX_LOG_INFO_LEN] = {0};
+ if (service_id > 0)
+ {
+ snprintf(info, MAX_LOG_INFO_LEN, "%s:%d -> %s:%d service_id:%u\n", sip, ntohs(tuple4_v4->source),
+ dip, ntohs(tuple4_v4->dest), service_id);
+ printf(info);
+ }
+ else
+ {
+ snprintf(info, MAX_LOG_INFO_LEN, "%s:%d -> %s:%d\n", sip, ntohs(tuple4_v4->source),
+ dip, ntohs(tuple4_v4->dest));
+ }
+ MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info);
+ // push_data_to_kafka(info,sizeof(info));
+ }
+ else if (pstream->addr.addrtype == ADDR_TYPE_IPV6)
+ {
+ /* ipv6 */
+ char sip[INET6_ADDRSTRLEN];
+ char dip[INET6_ADDRSTRLEN];
+ struct stream_tuple4_v6 *tuple4_v6 = (struct stream_tuple4_v6 *)(pstream->addr.paddr);
+ inet_ntop(AF_INET6, &(tuple4_v6->saddr), sip, INET6_ADDRSTRLEN);
+ inet_ntop(AF_INET6, &(tuple4_v6->daddr), dip, INET6_ADDRSTRLEN);
+ printf("--->%s:%d -> %s:%d\n", sip, ntohs(tuple4_v6->source), dip, ntohs(tuple4_v6->dest));
+
+ /* 获取业务号 */
+ // unsigned char service_id = get_service_id(pstream);
+
+ char info[MAX_LOG_INFO_LEN] = {0};
+ if (service_id > 0)
+ {
+ snprintf(info, MAX_LOG_INFO_LEN, "%s:%d -> %s:%d service_id:%u\n", sip, ntohs(tuple4_v6->source),
+ dip, ntohs(tuple4_v6->dest), service_id);
+ }
+ else
+ {
+ snprintf(info, MAX_LOG_INFO_LEN, "%s:%d -> %s:%d\n", sip, ntohs(tuple4_v6->source),
+ dip, ntohs(tuple4_v6->dest));
+ }
+ MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info);
+ // push_data_to_kafka(info,sizeof(info));
+ }
+ else if (pstream->addr.addrtype == ADDR_TYPE_ARP)
+ {
+ }
+}
+
+void get_ip_detail(struct streaminfo *pstream, struct traffic_info *tinfo, const void *rawpkt)
+{
+ // printf("%s\n", addr_type_to_string((pstream->addr).addrtype));
+ if (pstream->addr.addrtype == ADDR_TYPE_IPV4)
+ {
+ /* ipv4 src_ip dst_ip identification fragment_offset*/
+ struct stream_tuple4_v4 *tuple4_v4 = (struct stream_tuple4_v4 *)(pstream->addr.paddr);
+ inet_ntop(AF_INET, &(tuple4_v4->saddr), tinfo->ipv4_sip, INET_ADDRSTRLEN);
+ inet_ntop(AF_INET, &(tuple4_v4->daddr), tinfo->ipv4_dip, INET_ADDRSTRLEN);
+ struct ip *ip_hdr = (struct ip *)rawpkt;
+ tinfo->ipv4_id = ntohs(ip_hdr->ip_id);
+ tinfo->ipv4_off = ntohs(ip_hdr->ip_off);
+
+ char info[MAX_LOG_INFO_LEN] = {0};
+ if (tinfo->service_id > 0)
+ {
+ snprintf(info, MAX_LOG_INFO_LEN, "%s:%d -> %s:%d service_id:%u\n", tinfo->ipv4_sip, ntohs(tuple4_v4->source),
+ tinfo->ipv4_dip, ntohs(tuple4_v4->dest), tinfo->service_id);
+ // printf(info);
+ // push_data_to_kafka(info,sizeof(info));
+ }
+ else
+ {
+ snprintf(info, MAX_LOG_INFO_LEN, "%s:%d -> %s:%d\n", tinfo->ipv4_sip, ntohs(tuple4_v4->source),
+ tinfo->ipv4_dip, ntohs(tuple4_v4->dest));
+ }
+ // MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info);
+ }
+ else if (pstream->addr.addrtype == ADDR_TYPE_IPV6)
+ {
+ /* ipv6 src_ip dst_ip bus_type flow_flag load_length next_msg_head limit*/
+ struct stream_tuple4_v6 *tuple4_v6 = (struct stream_tuple4_v6 *)(pstream->addr.paddr);
+ inet_ntop(AF_INET6, &(tuple4_v6->saddr), tinfo->ipv6_sip, INET6_ADDRSTRLEN);
+ inet_ntop(AF_INET6, &(tuple4_v6->daddr), tinfo->ipv6_dip, INET6_ADDRSTRLEN);
+ struct ip6_hdr *ip6_head = (struct ip6_hdr *)rawpkt;
+ tinfo->ipv6_bus_type = ntohl(ip6_head->ip6_flow) & 0x0FF00000;
+ tinfo->ipv6_flow_flag = ntohl(ip6_head->ip6_flow) & 0x000FFFFF;
+ tinfo->ipv6_load_length = ntohs(ip6_head->ip6_plen);
+ tinfo->ipv6_next_msg_head = ip6_head->ip6_nxt;
+ tinfo->ipv6_limit = ip6_head->ip6_hlim;
+
+ char info[MAX_LOG_INFO_LEN] = {0};
+ if (tinfo->service_id > 0)
+ {
+ snprintf(info, MAX_LOG_INFO_LEN, "%s:%d -> %s:%d service_id:%u\n", tinfo->ipv6_sip, ntohs(tuple4_v6->source),
+ tinfo->ipv6_dip, ntohs(tuple4_v6->dest), tinfo->service_id);
+ }
+ else
+ {
+ snprintf(info, MAX_LOG_INFO_LEN, "%s:%d -> %s:%d\n", tinfo->ipv6_sip, ntohs(tuple4_v6->source),
+ tinfo->ipv6_dip, ntohs(tuple4_v6->dest));
+ }
+ // MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info);
+ // push_data_to_kafka(info,sizeof(info));
+ }
+ else if (pstream->addr.addrtype == ADDR_TYPE_ARP)
+ {
+ return;
+ }
+}
+
+void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *pstream)
+{
+ char protocol[8];
+ switch (tinfo->protocol)
+ {
+ case 1:
+ sprintf(protocol, "%s", "IPv4_TCP");
+ break;
+ case 2:
+ sprintf(protocol, "%s", "IPv4_UDP");
+ break;
+ case 3:
+ sprintf(protocol, "%s", "IPv6_TCP");
+ break;
+ case 4:
+ sprintf(protocol, "%s", "IPv6_UDP");
+ break;
+ default:
+ sprintf(protocol, "%s", "others");
+ break;
+ }
+ /*
+ printf("\"proto_type\":%d,protocol:%s, entrance_id:%d, c2s_pkt_num:%d, s2c_pkt_num:%d, c2s_byte_len:%d, s2c_byte_len:%d, ",
+ tinfo->PROTO_TYPE,protocol, entrance_id, tinfo->C2S_pkt_num,tinfo->S2C_pkt_num,tinfo->C2S_bytes,tinfo->S2C_bytes);
+ printf("stat_time:%ld, vx_type:0x%04X, vx_ip_header_src_ip:%s, vx_ip_header_dst_ip:%s, ",
+ tinfo->stat_time, tinfo->vx_type, tinfo->vx_ip_header_src_ip,vx_ip_header_dst_ip);
+ printf("vx_UDP_header_src_port:%d, vx_UDP_header_dst_port=%d, vx_vlan_id:%d, ",
+ tinfo->vx_UDP_header_src_port,tinfo->vx_UDP_header_dst_port,tinfo->service_id);
+ printf("ipv4_src_ip:%s, ipv4_dst_ip:%s, ipv4_identification:%d, ipv4_fragment_offset:%d, ",
+ tinfo->ipv4_sip,tinfo->ipv4_dip,tinfo->ipv4_id,tinfo->ipv4_off);
+ printf("ipv6_src_ip:%s, ipv6_dst_ip:%s, ipv6_bus_type:%s, ipv6_flow_flag:%d, ipv6_load_length:%d, ipv6_next_msg_head:%d, ipv6_limit:%d ",
+ tinfo->ipv6_sip,tinfo->ipv6_dip,tinfo->ipv6_bus_type,tinfo->ipv6_flow_flag,tinfo->ipv6_load_length,tinfo->ipv6_next_msg_head,tinfo->ipv6_limit);
+ */
+ char info[MAX_TRAFFIC_INFO_LEN] = {0};
+
+ switch (tinfo->addr.addrtype)
+ {
+ case ADDR_TYPE_IPV4:
+ snprintf(info, MAX_TRAFFIC_INFO_LEN,
+ "{\"proto_type\":%d,\"protocol\":\"%s\",\"entrance_id\":%d,\"c2s_pkt_num\":%d,\"s2c_pkt_num\":%d,\"c2s_byte_len\":%d,\"s2c_byte_len\":%d,"
+ "\"stat_time\":%ld,\"vx_type\":\"0x%04X\",\"vx_ip_header_src_ip\":\"%s\",\"vx_ip_header_dst_ip\":\"%s\","
+ "\"vx_udp_header_src_port\":%d,\"vx_udp_header_dst_port\":%d,\"vx_vlan_id\":%d,"
+ "\"ipv4_src_ip\":\"%s\",\"ipv4_dst_ip\":\"%s\",\"ipv4_identification\":%d,\"ipv4_fragment_offset\":%d,"
+ "\"ipv6_src_ip\":\"\",\"ipv6_dst_ip\":\"\",\"ipv6_bus_type\":\"\",\"ipv6_flow_flag\":\"\",\"ipv6_load_length\":0,"
+ "\"ipv6_next_msg_head\":0,\"ipv6_limit\":0,\"flow_type\":%d}",
+ tinfo->PROTO_TYPE,protocol, entrance_id, tinfo->C2S_pkt_num, tinfo->S2C_pkt_num, tinfo->C2S_bytes, tinfo->S2C_bytes,
+ tinfo->stat_time.tv_sec*1000 + tinfo->stat_time.tv_usec/1000, tinfo->vx_type, tinfo->vx_ip_header_src_ip, vx_ip_header_dst_ip,
+ tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->service_id,
+ tinfo->ipv4_sip, tinfo->ipv4_dip, tinfo->ipv4_id, tinfo->ipv4_off,flow_type);
+ MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info);
+ push_data_to_kafka(info,strlen(info));
+ break;
+ case ADDR_TYPE_IPV6:
+ snprintf(info, MAX_TRAFFIC_INFO_LEN,
+ "{\"proto_type\":%d,\"protocol\":\"%s\",\"entrance_id\":%d,\"c2s_pkt_num\":%d,\"s2c_pkt_num\":%d,\"c2s_byte_len\":%d,\"s2c_byte_len\":%d,"
+ "\"stat_time\":%ld,\"vx_type\":\"0x%04X\",\"vx_ip_header_src_ip\":\"%s\",\"vx_ip_header_dst_ip\":\"%s\","
+ "\"vx_udp_header_src_port\":%d,\"vx_udp_header_dst_port\":%d,\"vx_vlan_id\":%d,"
+ "\"ipv4_src_ip\":\"\",\"ipv4_dst_ip\":\"\",\"ipv4_identification\":0,\"ipv4_fragment_offset\":0,"
+ "\"ipv6_src_ip\":\"%s\",\"ipv6_dst_ip\":\"%s\",\"ipv6_bus_type\":\"0x%02X\",\"ipv6_flow_flag\":\"0x%05X\",\"ipv6_load_length\":%d,"
+ "\"ipv6_next_msg_head\":%d,\"ipv6_limit\":%d,\"flow_type\":%d}",
+ tinfo->PROTO_TYPE,protocol, entrance_id, tinfo->C2S_pkt_num, tinfo->S2C_pkt_num, tinfo->C2S_bytes, tinfo->S2C_bytes,
+ tinfo->stat_time.tv_sec*1000 + tinfo->stat_time.tv_usec/1000, tinfo->vx_type, tinfo->vx_ip_header_src_ip, vx_ip_header_dst_ip,
+ tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->service_id,
+ tinfo->ipv6_sip, tinfo->ipv6_dip, tinfo->ipv6_bus_type, tinfo->ipv6_flow_flag, tinfo->ipv6_load_length,
+ tinfo->ipv6_next_msg_head, tinfo->ipv6_limit, flow_type);
+ MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info);
+ push_data_to_kafka(info,strlen(info));
+ break;
+ case ADDR_TYPE_ARP:
+ snprintf(info, MAX_TRAFFIC_INFO_LEN,
+ "{\"proto_type\":%d,\"protocol\":\"%s\",\"entrance_id\":%d,\"c2s_pkt_num\":%d,\"s2c_pkt_num\":%d,\"c2s_byte_len\":%d,\"s2c_byte_len\":%d,"
+ "\"stat_time\":%ld,\"vx_type\":\"0x%04X\",\"vx_ip_header_src_ip\":\"%s\",\"vx_ip_header_dst_ip\":\"%s\","
+ "\"vx_udp_header_src_port\":%d,\"vx_udp_header_dst_port\":%d,\"vx_vlan_id\":%d,"
+ "\"ipv4_src_ip\":\"\",\"ipv4_dst_ip\":\"\",\"ipv4_identification\":0,\"ipv4_fragment_offset\":0,"
+ "\"ipv6_src_ip\":\"\",\"ipv6_dst_ip\":\"\",\"ipv6_bus_type\":\"\",\"ipv6_flow_flag\":\"\",\"ipv6_load_length\":0,"
+ "\"ipv6_next_msg_head\":0,\"ipv6_limit\":0,\"flow_type\":%d}",
+ tinfo->PROTO_TYPE,protocol, entrance_id, tinfo->C2S_pkt_num, tinfo->S2C_pkt_num, tinfo->C2S_bytes, tinfo->S2C_bytes,
+ tinfo->stat_time.tv_sec*1000 + tinfo->stat_time.tv_usec/1000, tinfo->vx_type, tinfo->vx_ip_header_src_ip, vx_ip_header_dst_ip,
+ tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->service_id,flow_type);
+ MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info);
+ push_data_to_kafka(info,strlen(info));
+ break;
+ default:
+ break;
+ }
+}
+
+char TCP_ENTRY_ALL(struct streaminfo *pstream, void **pme, int thread_seq, const void *raw_pkt)
+{
+ printf("TCP_ENTRY_ALL SUCCESS!!!\n");
+ return APP_STATE_GIVEME;
+}
+
+static int tcp_flow_id = -1;
+char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const void *raw_pkt)
+{
+ struct tcpdetail *raw_pdetail = (struct tcpdetail *)pstream->pdetail;
+ struct traffic_info *tinfo;
+
+ if (-1 == tcp_flow_id)
+ {
+ tcp_flow_id = project_customer_register("tcp_flow_stat", "struct");
+ if (-1 == tcp_flow_id)
+ {
+ printf("'tcp_flow_stat' is disable, no statistics\n");
+ }
+ }
+
+ if (pstream->opstate == OP_STATE_PENDING)
+ {
+ tinfo = (struct traffic_info *)calloc(1, sizeof(struct traffic_info));
+ //tinfo->service_id = get_service_id_from_sport(pstream); //获取vx_lan_id字段,具体方法待定
+ tinfo->service_id = get_service_id_from_vxlanid(pstream);
+ /* PROTOCOL */
+ switch (pstream->addr.addrtype)
+ {
+ case ADDR_TYPE_IPV4:
+ tinfo->protocol = 1; // IPV4_TCP
+ tinfo->vx_type = 0x0800;
+ break;
+ case ADDR_TYPE_IPV6:
+ tinfo->protocol = 3; // IPV6_TCP
+ tinfo->vx_type = 0x86DD;
+ break;
+ case ADDR_TYPE_ARP:
+ tinfo->protocol = 0;
+ tinfo->vx_type = 0x0806;
+ break;
+ default:
+ tinfo->protocol = 0;
+ tinfo->vx_type = 0;
+ break;
+ }
+ /* vx_UDP_header_src_port dst_port*/
+ tinfo->vx_UDP_header_src_port = get_vx_UDP_header_src_port(pstream);
+ tinfo->vx_UDP_header_dst_port = 4789;
+ /* vx_ip_header_src_ip dst_ip*/
+ get_vx_ip_header_src_ip(pstream, tinfo);
+ /* IPv4、IPv6头部信息 */
+ get_ip_detail(pstream, tinfo, raw_pkt);
+ /* 应用层协议类型 用目的端口表示 */
+ tinfo->PROTO_TYPE = get_proto_type(pstream);
+
+ *pme = tinfo;
+ }
+ tinfo = (struct traffic_info *)(*pme);
+ /* 自己统计包数字节数 *//*
+ if(raw_pdetail->datalen > 0)
+ {
+ if(DIR_C2S == pstream->curdir)
+ {
+ tinfo->C2S_pkt_num++;
+ tinfo->C2S_bytes += raw_pdetail->datalen;
+ }
+ else
+ {
+ tinfo->S2C_pkt_num++;
+ tinfo->S2C_bytes += raw_pdetail->datalen;
+ }
+ }
+ */
+ if (pstream->opstate == OP_STATE_CLOSE && tinfo->service_id > 0)
+ {
+ //printf("TCP_ENTRY SUCCESS!!!\n");
+ /* 获取包数字节数 */
+ tinfo->C2S_pkt_num = raw_pdetail->serverpktnum;
+ tinfo->S2C_pkt_num = raw_pdetail->clientpktnum;
+ tinfo->C2S_bytes = raw_pdetail->serverbytes;
+ tinfo->S2C_bytes = raw_pdetail->clientbytes;
+
+ /* 另一种获取包数字节数的方法 */
+ /*
+ if(tcp_flow_id != -1)
+ {
+ struct tcp_flow_stat *tflow = (struct tcp_flow_stat *)project_req_get_struct(pstream, tcp_flow_id);
+ tinfo->C2S_pkt_num = tflow->C2S_all_pkt;
+ tinfo->S2C_pkt_num = tflow->S2C_all_pkt;
+ tinfo->C2S_bytes = tflow->C2S_all_byte;
+ tinfo->S2C_bytes = tflow->S2C_all_byte;
+ }
+ */
+ /* layer_addr */
+ tinfo->addr = pstream->addr;
+ /* STAT_TIME */
+ // tinfo->stat_time = time(0);
+ gettimeofday(&tinfo->stat_time,NULL);
+
+ print_traffic_info(tinfo, pstream);
+ free(tinfo);
+ // printf("\n");
+ }
+ return APP_STATE_GIVEME;
+}
+
+static int udp_flow_id = -1;
+char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const void *raw_pkt)
+{
+ struct udpdetail *pdetail = (struct udpdetail *)pstream->pdetail;
+ struct traffic_info *tinfo;
+
+ if (-1 == udp_flow_id)
+ {
+ udp_flow_id = project_customer_register(PROJECT_REQ_UDP_FLOW, "struct");
+ if (-1 == udp_flow_id)
+ {
+ printf("'udp_flow_stat' is disable, no statistics\n");
+ }
+ }
+
+ if (pstream->opstate == OP_STATE_PENDING)
+ {
+ tinfo = (struct traffic_info *)calloc(1, sizeof(struct traffic_info));
+ //tinfo->service_id = get_service_id_from_sport(pstream); //获取vx_lan_id字段,具体方法待定
+ tinfo->service_id = get_service_id_from_vxlanid(pstream);
+ /* PROTOCOL */
+ switch (pstream->addr.addrtype)
+ {
+ case ADDR_TYPE_IPV4:
+ tinfo->protocol = 2; // IPV4_UDP
+ tinfo->vx_type = 0x0800;
+ break;
+ case ADDR_TYPE_IPV6:
+ tinfo->protocol = 4; // IPV6_UDP
+ tinfo->vx_type = 0x86DD;
+ break;
+ case ADDR_TYPE_ARP:
+ tinfo->protocol = 0;
+ tinfo->vx_type = 0x0806;
+ break;
+ default:
+ tinfo->protocol = 0;
+ tinfo->vx_type = 0;
+ break;
+ }
+ /* vx_UDP_header_src_port dst_port*/
+ tinfo->vx_UDP_header_src_port = get_vx_UDP_header_src_port(pstream);
+ tinfo->vx_UDP_header_dst_port = 4789;
+ /* vx_ip_header_src_ip dst_ip*/
+ get_vx_ip_header_src_ip(pstream, tinfo);
+ /* IPv4、IPv6头部信息 */
+ get_ip_detail(pstream, tinfo, raw_pkt);
+ /* 应用层协议类型 用目的端口表示 */
+ tinfo->PROTO_TYPE = get_proto_type(pstream);
+
+ *pme = tinfo;
+ }
+ tinfo = (struct traffic_info *)(*pme);
+
+ if (pstream->opstate == OP_STATE_CLOSE && tinfo->service_id > 0)
+ {
+ //printf("UDP_ENTRY SUCCESS!!!\n");
+
+ if (pdetail != NULL)
+ {
+ /* */
+ tinfo->C2S_pkt_num = pdetail->serverpktnum;
+ tinfo->S2C_pkt_num = pdetail->clientpktnum;
+ tinfo->C2S_bytes = pdetail->serverbytes;
+ tinfo->S2C_bytes = pdetail->clientbytes;
+ /*
+ if(udp_flow_id != -1)
+ {
+ struct udp_flow_stat *tflow = (struct udp_flow_stat *)project_req_get_struct(pstream, udp_flow_id);
+ tinfo->C2S_pkt_num = tflow->C2S_pkt;
+ tinfo->S2C_pkt_num = tflow->S2C_pkt;
+ tinfo->C2S_bytes = tflow->C2S_byte;
+ tinfo->S2C_bytes = tflow->S2C_byte;
+ }*/
+ }
+ else
+ {
+ tinfo->C2S_pkt_num = 0;
+ tinfo->S2C_pkt_num = 0;
+ tinfo->C2S_bytes = 0;
+ tinfo->S2C_bytes = 0;
+ }
+ /* layer_addr */
+ tinfo->addr = pstream->addr;
+ /* STAT_TIME */
+ // tinfo->stat_time = time(0);
+ gettimeofday(&tinfo->stat_time,NULL);
+
+ print_traffic_info(tinfo, pstream);
+ free(tinfo);
+ // printf("\n");
+ }
+ return APP_STATE_GIVEME;
+}
+
+int CHAR_INIT()
+{
+ int demo_plugid = 51;
+ runtime_log_handler = NULL;
+ kafka_log_handler = NULL;
+ runtime_log_handler = MESA_create_runtime_log_handle(tuple_log_path, RLOG_LV_INFO);
+ kafka_log_handler = MESA_create_runtime_log_handle(kafka_log_path, RLOG_LV_INFO);
+ if (runtime_log_handler == NULL || kafka_log_handler == NULL)
+ {
+ /* code */
+ printf("MESA_create_runtime_log_handle failed!!!");
+ return -1;
+ }
+ /* ENTRANCE_ID */
+ MESA_load_profile_uint_def(entrance_id_path, "SETTING", "ENTRANCE_ID", &entrance_id, 0);
+ /* FLOW_TYPE */
+ MESA_load_profile_uint_def(entrance_id_path, "SETTING", "FLOW_TYPE", &flow_type, 0);
+ /* vx_ip_header_dst_ip */
+ MESA_load_profile_string_def(gdev_conf_path, "Module", "sendto_gdev_ip", vx_ip_header_dst_ip, INET_ADDRSTRLEN, "0.0.0.0");
+ /* kafka初始化 */
+ if (init_kafka(0, brokers, topic) != PRODUCER_INIT_SUCCESS)
+ {
+ MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"kafka init failed!!!");
+ return -1;
+ }
+
+ // 函数实现自定义
+ // 只要求函数返回值为插件ID;
+ //printf("INIT SUCCESS!!!\n");
+ return demo_plugid;
+}
+
+void LRJ_APP_DESTROY()
+{
+ MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "TEST_APP_DESTORY in...\n");
+ printf("TEST_APP_DESTORY in...\n");
+ kafka_destroy();
+ if (runtime_log_handler == NULL)
+ {
+ printf("TEST_APP_DESTORY out...\n");
+ return;
+ }
+ MESA_destroy_runtime_log_handle(runtime_log_handler);
+ MESA_destroy_runtime_log_handle(kafka_log_handler);
+ printf("TEST_APP_DESTORY out...\n");
+}