summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Makefile7
-rw-r--r--lirenjie_vxlan_sapp_20200728.c1113
-rw-r--r--plug_run_evn/conf/lrj_vxlan.inf17
-rw-r--r--plug_run_evn/conf/lrj_vxlan_sapp.conf15
4 files changed, 1149 insertions, 3 deletions
diff --git a/Makefile b/Makefile
index da90a51..5672bc4 100644
--- a/Makefile
+++ b/Makefile
@@ -20,9 +20,10 @@ INCS += -I../../include/support/
.PHONY: all clean
all: $(TARGET)
-lirenjie_vxlan_sapp.so: lirenjie_vxlan_sapp.o
- $(CC) -o $@ -shared -fPIC $(CFLAGS) $^ $(MODULES)
- cp $@ ../plug/business/lirenjie_vxlan/
+#lirenjie_vxlan_sapp.so: lirenjie_vxlan_sapp.o
+lirenjie_vxlan_sapp.so: lirenjie_vxlan_sapp_20200728.o
+ $(CC) -o $@ -shared -fPIC $(CFLAGS) $^ $(MODULES) -L/opt/MESA/lib/
+ #cp $@ ../plug/business/lirenjie_vxlan/
.c.o:
$(CC) -c -o $@ $(CFLAGS) -I. $(INCS) $<
diff --git a/lirenjie_vxlan_sapp_20200728.c b/lirenjie_vxlan_sapp_20200728.c
new file mode 100644
index 0000000..41ff349
--- /dev/null
+++ b/lirenjie_vxlan_sapp_20200728.c
@@ -0,0 +1,1113 @@
+#include <stdlib.h>
+#include <stdio.h>
+#include <netinet/ip.h>
+#include <netinet/tcp.h>
+#include <netinet/in.h>
+#include <netinet/udp.h>
+#include <assert.h>
+#include <sys/time.h>
+#include <time.h>
+#include <arpa/inet.h>
+#include <syslog.h>
+#include <signal.h>
+#include <netinet/ip.h>
+#include <netinet/ip6.h>
+#include <netinet/udp.h>
+#include <pthread.h>
+#include "MESA_handle_logger.h"
+#include "MESA_prof_load.h"
+#include "stream.h"
+#include "rdkafka.h"
+
+/*********************************************************************************************************************************
+此版本没功能改动, 仅用于定位包数、字节数统计错误bug!!!
+v3.0版本会出现pkt=0, 但Bytes不是0的情况!
+v4.0版本暂时没发现!!
+*********************************************************************************************************************************/
+
+#define FLOW_DEBUG_ABORT 0
+
+int version_20200728;
+
+static char DEFAULT_RETURN_VALUE = (APP_STATE_GIVEME | APP_STATE_DROPPKT);
+
+static int error_coredump = 0;
+
+extern unsigned char vxlan_sport_map_to_service_id(unsigned short sport_host_order);
+extern unsigned char vxlan_id_map_to_service_id(int vxlan_id_host_order);
+extern int platform_register_action_judge(char (*action_cb_fun)(int net_conn_mode, char plug_action));
+extern int g_business_plug_type;
+
+#define MAX_LOG_INFO_LEN 256
+#define MAX_VPN_ID_NUM 256
+#define MAX_TRAFFIC_INFO_LEN 1024
+static const char *module_name = "lirenjie_vxlan";
+static const char *tuple_log_path = "./log/lirenjie_vxlan/ip_tuple.log";
+static const char *kafka_log_path = "./log/lirenjie_vxlan/kafka.log";
+static const char *gdev_conf_path = "./conf/gdev.conf";
+static const char *entrance_id_path = "./conf/lrj_vxlan_sapp.conf";
+static void *runtime_log_handler;
+static void *kafka_log_handler;
+//char vx_ip_header_dst_ip[INET_ADDRSTRLEN]; // conf/gdev.conf sendto_gdev_ip
+static char sendto_gdev_ip[INET_ADDRSTRLEN]; //conf/gdev.conf sendto_gdev_ip
+static unsigned int sapp_keepalive_reflux_ip_net; /* 本端业务保活、回流IP */
+static unsigned int vpn_id_drop[MAX_VPN_ID_NUM] = {0}; /* 0 do not drop, 1 drop */
+
+#define IDENTIFY_LOCAL_IP_SUBNET_MASK (0xFFFFFF00) /* 一个局点N台前端机, 用IP段识别是否前端机IP, 用于区别流量时回流还是回�? */
+
+static unsigned int entrance_id; /* 局点ID 读配置文�?./conf/lrj_vxlan_sapp.conf 默认�?*/
+
+enum flow_type_t
+{
+ FLOW_TYPE_REFLUX = 0, /* 回流 */
+ FLOW_TYPE_INJECT = 1, /* 回注 */
+};
+static unsigned int flow_type; /* 回流0/回注1 读配置文�?./conf/lrj_vxlan_sapp.conf 默认�? */
+
+/* kafka */
+static const int PRODUCER_INIT_FAILED = -1;
+static const int PRODUCER_INIT_SUCCESS = 0;
+static const int PUSH_DATA_FAILED = -1;
+static const int PUSH_DATA_SUCCESS = 0;
+static int partition;
+//rd
+static rd_kafka_t *kafka_producer;
+static rd_kafka_conf_t *conf;
+// topic
+static rd_kafka_topic_t *rkt;
+static rd_kafka_topic_conf_t *topic_conf;
+// char errstr[512]={0};
+//static char *brokers = "10.172.208.1:9092,10.172.208.2:9092,10.172.208.2:9092,10.172.208.4:9092,10.172.208.5:9092";
+//static char *brokers = "10.208.133.126:9092,10.208.133.133:9092,10.208.133.135:9092,10.208.133.141:9092";
+// char brokers[128];
+
+static char *topic = "G_BACK_TRAFFIC_STATISTIC_new";
+
+//GSJ thread
+pthread_t GSJ_Work;
+
+struct traffic_info
+{
+ unsigned char protocol; //IPv4_TCP 1 IPv4_UDP 2 IPv6_TCP 3 IPv6_UDP 4 其他 0
+ unsigned short PROTO_TYPE; //应用层协议类型,用目的端口来表示
+ UINT32 C2S_pkt_num; /* C2S, you should better use stream_project.h : struct udp_flow_stat */
+ UINT32 S2C_pkt_num; /* S2C, you should better use stream_project.h : struct udp_flow_stat */
+ UINT64 C2S_bytes; /* C2S, you should better use stream_project.h : struct udp_flow_stat */
+ UINT64 S2C_bytes; /* S2C, you should better use stream_project.h : struct udp_flow_stat */
+ //struct tm *systime; // date YYYY-MM-DD %04d-%02d-%02d systime->tm_year + 1900, systime->tm_mon + 1, systime->tm_mday
+ // time_t stat_time; // 秒级
+ struct timeval stat_time;
+ unsigned int vx_ip_header_src_ip_net;
+ unsigned int vx_ip_header_dst_ip_net;
+ unsigned short vx_UDP_header_src_port;
+ unsigned short vx_UDP_header_dst_port;
+ struct layer_addr addr;
+ //unsigned char service_id; /* Vlan ID或特定的标签值对应的VPN号 */
+ int vxlan_vpn_id;
+ unsigned short vx_type; //IPv4=0x0800 IPv6=0x86DD Arp=0x0806
+ /* ipv4 src_ip dst_ip identification fragment_offset*/
+ char ipv4_sip[INET_ADDRSTRLEN];
+ char ipv4_dip[INET_ADDRSTRLEN];
+ unsigned short ipv4_id;
+ unsigned short ipv4_off;
+ /* ipv6 src_ip dst_ip bus_type flow_flag load_length next_msg_head limit*/
+ char ipv6_sip[INET6_ADDRSTRLEN];
+ char ipv6_dip[INET6_ADDRSTRLEN];
+ unsigned char ipv6_bus_type;
+ unsigned int ipv6_flow_flag;
+ unsigned short ipv6_load_length;
+ unsigned char ipv6_next_msg_head;
+ unsigned char ipv6_limit;
+};
+
+static void logger(const rd_kafka_t *rk, int level, const char *fac, const char *buf)
+{
+ struct timeval tv;
+ gettimeofday(&tv, NULL);
+ /*fprintf(stderr, "%u.%03u RDKAFKA-%i-%s: %s: %s\n",
+ (int)tv.tv_sec, (int)(tv.tv_usec / 1000),
+ level, fac, rk ? rd_kafka_name(rk) : NULL, buf);*/
+ MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name, "%u.%03u RDKAFKA-%i-%s: %s: %s",
+ (int)tv.tv_sec, (int)(tv.tv_usec / 1000),
+ level, fac, rk ? rd_kafka_name(rk) : NULL, buf);
+}
+
+static int init_kafka(int partition_, char *brokers_, char *topic_)
+{
+ char tmp[16];
+ char errstr[1024];
+ partition = partition_;
+ /* Kafka configuration */
+ conf = rd_kafka_conf_new();
+ //set logger :register log function
+ rd_kafka_conf_set_log_cb(conf, logger);
+ /* Quick termination */
+ snprintf(tmp, sizeof(tmp), "%i", SIGIO);
+ rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);
+ //rd_kafka_conf_set(conf, "producer.type", "kafka.producer.AyncProducer", errstr, sizeof(errstr));
+ rd_kafka_conf_set(conf, "queue.buffering.max.messages", "1000000", errstr, sizeof(errstr));
+ rd_kafka_conf_set(conf, "topic.metadata.refresh.interval.ms", "600000", errstr, sizeof(errstr));
+ rd_kafka_conf_set(conf, "request.required.acks", "0", errstr, sizeof(errstr));
+ /*topic configuration*/
+ topic_conf = rd_kafka_topic_conf_new();
+
+ if (conf == NULL)
+ {
+ //fprintf(stderr, "***** Failed to create new conf *******\n");
+ MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name, "***** Failed to create new conf *******");
+ return PRODUCER_INIT_FAILED;
+ }
+ kafka_producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, (size_t)sizeof(errstr));
+ if (kafka_producer == NULL)
+ {
+ /*fprintf(stderr, "***** kafka_producer is null *******\n");
+ fprintf(stderr, "*****Failed to create new producer: %s*******\n", errstr);*/
+ MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name, "***** kafka_producer is null, *******");
+ MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name, "*****Failed to create new producer: %s*******\n", errstr);
+ return PRODUCER_INIT_FAILED;
+ }
+
+ rd_kafka_set_log_level(kafka_producer, LOG_DEBUG);
+
+ /* Add brokers */
+ if (rd_kafka_brokers_add(kafka_producer, brokers_) == 0)
+ {
+ //fprintf(stderr, "****** No valid brokers specified********\n");
+ MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name, "****** No valid brokers specified********");
+ return PRODUCER_INIT_FAILED;
+ }
+ /* Create topic */
+ rkt = rd_kafka_topic_new(kafka_producer, topic_, topic_conf);
+
+ return PRODUCER_INIT_SUCCESS;
+}
+
+static void kafka_destroy()
+{
+ rd_kafka_topic_destroy(rkt);
+ rd_kafka_destroy(kafka_producer);
+}
+
+static int push_data_to_kafka(char *buffer, int buf_len)
+{
+ int ret;
+ if (buffer == NULL)
+ {
+ return 0;
+ }
+ //ret = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL);
+ ret = rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL);
+ if (ret == -1)
+ {
+ /*fprintf(stderr,
+ "%% Failed to produce to topic %s "
+ "partition %i: %s\n",
+ rd_kafka_topic_name(rkt), partition,
+ rd_kafka_err2str(rd_kafka_last_error()));*/
+ MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name, "%% Failed to produce to topic %s partition %i: %s",
+ rd_kafka_topic_name(rkt), partition,
+ rd_kafka_err2str(rd_kafka_last_error()));
+ /* Poll to handle delivery reports */
+ //rd_kafka_poll(kafka_producer, 0);
+ return PUSH_DATA_FAILED;
+ }
+ /*fprintf(stderr, "%% Sent %zd bytes to topic "
+ "%s partition %i\n",
+ buf_len, rd_kafka_topic_name(rkt), partition);*/
+ MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name, "%% Sent %zd bytes to topic %s partition %i",
+ buf_len, rd_kafka_topic_name(rkt), partition);
+ // rd_kafka_poll(kafka_producer, 0);
+ return PUSH_DATA_SUCCESS;
+}
+
+static void push_data_to_GSJ(char *buffer, int buf_len)
+{
+// GoString value = {(const char *)buffer, buf_len};
+// GetInfo(value);
+ // Work();
+}
+
+#if 0
+unsigned char get_service_id(struct streaminfo *pstream)
+{
+ int ret;
+ int gdev_ip;
+ int vxlan_id; /* 由vxlan_id获取当前包所属业务号 vxlan_id_map_to_service_id*/
+ unsigned short vxlan_sport; /* 由源端口获取当前包所属业务号 vxlan_sport_map_to_service_id*/
+ unsigned char service_id;
+
+ /* 获取vxlan_info结构体*/
+ struct vxlan_info *vxlan;
+ int opt_val_len = sizeof(struct vxlan_info);
+ ret = MESA_get_stream_opt(pstream, MSO_STREAM_VXLAN_INFO, vxlan, &opt_val_len);
+ if (ret > 0)
+ {
+ printf("[debug], encap_type:%s, entrance_id:%s, dev_id:%s, link_id:%s, link_dir:%s,inner_smac:%s, inner_dmac:%s,inner_smac_hex:%s, inner_dmac_hex:%s ",
+ vxlan->encap_type, entrance_id, vxlan->dev_id, vxlan->link_id, vxlan->link_dir,
+ vxlan->inner_smac, vxlan->inner_dmac, vxlan->inner_smac_hex, vxlan->inner_dmac_hex);
+ }
+ else
+ {
+ printf("[error], MESA_get_stream_opt get VXLAN_INFO error\n");
+ }
+
+ /* get ip */
+ ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_GDEV_IP, &gdev_ip);
+ if (ret >= 0)
+ {
+ char tmp_ip_str[32];
+ inet_ntop(AF_INET, &gdev_ip, tmp_ip_str, 32);
+ printf("[debug], get_rawpkt_options get gdev-ip:%s\n", tmp_ip_str);
+ }
+ else
+ {
+ printf("[error], get_rawpkt_options get gdev-ip error\n");
+ }
+ /* get vxlan_id */
+ ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_VXLAN_ID, &vxlan_id);
+ if (ret >= 0)
+ {
+ printf("[debug], test_get_rawpkt_options get vlan-id:%d\n", vxlan_id);
+
+ service_id = vxlan_id_map_to_service_id(ntohl(vxlan_id));
+ printf("service id from vxlan_id: %u\n", service_id);
+ }
+ else
+ {
+ printf("[error], test_get_rawpkt_options get vlan-id error\n");
+ }
+ /* get vxlan_port */
+ ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_VXLAN_SPORT, &vxlan_sport);
+ if (ret >= 0)
+ {
+ printf("[debug], test_get_rawpkt_options get sport:%u\n", ntohs(vxlan_sport));
+
+ service_id = vxlan_sport_map_to_service_id(ntohs(vxlan_sport));
+ printf("service id from sport: %u\n", service_id);
+ }
+ else
+ {
+ printf("[error], test_get_rawpkt_options get sport error\n");
+ }
+
+ return service_id;
+}
+#endif
+
+static unsigned short get_proto_type(struct streaminfo *pstream)
+{
+ if (pstream->addr.addrtype == ADDR_TYPE_IPV4)
+ {
+ struct stream_tuple4_v4 *tuple4_v4 = (struct stream_tuple4_v4 *)(pstream->addr.paddr);
+ return ntohs(tuple4_v4->dest);
+ }
+ else if (pstream->addr.addrtype == ADDR_TYPE_IPV6)
+ {
+ /* ipv6 */
+ struct stream_tuple4_v6 *tuple4_v6 = (struct stream_tuple4_v6 *)(pstream->addr.paddr);
+ return ntohs(tuple4_v6->dest);
+ }
+ else
+ {
+ return 0;
+ }
+}
+
+static int get_vpnid_from_stream(struct streaminfo *pstream)
+{
+ int vpn_id_net_order;
+ int ret;
+
+ ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_VXLAN_VPNID, &vpn_id_net_order);
+ if (ret >= 0)
+ {
+ return ntohl(vpn_id_net_order);
+ }
+ else
+ {
+ return 0;
+ }
+}
+
+#if 0
+static unsigned char get_service_id_from_vxlanid(struct streaminfo *pstream)
+{
+ int vxlan_id; /* 由vxlan_id获取当前包所属业务号 vxlan_id_map_to_service_id*/
+ int ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_VXLAN_ID, &vxlan_id);
+ unsigned char service_id;
+ if (ret >= 0)
+ {
+ service_id = vxlan_id_map_to_service_id(ntohl(vxlan_id));
+ return service_id;
+ }
+ else
+ {
+ return 0;
+ }
+}
+
+
+static unsigned char get_service_id_from_sport(struct streaminfo *pstream)
+{
+ int ret;
+ unsigned short vxlan_sport; /* 由源端口获取当前包所属业务号 vxlan_sport_map_to_service_id*/
+ unsigned char service_id;
+ ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_VXLAN_SPORT, &vxlan_sport);
+ if (ret >= 0)
+ {
+ // printf("[debug], test_get_rawpkt_options get sport:%u\n", ntohs(vxlan_sport));
+ service_id = vxlan_sport_map_to_service_id(ntohs(vxlan_sport));
+ // printf("service id from sport: %u\n", service_id);
+ return service_id;
+ }
+ else
+ {
+ // printf("[error], test_get_rawpkt_options get sport error\n");
+ return 0;
+ }
+}
+#endif
+
+#if 0
+static int is_same_sub_net(unsigned int packet_gdev_ip_net, unsigned int local_gdev_ip_net)
+{
+ if((ntohl(packet_gdev_ip_net) & IDENTIFY_LOCAL_IP_SUBNET_MASK) ==
+ (ntohl(local_gdev_ip_net) & IDENTIFY_LOCAL_IP_SUBNET_MASK)){
+ return 1;
+ }
+
+ return 0;
+}
+#endif
+
+static int get_vxlan_ip_addr(struct streaminfo *pstream, struct traffic_info *tinfo)
+{
+ int gdev_ip_net = 0, local_dev_ip = 0;
+ int ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_GDEV_IP, &gdev_ip_net);
+ if (ret >= 0)
+ {
+#if 0
+ /* 回流/回注流量已经区分,接入不同的机器 靠配置文件指定*/
+ if(is_same_sub_net(gdev_ip_net, sapp_keepalive_reflux_ip_net)){
+ flow_type = FLOW_TYPE_INJECT; /* 从驱动获取的GDEV IP(vxlan->srcip), 和本机IP在一个网�? 说明是回注包 */
+ }else{
+ flow_type = FLOW_TYPE_REFLUX; /* 从驱动获取的GDEV IP(vxlan->srcip), 和本机IP不在一个网�? 说明是回流包 */
+ }
+
+ if(gdev_ip_net == sapp_keepalive_reflux_ip_net){ /* add by lijia 20190611 */
+ MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, "vxlan src and dst ip is equal!");
+ ret = -1;
+ }
+#endif
+
+ tinfo->vx_ip_header_src_ip_net = gdev_ip_net;
+ ret = 0;
+ }
+ else
+ {
+ tinfo->vx_ip_header_src_ip_net = 0;
+ tinfo->vx_ip_header_dst_ip_net = 0;
+ ret = -1;
+ if (error_coredump)
+ {
+ assert(0);
+ }
+ }
+
+ ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_VXLAN_LOCAL_IP, &local_dev_ip);
+ if (ret >= 0)
+ {
+ tinfo->vx_ip_header_dst_ip_net = local_dev_ip;
+ ret = 0;
+ }
+ else
+ {
+ tinfo->vx_ip_header_src_ip_net = 0;
+ tinfo->vx_ip_header_dst_ip_net = 0;
+ ret = -1;
+ if (error_coredump)
+ {
+ assert(0);
+ }
+ }
+
+ return ret;
+}
+
+static unsigned short get_vx_UDP_header_src_port(struct streaminfo *pstream)
+{
+ unsigned short vxlan_sport;
+ int ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_VXLAN_SPORT, &vxlan_sport);
+ if (ret >= 0)
+ {
+ return ntohs(vxlan_sport);
+ }
+ else
+ {
+ return 0;
+ }
+}
+
+/* 获取pstream中的四元组 */
+#if 0
+static void get_tuple4(struct streaminfo *pstream, unsigned char service_id)
+{
+ // printf("%s\n", addr_type_to_string((pstream->addr).addrtype));
+ if (pstream->addr.addrtype == ADDR_TYPE_IPV4)
+ {
+ /* ipv4 */
+ char sip[INET_ADDRSTRLEN];
+ char dip[INET_ADDRSTRLEN];
+ struct stream_tuple4_v4 *tuple4_v4 = (struct stream_tuple4_v4 *)(pstream->addr.paddr);
+ inet_ntop(AF_INET, &(tuple4_v4->saddr), sip, INET_ADDRSTRLEN);
+ inet_ntop(AF_INET, &(tuple4_v4->daddr), dip, INET_ADDRSTRLEN);
+ // printf("--->%s:%d -> %s:%d\n", sip, ntohs(tuple4_v4->source), dip, ntohs(tuple4_v4->dest));
+
+ char info[MAX_LOG_INFO_LEN] = {0};
+ if (service_id > 0)
+ {
+ snprintf(info, MAX_LOG_INFO_LEN, "%s:%d -> %s:%d service_id:%u\n", sip, ntohs(tuple4_v4->source),
+ dip, ntohs(tuple4_v4->dest), service_id);
+ printf(info);
+ }
+ else
+ {
+ snprintf(info, MAX_LOG_INFO_LEN, "%s:%d -> %s:%d\n", sip, ntohs(tuple4_v4->source),
+ dip, ntohs(tuple4_v4->dest));
+ }
+ MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info);
+ // push_data_to_kafka(info,sizeof(info));
+ }
+ else if (pstream->addr.addrtype == ADDR_TYPE_IPV6)
+ {
+ /* ipv6 */
+ char sip[INET6_ADDRSTRLEN];
+ char dip[INET6_ADDRSTRLEN];
+ struct stream_tuple4_v6 *tuple4_v6 = (struct stream_tuple4_v6 *)(pstream->addr.paddr);
+ inet_ntop(AF_INET6, &(tuple4_v6->saddr), sip, INET6_ADDRSTRLEN);
+ inet_ntop(AF_INET6, &(tuple4_v6->daddr), dip, INET6_ADDRSTRLEN);
+ printf("--->%s:%d -> %s:%d\n", sip, ntohs(tuple4_v6->source), dip, ntohs(tuple4_v6->dest));
+
+ /* 获取业务号 */
+ // unsigned char service_id = get_service_id(pstream);
+
+ char info[MAX_LOG_INFO_LEN] = {0};
+ if (service_id > 0)
+ {
+ snprintf(info, MAX_LOG_INFO_LEN, "%s:%d -> %s:%d service_id:%u\n", sip, ntohs(tuple4_v6->source),
+ dip, ntohs(tuple4_v6->dest), service_id);
+ }
+ else
+ {
+ snprintf(info, MAX_LOG_INFO_LEN, "%s:%d -> %s:%d\n", sip, ntohs(tuple4_v6->source),
+ dip, ntohs(tuple4_v6->dest));
+ }
+ MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info);
+ // push_data_to_kafka(info,sizeof(info));
+ }
+ else if (pstream->addr.addrtype == ADDR_TYPE_ARP)
+ {
+ }
+}
+#endif
+
+static void get_ip_detail(struct streaminfo *pstream, struct traffic_info *tinfo, const void *rawpkt)
+{
+ // printf("%s\n", addr_type_to_string((pstream->addr).addrtype));
+ if (pstream->addr.addrtype == ADDR_TYPE_IPV4)
+ {
+ /* ipv4 src_ip dst_ip identification fragment_offset*/
+ struct stream_tuple4_v4 *tuple4_v4 = (struct stream_tuple4_v4 *)(pstream->addr.paddr);
+ inet_ntop(AF_INET, &(tuple4_v4->saddr), tinfo->ipv4_sip, INET_ADDRSTRLEN);
+ inet_ntop(AF_INET, &(tuple4_v4->daddr), tinfo->ipv4_dip, INET_ADDRSTRLEN);
+ struct ip *ip_hdr = (struct ip *)rawpkt;
+ tinfo->ipv4_id = ntohs(ip_hdr->ip_id);
+ tinfo->ipv4_off = ntohs(ip_hdr->ip_off);
+#if 0
+ char info[MAX_LOG_INFO_LEN] = {0};
+ if (tinfo->service_id > 0)
+ {
+ snprintf(info, MAX_LOG_INFO_LEN, "%s:%d -> %s:%d service_id:%u\n", tinfo->ipv4_sip, ntohs(tuple4_v4->source),
+ tinfo->ipv4_dip, ntohs(tuple4_v4->dest), tinfo->service_id);
+ // printf(info);
+ // push_data_to_kafka(info,sizeof(info));
+ }
+ else
+ {
+ snprintf(info, MAX_LOG_INFO_LEN, "%s:%d -> %s:%d\n", tinfo->ipv4_sip, ntohs(tuple4_v4->source),
+ tinfo->ipv4_dip, ntohs(tuple4_v4->dest));
+ }
+ // MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info);
+#endif
+ }
+ else if (pstream->addr.addrtype == ADDR_TYPE_IPV6)
+ {
+ /* ipv6 src_ip dst_ip bus_type flow_flag load_length next_msg_head limit*/
+ struct stream_tuple4_v6 *tuple4_v6 = (struct stream_tuple4_v6 *)(pstream->addr.paddr);
+ inet_ntop(AF_INET6, &(tuple4_v6->saddr), tinfo->ipv6_sip, INET6_ADDRSTRLEN);
+ inet_ntop(AF_INET6, &(tuple4_v6->daddr), tinfo->ipv6_dip, INET6_ADDRSTRLEN);
+ struct ip6_hdr *ip6_head = (struct ip6_hdr *)rawpkt;
+ tinfo->ipv6_bus_type = ntohl(ip6_head->ip6_flow) & 0x0FF00000;
+ tinfo->ipv6_flow_flag = ntohl(ip6_head->ip6_flow) & 0x000FFFFF;
+ tinfo->ipv6_load_length = ntohs(ip6_head->ip6_plen);
+ tinfo->ipv6_next_msg_head = ip6_head->ip6_nxt;
+ tinfo->ipv6_limit = ip6_head->ip6_hlim;
+#if 0
+
+ char info[MAX_LOG_INFO_LEN] = {0};
+ if (tinfo->service_id > 0)
+ {
+ snprintf(info, MAX_LOG_INFO_LEN, "%s:%d -> %s:%d service_id:%u\n", tinfo->ipv6_sip, ntohs(tuple4_v6->source),
+ tinfo->ipv6_dip, ntohs(tuple4_v6->dest), tinfo->service_id);
+ }
+ else
+ {
+ snprintf(info, MAX_LOG_INFO_LEN, "%s:%d -> %s:%d\n", tinfo->ipv6_sip, ntohs(tuple4_v6->source),
+ tinfo->ipv6_dip, ntohs(tuple4_v6->dest));
+ }
+ // MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info);
+ // push_data_to_kafka(info,sizeof(info));
+#endif
+ }
+ else if (pstream->addr.addrtype == ADDR_TYPE_ARP)
+ {
+ return;
+ }
+}
+
+static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *pstream)
+{
+ char protocol[8];
+ char vxlan_sip_ip_str[INET_ADDRSTRLEN];
+ char vxlan_dip_ip_str[INET_ADDRSTRLEN];
+
+ switch (tinfo->protocol)
+ {
+ case 1:
+ sprintf(protocol, "%s", "IPv4_TCP");
+ break;
+ case 2:
+ sprintf(protocol, "%s", "IPv4_UDP");
+ break;
+ case 3:
+ sprintf(protocol, "%s", "IPv6_TCP");
+ break;
+ case 4:
+ sprintf(protocol, "%s", "IPv6_UDP");
+ break;
+ default:
+ sprintf(protocol, "%s", "others");
+ break;
+ }
+ /*
+ printf("\"proto_type\":%d,protocol:%s, entrance_id:%d, c2s_pkt_num:%d, s2c_pkt_num:%d, c2s_byte_len:%d, s2c_byte_len:%d, ",
+ tinfo->PROTO_TYPE,protocol, entrance_id, tinfo->C2S_pkt_num,tinfo->S2C_pkt_num,tinfo->C2S_bytes,tinfo->S2C_bytes);
+ printf("stat_time:%ld, vx_type:0x%04X, vx_ip_header_src_ip:%s, vx_ip_header_dst_ip:%s, ",
+ tinfo->stat_time, tinfo->vx_type, tinfo->vx_ip_header_src_ip,vx_ip_header_dst_ip);
+ printf("vx_UDP_header_src_port:%d, vx_UDP_header_dst_port=%d, vx_vlan_id:%d, ",
+ tinfo->vx_UDP_header_src_port,tinfo->vx_UDP_header_dst_port,tinfo->service_id);
+ printf("ipv4_src_ip:%s, ipv4_dst_ip:%s, ipv4_identification:%d, ipv4_fragment_offset:%d, ",
+ tinfo->ipv4_sip,tinfo->ipv4_dip,tinfo->ipv4_id,tinfo->ipv4_off);
+ printf("ipv6_src_ip:%s, ipv6_dst_ip:%s, ipv6_bus_type:%s, ipv6_flow_flag:%d, ipv6_load_length:%d, ipv6_next_msg_head:%d, ipv6_limit:%d ",
+ tinfo->ipv6_sip,tinfo->ipv6_dip,tinfo->ipv6_bus_type,tinfo->ipv6_flow_flag,tinfo->ipv6_load_length,tinfo->ipv6_next_msg_head,tinfo->ipv6_limit);
+ */
+ char info[MAX_TRAFFIC_INFO_LEN] = {0};
+
+ inet_ntop(AF_INET, &tinfo->vx_ip_header_src_ip_net, vxlan_sip_ip_str, INET_ADDRSTRLEN);
+ inet_ntop(AF_INET, &tinfo->vx_ip_header_dst_ip_net, vxlan_dip_ip_str, INET_ADDRSTRLEN);
+
+ switch (tinfo->addr.addrtype)
+ {
+ case ADDR_TYPE_IPV4:
+ snprintf(info, MAX_TRAFFIC_INFO_LEN,
+ "{\"proto_type\":%d,\"protocol\":\"%s\",\"entrance_id\":%d,\"c2s_pkt_num\":%d,\"s2c_pkt_num\":%d,\"c2s_byte_len\":%ld,\"s2c_byte_len\":%ld,"
+ "\"stat_time\":%ld,\"vx_type\":\"0x%04X\",\"vx_ip_header_src_ip\":\"%s\",\"vx_ip_header_dst_ip\":\"%s\","
+ "\"vx_udp_header_src_port\":%u,\"vx_udp_header_dst_port\":%u,\"vx_vlan_id\":%u,"
+ "\"ipv4_src_ip\":\"%s\",\"ipv4_dst_ip\":\"%s\",\"ipv4_identification\":%d,\"ipv4_fragment_offset\":%u,"
+ "\"ipv6_src_ip\":\"\",\"ipv6_dst_ip\":\"\",\"ipv6_bus_type\":\"\",\"ipv6_flow_flag\":\"\",\"ipv6_load_length\":0,"
+ "\"ipv6_next_msg_head\":0,\"ipv6_limit\":0,\"flow_type\":%d,\"sendto_gdev_ip\":\"%s\"}",
+ tinfo->PROTO_TYPE, protocol, entrance_id, tinfo->C2S_pkt_num, tinfo->S2C_pkt_num, tinfo->C2S_bytes, tinfo->S2C_bytes,
+ tinfo->stat_time.tv_sec * 1000 + tinfo->stat_time.tv_usec / 1000, tinfo->vx_type, vxlan_sip_ip_str, vxlan_dip_ip_str,
+ tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->vxlan_vpn_id,
+ tinfo->ipv4_sip, tinfo->ipv4_dip, tinfo->ipv4_id, tinfo->ipv4_off,
+ //ipv6 stat is NULL,
+ flow_type, sendto_gdev_ip);
+ MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_DEBUG, module_name, info);
+ // push_data_to_kafka(info,strlen(info));
+ //push_data_to_GSJ(info, strlen(info));
+
+ if((0 == tinfo->C2S_pkt_num) && ( tinfo->C2S_bytes > 0)){
+#if FLOW_DEBUG_ABORT
+ abort();
+#else
+ MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_DEBUG, "--------", "###### C2S_pkt_num is 0, but C2S_bytes not 0!--------------");
+#endif
+ }
+ if((0 == tinfo->S2C_pkt_num) && (tinfo->S2C_bytes > 0)){
+#if FLOW_DEBUG_ABORT
+ abort();
+#else
+ MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_DEBUG, "--------", "###### S2C_pkt_num is 0, but S2C_bytes not 0!--------------");
+#endif
+
+ }
+
+ if((tinfo->C2S_pkt_num > 0) && ((double)tinfo->C2S_bytes/(double)tinfo->C2S_pkt_num > 2000)){
+#if FLOW_DEBUG_ABORT
+ abort();
+#else
+ MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_DEBUG, "--------", "###### C2S avg pkt len more than 2000!---------------");
+#endif
+
+ }
+ if((tinfo->S2C_pkt_num > 0) && ((double)tinfo->S2C_bytes/(double)tinfo->S2C_pkt_num > 2000)){
+#if FLOW_DEBUG_ABORT
+ abort();
+#else
+ MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_DEBUG, "--------", "###### S2C avg pkt len more than 2000!------------------");
+#endif
+
+ }
+ break;
+ case ADDR_TYPE_IPV6:
+ snprintf(info, MAX_TRAFFIC_INFO_LEN,
+ "{\"proto_type\":%d,\"protocol\":\"%s\",\"entrance_id\":%d,\"c2s_pkt_num\":%d,\"s2c_pkt_num\":%d,\"c2s_byte_len\":%ld,\"s2c_byte_len\":%ld,"
+ "\"stat_time\":%ld,\"vx_type\":\"0x%04X\",\"vx_ip_header_src_ip\":\"%s\",\"vx_ip_header_dst_ip\":\"%s\","
+ "\"vx_udp_header_src_port\":%u,\"vx_udp_header_dst_port\":%u,\"vx_vlan_id\":%u,"
+ "\"ipv4_src_ip\":\"\",\"ipv4_dst_ip\":\"\",\"ipv4_identification\":0,\"ipv4_fragment_offset\":0,"
+ "\"ipv6_src_ip\":\"%s\",\"ipv6_dst_ip\":\"%s\",\"ipv6_bus_type\":\"0x%02X\",\"ipv6_flow_flag\":\"0x%05X\",\"ipv6_load_length\":%d,"
+ "\"ipv6_next_msg_head\":%d,\"ipv6_limit\":%d,\"flow_type\":%d,\"sendto_gdev_ip\":\"%s\"}",
+ tinfo->PROTO_TYPE, protocol, entrance_id, tinfo->C2S_pkt_num, tinfo->S2C_pkt_num, tinfo->C2S_bytes, tinfo->S2C_bytes,
+ tinfo->stat_time.tv_sec * 1000 + tinfo->stat_time.tv_usec / 1000, tinfo->vx_type, vxlan_sip_ip_str, vxlan_dip_ip_str,
+ tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->vxlan_vpn_id,
+ tinfo->ipv6_sip, tinfo->ipv6_dip, tinfo->ipv6_bus_type, tinfo->ipv6_flow_flag, tinfo->ipv6_load_length,
+ tinfo->ipv6_next_msg_head, tinfo->ipv6_limit, flow_type, sendto_gdev_ip);
+ MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_DEBUG, module_name, info);
+ // push_data_to_kafka(info,strlen(info));
+ //push_data_to_GSJ(info, strlen(info));
+ break;
+ case ADDR_TYPE_ARP:
+ snprintf(info, MAX_TRAFFIC_INFO_LEN,
+ "{\"proto_type\":%d,\"protocol\":\"%s\",\"entrance_id\":%d,\"c2s_pkt_num\":%d,\"s2c_pkt_num\":%d,\"c2s_byte_len\":%ld,\"s2c_byte_len\":%ld,"
+ "\"stat_time\":%ld,\"vx_type\":\"0x%04X\",\"vx_ip_header_src_ip\":\"%s\",\"vx_ip_header_dst_ip\":\"%s\","
+ "\"vx_udp_header_src_port\":%d,\"vx_udp_header_dst_port\":%d,\"vx_vlan_id\":%d,"
+ "\"ipv4_src_ip\":\"\",\"ipv4_dst_ip\":\"\",\"ipv4_identification\":0,\"ipv4_fragment_offset\":0,"
+ "\"ipv6_src_ip\":\"\",\"ipv6_dst_ip\":\"\",\"ipv6_bus_type\":\"\",\"ipv6_flow_flag\":\"\",\"ipv6_load_length\":0,"
+ "\"ipv6_next_msg_head\":0,\"ipv6_limit\":0,\"flow_type\":%d,\"sendto_gdev_ip\":\"%s\"}",
+ tinfo->PROTO_TYPE, protocol, entrance_id, tinfo->C2S_pkt_num, tinfo->S2C_pkt_num, tinfo->C2S_bytes, tinfo->S2C_bytes,
+ tinfo->stat_time.tv_sec * 1000 + tinfo->stat_time.tv_usec / 1000, tinfo->vx_type, vxlan_sip_ip_str, vxlan_dip_ip_str,
+ tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->vxlan_vpn_id,
+ flow_type, sendto_gdev_ip);
+ MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_DEBUG, module_name, info);
+ // push_data_to_kafka(info,strlen(info));
+ //push_data_to_GSJ(info, strlen(info));
+ break;
+ default:
+ break;
+ }
+}
+
+char TCP_ENTRY_ALL(struct streaminfo *pstream, void **pme, int thread_seq, const void *raw_pkt)
+{
+ //printf("TCP_ENTRY_ALL SUCCESS!!!\n");
+ return DEFAULT_RETURN_VALUE;
+}
+
+static int tcp_flow_id = -1;
+char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const void *raw_pkt)
+{
+ struct tcpdetail *raw_pdetail = (struct tcpdetail *)pstream->pdetail;
+ struct traffic_info *tinfo;
+
+ if (-1 == tcp_flow_id)
+ {
+ tcp_flow_id = project_customer_register("tcp_flow_stat", "struct");
+ if (-1 == tcp_flow_id)
+ {
+ printf("'tcp_flow_stat' is disable, no statistics\n");
+ }
+ }
+
+ if (pstream->opstate == OP_STATE_PENDING)
+ {
+ tinfo = (struct traffic_info *)calloc(1, sizeof(struct traffic_info));
+ //tinfo->service_id = get_service_id_from_sport(pstream); //获取vx_lan_id字段,具体方法待定 //tinfo->service_id = get_service_id_from_vxlanid(pstream);
+ tinfo->vxlan_vpn_id = get_vpnid_from_stream(pstream);
+ /* PROTOCOL */
+ switch (pstream->addr.addrtype)
+ {
+ case ADDR_TYPE_IPV4:
+ tinfo->protocol = 1; // IPV4_TCP
+ tinfo->vx_type = 0x0800;
+ break;
+ case ADDR_TYPE_IPV6:
+ tinfo->protocol = 3; // IPV6_TCP
+ tinfo->vx_type = 0x86DD;
+ break;
+ case ADDR_TYPE_ARP:
+ tinfo->protocol = 0;
+ tinfo->vx_type = 0x0806;
+ break;
+ default:
+ tinfo->protocol = 0;
+ tinfo->vx_type = 0;
+ break;
+ }
+ /* vx_UDP_header_src_port dst_port*/
+ tinfo->vx_UDP_header_src_port = get_vx_UDP_header_src_port(pstream);
+ tinfo->vx_UDP_header_dst_port = 4789;
+ /* vx_ip_header_src_ip dst_ip*/
+ if (get_vxlan_ip_addr(pstream, tinfo) < 0)
+ {
+ goto error_drop;
+ }
+ /* IPv4、IPv6头部信息 */
+ get_ip_detail(pstream, tinfo, raw_pkt);
+ /* 应用层协议类型 用目的端口表示*/
+ tinfo->PROTO_TYPE = get_proto_type(pstream);
+
+ *pme = tinfo;
+ }
+ tinfo = (struct traffic_info *)(*pme);
+ /* 自己统计包数字节数*/ /*
+ if(raw_pdetail->datalen > 0)
+ {
+ if(DIR_C2S == pstream->curdir)
+ {
+ tinfo->C2S_pkt_num++;
+ tinfo->C2S_bytes += raw_pdetail->datalen;
+ }
+ else
+ {
+ tinfo->S2C_pkt_num++;
+ tinfo->S2C_bytes += raw_pdetail->datalen;
+ }
+ }
+ */
+ /* if (pstream->opstate == OP_STATE_CLOSE && tinfo->vxlan_vpn_id > 0) */
+ if (pstream->opstate == OP_STATE_CLOSE && !vpn_id_drop[tinfo->vxlan_vpn_id])
+ {
+ //printf("TCP_ENTRY SUCCESS!!!\n");
+ /* 获取包数字节数*/
+ // tinfo->C2S_pkt_num = raw_pdetail->serverpktnum;
+ // tinfo->S2C_pkt_num = raw_pdetail->clientpktnum;
+ // tinfo->C2S_bytes = raw_pdetail->serverbytes;
+ // tinfo->S2C_bytes = raw_pdetail->clientbytes;
+ // if(tinfo->C2S_bytes < 0)
+ // {
+ // tinfo->C2S_bytes = 0;
+ // }
+ // if(tinfo->S2C_bytes < 0)
+ // {
+ // tinfo->S2C_bytes = 0;
+ // }
+ /* 另一种获取包数字节数的方式*/
+
+ if (tcp_flow_id != -1)
+ {
+ struct tcp_flow_stat *tflow = (struct tcp_flow_stat *)project_req_get_struct(pstream, tcp_flow_id);
+ tinfo->C2S_pkt_num = tflow->C2S_all_pkt;
+ tinfo->S2C_pkt_num = tflow->S2C_all_pkt;
+ tinfo->C2S_bytes = tflow->C2S_all_byte;
+ tinfo->S2C_bytes = tflow->S2C_all_byte;
+ }
+ else
+ {
+ tinfo->C2S_pkt_num = 0;
+ tinfo->S2C_pkt_num = 0;
+ tinfo->C2S_bytes = 0;
+ tinfo->S2C_bytes = 0;
+ }
+
+
+
+ /* layer_addr */
+ tinfo->addr = pstream->addr;
+ /* STAT_TIME */
+ // tinfo->stat_time = time(0);
+ gettimeofday(&tinfo->stat_time, NULL);
+
+ print_traffic_info(tinfo, pstream);
+ free(tinfo);
+ // printf("\n");
+ }
+ return DEFAULT_RETURN_VALUE;
+
+error_drop:
+ free(tinfo);
+ return APP_STATE_DROPME | APP_STATE_DROPPKT;
+}
+
+/*
+ add by lijia 20190604.
+*/
+static inline int is_gdev_keepalive_pkt(const struct ip *iphdr)
+{
+ const struct udphdr *udh;
+
+ if (NULL == iphdr)
+ {
+ return 0;
+ }
+
+ if (iphdr->ip_p != 17)
+ {
+ return 0;
+ }
+
+ udh = (struct udphdr *)((char *)iphdr + iphdr->ip_hl * 4);
+
+ if (udh->dest == ntohs(3784))
+ {
+ return 1;
+ }
+
+ return 0;
+}
+
+static int udp_flow_id = -1;
+char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const void *raw_pkt)
+{
+ struct udpdetail *pdetail = (struct udpdetail *)pstream->pdetail;
+ struct traffic_info *tinfo;
+
+ if (is_gdev_keepalive_pkt((const struct ip *)raw_pkt) != 0)
+ { //add by lijia 20190604, drop BFD keepalive packet.
+ return APP_STATE_DROPME;
+ }
+
+ if (-1 == udp_flow_id)
+ {
+ udp_flow_id = project_customer_register(PROJECT_REQ_UDP_FLOW, "struct");
+ if (-1 == udp_flow_id)
+ {
+ printf("'udp_flow_stat' is disable, no statistics\n");
+ }
+ }
+
+ if (pstream->opstate == OP_STATE_PENDING)
+ {
+ tinfo = (struct traffic_info *)calloc(1, sizeof(struct traffic_info));
+ //tinfo->service_id = get_service_id_from_sport(pstream); //获取vx_lan_id字段,具体方法待定 //tinfo->service_id = get_service_id_from_vxlanid(pstream);
+ tinfo->vxlan_vpn_id = get_vpnid_from_stream(pstream);
+
+ /* PROTOCOL */
+ switch (pstream->addr.addrtype)
+ {
+ case ADDR_TYPE_IPV4:
+ tinfo->protocol = 2; // IPV4_UDP
+ tinfo->vx_type = 0x0800;
+ break;
+ case ADDR_TYPE_IPV6:
+ tinfo->protocol = 4; // IPV6_UDP
+ tinfo->vx_type = 0x86DD;
+ break;
+ case ADDR_TYPE_ARP:
+ tinfo->protocol = 0;
+ tinfo->vx_type = 0x0806;
+ break;
+ default:
+ tinfo->protocol = 0;
+ tinfo->vx_type = 0;
+ break;
+ }
+ /* vx_UDP_header_src_port dst_port*/
+ tinfo->vx_UDP_header_src_port = get_vx_UDP_header_src_port(pstream);
+ tinfo->vx_UDP_header_dst_port = 4789;
+ /* vx_ip_header_src_ip dst_ip*/
+ if (get_vxlan_ip_addr(pstream, tinfo) < 0)
+ {
+ goto error_drop;
+ }
+ /* IPv4、IPv6头部信息 */
+ get_ip_detail(pstream, tinfo, raw_pkt);
+ /* 应用层协议类型 用目的端口表示*/
+ tinfo->PROTO_TYPE = get_proto_type(pstream);
+
+ *pme = tinfo;
+ }
+ tinfo = (struct traffic_info *)(*pme);
+
+ /* if (pstream->opstate == OP_STATE_CLOSE && tinfo->vxlan_vpn_id > 0) */
+ if (pstream->opstate == OP_STATE_CLOSE && !vpn_id_drop[tinfo->vxlan_vpn_id])
+ {
+ //printf("UDP_ENTRY SUCCESS!!!\n");
+
+ if (pdetail != NULL)
+ {
+ /* 获取包数字节数 */
+ // tinfo->C2S_pkt_num = pdetail->serverpktnum;
+ // tinfo->S2C_pkt_num = pdetail->clientpktnum;
+ // tinfo->C2S_bytes = pdetail->serverbytes;
+ // tinfo->S2C_bytes = pdetail->clientbytes;
+ // if (tinfo->C2S_bytes < 0)
+ // {
+ // tinfo->C2S_bytes = 0;
+ // }
+ // if (tinfo->S2C_bytes < 0)
+ // {
+ // tinfo->S2C_bytes = 0;
+ // }
+
+ if (udp_flow_id != -1)
+ {
+ struct udp_flow_stat *tflow = (struct udp_flow_stat *)project_req_get_struct(pstream, udp_flow_id);
+ tinfo->C2S_pkt_num = tflow->C2S_pkt;
+ tinfo->S2C_pkt_num = tflow->S2C_pkt;
+ tinfo->C2S_bytes = tflow->C2S_byte;
+ tinfo->S2C_bytes = tflow->S2C_byte;
+ }
+ else
+ {
+ tinfo->C2S_pkt_num = 0;
+ tinfo->S2C_pkt_num = 0;
+ tinfo->C2S_bytes = 0;
+ tinfo->S2C_bytes = 0;
+ }
+ }
+ else
+ {
+ tinfo->C2S_pkt_num = 0;
+ tinfo->S2C_pkt_num = 0;
+ tinfo->C2S_bytes = 0;
+ tinfo->S2C_bytes = 0;
+ }
+ /* layer_addr */
+ tinfo->addr = pstream->addr;
+ /* STAT_TIME */
+ // tinfo->stat_time = time(0);
+ gettimeofday(&tinfo->stat_time, NULL);
+
+ print_traffic_info(tinfo, pstream);
+ free(tinfo);
+ // printf("\n");
+ }
+ return DEFAULT_RETURN_VALUE;
+
+error_drop:
+ free(tinfo);
+ return APP_STATE_DROPME | APP_STATE_DROPPKT;
+}
+
+static char return_action_cb_fun(int net_conn_mode, char plug_action)
+{
+ return 0; //所有包默认都DROP
+}
+
+int CHAR_INIT()
+{
+ int demo_plugid = 51;
+ runtime_log_handler = NULL;
+ kafka_log_handler = NULL;
+ char str_tmp[128];
+
+ int log_level = 30;
+
+ MESA_load_profile_int_def(entrance_id_path, "LOG", "log_level", &log_level, 30);
+
+ runtime_log_handler = MESA_create_runtime_log_handle(tuple_log_path, log_level);
+ kafka_log_handler = MESA_create_runtime_log_handle(kafka_log_path, log_level);
+ if (runtime_log_handler == NULL || kafka_log_handler == NULL)
+ {
+ /* code */
+ printf("MESA_create_runtime_log_handle failed!!!");
+ return -1;
+ }
+ /* VPN_ID drop */
+ char vpn_id[256];
+ MESA_load_profile_string_def(entrance_id_path, "SETTING", "VPN_ID_DROP", vpn_id, sizeof(vpn_id), "");
+ char *buff;
+ buff = vpn_id;
+ if (strlen(vpn_id) != 0)
+ {
+ char *id_str;
+ id_str = strsep(&buff, ",");
+ while (id_str != NULL)
+ {
+ int id = atoi(id_str);
+ if (id > 0 && id < 256)
+ {
+ vpn_id_drop[id] = 1;
+ }
+ id_str = strsep(&buff, ",");
+ }
+ }
+ /* ENTRANCE_ID */
+ MESA_load_profile_uint_def(entrance_id_path, "SETTING", "ENTRANCE_ID", &entrance_id, 0);
+ /* FLOW_TYPE */
+ MESA_load_profile_uint_def(entrance_id_path, "SETTING", "FLOW_TYPE", &flow_type, (int)FLOW_TYPE_REFLUX);
+ /* Brokers */
+ // MESA_load_profile_string_def(entrance_id_path, "SETTING", "BROKERS", brokers, sizeof(brokers),"#");
+ /* vx_ip_header_dst_ip */
+ MESA_load_profile_string_def(gdev_conf_path, "Module", "sendto_gdev_ip", str_tmp, sizeof(str_tmp), "#");
+ if ('#' == str_tmp[0])
+ {
+ MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name, "can't get %s->sendto_gdev_ip!!", gdev_conf_path);
+ return -1;
+ }
+ /* load gdev ip from conf/gdev.conf*/
+ MESA_load_profile_string_def(gdev_conf_path, "Module", "sendto_gdev_ip", sendto_gdev_ip, sizeof(sendto_gdev_ip), "#");
+
+ int ret, conf_ret_val;
+ ret = MESA_load_profile_int_def(entrance_id_path, "SETTING", "__DEBUG_RETURN_VALUE", &conf_ret_val, (APP_STATE_GIVEME | APP_STATE_DROPPKT));
+ if (ret >= 0)
+ {
+ /* debug模式下, 临时开启回注, 用于测试 */
+ if ((conf_ret_val != APP_STATE_GIVEME) && (conf_ret_val != (APP_STATE_GIVEME | APP_STATE_DROPPKT)) && (conf_ret_val != (APP_STATE_DROPME | APP_STATE_DROPPKT)))
+ {
+ printf("config __DEBUG_RETURN_VALUE invalid!");
+ exit(1);
+ }
+ DEFAULT_RETURN_VALUE = (char)conf_ret_val;
+ g_business_plug_type = 1;
+ }
+ else
+ {
+ platform_register_action_judge(return_action_cb_fun);
+ g_business_plug_type = 0; /* 非常规办法, 这是串联插件总控的内部变量, 0:JC; 1:GK, 0默认丢弃所有包 */
+ }
+
+ MESA_load_profile_int_def(entrance_id_path, "SETTING", "__ERROR_COREDUMP", &error_coredump, 0);
+
+ inet_pton(AF_INET, str_tmp, &sapp_keepalive_reflux_ip_net);
+
+ /* kafka初始化*/
+ // if (init_kafka(0, brokers, topic) != PRODUCER_INIT_SUCCESS)
+ // {
+ // MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"kafka init failed!!!");
+ // return -1;
+ // }
+
+#if 0
+ //GSJ Work thread start
+ int ret_thread;
+ ret_thread = pthread_create(&GSJ_Work, NULL, (void *)Work, NULL);
+ if(ret_thread == 0)
+ {
+ MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "GSJ thread start success\n");
+ pthread_detach(GSJ_Work);
+ }
+ else
+ {
+ MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "GSJ thread start failed\n");
+ }
+#endif
+ // 函数实现自定义 // 只要求函数返回值为插件ID //printf("INIT SUCCESS!!!\n");
+ return demo_plugid;
+}
+
+void LRJ_APP_DESTROY()
+{
+ MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "TEST_APP_DESTORY in...\n");
+ printf("TEST_APP_DESTORY in...\n");
+ // kafka_destroy();
+ if (runtime_log_handler == NULL)
+ {
+ printf("TEST_APP_DESTORY out...\n");
+ return;
+ }
+ MESA_destroy_runtime_log_handle(runtime_log_handler);
+ MESA_destroy_runtime_log_handle(kafka_log_handler);
+ printf("TEST_APP_DESTORY out...\n");
+}
+
diff --git a/plug_run_evn/conf/lrj_vxlan.inf b/plug_run_evn/conf/lrj_vxlan.inf
new file mode 100644
index 0000000..f34fdb2
--- /dev/null
+++ b/plug_run_evn/conf/lrj_vxlan.inf
@@ -0,0 +1,17 @@
+[PLUGINFO]
+PLUGNAME=lrj_vxlan
+SO_PATH=./plug/business/lrj_vxlan/lirenjie_vxlan_sapp.so
+INIT_FUNC=CHAR_INIT
+#DESTROY_FUNC=CHAR_DESTROY
+
+[TCP_ALL]
+FUNC_FLAG=ALL
+FUNC_NAME=TCP_ENTRY_ALL
+
+[TCP]
+FUNC_FLAG=ALL
+FUNC_NAME=TCP_ENTRY
+
+[UDP]
+FUNC_FLAG=ALL
+FUNC_NAME=UDP_ENTRY
diff --git a/plug_run_evn/conf/lrj_vxlan_sapp.conf b/plug_run_evn/conf/lrj_vxlan_sapp.conf
new file mode 100644
index 0000000..fa9f093
--- /dev/null
+++ b/plug_run_evn/conf/lrj_vxlan_sapp.conf
@@ -0,0 +1,15 @@
+[LOG]
+log_level=10
+
+[SETTING]
+VPN_ID_DROP=0
+ENTRANCE_ID=0
+FLOW_TYPE=0
+BROKERS=192.168.40.137:9092
+
+__DEBUG_RETURN_VALUE=16
+__ERROR_COREDUMP=0
+
+[Module]
+sendto_gdev_ip=172.24.5.45
+