summaryrefslogtreecommitdiff
path: root/plugin/business/tsg-http/src/tsg_logger.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'plugin/business/tsg-http/src/tsg_logger.cpp')
-rw-r--r--plugin/business/tsg-http/src/tsg_logger.cpp93
1 files changed, 76 insertions, 17 deletions
diff --git a/plugin/business/tsg-http/src/tsg_logger.cpp b/plugin/business/tsg-http/src/tsg_logger.cpp
index 8114e0f..a5e8b26 100644
--- a/plugin/business/tsg-http/src/tsg_logger.cpp
+++ b/plugin/business/tsg-http/src/tsg_logger.cpp
@@ -5,6 +5,7 @@
#include <tfe_utils.h>
#include <tfe_resource.h>
+#include "mpack.h"
#include "tsg_proxy_logger.h"
struct json_spec
@@ -55,7 +56,57 @@ void get_http_body_uuid(char *uuid)
return;
}
-struct proxy_logger* proxy_log_handle_create(const char* profile, const char* section, void* local_logger)
+size_t file_bucket_upload_once(struct proxy_logger* handle, char *uuid, struct evbuffer *http_body)
+{
+ int kafka_status=0;
+ mpack_writer_t writer;
+ char *mpack_data=NULL, *data=NULL;
+ size_t mpack_size=0, datalen=0;
+
+ mpack_writer_init_growable(&writer, &mpack_data, &mpack_size);
+ mpack_build_map(&writer);
+
+ mpack_write_cstr(&writer, "uuid");
+ mpack_write_cstr(&writer, uuid);
+ mpack_write_cstr(&writer, "fileType");
+ mpack_write_cstr(&writer, "txt");
+ mpack_write_cstr(&writer, "combineMode");
+ mpack_write_cstr(&writer, "seek");
+ mpack_write_cstr(&writer, "offset");
+ mpack_write_u64(&writer, 0);
+ mpack_write_cstr(&writer, "lastChunkFlag");
+ mpack_write_u32(&writer, 1);
+ datalen = evbuffer_get_length(http_body);
+ if(datalen > 0)
+ {
+ data = (char *)evbuffer_pullup(http_body, datalen);
+ mpack_write_cstr(&writer, "chunk");
+ mpack_start_bin(&writer, datalen);
+ mpack_write_bytes(&writer, (const char *)data, datalen);
+ mpack_finish_bin(&writer);
+ }
+ mpack_write_cstr(&writer, "length");
+ mpack_write_u64(&writer, datalen);
+ mpack_complete_map(&writer); // mpack_init_map
+ mpack_error_t errorno=mpack_writer_destroy(&writer);
+ if(errorno!=mpack_ok)
+ {
+ TFE_LOG_ERROR(handle->local_logger, "Mpack writer destroy is error(%s), uuid: %s", mpack_error_to_string(errorno), uuid);
+ }
+ kafka_status = tfe_kafka_logger_send(handle->kafka_logger, TOPIC_BUCKET, mpack_data, mpack_size);
+ if(kafka_status<0)
+ {
+ TFE_LOG_ERROR(handle->local_logger, "Kafka produce failed: %s", rd_kafka_err2name(rd_kafka_last_error()));
+ }
+
+ free(mpack_data);
+ mpack_data = NULL;
+ mpack_size = 0;
+
+ return datalen;
+}
+
+struct proxy_logger* proxy_log_handle_create(const char* profile, const char* section, void* local_logger)
{
struct tango_cache_parameter *log_file_upload_para=NULL;
struct proxy_logger* instance=ALLOC(struct proxy_logger,1);
@@ -157,12 +208,8 @@ int proxy_send_log(struct proxy_logger* handle, const struct proxy_log* log_msg)
common_obj=cJSON_CreateObject();
gettimeofday(&cur_time, NULL);
-
cJSON_AddNumberToObject(common_obj, "start_timestamp_ms", get_time_ms(http->start_time));
cJSON_AddNumberToObject(common_obj, "end_timestamp_ms", get_time_ms(cur_time));
- cJSON_AddStringToObject(common_obj, "http_version", app_proto[http->major_version]);
- cJSON_AddStringToObject(common_obj, "decoded_as", "HTTP");
-
unsigned int category_id_val[64]={0};
char opt_val[24]={0}; uint16_t opt_out_size;
@@ -236,6 +283,8 @@ int proxy_send_log(struct proxy_logger* handle, const struct proxy_log* log_msg)
s2c_byte_num = log_msg->s2c_byte_num;
}
+ cJSON_AddStringToObject(common_obj, "http_version", app_proto[http->major_version]);
+ cJSON_AddStringToObject(common_obj, "decoded_as", "HTTP");
cJSON_AddNumberToObject(common_obj, "out_link_id", 0);
cJSON_AddNumberToObject(common_obj, "in_link_id", 0);
cJSON_AddStringToObject(common_obj, "sled_ip", handle->kafka_logger->local_ip_str);
@@ -271,18 +320,12 @@ int proxy_send_log(struct proxy_logger* handle, const struct proxy_log* log_msg)
#define FILE_CHUNK_UUID_LEN 40
char uuid[FILE_CHUNK_UUID_LEN]={0};
+ size_t datalen=0;
for(size_t i=0; i<log_msg->result_num; i++)
{
- if(log_msg->result[i].do_log!=1)
- {
- continue;
- }
-
- if(handle->en_hoslog!=1)
- {
- continue;
- }
+ if(log_msg->result[i].do_log!=1) continue;
+ if(handle->en_hoslog!=1) continue;
if(log_msg->req_body!=NULL)
{
@@ -293,7 +336,15 @@ int proxy_send_log(struct proxy_logger* handle, const struct proxy_log* log_msg)
else
{
get_http_body_uuid(uuid);
- cJSON_AddStringToObject(common_obj, "http_request_body", uuid);
+ datalen=file_bucket_upload_once(handle, uuid, log_msg->req_body);
+ if(datalen>0)
+ {
+ cJSON_AddStringToObject(common_obj, "http_request_body", uuid);
+ }
+ else
+ {
+ TFE_LOG_ERROR(handle->local_logger, "Upload req_body failed.");
+ }
}
}
if(log_msg->resp_body!=NULL)
@@ -305,7 +356,15 @@ int proxy_send_log(struct proxy_logger* handle, const struct proxy_log* log_msg)
else
{
get_http_body_uuid(uuid);
- cJSON_AddStringToObject(common_obj, "http_response_body", uuid);
+ datalen=file_bucket_upload_once(handle, uuid, log_msg->resp_body);
+ if(datalen>0)
+ {
+ cJSON_AddStringToObject(common_obj, "http_response_body", uuid);
+ }
+ else
+ {
+ TFE_LOG_ERROR(handle->local_logger, "Upload resp_body failed.");
+ }
}
}
}
@@ -361,7 +420,7 @@ int proxy_send_log(struct proxy_logger* handle, const struct proxy_log* log_msg)
TFE_LOG_DEBUG(handle->local_logger, "%s", log_payload);
- kafka_status = tfe_kafka_logger_send(handle->kafka_logger, log_payload, strlen(log_payload));
+ kafka_status = tfe_kafka_logger_send(handle->kafka_logger, TOPIC_LOGGER, log_payload, strlen(log_payload));
free(log_payload);
cJSON_Delete(per_hit_obj);
if(kafka_status<0)