diff options
Diffstat (limited to 'src/tsg_send_log.cpp')
| -rw-r--r-- | src/tsg_send_log.cpp | 390 |
1 files changed, 197 insertions, 193 deletions
diff --git a/src/tsg_send_log.cpp b/src/tsg_send_log.cpp index bd654a7..4ef17b3 100644 --- a/src/tsg_send_log.cpp +++ b/src/tsg_send_log.cpp @@ -117,7 +117,7 @@ static int register_topic(struct tsg_log_instance_t *instance, struct topic_stat topic_conf=rd_kafka_topic_conf_new(); topic->status=1; - topic->topic_rkt=(rd_kafka_topic_t *)calloc(1, sizeof(rd_kafka_topic_t*)); + //topic->topic_rkt=(rd_kafka_topic_t *)calloc(1, sizeof(rd_kafka_topic_t*)); topic->topic_rkt=rd_kafka_topic_new(_instance->kafka_handle, topic->name, topic_conf); int thread_num=get_thread_count(); @@ -242,12 +242,9 @@ static int is_tunnels(struct streaminfo *a_stream) static int set_isn(struct TLD_handle_t *_handle, struct streaminfo *a_stream, char *field_name, enum MESA_stream_opt type) { - int ret=0; unsigned int isn=0; - int size=sizeof(unsigned long long); - - size=sizeof(unsigned int); - ret=MESA_get_stream_opt(a_stream, type, &isn, &size); + int size=sizeof(isn); + int ret=MESA_get_stream_opt(a_stream, type, &isn, &size); if(ret==0) { TLD_append(_handle, field_name, (void *)(long)isn, TLD_TYPE_LONG); @@ -282,7 +279,7 @@ static int set_tcp_isn(struct tsg_log_instance_t *_instance, struct TLD_handle_t static int set_linkinfo(struct tsg_log_instance_t *_instance, struct TLD_handle_t *_handle, struct streaminfo *a_stream) { - const char *linkinfo=(const char *)stream_bridge_async_data_get(a_stream, _instance->bridge_id[LOG_BRIDGE_MAC_LINKINFO]); + const char *linkinfo=(const char *)tsg_get_xxx_from_bridge(a_stream, _instance->bridge_id[LOG_BRIDGE_MAC_LINKINFO]); if(linkinfo==NULL) { return 0; @@ -320,13 +317,13 @@ static int set_linkinfo(struct tsg_log_instance_t *_instance, struct TLD_handle_ } static int set_asn(struct TLD_handle_t *_handle, struct streaminfo *a_stream, char *field_name, struct asn_info *asn_info) -{ - int len=0; - char buff[1024]={0}; - int buff_len=sizeof(buff); - +{ if(asn_info!=NULL) { + int len=0; + char buff[1024]={0}; + int buff_len=sizeof(buff); + len+=string_cat(buff+len, buff_len-len, asn_info->asn_id); buff[len++]='('; len+=string_cat(buff+len, buff_len-len, asn_info->organization); @@ -339,24 +336,23 @@ static int set_asn(struct TLD_handle_t *_handle, struct streaminfo *a_stream, ch static int set_location(struct TLD_handle_t *_handle, struct streaminfo *a_stream, char *field_name, struct location_info *location_info) { - int len=0; - char buff[1024]={0}; - int buff_len=sizeof(buff); - if(location_info==NULL) { return 0; } - + + int len=0; + char buff[1024]={0}; + int buff_len=sizeof(buff); int location_type=tsg_get_location_type(); switch(location_type) { - case 18: + case 18: len+=string_cat(buff+len, buff_len-len, location_info->city_full); buff[len++]=','; len+=string_cat(buff+len, buff_len-len, location_info->province_full); buff[len++]=','; - len+=string_cat(buff+len, buff_len-len, location_info->country_full); + string_cat(buff+len, buff_len-len, location_info->country_full); break; case 19: len+=string_cat(buff+len, buff_len-len, location_info->country_full); @@ -368,7 +364,7 @@ static int set_location(struct TLD_handle_t *_handle, struct streaminfo *a_strea if(location_info->subdivision_addr!=NULL) { buff[len++]='.'; - len+=string_cat(buff+len, buff_len-len, location_info->subdivision_addr); + string_cat(buff+len, buff_len-len, location_info->subdivision_addr); } break; default: @@ -462,17 +458,16 @@ static int set_tuple4(struct tsg_log_instance_t *_instance, struct TLD_handle_t static int set_duraction(struct tsg_log_instance_t *_instance, struct TLD_handle_t *_handle, struct streaminfo *a_stream) { - int ret=0; long common_con_duration_ms=0; - unsigned long long create_time=0,last_time=0; int size=sizeof(unsigned long long); + unsigned long long create_time=0,last_time=0; if(a_stream->ptcpdetail!=NULL) { TLD_append(_handle, _instance->id2field[LOG_COMMON_START_TIME].name, (void *)(a_stream->ptcpdetail->createtime), TLD_TYPE_LONG); TLD_append(_handle, _instance->id2field[LOG_COMMON_END_TIME].name, (void *)(a_stream->ptcpdetail->lastmtime), TLD_TYPE_LONG); - ret=MESA_get_stream_opt(a_stream, MSO_STREAM_CREATE_TIMESTAMP_MS, (void *)&create_time, &size); + int ret=MESA_get_stream_opt(a_stream, MSO_STREAM_CREATE_TIMESTAMP_MS, (void *)&create_time, &size); if(ret>=0) { ret=MESA_get_stream_opt(a_stream, MSO_STREAM_LASTUPDATE_TIMESTAMP_MS, (void *)&last_time, &size); @@ -631,19 +626,17 @@ static int set_app_identify_info(struct TLD_handle_t *_handle, char *field_name, static int get_app_id_list(Value *app_id_object, struct TLD_handle_t *_handle, const char *field_name, struct gather_app_result *result) { - int i=0,ret=0; - char app_name[512]={0}; - if(result->app_num==0) { return 0; } Value array(kArrayType); - for(i=0; i<result->app_num; i++) - { - Value object(kObjectType); - ret=tsg_app_id2name(result->attributes[i].app_id, app_name, sizeof(app_name), 1); + for(int i=0; i<result->app_num; i++) + { + char app_name[512]={0}; + Value object(kObjectType); + int ret=tsg_app_id2name(result->attributes[i].app_id, app_name, sizeof(app_name), 1); if(ret>0) { add_str_member(_handle, &object, "app_name", app_name); @@ -849,13 +842,10 @@ int set_app_info(struct tsg_log_instance_t *_instance, struct TLD_handle_t *_han int set_app_id(struct tsg_log_instance_t *_instance, struct TLD_handle_t *_handle, struct streaminfo *a_stream) { - char app_name[512]={0}; - - struct gather_app_result *gather_result=NULL; - - gather_result=(struct gather_app_result *)project_req_get_struct(a_stream, g_tsg_para.gather_app_project_id); + struct gather_app_result *gather_result=(struct gather_app_result *)tsg_get_xxx_from_bridge(a_stream, g_tsg_para.bridge[BRIDGE_TYPE_GATHER_APP_RESULT].id); if(gather_result==NULL) - { + { + char app_name[512]={0}; if(tsg_app_id2name(_instance->unknown_app_id, app_name, sizeof(app_name), 0)) { TLD_append(_handle, _instance->id2field[LOG_COMMON_APP_FULL_PATH].name, (void *)app_name, TLD_TYPE_STRING); @@ -1137,36 +1127,36 @@ int TLD_append(struct TLD_handle_t *handle, char *key, void *value, TLD_TYPE typ int TLD_array_append(struct TLD_handle_t *handle, char *key, void **array, int array_num, TLD_TYPE type) { - if(handle==NULL || key==NULL || array_num<=0 || array==NULL || type!=TLD_TYPE_LONG || type!=TLD_TYPE_STRING) - { - return -1; - } + if(handle==NULL || key==NULL || array_num<=0 || array==NULL || (type!=TLD_TYPE_LONG && type!=TLD_TYPE_STRING)) + { + return -1; + } - int i=0; - Value obj_array(kArrayType); - - switch(type) - { - case TLD_TYPE_LONG: - for(i=0; i<array_num; i++) - { - obj_array.PushBack((long)(array[i]), handle->document->GetAllocator()); - } - break; - case TLD_TYPE_STRING: - for(i=0; i<array_num; i++) - { - Value str_value(StringRef((char *)(array[i]), strlen((char *)array[i]))); - obj_array.PushBack(str_value, handle->document->GetAllocator()); - } - break; - default: - return -1; - } + int i=0; + Value obj_array(kArrayType); + + switch(type) + { + case TLD_TYPE_LONG: + for(i=0; i<array_num; i++) + { + obj_array.PushBack((long)(array[i]), handle->document->GetAllocator()); + } + break; + case TLD_TYPE_STRING: + for(i=0; i<array_num; i++) + { + Value str_value(StringRef((char *)(array[i]), strlen((char *)array[i]))); + obj_array.PushBack(str_value, handle->document->GetAllocator()); + } + break; + default: + return -1; + } - add_object_member(handle, handle->document, key, obj_array); + add_object_member(handle, handle->document, key, obj_array); - return 1; + return 1; } struct TLD_handle_t *TLD_duplicate(struct TLD_handle_t *handle) @@ -1221,7 +1211,7 @@ int TLD_convert_json(struct TLD_handle_t *_handle, char *buff, unsigned int buff static int set_mail_eml(struct tsg_log_instance_t *_instance, struct TLD_handle_t *_handle, struct streaminfo *a_stream) { - struct tsg_conn_sketch_notify_data *notify_mail=(struct tsg_conn_sketch_notify_data *)stream_bridge_async_data_get(a_stream, g_tsg_para.bridge_id[BRIDGE_TYPE_RECV_CONN_SKETCH_DATA]); + struct tsg_conn_sketch_notify_data *notify_mail=(struct tsg_conn_sketch_notify_data *)tsg_get_xxx_from_bridge(a_stream, g_tsg_para.bridge[BRIDGE_TYPE_RECV_CONN_SKETCH_DATA].id); if(notify_mail!=NULL && notify_mail->pdata.mail_eml_filename!=NULL && notify_mail->protocol==PROTO_MAIL) { TLD_delete(_handle, _instance->id2field[LOG_COMMON_MAIL_EML_FILE].name); @@ -1235,7 +1225,7 @@ static int set_mail_eml(struct tsg_log_instance_t *_instance, struct TLD_handle_ static int set_s3_filename(struct tsg_log_instance_t *_instance, struct TLD_handle_t *_handle, struct streaminfo *a_stream) { - struct business_notify_data *bnd_label=(struct business_notify_data *)stream_bridge_async_data_get(a_stream, _instance->bridge_id[LOG_BRIDGE_BUSINESS_S3_FILENAME]); + struct business_notify_data *bnd_label=(struct business_notify_data *)tsg_get_xxx_from_bridge(a_stream, _instance->bridge_id[LOG_BRIDGE_BUSINESS_S3_FILENAME]); if(bnd_label==NULL || bnd_label->pdata==NULL) { return 0; @@ -1338,7 +1328,7 @@ static int set_tunnel_ipv4v6_port(struct tsg_log_instance_t *_instance, struct T int set_shaping_rule_ids(struct tsg_log_instance_t *_instance, struct TLD_handle_t *_handle, struct streaminfo *a_stream) { - struct notify_shaping_policy *shaping_label=(struct notify_shaping_policy *)stream_bridge_async_data_get(a_stream, g_tsg_para.bridge_id[BRIDGE_TYPE_NOTIFY_SHAPING_RESULT]); + struct notify_shaping_policy *shaping_label=(struct notify_shaping_policy *)tsg_get_xxx_from_bridge(a_stream, g_tsg_para.bridge[BRIDGE_TYPE_NOTIFY_SHAPING_RESULT].id); if(shaping_label==NULL) { return 0; @@ -1351,10 +1341,14 @@ int set_shaping_rule_ids(struct tsg_log_instance_t *_instance, struct TLD_handle offset+=snprintf(shaping_rule_ids+offset, sizeof(shaping_rule_ids)-offset, "%d,", shaping_label->shaping_result[i].config_id); } - shaping_rule_ids[offset-1]='\0'; - TLD_append(_handle, _instance->id2field[LOG_COMMON_SHAPING_RULE_IDS].name, (void *)shaping_rule_ids, TLD_TYPE_STRING); - - return 1; + if(offset>0) + { + shaping_rule_ids[offset-1]='\0'; + TLD_append(_handle, _instance->id2field[LOG_COMMON_SHAPING_RULE_IDS].name, (void *)shaping_rule_ids, TLD_TYPE_STRING); + return 1; + } + + return 0; } static int set_common_tunnels(struct tsg_log_instance_t *_instance, struct TLD_handle_t *_handle, struct streaminfo *a_stream) @@ -1445,7 +1439,7 @@ static int set_common_tunnels(struct tsg_log_instance_t *_instance, struct TLD_h break; } - ptmp = pfather;; + ptmp=pfather; tunnel_array.PushBack(tunnel_object, _handle->document->GetAllocator()); } @@ -1499,7 +1493,6 @@ int is_multi_hit_same_policy(struct Maat_rule_t *result, int *policy_id, int *po static int set_xxxx_from_user_region(struct TLD_handle_t *_handle, struct tsg_log_instance_t *_instance, struct Maat_rule_t *p_result, int thread_seq) { - int ret=0; cJSON *item=NULL; cJSON *object=NULL; char *user_region=NULL; @@ -1511,7 +1504,7 @@ static int set_xxxx_from_user_region(struct TLD_handle_t *_handle, struct tsg_lo if(p_result->action!=TSG_ACTION_NONE && p_result->serv_def_len>0) { user_region=(char *)dictator_malloc(thread_seq, p_result->serv_def_len+1); - ret=Maat_read_rule(g_tsg_maat_feather, p_result, MAAT_RULE_SERV_DEFINE, user_region, p_result->serv_def_len+1); + int ret=Maat_read_rule(g_tsg_maat_feather, p_result, MAAT_RULE_SERV_DEFINE, user_region, p_result->serv_def_len+1); if(ret==p_result->serv_def_len) { user_region[p_result->serv_def_len]='\0'; @@ -1567,7 +1560,7 @@ int set_application_behavior(struct tsg_log_instance_t *_instance, struct TLD_ha } struct application_behavior *behavior_result=NULL; - behavior_result=(struct application_behavior *)stream_bridge_async_data_get(a_stream, _instance->bridge_id[LOG_BRIDGE_APP_BEHAVIOR_RESULT]); + behavior_result=(struct application_behavior *)tsg_get_xxx_from_bridge(a_stream, _instance->bridge_id[LOG_BRIDGE_APP_BEHAVIOR_RESULT]); if(behavior_result==NULL) { return 0; @@ -1587,7 +1580,7 @@ int set_notify_execution_result(struct tsg_log_instance_t *_instance, struct TLD int i=0; struct tsg_notify_execution_result *execution_result=NULL; - execution_result=(struct tsg_notify_execution_result *)stream_bridge_async_data_get(a_stream, _instance->bridge_id[LOG_BRIDGE_CONN_SKETCH_EXEC_RESULT]); + execution_result=(struct tsg_notify_execution_result *)tsg_get_xxx_from_bridge(a_stream, _instance->bridge_id[LOG_BRIDGE_CONN_SKETCH_EXEC_RESULT]); if(execution_result==NULL) { return 0; @@ -1681,13 +1674,12 @@ int set_session_attributes(struct tsg_log_instance_t *_instance, struct TLD_hand int set_lua_scripts_result(struct tsg_log_instance_t *_instance, struct TLD_handle_t *_handle, struct streaminfo *a_stream) { - int i=0; - struct user_defined_attribute_label *uda_label=(struct user_defined_attribute_label *)stream_bridge_async_data_get(a_stream, _instance->bridge_id[LOG_BRIDGE_APP_LUA_RESULT]); + struct user_defined_attribute_label *uda_label=(struct user_defined_attribute_label *)tsg_get_xxx_from_bridge(a_stream, _instance->bridge_id[LOG_BRIDGE_APP_LUA_RESULT]); if(uda_label!=NULL) { Value array(kArrayType); - for(i=0; i<uda_label->attribute_num; i++) + for(int i=0; i<uda_label->attribute_num; i++) { Value object(kObjectType); switch(uda_label->attribute[i].type) @@ -1715,20 +1707,19 @@ int set_lua_scripts_result(struct tsg_log_instance_t *_instance, struct TLD_hand int TLD_append_streaminfo(struct tsg_log_instance_t *instance, struct TLD_handle_t *handle, struct streaminfo *a_stream) { - int ret=0; - char *addr_proto=NULL; - char stream_id_buff[128]={0}; - unsigned long long stream_id=0; - struct TLD_handle_t *_handle=handle; - struct tsg_log_instance_t *_instance=instance; - - if(_instance==NULL || _handle==NULL || a_stream==NULL) - { - MESA_handle_runtime_log(_instance->logger, RLOG_LV_DEBUG, "TLD_APPEND_STREAM", "instance==NULL || TLD_handle==NULL || addr==NULL"); + if(instance==NULL || handle==NULL || a_stream==NULL) + { + if(instance) + { + MESA_handle_runtime_log(instance->logger, RLOG_LV_DEBUG, "TLD_APPEND_STREAM", "TLD_handle==NULL || addr==NULL"); + } return -1; } + + struct TLD_handle_t *_handle=handle; + struct tsg_log_instance_t *_instance=instance; - ret=set_linkinfo(_instance, _handle, a_stream); + int ret=set_linkinfo(_instance, _handle, a_stream); if(ret==0) { set_direction(_instance, _handle, a_stream); @@ -1748,11 +1739,12 @@ int TLD_append_streaminfo(struct tsg_log_instance_t *instance, struct TLD_handle set_common_tunnels(_instance, _handle, a_stream); } - stream_id=tsg_get_stream_id(a_stream); + unsigned long long stream_id=tsg_get_stream_id(a_stream); + char stream_id_buff[128]={0}; snprintf(stream_id_buff, sizeof(stream_id_buff), "%llu", stream_id); TLD_append(_handle, _instance->id2field[LOG_COMMON_STREAM_TRACE_ID].name, (void *)stream_id_buff, TLD_TYPE_STRING); - addr_proto=(char *)layer_addr_prefix_ntop(a_stream); + char *addr_proto=(char *)layer_addr_prefix_ntop(a_stream); TLD_append(_handle, _instance->id2field[LOG_COMMON_L4_PROTOCOL].name, (void *)addr_proto, TLD_TYPE_STRING); return 0; @@ -1784,7 +1776,7 @@ int load_log_common_field(const char *filename, id2field_t *id2field, struct top continue; } memset(type_name, 0, sizeof(type_name)); - ret=sscanf(line, "%s %s %d", type_name, field_name, &id); + ret=sscanf(line, "%31s %63s %d", type_name, field_name, &id); assert(ret==3); for(i=0; i<TLD_TYPE_MAX; i++) @@ -1863,19 +1855,19 @@ int load_log_common_field(const char *filename, id2field_t *id2field, struct top struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile, screen_stat_handle_t fs2_handle) { - int i=0,ret=0; - char nic_name[32]={0}; char override_sled_ip[32]={0}; char kafka_errstr[1024]={0}; unsigned int local_ip_nr=0; char bridge_name[LOG_BRIDGE_MAX][128]={0}; - rd_kafka_conf_t *rdkafka_conf = NULL; + rd_kafka_conf_t *rdkafka_conf = NULL; + char broker_list[1024]={0}; struct tsg_log_instance_t *_instance=NULL; - + char common_field_file[128]={0}; + char log_path[128]={0}; _instance=(struct tsg_log_instance_t *)calloc(1, sizeof(struct tsg_log_instance_t)); _instance->fs2_handle=fs2_handle; - for(i=0; i<LOG_FS2_TYPE_MAX; i++) + for(int i=0; i<LOG_FS2_TYPE_MAX; i++) { _instance->fs2_field_id[i]=FS_register(_instance->fs2_handle, FS_STYLE_FIELD, FS_CALC_SPEED, g_log_fs2_field[i].name); } @@ -1891,7 +1883,7 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile, screen_stat_ha _instance->sum_line_id=FS_register(_instance->fs2_handle, FS_STYLE_LINE, FS_CALC_SPEED, "SUM"); MESA_load_profile_int_def(conffile, "TSG_LOG", "LOG_LEVEL",&(_instance->level), 30); - MESA_load_profile_string_def(conffile, "TSG_LOG", "LOG_PATH", _instance->log_path, sizeof(_instance->log_path), "./tsglog/tsglog"); + MESA_load_profile_string_def(conffile, "TSG_LOG", "LOG_PATH", log_path, sizeof(log_path), "./log/tsglog"); MESA_load_profile_int_def(conffile, "TSG_LOG", "SEND_USER_REGION", &(_instance->send_user_region), 0); MESA_load_profile_int_def(conffile, "TSG_LOG", "SEND_DATA_CENTER_SWITCH", &(_instance->send_data_center), 0); MESA_load_profile_int_def(conffile, "TSG_LOG", "SEND_APP_ID_SWITCH", &(_instance->send_app_id), 0); @@ -1909,7 +1901,7 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile, screen_stat_ha MESA_load_profile_string_def(conffile, "SYSTEM", "APP_BEHAVIOR_BRIDGE_NAME", bridge_name[LOG_BRIDGE_APP_BEHAVIOR_RESULT], sizeof(bridge_name[LOG_BRIDGE_APP_BEHAVIOR_RESULT]), "TSG_APPLICATION_BEHAVIOR"); MESA_load_profile_string_def(conffile, "SYSTEM", "NOTIFY_EXEC_RESULT_BRIDGE_NAME", bridge_name[LOG_BRIDGE_CONN_SKETCH_EXEC_RESULT], sizeof(bridge_name[LOG_BRIDGE_CONN_SKETCH_EXEC_RESULT]), "TSG_NOTIFICATION_EXECUTION_RESULT"); - for(i=0; i<LOG_BRIDGE_MAX; i++) + for(int i=0; i<LOG_BRIDGE_MAX; i++) { _instance->bridge_id[i]=stream_bridge_build(bridge_name[i], "w"); if(_instance->bridge_id[i]<0) @@ -1918,10 +1910,10 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile, screen_stat_ha } } - _instance->logger=MESA_create_runtime_log_handle(_instance->log_path, _instance->level); + _instance->logger=MESA_create_runtime_log_handle(log_path, _instance->level); if(_instance->logger==NULL) { - printf("MESA_create_runtime_log_handle failed ..., path: %s level: %d", _instance->log_path, _instance->level); + printf("MESA_create_runtime_log_handle failed ..., path: %s level: %d", log_path, _instance->level); return NULL; } @@ -1934,8 +1926,8 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile, screen_stat_ha MESA_load_profile_int_def(conffile, "TSG_LOG", "RECOVERY_INTERVEL_S", &(_instance->recovery_interval), 30); - MESA_load_profile_string_def(conffile, "TSG_LOG", "COMMON_FIELD_FILE", _instance->common_field_file, sizeof(_instance->common_field_file), NULL); - MESA_load_profile_string_def(conffile, "TSG_LOG", "BROKER_LIST", _instance->broker_list, sizeof(_instance->broker_list), NULL); + MESA_load_profile_string_def(conffile, "TSG_LOG", "COMMON_FIELD_FILE", common_field_file, sizeof(common_field_file), NULL); + MESA_load_profile_string_def(conffile, "TSG_LOG", "BROKER_LIST", broker_list, sizeof(broker_list), NULL); MESA_load_profile_string_def(conffile, "TSG_LOG", "SASL_USERNAME", _instance->sasl_username, sizeof(_instance->sasl_username), ""); //admin MESA_load_profile_string_def(conffile, "TSG_LOG", "SASL_PASSWD", _instance->sasl_passwd, sizeof(_instance->sasl_passwd), ""); @@ -1963,8 +1955,9 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile, screen_stat_ha char *sled_ip=getenv(override_sled_ip); if(sled_ip==NULL) { + char nic_name[32]={0}; MESA_load_profile_string_def(conffile, "SYSTEM", "NIC_NAME", nic_name, sizeof(nic_name), "lo"); - ret=MESA_get_dev_ipv4(nic_name, (int *)&local_ip_nr); + int ret=MESA_get_dev_ipv4(nic_name, (int *)&local_ip_nr); if(ret<0) { MESA_handle_runtime_log(_instance->logger, @@ -1987,7 +1980,7 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile, screen_stat_ha rd_kafka_conf_set(rdkafka_conf, "topic.metadata.refresh.interval.ms", _instance->refresh_interval_ms, kafka_errstr, sizeof(kafka_errstr)); rd_kafka_conf_set(rdkafka_conf, "request.required.acks", _instance->require_ack, kafka_errstr, sizeof(kafka_errstr)); rd_kafka_conf_set(rdkafka_conf, "socket.keepalive.enable", "true", kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rdkafka_conf, "bootstrap.servers", _instance->broker_list, kafka_errstr, sizeof(kafka_errstr)); + rd_kafka_conf_set(rdkafka_conf, "bootstrap.servers", broker_list, kafka_errstr, sizeof(kafka_errstr)); if(strlen(_instance->sasl_username)> 0 && strlen(_instance->sasl_passwd)>0) { @@ -2003,11 +1996,11 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile, screen_stat_ha return NULL; } - load_log_common_field(_instance->common_field_file, _instance->id2field, &(_instance->service2topic), &(_instance->max_service)); + load_log_common_field(common_field_file, _instance->id2field, &(_instance->service2topic), &(_instance->max_service)); if(_instance->service2topic!=NULL) { - for(i=0; i<_instance->max_service; i++) + for(int i=0; i<_instance->max_service; i++) { if(_instance->service2topic[i].type==TLD_TYPE_MAX && strlen(_instance->service2topic[i].name)>0) { @@ -2022,12 +2015,7 @@ struct tsg_log_instance_t *tsg_sendlog_init(const char *conffile, screen_stat_ha } else { - MESA_handle_runtime_log(_instance->logger, - RLOG_LV_FATAL, - "KAFKA_INIT", - "load_log_common_field is error, please check %s", - _instance->common_field_file - ); + MESA_handle_runtime_log(_instance->logger, RLOG_LV_FATAL, "KAFKA_INIT", "load_log_common_field is error, please check %s", common_field_file); } return _instance; @@ -2096,96 +2084,52 @@ void tsg_sendlog_destroy(struct tsg_log_instance_t * instance) return ; } -int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handle, tsg_log_t *log_msg, int thread_id) +int send_log(struct tsg_log_instance_t *_instance, struct TLD_handle_t *_handle, struct streaminfo *a_stream, struct Maat_rule_t *p_result, int p_result_num, int thread_id) { - int fs_id=0,ret=0; - int i=0,repeat_cnt=0; + int fs_id=0,ret=0,repeat_cnt=0; int policy_id[MAX_RESULT_NUM]={0}; - struct TLD_handle_t *_handle=handle; - struct tsg_log_instance_t *_instance=instance; - - if(_instance==NULL || _handle==NULL || log_msg==NULL) - { - TLD_cancel(handle); - MESA_handle_runtime_log(_instance->logger, RLOG_LV_DEBUG, "TSG_SEND_LOG", " instance==NULL || TLD_handle==NULL || log_msg==NULL "); - return -1; - } - if(_instance->mode==CLOSE) + for(int i=0;i<p_result_num; i++) { - TLD_cancel(handle); - FS_operate(_instance->fs2_handle, _instance->sum_line_id, _instance->fs2_field_id[LOG_COLUMN_STATUS_DROP], FS_OP_ADD, 1); - MESA_handle_runtime_log(_instance->logger, RLOG_LV_INFO, "TSG_SEND_LOG", "Disable tsg_send_log."); - return 0; - } - - TLD_append_streaminfo(instance, handle, log_msg->a_stream); - TLD_append(_handle, _instance->id2field[LOG_COMMON_SLED_IP].name, (void *)(_instance->local_ip_str), TLD_TYPE_STRING); - if(strlen(g_tsg_para.device_sn)>0) - { - TLD_append(_handle, _instance->id2field[LOG_COMMON_DEVICE_ID].name, (void *)(g_tsg_para.device_sn), TLD_TYPE_STRING); - } - - if(strlen(g_tsg_para.data_center)>0 && _instance->send_data_center==1) - { - TLD_append(_handle, _instance->id2field[LOG_COMMON_DATA_CENTER].name, (void *)(g_tsg_para.data_center), TLD_TYPE_STRING); - } - - if(strlen(g_tsg_para.device_tag)>0) - { - TLD_append(_handle, _instance->id2field[LOG_COMMON_DEVICE_TAG].name, (void *)(g_tsg_para.device_tag), TLD_TYPE_STRING); - } - - TLD_append(_handle, _instance->id2field[LOG_COMMON_TRAFFIC_VSYSTEM_ID].name, (void *)(long)_instance->vsystem_id, TLD_TYPE_LONG); - - set_application_behavior(_instance, _handle, log_msg->a_stream); - - if(log_msg->result[i].service_id==2 && log_msg->a_stream!=NULL) // stream of intercept is NULL - { - set_shaping_rule_ids(_instance, _handle, log_msg->a_stream); - } - - for(i=0;i<log_msg->result_num; i++) - { - if(is_multi_hit_same_policy(&(log_msg->result[i]), policy_id, &repeat_cnt)) + if(is_multi_hit_same_policy(&(p_result[i]), policy_id, &repeat_cnt)) { MESA_handle_runtime_log(_instance->logger, RLOG_LV_DEBUG, "TSG_SEND_LOG", "tsg same log:cfg_id=%d service=%d addr=%s", - log_msg->result[i].config_id, - log_msg->result[i].service_id, - (log_msg->a_stream==NULL ? "" : PRINTADDR(log_msg->a_stream,_instance->level)) + p_result[i].config_id, + p_result[i].service_id, + (a_stream==NULL ? "" : PRINTADDR(a_stream,_instance->level)) ); continue; } - switch(log_msg->result[i].do_log) + switch(p_result[i].do_log) { case LOG_ABORT: MESA_handle_runtime_log(_instance->logger, RLOG_LV_DEBUG, "TSG_SEND_LOG", "tsg abort log:cfg_id=%d service=%d addr=%s", - log_msg->result[i].config_id, - log_msg->result[i].service_id, - (log_msg->a_stream==NULL ? "" : PRINTADDR(log_msg->a_stream,_instance->level)) + p_result[i].config_id, + p_result[i].service_id, + (a_stream==NULL ? "" : PRINTADDR(a_stream,_instance->level)) ); - fs_id=action2fs_id((int)log_msg->result[i].action); + fs_id=action2fs_id((int)p_result[i].action); FS_operate(_instance->fs2_handle, _instance->fs2_field_id[fs_id], 0, FS_OP_ADD, 1); continue; break; case LOG_ALL: - if(log_msg->result[i].action==TSG_ACTION_MONITOR) + if(p_result[i].action==TSG_ACTION_MONITOR) { - set_s3_filename(_instance, _handle, log_msg->a_stream); - set_mail_eml(_instance, _handle, log_msg->a_stream); + set_s3_filename(_instance, _handle, a_stream); + set_mail_eml(_instance, _handle, a_stream); } break; case LOG_NOFILE: - if(log_msg->result[i].action==TSG_ACTION_MONITOR) + if(p_result[i].action==TSG_ACTION_MONITOR) { TLD_delete(_handle, _instance->id2field[LOG_COMMON_MAIL_EML_FILE].name); - TLD_delete(_handle, _instance->id2field[LOG_COMMON_HTTP_REQUEST_S3_FILE].name); + TLD_delete(_handle, _instance->id2field[LOG_COMMON_HTTP_REQUEST_S3_FILE].name); TLD_delete(_handle, _instance->id2field[LOG_COMMON_HTTP_RESPONSE_S3_FILE].name); } break; @@ -2193,39 +2137,39 @@ int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handl break; } - ret=update_percent(_instance, log_msg->result[i].service_id, LOG_COLUMN_STATUS_DROP, thread_id); + ret=update_percent(_instance, p_result[i].service_id, LOG_COLUMN_STATUS_DROP, thread_id); if(ret==1) { MESA_handle_runtime_log(_instance->logger, RLOG_LV_DEBUG, "TSG_SEND_LOG", "tsg drop log:cfg_id=%d service=%d send_log_percent: %d addr=%s", - log_msg->result[i].config_id, - log_msg->result[i].service_id, - _instance->service2topic[log_msg->result[i].service_id].send_log_percent[thread_id], - (log_msg->a_stream==NULL ? "" : PRINTADDR(log_msg->a_stream,_instance->level)) + p_result[i].config_id, + p_result[i].service_id, + _instance->service2topic[p_result[i].service_id].send_log_percent[thread_id], + (a_stream==NULL ? "" : PRINTADDR(a_stream,_instance->level)) ); continue; } - TLD_append(_handle, _instance->id2field[LOG_COMMON_POLICY_ID].name, (void *)(long)(log_msg->result[i].config_id), TLD_TYPE_LONG); - TLD_append(_handle, _instance->id2field[LOG_COMMON_SERVICE].name, (void *)(long)(log_msg->result[i].service_id), TLD_TYPE_LONG); - TLD_append(_handle, _instance->id2field[LOG_COMMON_ACTION].name, (void *)(long)((unsigned char)log_msg->result[i].action), TLD_TYPE_LONG); + TLD_append(_handle, _instance->id2field[LOG_COMMON_POLICY_ID].name, (void *)(long)(p_result[i].config_id), TLD_TYPE_LONG); + TLD_append(_handle, _instance->id2field[LOG_COMMON_SERVICE].name, (void *)(long)(p_result[i].service_id), TLD_TYPE_LONG); + TLD_append(_handle, _instance->id2field[LOG_COMMON_ACTION].name, (void *)(long)((unsigned char)p_result[i].action), TLD_TYPE_LONG); - set_notify_execution_result(_instance, _handle, log_msg->a_stream, &(log_msg->result[i])); + set_notify_execution_result(_instance, _handle, a_stream, &(p_result[i])); - if(_instance->send_nat_linkinfo && log_msg->result[i].config_id==0 && log_msg->a_stream!=NULL) + if(_instance->send_nat_linkinfo &&p_result[i].config_id==0 && a_stream!=NULL) { - set_nat_linkinfo(_instance, _handle, log_msg->a_stream, _instance->id2field[LOG_COMMON_LINK_INFO_C2S].name, _instance->bridge_id[LOG_BRIDGE_NAT_C2S_LINKINFO]); - set_nat_linkinfo(_instance, _handle, log_msg->a_stream, _instance->id2field[LOG_COMMON_LINK_INFO_S2C].name, _instance->bridge_id[LOG_BRIDGE_NAT_S2C_LINKINFO]); + set_nat_linkinfo(_instance, _handle, a_stream, _instance->id2field[LOG_COMMON_LINK_INFO_C2S].name, _instance->bridge_id[LOG_BRIDGE_NAT_C2S_LINKINFO]); + set_nat_linkinfo(_instance, _handle, a_stream, _instance->id2field[LOG_COMMON_LINK_INFO_S2C].name, _instance->bridge_id[LOG_BRIDGE_NAT_S2C_LINKINFO]); } - set_xxxx_from_user_region(_handle, _instance, &(log_msg->result[i]), thread_id); + set_xxxx_from_user_region(_handle, _instance, &(p_result[i]), thread_id); StringBuffer sb(0, 2048); Writer<StringBuffer> writer(sb); _handle->document->Accept(writer); - tsg_send_payload(_instance, log_msg->result[i].service_id, (char *)sb.GetString(), sb.GetSize(), thread_id); + tsg_send_payload(_instance, p_result[i].service_id, (char *)sb.GetString(), sb.GetSize(), thread_id); TLD_delete(_handle, _instance->id2field[LOG_COMMON_POLICY_ID].name); TLD_delete(_handle, _instance->id2field[LOG_COMMON_SERVICE].name); @@ -2233,6 +2177,69 @@ int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handl TLD_delete(_handle, _instance->id2field[LOG_COMMON_USER_REGION].name); } + return 0; +} + +int tsg_send_log(struct tsg_log_instance_t *instance, struct TLD_handle_t *handle, tsg_log_t *log_msg, int thread_id) +{ + if(instance==NULL || handle==NULL || log_msg==NULL) + { + TLD_cancel(handle); + if(instance!=NULL) + { + MESA_handle_runtime_log(instance->logger, RLOG_LV_DEBUG, "TSG_SEND_LOG", " instance==NULL || TLD_handle==NULL || log_msg==NULL "); + } + return -1; + } + + struct TLD_handle_t *_handle=handle; + struct tsg_log_instance_t *_instance=instance; + + if(_instance->mode==CLOSE) + { + TLD_cancel(handle); + FS_operate(_instance->fs2_handle, _instance->sum_line_id, _instance->fs2_field_id[LOG_COLUMN_STATUS_DROP], FS_OP_ADD, 1); + MESA_handle_runtime_log(_instance->logger, RLOG_LV_INFO, "TSG_SEND_LOG", "Disable tsg_send_log."); + return 0; + } + + TLD_append_streaminfo(instance, handle, log_msg->a_stream); + TLD_append(_handle, _instance->id2field[LOG_COMMON_SLED_IP].name, (void *)(_instance->local_ip_str), TLD_TYPE_STRING); + if(strlen(g_tsg_para.device_sn)>0) + { + TLD_append(_handle, _instance->id2field[LOG_COMMON_DEVICE_ID].name, (void *)(g_tsg_para.device_sn), TLD_TYPE_STRING); + } + + if(strlen(g_tsg_para.data_center)>0 && _instance->send_data_center==1) + { + TLD_append(_handle, _instance->id2field[LOG_COMMON_DATA_CENTER].name, (void *)(g_tsg_para.data_center), TLD_TYPE_STRING); + } + + if(strlen(g_tsg_para.device_tag)>0) + { + TLD_append(_handle, _instance->id2field[LOG_COMMON_DEVICE_TAG].name, (void *)(g_tsg_para.device_tag), TLD_TYPE_STRING); + } + + TLD_append(_handle, _instance->id2field[LOG_COMMON_TRAFFIC_VSYSTEM_ID].name, (void *)(long)_instance->vsystem_id, TLD_TYPE_LONG); + + set_application_behavior(_instance, _handle, log_msg->a_stream); + + if(log_msg->result[0].service_id==2 && log_msg->a_stream!=NULL) // stream of intercept is NULL + { + set_shaping_rule_ids(_instance, _handle, log_msg->a_stream); + } + + send_log(_instance, _handle, log_msg->a_stream, log_msg->result, log_msg->result_num, thread_id); + + //fetch firewall result + struct policy_priority_label *priority_label=(struct policy_priority_label *)tsg_get_xxx_from_bridge(log_msg->a_stream, g_tsg_para.bridge[BRIDGE_TYPE_POLICY_PRIORITY].id); + if(priority_label!=NULL && priority_label->security_result_num>0) + { + send_log(_instance, _handle, log_msg->a_stream, priority_label->security_result, priority_label->security_result_num, thread_id); + free_policy_label(log_msg->a_stream, g_tsg_para.bridge[BRIDGE_TYPE_POLICY_PRIORITY].id, (void *)priority_label); + tsg_set_xxx_to_bridge(log_msg->a_stream, g_tsg_para.bridge[BRIDGE_TYPE_POLICY_PRIORITY].id, NULL); + } + TLD_cancel(handle); return 0; @@ -2248,8 +2255,8 @@ int tsg_register_topic(struct tsg_log_instance_t *instance, char *topic_name) _instance->service2topic=(struct topic_stat *)realloc(_instance->service2topic, (_instance->max_service+1)*sizeof(struct topic_stat)); _instance->service2topic[_instance->max_service].type=TLD_TYPE_MAX; - memset(_instance->service2topic[_instance->max_service].name, 0, MAX_STRING_LEN); - memcpy(_instance->service2topic[_instance->max_service].name, topic_name, MIN(MAX_STRING_LEN-1, strlen(topic_name))); + memset(_instance->service2topic[_instance->max_service].name, 0, MAX_STRING_LEN32); + memcpy(_instance->service2topic[_instance->max_service].name, topic_name, MIN(MAX_STRING_LEN32-1, strlen(topic_name))); register_topic(_instance, &(_instance->service2topic[_instance->max_service])); _instance->max_service++; @@ -2308,9 +2315,6 @@ int tsg_send_payload(struct tsg_log_instance_t *instance, int topic_id, char *pa ); } - - - update_percent(_instance, topic_id, LOG_COLUMN_STATUS_MAX, thread_id); return 0; |
