diff options
| author | fengweihao <[email protected]> | 2023-12-14 15:08:19 +0800 |
|---|---|---|
| committer | fengweihao <[email protected]> | 2023-12-14 15:08:19 +0800 |
| commit | b3700966fd3931763ee8e09aa9a72d87c3ae8d9e (patch) | |
| tree | 8d764ca7e674cf9887355b4d2c242b5f32ae2e52 /plugin/business/tsg-http/src/tsg_logger.cpp | |
| parent | b801ca9d3b652c9aa494cb3b3fb2e0a5fbe31731 (diff) | |
TSG-17862 Proxy支持Internal IP address和External IP Address的扫描, 支持以Topic方式上传HTTP请求体/应答体 v4.8.51-20231215
Diffstat (limited to 'plugin/business/tsg-http/src/tsg_logger.cpp')
| -rw-r--r-- | plugin/business/tsg-http/src/tsg_logger.cpp | 93 |
1 file changed, 76 insertions, 17 deletions
diff --git a/plugin/business/tsg-http/src/tsg_logger.cpp b/plugin/business/tsg-http/src/tsg_logger.cpp index 8114e0f..a5e8b26 100644 --- a/plugin/business/tsg-http/src/tsg_logger.cpp +++ b/plugin/business/tsg-http/src/tsg_logger.cpp @@ -5,6 +5,7 @@ #include <tfe_utils.h> #include <tfe_resource.h> +#include "mpack.h" #include "tsg_proxy_logger.h" struct json_spec @@ -55,7 +56,57 @@ void get_http_body_uuid(char *uuid) return; } -struct proxy_logger* proxy_log_handle_create(const char* profile, const char* section, void* local_logger) +size_t file_bucket_upload_once(struct proxy_logger* handle, char *uuid, struct evbuffer *http_body) +{ + int kafka_status=0; + mpack_writer_t writer; + char *mpack_data=NULL, *data=NULL; + size_t mpack_size=0, datalen=0; + + mpack_writer_init_growable(&writer, &mpack_data, &mpack_size); + mpack_build_map(&writer); + + mpack_write_cstr(&writer, "uuid"); + mpack_write_cstr(&writer, uuid); + mpack_write_cstr(&writer, "fileType"); + mpack_write_cstr(&writer, "txt"); + mpack_write_cstr(&writer, "combineMode"); + mpack_write_cstr(&writer, "seek"); + mpack_write_cstr(&writer, "offset"); + mpack_write_u64(&writer, 0); + mpack_write_cstr(&writer, "lastChunkFlag"); + mpack_write_u32(&writer, 1); + datalen = evbuffer_get_length(http_body); + if(datalen > 0) + { + data = (char *)evbuffer_pullup(http_body, datalen); + mpack_write_cstr(&writer, "chunk"); + mpack_start_bin(&writer, datalen); + mpack_write_bytes(&writer, (const char *)data, datalen); + mpack_finish_bin(&writer); + } + mpack_write_cstr(&writer, "length"); + mpack_write_u64(&writer, datalen); + mpack_complete_map(&writer); // mpack_init_map + mpack_error_t errorno=mpack_writer_destroy(&writer); + if(errorno!=mpack_ok) + { + TFE_LOG_ERROR(handle->local_logger, "Mpack writer destroy is error(%s), uuid: %s", mpack_error_to_string(errorno), uuid); + } + kafka_status = tfe_kafka_logger_send(handle->kafka_logger, TOPIC_BUCKET, mpack_data, mpack_size); + if(kafka_status<0) + { + 
TFE_LOG_ERROR(handle->local_logger, "Kafka produce failed: %s", rd_kafka_err2name(rd_kafka_last_error())); + } + + free(mpack_data); + mpack_data = NULL; + mpack_size = 0; + + return datalen; +} + +struct proxy_logger* proxy_log_handle_create(const char* profile, const char* section, void* local_logger) { struct tango_cache_parameter *log_file_upload_para=NULL; struct proxy_logger* instance=ALLOC(struct proxy_logger,1); @@ -157,12 +208,8 @@ int proxy_send_log(struct proxy_logger* handle, const struct proxy_log* log_msg) common_obj=cJSON_CreateObject(); gettimeofday(&cur_time, NULL); - cJSON_AddNumberToObject(common_obj, "start_timestamp_ms", get_time_ms(http->start_time)); cJSON_AddNumberToObject(common_obj, "end_timestamp_ms", get_time_ms(cur_time)); - cJSON_AddStringToObject(common_obj, "http_version", app_proto[http->major_version]); - cJSON_AddStringToObject(common_obj, "decoded_as", "HTTP"); - unsigned int category_id_val[64]={0}; char opt_val[24]={0}; uint16_t opt_out_size; @@ -236,6 +283,8 @@ int proxy_send_log(struct proxy_logger* handle, const struct proxy_log* log_msg) s2c_byte_num = log_msg->s2c_byte_num; } + cJSON_AddStringToObject(common_obj, "http_version", app_proto[http->major_version]); + cJSON_AddStringToObject(common_obj, "decoded_as", "HTTP"); cJSON_AddNumberToObject(common_obj, "out_link_id", 0); cJSON_AddNumberToObject(common_obj, "in_link_id", 0); cJSON_AddStringToObject(common_obj, "sled_ip", handle->kafka_logger->local_ip_str); @@ -271,18 +320,12 @@ int proxy_send_log(struct proxy_logger* handle, const struct proxy_log* log_msg) #define FILE_CHUNK_UUID_LEN 40 char uuid[FILE_CHUNK_UUID_LEN]={0}; + size_t datalen=0; for(size_t i=0; i<log_msg->result_num; i++) { - if(log_msg->result[i].do_log!=1) - { - continue; - } - - if(handle->en_hoslog!=1) - { - continue; - } + if(log_msg->result[i].do_log!=1) continue; + if(handle->en_hoslog!=1) continue; if(log_msg->req_body!=NULL) { @@ -293,7 +336,15 @@ int proxy_send_log(struct proxy_logger* handle, 
const struct proxy_log* log_msg) else { get_http_body_uuid(uuid); - cJSON_AddStringToObject(common_obj, "http_request_body", uuid); + datalen=file_bucket_upload_once(handle, uuid, log_msg->req_body); + if(datalen>0) + { + cJSON_AddStringToObject(common_obj, "http_request_body", uuid); + } + else + { + TFE_LOG_ERROR(handle->local_logger, "Upload req_body failed."); + } } } if(log_msg->resp_body!=NULL) @@ -305,7 +356,15 @@ int proxy_send_log(struct proxy_logger* handle, const struct proxy_log* log_msg) else { get_http_body_uuid(uuid); - cJSON_AddStringToObject(common_obj, "http_response_body", uuid); + datalen=file_bucket_upload_once(handle, uuid, log_msg->resp_body); + if(datalen>0) + { + cJSON_AddStringToObject(common_obj, "http_response_body", uuid); + } + else + { + TFE_LOG_ERROR(handle->local_logger, "Upload resp_body failed."); + } } } } @@ -361,7 +420,7 @@ int proxy_send_log(struct proxy_logger* handle, const struct proxy_log* log_msg) TFE_LOG_DEBUG(handle->local_logger, "%s", log_payload); - kafka_status = tfe_kafka_logger_send(handle->kafka_logger, log_payload, strlen(log_payload)); + kafka_status = tfe_kafka_logger_send(handle->kafka_logger, TOPIC_LOGGER, log_payload, strlen(log_payload)); free(log_payload); cJSON_Delete(per_hit_obj); if(kafka_status<0) |
