From 43f1659a49fb641f9eef0cefe382bf3b4dcec43a Mon Sep 17 00:00:00 2001 From: lijia Date: Thu, 29 Aug 2019 14:38:57 +0800 Subject: 合并丢弃指定vpnid功能; 修复flow_type=1时IP地址方向错误; MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lirenjie_vxlan_sapp.c | 143 +++++++++++++++++++++++++++++++------------------- 1 file changed, 90 insertions(+), 53 deletions(-) diff --git a/lirenjie_vxlan_sapp.c b/lirenjie_vxlan_sapp.c index 7ab2bbf..0af5ce2 100644 --- a/lirenjie_vxlan_sapp.c +++ b/lirenjie_vxlan_sapp.c @@ -13,15 +13,18 @@ #include #include #include +#include #include "MESA_handle_logger.h" #include "MESA_prof_load.h" #include "stream.h" #include "rdkafka.h" -int version_20190730; +int version_20190827_1605; -#define DEFAULT_RETURN_VALUE (APP_STATE_GIVEME | APP_STATE_DROPPKT) +static char DEFAULT_RETURN_VALUE = (APP_STATE_GIVEME | APP_STATE_DROPPKT); + +static int error_coredump = 0; extern unsigned char vxlan_sport_map_to_service_id(unsigned short sport_host_order); extern unsigned char vxlan_id_map_to_service_id(int vxlan_id_host_order); @@ -29,6 +32,7 @@ extern int platform_register_action_judge(char (*action_cb_fun)(int net_conn_mod extern int g_business_plug_type; #define MAX_LOG_INFO_LEN 256 +#define MAX_VPN_ID_NUM 256 #define MAX_TRAFFIC_INFO_LEN 1024 static const char *module_name = "lirenjie_vxlan"; static const char *tuple_log_path = "./log/lirenjie_vxlan/ip_tuple.log"; @@ -39,16 +43,17 @@ static void *runtime_log_handler; static void *kafka_log_handler; //char vx_ip_header_dst_ip[INET_ADDRSTRLEN]; // conf/gdev.conf sendto_gdev_ip static unsigned int sapp_keepalive_reflux_ip_net; /* 本端业务保活、回流IP */ +static unsigned int vpn_id_drop[MAX_VPN_ID_NUM]={0}; /* 0 do not drop, 1 drop */ -#define IDENTIFY_LOCAL_IP_SUBNET_MASK (0xFFFFFF00) /* 一个局点N台前端机, 用IP段识别是否前端机IP, 用于区别流量时回流还是回注 */ +#define IDENTIFY_LOCAL_IP_SUBNET_MASK (0xFFFFFF00) /* 一个局点N台前端机, 用IP段识别是否前端机IP, 用于区别流量时回流还是回? */ -static unsigned int entrance_id; /* 局点ID 读配置文件 ./conf/lrj_vxlan_sapp.conf 默认为0*/ +static unsigned int entrance_id; /* 局点ID 读配置文?./conf/lrj_vxlan_sapp.conf 默认?*/ enum flow_type_t{ FLOW_TYPE_REFLUX = 0, /* 回流 */ FLOW_TYPE_INJECT = 1, /* 回注 */ }; -static unsigned int flow_type; /* 回流0/回注1 读配置文件 ./conf/lrj_vxlan_sapp.conf 默认为0*/ +static unsigned int flow_type; /* 回流0/回注1 读配置文?./conf/lrj_vxlan_sapp.conf 默认? */ /* kafka */ static const int PRODUCER_INIT_FAILED = -1; @@ -79,13 +84,13 @@ struct traffic_info UINT32 S2C_bytes; /* S2C, you should better use stream_project.h : struct udp_flow_stat */ //struct tm *systime; // date YYYY-MM-DD %04d-%02d-%02d systime->tm_year + 1900, systime->tm_mon + 1, systime->tm_mday // time_t stat_time; // 秒级 - struct timeval stat_time; //微妙级 - unsigned int vx_ip_header_src_ip_net; + struct timeval stat_time; + unsigned int vx_ip_header_src_ip_net; unsigned int vx_ip_header_dst_ip_net; unsigned short vx_UDP_header_src_port; unsigned short vx_UDP_header_dst_port; struct layer_addr addr; - //unsigned char service_id; /* Vlan ID或特定的标签值对应的VPN号 */ + //unsigned char service_id; /* Vlan ID或特定的标签值对应的VPN? */ int vxlan_vpn_id; unsigned short vx_type; //IPv4=0x0800 IPv6=0x86DD Arp=0x0806 /* ipv4 src_ip dst_ip identification fragment_offset*/ @@ -127,9 +132,10 @@ static int init_kafka(int partition_, char *brokers_, char *topic_) /* Quick termination */ snprintf(tmp, sizeof(tmp), "%i", SIGIO); rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0); + //rd_kafka_conf_set(conf, "producer.type", "kafka.producer.AyncProducer", errstr, sizeof(errstr)); rd_kafka_conf_set(conf, "queue.buffering.max.messages", "1000000", errstr, sizeof(errstr)); rd_kafka_conf_set(conf, "topic.metadata.refresh.interval.ms", "600000",errstr, sizeof(errstr)); - rd_kafka_conf_set(conf, "request.required.acks", "1", errstr, sizeof(errstr)); + rd_kafka_conf_set(conf, "request.required.acks", "0", errstr, sizeof(errstr)); /*topic configuration*/ topic_conf = rd_kafka_topic_conf_new(); @@ -177,8 +183,8 @@ static int push_data_to_kafka(char *buffer, int buf_len) { return 0; } - ret = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL); - //ret = rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL); + //ret = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL); + ret = rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL); if (ret == -1) { /*fprintf(stderr, @@ -211,7 +217,7 @@ unsigned char get_service_id(struct streaminfo *pstream) unsigned short vxlan_sport; /* 由源端口获取当前包所属业务号 vxlan_sport_map_to_service_id*/ unsigned char service_id; - /* 获取vxlan_info结构体 */ + /* 获取vxlan_info结构?*/ struct vxlan_info *vxlan; int opt_val_len = sizeof(struct vxlan_info); ret = MESA_get_stream_opt(pstream, MSO_STREAM_VXLAN_INFO, vxlan, &opt_val_len); @@ -343,7 +349,7 @@ static unsigned char get_service_id_from_sport(struct streaminfo *pstream) } #endif - +#if 0 static int is_same_sub_net(unsigned int packet_gdev_ip_net, unsigned int local_gdev_ip_net) { if((ntohl(packet_gdev_ip_net) & IDENTIFY_LOCAL_IP_SUBNET_MASK) == @@ -353,19 +359,20 @@ static int is_same_sub_net(unsigned int packet_gdev_ip_net, unsigned int local_g return 0; } +#endif static int get_vxlan_ip_addr(struct streaminfo *pstream, struct traffic_info *tinfo) { - int gdev_ip_net, local_dev_ip; + int gdev_ip_net = 0, local_dev_ip = 0; int ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_GDEV_IP, &gdev_ip_net); if (ret >= 0) { #if 0 - /* 回流/回注流量已经区分,接入不同的机器, 靠配置文件指定 */ + /* 回流/回注流量已经区分,接入不同的机? 靠配置文件指?*/ if(is_same_sub_net(gdev_ip_net, sapp_keepalive_reflux_ip_net)){ - flow_type = FLOW_TYPE_INJECT; /* 从驱动获取的GDEV IP(vxlan->srcip), 和本机IP在一个网段, 说明是回注包 */ + flow_type = FLOW_TYPE_INJECT; /* 从驱动获取的GDEV IP(vxlan->srcip), 和本机IP在一个网? 说明是回注包 */ }else{ - flow_type = FLOW_TYPE_REFLUX; /* 从驱动获取的GDEV IP(vxlan->srcip), 和本机IP不在一个网段, 说明是回流包 */ + flow_type = FLOW_TYPE_REFLUX; /* 从驱动获取的GDEV IP(vxlan->srcip), 和本机IP不在一个网? 说明是回流包 */ } if(gdev_ip_net == sapp_keepalive_reflux_ip_net){ /* add by lijia 20190611 */ @@ -374,40 +381,36 @@ static int get_vxlan_ip_addr(struct streaminfo *pstream, struct traffic_info *ti } #endif - if(FLOW_TYPE_REFLUX == flow_type){ - tinfo->vx_ip_header_src_ip_net = gdev_ip_net; - ret = 0; - }else{ - tinfo->vx_ip_header_dst_ip_net = gdev_ip_net; - ret = 0; - } + tinfo->vx_ip_header_src_ip_net = gdev_ip_net; + ret = 0; + } else { tinfo->vx_ip_header_src_ip_net = 0; tinfo->vx_ip_header_dst_ip_net = 0; ret = -1; + if(error_coredump){ + assert(0); + } } ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_VXLAN_LOCAL_IP, &local_dev_ip); if (ret >= 0) { - if(FLOW_TYPE_REFLUX == flow_type){ - tinfo->vx_ip_header_dst_ip_net = local_dev_ip; - ret = 0; - }else{ - tinfo->vx_ip_header_src_ip_net = local_dev_ip; - ret = 0; - } + tinfo->vx_ip_header_dst_ip_net = local_dev_ip; + ret = 0; } else { tinfo->vx_ip_header_src_ip_net = 0; tinfo->vx_ip_header_dst_ip_net = 0; - ret = -1; + ret = -1; + if(error_coredump){ + assert(0); + } } - return ret; } @@ -425,7 +428,7 @@ static unsigned short get_vx_UDP_header_src_port(struct streaminfo *pstream) } } -/* 获取pstream中的四元组 */ +/* 获取pstream中的四元?*/ #if 0 static void get_tuple4(struct streaminfo *pstream, unsigned char service_id) { @@ -465,7 +468,7 @@ static void get_tuple4(struct streaminfo *pstream, unsigned char service_id) inet_ntop(AF_INET6, &(tuple4_v6->daddr), dip, INET6_ADDRSTRLEN); printf("--->%s:%d -> %s:%d\n", sip, ntohs(tuple4_v6->source), dip, ntohs(tuple4_v6->dest)); - /* 获取业务号 */ + /* 获取业务?*/ // unsigned char service_id = get_service_id(pstream); char info[MAX_LOG_INFO_LEN] = {0}; @@ -673,8 +676,7 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi if (pstream->opstate == OP_STATE_PENDING) { tinfo = (struct traffic_info *)calloc(1, sizeof(struct traffic_info)); - //tinfo->service_id = get_service_id_from_sport(pstream); //获取vx_lan_id字段,具体方法待定 - //tinfo->service_id = get_service_id_from_vxlanid(pstream); + //tinfo->service_id = get_service_id_from_sport(pstream); //获取vx_lan_id字段,具体方法待? //tinfo->service_id = get_service_id_from_vxlanid(pstream); tinfo->vxlan_vpn_id = get_vpnid_from_stream(pstream); /* PROTOCOL */ switch (pstream->addr.addrtype) @@ -705,13 +707,13 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi } /* IPv4、IPv6头部信息 */ get_ip_detail(pstream, tinfo, raw_pkt); - /* 应用层协议类型 用目的端口表示 */ + /* 应用层协议类?用目的端口表?*/ tinfo->PROTO_TYPE = get_proto_type(pstream); *pme = tinfo; } tinfo = (struct traffic_info *)(*pme); - /* 自己统计包数字节数 *//* + /* 自己统计包数字节?*//* if(raw_pdetail->datalen > 0) { if(DIR_C2S == pstream->curdir) @@ -726,16 +728,17 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi } } */ - if (pstream->opstate == OP_STATE_CLOSE && tinfo->vxlan_vpn_id > 0) + /* if (pstream->opstate == OP_STATE_CLOSE && tinfo->vxlan_vpn_id > 0) */ + if (pstream->opstate == OP_STATE_CLOSE && !vpn_id_drop[tinfo->vxlan_vpn_id]) { //printf("TCP_ENTRY SUCCESS!!!\n"); - /* 获取包数字节数 */ + /* 获取包数字节?*/ tinfo->C2S_pkt_num = raw_pdetail->serverpktnum; tinfo->S2C_pkt_num = raw_pdetail->clientpktnum; tinfo->C2S_bytes = raw_pdetail->serverbytes; tinfo->S2C_bytes = raw_pdetail->clientbytes; - /* 另一种获取包数字节数的方法 */ + /* 另一种获取包数字节数的方?*/ /* if(tcp_flow_id != -1) { @@ -810,8 +813,7 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi if (pstream->opstate == OP_STATE_PENDING) { tinfo = (struct traffic_info *)calloc(1, sizeof(struct traffic_info)); - //tinfo->service_id = get_service_id_from_sport(pstream); //获取vx_lan_id字段,具体方法待定 - //tinfo->service_id = get_service_id_from_vxlanid(pstream); + //tinfo->service_id = get_service_id_from_sport(pstream); //获取vx_lan_id字段,具体方法待? //tinfo->service_id = get_service_id_from_vxlanid(pstream); tinfo->vxlan_vpn_id = get_vpnid_from_stream(pstream); /* PROTOCOL */ @@ -843,14 +845,15 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi } /* IPv4、IPv6头部信息 */ get_ip_detail(pstream, tinfo, raw_pkt); - /* 应用层协议类型 用目的端口表示 */ + /* 应用层协议类?用目的端口表?*/ tinfo->PROTO_TYPE = get_proto_type(pstream); *pme = tinfo; } tinfo = (struct traffic_info *)(*pme); - if (pstream->opstate == OP_STATE_CLOSE && tinfo->vxlan_vpn_id > 0) + /* if (pstream->opstate == OP_STATE_CLOSE && tinfo->vxlan_vpn_id > 0) */ + if (pstream->opstate == OP_STATE_CLOSE && !vpn_id_drop[tinfo->vxlan_vpn_id]) { //printf("UDP_ENTRY SUCCESS!!!\n"); @@ -919,6 +922,25 @@ int CHAR_INIT() printf("MESA_create_runtime_log_handle failed!!!"); return -1; } + /* VPN_ID drop */ + char vpn_id[256]; + MESA_load_profile_string_def(entrance_id_path, "SETTING", "VPN_ID_DROP", vpn_id, sizeof(vpn_id),""); + char * buff; + buff = vpn_id; + if(strlen(vpn_id) != 0) + { + char *id_str; + id_str = strsep(&buff,","); + while(id_str != NULL) + { + int id = atoi(id_str); + if(id >0 && id < 256) + { + vpn_id_drop[id] = 1; + } + id_str = strsep(&buff,","); + } + } /* ENTRANCE_ID */ MESA_load_profile_uint_def(entrance_id_path, "SETTING", "ENTRANCE_ID", &entrance_id, 0); /* FLOW_TYPE */ @@ -931,21 +953,36 @@ int CHAR_INIT() MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name,"can't get %s->sendto_gdev_ip!!", gdev_conf_path); return -1; } + + int ret, conf_ret_val; + ret = MESA_load_profile_int_def(entrance_id_path, "SETTING", "__DEBUG_RETURN_VALUE", &conf_ret_val, (APP_STATE_GIVEME | APP_STATE_DROPPKT)); + if(ret >= 0){ + /* debug模式下, 临时开启回注, 用于测试 */ + if((conf_ret_val != APP_STATE_GIVEME ) + &&(conf_ret_val != (APP_STATE_GIVEME | APP_STATE_DROPPKT)) + &&(conf_ret_val != (APP_STATE_DROPME | APP_STATE_DROPPKT))){ + printf("config __DEBUG_RETURN_VALUE invalid!"); + exit(1); + } + DEFAULT_RETURN_VALUE = (char )conf_ret_val; + g_business_plug_type = 1; + }else{ + platform_register_action_judge(return_action_cb_fun); + g_business_plug_type = 0; /* 非常规办法, 这是串联插件总控的内部变量, 0:JC; 1:GK, 0默认丢弃所有包 */ + } + + MESA_load_profile_int_def(entrance_id_path, "SETTING", "__ERROR_COREDUMP", &error_coredump, 0); + inet_pton(AF_INET, str_tmp, &sapp_keepalive_reflux_ip_net); - /* kafka初始化 */ + /* kafka初始?*/ if (init_kafka(0, brokers, topic) != PRODUCER_INIT_SUCCESS) { MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"kafka init failed!!!"); return -1; } - platform_register_action_judge(return_action_cb_fun); - g_business_plug_type = 0; /* 非常规办法, 这是串联插件总控的内部变量, 0:JC; 1:GK, 0默认丢弃所有包 */ - - // 函数实现自定义 - // 只要求函数返回值为插件ID; - //printf("INIT SUCCESS!!!\n"); + // 函数实现自定? // 只要求函数返回值为插件ID? //printf("INIT SUCCESS!!!\n"); return demo_plugid; } -- cgit v1.2.3