diff options
Diffstat (limited to 'plugin/business/tsg-http/src')
| -rw-r--r-- | plugin/business/tsg-http/src/tsg_http.cpp | 158 | ||||
| -rw-r--r-- | plugin/business/tsg-http/src/tsg_logger.cpp | 93 |
2 files changed, 111 insertions, 140 deletions
diff --git a/plugin/business/tsg-http/src/tsg_http.cpp b/plugin/business/tsg-http/src/tsg_http.cpp index fe61ec2..e2644e2 100644 --- a/plugin/business/tsg-http/src/tsg_http.cpp +++ b/plugin/business/tsg-http/src/tsg_http.cpp @@ -75,6 +75,8 @@ enum scan_table PXY_CTRL_HTTP_RES_HDR, PXY_CTRL_HTTP_RES_BODY, PXY_CTRL_APP_ID, + PXY_CTRL_INTERNAL_ADDR, + PXY_CTRL_EXTERNAL_ADDR, __SCAN_TABLE_MAX }; @@ -103,19 +105,9 @@ enum manipulate_profile_table POLICY_PROFILE_TABLE_INSERT, POLICY_PROFILE_TABLE_HIJACK, POLICY_PROFILE_TABLE_LUA, - POLICY_PROFILE_TABLE_APP_ID, POLICY_PROFILE_TABLE_MAX }; -struct app_id_dict -{ - int ref_cnt; - int app_id; - long long int group_id; - - pthread_mutex_t lock; -}; - struct manipulate_profile { int profile_id; @@ -968,76 +960,6 @@ void ma_profile_table_dup_cb(int table_id, void **to, void **from, long argl, vo *to=ply_obj; } -void app_dict_table_new_cb(const char *table_name, int table_id, const char* key, const char* table_line, void **ad, long argl, void* argp) -{ - int ret=0; - size_t offset=0, len=0; - char *app_id_str=NULL, *group_id_str=NULL; - struct app_id_dict *app_dict=ALLOC(struct app_id_dict, 1); - - ret = maat_helper_read_column(table_line, 1, &offset, &len); - if(ret >= 0) - { - app_id_str=ALLOC(char, len+1); - memcpy(app_id_str, table_line+offset, len); - app_dict->app_id=atoi(app_id_str); - FREE(&app_id_str); - } - - ret = maat_helper_read_column(table_line, 18, &offset, &len); - if(ret >= 0) - { - group_id_str=ALLOC(char, len+1); - memcpy(group_id_str, table_line+offset, len); - app_dict->group_id=atoll(group_id_str); - FREE(&group_id_str); - } - - app_dict->ref_cnt=1; - pthread_mutex_init(&(app_dict->lock), NULL); - *ad=app_dict; - return; -} - -void app_dict_table_free_cb(int table_id, void **ad, long argl, void* argp) -{ - if(*ad==NULL) - { - return; - } - - struct app_id_dict *app_dict=(struct app_id_dict *)(*ad); - pthread_mutex_lock(&(app_dict->lock)); - app_dict->ref_cnt--; - if(app_dict->ref_cnt>0) - { - pthread_mutex_unlock(&(app_dict->lock)); - return; - } - pthread_mutex_unlock(&(app_dict->lock)); - pthread_mutex_destroy(&(app_dict->lock)); - - FREE(&app_dict); - *ad=NULL; - return; -} - -void app_id_dict_free(struct app_id_dict *app_dict) -{ - app_dict_table_free_cb(0, (void **)&app_dict, 0, NULL); -} - -void app_dict_table_dup_cb(int table_id, void **to, void **from, long argl, void* argp) -{ - struct app_id_dict *app_dict=(struct app_id_dict *)(*from); - pthread_mutex_lock(&(app_dict->lock)); - app_dict->ref_cnt++; - pthread_mutex_unlock(&(app_dict->lock)); - *to=app_dict; - - return; -} - int maat_table_init(const char* table_name, maat_start_callback_t *start, maat_update_callback_t *update, maat_finish_callback_t *finish, void *u_para) @@ -1107,6 +1029,9 @@ int proxy_policy_init(const char* profile_path, const char* static_section, cons table_name[PXY_CTRL_HTTP_RES_HDR] = "ATTR_HTTP_RES_HDR"; table_name[PXY_CTRL_HTTP_RES_BODY] = "ATTR_HTTP_RES_BODY"; table_name[PXY_CTRL_APP_ID] = "ATTR_APP_ID"; + table_name[PXY_CTRL_INTERNAL_ADDR] = "ATTR_INTERNAL_ADDR"; + table_name[PXY_CTRL_EXTERNAL_ADDR] = "ATTR_EXTERNAL_ADDR"; + for (int i = 0; i < __SCAN_TABLE_MAX; i++) { g_proxy_rt->scan_table_id[i] = maat_get_table_id(g_proxy_rt->feather, table_name[i]); @@ -1124,13 +1049,6 @@ int proxy_policy_init(const char* profile_path, const char* static_section, cons policy_action_param_dup, 0, NULL); - g_proxy_rt->plolicy_table_id[POLICY_PROFILE_TABLE_APP_ID]=maat_get_table_id(g_proxy_rt->feather, "APP_ID_DICT"); - maat_plugin_table_ex_schema_register(g_proxy_rt->feather, "APP_ID_DICT", - app_dict_table_new_cb, - app_dict_table_free_cb, - app_dict_table_dup_cb, - 0, NULL); - ret = maat_table_init("PXY_PROFILE_TRUSTED_CA_CERT", trusted_CA_update_start_cb, trusted_CA_update_cert_cb, @@ -2783,6 +2701,12 @@ enum proxy_action http_scan(const struct tfe_http_session * session, enum tfe_ht { hit_cnt += n_hit_result; } + scan_ret = maat_scan_not_logic(g_proxy_rt->feather, g_proxy_rt->scan_table_id[PXY_CTRL_HTTP_FQDN], + result + hit_cnt, MAX_SCAN_RESULT - hit_cnt, &n_hit_result, ctx->scan_mid); + if (scan_ret == MAAT_SCAN_HIT) + { + hit_cnt += n_hit_result; + } scan_ret = tfe_scan_fqdn_cat(stream, result, ctx->scan_mid, hit_cnt, g_proxy_rt->local_logger, g_proxy_rt->scan_table_id[PXY_CTRL_HTTP_FQDN_CAT]); if (scan_ret > 0) { @@ -2792,7 +2716,6 @@ enum proxy_action http_scan(const struct tfe_http_session * session, enum tfe_ht const char * str_url = session->req->req_spec.url; int str_url_length = (int) (strlen(session->req->req_spec.url)); - scan_ret = maat_scan_string(g_proxy_rt->feather, g_proxy_rt->scan_table_id[PXY_CTRL_HTTP_URL], str_url, str_url_length, result + hit_cnt, MAX_SCAN_RESULT - hit_cnt, &n_hit_result, ctx->scan_mid); @@ -2800,6 +2723,12 @@ enum proxy_action http_scan(const struct tfe_http_session * session, enum tfe_ht { hit_cnt += n_hit_result; } + scan_ret = maat_scan_not_logic(g_proxy_rt->feather, g_proxy_rt->scan_table_id[PXY_CTRL_HTTP_URL], + result + hit_cnt, MAX_SCAN_RESULT - hit_cnt, &n_hit_result, ctx->scan_mid); + if (scan_ret == MAAT_SCAN_HIT) + { + hit_cnt += n_hit_result; + } } if ((events & EV_HTTP_REQ_HDR) || (events & EV_HTTP_RESP_HDR)) @@ -2817,7 +2746,6 @@ enum proxy_action http_scan(const struct tfe_http_session * session, enum tfe_ht const char * str_field_name = http_field_name_to_string(&field_name); scan_ret = maat_state_set_scan_district(ctx->scan_mid, table_id, str_field_name, strlen(str_field_name)); - assert(scan_ret == 0); scan_ret = maat_scan_string(g_proxy_rt->feather, table_id, field_val, strlen(field_val), result + hit_cnt, MAX_SCAN_RESULT - hit_cnt, &n_hit_result, ctx->scan_mid); @@ -3190,8 +3118,7 @@ void cache_write(const struct tfe_http_session * session, enum tfe_http_event ev } } -void proxy_on_http_begin(const struct tfe_stream * stream, - const struct tfe_http_session * session, unsigned int thread_id, void ** pme) +void proxy_on_http_begin(const struct tfe_stream *stream, const struct tfe_http_session *session, unsigned int thread_id, void **pme) { if (!g_proxy_rt->enable_plugin) { @@ -3205,73 +3132,58 @@ void proxy_on_http_begin(const struct tfe_stream * stream, ATOMIC_INC(&(g_proxy_rt->stat_val[STAT_SESSION])); ctx = proxy_http_ctx_new(thread_id); long long *result = ctx->result; - size_t n_hit_result=0; scan_ret = tfe_scan_subscribe_id(stream, result, ctx->scan_mid, hit_cnt, g_proxy_rt->local_logger); if(scan_ret>0) { hit_cnt+=scan_ret; } - scan_ret = tfe_scan_ip_location(stream, result, ctx->scan_mid, hit_cnt, g_proxy_rt->local_logger, &(ctx->ip_ctx.location_server), &(ctx->ip_ctx.location_client)); if(scan_ret>0) { hit_cnt+=scan_ret; } + scan_ret = tfe_scan_ip_asn(stream, result, ctx->scan_mid, hit_cnt, g_proxy_rt->local_logger, &(ctx->ip_ctx.asn_server), &(ctx->ip_ctx.asn_client)); if(scan_ret>0) { hit_cnt+=scan_ret; } - long long app_id=67; - struct app_id_dict *app_dict = (struct app_id_dict*)maat_plugin_table_get_ex_data(g_proxy_rt->feather, g_proxy_rt->plolicy_table_id[POLICY_PROFILE_TABLE_APP_ID], (const char *)&app_id, sizeof(long long)); - if(app_dict!=NULL) + scan_ret = tfe_scan_app_id(result, ctx->scan_mid, hit_cnt, app_id, g_proxy_rt->scan_table_id[PXY_CTRL_APP_ID]); + if(scan_ret > 0) { - scan_ret = maat_scan_group(g_proxy_rt->feather, g_proxy_rt->scan_table_id[PXY_CTRL_APP_ID], &app_dict->group_id, 1, result+hit_cnt, MAX_SCAN_RESULT-hit_cnt, &n_hit_result, ctx->scan_mid); - if(scan_ret==MAAT_SCAN_HIT) - { - hit_cnt+=n_hit_result; - } - app_id_dict_free(app_dict); + hit_cnt += scan_ret; } addr_tfe2sapp(stream->addr, &sapp_addr); if (sapp_addr.addrtype == ADDR_TYPE_IPV4) { - scan_ret = maat_scan_ipv4(g_proxy_rt->feather, g_proxy_rt->scan_table_id[PXY_CTRL_SOURCE_ADDR], - sapp_addr.v4->saddr, sapp_addr.v4->source, 6, result+hit_cnt, MAX_SCAN_RESULT-hit_cnt, - &n_hit_result, ctx->scan_mid); - if (scan_ret == MAAT_SCAN_HIT) + scan_ret = tfe_scan_ipv4_addr(result, ctx->scan_mid, hit_cnt, sapp_addr); + if (scan_ret > 0) { - hit_cnt += n_hit_result; + hit_cnt += scan_ret; } - scan_ret = maat_scan_ipv4(g_proxy_rt->feather, g_proxy_rt->scan_table_id[PXY_CTRL_DESTINATION_ADDR], - sapp_addr.v4->daddr, sapp_addr.v4->dest, 6, result+hit_cnt, MAX_SCAN_RESULT-hit_cnt, - &n_hit_result, ctx->scan_mid); - - if(scan_ret == MAAT_SCAN_HIT) + scan_ret = tfe_scan_ipv4_internal_addr(stream, result, ctx->scan_mid, hit_cnt, sapp_addr); + if (scan_ret > 0) { - hit_cnt += n_hit_result; + hit_cnt += scan_ret; } } if (sapp_addr.addrtype == ADDR_TYPE_IPV6) { - scan_ret = maat_scan_ipv6(g_proxy_rt->feather, g_proxy_rt->scan_table_id[PXY_CTRL_SOURCE_ADDR], - sapp_addr.v6->saddr, sapp_addr.v6->source, 6, result+hit_cnt, MAX_SCAN_RESULT-hit_cnt, - &n_hit_result, ctx->scan_mid); - if (scan_ret == MAAT_SCAN_HIT) + scan_ret = tfe_scan_ipv6_addr(result, ctx->scan_mid, hit_cnt, sapp_addr); + if (scan_ret > 0) { - hit_cnt += n_hit_result; + hit_cnt += scan_ret; } - scan_ret = maat_scan_ipv6(g_proxy_rt->feather, g_proxy_rt->scan_table_id[PXY_CTRL_DESTINATION_ADDR], - sapp_addr.v6->daddr, sapp_addr.v6->dest, 6, result+hit_cnt, MAX_SCAN_RESULT-hit_cnt, - &n_hit_result, ctx->scan_mid); - if (scan_ret == MAAT_SCAN_HIT) + scan_ret = tfe_scan_ipv6_internal_addr(stream, result, ctx->scan_mid, hit_cnt, sapp_addr); + if (scan_ret > 0) { - hit_cnt += n_hit_result; + hit_cnt += scan_ret; } } + if(hit_cnt > 0) { ctx->hit_cnt = hit_cnt; diff --git a/plugin/business/tsg-http/src/tsg_logger.cpp b/plugin/business/tsg-http/src/tsg_logger.cpp index 8114e0f..a5e8b26 100644 --- a/plugin/business/tsg-http/src/tsg_logger.cpp +++ b/plugin/business/tsg-http/src/tsg_logger.cpp @@ -5,6 +5,7 @@ #include <tfe_utils.h> #include <tfe_resource.h> +#include "mpack.h" #include "tsg_proxy_logger.h" struct json_spec @@ -55,7 +56,57 @@ void get_http_body_uuid(char *uuid) return; } -struct proxy_logger* proxy_log_handle_create(const char* profile, const char* section, void* local_logger) +size_t file_bucket_upload_once(struct proxy_logger* handle, char *uuid, struct evbuffer *http_body) +{ + int kafka_status=0; + mpack_writer_t writer; + char *mpack_data=NULL, *data=NULL; + size_t mpack_size=0, datalen=0; + + mpack_writer_init_growable(&writer, &mpack_data, &mpack_size); + mpack_build_map(&writer); + + mpack_write_cstr(&writer, "uuid"); + mpack_write_cstr(&writer, uuid); + mpack_write_cstr(&writer, "fileType"); + mpack_write_cstr(&writer, "txt"); + mpack_write_cstr(&writer, "combineMode"); + mpack_write_cstr(&writer, "seek"); + mpack_write_cstr(&writer, "offset"); + mpack_write_u64(&writer, 0); + mpack_write_cstr(&writer, "lastChunkFlag"); + mpack_write_u32(&writer, 1); + datalen = evbuffer_get_length(http_body); + if(datalen > 0) + { + data = (char *)evbuffer_pullup(http_body, datalen); + mpack_write_cstr(&writer, "chunk"); + mpack_start_bin(&writer, datalen); + mpack_write_bytes(&writer, (const char *)data, datalen); + mpack_finish_bin(&writer); + } + mpack_write_cstr(&writer, "length"); + mpack_write_u64(&writer, datalen); + mpack_complete_map(&writer); // mpack_init_map + mpack_error_t errorno=mpack_writer_destroy(&writer); + if(errorno!=mpack_ok) + { + TFE_LOG_ERROR(handle->local_logger, "Mpack writer destroy is error(%s), uuid: %s", mpack_error_to_string(errorno), uuid); + } + kafka_status = tfe_kafka_logger_send(handle->kafka_logger, TOPIC_BUCKET, mpack_data, mpack_size); + if(kafka_status<0) + { + TFE_LOG_ERROR(handle->local_logger, "Kafka produce failed: %s", rd_kafka_err2name(rd_kafka_last_error())); + } + + free(mpack_data); + mpack_data = NULL; + mpack_size = 0; + + return datalen; +} + +struct proxy_logger* proxy_log_handle_create(const char* profile, const char* section, void* local_logger) { struct tango_cache_parameter *log_file_upload_para=NULL; struct proxy_logger* instance=ALLOC(struct proxy_logger,1); @@ -157,12 +208,8 @@ int proxy_send_log(struct proxy_logger* handle, const struct proxy_log* log_msg) common_obj=cJSON_CreateObject(); gettimeofday(&cur_time, NULL); - cJSON_AddNumberToObject(common_obj, "start_timestamp_ms", get_time_ms(http->start_time)); cJSON_AddNumberToObject(common_obj, "end_timestamp_ms", get_time_ms(cur_time)); - cJSON_AddStringToObject(common_obj, "http_version", app_proto[http->major_version]); - cJSON_AddStringToObject(common_obj, "decoded_as", "HTTP"); - unsigned int category_id_val[64]={0}; char opt_val[24]={0}; uint16_t opt_out_size; @@ -236,6 +283,8 @@ int proxy_send_log(struct proxy_logger* handle, const struct proxy_log* log_msg) s2c_byte_num = log_msg->s2c_byte_num; } + cJSON_AddStringToObject(common_obj, "http_version", app_proto[http->major_version]); + cJSON_AddStringToObject(common_obj, "decoded_as", "HTTP"); cJSON_AddNumberToObject(common_obj, "out_link_id", 0); cJSON_AddNumberToObject(common_obj, "in_link_id", 0); cJSON_AddStringToObject(common_obj, "sled_ip", handle->kafka_logger->local_ip_str); @@ -271,18 +320,12 @@ int proxy_send_log(struct proxy_logger* handle, const struct proxy_log* log_msg) #define FILE_CHUNK_UUID_LEN 40 char uuid[FILE_CHUNK_UUID_LEN]={0}; + size_t datalen=0; for(size_t i=0; i<log_msg->result_num; i++) { - if(log_msg->result[i].do_log!=1) - { - continue; - } - - if(handle->en_hoslog!=1) - { - continue; - } + if(log_msg->result[i].do_log!=1) continue; + if(handle->en_hoslog!=1) continue; if(log_msg->req_body!=NULL) { @@ -293,7 +336,15 @@ int proxy_send_log(struct proxy_logger* handle, const struct proxy_log* log_msg) else { get_http_body_uuid(uuid); - cJSON_AddStringToObject(common_obj, "http_request_body", uuid); + datalen=file_bucket_upload_once(handle, uuid, log_msg->req_body); + if(datalen>0) + { + cJSON_AddStringToObject(common_obj, "http_request_body", uuid); + } + else + { + TFE_LOG_ERROR(handle->local_logger, "Upload req_body failed."); + } } } if(log_msg->resp_body!=NULL) @@ -305,7 +356,15 @@ int proxy_send_log(struct proxy_logger* handle, const struct proxy_log* log_msg) else { get_http_body_uuid(uuid); - cJSON_AddStringToObject(common_obj, "http_response_body", uuid); + datalen=file_bucket_upload_once(handle, uuid, log_msg->resp_body); + if(datalen>0) + { + cJSON_AddStringToObject(common_obj, "http_response_body", uuid); + } + else + { + TFE_LOG_ERROR(handle->local_logger, "Upload resp_body failed."); + } } } } @@ -361,7 +420,7 @@ int proxy_send_log(struct proxy_logger* handle, const struct proxy_log* log_msg) TFE_LOG_DEBUG(handle->local_logger, "%s", log_payload); - kafka_status = tfe_kafka_logger_send(handle->kafka_logger, log_payload, strlen(log_payload)); + kafka_status = tfe_kafka_logger_send(handle->kafka_logger, TOPIC_LOGGER, log_payload, strlen(log_payload)); free(log_payload); cJSON_Delete(per_hit_obj); if(kafka_status<0) |
