summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorlijia <[email protected]>2019-08-29 14:38:57 +0800
committerlijia <[email protected]>2019-08-29 14:38:57 +0800
commit43f1659a49fb641f9eef0cefe382bf3b4dcec43a (patch)
treee639b09c7af6574a1539be1d22c7fab8e571d582
parentf36943accb842913507d1b198cadbd2b05f08118 (diff)
合并丢弃指定vpnid功能; 修复flow_type=1时IP地址方向错误;lijia_modify
-rw-r--r--lirenjie_vxlan_sapp.c143
1 files changed, 90 insertions, 53 deletions
diff --git a/lirenjie_vxlan_sapp.c b/lirenjie_vxlan_sapp.c
index 7ab2bbf..0af5ce2 100644
--- a/lirenjie_vxlan_sapp.c
+++ b/lirenjie_vxlan_sapp.c
@@ -13,15 +13,18 @@
#include <netinet/ip.h>
#include <netinet/ip6.h>
#include <netinet/udp.h>
+#include <pthread.h>
#include "MESA_handle_logger.h"
#include "MESA_prof_load.h"
#include "stream.h"
#include "rdkafka.h"
-int version_20190730;
+int version_20190827_1605;
-#define DEFAULT_RETURN_VALUE (APP_STATE_GIVEME | APP_STATE_DROPPKT)
+static char DEFAULT_RETURN_VALUE = (APP_STATE_GIVEME | APP_STATE_DROPPKT);
+
+static int error_coredump = 0;
extern unsigned char vxlan_sport_map_to_service_id(unsigned short sport_host_order);
extern unsigned char vxlan_id_map_to_service_id(int vxlan_id_host_order);
@@ -29,6 +32,7 @@ extern int platform_register_action_judge(char (*action_cb_fun)(int net_conn_mod
extern int g_business_plug_type;
#define MAX_LOG_INFO_LEN 256
+#define MAX_VPN_ID_NUM 256
#define MAX_TRAFFIC_INFO_LEN 1024
static const char *module_name = "lirenjie_vxlan";
static const char *tuple_log_path = "./log/lirenjie_vxlan/ip_tuple.log";
@@ -39,16 +43,17 @@ static void *runtime_log_handler;
static void *kafka_log_handler;
//char vx_ip_header_dst_ip[INET_ADDRSTRLEN]; // conf/gdev.conf sendto_gdev_ip
static unsigned int sapp_keepalive_reflux_ip_net; /* 本端业务保活、回流IP */
+static unsigned int vpn_id_drop[MAX_VPN_ID_NUM]={0}; /* 0 do not drop, 1 drop */
-#define IDENTIFY_LOCAL_IP_SUBNET_MASK (0xFFFFFF00) /* 一个局点N台前端机, 用IP段识别是否前端机IP, 用于区别流量时回流还是回注 */
+#define IDENTIFY_LOCAL_IP_SUBNET_MASK (0xFFFFFF00) /* 一个局点N台前端机, 用IP段识别是否前端机IP, 用于区别流量时回流还是回�? */
-static unsigned int entrance_id; /* 局点ID 读配置文件 ./conf/lrj_vxlan_sapp.conf 默认为0*/
+static unsigned int entrance_id; /* 局点ID 读配置文�?./conf/lrj_vxlan_sapp.conf 默认�?*/
enum flow_type_t{
FLOW_TYPE_REFLUX = 0, /* 回流 */
FLOW_TYPE_INJECT = 1, /* 回注 */
};
-static unsigned int flow_type; /* 回流0/回注1 读配置文件 ./conf/lrj_vxlan_sapp.conf 默认为0*/
+static unsigned int flow_type; /* 回流0/回注1 读配置文�?./conf/lrj_vxlan_sapp.conf 默认�? */
/* kafka */
static const int PRODUCER_INIT_FAILED = -1;
@@ -79,13 +84,13 @@ struct traffic_info
UINT32 S2C_bytes; /* S2C, you should better use stream_project.h : struct udp_flow_stat */
//struct tm *systime; // date YYYY-MM-DD %04d-%02d-%02d systime->tm_year + 1900, systime->tm_mon + 1, systime->tm_mday
// time_t stat_time; // 秒级
- struct timeval stat_time; //微妙级
- unsigned int vx_ip_header_src_ip_net;
+ struct timeval stat_time;
+ unsigned int vx_ip_header_src_ip_net;
unsigned int vx_ip_header_dst_ip_net;
unsigned short vx_UDP_header_src_port;
unsigned short vx_UDP_header_dst_port;
struct layer_addr addr;
- //unsigned char service_id; /* Vlan ID或特定的标签值对应的VPN号 */
+ //unsigned char service_id; /* Vlan ID或特定的标签值对应的VPN�? */
int vxlan_vpn_id;
unsigned short vx_type; //IPv4=0x0800 IPv6=0x86DD Arp=0x0806
/* ipv4 src_ip dst_ip identification fragment_offset*/
@@ -127,9 +132,10 @@ static int init_kafka(int partition_, char *brokers_, char *topic_)
/* Quick termination */
snprintf(tmp, sizeof(tmp), "%i", SIGIO);
rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);
+ //rd_kafka_conf_set(conf, "producer.type", "kafka.producer.AyncProducer", errstr, sizeof(errstr));
rd_kafka_conf_set(conf, "queue.buffering.max.messages", "1000000", errstr, sizeof(errstr));
rd_kafka_conf_set(conf, "topic.metadata.refresh.interval.ms", "600000",errstr, sizeof(errstr));
- rd_kafka_conf_set(conf, "request.required.acks", "1", errstr, sizeof(errstr));
+ rd_kafka_conf_set(conf, "request.required.acks", "0", errstr, sizeof(errstr));
/*topic configuration*/
topic_conf = rd_kafka_topic_conf_new();
@@ -177,8 +183,8 @@ static int push_data_to_kafka(char *buffer, int buf_len)
{
return 0;
}
- ret = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL);
- //ret = rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL);
+ //ret = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL);
+ ret = rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL);
if (ret == -1)
{
/*fprintf(stderr,
@@ -211,7 +217,7 @@ unsigned char get_service_id(struct streaminfo *pstream)
unsigned short vxlan_sport; /* 由源端口获取当前包所属业务号 vxlan_sport_map_to_service_id*/
unsigned char service_id;
- /* 获取vxlan_info结构体 */
+ /* 获取vxlan_info结构�?*/
struct vxlan_info *vxlan;
int opt_val_len = sizeof(struct vxlan_info);
ret = MESA_get_stream_opt(pstream, MSO_STREAM_VXLAN_INFO, vxlan, &opt_val_len);
@@ -343,7 +349,7 @@ static unsigned char get_service_id_from_sport(struct streaminfo *pstream)
}
#endif
-
+#if 0
static int is_same_sub_net(unsigned int packet_gdev_ip_net, unsigned int local_gdev_ip_net)
{
if((ntohl(packet_gdev_ip_net) & IDENTIFY_LOCAL_IP_SUBNET_MASK) ==
@@ -353,19 +359,20 @@ static int is_same_sub_net(unsigned int packet_gdev_ip_net, unsigned int local_g
return 0;
}
+#endif
static int get_vxlan_ip_addr(struct streaminfo *pstream, struct traffic_info *tinfo)
{
- int gdev_ip_net, local_dev_ip;
+ int gdev_ip_net = 0, local_dev_ip = 0;
int ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_GDEV_IP, &gdev_ip_net);
if (ret >= 0)
{
#if 0
- /* 回流/回注流量已经区分,接入不同的机器, 靠配置文件指定 */
+ /* 回流/回注流量已经区分,接入不同的机�? 靠配置文件指�?*/
if(is_same_sub_net(gdev_ip_net, sapp_keepalive_reflux_ip_net)){
- flow_type = FLOW_TYPE_INJECT; /* 从驱动获取的GDEV IP(vxlan->srcip), 和本机IP在一个网段, 说明是回注包 */
+ flow_type = FLOW_TYPE_INJECT; /* 从驱动获取的GDEV IP(vxlan->srcip), 和本机IP在一个网�? 说明是回注包 */
}else{
- flow_type = FLOW_TYPE_REFLUX; /* 从驱动获取的GDEV IP(vxlan->srcip), 和本机IP不在一个网段, 说明是回流包 */
+ flow_type = FLOW_TYPE_REFLUX; /* 从驱动获取的GDEV IP(vxlan->srcip), 和本机IP不在一个网�? 说明是回流包 */
}
if(gdev_ip_net == sapp_keepalive_reflux_ip_net){ /* add by lijia 20190611 */
@@ -374,40 +381,36 @@ static int get_vxlan_ip_addr(struct streaminfo *pstream, struct traffic_info *ti
}
#endif
- if(FLOW_TYPE_REFLUX == flow_type){
- tinfo->vx_ip_header_src_ip_net = gdev_ip_net;
- ret = 0;
- }else{
- tinfo->vx_ip_header_dst_ip_net = gdev_ip_net;
- ret = 0;
- }
+ tinfo->vx_ip_header_src_ip_net = gdev_ip_net;
+ ret = 0;
+
}
else
{
tinfo->vx_ip_header_src_ip_net = 0;
tinfo->vx_ip_header_dst_ip_net = 0;
ret = -1;
+ if(error_coredump){
+ assert(0);
+ }
}
ret = get_rawpkt_opt_from_streaminfo(pstream, RAW_PKT_GET_VXLAN_LOCAL_IP, &local_dev_ip);
if (ret >= 0)
{
- if(FLOW_TYPE_REFLUX == flow_type){
- tinfo->vx_ip_header_dst_ip_net = local_dev_ip;
- ret = 0;
- }else{
- tinfo->vx_ip_header_src_ip_net = local_dev_ip;
- ret = 0;
- }
+ tinfo->vx_ip_header_dst_ip_net = local_dev_ip;
+ ret = 0;
}
else
{
tinfo->vx_ip_header_src_ip_net = 0;
tinfo->vx_ip_header_dst_ip_net = 0;
- ret = -1;
+ ret = -1;
+ if(error_coredump){
+ assert(0);
+ }
}
-
return ret;
}
@@ -425,7 +428,7 @@ static unsigned short get_vx_UDP_header_src_port(struct streaminfo *pstream)
}
}
-/* 获取pstream中的四元组 */
+/* 获取pstream中的四元�?*/
#if 0
static void get_tuple4(struct streaminfo *pstream, unsigned char service_id)
{
@@ -465,7 +468,7 @@ static void get_tuple4(struct streaminfo *pstream, unsigned char service_id)
inet_ntop(AF_INET6, &(tuple4_v6->daddr), dip, INET6_ADDRSTRLEN);
printf("--->%s:%d -> %s:%d\n", sip, ntohs(tuple4_v6->source), dip, ntohs(tuple4_v6->dest));
- /* 获取业务号 */
+ /* 获取业务�?*/
// unsigned char service_id = get_service_id(pstream);
char info[MAX_LOG_INFO_LEN] = {0};
@@ -673,8 +676,7 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
if (pstream->opstate == OP_STATE_PENDING)
{
tinfo = (struct traffic_info *)calloc(1, sizeof(struct traffic_info));
- //tinfo->service_id = get_service_id_from_sport(pstream); //获取vx_lan_id字段,具体方法待定
- //tinfo->service_id = get_service_id_from_vxlanid(pstream);
+ //tinfo->service_id = get_service_id_from_sport(pstream); //获取vx_lan_id字段,具体方法待�? //tinfo->service_id = get_service_id_from_vxlanid(pstream);
tinfo->vxlan_vpn_id = get_vpnid_from_stream(pstream);
/* PROTOCOL */
switch (pstream->addr.addrtype)
@@ -705,13 +707,13 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
}
/* IPv4、IPv6头部信息 */
get_ip_detail(pstream, tinfo, raw_pkt);
- /* 应用层协议类型 用目的端口表示 */
+ /* 应用层协议类�?用目的端口表�?*/
tinfo->PROTO_TYPE = get_proto_type(pstream);
*pme = tinfo;
}
tinfo = (struct traffic_info *)(*pme);
- /* 自己统计包数字节数 *//*
+ /* 自己统计包数字节�?*//*
if(raw_pdetail->datalen > 0)
{
if(DIR_C2S == pstream->curdir)
@@ -726,16 +728,17 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
}
}
*/
- if (pstream->opstate == OP_STATE_CLOSE && tinfo->vxlan_vpn_id > 0)
+ /* if (pstream->opstate == OP_STATE_CLOSE && tinfo->vxlan_vpn_id > 0) */
+ if (pstream->opstate == OP_STATE_CLOSE && !vpn_id_drop[tinfo->vxlan_vpn_id])
{
//printf("TCP_ENTRY SUCCESS!!!\n");
- /* 获取包数字节数 */
+ /* 获取包数字节�?*/
tinfo->C2S_pkt_num = raw_pdetail->serverpktnum;
tinfo->S2C_pkt_num = raw_pdetail->clientpktnum;
tinfo->C2S_bytes = raw_pdetail->serverbytes;
tinfo->S2C_bytes = raw_pdetail->clientbytes;
- /* 另一种获取包数字节数的方法 */
+ /* 另一种获取包数字节数的方�?*/
/*
if(tcp_flow_id != -1)
{
@@ -810,8 +813,7 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
if (pstream->opstate == OP_STATE_PENDING)
{
tinfo = (struct traffic_info *)calloc(1, sizeof(struct traffic_info));
- //tinfo->service_id = get_service_id_from_sport(pstream); //获取vx_lan_id字段,具体方法待定
- //tinfo->service_id = get_service_id_from_vxlanid(pstream);
+ //tinfo->service_id = get_service_id_from_sport(pstream); //获取vx_lan_id字段,具体方法待�? //tinfo->service_id = get_service_id_from_vxlanid(pstream);
tinfo->vxlan_vpn_id = get_vpnid_from_stream(pstream);
/* PROTOCOL */
@@ -843,14 +845,15 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
}
/* IPv4、IPv6头部信息 */
get_ip_detail(pstream, tinfo, raw_pkt);
- /* 应用层协议类型 用目的端口表示 */
+ /* 应用层协议类�?用目的端口表�?*/
tinfo->PROTO_TYPE = get_proto_type(pstream);
*pme = tinfo;
}
tinfo = (struct traffic_info *)(*pme);
- if (pstream->opstate == OP_STATE_CLOSE && tinfo->vxlan_vpn_id > 0)
+ /* if (pstream->opstate == OP_STATE_CLOSE && tinfo->vxlan_vpn_id > 0) */
+ if (pstream->opstate == OP_STATE_CLOSE && !vpn_id_drop[tinfo->vxlan_vpn_id])
{
//printf("UDP_ENTRY SUCCESS!!!\n");
@@ -919,6 +922,25 @@ int CHAR_INIT()
printf("MESA_create_runtime_log_handle failed!!!");
return -1;
}
+ /* VPN_ID drop */
+ char vpn_id[256];
+ MESA_load_profile_string_def(entrance_id_path, "SETTING", "VPN_ID_DROP", vpn_id, sizeof(vpn_id),"");
+ char * buff;
+ buff = vpn_id;
+ if(strlen(vpn_id) != 0)
+ {
+ char *id_str;
+ id_str = strsep(&buff,",");
+ while(id_str != NULL)
+ {
+ int id = atoi(id_str);
+ if(id >0 && id < 256)
+ {
+ vpn_id_drop[id] = 1;
+ }
+ id_str = strsep(&buff,",");
+ }
+ }
/* ENTRANCE_ID */
MESA_load_profile_uint_def(entrance_id_path, "SETTING", "ENTRANCE_ID", &entrance_id, 0);
/* FLOW_TYPE */
@@ -931,21 +953,36 @@ int CHAR_INIT()
MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name,"can't get %s->sendto_gdev_ip!!", gdev_conf_path);
return -1;
}
+
+ int ret, conf_ret_val;
+ ret = MESA_load_profile_int_def(entrance_id_path, "SETTING", "__DEBUG_RETURN_VALUE", &conf_ret_val, (APP_STATE_GIVEME | APP_STATE_DROPPKT));
+ if(ret >= 0){
+ /* debug模式下, 临时开启回注, 用于测试 */
+ if((conf_ret_val != APP_STATE_GIVEME )
+ &&(conf_ret_val != (APP_STATE_GIVEME | APP_STATE_DROPPKT))
+ &&(conf_ret_val != (APP_STATE_DROPME | APP_STATE_DROPPKT))){
+ printf("config __DEBUG_RETURN_VALUE invalid!");
+ exit(1);
+ }
+ DEFAULT_RETURN_VALUE = (char )conf_ret_val;
+ g_business_plug_type = 1;
+ }else{
+ platform_register_action_judge(return_action_cb_fun);
+ g_business_plug_type = 0; /* 非常规办法, 这是串联插件总控的内部变量, 0:JC; 1:GK, 0默认丢弃所有包 */
+ }
+
+ MESA_load_profile_int_def(entrance_id_path, "SETTING", "__ERROR_COREDUMP", &error_coredump, 0);
+
inet_pton(AF_INET, str_tmp, &sapp_keepalive_reflux_ip_net);
- /* kafka初始化 */
+ /* kafka初始�?*/
if (init_kafka(0, brokers, topic) != PRODUCER_INIT_SUCCESS)
{
MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"kafka init failed!!!");
return -1;
}
- platform_register_action_judge(return_action_cb_fun);
- g_business_plug_type = 0; /* 非常规办法, 这是串联插件总控的内部变量, 0:JC; 1:GK, 0默认丢弃所有包 */
-
- // 函数实现自定义
- // 只要求函数返回值为插件ID;
- //printf("INIT SUCCESS!!!\n");
+ // 函数实现自定�? // 只要求函数返回值为插件ID�? //printf("INIT SUCCESS!!!\n");
return demo_plugid;
}