summaryrefslogtreecommitdiff
path: root/src/tsg_send_log.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/tsg_send_log.cpp')
-rw-r--r--src/tsg_send_log.cpp126
1 files changed, 52 insertions, 74 deletions
diff --git a/src/tsg_send_log.cpp b/src/tsg_send_log.cpp
index e21a822..3c6513a 100644
--- a/src/tsg_send_log.cpp
+++ b/src/tsg_send_log.cpp
@@ -19,7 +19,7 @@
#include "tsg_send_log.h"
#include "tsg_send_log_internal.h"
-char TSG_SEND_LOG_VERSION_20191129=0;
+char TSG_SEND_LOG_VERSION_20200119=0;
struct tsg_log_instance_t *g_tsg_log_instance;
@@ -238,7 +238,7 @@ int TLD_append_streaminfo(struct tsg_log_instance_t *instance, struct TLD_handle
return 0;
}
-int load_log_common_field(const char *filename, id2field_t *id2field, id2field_t *service2topic)
+int load_log_common_field(const char *filename, id2field_t *id2field, id2field_t **service2topic, int *max_service)
{
int i=0;
int ret=0,id=0;
@@ -246,7 +246,8 @@ int load_log_common_field(const char *filename, id2field_t *id2field, id2field_t
char line[1024]={0};
char field_name[64]={0};
char type_name[32]={0};
-
+ id2field_t *_service2topic=NULL;
+
fp=fopen(filename, "r");
if(fp==NULL)
{
@@ -282,9 +283,36 @@ int load_log_common_field(const char *filename, id2field_t *id2field, id2field_t
default:
if((strncasecmp("TOPIC", type_name, strlen("TOPIC")))==0)
{
- service2topic[id].type = TLD_TYPE_MAX;
- service2topic[id].id = id;
- memcpy(service2topic[id].name, field_name, strlen(field_name));
+ if(_service2topic==NULL)
+ {
+ _service2topic=(id2field_t *)calloc(1, sizeof(id2field_t)*(id+1));
+ _service2topic[id].type = TLD_TYPE_MAX;
+ _service2topic[id].id = id;
+ memcpy(_service2topic[id].name, field_name, strlen(field_name));
+
+ *max_service=id+1;
+ }
+ else
+ {
+ if(*max_service<=id)
+ {
+ _service2topic=(id2field_t *)realloc(_service2topic, sizeof(id2field_t)*(id+1));
+ memset(&_service2topic[id], 0, sizeof(id2field_t));
+ _service2topic[id].type = TLD_TYPE_MAX;
+ _service2topic[id].id = id;
+ memcpy(_service2topic[id].name, field_name, strlen(field_name));
+
+ *max_service=id+1;
+ }
+ else
+ {
+ memset(&_service2topic[id], 0, sizeof(id2field_t));
+ _service2topic[id].type = TLD_TYPE_MAX;
+ _service2topic[id].id = id;
+ memcpy(_service2topic[id].name, field_name, strlen(field_name));
+ }
+ }
+
}
break;
}
@@ -296,6 +324,8 @@ int load_log_common_field(const char *filename, id2field_t *id2field, id2field_t
fclose(fp);
fp=NULL;
+ *service2topic=_service2topic;
+
return 0;
}
@@ -314,7 +344,7 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile)
_instance=(struct tsg_log_instance_t *)calloc(1, sizeof(struct tsg_log_instance_t));
MESA_load_profile_int_def(conffile, "TSG_LOG", "LOG_LEVEL",&(level), 30);
- MESA_load_profile_string_def(conffile, "TSG_LOG", "LOG_PATH", log_path, sizeof(log_path), NULL);
+ MESA_load_profile_string_def(conffile, "TSG_LOG", "LOG_PATH", log_path, sizeof(log_path), "./tsglog/tsglog");
_instance->logger=MESA_create_runtime_log_handle(log_path, level);
if(_instance->logger==NULL)
@@ -358,23 +388,26 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile)
MESA_handle_runtime_log(_instance->logger, RLOG_LV_FATAL, "KAFKA_INIT", "rd_kafka_brokers_add is error, broker_list: %s", _instance->broker_list);
return NULL;
}
-
- MESA_load_profile_int_def(conffile, "TSG_LOG", "MAX_SERVICE",&(_instance->max_service), 0);
- //(_instance->topic_rkt)=(rd_kafka_topic_t **)calloc(1, sizeof(void *));
- (_instance->topic_rkt)=(rd_kafka_topic_t **)calloc(1, (1+_instance->max_service)*sizeof(rd_kafka_topic_t*));
- _instance->service2topic=(id2field_t *)calloc(1, (1+_instance->max_service)*sizeof(id2field_t));
-
- load_log_common_field(_instance->common_field_file, _instance->id2field, _instance->service2topic);
+ load_log_common_field(_instance->common_field_file, _instance->id2field, &(_instance->service2topic), &(_instance->max_service));
- for(i=0; i<_instance->max_service+1; i++)
+ if(_instance->service2topic!=NULL)
{
- if(_instance->service2topic[i].type==TLD_TYPE_MAX)
+ _instance->topic_rkt=(rd_kafka_topic_t **)calloc(1, (_instance->max_service)*sizeof(rd_kafka_topic_t*));
+
+ for(i=0; i<_instance->max_service+1; i++)
{
- topic_conf=rd_kafka_topic_conf_new();
- _instance->topic_rkt[_instance->service2topic[i].id]=rd_kafka_topic_new(kafka_handle, _instance->service2topic[i].name, topic_conf);
+ if(_instance->service2topic[i].type==TLD_TYPE_MAX)
+ {
+ topic_conf=rd_kafka_topic_conf_new();
+ _instance->topic_rkt[_instance->service2topic[i].id]=rd_kafka_topic_new(kafka_handle, _instance->service2topic[i].name, topic_conf);
+ }
}
}
+ else
+ {
+ MESA_handle_runtime_log(_instance->logger, RLOG_LV_FATAL, "KAFKA_INIT", "load_log_common_field is error, please check %s", _instance->common_field_file);
+ }
return _instance;
}
@@ -382,7 +415,7 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile)
int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handle, tsg_log_t *log_msg, int thread_id)
{
- int i=0,ret=0,status=0;
+ int i=0,status=0;
char *payload=NULL;
struct TLD_handle_t *_handle=handle;
struct tsg_log_instance_t *_instance=instance;
@@ -400,37 +433,10 @@ int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handl
MESA_handle_runtime_log(_instance->logger, RLOG_LV_INFO, "TSG_SEND_LOG", "Disable tsg_send_log.");
return 0;
}
-
- //TODO
- //common_user_tags
- //common_isp
- //common_app_label
- //common_app_id
- //common_protocol_id
- //common_has_dup_traffic
- //common_stream_error
TLD_append_streaminfo(instance, handle, log_msg->a_stream);
TLD_append(_handle, _instance->id2field[LOG_COMMON_SLED_IP].name, (void *)(_instance->local_ip_str), TLD_TYPE_STRING);
-#if 0
- struct vxlan_info vinfo;
- int opt_val_len = sizeof(vinfo);
- status=MESA_get_stream_opt(log_msg->a_stream, MSO_STREAM_VXLAN_INFO, &vinfo, &opt_val_len);
- if(status < 0)
- {
- MESA_handle_runtime_log(_instance->logger, RLOG_LV_DEBUG, "TSG_SEND_LOG", "tsg log: get vxlan info error, tuple4: %s", printaddr(&log_msg->a_stream->addr, thread_id));
- }
- else
- {
- TLD_append((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_LINK_ID].name, (void *)(long)vinfo.link_id, TLD_TYPE_LONG);
- TLD_append((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_DIRECTION].name, (void *)(long)vinfo.link_dir, TLD_TYPE_LONG);
- TLD_append((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_DEVICE_ID].name, (void *)(long)vinfo.dev_id, TLD_TYPE_LONG);
- TLD_append((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_ENTRANCE_ID].name, (void *)(long)vinfo.entrance_id, TLD_TYPE_LONG);
- TLD_append((TLD_handle_t)_handle, _instance->id2field[LOG_COMMON_ENCAPSULATION].name, (void *)(long)vinfo.encap_type, TLD_TYPE_LONG);
- }
-#endif
-
for(i=0;i<log_msg->result_num; i++)
{
switch(log_msg->result[i].do_log)
@@ -457,33 +463,6 @@ int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handl
TLD_append(_handle, _instance->id2field[LOG_COMMON_SERVICE].name, (void *)(long)(log_msg->result[i].service_id), TLD_TYPE_LONG);
TLD_append(_handle, _instance->id2field[LOG_COMMON_ACTION].name, (void *)(long)((unsigned char)log_msg->result[i].action), TLD_TYPE_LONG);
- if(log_msg->result[i].serv_def_len<128)
- {
- TLD_append(_handle, _instance->id2field[LOG_COMMON_USER_REGION].name, (void *)(log_msg->result[i].service_defined), TLD_TYPE_STRING);
- }
- else
- {
- char *service_defined=(char *)calloc(1, log_msg->result[i].serv_def_len+1);
- ret=Maat_read_rule(g_tsg_maat_feather, &log_msg->result[i], MAAT_RULE_SERV_DEFINE, service_defined, log_msg->result[i].serv_def_len);
- if(ret==log_msg->result[i].serv_def_len)
- {
- TLD_append(_handle, _instance->id2field[LOG_COMMON_USER_REGION].name, (void *)service_defined, TLD_TYPE_STRING);
- }
- else
- {
- MESA_handle_runtime_log(_instance->logger,
- RLOG_LV_FATAL,
- "TSG_SEND_LOG",
- "Fetch service_defined failed, policy_id: %d service: %d action: %d addr: %s",
- log_msg->result[i].config_id,
- log_msg->result[i].service_id,
- log_msg->result[i].action,
- printaddr(&log_msg->a_stream->addr, thread_id));
- }
- free((void *)service_defined);
- service_defined=NULL;
- }
-
payload = cJSON_PrintUnformatted(_handle->object);
status = rd_kafka_produce(_instance->topic_rkt[log_msg->result[i].service_id], RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, payload, strlen(payload), NULL, 0, NULL);
@@ -506,7 +485,6 @@ int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handl
TLD_delete(_handle, _instance->id2field[LOG_COMMON_POLICY_ID].name);
TLD_delete(_handle, _instance->id2field[LOG_COMMON_SERVICE].name);
TLD_delete(_handle, _instance->id2field[LOG_COMMON_ACTION].name);
- TLD_delete(_handle, _instance->id2field[LOG_COMMON_USER_REGION].name);
FS_operate(g_tsg_para.fs2_handle, g_tsg_para.fs2_field_id[TSG_FS2_LOG], 0, FS_OP_ADD, 1);
}