diff options
| author | fengweihao <[email protected]> | 2022-09-09 10:44:11 +0800 |
|---|---|---|
| committer | fengweihao <[email protected]> | 2022-09-09 10:44:11 +0800 |
| commit | b321486e3fbd27d9e061c607e10c5a4bc5a32e33 (patch) | |
| tree | c5b059d2f97c7b4fc3f8936e3303a97c14cf275d /plugin/business/tsg-http/src/tsg_logger.cpp | |
| parent | e52bafad6bfd6916a1019af562b310bd8b2a8d85 (diff) | |
TSG-11849 tfe增加从环境变量中读入处理机ip
TSG-11742 IP Libraries统一使用.分隔地理层级
TSG-10722 日志中开始时间从解析层获取
Diffstat (limited to 'plugin/business/tsg-http/src/tsg_logger.cpp')
| -rw-r--r-- | plugin/business/tsg-http/src/tsg_logger.cpp | 397 |
1 files changed, 397 insertions, 0 deletions
diff --git a/plugin/business/tsg-http/src/tsg_logger.cpp b/plugin/business/tsg-http/src/tsg_logger.cpp new file mode 100644 index 0000000..2d0b5aa --- /dev/null +++ b/plugin/business/tsg-http/src/tsg_logger.cpp @@ -0,0 +1,397 @@ +#include <cjson/cJSON.h> +#include <MESA/MESA_prof_load.h> +#include <tfe_kafka_logger.h> +#include <cache_evbase_client.h> +#include <tfe_utils.h> +#include <tfe_resource.h> + +#include "tsg_proxy_logger.h" + +struct json_spec +{ + const char *log_filed_name; + enum tfe_http_std_field field_id; +}; +struct proxy_logger +{ + int entry_id; + unsigned int en_hoslog; + unsigned int en_sendlog; + const char *device_id; + const char *effective_device_tag; + void* local_logger; + + unsigned long long send_cnt; + unsigned long long random_drop; + unsigned long long user_abort; + char local_log_path[TFE_STRING_MAX]; + tfe_kafka_logger_t *kafka_logger; + struct cache_evbase_instance * log_file_upload_instance; +}; + +enum _log_action //Bigger action number is prior. +{ + LG_ACTION_NONE = 0x00, + LG_ACTION_MONIT = 0x01, + LG_ACTION_FORWARD = 0x02, /* N/A */ + LG_ACTION_REJECT = 0x10, + LG_ACTION_DROP = 0x20, /* N/A */ + LG_ACTION_MANIPULATE = 0x30, + LG_ACTION_RATELIMIT = 0x40, /* N/A */ + LG_ACTION_LOOP = 0x60, /* N/A */ + LG_ACTION_WHITELIST = 0x80, + __LG_ACTION_MAX +}; + +struct proxy_logger* proxy_log_handle_create(const char* profile, const char* section, void* local_logger) +{ + struct tango_cache_parameter *log_file_upload_para=NULL; + struct proxy_logger* instance=ALLOC(struct proxy_logger,1); + instance->local_logger=local_logger; + + TFE_LOG_INFO(local_logger,"Tsg-Pxy log is inititating from %s section %s.", profile, section); + MESA_load_profile_int_def(profile, section, "ENTRANCE_ID",&(instance->entry_id),0); + MESA_load_profile_uint_def(profile, section, "en_hoslog", &instance->en_hoslog, 1); + MESA_load_profile_uint_def(profile, section, "en_sendlog", &instance->en_sendlog, 1); + TFE_LOG_INFO(local_logger, "Tsg-Pxy sendlog : %s", instance->en_sendlog ? "ENABLE" : "DISABLE"); + + if (!instance->en_sendlog) + { + return instance; + } + + instance->device_id = (const char *)tfe_bussiness_resouce_get(DEVICE_ID); + instance->effective_device_tag = (const char *)tfe_bussiness_resouce_get(EFFECTIVE_DEVICE_TAG); + instance->kafka_logger = (tfe_kafka_logger_t *)tfe_bussiness_resouce_get(KAFKA_LOGGER); + if (instance->kafka_logger && !instance->kafka_logger->enable) + { + TFE_LOG_ERROR(local_logger, "Tsg-Pxy sendlog ENABLE, but tfe kafka logger DISABLED."); + goto error_out; + } + + if(instance->en_hoslog==1) + { + log_file_upload_para=cache_evbase_parameter_new(profile, section, local_logger); + if (log_file_upload_para == NULL) + { + TFE_LOG_ERROR(local_logger, "Tsg-Pxy failed to new cache evbase parameter."); + goto error_out; + } + instance->log_file_upload_instance=cache_evbase_instance_new(log_file_upload_para, local_logger); + } + return instance; + +error_out: + free(instance); + return NULL; +} + +static unsigned int proxy_log_get_fqdn_cat(struct tfe_cmsg *cmsg, unsigned int *category_id_val) +{ + int ret=0; + unsigned category_id_num=0; + uint16_t opt_out_size; + + ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_FQDN_CAT_ID_NUM, (unsigned char *)&category_id_num, sizeof(category_id_num), &opt_out_size); + if (ret != 0 || category_id_num == 0) + { + return -1; + } + ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_FQDN_CAT_ID_VAL, (unsigned char *)category_id_val, sizeof(category_id_val), &opt_out_size); + if (ret != 0) + { + return -1; + } + + return category_id_num > 8 ? 8 : category_id_num; +} + +int proxy_send_log(struct proxy_logger* handle, const struct proxy_log* log_msg) +{ + const struct tfe_http_session* http=log_msg->http; + const struct tfe_stream_addr* addr=log_msg->stream->addr; + const char* tmp_val=NULL; + cJSON *common_obj=NULL, *per_hit_obj=NULL; + char* log_payload=NULL; + int kafka_status=0; + int send_cnt=0; + int tmp=0; + time_t cur_time; + char src_ip_str[MAX(INET6_ADDRSTRLEN,INET_ADDRSTRLEN)] = {0}; + char dst_ip_str[MAX(INET6_ADDRSTRLEN,INET_ADDRSTRLEN)] = {0}; + + const char *app_proto[]= {"unkonw","http1", "http2"}; + + const char *manipulate_action_map[]= {"redirect","block","replace","hijack","insert","edit_element","run_script"}; + + const char *panggu_action_map[__LG_ACTION_MAX]; + panggu_action_map[LG_ACTION_MONIT]="monitor"; + panggu_action_map[LG_ACTION_REJECT]="deny"; + panggu_action_map[LG_ACTION_WHITELIST]="allow"; + + struct json_spec req_fields[]={ {"http_cookie", TFE_HTTP_COOKIE}, + {"http_referer", TFE_HTTP_REFERER}, + {"http_user_agent", TFE_HTTP_USER_AGENT}, + {"http_request_content_type", TFE_HTTP_CONT_TYPE}, + {"http_request_content_length", TFE_HTTP_CONT_LENGTH}}; + + struct json_spec resp_fields[]={ {"http_response_content_type", TFE_HTTP_CONT_TYPE}, + {"http_response_content_length", TFE_HTTP_CONT_LENGTH}, + {"http_set_cookie", TFE_HTTP_SET_COOKIE}}; + + if (!handle->en_sendlog) + { + return 0; + } + + common_obj=cJSON_CreateObject(); + cur_time = time(NULL); + + cJSON_AddNumberToObject(common_obj, "common_start_time", http->start_time); + cJSON_AddNumberToObject(common_obj, "common_end_time", cur_time); + cJSON_AddStringToObject(common_obj, "http_version", app_proto[http->major_version]); + cJSON_AddStringToObject(common_obj, "common_schema_type", "HTTP"); + + unsigned int common_direction=0, category_id_val[8]={0}; + char opt_val[24]={0}; uint16_t opt_out_size; + struct tfe_cmsg * cmsg = tfe_stream_get0_cmsg(log_msg->stream); + if (cmsg!=NULL) + { + int ret=tfe_cmsg_get_value(cmsg, TFE_CMSG_STREAM_TRACE_ID, (unsigned char *) opt_val, sizeof(opt_val), &opt_out_size); + if (ret==0) + { + cJSON_AddStringToObject(common_obj, "common_stream_trace_id", opt_val); + } + ret = tfe_cmsg_get_value(cmsg, TFE_CMSG_COMMON_DIRECTION, (unsigned char *)&common_direction, sizeof(common_direction), &opt_out_size); + if (ret==0) + { + cJSON_AddNumberToObject(common_obj, "common_direction", common_direction); //0:域内->域外,1:域外->域内,描述的是CLIENT_IP信息 + } + ret = proxy_log_get_fqdn_cat(cmsg, category_id_val); + if (ret>0) + { + cJSON_AddItemToObject(common_obj, "common_service_category", cJSON_CreateIntArray((const int*)category_id_val, ret)); + } + } + + if (http->req) + { + char *request_line=NULL; + struct tfe_http_req_spec req_spec=http->req->req_spec; + asprintf(&request_line, "%s %s HTTP/%d.%d", http_std_method_to_string(req_spec.method), req_spec.url, http->major_version, http->minor_version); + cJSON_AddStringToObject(common_obj, "http_request_line", request_line); + free(request_line); + } + + if (http->resp) + { + char *response_line=NULL; + struct tfe_http_resp_spec resp_spec=http->resp->resp_spec; + asprintf(&response_line, "HTTP/%d.%d %d OK", http->major_version, http->minor_version, resp_spec.resp_code); + cJSON_AddStringToObject(common_obj, "http_response_line", response_line); + free(response_line); + } + + switch(addr->addrtype) + { + case TFE_ADDR_STREAM_TUPLE4_V4: + cJSON_AddNumberToObject(common_obj, "common_address_type", 4); + inet_ntop(AF_INET, &addr->tuple4_v4->saddr, src_ip_str, sizeof(src_ip_str)); + inet_ntop(AF_INET, &addr->tuple4_v4->daddr, dst_ip_str, sizeof(dst_ip_str)); + cJSON_AddStringToObject(common_obj, "common_client_ip", src_ip_str); + cJSON_AddStringToObject(common_obj, "common_server_ip", dst_ip_str); + cJSON_AddNumberToObject(common_obj, "common_client_port", ntohs(addr->tuple4_v4->source)); + cJSON_AddNumberToObject(common_obj, "common_server_port", ntohs(addr->tuple4_v4->dest)); + cJSON_AddStringToObject(common_obj, "common_l4_protocol", "IPv4_TCP"); + break; + case TFE_ADDR_STREAM_TUPLE4_V6: + cJSON_AddNumberToObject(common_obj, "common_address_type", 6); + inet_ntop(AF_INET6, &addr->tuple4_v6->saddr, src_ip_str, sizeof(src_ip_str)); + inet_ntop(AF_INET6, &addr->tuple4_v6->daddr, dst_ip_str, sizeof(dst_ip_str)); + cJSON_AddStringToObject(common_obj, "common_client_ip", src_ip_str); + cJSON_AddStringToObject(common_obj, "common_server_ip", dst_ip_str); + cJSON_AddNumberToObject(common_obj, "common_client_port", ntohs(addr->tuple4_v6->source)); + cJSON_AddNumberToObject(common_obj, "common_server_port", ntohs(addr->tuple4_v6->dest)); + cJSON_AddStringToObject(common_obj, "common_l4_protocol", "IPv6_TCP"); + break; + default: + break; + } + size_t c2s_byte_num = 0, s2c_byte_num =0; + tfe_stream_info_get(log_msg->stream, INFO_FROM_DOWNSTREAM_RX_OFFSET, &c2s_byte_num, sizeof(c2s_byte_num)); + tfe_stream_info_get(log_msg->stream, INFO_FROM_UPSTREAM_RX_OFFSET, &s2c_byte_num, sizeof(s2c_byte_num)); + + cJSON_AddNumberToObject(common_obj, "common_link_id", 0); + cJSON_AddNumberToObject(common_obj, "common_stream_dir", 3); //1:c2s, 2:s2c, 3:double + cJSON_AddStringToObject(common_obj, "common_sled_ip", handle->kafka_logger->local_ip_str); + cJSON_AddNumberToObject(common_obj, "common_entrance_id", handle->entry_id); + cJSON_AddStringToObject(common_obj, "common_device_id", handle->device_id); + cJSON_AddNumberToObject(common_obj, "common_c2s_byte_num", c2s_byte_num); + cJSON_AddNumberToObject(common_obj, "common_s2c_byte_num", s2c_byte_num); + cJSON_AddStringToObject(common_obj, "http_url", http->req->req_spec.url); + cJSON_AddStringToObject(common_obj, "http_host", http->req->req_spec.host); + if(handle->effective_device_tag) + { + cJSON_AddStringToObject(common_obj, "common_device_tag", handle->effective_device_tag); + } + + for(size_t i=0;i<sizeof(req_fields)/sizeof(struct json_spec);i++) + { + tmp_val=tfe_http_std_field_read(http->req, req_fields[i].field_id); + if(tmp_val!=NULL) + { + cJSON_AddStringToObject(common_obj,req_fields[i].log_filed_name, tmp_val); + } + } + for(size_t i=0;i<sizeof(resp_fields)/sizeof(struct json_spec) && http->resp!=NULL;i++) + { + tmp_val=tfe_http_std_field_read(http->resp, resp_fields[i].field_id); + if(tmp_val!=NULL) + { + cJSON_AddStringToObject(common_obj,resp_fields[i].log_filed_name, tmp_val); + } + } + + char log_file_upload_req_path[TFE_STRING_MAX]={0}, cont_type_whole[TFE_STRING_MAX]={0}; + char log_file_upload_resp_path[TFE_STRING_MAX]={0}; + memset(log_file_upload_req_path, 0, sizeof(log_file_upload_req_path)); + memset(log_file_upload_resp_path, 0, sizeof(log_file_upload_resp_path)); + memset(cont_type_whole, 0, sizeof(cont_type_whole)); + + for(size_t i=0; i<log_msg->result_num; i++) + { + if(log_msg->result[i].do_log!=1) + { + continue; + } + + if(handle->en_hoslog!=1) + { + continue; + } + + struct tango_cache_meta_put meta; + char* log_file_key=NULL;; + const char* cont_type_val; + if(log_msg->req_body!=NULL) + { + if(log_file_upload_req_path[0] != '\0') + { + cJSON_AddStringToObject(common_obj, "http_request_body", log_file_upload_req_path); + } + else + { + memset(&meta, 0, sizeof(meta)); + asprintf(&log_file_key, "%s.reqbody", http->req->req_spec.url); + meta.url=log_file_key; + cont_type_val=tfe_http_std_field_read(http->req, TFE_HTTP_CONT_TYPE); + if(cont_type_val!=NULL) + { + snprintf(cont_type_whole, sizeof(cont_type_whole), "Content-Type:%s", cont_type_val); + meta.std_hdr[0]=cont_type_whole; + } + meta.user_log_name=1; + tmp=cache_evbase_upload_once_evbuf(handle->log_file_upload_instance, NULL, + log_msg->req_body, + &meta, + log_file_upload_req_path, sizeof(log_file_upload_req_path)); + if(tmp==0) + { + cJSON_AddStringToObject(common_obj, "http_request_body", log_file_upload_req_path); + } + else + { + TFE_LOG_ERROR(handle->local_logger, "Upload req_body failed."); + } + free(log_file_key); + } + } + if(log_msg->resp_body!=NULL) + { + if(log_file_upload_resp_path[0] != '\0') + { + cJSON_AddStringToObject(common_obj, "http_response_body", log_file_upload_resp_path); + } + else + { + memset(&meta, 0, sizeof(meta)); + asprintf(&log_file_key, "%s.respbody", http->req->req_spec.url); + meta.url=log_file_key; + cont_type_val=tfe_http_std_field_read(http->resp, TFE_HTTP_CONT_TYPE); + if(cont_type_val!=NULL) + { + snprintf(cont_type_whole, sizeof(cont_type_whole), "Content-Type:%s", cont_type_val); + meta.std_hdr[0]=cont_type_whole; + } + meta.user_log_name=1; + tmp=cache_evbase_upload_once_evbuf(handle->log_file_upload_instance, NULL, + log_msg->resp_body, + &meta, + log_file_upload_resp_path, sizeof(log_file_upload_resp_path)); + + if(tmp==0) + { + cJSON_AddStringToObject(common_obj, "http_response_body", log_file_upload_resp_path); + } + else + { + TFE_LOG_ERROR(handle->local_logger, "Upload resp_body failed."); + } + free(log_file_key); + } + } + } + + for(size_t i=0; i<log_msg->result_num; i++) + { + + TFE_LOG_DEBUG(handle->local_logger, "URL: %s, policy_id: %d, service: %d, do_log:%d", + http->req->req_spec.url, + log_msg->result[i].config_id, + log_msg->result[i].service_id, + log_msg->result[i].do_log); + + if(log_msg->result[i].do_log==0) + { + continue; + } + + per_hit_obj=cJSON_Duplicate(common_obj, 1); + cJSON_AddNumberToObject(per_hit_obj, "common_policy_id", log_msg->result[i].config_id); + cJSON_AddNumberToObject(per_hit_obj, "common_service", log_msg->result[i].service_id); + cJSON_AddNumberToObject(per_hit_obj, "common_action", LG_ACTION_MANIPULATE); + if(log_msg->result[i].action == LG_ACTION_MANIPULATE) + { + cJSON_AddStringToObject(per_hit_obj, "common_sub_action", manipulate_action_map[log_msg->action]); + cJSON_AddNumberToObject(per_hit_obj, "http_action_file_size", log_msg->inject_sz); + } + else + { + cJSON_AddStringToObject(per_hit_obj, "common_sub_action", panggu_action_map[(unsigned char)(log_msg->result[i].action)]); + } + if(log_msg->location_client) + { + cJSON_AddStringToObject(per_hit_obj, "common_client_location", log_msg->location_client); + } + if(log_msg->location_server) + { + cJSON_AddStringToObject(per_hit_obj, "common_server_location", log_msg->location_server); + } + + log_payload = cJSON_PrintUnformatted(per_hit_obj); + + TFE_LOG_DEBUG(handle->local_logger, "%s", log_payload); + + kafka_status = tfe_kafka_logger_send(handle->kafka_logger, log_payload, strlen(log_payload)); + free(log_payload); + cJSON_Delete(per_hit_obj); + if(kafka_status<0) + { + TFE_LOG_ERROR(handle->local_logger, "Kafka produce failed: %s", rd_kafka_err2name(rd_kafka_last_error())); + } + send_cnt++; + } + + cJSON_Delete(common_obj); + return send_cnt; +} |
