summaryrefslogtreecommitdiff
path: root/plugin/business/tsg-http/src
diff options
context:
space:
mode:
authorfengweihao <[email protected]>2023-12-14 15:08:19 +0800
committerfengweihao <[email protected]>2023-12-14 15:08:19 +0800
commitb3700966fd3931763ee8e09aa9a72d87c3ae8d9e (patch)
tree8d764ca7e674cf9887355b4d2c242b5f32ae2e52 /plugin/business/tsg-http/src
parentb801ca9d3b652c9aa494cb3b3fb2e0a5fbe31731 (diff)
TSG-17862 Proxy支持Internal IP address和Exteral IP Address的扫描, 支持以Topic方式上传HTTP请求体/应答体v4.8.51-20231215
Diffstat (limited to 'plugin/business/tsg-http/src')
-rw-r--r--plugin/business/tsg-http/src/tsg_http.cpp158
-rw-r--r--plugin/business/tsg-http/src/tsg_logger.cpp93
2 files changed, 111 insertions, 140 deletions
diff --git a/plugin/business/tsg-http/src/tsg_http.cpp b/plugin/business/tsg-http/src/tsg_http.cpp
index fe61ec2..e2644e2 100644
--- a/plugin/business/tsg-http/src/tsg_http.cpp
+++ b/plugin/business/tsg-http/src/tsg_http.cpp
@@ -75,6 +75,8 @@ enum scan_table
PXY_CTRL_HTTP_RES_HDR,
PXY_CTRL_HTTP_RES_BODY,
PXY_CTRL_APP_ID,
+ PXY_CTRL_INTERNAL_ADDR,
+ PXY_CTRL_EXTERNAL_ADDR,
__SCAN_TABLE_MAX
};
@@ -103,19 +105,9 @@ enum manipulate_profile_table
POLICY_PROFILE_TABLE_INSERT,
POLICY_PROFILE_TABLE_HIJACK,
POLICY_PROFILE_TABLE_LUA,
- POLICY_PROFILE_TABLE_APP_ID,
POLICY_PROFILE_TABLE_MAX
};
-struct app_id_dict
-{
- int ref_cnt;
- int app_id;
- long long int group_id;
-
- pthread_mutex_t lock;
-};
-
struct manipulate_profile
{
int profile_id;
@@ -968,76 +960,6 @@ void ma_profile_table_dup_cb(int table_id, void **to, void **from, long argl, vo
*to=ply_obj;
}
-void app_dict_table_new_cb(const char *table_name, int table_id, const char* key, const char* table_line, void **ad, long argl, void* argp)
-{
- int ret=0;
- size_t offset=0, len=0;
- char *app_id_str=NULL, *group_id_str=NULL;
- struct app_id_dict *app_dict=ALLOC(struct app_id_dict, 1);
-
- ret = maat_helper_read_column(table_line, 1, &offset, &len);
- if(ret >= 0)
- {
- app_id_str=ALLOC(char, len+1);
- memcpy(app_id_str, table_line+offset, len);
- app_dict->app_id=atoi(app_id_str);
- FREE(&app_id_str);
- }
-
- ret = maat_helper_read_column(table_line, 18, &offset, &len);
- if(ret >= 0)
- {
- group_id_str=ALLOC(char, len+1);
- memcpy(group_id_str, table_line+offset, len);
- app_dict->group_id=atoll(group_id_str);
- FREE(&group_id_str);
- }
-
- app_dict->ref_cnt=1;
- pthread_mutex_init(&(app_dict->lock), NULL);
- *ad=app_dict;
- return;
-}
-
-void app_dict_table_free_cb(int table_id, void **ad, long argl, void* argp)
-{
- if(*ad==NULL)
- {
- return;
- }
-
- struct app_id_dict *app_dict=(struct app_id_dict *)(*ad);
- pthread_mutex_lock(&(app_dict->lock));
- app_dict->ref_cnt--;
- if(app_dict->ref_cnt>0)
- {
- pthread_mutex_unlock(&(app_dict->lock));
- return;
- }
- pthread_mutex_unlock(&(app_dict->lock));
- pthread_mutex_destroy(&(app_dict->lock));
-
- FREE(&app_dict);
- *ad=NULL;
- return;
-}
-
-void app_id_dict_free(struct app_id_dict *app_dict)
-{
- app_dict_table_free_cb(0, (void **)&app_dict, 0, NULL);
-}
-
-void app_dict_table_dup_cb(int table_id, void **to, void **from, long argl, void* argp)
-{
- struct app_id_dict *app_dict=(struct app_id_dict *)(*from);
- pthread_mutex_lock(&(app_dict->lock));
- app_dict->ref_cnt++;
- pthread_mutex_unlock(&(app_dict->lock));
- *to=app_dict;
-
- return;
-}
-
int maat_table_init(const char* table_name,
maat_start_callback_t *start, maat_update_callback_t *update, maat_finish_callback_t *finish,
void *u_para)
@@ -1107,6 +1029,9 @@ int proxy_policy_init(const char* profile_path, const char* static_section, cons
table_name[PXY_CTRL_HTTP_RES_HDR] = "ATTR_HTTP_RES_HDR";
table_name[PXY_CTRL_HTTP_RES_BODY] = "ATTR_HTTP_RES_BODY";
table_name[PXY_CTRL_APP_ID] = "ATTR_APP_ID";
+ table_name[PXY_CTRL_INTERNAL_ADDR] = "ATTR_INTERNAL_ADDR";
+ table_name[PXY_CTRL_EXTERNAL_ADDR] = "ATTR_EXTERNAL_ADDR";
+
for (int i = 0; i < __SCAN_TABLE_MAX; i++)
{
g_proxy_rt->scan_table_id[i] = maat_get_table_id(g_proxy_rt->feather, table_name[i]);
@@ -1124,13 +1049,6 @@ int proxy_policy_init(const char* profile_path, const char* static_section, cons
policy_action_param_dup,
0, NULL);
- g_proxy_rt->plolicy_table_id[POLICY_PROFILE_TABLE_APP_ID]=maat_get_table_id(g_proxy_rt->feather, "APP_ID_DICT");
- maat_plugin_table_ex_schema_register(g_proxy_rt->feather, "APP_ID_DICT",
- app_dict_table_new_cb,
- app_dict_table_free_cb,
- app_dict_table_dup_cb,
- 0, NULL);
-
ret = maat_table_init("PXY_PROFILE_TRUSTED_CA_CERT",
trusted_CA_update_start_cb,
trusted_CA_update_cert_cb,
@@ -2783,6 +2701,12 @@ enum proxy_action http_scan(const struct tfe_http_session * session, enum tfe_ht
{
hit_cnt += n_hit_result;
}
+ scan_ret = maat_scan_not_logic(g_proxy_rt->feather, g_proxy_rt->scan_table_id[PXY_CTRL_HTTP_FQDN],
+ result + hit_cnt, MAX_SCAN_RESULT - hit_cnt, &n_hit_result, ctx->scan_mid);
+ if (scan_ret == MAAT_SCAN_HIT)
+ {
+ hit_cnt += n_hit_result;
+ }
scan_ret = tfe_scan_fqdn_cat(stream, result, ctx->scan_mid, hit_cnt, g_proxy_rt->local_logger, g_proxy_rt->scan_table_id[PXY_CTRL_HTTP_FQDN_CAT]);
if (scan_ret > 0)
{
@@ -2792,7 +2716,6 @@ enum proxy_action http_scan(const struct tfe_http_session * session, enum tfe_ht
const char * str_url = session->req->req_spec.url;
int str_url_length = (int) (strlen(session->req->req_spec.url));
-
scan_ret = maat_scan_string(g_proxy_rt->feather, g_proxy_rt->scan_table_id[PXY_CTRL_HTTP_URL],
str_url, str_url_length, result + hit_cnt, MAX_SCAN_RESULT - hit_cnt, &n_hit_result, ctx->scan_mid);
@@ -2800,6 +2723,12 @@ enum proxy_action http_scan(const struct tfe_http_session * session, enum tfe_ht
{
hit_cnt += n_hit_result;
}
+ scan_ret = maat_scan_not_logic(g_proxy_rt->feather, g_proxy_rt->scan_table_id[PXY_CTRL_HTTP_URL],
+ result + hit_cnt, MAX_SCAN_RESULT - hit_cnt, &n_hit_result, ctx->scan_mid);
+ if (scan_ret == MAAT_SCAN_HIT)
+ {
+ hit_cnt += n_hit_result;
+ }
}
if ((events & EV_HTTP_REQ_HDR) || (events & EV_HTTP_RESP_HDR))
@@ -2817,7 +2746,6 @@ enum proxy_action http_scan(const struct tfe_http_session * session, enum tfe_ht
const char * str_field_name = http_field_name_to_string(&field_name);
scan_ret = maat_state_set_scan_district(ctx->scan_mid, table_id, str_field_name, strlen(str_field_name));
-
assert(scan_ret == 0);
scan_ret = maat_scan_string(g_proxy_rt->feather, table_id, field_val, strlen(field_val),
result + hit_cnt, MAX_SCAN_RESULT - hit_cnt, &n_hit_result, ctx->scan_mid);
@@ -3190,8 +3118,7 @@ void cache_write(const struct tfe_http_session * session, enum tfe_http_event ev
}
}
-void proxy_on_http_begin(const struct tfe_stream * stream,
- const struct tfe_http_session * session, unsigned int thread_id, void ** pme)
+void proxy_on_http_begin(const struct tfe_stream *stream, const struct tfe_http_session *session, unsigned int thread_id, void **pme)
{
if (!g_proxy_rt->enable_plugin)
{
@@ -3205,73 +3132,58 @@ void proxy_on_http_begin(const struct tfe_stream * stream,
ATOMIC_INC(&(g_proxy_rt->stat_val[STAT_SESSION]));
ctx = proxy_http_ctx_new(thread_id);
long long *result = ctx->result;
- size_t n_hit_result=0;
scan_ret = tfe_scan_subscribe_id(stream, result, ctx->scan_mid, hit_cnt, g_proxy_rt->local_logger);
if(scan_ret>0)
{
hit_cnt+=scan_ret;
}
-
scan_ret = tfe_scan_ip_location(stream, result, ctx->scan_mid, hit_cnt, g_proxy_rt->local_logger, &(ctx->ip_ctx.location_server), &(ctx->ip_ctx.location_client));
if(scan_ret>0)
{
hit_cnt+=scan_ret;
}
+
scan_ret = tfe_scan_ip_asn(stream, result, ctx->scan_mid, hit_cnt, g_proxy_rt->local_logger, &(ctx->ip_ctx.asn_server), &(ctx->ip_ctx.asn_client));
if(scan_ret>0)
{
hit_cnt+=scan_ret;
}
-
long long app_id=67;
- struct app_id_dict *app_dict = (struct app_id_dict*)maat_plugin_table_get_ex_data(g_proxy_rt->feather, g_proxy_rt->plolicy_table_id[POLICY_PROFILE_TABLE_APP_ID], (const char *)&app_id, sizeof(long long));
- if(app_dict!=NULL)
+ scan_ret = tfe_scan_app_id(result, ctx->scan_mid, hit_cnt, app_id, g_proxy_rt->scan_table_id[PXY_CTRL_APP_ID]);
+ if(scan_ret > 0)
{
- scan_ret = maat_scan_group(g_proxy_rt->feather, g_proxy_rt->scan_table_id[PXY_CTRL_APP_ID], &app_dict->group_id, 1, result+hit_cnt, MAX_SCAN_RESULT-hit_cnt, &n_hit_result, ctx->scan_mid);
- if(scan_ret==MAAT_SCAN_HIT)
- {
- hit_cnt+=n_hit_result;
- }
- app_id_dict_free(app_dict);
+ hit_cnt += scan_ret;
}
addr_tfe2sapp(stream->addr, &sapp_addr);
if (sapp_addr.addrtype == ADDR_TYPE_IPV4)
{
- scan_ret = maat_scan_ipv4(g_proxy_rt->feather, g_proxy_rt->scan_table_id[PXY_CTRL_SOURCE_ADDR],
- sapp_addr.v4->saddr, sapp_addr.v4->source, 6, result+hit_cnt, MAX_SCAN_RESULT-hit_cnt,
- &n_hit_result, ctx->scan_mid);
- if (scan_ret == MAAT_SCAN_HIT)
+ scan_ret = tfe_scan_ipv4_addr(result, ctx->scan_mid, hit_cnt, sapp_addr);
+ if (scan_ret > 0)
{
- hit_cnt += n_hit_result;
+ hit_cnt += scan_ret;
}
- scan_ret = maat_scan_ipv4(g_proxy_rt->feather, g_proxy_rt->scan_table_id[PXY_CTRL_DESTINATION_ADDR],
- sapp_addr.v4->daddr, sapp_addr.v4->dest, 6, result+hit_cnt, MAX_SCAN_RESULT-hit_cnt,
- &n_hit_result, ctx->scan_mid);
-
- if(scan_ret == MAAT_SCAN_HIT)
+ scan_ret = tfe_scan_ipv4_internal_addr(stream, result, ctx->scan_mid, hit_cnt, sapp_addr);
+ if (scan_ret > 0)
{
- hit_cnt += n_hit_result;
+ hit_cnt += scan_ret;
}
}
if (sapp_addr.addrtype == ADDR_TYPE_IPV6)
{
- scan_ret = maat_scan_ipv6(g_proxy_rt->feather, g_proxy_rt->scan_table_id[PXY_CTRL_SOURCE_ADDR],
- sapp_addr.v6->saddr, sapp_addr.v6->source, 6, result+hit_cnt, MAX_SCAN_RESULT-hit_cnt,
- &n_hit_result, ctx->scan_mid);
- if (scan_ret == MAAT_SCAN_HIT)
+ scan_ret = tfe_scan_ipv6_addr(result, ctx->scan_mid, hit_cnt, sapp_addr);
+ if (scan_ret > 0)
{
- hit_cnt += n_hit_result;
+ hit_cnt += scan_ret;
}
- scan_ret = maat_scan_ipv6(g_proxy_rt->feather, g_proxy_rt->scan_table_id[PXY_CTRL_DESTINATION_ADDR],
- sapp_addr.v6->daddr, sapp_addr.v6->dest, 6, result+hit_cnt, MAX_SCAN_RESULT-hit_cnt,
- &n_hit_result, ctx->scan_mid);
- if (scan_ret == MAAT_SCAN_HIT)
+ scan_ret = tfe_scan_ipv6_internal_addr(stream, result, ctx->scan_mid, hit_cnt, sapp_addr);
+ if (scan_ret > 0)
{
- hit_cnt += n_hit_result;
+ hit_cnt += scan_ret;
}
}
+
if(hit_cnt > 0)
{
ctx->hit_cnt = hit_cnt;
diff --git a/plugin/business/tsg-http/src/tsg_logger.cpp b/plugin/business/tsg-http/src/tsg_logger.cpp
index 8114e0f..a5e8b26 100644
--- a/plugin/business/tsg-http/src/tsg_logger.cpp
+++ b/plugin/business/tsg-http/src/tsg_logger.cpp
@@ -5,6 +5,7 @@
#include <tfe_utils.h>
#include <tfe_resource.h>
+#include "mpack.h"
#include "tsg_proxy_logger.h"
struct json_spec
@@ -55,7 +56,57 @@ void get_http_body_uuid(char *uuid)
return;
}
-struct proxy_logger* proxy_log_handle_create(const char* profile, const char* section, void* local_logger)
+size_t file_bucket_upload_once(struct proxy_logger* handle, char *uuid, struct evbuffer *http_body)
+{
+ int kafka_status=0;
+ mpack_writer_t writer;
+ char *mpack_data=NULL, *data=NULL;
+ size_t mpack_size=0, datalen=0;
+
+ mpack_writer_init_growable(&writer, &mpack_data, &mpack_size);
+ mpack_build_map(&writer);
+
+ mpack_write_cstr(&writer, "uuid");
+ mpack_write_cstr(&writer, uuid);
+ mpack_write_cstr(&writer, "fileType");
+ mpack_write_cstr(&writer, "txt");
+ mpack_write_cstr(&writer, "combineMode");
+ mpack_write_cstr(&writer, "seek");
+ mpack_write_cstr(&writer, "offset");
+ mpack_write_u64(&writer, 0);
+ mpack_write_cstr(&writer, "lastChunkFlag");
+ mpack_write_u32(&writer, 1);
+ datalen = evbuffer_get_length(http_body);
+ if(datalen > 0)
+ {
+ data = (char *)evbuffer_pullup(http_body, datalen);
+ mpack_write_cstr(&writer, "chunk");
+ mpack_start_bin(&writer, datalen);
+ mpack_write_bytes(&writer, (const char *)data, datalen);
+ mpack_finish_bin(&writer);
+ }
+ mpack_write_cstr(&writer, "length");
+ mpack_write_u64(&writer, datalen);
+ mpack_complete_map(&writer); // mpack_init_map
+ mpack_error_t errorno=mpack_writer_destroy(&writer);
+ if(errorno!=mpack_ok)
+ {
+ TFE_LOG_ERROR(handle->local_logger, "Mpack writer destroy is error(%s), uuid: %s", mpack_error_to_string(errorno), uuid);
+ }
+ kafka_status = tfe_kafka_logger_send(handle->kafka_logger, TOPIC_BUCKET, mpack_data, mpack_size);
+ if(kafka_status<0)
+ {
+ TFE_LOG_ERROR(handle->local_logger, "Kafka produce failed: %s", rd_kafka_err2name(rd_kafka_last_error()));
+ }
+
+ free(mpack_data);
+ mpack_data = NULL;
+ mpack_size = 0;
+
+ return datalen;
+}
+
+struct proxy_logger* proxy_log_handle_create(const char* profile, const char* section, void* local_logger)
{
struct tango_cache_parameter *log_file_upload_para=NULL;
struct proxy_logger* instance=ALLOC(struct proxy_logger,1);
@@ -157,12 +208,8 @@ int proxy_send_log(struct proxy_logger* handle, const struct proxy_log* log_msg)
common_obj=cJSON_CreateObject();
gettimeofday(&cur_time, NULL);
-
cJSON_AddNumberToObject(common_obj, "start_timestamp_ms", get_time_ms(http->start_time));
cJSON_AddNumberToObject(common_obj, "end_timestamp_ms", get_time_ms(cur_time));
- cJSON_AddStringToObject(common_obj, "http_version", app_proto[http->major_version]);
- cJSON_AddStringToObject(common_obj, "decoded_as", "HTTP");
-
unsigned int category_id_val[64]={0};
char opt_val[24]={0}; uint16_t opt_out_size;
@@ -236,6 +283,8 @@ int proxy_send_log(struct proxy_logger* handle, const struct proxy_log* log_msg)
s2c_byte_num = log_msg->s2c_byte_num;
}
+ cJSON_AddStringToObject(common_obj, "http_version", app_proto[http->major_version]);
+ cJSON_AddStringToObject(common_obj, "decoded_as", "HTTP");
cJSON_AddNumberToObject(common_obj, "out_link_id", 0);
cJSON_AddNumberToObject(common_obj, "in_link_id", 0);
cJSON_AddStringToObject(common_obj, "sled_ip", handle->kafka_logger->local_ip_str);
@@ -271,18 +320,12 @@ int proxy_send_log(struct proxy_logger* handle, const struct proxy_log* log_msg)
#define FILE_CHUNK_UUID_LEN 40
char uuid[FILE_CHUNK_UUID_LEN]={0};
+ size_t datalen=0;
for(size_t i=0; i<log_msg->result_num; i++)
{
- if(log_msg->result[i].do_log!=1)
- {
- continue;
- }
-
- if(handle->en_hoslog!=1)
- {
- continue;
- }
+ if(log_msg->result[i].do_log!=1) continue;
+ if(handle->en_hoslog!=1) continue;
if(log_msg->req_body!=NULL)
{
@@ -293,7 +336,15 @@ int proxy_send_log(struct proxy_logger* handle, const struct proxy_log* log_msg)
else
{
get_http_body_uuid(uuid);
- cJSON_AddStringToObject(common_obj, "http_request_body", uuid);
+ datalen=file_bucket_upload_once(handle, uuid, log_msg->req_body);
+ if(datalen>0)
+ {
+ cJSON_AddStringToObject(common_obj, "http_request_body", uuid);
+ }
+ else
+ {
+ TFE_LOG_ERROR(handle->local_logger, "Upload req_body failed.");
+ }
}
}
if(log_msg->resp_body!=NULL)
@@ -305,7 +356,15 @@ int proxy_send_log(struct proxy_logger* handle, const struct proxy_log* log_msg)
else
{
get_http_body_uuid(uuid);
- cJSON_AddStringToObject(common_obj, "http_response_body", uuid);
+ datalen=file_bucket_upload_once(handle, uuid, log_msg->resp_body);
+ if(datalen>0)
+ {
+ cJSON_AddStringToObject(common_obj, "http_response_body", uuid);
+ }
+ else
+ {
+ TFE_LOG_ERROR(handle->local_logger, "Upload resp_body failed.");
+ }
}
}
}
@@ -361,7 +420,7 @@ int proxy_send_log(struct proxy_logger* handle, const struct proxy_log* log_msg)
TFE_LOG_DEBUG(handle->local_logger, "%s", log_payload);
- kafka_status = tfe_kafka_logger_send(handle->kafka_logger, log_payload, strlen(log_payload));
+ kafka_status = tfe_kafka_logger_send(handle->kafka_logger, TOPIC_LOGGER, log_payload, strlen(log_payload));
free(log_payload);
cJSON_Delete(per_hit_obj);
if(kafka_status<0)