summaryrefslogtreecommitdiff
path: root/lirenjie_vxlan_sapp_20200803.c
diff options
context:
space:
mode:
Diffstat (limited to 'lirenjie_vxlan_sapp_20200803.c')
-rw-r--r--lirenjie_vxlan_sapp_20200803.c43
1 files changed, 29 insertions, 14 deletions
diff --git a/lirenjie_vxlan_sapp_20200803.c b/lirenjie_vxlan_sapp_20200803.c
index 1f843d9..579bbb4 100644
--- a/lirenjie_vxlan_sapp_20200803.c
+++ b/lirenjie_vxlan_sapp_20200803.c
@@ -20,7 +20,8 @@
#include "rdkafka.h"
#include "libGSJ.h"
#include "MESA_htable.h"
-int version_20190827_1605;
+// int version_20190827_1605;
+int version_20200911;
static char DEFAULT_RETURN_VALUE = (APP_STATE_GIVEME | APP_STATE_DROPPKT);
@@ -31,7 +32,7 @@ extern unsigned char vxlan_id_map_to_service_id(int vxlan_id_host_order);
extern int platform_register_action_judge(char (*action_cb_fun)(int net_conn_mode, char plug_action));
extern int g_business_plug_type;
-#define ENABLE_COUNT_THREAD 1 //是否启用统计功能.added by lrj at 20200803
+#define ENABLE_COUNT_THREAD 0 //是否启用统计功能.added by lrj at 20200803
#define ENABLE_GSJ_THREAD 0 //是否启用GSJ
#define ENABLE_TIMER 0
#define ENABLE_KAFKA 1 //是否向G_BACK_TRAFFIC_STATISTIC_new发数据
@@ -93,6 +94,12 @@ char topic_traffic_stat[64];
char topic_sum[64];
// static char *topic1 = "G_BACK_TRAFFIC_STATISTIC_new";
// static char *topic2 = "Gsj_Sum";
+char kafka_statistics_interval_ms[9];
+char kafka_request_required_acks[2];
+char kafka_queue_buffering_max_messages[9];
+char kafka_broker_version_fallback[12];
+char kafka_api_version_request[6];
+char kafka_topic_metadata_refresh_interval_ms[8];
//GSJ thread
pthread_t GSJ_Work;
@@ -186,13 +193,13 @@ static int init_kafka(int partition_, char *brokers_, char *topic_traffic_stat,
rd_kafka_conf_set_stats_cb(conf, kafka_stats_cb);
/* Quick termination */
snprintf(tmp, sizeof(tmp), "%i", SIGIO);
- rd_kafka_conf_set(conf, "api.version.request", "false", errstr, sizeof(errstr));
- rd_kafka_conf_set(conf, "broker.version.fallback", "0.9.0.1", errstr, sizeof(errstr));
- rd_kafka_conf_set(conf, "statistics.interval.ms", "60", errstr, sizeof(errstr));
+ rd_kafka_conf_set(conf, "api.version.request", kafka_api_version_request, errstr, sizeof(errstr));
+ rd_kafka_conf_set(conf, "broker.version.fallback", kafka_broker_version_fallback, errstr, sizeof(errstr));
+ rd_kafka_conf_set(conf, "statistics.interval.ms", kafka_statistics_interval_ms, errstr, sizeof(errstr));
rd_kafka_conf_set(conf, "internal.termination.signal", tmp, NULL, 0);
- rd_kafka_conf_set(conf, "queue.buffering.max.messages", "5000000", errstr, sizeof(errstr));
- rd_kafka_conf_set(conf, "topic.metadata.refresh.interval.ms", "600000", errstr, sizeof(errstr));
- rd_kafka_conf_set(conf, "request.required.acks", "0", errstr, sizeof(errstr));
+ rd_kafka_conf_set(conf, "queue.buffering.max.messages", kafka_queue_buffering_max_messages, errstr, sizeof(errstr));
+ rd_kafka_conf_set(conf, "topic.metadata.refresh.interval.ms", kafka_topic_metadata_refresh_interval_ms, errstr, sizeof(errstr));
+ rd_kafka_conf_set(conf, "request.required.acks", kafka_request_required_acks, errstr, sizeof(errstr));
/*topic configuration*/
topic_conf_traffic_stat = rd_kafka_topic_conf_new();
topic_conf_sum = rd_kafka_topic_conf_new();
@@ -664,7 +671,7 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps
"\"vx_udp_header_src_port\":%u,\"vx_udp_header_dst_port\":%u,\"vx_vlan_id\":%u,"
"\"ipv4_src_ip\":\"%s\",\"ipv4_dst_ip\":\"%s\",\"ipv4_identification\":%d,\"ipv4_fragment_offset\":%u,"
"\"ipv6_src_ip\":\"\",\"ipv6_dst_ip\":\"\",\"ipv6_bus_type\":\"\",\"ipv6_flow_flag\":\"\",\"ipv6_load_length\":0,"
- "\"ipv6_next_msg_head\":0,\"ipv6_limit\":0,\"flow_type\":%d,\"sendto_gdev_ip\":\"%s\"}-",
+ "\"ipv6_next_msg_head\":0,\"ipv6_limit\":0,\"flow_type\":%d,\"sendto_gdev_ip\":\"%s\"}",
tinfo->PROTO_TYPE, protocol, entrance_id, tinfo->C2S_pkt_num, tinfo->S2C_pkt_num, tinfo->C2S_bytes, tinfo->S2C_bytes,
tinfo->stat_time.tv_sec * 1000 + tinfo->stat_time.tv_usec / 1000, tinfo->vx_type, vxlan_sip_ip_str, vxlan_dip_ip_str,
tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->vxlan_vpn_id,
@@ -681,7 +688,7 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps
"\"vx_udp_header_src_port\":%u,\"vx_udp_header_dst_port\":%u,\"vx_vlan_id\":%u,"
"\"ipv4_src_ip\":\"\",\"ipv4_dst_ip\":\"\",\"ipv4_identification\":0,\"ipv4_fragment_offset\":0,"
"\"ipv6_src_ip\":\"%s\",\"ipv6_dst_ip\":\"%s\",\"ipv6_bus_type\":\"0x%02X\",\"ipv6_flow_flag\":\"0x%05X\",\"ipv6_load_length\":%d,"
- "\"ipv6_next_msg_head\":%d,\"ipv6_limit\":%d,\"flow_type\":%d,\"sendto_gdev_ip\":\"%s\"}-",
+ "\"ipv6_next_msg_head\":%d,\"ipv6_limit\":%d,\"flow_type\":%d,\"sendto_gdev_ip\":\"%s\"}",
tinfo->PROTO_TYPE, protocol, entrance_id, tinfo->C2S_pkt_num, tinfo->S2C_pkt_num, tinfo->C2S_bytes, tinfo->S2C_bytes,
tinfo->stat_time.tv_sec * 1000 + tinfo->stat_time.tv_usec / 1000, tinfo->vx_type, vxlan_sip_ip_str, vxlan_dip_ip_str,
tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->vxlan_vpn_id,
@@ -697,7 +704,7 @@ static void print_traffic_info(struct traffic_info *tinfo, struct streaminfo *ps
"\"vx_udp_header_src_port\":%d,\"vx_udp_header_dst_port\":%d,\"vx_vlan_id\":%d,"
"\"ipv4_src_ip\":\"\",\"ipv4_dst_ip\":\"\",\"ipv4_identification\":0,\"ipv4_fragment_offset\":0,"
"\"ipv6_src_ip\":\"\",\"ipv6_dst_ip\":\"\",\"ipv6_bus_type\":\"\",\"ipv6_flow_flag\":\"\",\"ipv6_load_length\":0,"
- "\"ipv6_next_msg_head\":0,\"ipv6_limit\":0,\"flow_type\":%d,\"sendto_gdev_ip\":\"%s\"}-",
+ "\"ipv6_next_msg_head\":0,\"ipv6_limit\":0,\"flow_type\":%d,\"sendto_gdev_ip\":\"%s\"}",
tinfo->PROTO_TYPE, protocol, entrance_id, tinfo->C2S_pkt_num, tinfo->S2C_pkt_num, tinfo->C2S_bytes, tinfo->S2C_bytes,
tinfo->stat_time.tv_sec * 1000 + tinfo->stat_time.tv_usec / 1000, tinfo->vx_type, vxlan_sip_ip_str, vxlan_dip_ip_str,
tinfo->vx_UDP_header_src_port, tinfo->vx_UDP_header_dst_port, tinfo->vxlan_vpn_id,
@@ -1407,9 +1414,9 @@ void vlanid_ip_htable_iterate(const uchar *key, uint size, void *data, void *use
snprintf(info, 64, "hzsum#%s#%llu", vxlan_ip_key, *(UINT64 *)data);
}
push_data_to_kafka(info, strlen(info), rkt_sum);
- MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "%s", info);
- vlanid_ip_pkt_num += *(UINT64 *)data;
- vlanid_ip_push_count++;
+ // MESA_handle_runtime_log(runtime_log_handler, RLOG_LV_FATAL, module_name, "%s", info);
+ // vlanid_ip_pkt_num += *(UINT64 *)data;
+ // vlanid_ip_push_count++;
*(UINT64 *)data = 0;
free(info);
}
@@ -1480,6 +1487,14 @@ int CHAR_INIT()
/* TOPIC */
MESA_load_profile_string_def(entrance_id_path, "SETTING", "TOPIC_TRAFFIC_STATISTIC", topic_traffic_stat, sizeof(topic_traffic_stat), "#");
MESA_load_profile_string_def(entrance_id_path, "SETTING", "TOPIC_SUM", topic_sum, sizeof(topic_sum), "#");
+ /* kafka configure */
+ MESA_load_profile_string_def(entrance_id_path, "SETTING", "kafka_api_version_request", kafka_api_version_request, sizeof(kafka_api_version_request), "false");
+ MESA_load_profile_string_def(entrance_id_path, "SETTING", "kafka_broker_version_fallback", kafka_broker_version_fallback, sizeof(kafka_broker_version_fallback), "0.9.0.1");
+ MESA_load_profile_string_def(entrance_id_path, "SETTING", "kafka_statistics_interval_ms", kafka_statistics_interval_ms, sizeof(kafka_statistics_interval_ms), "120");
+ MESA_load_profile_string_def(entrance_id_path, "SETTING", "kafka_request_required_acks", kafka_request_required_acks, sizeof(kafka_request_required_acks), "0");
+ MESA_load_profile_string_def(entrance_id_path, "SETTING", "kafka_queue_buffering_max_messages", kafka_queue_buffering_max_messages, sizeof(kafka_queue_buffering_max_messages), "5000000");
+ MESA_load_profile_string_def(entrance_id_path, "SETTING", "kafka_topic_metadata_refresh_interval_ms", kafka_topic_metadata_refresh_interval_ms, sizeof(kafka_topic_metadata_refresh_interval_ms), "600000");
+
/* vx_ip_header_dst_ip */
MESA_load_profile_string_def(gdev_conf_path, "Module", "sendto_gdev_ip", str_tmp, sizeof(str_tmp), "#");
if ('#' == str_tmp[0])