diff options
Diffstat (limited to 'lirenjie_vxlan_sapp_20200803.c')
| -rw-r--r-- | lirenjie_vxlan_sapp_20200803.c | 43 |
1 files changed, 29 insertions, 14 deletions
diff --git a/lirenjie_vxlan_sapp_20200803.c b/lirenjie_vxlan_sapp_20200803.c index 1f843d9..579bbb4 100644 --- a/lirenjie_vxlan_sapp_20200803.c +++ b/lirenjie_vxlan_sapp_20200803.c @@ -20,7 +20,8 @@ #include "rdkafka.h" #include "libGSJ.h" #include "MESA_htable.h" -int version_20190827_1605; +// int version_20190827_1605; +int version_20200911; static char DEFAULT_RETURN_VALUE = (APP_STATE_GIVEME | APP_STATE_DROPPKT); @@ -31,7 +32,7 @@ extern unsigned char vxlan_id_map_to_service_id(int vxlan_id_host_order); extern int platform_register_action_judge(char (*action_cb_fun)(int net_conn_mode, char plug_action)); extern int g_business_plug_type; -#define ENABLE_COUNT_THREAD 1 //是否启用统计功能.added by lrj at 20200803 +#define ENABLE_COUNT_THREAD 0 //是否启用统计功能.added by lrj at 20200803 #define ENABLE_GSJ_THREAD 0 //是否启用GSJ #define ENABLE_TIMER 0 #define ENABLE_KAFKA 1 //是否向G_BACK_TRAFFIC_STATISTIC_new发数据 @@ -93,6 +94,12 @@ char topic_traffic_stat[64]; char topic_sum[64]; // static char *topic1 = "G_BACK_TRAFFIC_STATISTIC_new"; // static char *topic2 = "Gsj_Sum"; +char kafka_statistics_interval_ms[9]; +char kafka_request_required_acks[2]; +char kafka_queue_buffering_max_messages[9]; +char kafka_broker_version_fallback[12]; +char kafka_api_version_request[6]; +char kafka_topic_metadata_refresh_interval_ms[8]; //GSJ thread pthread_t GSJ_Work; @@ -186,13 +193,13 @@ static int init_kafka(int partition_, char *brokers_, char *topic_traffic_stat, rd_kafka_conf_set_stats_cb(conf, kafka_stats_cb); /* Quick termination */ snprintf(tmp, sizeof(tmp), "%i", SIGIO); - rd_kafka_conf_set(conf, "api.version.request", "false", errstr, sizeof(errstr)); - rd_kafka_conf_set(conf, "broker.version.fallback", "0.9.0.1", errstr, sizeof(errstr)); - rd_kafka_conf_set(conf, "statistics.interval.ms", "60", errstr, sizeof(errstr)); + rd_kafka_conf_set(conf, "api.version.request", kafka_api_version_request, errstr, sizeof(errstr)); + rd_kafka_conf_set(conf, "broker.version.fallback", kafka_broker_version_fallback, errstr, sizeof(errstr)); + rd_kafka_conf_set(conf, "statistics.interval.ms", kafka_statistics_interval_ms, errstr, sizeof(errstr)); rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0); - rd_kafka_conf_set(conf, "queue.buffering.max.messages", "5000000", errstr, sizeof(errstr)); - rd_kafka_conf_set(conf, "topic.metadata.refresh.interval.ms", "600000", errstr, sizeof(errstr)); - rd_kafka_conf_set(conf, "request.required.acks", "0", errstr, sizeof(errstr)); + rd_kafka_conf_set(conf, "queue.buffering.max.messages", kafka_queue_buffering_max_messages, errstr, sizeof(errstr)); + rd_kafka_conf_set(conf, "topic.metadata.refresh.interval.ms", kafka_topic_metadata_refresh_interval_ms, errstr, sizeof(errstr)); + rd_kafka_conf_set(conf, "request.required.acks", kafka_request_required_acks, errstr, sizeof(errstr)); /*topic configuration*/ topic_conf_traffic_stat = rd_kafka_topic_conf_new(); topic_conf_sum = rd_kafka_topic_conf_new(); @@ -664,7 +671,7 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps "\"vx_udp_header_src_port\":%u,\"vx_udp_header_dst_port\":%u,\"vx_vlan_id\":%u," "\"ipv4_src_ip\":\"%s\",\"ipv4_dst_ip\":\"%s\",\"ipv4_identification\":%d,\"ipv4_fragment_offset\":%u," "\"ipv6_src_ip\":\"\",\"ipv6_dst_ip\":\"\",\"ipv6_bus_type\":\"\",\"ipv6_flow_flag\":\"\",\"ipv6_load_length\":0," - "\"ipv6_next_msg_head\":0,\"ipv6_limit\":0,\"flow_type\":%d,\"sendto_gdev_ip\":\"%s\"}-", + "\"ipv6_next_msg_head\":0,\"ipv6_limit\":0,\"flow_type\":%d,\"sendto_gdev_ip\":\"%s\"}", tinfo->PROTO_TYPE, protocol, entrance_id, tinfo->C2S_pkt_num, tinfo->S2C_pkt_num, tinfo->C2S_bytes, tinfo->S2C_bytes, tinfo->stat_time.tv_sec * 1000 + tinfo->stat_time.tv_usec / 1000, tinfo->vx_type, vxlan_sip_ip_str, vxlan_dip_ip_str, tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->vxlan_vpn_id, @@ -681,7 +688,7 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps "\"vx_udp_header_src_port\":%u,\"vx_udp_header_dst_port\":%u,\"vx_vlan_id\":%u," "\"ipv4_src_ip\":\"\",\"ipv4_dst_ip\":\"\",\"ipv4_identification\":0,\"ipv4_fragment_offset\":0," "\"ipv6_src_ip\":\"%s\",\"ipv6_dst_ip\":\"%s\",\"ipv6_bus_type\":\"0x%02X\",\"ipv6_flow_flag\":\"0x%05X\",\"ipv6_load_length\":%d," - "\"ipv6_next_msg_head\":%d,\"ipv6_limit\":%d,\"flow_type\":%d,\"sendto_gdev_ip\":\"%s\"}-", + "\"ipv6_next_msg_head\":%d,\"ipv6_limit\":%d,\"flow_type\":%d,\"sendto_gdev_ip\":\"%s\"}", tinfo->PROTO_TYPE, protocol, entrance_id, tinfo->C2S_pkt_num, tinfo->S2C_pkt_num, tinfo->C2S_bytes, tinfo->S2C_bytes, tinfo->stat_time.tv_sec * 1000 + tinfo->stat_time.tv_usec / 1000, tinfo->vx_type, vxlan_sip_ip_str, vxlan_dip_ip_str, tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->vxlan_vpn_id, @@ -697,7 +704,7 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps "\"vx_udp_header_src_port\":%d,\"vx_udp_header_dst_port\":%d,\"vx_vlan_id\":%d," "\"ipv4_src_ip\":\"\",\"ipv4_dst_ip\":\"\",\"ipv4_identification\":0,\"ipv4_fragment_offset\":0," "\"ipv6_src_ip\":\"\",\"ipv6_dst_ip\":\"\",\"ipv6_bus_type\":\"\",\"ipv6_flow_flag\":\"\",\"ipv6_load_length\":0," - "\"ipv6_next_msg_head\":0,\"ipv6_limit\":0,\"flow_type\":%d,\"sendto_gdev_ip\":\"%s\"}-", + "\"ipv6_next_msg_head\":0,\"ipv6_limit\":0,\"flow_type\":%d,\"sendto_gdev_ip\":\"%s\"}", tinfo->PROTO_TYPE, protocol, entrance_id, tinfo->C2S_pkt_num, tinfo->S2C_pkt_num, tinfo->C2S_bytes, tinfo->S2C_bytes, tinfo->stat_time.tv_sec * 1000 + tinfo->stat_time.tv_usec / 1000, tinfo->vx_type, vxlan_sip_ip_str, vxlan_dip_ip_str, tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->vxlan_vpn_id, @@ -1407,9 +1414,9 @@ void vlanid_ip_htable_iterate(const uchar *key, uint size, void *data, void *use snprintf(info, 64, "hzsum#%s#%llu", vxlan_ip_key, *(UINT64 *)data); } push_data_to_kafka(info, strlen(info), rkt_sum); - MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "%s", info); - vlanid_ip_pkt_num += *(UINT64 *)data; - vlanid_ip_push_count++; + // MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "%s", info); + // vlanid_ip_pkt_num += *(UINT64 *)data; + // vlanid_ip_push_count++; *(UINT64 *)data = 0; free(info); } @@ -1480,6 +1487,14 @@ int CHAR_INIT() /* TOPIC */ MESA_load_profile_string_def(entrance_id_path, "SETTING", "TOPIC_TRAFFIC_STATISTIC", topic_traffic_stat, sizeof(topic_traffic_stat), "#"); MESA_load_profile_string_def(entrance_id_path, "SETTING", "TOPIC_SUM", topic_sum, sizeof(topic_sum), "#"); + /* kafka configure */ + MESA_load_profile_string_def(entrance_id_path, "SETTING", "kafka_api_version_request", kafka_api_version_request, sizeof(kafka_api_version_request), "false"); + MESA_load_profile_string_def(entrance_id_path, "SETTING", "kafka_broker_version_fallback", kafka_broker_version_fallback, sizeof(kafka_broker_version_fallback), "0.9.0.1"); + MESA_load_profile_string_def(entrance_id_path, "SETTING", "kafka_statistics_interval_ms", kafka_statistics_interval_ms, sizeof(kafka_statistics_interval_ms), "120"); + MESA_load_profile_string_def(entrance_id_path, "SETTING", "kafka_request_required_acks", kafka_request_required_acks, sizeof(kafka_request_required_acks), "0"); + MESA_load_profile_string_def(entrance_id_path, "SETTING", "kafka_queue_buffering_max_messages", kafka_queue_buffering_max_messages, sizeof(kafka_queue_buffering_max_messages), "5000000"); + MESA_load_profile_string_def(entrance_id_path, "SETTING", "kafka_topic_metadata_refresh_interval_ms", kafka_topic_metadata_refresh_interval_ms, sizeof(kafka_topic_metadata_refresh_interval_ms), "600000"); + /* vx_ip_header_dst_ip */ MESA_load_profile_string_def(gdev_conf_path, "Module", "sendto_gdev_ip", str_tmp, sizeof(str_tmp), "#"); if ('#' == str_tmp[0]) |
