diff options
| author | 杨玉波 <[email protected]> | 2023-09-01 08:50:24 +0000 |
|---|---|---|
| committer | 刘学利 <[email protected]> | 2023-09-01 08:50:24 +0000 |
| commit | 5c1e250c7a1ec5a77c520263048d6a0fbadec892 (patch) | |
| tree | 9d6777a3b31fa58ef1661db45341f2245e0043ae /src/tsg_send_log.cpp | |
| parent | fc4c49379f9cab40c801dcbbf7e012a348cd89f6 (diff) | |
TSG-15739:功能端支持输出IPFIX封装的UDP报文v6.1.5
Diffstat (limited to 'src/tsg_send_log.cpp')
| -rw-r--r-- | src/tsg_send_log.cpp | 134 |
1 files changed, 118 insertions, 16 deletions
diff --git a/src/tsg_send_log.cpp b/src/tsg_send_log.cpp index 9ea0a41..452fe0a 100644 --- a/src/tsg_send_log.cpp +++ b/src/tsg_send_log.cpp @@ -1045,7 +1045,7 @@ int TLD_cancel(struct TLD_handle_t *handle) tsg_stat_log_handle_update(LOG_HANDLE_FREE_CNT, 1); } - + free(handle); handle = NULL; } @@ -1106,7 +1106,6 @@ int TLD_append(struct TLD_handle_t *handle, char *key, void *value, TLD_TYPE typ abort(); default: return -1; - break; } tsg_stat_log_handle_update(LOG_HANDLE_APPEND_CNT, 1); @@ -1168,7 +1167,7 @@ struct TLD_handle_t *TLD_duplicate(struct TLD_handle_t *handle) struct TLD_handle_t *TLD_create(int thread_id) { - if(g_tsg_log_instance->mode==CLOSE) + if(g_tsg_log_instance->mode==CLOSE_SEND_MODE) { return NULL; } @@ -1180,7 +1179,7 @@ struct TLD_handle_t *TLD_create(int thread_id) _handle->document = new Document(_handle->valueAllocator); _handle->document->SetObject(); tsg_stat_log_handle_update(LOG_HANDLE_CREATE_CNT, 1); - + return _handle; } @@ -1987,6 +1986,37 @@ int log_common_fields_new(const char *filename, id2field_t *id2field, struct top return 0; } +static unsigned char tsg_send_mode_get(char *mode_str) +{ + if (mode_str == NULL) + { + return KAFKA_SEND_MODE; // kafka is defualt + } + + unsigned char mode = CLOSE_SEND_MODE; + if (strstr(mode_str, "close") != NULL) + { + return CLOSE_SEND_MODE; + } + + if (strstr(mode_str, "kafka") != NULL) + { + mode |= KAFKA_SEND_MODE; + } + + if (strstr(mode_str, "ipfix") != NULL) + { + mode |= IPFIX_SEND_MODE; + } + + if (mode == CLOSE_SEND_MODE) + { + return KAFKA_SEND_MODE; // kafka is defualt + } + + return mode; +} + struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile) { char override_sled_ip[32]={0}; @@ -1997,6 +2027,7 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile) struct tsg_log_instance_t *_instance=NULL; char common_field_file[128]={0}; char log_path[128]={0}; + char send_mode[128] = {0}; _instance=(struct tsg_log_instance_t *)calloc(1, sizeof(struct tsg_log_instance_t)); @@ -2028,14 +2059,31 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile) return NULL; } - MESA_load_profile_int_def(conffile, "TSG_LOG", "MODE",&(_instance->mode), 0); - if(_instance->mode==CLOSE) + MESA_load_profile_string_def(conffile, "TSG_LOG", "MODE", send_mode, sizeof(send_mode), "kafka"); + _instance->mode = tsg_send_mode_get(send_mode); + + if(_instance->mode==CLOSE_SEND_MODE) { MASTER_LOG(_instance->logger, RLOG_LV_FATAL, LOG_MODULE_SENDLOG, "Disable tsg_send_log"); return _instance; } - MESA_load_profile_int_def(conffile, "TSG_LOG", "RECOVERY_INTERVEL_S", &(_instance->recovery_interval), 30); + if (_instance->mode&IPFIX_SEND_MODE) + { + char ipfix_conf_path[128] = {0}; + MESA_load_profile_string_def(conffile, "TSG_LOG", "IPFIX_EXPORTER_CONF", ipfix_conf_path, sizeof(ipfix_conf_path), "./tsgconf/ipfix_conf.json"); + _instance->ipfix_instance = ipfix_exporter_instance_init(ipfix_conf_path, _instance->logger, get_thread_count()); + if (_instance->ipfix_instance == NULL) + { + free(_instance); + _instance=NULL; + return NULL; + } + + MESA_load_profile_int_def(conffile, "TSG_LOG", "IPFIX_TEMPLATE_INTERVAL_PKTS", &(_instance->ipfix_template_interval_pkts), 1000); + } + + MESA_load_profile_int_def(conffile, "TSG_LOG", "RECOVERY_INTERVEL_S", &(_instance->recovery_interval), 30); MESA_load_profile_string_def(conffile, "TSG_LOG", "COMMON_FIELD_FILE", common_field_file, sizeof(common_field_file), NULL); MESA_load_profile_string_def(conffile, "TSG_LOG", "BROKER_LIST", broker_list, sizeof(broker_list), NULL); @@ -2136,7 +2184,7 @@ void tsg_sendlog_destroy(struct tsg_log_instance_t * instance) return ; } - if(instance->mode!=CLOSE) + if(instance->mode!=CLOSE_SEND_MODE) { for(int i=0; i<instance->max_service; i++) { @@ -2170,6 +2218,10 @@ void tsg_sendlog_destroy(struct tsg_log_instance_t * instance) instance->service2topic=NULL; } + if (instance->mode&IPFIX_SEND_MODE) + { + ipfix_exporter_destroy(instance->ipfix_instance); + } MESA_destroy_runtime_log_handle(instance->logger); instance->logger=NULL; @@ -2179,6 +2231,48 @@ void tsg_sendlog_destroy(struct tsg_log_instance_t * instance) return ; } +static int tsg_send_ipfix_message(struct TLD_handle_t *_handle, int thread_id) +{ + if (_handle == NULL) + { + return -1; + } + + if (ipfix_message_get_current_sequence() % g_tsg_log_instance->ipfix_template_interval_pkts == 0) + { + ipfix_message_template_send(g_tsg_log_instance->ipfix_instance, thread_id); + } + + Value::ConstMemberIterator schema_type = _handle->document->FindMember("common_schema_type"); + if (schema_type == _handle->document->MemberEnd()) + { + return -1; + } + + struct ipfix_message* message = ipfix_message_new(g_tsg_log_instance->ipfix_instance, schema_type->value.GetString()); + if (message == NULL) + { + return -1; + } + + for (rapidjson::Value::ConstMemberIterator iter = _handle->document->MemberBegin(); iter != _handle->document->MemberEnd(); ++iter) + { + if (iter->value.GetType() == rapidjson::kStringType) + { + ipfix_message_append(message, iter->name.GetString(), iter->name.GetStringLength(), (char *)iter->value.GetString(), iter->value.GetStringLength()); + } + else if (iter->value.GetType() == rapidjson::kNumberType) + { + int64_t value = iter->value.GetInt64(); + ipfix_message_append(message, iter->name.GetString(), iter->name.GetStringLength(), (char *)&(value), sizeof(int64_t)); + } + } + + ipfix_message_send(g_tsg_log_instance->ipfix_instance, message, (uint16_t)thread_id); + ipfix_message_free(message); + return 0; +} + int send_log_by_type(struct tsg_log_instance_t *_instance, struct TLD_handle_t *_handle, const struct streaminfo *a_stream, LOG_TYPE log_type, int thread_id) { int ret=update_percent(_instance, log_type, LOG_STATUS_DROP, thread_id); @@ -2191,13 +2285,21 @@ int send_log_by_type(struct tsg_log_instance_t *_instance, struct TLD_handle_t * (a_stream==NULL ? "" : printaddr(&(a_stream->addr), thread_id)) ); } - - StringBuffer sb(0, 2048); - Writer<StringBuffer> writer(sb); - _handle->document->Accept(writer); + + if (_instance->mode&KAFKA_SEND_MODE) + { + StringBuffer sb(0, 2048); + Writer<StringBuffer> writer(sb); + _handle->document->Accept(writer); - tsg_send_payload(_instance, log_type, (char *)sb.GetString(), sb.GetSize(), thread_id); + tsg_send_payload(_instance, log_type, (char *)sb.GetString(), sb.GetSize(), thread_id); + } + if (_instance->mode&IPFIX_SEND_MODE && log_type == LOG_TYPE_SESSION_RECORD) + { + tsg_send_ipfix_message(_handle, thread_id); + } + return 0; } @@ -2320,7 +2422,7 @@ int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handl return -1; } - if(_instance->mode==CLOSE) + if(_instance->mode==CLOSE_SEND_MODE) { TLD_cancel(_handle); tsg_stat_sendlog_update(_instance->sum_stat_row_id, LOG_STATUS_DROP, 1); @@ -2396,7 +2498,7 @@ int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handl int tsg_register_topic(struct tsg_log_instance_t *instance, char *topic_name) { struct tsg_log_instance_t *_instance=(struct tsg_log_instance_t *)instance; - if(_instance==NULL || _instance->mode==CLOSE || topic_name==NULL || _instance->kafka_handle==NULL) + if(_instance==NULL || _instance->mode==CLOSE_SEND_MODE || topic_name==NULL || _instance->kafka_handle==NULL) { return -1; } @@ -2417,7 +2519,7 @@ int tsg_send_payload(struct tsg_log_instance_t *instance, int topic_id, char *pa int status=0; struct tsg_log_instance_t *_instance=instance; - if(_instance==NULL || _instance->mode==CLOSE) + if(_instance==NULL || _instance->mode==CLOSE_SEND_MODE) { return 0; } |
