diff options
| -rw-r--r-- | lirenjie_vxlan_sapp_20200803.c | 735 |
1 files changed, 406 insertions, 329 deletions
diff --git a/lirenjie_vxlan_sapp_20200803.c b/lirenjie_vxlan_sapp_20200803.c index 82976d7..f980e2f 100644 --- a/lirenjie_vxlan_sapp_20200803.c +++ b/lirenjie_vxlan_sapp_20200803.c @@ -19,8 +19,7 @@ #include "stream.h" #include "rdkafka.h" #include "libGSJ.h" -#include "cJSON.h" -#include "MESA_list_queue.h" +#include "MESA_htable.h" int version_20190827_1605; static char DEFAULT_RETURN_VALUE = (APP_STATE_GIVEME | APP_STATE_DROPPKT); @@ -33,8 +32,8 @@ extern int platform_register_action_judge(char (*action_cb_fun)(int net_conn_mod extern int g_business_plug_type; #define ENABLE_COUNT_THREAD 1 //是否启用统计功能.added by lrj at 20200803 -#define ENABLE_GSJ_THREAD 1 //是否启用GSJ -#define ENABLE_TIMER 0 +#define ENABLE_GSJ_THREAD 0 //是否启用GSJ +#define ENABLE_TIMER 1 #define MAX_LOG_INFO_LEN 256 #define MAX_VPN_ID_NUM 512 @@ -42,23 +41,27 @@ extern int g_business_plug_type; static const char *module_name = "lirenjie_vxlan"; static const char *tuple_log_path = "./log/lirenjie_vxlan/ip_tuple.log"; static const char *kafka_log_path = "./log/lirenjie_vxlan/kafka.log"; +static const char *kafka_stat_path = "./log/lirenjie_vxlan/kafka_stat.log"; static const char *gdev_conf_path = "./conf/gdev.conf"; static const char *entrance_id_path = "./conf/lrj_vxlan_sapp.conf"; static void *runtime_log_handler; static void *kafka_log_handler; +static void *kafka_stat_handler; //char vx_ip_header_dst_ip[INET_ADDRSTRLEN]; // conf/gdev.conf sendto_gdev_ip static char sendto_gdev_ip[INET_ADDRSTRLEN]; //conf/gdev.conf sendto_gdev_ip static unsigned int sapp_keepalive_reflux_ip_net; /* 本端业务保活、回流IP */ static unsigned int vpn_id_drop[MAX_VPN_ID_NUM] = {0}; /* 0 do not drop, 1 drop */ -#define IDENTIFY_LOCAL_IP_SUBNET_MASK (0xFFFFFF00) /* 一个局点N台前端机, 用IP段识别是否前端机IP, 用于区别流量时回流还是回�? */ +#define IDENTIFY_LOCAL_IP_SUBNET_MASK (0xFFFFFF00) /* 一个局点N台前端机, 用IP段识别是否前端机IP, 用于区别流量时回流还是回注 */ -static unsigned int entrance_id; /* 局点ID 读配置文�?./conf/lrj_vxlan_sapp.conf 默认�?*/ +static unsigned int entrance_id; /* 局点ID 读配置文件./conf/lrj_vxlan_sapp.conf 默认�?*/ UINT64 all_stream_pkt_num[64] = {0}; UINT64 close_stream_pkt_num[64] = {0}; UINT64 push_count[64] = {0}; UINT64 error_count[64] = {0}; +UINT64 vlanid_ip_push_count = 0; +UINT64 vlanid_ip_pkt_num = 0; enum flow_type_t { @@ -77,14 +80,19 @@ static int partition; static rd_kafka_t *kafka_producer; static rd_kafka_conf_t *conf; // topic -static rd_kafka_topic_t *rkt; -static rd_kafka_topic_conf_t *topic_conf; +static rd_kafka_topic_t *rkt_traffic_stat; //统计数据topic +static rd_kafka_topic_t *rkt_sum; //二元组topic +static rd_kafka_topic_conf_t *topic_conf_traffic_stat; +static rd_kafka_topic_conf_t *topic_conf_sum; // char errstr[512]={0}; //static char *brokers = "10.172.208.1:9092,10.172.208.2:9092,10.172.208.2:9092,10.172.208.4:9092,10.172.208.5:9092"; //static char *brokers = "10.208.133.126:9092,10.208.133.133:9092,10.208.133.135:9092,10.208.133.141:9092"; -// char brokers[128]; - -static char *topic = "G_BACK_TRAFFIC_STATISTIC_new"; +// static char *brokers = "10.208.133.126:9092,10.208.133.128:9092,10.208.133.133:9092,10.208.133.162:9092,10.208.133.166:9092"; +char brokers[128]; +char topic_traffic_stat[64]; +char topic_sum[64]; +// static char *topic1 = "G_BACK_TRAFFIC_STATISTIC_new"; +// static char *topic2 = "Gsj_Sum"; //GSJ thread pthread_t GSJ_Work; @@ -92,11 +100,15 @@ pthread_t GSJ_Work; pthread_t count; //timer thread pthread_t timer; +//vlanid_ip thread +pthread_t vlanid_ip_pthread_t; int time_out[32] = {0}; //1:time out,push data int push_buffer_count[32] = {0}; char *thread_push_buffer[32]; #define PUSH_BUFFER_COUNT 10 +MESA_htable_handle vlanid_ip_htable_handle; + struct traffic_info { unsigned char protocol; //IPv4_TCP 1 IPv4_UDP 2 IPv6_TCP 3 IPv6_UDP 4 其他 0 @@ -143,31 +155,39 @@ static void logger(const rd_kafka_t *rk, int level, const char *fac, const char level, fac, rk ? rd_kafka_name(rk) : NULL, buf); } -static int init_kafka(int partition_, char *brokers_, char *topic_) +static int kafka_stats_cb(rd_kafka_t *rk, char *json, size_t json_len, void *opaque) +{ + MESA_handle_runtime_log(kafka_stat_handler, RLOG_LV_FATAL, module_name, "%s", json); + return 1; +} + +static int init_kafka(int partition_, char *brokers_, char *topic_traffic_stat, char *topic_sum) { char tmp[16]; char errstr[1024]; partition = partition_; /* Kafka configuration */ conf = rd_kafka_conf_new(); + if (conf == NULL) + { + //fprintf(stderr, "***** Failed to create new conf *******\n"); + MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name, "***** Failed to create new conf *******"); + return PRODUCER_INIT_FAILED; + } //set logger :register log function rd_kafka_conf_set_log_cb(conf, logger); + rd_kafka_conf_set_stats_cb(conf, kafka_stats_cb); /* Quick termination */ snprintf(tmp, sizeof(tmp), "%i", SIGIO); + rd_kafka_conf_set(conf, "statistics.interval.ms", "60", errstr, sizeof(errstr)); rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0); - //rd_kafka_conf_set(conf, "producer.type", "kafka.producer.AyncProducer", errstr, sizeof(errstr)); - rd_kafka_conf_set(conf, "queue.buffering.max.messages", "1000000", errstr, sizeof(errstr)); + rd_kafka_conf_set(conf, "queue.buffering.max.messages", "2000000", errstr, sizeof(errstr)); rd_kafka_conf_set(conf, "topic.metadata.refresh.interval.ms", "600000", errstr, sizeof(errstr)); rd_kafka_conf_set(conf, "request.required.acks", "0", errstr, sizeof(errstr)); /*topic configuration*/ - topic_conf = rd_kafka_topic_conf_new(); + topic_conf_traffic_stat = rd_kafka_topic_conf_new(); + topic_conf_sum = rd_kafka_topic_conf_new(); - if (conf == NULL) - { - //fprintf(stderr, "***** Failed to create new conf *******\n"); - MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name, "***** Failed to create new conf *******"); - return PRODUCER_INIT_FAILED; - } kafka_producer = rd_kafka_new(RD_KAFKA_PRODUCER, conf, errstr, (size_t)sizeof(errstr)); if (kafka_producer == NULL) { @@ -188,18 +208,20 @@ static int init_kafka(int partition_, char *brokers_, char *topic_) return PRODUCER_INIT_FAILED; } /* Create topic */ - rkt = rd_kafka_topic_new(kafka_producer, topic_, topic_conf); + rkt_traffic_stat = rd_kafka_topic_new(kafka_producer, topic_traffic_stat, topic_conf_traffic_stat); + rkt_sum = rd_kafka_topic_new(kafka_producer, topic_sum, topic_conf_sum); return PRODUCER_INIT_SUCCESS; } static void kafka_destroy() { - rd_kafka_topic_destroy(rkt); + rd_kafka_topic_destroy(rkt_traffic_stat); + rd_kafka_topic_destroy(rkt_sum); rd_kafka_destroy(kafka_producer); } -static int push_data_to_kafka(char *buffer, int buf_len) +static int push_data_to_kafka(char *buffer, int buf_len, rd_kafka_topic_t *topic) { int ret; if (buffer == NULL) @@ -207,7 +229,7 @@ static int push_data_to_kafka(char *buffer, int buf_len) return 0; } //ret = rd_kafka_produce(rkt, partition, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL); - ret = rd_kafka_produce(rkt, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL); + ret = rd_kafka_produce(topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, buffer, (size_t)buf_len, NULL, 0, NULL); if (ret == -1) { /*fprintf(stderr, @@ -216,27 +238,29 @@ static int push_data_to_kafka(char *buffer, int buf_len) rd_kafka_topic_name(rkt), partition, rd_kafka_err2str(rd_kafka_last_error()));*/ MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_FATAL, module_name, "%% Failed to produce to topic %s partition %i: %s", - rd_kafka_topic_name(rkt), partition, + rd_kafka_topic_name(topic), partition, rd_kafka_err2str(rd_kafka_last_error())); /* Poll to handle delivery reports */ - //rd_kafka_poll(kafka_producer, 0); + rd_kafka_poll(kafka_producer, 0); return PUSH_DATA_FAILED; } /*fprintf(stderr, "%% Sent %zd bytes to topic " "%s partition %i\n", buf_len, rd_kafka_topic_name(rkt), partition);*/ MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name, "%% Sent %zd bytes to topic %s partition %i", - buf_len, rd_kafka_topic_name(rkt), partition); + buf_len, rd_kafka_topic_name(topic), partition); // rd_kafka_poll(kafka_producer, 0); return PUSH_DATA_SUCCESS; } +#if ENABLE_GSJ_THREAD static void push_data_to_GSJ(char *buffer, int buf_len) { GoString value = {(const char *)buffer, buf_len}; GetInfo(value); // Work(); } +#endif #if 0 unsigned char get_service_id(struct streaminfo *pstream) @@ -610,18 +634,6 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps sprintf(protocol, "%s", "others"); break; } - /* - printf("\"proto_type\":%d,protocol:%s, entrance_id:%d, c2s_pkt_num:%d, s2c_pkt_num:%d, c2s_byte_len:%d, s2c_byte_len:%d, ", - tinfo->PROTO_TYPE,protocol, entrance_id, tinfo->C2S_pkt_num,tinfo->S2C_pkt_num,tinfo->C2S_bytes,tinfo->S2C_bytes); - printf("stat_time:%ld, vx_type:0x%04X, vx_ip_header_src_ip:%s, vx_ip_header_dst_ip:%s, ", - tinfo->stat_time, tinfo->vx_type, tinfo->vx_ip_header_src_ip,vx_ip_header_dst_ip); - printf("vx_UDP_header_src_port:%d, vx_UDP_header_dst_port=%d, vx_vlan_id:%d, ", - tinfo->vx_UDP_header_src_port,tinfo->vx_UDP_header_dst_port,tinfo->service_id); - printf("ipv4_src_ip:%s, ipv4_dst_ip:%s, ipv4_identification:%d, ipv4_fragment_offset:%d, ", - tinfo->ipv4_sip,tinfo->ipv4_dip,tinfo->ipv4_id,tinfo->ipv4_off); - printf("ipv6_src_ip:%s, ipv6_dst_ip:%s, ipv6_bus_type:%s, ipv6_flow_flag:%d, ipv6_load_length:%d, ipv6_next_msg_head:%d, ipv6_limit:%d ", - tinfo->ipv6_sip,tinfo->ipv6_dip,tinfo->ipv6_bus_type,tinfo->ipv6_flow_flag,tinfo->ipv6_load_length,tinfo->ipv6_next_msg_head,tinfo->ipv6_limit); - */ char info[MAX_TRAFFIC_INFO_LEN] = {0}; inet_ntop(AF_INET, &tinfo->vx_ip_header_src_ip_net, vxlan_sip_ip_str, INET_ADDRSTRLEN); @@ -645,9 +657,6 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps flow_type, sendto_gdev_ip); // MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); // push_data_to_kafka(info,strlen(info)); -#if ENABLE_GSJ_THREAD - // push_data_to_GSJ(info, strlen(info)); -#endif break; case ADDR_TYPE_IPV6: snprintf(info, MAX_TRAFFIC_INFO_LEN, @@ -664,9 +673,6 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps tinfo->ipv6_next_msg_head, tinfo->ipv6_limit, flow_type, sendto_gdev_ip); // MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); // push_data_to_kafka(info,strlen(info)); -#if ENABLE_GSJ_THREAD - // push_data_to_GSJ(info, strlen(info)); -#endif break; case ADDR_TYPE_ARP: snprintf(info, MAX_TRAFFIC_INFO_LEN, @@ -682,29 +688,229 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps flow_type, sendto_gdev_ip); // MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_INFO, module_name, info); // push_data_to_kafka(info,strlen(info)); -#if ENABLE_GSJ_THREAD - // push_data_to_GSJ(info, strlen(info)); -#endif break; default: break; } + push_data_to_kafka(info, strlen(info), rkt_traffic_stat); #if ENABLE_GSJ_THREAD - if(push_buffer_count[thread_seq] == PUSH_BUFFER_COUNT){ - memset(thread_push_buffer[thread_seq]+strlen(thread_push_buffer[thread_seq])-1, 0, 1); - push_data_to_GSJ(thread_push_buffer[thread_seq], strlen(thread_push_buffer[thread_seq])); - - push_buffer_count[thread_seq] = 0; - memset(thread_push_buffer[thread_seq], 0, PUSH_BUFFER_COUNT*MAX_TRAFFIC_INFO_LEN); - } - else{ + if (push_buffer_count[thread_seq] == PUSH_BUFFER_COUNT) + { + memset(thread_push_buffer[thread_seq] + strlen(thread_push_buffer[thread_seq]) - 1, 0, 1); + push_data_to_GSJ(thread_push_buffer[thread_seq], strlen(thread_push_buffer[thread_seq])); + + push_buffer_count[thread_seq] = 0; + memset(thread_push_buffer[thread_seq], 0, PUSH_BUFFER_COUNT * MAX_TRAFFIC_INFO_LEN); + } + else + { push_buffer_count[thread_seq]++; - memcpy(thread_push_buffer[thread_seq]+strlen(thread_push_buffer[thread_seq]), info, strlen(info)); + memcpy(thread_push_buffer[thread_seq] + strlen(thread_push_buffer[thread_seq]), info, strlen(info)); } #endif } + +int ip_mod(struct traffic_info *tinfo) +{ + char vxlan_sip_ip_str[INET_ADDRSTRLEN]; + char vxlan_dip_ip_str[INET_ADDRSTRLEN]; + inet_ntop(AF_INET, &tinfo->vx_ip_header_src_ip_net, vxlan_sip_ip_str, INET_ADDRSTRLEN); + inet_ntop(AF_INET, &tinfo->vx_ip_header_dst_ip_net, vxlan_dip_ip_str, INET_ADDRSTRLEN); + + int a, b, c, d; + sscanf(vxlan_sip_ip_str, "%d.%d.%d.%d", &a, &b, &c, &d); + if (a + b + c + d == 0) + { + return 1; + } + if (d % 16 == 1) + { + return 1; + } + sscanf(vxlan_dip_ip_str, "%d.%d.%d.%d", &a, &b, &c, &d); + if (a + b + c + d == 0) + { + return 1; + } + if (d % 16 == 1) + { + return 1; + } + if (tinfo->vxlan_vpn_id == 3784) + { + return 1; + } + return 0; +} + +long vlanid_ip_htable_search_cb(void *data, const uchar *key, uint size, void *user_arg) +{ + struct traffic_info *tinfo = (struct traffic_info *)user_arg; + UINT64 *cnt = (UINT64 *)data; + if (cnt == NULL) + { + cnt = (UINT64 *)calloc(1, sizeof(UINT64)); + // *cnt = 1; + *cnt = tinfo->C2S_pkt_num + tinfo->S2C_pkt_num; + if (MESA_htable_add(vlanid_ip_htable_handle, key, size, cnt) >= 0) + { + return 1; + } + else + { + return 0; + } + } + else + { + // *cnt = *cnt + 1; + *cnt = *cnt + tinfo->C2S_pkt_num + tinfo->S2C_pkt_num; + } + return 1; +} + +void vlanid_ip_stat(struct traffic_info *tinfo) +{ + uchar vlanid_ip_key[21]; + /* FLOW_TYPE_REFLUX: vx_vlan_id/vx_ip_header_src_ip is key + FLOW_TYPE_INJECT: vx_vlan_id/vx_ip_header_dst_ip is key + */ + memset(vlanid_ip_key, 0, sizeof(char) * 21); + char vxlan_sip_ip_str[INET_ADDRSTRLEN]; + char vxlan_dip_ip_str[INET_ADDRSTRLEN]; + inet_ntop(AF_INET, &tinfo->vx_ip_header_src_ip_net, vxlan_sip_ip_str, INET_ADDRSTRLEN); + inet_ntop(AF_INET, &tinfo->vx_ip_header_dst_ip_net, vxlan_dip_ip_str, INET_ADDRSTRLEN); + + if (flow_type == (unsigned int)FLOW_TYPE_REFLUX) + { + snprintf(vlanid_ip_key, 21, "%d@%s", tinfo->vxlan_vpn_id, vxlan_sip_ip_str); + } + else if (flow_type == (unsigned int)FLOW_TYPE_INJECT) + { + snprintf(vlanid_ip_key, 21, "%d@%s", tinfo->vxlan_vpn_id, vxlan_dip_ip_str); + } + else + { + MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "flow_type error: neither 0 nor 1\n"); + return; + } + hash_cb_fun_t *search_cb = vlanid_ip_htable_search_cb; + long search_cb_ret = 0; + MESA_htable_search_cb(vlanid_ip_htable_handle, vlanid_ip_key, strlen(vlanid_ip_key), search_cb, tinfo, &search_cb_ret); + if (search_cb_ret == 0) + { + MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "MESA_htable_search_cb failed\n"); + } +} + static int tcp_flow_id = -1; +char process_tcp_close(struct streaminfo *pstream, struct traffic_info *tinfo, int thread_seq) +{ + // if (!ip_mod(tinfo)) + // { + // free(tinfo); + // return DEFAULT_RETURN_VALUE; + // } + if (tcp_flow_id != -1) + { + struct tcp_flow_stat *tflow = (struct tcp_flow_stat *)project_req_get_struct(pstream, tcp_flow_id); + tinfo->C2S_pkt_num = tflow->C2S_all_pkt - tinfo->C2S_pkt_num; + tinfo->S2C_pkt_num = tflow->S2C_all_pkt - tinfo->S2C_pkt_num; + tinfo->C2S_bytes = tflow->C2S_all_byte - tinfo->C2S_bytes; + tinfo->S2C_bytes = tflow->S2C_all_byte - tinfo->S2C_bytes; + } + else + { + tinfo->C2S_pkt_num = 0; + tinfo->S2C_pkt_num = 0; + tinfo->C2S_bytes = 0; + tinfo->S2C_bytes = 0; + } +#if ENABLE_COUNT_THREAD + close_stream_pkt_num[thread_seq] += tinfo->C2S_pkt_num; + close_stream_pkt_num[thread_seq] += tinfo->S2C_pkt_num; +#endif + vlanid_ip_stat(tinfo); + if (!ip_mod(tinfo)) + { + free(tinfo); + return DEFAULT_RETURN_VALUE; + } + +#if ENABLE_COUNT_THREAD + push_count[thread_seq]++; +#endif + /* layer_addr */ + tinfo->addr = pstream->addr; + /* STAT_TIME */ + // tinfo->stat_time = time(0); + gettimeofday(&tinfo->stat_time, NULL); + + print_traffic_info(tinfo, pstream, thread_seq); + free(tinfo); + return DEFAULT_RETURN_VALUE; +} + +char process_tcp_data(struct streaminfo *pstream, struct traffic_info *tinfo, int thread_seq) +{ + // if (!ip_mod(tinfo)) + // { + // return DEFAULT_RETURN_VALUE; + // } + if (tcp_flow_id != -1) + { + struct tcp_flow_stat *tflow = (struct tcp_flow_stat *)project_req_get_struct(pstream, tcp_flow_id); + tinfo->C2S_pkt_num = tflow->C2S_all_pkt - tinfo->C2S_pkt_num; + tinfo->S2C_pkt_num = tflow->S2C_all_pkt - tinfo->S2C_pkt_num; + tinfo->C2S_bytes = tflow->C2S_all_byte - tinfo->C2S_bytes; + tinfo->S2C_bytes = tflow->S2C_all_byte - tinfo->S2C_bytes; + } + else + { + tinfo->C2S_pkt_num = 0; + tinfo->S2C_pkt_num = 0; + tinfo->C2S_bytes = 0; + tinfo->S2C_bytes = 0; + } +#if ENABLE_COUNT_THREAD + close_stream_pkt_num[thread_seq] += tinfo->C2S_pkt_num; + close_stream_pkt_num[thread_seq] += tinfo->S2C_pkt_num; +#endif + vlanid_ip_stat(tinfo); + if (!ip_mod(tinfo)) + { + if (tcp_flow_id != -1) + { + struct tcp_flow_stat *tflow = (struct tcp_flow_stat *)project_req_get_struct(pstream, tcp_flow_id); + tinfo->C2S_pkt_num = tflow->C2S_all_pkt; + tinfo->S2C_pkt_num = tflow->S2C_all_pkt; + tinfo->C2S_bytes = tflow->C2S_all_byte; + tinfo->S2C_bytes = tflow->S2C_all_byte; + } + return DEFAULT_RETURN_VALUE; + } + +#if ENABLE_COUNT_THREAD + push_count[thread_seq]++; +#endif + /* layer_addr */ + tinfo->addr = pstream->addr; + /* STAT_TIME */ + gettimeofday(&tinfo->stat_time, NULL); + print_traffic_info(tinfo, pstream, thread_seq); + + if (tcp_flow_id != -1) + { + struct tcp_flow_stat *tflow = (struct tcp_flow_stat *)project_req_get_struct(pstream, tcp_flow_id); + tinfo->C2S_pkt_num = tflow->C2S_all_pkt; + tinfo->S2C_pkt_num = tflow->S2C_all_pkt; + tinfo->C2S_bytes = tflow->C2S_all_byte; + tinfo->S2C_bytes = tflow->S2C_all_byte; + } + return DEFAULT_RETURN_VALUE; +} + char TCP_ENTRY_ALL(struct streaminfo *pstream, void **pme, int thread_seq, const void *raw_pkt) { //printf("TCP_ENTRY_ALL SUCCESS!!!\n"); @@ -720,7 +926,6 @@ char TCP_ENTRY_ALL(struct streaminfo *pstream, void **pme, int thread_seq, const tcp_flow_id = project_customer_register("tcp_flow_stat", "struct"); if (-1 == tcp_flow_id) { - // printf("'tcp_flow_stat' is disable, no statistics\n"); MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "'tcp_flow_stat' is disable, no statistics\n"); } } @@ -767,38 +972,11 @@ char TCP_ENTRY_ALL(struct streaminfo *pstream, void **pme, int thread_seq, const } tinfo = (struct traffic_info *)(*pme); + // vlanid_ip_stat(tinfo); + if (pstream->pktstate == OP_STATE_CLOSE) { - if (tcp_flow_id != -1) - { - struct tcp_flow_stat *tflow = (struct tcp_flow_stat *)project_req_get_struct(pstream, tcp_flow_id); - tinfo->C2S_pkt_num = tflow->C2S_all_pkt - tinfo->C2S_pkt_num; - tinfo->S2C_pkt_num = tflow->S2C_all_pkt - tinfo->S2C_pkt_num; - tinfo->C2S_bytes = tflow->C2S_all_byte - tinfo->C2S_bytes; - tinfo->S2C_bytes = tflow->S2C_all_byte - tinfo->S2C_bytes; - } - else - { - tinfo->C2S_pkt_num = 0; - tinfo->S2C_pkt_num = 0; - tinfo->C2S_bytes = 0; - tinfo->S2C_bytes = 0; - } -#if ENABLE_COUNT_THREAD - close_stream_pkt_num[thread_seq] += tinfo->C2S_pkt_num; - close_stream_pkt_num[thread_seq] += tinfo->S2C_pkt_num; - push_count[thread_seq]++; -#endif - /* layer_addr */ - tinfo->addr = pstream->addr; - /* STAT_TIME */ - // tinfo->stat_time = time(0); - gettimeofday(&tinfo->stat_time, NULL); - - print_traffic_info(tinfo, pstream, thread_seq); - free(tinfo); - - return DEFAULT_RETURN_VALUE; + return process_tcp_close(pstream, tinfo, thread_seq); } #if ENABLE_TIMER /*每隔N秒发一次 */ @@ -806,42 +984,8 @@ char TCP_ENTRY_ALL(struct streaminfo *pstream, void **pme, int thread_seq, const { if (time_out[thread_seq]) { - if (tcp_flow_id != -1) - { - struct tcp_flow_stat *tflow = (struct tcp_flow_stat *)project_req_get_struct(pstream, tcp_flow_id); - tinfo->C2S_pkt_num = tflow->C2S_all_pkt - tinfo->C2S_pkt_num; - tinfo->S2C_pkt_num = tflow->S2C_all_pkt - tinfo->S2C_pkt_num; - tinfo->C2S_bytes = tflow->C2S_all_byte - tinfo->C2S_bytes; - tinfo->S2C_bytes = tflow->S2C_all_byte - tinfo->S2C_bytes; - } - else - { - tinfo->C2S_pkt_num = 0; - tinfo->S2C_pkt_num = 0; - tinfo->C2S_bytes = 0; - tinfo->S2C_bytes = 0; - } - /* layer_addr */ - tinfo->addr = pstream->addr; - - gettimeofday(&tinfo->stat_time, NULL); - -#if ENABLE_COUNT_THREAD - close_stream_pkt_num[thread_seq] += tinfo->C2S_pkt_num; - close_stream_pkt_num[thread_seq] += tinfo->S2C_pkt_num; - push_count[thread_seq]++; -#endif - print_traffic_info(tinfo, pstream, thread_seq); - - if (tcp_flow_id != -1) - { - struct tcp_flow_stat *tflow = (struct tcp_flow_stat *)project_req_get_struct(pstream, tcp_flow_id); - tinfo->C2S_pkt_num = tflow->C2S_all_pkt; - tinfo->S2C_pkt_num = tflow->S2C_all_pkt; - tinfo->C2S_bytes = tflow->C2S_all_byte; - tinfo->S2C_bytes = tflow->S2C_all_byte; - } time_out[thread_seq] = 0; + return process_tcp_data(pstream, tinfo, thread_seq); } } #endif @@ -859,8 +1003,6 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi { struct tcpdetail *raw_pdetail = (struct tcpdetail *)pstream->pdetail; struct traffic_info *tinfo; - // time_t *time_old; - // struct traffic_info *tinfo_new; #if ENABLE_COUNT_THREAD all_stream_pkt_num[thread_seq]++; @@ -871,17 +1013,12 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi tcp_flow_id = project_customer_register("tcp_flow_stat", "struct"); if (-1 == tcp_flow_id) { - // printf("'tcp_flow_stat' is disable, no statistics\n"); MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "'tcp_flow_stat' is disable, no statistics\n"); } } if (pstream->opstate == OP_STATE_PENDING) { - // *pme = (void *)calloc(2, sizeof(void *)); - // time_old = (time_t *)calloc(1, sizeof(time_t)); - // tinfo_new = (struct traffic_info *)calloc(1, sizeof(struct traffic_info)); - tinfo = (struct traffic_info *)calloc(1, sizeof(struct traffic_info)); //tinfo->service_id = get_service_id_from_sport(pstream); //获取vx_lan_id字段,具体方法待定 //tinfo->service_id = get_service_id_from_vxlanid(pstream); tinfo->vxlan_vpn_id = get_vpnid_from_stream(pstream); @@ -918,140 +1055,25 @@ char TCP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi /* 应用层协议类型 用目的端口表示*/ tinfo->PROTO_TYPE = get_proto_type(pstream); - // *tinfo_new = *tinfo; *pme = tinfo; - // ((void **)*pme)[0] = tinfo; - // ((void **)*pme)[1] = tinfo_new; - // time(time_old); - // ((void **)*pme)[2] = time_old; } tinfo = (struct traffic_info *)(*pme); - // tinfo = (struct traffic_info *)(((void **)*pme)[0]); - // tinfo_new = (struct traffic_info *)(((void **)*pme)[1]); - // time_old = (time_t *)(((void **)*pme)[2]); - /* 自己统计包数字节数*/ /* - if(raw_pdetail->datalen > 0) - { - if(DIR_C2S == pstream->curdir) - { - tinfo->C2S_pkt_num++; - tinfo->C2S_bytes += raw_pdetail->datalen; - } - else - { - tinfo->S2C_pkt_num++; - tinfo->S2C_bytes += raw_pdetail->datalen; - } - } - */ - /* if (pstream->opstate == OP_STATE_CLOSE && tinfo->vxlan_vpn_id > 0) */ - // if (pstream->opstate == OP_STATE_CLOSE && !vpn_id_drop[tinfo->vxlan_vpn_id]) + // vlanid_ip_stat(tinfo); + if (pstream->opstate == OP_STATE_CLOSE) { - //printf("TCP_ENTRY SUCCESS!!!\n"); - /* 获取包数字节数*/ - // tinfo->C2S_pkt_num = raw_pdetail->serverpktnum; - // tinfo->S2C_pkt_num = raw_pdetail->clientpktnum; - // tinfo->C2S_bytes = raw_pdetail->serverbytes; - // tinfo->S2C_bytes = raw_pdetail->clientbytes; - // if(tinfo->C2S_bytes < 0) - // { - // tinfo->C2S_bytes = 0; - // } - // if(tinfo->S2C_bytes < 0) - // { - // tinfo->S2C_bytes = 0; - // } - /* 另一种获取包数字节数的方式*/ - - if (tcp_flow_id != -1) - { - struct tcp_flow_stat *tflow = (struct tcp_flow_stat *)project_req_get_struct(pstream, tcp_flow_id); - tinfo->C2S_pkt_num = tflow->C2S_all_pkt - tinfo->C2S_pkt_num; - tinfo->S2C_pkt_num = tflow->S2C_all_pkt - tinfo->S2C_pkt_num; - tinfo->C2S_bytes = tflow->C2S_all_byte - tinfo->C2S_bytes; - tinfo->S2C_bytes = tflow->S2C_all_byte - tinfo->S2C_bytes; - } - else - { - tinfo->C2S_pkt_num = 0; - tinfo->S2C_pkt_num = 0; - tinfo->C2S_bytes = 0; - tinfo->S2C_bytes = 0; - } -#if ENABLE_COUNT_THREAD - close_stream_pkt_num[thread_seq] += tinfo->C2S_pkt_num; - close_stream_pkt_num[thread_seq] += tinfo->S2C_pkt_num; - push_count[thread_seq]++; -#endif - /* layer_addr */ - tinfo->addr = pstream->addr; - /* STAT_TIME */ - // tinfo->stat_time = time(0); - gettimeofday(&tinfo->stat_time, NULL); - - print_traffic_info(tinfo, pstream, thread_seq); - free(tinfo); - // free(tinfo_new); - // free(time_old); - // free(*pme); - // printf("\n"); - return DEFAULT_RETURN_VALUE; + return process_tcp_close(pstream, tinfo, thread_seq); } #if ENABLE_TIMER /*每隔N秒发一次 */ if (pstream->opstate == OP_STATE_DATA) { - // time_t *time_new; - // time_new = (time_t *)calloc(1, sizeof(time_t)); - // time(time_new); - // if (*time_new - *time_old >= 120) if (time_out[thread_seq]) { - if (tcp_flow_id != -1) - { - struct tcp_flow_stat *tflow = (struct tcp_flow_stat *)project_req_get_struct(pstream, tcp_flow_id); - tinfo->C2S_pkt_num = tflow->C2S_all_pkt - tinfo->C2S_pkt_num; - tinfo->S2C_pkt_num = tflow->S2C_all_pkt - tinfo->S2C_pkt_num; - tinfo->C2S_bytes = tflow->C2S_all_byte - tinfo->C2S_bytes; - tinfo->S2C_bytes = tflow->S2C_all_byte - tinfo->S2C_bytes; - } - else - { - tinfo->C2S_pkt_num = 0; - tinfo->S2C_pkt_num = 0; - tinfo->C2S_bytes = 0; - tinfo->S2C_bytes = 0; - } - /* layer_addr */ - tinfo->addr = pstream->addr; - // tinfo_new->addr = pstream->addr; - /* STAT_TIME */ - // tinfo->stat_time = time(0); - // gettimeofday(&tinfo_new->stat_time, NULL); - gettimeofday(&tinfo->stat_time, NULL); - -#if ENABLE_COUNT_THREAD - close_stream_pkt_num[thread_seq] += tinfo->C2S_pkt_num; - close_stream_pkt_num[thread_seq] += tinfo->S2C_pkt_num; - push_count[thread_seq]++; -#endif - print_traffic_info(tinfo, pstream, thread_seq); - - if (tcp_flow_id != -1) - { - struct tcp_flow_stat *tflow = (struct tcp_flow_stat *)project_req_get_struct(pstream, tcp_flow_id); - tinfo->C2S_pkt_num = tflow->C2S_all_pkt; - tinfo->S2C_pkt_num = tflow->S2C_all_pkt; - tinfo->C2S_bytes = tflow->C2S_all_byte; - tinfo->S2C_bytes = tflow->S2C_all_byte; - } - // free(time_new); - // time(time_old); time_out[thread_seq] = 0; + return process_tcp_data(pstream, tinfo, thread_seq); } - // free(time_new); } #endif return DEFAULT_RETURN_VALUE; @@ -1061,9 +1083,6 @@ error_drop: error_count[thread_seq]++; #endif free(tinfo); - // free(tinfo_new); - // free(time_old); - // free(*pme); return APP_STATE_DROPME | APP_STATE_DROPPKT; } @@ -1099,8 +1118,6 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi { struct udpdetail *pdetail = (struct udpdetail *)pstream->pdetail; struct traffic_info *tinfo; - // time_t *time_old; - // struct traffic_info *tinfo_new; #if ENABLE_COUNT_THREAD all_stream_pkt_num[thread_seq]++; @@ -1116,17 +1133,12 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi udp_flow_id = project_customer_register(PROJECT_REQ_UDP_FLOW, "struct"); if (-1 == udp_flow_id) { - // printf("'udp_flow_stat' is disable, no statistics\n"); MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "'tcp_flow_stat' is disable, no statistics\n"); } } if (pstream->opstate == OP_STATE_PENDING) { - // *pme = (void *)calloc(2, sizeof(void *)); - // time_old = (time_t *)calloc(1, sizeof(time_t)); - // tinfo_new = (struct traffic_info *)calloc(1, sizeof(struct traffic_info)); - tinfo = (struct traffic_info *)calloc(1, sizeof(struct traffic_info)); //tinfo->service_id = get_service_id_from_sport(pstream); //获取vx_lan_id字段,具体方法待定 //tinfo->service_id = get_service_id_from_vxlanid(pstream); tinfo->vxlan_vpn_id = get_vpnid_from_stream(pstream); @@ -1164,40 +1176,21 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi /* 应用层协议类型 用目的端口表示*/ tinfo->PROTO_TYPE = get_proto_type(pstream); - // *tinfo_new = *tinfo; *pme = tinfo; - // ((void **)*pme)[0] = tinfo; - // ((void **)*pme)[1] = tinfo_new; - // time(time_old); - // ((void **)*pme)[2] = time_old; } tinfo = (struct traffic_info *)(*pme); - // tinfo = (struct traffic_info *)(((void **)*pme)[0]); - // tinfo_new = (struct traffic_info *)(((void **)*pme)[1]); - // time_old = (time_t *)(((void **)*pme)[2]); - /* if (pstream->opstate == OP_STATE_CLOSE && tinfo->vxlan_vpn_id > 0) */ - // if (pstream->opstate == OP_STATE_CLOSE && !vpn_id_drop[tinfo->vxlan_vpn_id]) + // vlanid_ip_stat(tinfo); + if (pstream->opstate == OP_STATE_CLOSE) { - //printf("UDP_ENTRY SUCCESS!!!\n"); - + // if (!ip_mod(tinfo)) + // { + // free(tinfo); + // return DEFAULT_RETURN_VALUE; + // } if (pdetail != NULL) { - /* 获取包数字节数 */ - // tinfo->C2S_pkt_num = pdetail->serverpktnum; - // tinfo->S2C_pkt_num = pdetail->clientpktnum; - // tinfo->C2S_bytes = pdetail->serverbytes; - // tinfo->S2C_bytes = pdetail->clientbytes; - // if (tinfo->C2S_bytes < 0) - // { - // tinfo->C2S_bytes = 0; - // } - // if (tinfo->S2C_bytes < 0) - // { - // tinfo->S2C_bytes = 0; - // } - if (udp_flow_id != -1) { struct udp_flow_stat *tflow = (struct udp_flow_stat *)project_req_get_struct(pstream, udp_flow_id); @@ -1224,6 +1217,15 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi #if ENABLE_COUNT_THREAD close_stream_pkt_num[thread_seq] += tinfo->C2S_pkt_num; close_stream_pkt_num[thread_seq] += tinfo->S2C_pkt_num; +#endif + vlanid_ip_stat(tinfo); + if (!ip_mod(tinfo)) + { + free(tinfo); + return DEFAULT_RETURN_VALUE; + } + +#if ENABLE_COUNT_THREAD push_count[thread_seq]++; #endif /* layer_addr */ @@ -1234,22 +1236,19 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi print_traffic_info(tinfo, pstream, thread_seq); free(tinfo); - // free(tinfo_new); - // free(time_old); - // free(*pme); - // printf("\n"); + return DEFAULT_RETURN_VALUE; } #if ENABLE_TIMER /*每隔N秒发一次 */ if (pstream->opstate == OP_STATE_DATA) { - // time_t *time_new; - // time_new = (time_t *)calloc(1, sizeof(time_t)); - // time(time_new); - // if (*time_new - *time_old >= 120) if (time_out[thread_seq]) { + // if (!ip_mod(tinfo)) + // { + // return DEFAULT_RETURN_VALUE; + // } if (udp_flow_id != -1) { struct udp_flow_stat *tflow = (struct udp_flow_stat *)project_req_get_struct(pstream, udp_flow_id); @@ -1265,21 +1264,34 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi tinfo->C2S_bytes = 0; tinfo->S2C_bytes = 0; } - /* layer_addr */ - tinfo->addr = pstream->addr; - // tinfo_new->addr = pstream->addr; - /* STAT_TIME */ - // tinfo->stat_time = time(0); - // gettimeofday(&tinfo_new->stat_time, NULL); - gettimeofday(&tinfo->stat_time, NULL); - #if ENABLE_COUNT_THREAD close_stream_pkt_num[thread_seq] += tinfo->C2S_pkt_num; close_stream_pkt_num[thread_seq] += tinfo->S2C_pkt_num; - push_count[thread_seq]++; #endif + vlanid_ip_stat(tinfo); + if (!ip_mod(tinfo)) + { + if (tcp_flow_id != -1) + { + struct udp_flow_stat *tflow = (struct udp_flow_stat *)project_req_get_struct(pstream, udp_flow_id); + tinfo->C2S_pkt_num = tflow->C2S_pkt; + tinfo->S2C_pkt_num = tflow->S2C_pkt; + tinfo->C2S_bytes = tflow->C2S_byte; + tinfo->S2C_bytes = tflow->S2C_byte; + } + time_out[thread_seq] = 0; + return DEFAULT_RETURN_VALUE; + } +#if ENABLE_COUNT_THREAD + push_count[thread_seq]++; +#endif + /* layer_addr */ + tinfo->addr = pstream->addr; + /* STAT_TIME */ + gettimeofday(&tinfo->stat_time, NULL); print_traffic_info(tinfo, pstream, thread_seq); + if (udp_flow_id != -1) { struct udp_flow_stat *tflow = (struct udp_flow_stat *)project_req_get_struct(pstream, udp_flow_id); @@ -1288,12 +1300,8 @@ char UDP_ENTRY(struct streaminfo *pstream, void **pme, int thread_seq, const voi tinfo->C2S_bytes = tflow->C2S_byte; tinfo->S2C_bytes = tflow->S2C_byte; } - // free(time_new); - // time(time_old); time_out[thread_seq] = 0; } - - // free(time_new); } #endif return DEFAULT_RETURN_VALUE; @@ -1303,9 +1311,6 @@ error_drop: error_count[thread_seq]++; #endif free(tinfo); - // free(tinfo_new); - // free(time_old); - // free(*pme); return APP_STATE_DROPME | APP_STATE_DROPPKT; } @@ -1332,8 +1337,8 @@ void count_work() all_error_count += error_count[i]; } MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, - module_name, "all_stream_pkt_num is %llu,close_stream_pkt_num is %llu, push_count is %llu, error_count is %llu. \n", - all_pkt, close_pkt, all_push_count, all_error_count); + module_name, "all_stream_pkt_num is %llu,close_stream_pkt_num is %llu, push_count is %llu, error_count is %llu, vlanid_ip_push_count is %llu, vlanid_ip_pkt_num is %llu.\n", + all_pkt, close_pkt, all_push_count, all_error_count, vlanid_ip_push_count, vlanid_ip_pkt_num); } } #if ENABLE_TIMER @@ -1341,7 +1346,7 @@ void timer_work() { while (1) { - sleep(120); + sleep(60); int i = 0; for (i; i < 32; i++) { @@ -1353,6 +1358,41 @@ void timer_work() } } #endif + +void vlanid_ip_htable_iterate(const uchar *key, uint size, void *data, void *user) +{ + char info[64] = {0}; + if (flow_type == FLOW_TYPE_REFLUX) + { + snprintf(info, 64, "hlsum#%s#%llu", key, *(UINT64 *)data); + } + else + { + snprintf(info, 64, "hzsum#%s#%llu", key, *(UINT64 *)data); + } + push_data_to_kafka(info, strlen(info), rkt_sum); + vlanid_ip_pkt_num += *(UINT64 *)data; + vlanid_ip_push_count++; + // *(UINT64 *)data = 0; +} + +void vlanid_ip_thread_work() +{ + while (1) + { + sleep(1); + vlanid_ip_pkt_num = 0; + rd_kafka_poll(kafka_producer, 0); + if (MESA_htable_get_elem_num(vlanid_ip_htable_handle) > 0) + { + if (MESA_htable_iterate(vlanid_ip_htable_handle, vlanid_ip_htable_iterate, NULL) == -1) + { + MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "MESA_htable_iterate failed\n"); + } + } + } +} + int CHAR_INIT() { int demo_plugid = 51; @@ -1366,7 +1406,9 @@ int CHAR_INIT() runtime_log_handler = MESA_create_runtime_log_handle(tuple_log_path, log_level); kafka_log_handler = MESA_create_runtime_log_handle(kafka_log_path, log_level); - if (runtime_log_handler == NULL || kafka_log_handler == NULL) + kafka_stat_handler = MESA_create_runtime_log_handle(kafka_stat_path, log_level); + + if (runtime_log_handler == NULL || kafka_log_handler == NULL || kafka_stat_handler == NULL) { /* code */ printf("MESA_create_runtime_log_handle failed!!!"); @@ -1396,7 +1438,10 @@ int CHAR_INIT() /* FLOW_TYPE */ MESA_load_profile_uint_def(entrance_id_path, "SETTING", "FLOW_TYPE", &flow_type, (int)FLOW_TYPE_REFLUX); /* Brokers */ - // MESA_load_profile_string_def(entrance_id_path, "SETTING", "BROKERS", brokers, sizeof(brokers),"#"); + MESA_load_profile_string_def(entrance_id_path, "SETTING", "BROKERS", brokers, sizeof(brokers), "#"); + /* TOPIC */ + MESA_load_profile_string_def(entrance_id_path, "SETTING", "TOPIC_TRAFFIC_STATISTIC", topic_traffic_stat, sizeof(topic_traffic_stat), "#"); + MESA_load_profile_string_def(entrance_id_path, "SETTING", "TOPIC_SUM", topic_sum, sizeof(topic_sum), "#"); /* vx_ip_header_dst_ip */ MESA_load_profile_string_def(gdev_conf_path, "Module", "sendto_gdev_ip", str_tmp, sizeof(str_tmp), "#"); if ('#' == str_tmp[0]) @@ -1431,11 +1476,38 @@ int CHAR_INIT() inet_pton(AF_INET, str_tmp, &sapp_keepalive_reflux_ip_net); /* kafka初始化*/ - // if (init_kafka(0, brokers, topic) != PRODUCER_INIT_SUCCESS) - // { - // MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name,"kafka init failed!!!"); - // return -1; - // } + if (init_kafka(0, brokers, topic_traffic_stat, topic_sum) != PRODUCER_INIT_SUCCESS) + { + MESA_handle_runtime_log(kafka_log_handler, RLOG_LV_INFO, module_name, "kafka init failed!!!"); + return -1; + } + /* vlanid_ip_htable init*/ + vlanid_ip_htable_handle = MESA_htable_born(); + if (vlanid_ip_htable_handle == NULL) + { + MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "MESA_htable_born failed\n"); + return -1; + } + int thread_safe = 1; + MESA_htable_set_opt(vlanid_ip_htable_handle, MHO_THREAD_SAFE, &thread_safe, sizeof(int)); + if (MESA_htable_mature(vlanid_ip_htable_handle) < 0) + { + MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "MESA_htable_mature failed\n"); + return -1; + } + /* vlanid_ip_thread_work start */ + int vlanid_ip_thread; + vlanid_ip_thread = pthread_create(&vlanid_ip_pthread_t, NULL, (void *)vlanid_ip_thread_work, NULL); + if (vlanid_ip_thread == 0) + { + MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "vlanid_ip thread start success\n"); + pthread_detach(vlanid_ip_pthread_t); + } + else + { + MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "vlanid_ip thread start failed\n"); + } + #if ENABLE_GSJ_THREAD //GSJ Work thread start int ret_thread; @@ -1479,10 +1551,11 @@ int CHAR_INIT() } #endif //发送缓冲区 - int i=0; - for(i; i<32; i++){ - thread_push_buffer[i] = (char *)calloc(1,PUSH_BUFFER_COUNT*MAX_TRAFFIC_INFO_LEN*sizeof(char)); - } + // int i = 0; + // for (i; i < 32; i++) + // { + // thread_push_buffer[i] = (char *)calloc(1, PUSH_BUFFER_COUNT * MAX_TRAFFIC_INFO_LEN * sizeof(char)); + // } // 函数实现自定义 // 只要求函数返回值为插件ID //printf("INIT SUCCESS!!!\n"); return demo_plugid; @@ -1492,7 +1565,7 @@ void LRJ_APP_DESTROY() { MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "TEST_APP_DESTORY in...\n"); printf("TEST_APP_DESTORY in...\n"); - // kafka_destroy(); + kafka_destroy(); if (runtime_log_handler == NULL) { printf("TEST_APP_DESTORY out...\n"); @@ -1500,10 +1573,14 @@ void LRJ_APP_DESTROY() } MESA_destroy_runtime_log_handle(runtime_log_handler); MESA_destroy_runtime_log_handle(kafka_log_handler); + + MESA_htable_destroy(vlanid_ip_htable_handle, NULL); + //free thread_push_buffer - int i=0; - for(i; i<32; i++){ - free(thread_push_buffer[i]); - } + // int i = 0; + // for (i; i < 32; i++) + // { + // free(thread_push_buffer[i]); + // } printf("TEST_APP_DESTORY out...\n"); } |
