summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author李仁杰 <[email protected]>2020-08-19 12:03:51 +0800
committer李仁杰 <[email protected]>2020-08-19 12:03:51 +0800
commit782e778e7d7e206d7192a70cffdfd0ca20619271 (patch)
treed4a864909c6ba9238b9655556e1b508cef3a2621
parent7946812e0e4d1d307fe8df707e0875154504d11d (diff)
Update lirenjie_vxlan_sapp_20200803.c 20200819,重写了逻辑,不再调用GSJ程序。按照长安的需求新加了两个逻辑:1、ip取模16,余1发送数据 2、统计(vxlan_vpn_id,vxlan_sip_ip_str/vxlan_dip_ip_str)二元组上的包数。
-rw-r--r--lirenjie_vxlan_sapp_20200803.c735
1 files changed, 406 insertions, 329 deletions
diff --git a/lirenjie_vxlan_sapp_20200803.c b/lirenjie_vxlan_sapp_20200803.c
index 82976d7..f980e2f 100644
--- a/lirenjie_vxlan_sapp_20200803.c
+++ b/lirenjie_vxlan_sapp_20200803.c
@@ -19,8 +19,7 @@
#include "stream.h"
#include "rdkafka.h"
#include "libGSJ.h"
-#include "cJSON.h"
-#include "MESA_list_queue.h"
+#include "MESA_htable.h"
int version_20190827_1605;
static char DEFAULT_RETURN_VALUE = (APP_STATE_GIVEME | APP_STATE_DROPPKT);
@@ -33,8 +32,8 @@ extern int platform_register_action_judge(char (*action_cb_fun)(int net_conn_mod
extern int g_business_plug_type;
#define ENABLE_COUNT_THREAD 1 //是否启用统计功能.added by lrj at 20200803
-#define ENABLE_GSJ_THREAD 1 //是否启用GSJ
-#define ENABLE_TIMER 0
+#define ENABLE_GSJ_THREAD 0 //是否启用GSJ
+#define ENABLE_TIMER 1
#define MAX_LOG_INFO_LEN 256
#define MAX_VPN_ID_NUM 512
@@ -42,23 +41,27 @@ extern int g_business_plug_type;
static const char *module_name = "lirenjie_vxlan";
static const char *tuple_log_path = "./log/lirenjie_vxlan/ip_tuple.log";
static const char *kafka_log_path = "./log/lirenjie_vxlan/kafka.log";
+static const char *kafka_stat_path = "./log/lirenjie_vxlan/kafka_stat.log";
static const char *gdev_conf_path = "./conf/gdev.conf";
static const char *entrance_id_path = "./conf/lrj_vxlan_sapp.conf";
static void *runtime_log_handler;
static void *kafka_log_handler;
+static void *kafka_stat_handler;
//char vx_ip_header_dst_ip[INET_ADDRSTRLEN]; // conf/gdev.conf sendto_gdev_ip
static char sendto_gdev_ip[INET_ADDRSTRLEN]; //conf/gdev.conf sendto_gdev_ip
static unsigned int sapp_keepalive_reflux_ip_net; /* 本端业务保活、回流IP */
static unsigned int vpn_id_drop[MAX_VPN_ID_NUM] = {0}; /* 0 do not drop, 1 drop */
-#define IDENTIFY_LOCAL_IP_SUBNET_MASK (0xFFFFFF00) /* 一个局点N台前端机, 用IP段识别是否前端机IP, 用于区别流量时回流还是回�? */
+#define IDENTIFY_LOCAL_IP_SUBNET_MASK (0xFFFFFF00) /* 一个局点N台前端机, 用IP段识别是否前端机IP, 用于区别流量时回流还是回注 */
-static unsigned int entrance_id; /* 局点ID 读配置文�?./conf/lrj_vxlan_sapp.conf 默认�?*/
+static unsigned int entrance_id; /* 局点ID 读配置文件./conf/lrj_vxlan_sapp.conf 默认�?*/
UINT64 all_stream_pkt_num[64] = {0};
UINT64 close_stream_pkt_num[64] = {0};
UINT64 push_count[64] = {0};
UINT64 error_count[64] = {0};
+UINT64 vlanid_ip_push_count = 0;
+UINT64 vlanid_ip_pkt_num = 0;
enum flow_type_t
{
@@ -77,14 +80,19 @@ static int partition;
static rd_kafka_t *kafka_producer;
static rd_kafka_conf_t *conf;
// topic
-static rd_kafka_topic_t *rkt;
-static rd_kafka_topic_conf_t *topic_conf;
+static rd_kafka_topic_t *rkt_traffic_stat; //统计数据topic
+static rd_kafka_topic_t *rkt_sum; //二元组topic
+static rd_kafka_topic_conf_t *topic_conf_traffic_stat;
+static rd_kafka_topic_conf_t *topic_conf_sum;
// char errstr[512]={0};
//static char *brokers = "10.172.208.1:9092,10.172.208.2:9092,10.172.208.2:9092,10.172.208.4:9092,10.172.208.5:9092";
//static char *brokers = "10.208.133.126:9092,10.208.133.133:9092,10.208.133.135:9092,10.208.133.141:9092";
-// char brokers[128];
-
-static char *topic = "G_BACK_TRAFFIC_STATISTIC_new";
+// static char *brokers = "10.208.133.126:9092,10.208.133.128:9092,10.208.133.133:9092,10.208.133.162:9092,10.208.133.166:9092";
+char brokers[128];
+char topic_traffic_stat[64];
+char topic_sum[64];
+// static char *topic1 = "G_BACK_TRAFFIC_STATISTIC_new";
+// static char *topic2 = "Gsj_Sum";
//GSJ thread
pthread_t GSJ_Work;
@@ -92,11 +100,15 @@ pthread_t GSJ_Work;
pthread_t count;
//timer thread
pthread_t timer;
+//vlanid_ip thread
+pthread_t vlanid_ip_pthread_t;
int time_out[32] = {0}; //1:time out,push data
int push_buffer_count[32] = {0};
char *thread_push_buffer[32];
#define PUSH_BUFFER_COUNT 10
+MESA_htable_handle vlanid_ip_htable_handle;
+
struct traffic_info
{
unsigned char protocol; //IPv4_TCP 1 IPv4_UDP 2 IPv6_TCP 3 IPv6_UDP 4 其他 0
@@ -143,31 +155,39 @@ static void logger(const rd_kafka_t *rk, int level, const char *fac, const char
level, fac, rk ? rd_kafka_name(rk) : NULL, buf);
}
-static int init_kafka(int partition_, char *brokers_, char *topic_)
+static int kafka_stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque)
+{
+ MESA_handle_runtime_log(kafka_stat_handler, RLOG_LV_FATAL, module_name, "%s", json);
+ return 1;
+}
+
+static int init_kafka(int partition_, char *brokers_, char *topic_traffic_stat, char *topic_sum)
{
char tmp[16];
char errstr[1024];
partition = partition_;
/* Kafka configuration */
conf = rd_kafka_conf_new();
+ if (conf == NULL)
+ {
+ //fprintf(stderr, "***** Failed to create new conf *******\n");
+ MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name, "***** Failed to create new conf *******");
+ return PRODUCER_INIT_FAILED;
+ }
//set logger :register log function
rd_kafka_conf_set_log_cb(conf, logger);
+ rd_kafka_conf_set_stats_cb(conf, kafka_stats_cb);
/* Quick termination */
snprintf(tmp, sizeof(tmp), "%i", SIGIO);
+ rd_kafka_conf_set(conf, "statistics.interval.ms", "60", errstr, sizeof(errstr));
rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);
- //rd_kafka_conf_set(conf, "producer.type", "kafka.producer.AyncProducer", errstr, sizeof(errstr));
- rd_kafka_conf_set(conf, "queue.buffering.max.messages", "1000000", errstr, sizeof(errstr));
+ rd_kafka_conf_set(conf, "queue.buffering.max.messages", "2000000", errstr, sizeof(errstr));
rd_kafka_conf_set(conf, "topic.metadata.refresh.interval.ms", "600000", errstr, sizeof(errstr));
rd_kafka_conf_set(conf, "request.required.acks", "0", errstr, sizeof(errstr));
/*topic configuration*/
- topic_conf = rd_kafka_topic_conf_new();
+ topic_conf_traffic_stat = rd_kafka_topic_conf_new();
+ topic_conf_sum = rd_kafka_topic_conf_new();
- if (conf == NULL)
- {
- //fprintf(stderr, "***** Failed to create new conf *******\n");
- MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name, "***** Failed to create new conf *******");
- return PRODUCER_INIT_FAILED;
- }
kafka_producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, (size_t)sizeof(errstr));
if (kafka_producer == NULL)
{
@@ -188,18 +208,20 @@ static int init_kafka(int partition_, char *brokers_, char *topic_)
return PRODUCER_INIT_FAILED;
}
/* Create topic */
- rkt = rd_kafka_topic_new(kafka_producer, topic_, topic_conf);
+ rkt_traffic_stat = rd_kafka_topic_new(kafka_producer, topic_traffic_stat, topic_conf_traffic_stat);
+ rkt_sum = rd_kafka_topic_new(kafka_producer, topic_sum, topic_conf_sum);
return PRODUCER_INIT_SUCCESS;
}
static void kafka_destroy()
{
- rd_kafka_topic_destroy(rkt);
+ rd_kafka_topic_destroy(rkt_traffic_stat);
+ rd_kafka_topic_destroy(rkt_sum);
rd_kafka_destroy(kafka_producer);
}
-static int push_data_to_kafka(char *buffer, int buf_len)
+static int push_data_to_kafka(char *buffer, int buf_len, rd_kafka_topic_t *topic)
{
int ret;
if (buffer == NULL)
@@ -207,7 +229,7 @@ static int push_data_to_kafka(char *buffer, int buf_len)
return 0;
}
//ret = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL);
- ret = rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL);
+ ret = rd_kafka_produce(topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL);
if (ret == -1)
{
/*fprintf(stderr,
@@ -216,27 +238,29 @@ static int push_data_to_kafka(char *buffer, int buf_len)
rd_kafka_topic_name(rkt), partition,
rd_kafka_err2str(rd_kafka_last_error()));*/
MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name, "%% Failed to produce to topic %s partition %i: %s",
- rd_kafka_topic_name(rkt), partition,
+ rd_kafka_topic_name(topic), partition,
rd_kafka_err2str(rd_kafka_last_error()));
/* Poll to handle delivery reports */
- //rd_kafka_poll(kafka_producer, 0);
+ rd_kafka_poll(kafka_producer, 0);
return PUSH_DATA_FAILED;
}
/*fprintf(stderr, "%% Sent %zd bytes to topic "
"%s partition %i\n",
buf_len, rd_kafka_topic_name(rkt), partition);*/
MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name, "%% Sent %zd bytes to topic %s partition %i",
- buf_len, rd_kafka_topic_name(rkt), partition);
+ buf_len, rd_kafka_topic_name(topic), partition);
// rd_kafka_poll(kafka_producer, 0);
return PUSH_DATA_SUCCESS;
}
+#if ENABLE_GSJ_THREAD
static void push_data_to_GSJ(char *buffer, int buf_len)
{
GoString value = {(const char *)buffer, buf_len};
GetInfo(value);
// Work();
}
+#endif
#if 0
unsigned char get_service_id(struct streaminfo *pstream)
@@ -610,18 +634,6 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps
sprintf(protocol, "%s", "others");
break;
}
- /*
- printf("\"proto_type\":%d,protocol:%s, entrance_id:%d, c2s_pkt_num:%d, s2c_pkt_num:%d, c2s_byte_len:%d, s2c_byte_len:%d, ",
- tinfo->PROTO_TYPE,protocol, entrance_id, tinfo->C2S_pkt_num,tinfo->S2C_pkt_num,tinfo->C2S_bytes,tinfo->S2C_bytes);
- printf("stat_time:%ld, vx_type:0x%04X, vx_ip_header_src_ip:%s, vx_ip_header_dst_ip:%s, ",
- tinfo->stat_time, tinfo->vx_type, tinfo->vx_ip_header_src_ip,vx_ip_header_dst_ip);
- printf("vx_UDP_header_src_port:%d, vx_UDP_header_dst_port=%d, vx_vlan_id:%d, ",
- tinfo->vx_UDP_header_src_port,tinfo->vx_UDP_header_dst_port,tinfo->service_id);
- printf("ipv4_src_ip:%s, ipv4_dst_ip:%s, ipv4_identification:%d, ipv4_fragment_offset:%d, ",
- tinfo->ipv4_sip,tinfo->ipv4_dip,tinfo->ipv4_id,tinfo->ipv4_off);
- printf("ipv6_src_ip:%s, ipv6_dst_ip:%s, ipv6_bus_type:%s, ipv6_flow_flag:%d, ipv6_load_length:%d, ipv6_next_msg_head:%d, ipv6_limit:%d ",
- tinfo->ipv6_sip,tinfo->ipv6_dip,tinfo->ipv6_bus_type,tinfo->ipv6_flow_flag,tinfo->ipv6_load_length,tinfo->ipv6_next_msg_head,tinfo->ipv6_limit);
- */
char info[MAX_TRAFFIC_INFO_LEN] = {0};
inet_ntop(AF_INET, &tinfo->vx_ip_header_src_ip_net, vxlan_sip_ip_str, INET_ADDRSTRLEN);
@@ -645,9 +657,6 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps
flow_type, sendto_gdev_ip);
// MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info);
// push_data_to_kafka(info,strlen(info));
-#if ENABLE_GSJ_THREAD
- // push_data_to_GSJ(info, strlen(info));
-#endif
break;
case ADDR_TYPE_IPV6:
snprintf(info, MAX_TRAFFIC_INFO_LEN,
@@ -664,9 +673,6 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps
tinfo->ipv6_next_msg_head, tinfo->ipv6_limit, flow_type, sendto_gdev_ip);
// MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info);
// push_data_to_kafka(info,strlen(info));
-#if ENABLE_GSJ_THREAD
- // push_data_to_GSJ(info, strlen(info));
-#endif
break;
case ADDR_TYPE_ARP:
snprintf(info, MAX_TRAFFIC_INFO_LEN,
@@ -682,29 +688,229 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps
flow_type, sendto_gdev_ip);
// MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info);
// push_data_to_kafka(info,strlen(info));
-#if ENABLE_GSJ_THREAD
- // push_data_to_GSJ(info, strlen(info));
-#endif
break;
default:
break;
}
+ push_data_to_kafka(info, strlen(info), rkt_traffic_stat);
#if ENABLE_GSJ_THREAD
- if(push_buffer_count[thread_seq] == PUSH_BUFFER_COUNT){
- memset(thread_push_buffer[thread_seq]+strlen(thread_push_buffer[thread_seq])-1, 0, 1);
- push_data_to_GSJ(thread_push_buffer[thread_seq], strlen(thread_push_buffer[thread_seq]));
-
- push_buffer_count[thread_seq] = 0;
- memset(thread_push_buffer[thread_seq], 0, PUSH_BUFFER_COUNT*MAX_TRAFFIC_INFO_LEN);
- }
- else{
+ if (push_buffer_count[thread_seq] == PUSH_BUFFER_COUNT)
+ {
+ memset(thread_push_buffer[thread_seq] + strlen(thread_push_buffer[thread_seq]) - 1, 0, 1);
+ push_data_to_GSJ(thread_push_buffer[thread_seq], strlen(thread_push_buffer[thread_seq]));
+
+ push_buffer_count[thread_seq] = 0;
+ memset(thread_push_buffer[thread_seq], 0, PUSH_BUFFER_COUNT * MAX_TRAFFIC_INFO_LEN);
+ }
+ else
+ {
push_buffer_count[thread_seq]++;
- memcpy(thread_push_buffer[thread_seq]+strlen(thread_push_buffer[thread_seq]), info, strlen(info));
+ memcpy(thread_push_buffer[thread_seq] + strlen(thread_push_buffer[thread_seq]), info, strlen(info));
}
#endif
}
+
+int ip_mod(struct traffic_info *tinfo)
+{
+ char vxlan_sip_ip_str[INET_ADDRSTRLEN];
+ char vxlan_dip_ip_str[INET_ADDRSTRLEN];
+ inet_ntop(AF_INET, &tinfo->vx_ip_header_src_ip_net, vxlan_sip_ip_str, INET_ADDRSTRLEN);
+ inet_ntop(AF_INET, &tinfo->vx_ip_header_dst_ip_net, vxlan_dip_ip_str, INET_ADDRSTRLEN);
+
+ int a, b, c, d;
+ sscanf(vxlan_sip_ip_str, "%d.%d.%d.%d", &a, &b, &c, &d);
+ if (a + b + c + d == 0)
+ {
+ return 1;
+ }
+ if (d % 16 == 1)
+ {
+ return 1;
+ }
+ sscanf(vxlan_dip_ip_str, "%d.%d.%d.%d", &a, &b, &c, &d);
+ if (a + b + c + d == 0)
+ {
+ return 1;
+ }
+ if (d % 16 == 1)
+ {
+ return 1;
+ }
+ if (tinfo->vxlan_vpn_id == 3784)
+ {
+ return 1;
+ }
+ return 0;
+}
+
+long vlanid_ip_htable_search_cb(void *data, const uchar *key, uint size, void *user_arg)
+{
+ struct traffic_info *tinfo = (struct traffic_info *)user_arg;
+ UINT64 *cnt = (UINT64 *)data;
+ if (cnt == NULL)
+ {
+ cnt = (UINT64 *)calloc(1, sizeof(UINT64));
+ // *cnt = 1;
+ *cnt = tinfo->C2S_pkt_num + tinfo->S2C_pkt_num;
+ if (MESA_htable_add(vlanid_ip_htable_handle, key, size, cnt) >= 0)
+ {
+ return 1;
+ }
+ else
+ {
+ return 0;
+ }
+ }
+ else
+ {
+ // *cnt = *cnt + 1;
+ *cnt = *cnt + tinfo->C2S_pkt_num + tinfo->S2C_pkt_num;
+ }
+ return 1;
+}
+
+void vlanid_ip_stat(struct traffic_info *tinfo)
+{
+ uchar vlanid_ip_key[21];
+ /* FLOW_TYPE_REFLUX: vx_vlan_id/vx_ip_header_src_ip is key
+ FLOW_TYPE_INJECT: vx_vlan_id/vx_ip_header_dst_ip is key
+ */
+ memset(vlanid_ip_key, 0, sizeof(char) * 21);
+ char vxlan_sip_ip_str[INET_ADDRSTRLEN];
+ char vxlan_dip_ip_str[INET_ADDRSTRLEN];
+ inet_ntop(AF_INET, &tinfo->vx_ip_header_src_ip_net, vxlan_sip_ip_str, INET_ADDRSTRLEN);
+ inet_ntop(AF_INET, &tinfo->vx_ip_header_dst_ip_net, vxlan_dip_ip_str, INET_ADDRSTRLEN);
+
+ if (flow_type == (unsigned int)FLOW_TYPE_REFLUX)
+ {
+ snprintf(vlanid_ip_key, 21, "%d@%s", tinfo->vxlan_vpn_id, vxlan_sip_ip_str);
+ }
+ else if (flow_type == (unsigned int)FLOW_TYPE_INJECT)
+ {
+ snprintf(vlanid_ip_key, 21, "%d@%s", tinfo->vxlan_vpn_id, vxlan_dip_ip_str);
+ }
+ else
+ {
+ MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "flow_type error: neither 0 nor 1\n");
+ return;
+ }
+ hash_cb_fun_t *search_cb = vlanid_ip_htable_search_cb;
+ long search_cb_ret = 0;
+ MESA_htable_search_cb(vlanid_ip_htable_handle, vlanid_ip_key, strlen(vlanid_ip_key), search_cb, tinfo, &search_cb_ret);
+ if (search_cb_ret == 0)
+ {
+ MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "MESA_htable_search_cb failed\n");
+ }
+}
+
static int tcp_flow_id = -1;
+char process_tcp_close(struct streaminfo *pstream, struct traffic_info *tinfo, int thread_seq)
+{
+ // if (!ip_mod(tinfo))
+ // {
+ // free(tinfo);
+ // return DEFAULT_RETURN_VALUE;
+ // }
+ if (tcp_flow_id != -1)
+ {
+ struct tcp_flow_stat *tflow = (struct tcp_flow_stat *)project_req_get_struct(pstream, tcp_flow_id);
+ tinfo->C2S_pkt_num = tflow->C2S_all_pkt - tinfo->C2S_pkt_num;
+ tinfo->S2C_pkt_num = tflow->S2C_all_pkt - tinfo->S2C_pkt_num;
+ tinfo->C2S_bytes = tflow->C2S_all_byte - tinfo->C2S_bytes;
+ tinfo->S2C_bytes = tflow->S2C_all_byte - tinfo->S2C_bytes;
+ }
+ else
+ {
+ tinfo->C2S_pkt_num = 0;
+ tinfo->S2C_pkt_num = 0;
+ tinfo->C2S_bytes = 0;
+ tinfo->S2C_bytes = 0;
+ }
+#if ENABLE_COUNT_THREAD
+ close_stream_pkt_num[thread_seq] += tinfo->C2S_pkt_num;
+ close_stream_pkt_num[thread_seq] += tinfo->S2C_pkt_num;
+#endif
+ vlanid_ip_stat(tinfo);
+ if (!ip_mod(tinfo))
+ {
+ free(tinfo);
+ return DEFAULT_RETURN_VALUE;
+ }
+
+#if ENABLE_COUNT_THREAD
+ push_count[thread_seq]++;
+#endif
+ /* layer_addr */
+ tinfo->addr = pstream->addr;
+ /* STAT_TIME */
+ // tinfo->stat_time = time(0);
+ gettimeofday(&tinfo->stat_time, NULL);
+
+ print_traffic_info(tinfo, pstream, thread_seq);
+ free(tinfo);
+ return DEFAULT_RETURN_VALUE;
+}
+
+char process_tcp_data(struct streaminfo *pstream, struct traffic_info *tinfo, int thread_seq)
+{
+ // if (!ip_mod(tinfo))
+ // {
+ // return DEFAULT_RETURN_VALUE;
+ // }
+ if (tcp_flow_id != -1)
+ {
+ struct tcp_flow_stat *tflow = (struct tcp_flow_stat *)project_req_get_struct(pstream, tcp_flow_id);
+ tinfo->C2S_pkt_num = tflow->C2S_all_pkt - tinfo->C2S_pkt_num;
+ tinfo->S2C_pkt_num = tflow->S2C_all_pkt - tinfo->S2C_pkt_num;
+ tinfo->C2S_bytes = tflow->C2S_all_byte - tinfo->C2S_bytes;
+ tinfo->S2C_bytes = tflow->S2C_all_byte - tinfo->S2C_bytes;
+ }
+ else
+ {
+ tinfo->C2S_pkt_num = 0;
+ tinfo->S2C_pkt_num = 0;
+ tinfo->C2S_bytes = 0;
+ tinfo->S2C_bytes = 0;
+ }
+#if ENABLE_COUNT_THREAD
+ close_stream_pkt_num[thread_seq] += tinfo->C2S_pkt_num;
+ close_stream_pkt_num[thread_seq] += tinfo->S2C_pkt_num;
+#endif
+ vlanid_ip_stat(tinfo);
+ if (!ip_mod(tinfo))
+ {
+ if (tcp_flow_id != -1)
+ {
+ struct tcp_flow_stat *tflow = (struct tcp_flow_stat *)project_req_get_struct(pstream, tcp_flow_id);
+ tinfo->C2S_pkt_num = tflow->C2S_all_pkt;
+ tinfo->S2C_pkt_num = tflow->S2C_all_pkt;
+ tinfo->C2S_bytes = tflow->C2S_all_byte;
+ tinfo->S2C_bytes = tflow->S2C_all_byte;
+ }
+ return DEFAULT_RETURN_VALUE;
+ }
+
+#if ENABLE_COUNT_THREAD
+ push_count[thread_seq]++;
+#endif
+ /* layer_addr */
+ tinfo->addr = pstream->addr;
+ /* STAT_TIME */
+ gettimeofday(&tinfo->stat_time, NULL);
+ print_traffic_info(tinfo, pstream, thread_seq);
+
+ if (tcp_flow_id != -1)
+ {
+ struct tcp_flow_stat *tflow = (struct tcp_flow_stat *)project_req_get_struct(pstream, tcp_flow_id);
+ tinfo->C2S_pkt_num = tflow->C2S_all_pkt;
+ tinfo->S2C_pkt_num = tflow->S2C_all_pkt;
+ tinfo->C2S_bytes = tflow->C2S_all_byte;
+ tinfo->S2C_bytes = tflow->S2C_all_byte;
+ }
+ return DEFAULT_RETURN_VALUE;
+}
+
char TCP_ENTRY_ALL(struct streaminfo *pstream, void **pme, int thread_seq, const void *raw_pkt)
{
//printf("TCP_ENTRY_ALL SUCCESS!!!\n");
@@ -720,7 +926,6 @@ char TCP_ENTRY_ALL(struct streaminfo *pstream, void **pme, int thread_seq, const
tcp_flow_id = project_customer_register("tcp_flow_stat", "struct");
if (-1 == tcp_flow_id)
{
- // printf("'tcp_flow_stat' is disable, no statistics\n");
MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "'tcp_flow_stat' is disable, no statistics\n");
}
}
@@ -767,38 +972,11 @@ char TCP_ENTRY_ALL(struct streaminfo *pstream, void **pme, int thread_seq, const
}
tinfo = (struct traffic_info *)(*pme);
+ // vlanid_ip_stat(tinfo);
+
if (pstream->pktstate == OP_STATE_CLOSE)
{
- if (tcp_flow_id != -1)
- {
- struct tcp_flow_stat *tflow = (struct tcp_flow_stat *)project_req_get_struct(pstream, tcp_flow_id);
- tinfo->C2S_pkt_num = tflow->C2S_all_pkt - tinfo->C2S_pkt_num;
- tinfo->S2C_pkt_num = tflow->S2C_all_pkt - tinfo->S2C_pkt_num;
- tinfo->C2S_bytes = tflow->C2S_all_byte - tinfo->C2S_bytes;
- tinfo->S2C_bytes = tflow->S2C_all_byte - tinfo->S2C_bytes;
- }
- else
- {
- tinfo->C2S_pkt_num = 0;
- tinfo->S2C_pkt_num = 0;
- tinfo->C2S_bytes = 0;
- tinfo->S2C_bytes = 0;
- }
-#if ENABLE_COUNT_THREAD
- close_stream_pkt_num[thread_seq] += tinfo->C2S_pkt_num;
- close_stream_pkt_num[thread_seq] += tinfo->S2C_pkt_num;
- push_count[thread_seq]++;
-#endif
- /* layer_addr */
- tinfo->addr = pstream->addr;
- /* STAT_TIME */
- // tinfo->stat_time = time(0);
- gettimeofday(&tinfo->stat_time, NULL);
-
- print_traffic_info(tinfo, pstream, thread_seq);
- free(tinfo);
-
- return DEFAULT_RETURN_VALUE;
+ return process_tcp_close(pstream, tinfo, thread_seq);
}
#if ENABLE_TIMER
/*每隔N秒发一次 */
@@ -806,42 +984,8 @@ char TCP_ENTRY_ALL(struct streaminfo *pstream, void **pme, int thread_seq, const
{
if (time_out[thread_seq])
{
- if (tcp_flow_id != -1)
- {
- struct tcp_flow_stat *tflow = (struct tcp_flow_stat *)project_req_get_struct(pstream, tcp_flow_id);
- tinfo->C2S_pkt_num = tflow->C2S_all_pkt - tinfo->C2S_pkt_num;
- tinfo->S2C_pkt_num = tflow->S2C_all_pkt - tinfo->S2C_pkt_num;
- tinfo->C2S_bytes = tflow->C2S_all_byte - tinfo->C2S_bytes;
- tinfo->S2C_bytes = tflow->S2C_all_byte - tinfo->S2C_bytes;
- }
- else
- {
- tinfo->C2S_pkt_num = 0;
- tinfo->S2C_pkt_num = 0;
- tinfo->C2S_bytes = 0;
- tinfo->S2C_bytes = 0;
- }
- /* layer_addr */
- tinfo->addr = pstream->addr;
-
- gettimeofday(&tinfo->stat_time, NULL);
-
-#if ENABLE_COUNT_THREAD
- close_stream_pkt_num[thread_seq] += tinfo->C2S_pkt_num;
- close_stream_pkt_num[thread_seq] += tinfo->S2C_pkt_num;
- push_count[thread_seq]++;
-#endif
- print_traffic_info(tinfo, pstream, thread_seq);
-
- if (tcp_flow_id != -1)
- {
- struct tcp_flow_stat *tflow = (struct tcp_flow_stat *)project_req_get_struct(pstream, tcp_flow_id);
- tinfo->C2S_pkt_num = tflow->C2S_all_pkt;
- tinfo->S2C_pkt_num = tflow->S2C_all_pkt;
- tinfo->C2S_bytes = tflow->C2S_all_byte;
- tinfo->S2C_bytes = tflow->S2C_all_byte;
- }
time_out[thread_seq] = 0;
+ return process_tcp_data(pstream, tinfo, thread_seq);
}
}
#endif
@@ -859,8 +1003,6 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
{
struct tcpdetail *raw_pdetail = (struct tcpdetail *)pstream->pdetail;
struct traffic_info *tinfo;
- // time_t *time_old;
- // struct traffic_info *tinfo_new;
#if ENABLE_COUNT_THREAD
all_stream_pkt_num[thread_seq]++;
@@ -871,17 +1013,12 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
tcp_flow_id = project_customer_register("tcp_flow_stat", "struct");
if (-1 == tcp_flow_id)
{
- // printf("'tcp_flow_stat' is disable, no statistics\n");
MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "'tcp_flow_stat' is disable, no statistics\n");
}
}
if (pstream->opstate == OP_STATE_PENDING)
{
- // *pme = (void *)calloc(2, sizeof(void *));
- // time_old = (time_t *)calloc(1, sizeof(time_t));
- // tinfo_new = (struct traffic_info *)calloc(1, sizeof(struct traffic_info));
-
tinfo = (struct traffic_info *)calloc(1, sizeof(struct traffic_info));
//tinfo->service_id = get_service_id_from_sport(pstream); //获取vx_lan_id字段,具体方法待定 //tinfo->service_id = get_service_id_from_vxlanid(pstream);
tinfo->vxlan_vpn_id = get_vpnid_from_stream(pstream);
@@ -918,140 +1055,25 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
/* 应用层协议类型 用目的端口表示*/
tinfo->PROTO_TYPE = get_proto_type(pstream);
- // *tinfo_new = *tinfo;
*pme = tinfo;
- // ((void **)*pme)[0] = tinfo;
- // ((void **)*pme)[1] = tinfo_new;
- // time(time_old);
- // ((void **)*pme)[2] = time_old;
}
tinfo = (struct traffic_info *)(*pme);
- // tinfo = (struct traffic_info *)(((void **)*pme)[0]);
- // tinfo_new = (struct traffic_info *)(((void **)*pme)[1]);
- // time_old = (time_t *)(((void **)*pme)[2]);
- /* 自己统计包数字节数*/ /*
- if(raw_pdetail->datalen > 0)
- {
- if(DIR_C2S == pstream->curdir)
- {
- tinfo->C2S_pkt_num++;
- tinfo->C2S_bytes += raw_pdetail->datalen;
- }
- else
- {
- tinfo->S2C_pkt_num++;
- tinfo->S2C_bytes += raw_pdetail->datalen;
- }
- }
- */
- /* if (pstream->opstate == OP_STATE_CLOSE && tinfo->vxlan_vpn_id > 0) */
- // if (pstream->opstate == OP_STATE_CLOSE && !vpn_id_drop[tinfo->vxlan_vpn_id])
+ // vlanid_ip_stat(tinfo);
+
if (pstream->opstate == OP_STATE_CLOSE)
{
- //printf("TCP_ENTRY SUCCESS!!!\n");
- /* 获取包数字节数*/
- // tinfo->C2S_pkt_num = raw_pdetail->serverpktnum;
- // tinfo->S2C_pkt_num = raw_pdetail->clientpktnum;
- // tinfo->C2S_bytes = raw_pdetail->serverbytes;
- // tinfo->S2C_bytes = raw_pdetail->clientbytes;
- // if(tinfo->C2S_bytes < 0)
- // {
- // tinfo->C2S_bytes = 0;
- // }
- // if(tinfo->S2C_bytes < 0)
- // {
- // tinfo->S2C_bytes = 0;
- // }
- /* 另一种获取包数字节数的方式*/
-
- if (tcp_flow_id != -1)
- {
- struct tcp_flow_stat *tflow = (struct tcp_flow_stat *)project_req_get_struct(pstream, tcp_flow_id);
- tinfo->C2S_pkt_num = tflow->C2S_all_pkt - tinfo->C2S_pkt_num;
- tinfo->S2C_pkt_num = tflow->S2C_all_pkt - tinfo->S2C_pkt_num;
- tinfo->C2S_bytes = tflow->C2S_all_byte - tinfo->C2S_bytes;
- tinfo->S2C_bytes = tflow->S2C_all_byte - tinfo->S2C_bytes;
- }
- else
- {
- tinfo->C2S_pkt_num = 0;
- tinfo->S2C_pkt_num = 0;
- tinfo->C2S_bytes = 0;
- tinfo->S2C_bytes = 0;
- }
-#if ENABLE_COUNT_THREAD
- close_stream_pkt_num[thread_seq] += tinfo->C2S_pkt_num;
- close_stream_pkt_num[thread_seq] += tinfo->S2C_pkt_num;
- push_count[thread_seq]++;
-#endif
- /* layer_addr */
- tinfo->addr = pstream->addr;
- /* STAT_TIME */
- // tinfo->stat_time = time(0);
- gettimeofday(&tinfo->stat_time, NULL);
-
- print_traffic_info(tinfo, pstream, thread_seq);
- free(tinfo);
- // free(tinfo_new);
- // free(time_old);
- // free(*pme);
- // printf("\n");
- return DEFAULT_RETURN_VALUE;
+ return process_tcp_close(pstream, tinfo, thread_seq);
}
#if ENABLE_TIMER
/*每隔N秒发一次 */
if (pstream->opstate == OP_STATE_DATA)
{
- // time_t *time_new;
- // time_new = (time_t *)calloc(1, sizeof(time_t));
- // time(time_new);
- // if (*time_new - *time_old >= 120)
if (time_out[thread_seq])
{
- if (tcp_flow_id != -1)
- {
- struct tcp_flow_stat *tflow = (struct tcp_flow_stat *)project_req_get_struct(pstream, tcp_flow_id);
- tinfo->C2S_pkt_num = tflow->C2S_all_pkt - tinfo->C2S_pkt_num;
- tinfo->S2C_pkt_num = tflow->S2C_all_pkt - tinfo->S2C_pkt_num;
- tinfo->C2S_bytes = tflow->C2S_all_byte - tinfo->C2S_bytes;
- tinfo->S2C_bytes = tflow->S2C_all_byte - tinfo->S2C_bytes;
- }
- else
- {
- tinfo->C2S_pkt_num = 0;
- tinfo->S2C_pkt_num = 0;
- tinfo->C2S_bytes = 0;
- tinfo->S2C_bytes = 0;
- }
- /* layer_addr */
- tinfo->addr = pstream->addr;
- // tinfo_new->addr = pstream->addr;
- /* STAT_TIME */
- // tinfo->stat_time = time(0);
- // gettimeofday(&tinfo_new->stat_time, NULL);
- gettimeofday(&tinfo->stat_time, NULL);
-
-#if ENABLE_COUNT_THREAD
- close_stream_pkt_num[thread_seq] += tinfo->C2S_pkt_num;
- close_stream_pkt_num[thread_seq] += tinfo->S2C_pkt_num;
- push_count[thread_seq]++;
-#endif
- print_traffic_info(tinfo, pstream, thread_seq);
-
- if (tcp_flow_id != -1)
- {
- struct tcp_flow_stat *tflow = (struct tcp_flow_stat *)project_req_get_struct(pstream, tcp_flow_id);
- tinfo->C2S_pkt_num = tflow->C2S_all_pkt;
- tinfo->S2C_pkt_num = tflow->S2C_all_pkt;
- tinfo->C2S_bytes = tflow->C2S_all_byte;
- tinfo->S2C_bytes = tflow->S2C_all_byte;
- }
- // free(time_new);
- // time(time_old);
time_out[thread_seq] = 0;
+ return process_tcp_data(pstream, tinfo, thread_seq);
}
- // free(time_new);
}
#endif
return DEFAULT_RETURN_VALUE;
@@ -1061,9 +1083,6 @@ error_drop:
error_count[thread_seq]++;
#endif
free(tinfo);
- // free(tinfo_new);
- // free(time_old);
- // free(*pme);
return APP_STATE_DROPME | APP_STATE_DROPPKT;
}
@@ -1099,8 +1118,6 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
{
struct udpdetail *pdetail = (struct udpdetail *)pstream->pdetail;
struct traffic_info *tinfo;
- // time_t *time_old;
- // struct traffic_info *tinfo_new;
#if ENABLE_COUNT_THREAD
all_stream_pkt_num[thread_seq]++;
@@ -1116,17 +1133,12 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
udp_flow_id = project_customer_register(PROJECT_REQ_UDP_FLOW, "struct");
if (-1 == udp_flow_id)
{
- // printf("'udp_flow_stat' is disable, no statistics\n");
MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "'tcp_flow_stat' is disable, no statistics\n");
}
}
if (pstream->opstate == OP_STATE_PENDING)
{
- // *pme = (void *)calloc(2, sizeof(void *));
- // time_old = (time_t *)calloc(1, sizeof(time_t));
- // tinfo_new = (struct traffic_info *)calloc(1, sizeof(struct traffic_info));
-
tinfo = (struct traffic_info *)calloc(1, sizeof(struct traffic_info));
//tinfo->service_id = get_service_id_from_sport(pstream); //获取vx_lan_id字段,具体方法待定 //tinfo->service_id = get_service_id_from_vxlanid(pstream);
tinfo->vxlan_vpn_id = get_vpnid_from_stream(pstream);
@@ -1164,40 +1176,21 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
/* 应用层协议类型 用目的端口表示*/
tinfo->PROTO_TYPE = get_proto_type(pstream);
- // *tinfo_new = *tinfo;
*pme = tinfo;
- // ((void **)*pme)[0] = tinfo;
- // ((void **)*pme)[1] = tinfo_new;
- // time(time_old);
- // ((void **)*pme)[2] = time_old;
}
tinfo = (struct traffic_info *)(*pme);
- // tinfo = (struct traffic_info *)(((void **)*pme)[0]);
- // tinfo_new = (struct traffic_info *)(((void **)*pme)[1]);
- // time_old = (time_t *)(((void **)*pme)[2]);
- /* if (pstream->opstate == OP_STATE_CLOSE && tinfo->vxlan_vpn_id > 0) */
- // if (pstream->opstate == OP_STATE_CLOSE && !vpn_id_drop[tinfo->vxlan_vpn_id])
+ // vlanid_ip_stat(tinfo);
+
if (pstream->opstate == OP_STATE_CLOSE)
{
- //printf("UDP_ENTRY SUCCESS!!!\n");
-
+ // if (!ip_mod(tinfo))
+ // {
+ // free(tinfo);
+ // return DEFAULT_RETURN_VALUE;
+ // }
if (pdetail != NULL)
{
- /* 获取包数字节数 */
- // tinfo->C2S_pkt_num = pdetail->serverpktnum;
- // tinfo->S2C_pkt_num = pdetail->clientpktnum;
- // tinfo->C2S_bytes = pdetail->serverbytes;
- // tinfo->S2C_bytes = pdetail->clientbytes;
- // if (tinfo->C2S_bytes < 0)
- // {
- // tinfo->C2S_bytes = 0;
- // }
- // if (tinfo->S2C_bytes < 0)
- // {
- // tinfo->S2C_bytes = 0;
- // }
-
if (udp_flow_id != -1)
{
struct udp_flow_stat *tflow = (struct udp_flow_stat *)project_req_get_struct(pstream, udp_flow_id);
@@ -1224,6 +1217,15 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
#if ENABLE_COUNT_THREAD
close_stream_pkt_num[thread_seq] += tinfo->C2S_pkt_num;
close_stream_pkt_num[thread_seq] += tinfo->S2C_pkt_num;
+#endif
+ vlanid_ip_stat(tinfo);
+ if (!ip_mod(tinfo))
+ {
+ free(tinfo);
+ return DEFAULT_RETURN_VALUE;
+ }
+
+#if ENABLE_COUNT_THREAD
push_count[thread_seq]++;
#endif
/* layer_addr */
@@ -1234,22 +1236,19 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
print_traffic_info(tinfo, pstream, thread_seq);
free(tinfo);
- // free(tinfo_new);
- // free(time_old);
- // free(*pme);
- // printf("\n");
+
return DEFAULT_RETURN_VALUE;
}
#if ENABLE_TIMER
/*每隔N秒发一次 */
if (pstream->opstate == OP_STATE_DATA)
{
- // time_t *time_new;
- // time_new = (time_t *)calloc(1, sizeof(time_t));
- // time(time_new);
- // if (*time_new - *time_old >= 120)
if (time_out[thread_seq])
{
+ // if (!ip_mod(tinfo))
+ // {
+ // return DEFAULT_RETURN_VALUE;
+ // }
if (udp_flow_id != -1)
{
struct udp_flow_stat *tflow = (struct udp_flow_stat *)project_req_get_struct(pstream, udp_flow_id);
@@ -1265,21 +1264,34 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
tinfo->C2S_bytes = 0;
tinfo->S2C_bytes = 0;
}
- /* layer_addr */
- tinfo->addr = pstream->addr;
- // tinfo_new->addr = pstream->addr;
- /* STAT_TIME */
- // tinfo->stat_time = time(0);
- // gettimeofday(&tinfo_new->stat_time, NULL);
- gettimeofday(&tinfo->stat_time, NULL);
-
#if ENABLE_COUNT_THREAD
close_stream_pkt_num[thread_seq] += tinfo->C2S_pkt_num;
close_stream_pkt_num[thread_seq] += tinfo->S2C_pkt_num;
- push_count[thread_seq]++;
#endif
+ vlanid_ip_stat(tinfo);
+ if (!ip_mod(tinfo))
+ {
+ if (tcp_flow_id != -1)
+ {
+ struct udp_flow_stat *tflow = (struct udp_flow_stat *)project_req_get_struct(pstream, udp_flow_id);
+ tinfo->C2S_pkt_num = tflow->C2S_pkt;
+ tinfo->S2C_pkt_num = tflow->S2C_pkt;
+ tinfo->C2S_bytes = tflow->C2S_byte;
+ tinfo->S2C_bytes = tflow->S2C_byte;
+ }
+ time_out[thread_seq] = 0;
+ return DEFAULT_RETURN_VALUE;
+ }
+#if ENABLE_COUNT_THREAD
+ push_count[thread_seq]++;
+#endif
+ /* layer_addr */
+ tinfo->addr = pstream->addr;
+ /* STAT_TIME */
+ gettimeofday(&tinfo->stat_time, NULL);
print_traffic_info(tinfo, pstream, thread_seq);
+
if (udp_flow_id != -1)
{
struct udp_flow_stat *tflow = (struct udp_flow_stat *)project_req_get_struct(pstream, udp_flow_id);
@@ -1288,12 +1300,8 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi
tinfo->C2S_bytes = tflow->C2S_byte;
tinfo->S2C_bytes = tflow->S2C_byte;
}
- // free(time_new);
- // time(time_old);
time_out[thread_seq] = 0;
}
-
- // free(time_new);
}
#endif
return DEFAULT_RETURN_VALUE;
@@ -1303,9 +1311,6 @@ error_drop:
error_count[thread_seq]++;
#endif
free(tinfo);
- // free(tinfo_new);
- // free(time_old);
- // free(*pme);
return APP_STATE_DROPME | APP_STATE_DROPPKT;
}
@@ -1332,8 +1337,8 @@ void count_work()
all_error_count += error_count[i];
}
MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL,
- module_name, "all_stream_pkt_num is %llu,close_stream_pkt_num is %llu, push_count is %llu, error_count is %llu. \n",
- all_pkt, close_pkt, all_push_count, all_error_count);
+ module_name, "all_stream_pkt_num is %llu,close_stream_pkt_num is %llu, push_count is %llu, error_count is %llu, vlanid_ip_push_count is %llu, vlanid_ip_pkt_num is %llu.\n",
+ all_pkt, close_pkt, all_push_count, all_error_count, vlanid_ip_push_count, vlanid_ip_pkt_num);
}
}
#if ENABLE_TIMER
@@ -1341,7 +1346,7 @@ void timer_work()
{
while (1)
{
- sleep(120);
+ sleep(60);
int i = 0;
for (i; i < 32; i++)
{
@@ -1353,6 +1358,41 @@ void timer_work()
}
}
#endif
+
+void vlanid_ip_htable_iterate(const uchar *key, uint size, void *data, void *user)
+{
+ char info[64] = {0};
+ if (flow_type == FLOW_TYPE_REFLUX)
+ {
+ snprintf(info, 64, "hlsum#%s#%llu", key, *(UINT64 *)data);
+ }
+ else
+ {
+ snprintf(info, 64, "hzsum#%s#%llu", key, *(UINT64 *)data);
+ }
+ push_data_to_kafka(info, strlen(info), rkt_sum);
+ vlanid_ip_pkt_num += *(UINT64 *)data;
+ vlanid_ip_push_count++;
+ // *(UINT64 *)data = 0;
+}
+
+void vlanid_ip_thread_work()
+{
+ while (1)
+ {
+ sleep(1);
+ vlanid_ip_pkt_num = 0;
+ rd_kafka_poll(kafka_producer, 0);
+ if (MESA_htable_get_elem_num(vlanid_ip_htable_handle) > 0)
+ {
+ if (MESA_htable_iterate(vlanid_ip_htable_handle, vlanid_ip_htable_iterate, NULL) == -1)
+ {
+ MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "MESA_htable_iterate failed\n");
+ }
+ }
+ }
+}
+
int CHAR_INIT()
{
int demo_plugid = 51;
@@ -1366,7 +1406,9 @@ int CHAR_INIT()
runtime_log_handler = MESA_create_runtime_log_handle(tuple_log_path, log_level);
kafka_log_handler = MESA_create_runtime_log_handle(kafka_log_path, log_level);
- if (runtime_log_handler == NULL || kafka_log_handler == NULL)
+ kafka_stat_handler = MESA_create_runtime_log_handle(kafka_stat_path, log_level);
+
+ if (runtime_log_handler == NULL || kafka_log_handler == NULL || kafka_stat_handler == NULL)
{
/* code */
printf("MESA_create_runtime_log_handle failed!!!");
@@ -1396,7 +1438,10 @@ int CHAR_INIT()
/* FLOW_TYPE */
MESA_load_profile_uint_def(entrance_id_path, "SETTING", "FLOW_TYPE", &flow_type, (int)FLOW_TYPE_REFLUX);
/* Brokers */
- // MESA_load_profile_string_def(entrance_id_path, "SETTING", "BROKERS", brokers, sizeof(brokers),"#");
+ MESA_load_profile_string_def(entrance_id_path, "SETTING", "BROKERS", brokers, sizeof(brokers), "#");
+ /* TOPIC */
+ MESA_load_profile_string_def(entrance_id_path, "SETTING", "TOPIC_TRAFFIC_STATISTIC", topic_traffic_stat, sizeof(topic_traffic_stat), "#");
+ MESA_load_profile_string_def(entrance_id_path, "SETTING", "TOPIC_SUM", topic_sum, sizeof(topic_sum), "#");
/* vx_ip_header_dst_ip */
MESA_load_profile_string_def(gdev_conf_path, "Module", "sendto_gdev_ip", str_tmp, sizeof(str_tmp), "#");
if ('#' == str_tmp[0])
@@ -1431,11 +1476,38 @@ int CHAR_INIT()
inet_pton(AF_INET, str_tmp, &sapp_keepalive_reflux_ip_net);
/* kafka初始化*/
- // if (init_kafka(0, brokers, topic) != PRODUCER_INIT_SUCCESS)
- // {
- // MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"kafka init failed!!!");
- // return -1;
- // }
+ if (init_kafka(0, brokers, topic_traffic_stat, topic_sum) != PRODUCER_INIT_SUCCESS)
+ {
+ MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name, "kafka init failed!!!");
+ return -1;
+ }
+ /* vlanid_ip_htable init*/
+ vlanid_ip_htable_handle = MESA_htable_born();
+ if (vlanid_ip_htable_handle == NULL)
+ {
+ MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "MESA_htable_born failed\n");
+ return -1;
+ }
+ int thread_safe = 1;
+ MESA_htable_set_opt(vlanid_ip_htable_handle, MHO_THREAD_SAFE, &thread_safe, sizeof(int));
+ if (MESA_htable_mature(vlanid_ip_htable_handle) < 0)
+ {
+ MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "MESA_htable_mature failed\n");
+ return -1;
+ }
+ /* vlanid_ip_thread_work start */
+ int vlanid_ip_thread;
+ vlanid_ip_thread = pthread_create(&vlanid_ip_pthread_t, NULL, (void *)vlanid_ip_thread_work, NULL);
+ if (vlanid_ip_thread == 0)
+ {
+ MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "vlanid_ip thread start success\n");
+ pthread_detach(vlanid_ip_pthread_t);
+ }
+ else
+ {
+ MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "vlanid_ip thread start failed\n");
+ }
+
#if ENABLE_GSJ_THREAD
//GSJ Work thread start
int ret_thread;
@@ -1479,10 +1551,11 @@ int CHAR_INIT()
}
#endif
//发送缓冲区
- int i=0;
- for(i; i<32; i++){
- thread_push_buffer[i] = (char *)calloc(1,PUSH_BUFFER_COUNT*MAX_TRAFFIC_INFO_LEN*sizeof(char));
- }
+ // int i = 0;
+ // for (i; i < 32; i++)
+ // {
+ // thread_push_buffer[i] = (char *)calloc(1, PUSH_BUFFER_COUNT * MAX_TRAFFIC_INFO_LEN * sizeof(char));
+ // }
// 函数实现自定义 // 只要求函数返回值为插件ID //printf("INIT SUCCESS!!!\n");
return demo_plugid;
@@ -1492,7 +1565,7 @@ void LRJ_APP_DESTROY()
{
MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "TEST_APP_DESTORY in...\n");
printf("TEST_APP_DESTORY in...\n");
- // kafka_destroy();
+ kafka_destroy();
if (runtime_log_handler == NULL)
{
printf("TEST_APP_DESTORY out...\n");
@@ -1500,10 +1573,14 @@ void LRJ_APP_DESTROY()
}
MESA_destroy_runtime_log_handle(runtime_log_handler);
MESA_destroy_runtime_log_handle(kafka_log_handler);
+
+ MESA_htable_destroy(vlanid_ip_htable_handle, NULL);
+
//free thread_push_buffer
- int i=0;
- for(i; i<32; i++){
- free(thread_push_buffer[i]);
- }
+ // int i = 0;
+ // for (i; i < 32; i++)
+ // {
+ // free(thread_push_buffer[i]);
+ // }
printf("TEST_APP_DESTORY out...\n");
}