diff options
| author | luwenpeng <[email protected]> | 2024-07-19 18:20:04 +0800 |
|---|---|---|
| committer | luwenpeng <[email protected]> | 2024-07-19 18:57:57 +0800 |
| commit | 2045d517cabbf7559e18367fa33b3a170143ef79 (patch) | |
| tree | 15c70ad522c455c6b51622181442e12cea75f3e7 /plugin | |
| parent | 88a7a8c5c48f681cdd0478a809652ac94c2647ee (diff) | |
featureļ¼ TSG-21853 Refactoring TFE Kafka infrastructure
Diffstat (limited to 'plugin')
| -rw-r--r-- | plugin/business/chaining-policy/src/chaining_policy.cpp | 2 | ||||
| -rw-r--r-- | plugin/business/doh/src/doh.cpp | 4 | ||||
| -rw-r--r-- | plugin/business/doh/src/logger.cpp | 33 | ||||
| -rw-r--r-- | plugin/business/doh/src/pub.h | 3 | ||||
| -rw-r--r-- | plugin/business/ssl-policy/src/ssl_policy.cpp | 2 | ||||
| -rw-r--r-- | plugin/business/tcp-policy/src/tcp_policy.cpp | 2 | ||||
| -rw-r--r-- | plugin/business/tsg-http/src/tsg_http.cpp | 4 | ||||
| -rw-r--r-- | plugin/business/tsg-http/src/tsg_logger.cpp | 52 |
8 files changed, 27 insertions, 75 deletions
diff --git a/plugin/business/chaining-policy/src/chaining_policy.cpp b/plugin/business/chaining-policy/src/chaining_policy.cpp index ffd3852..1e10910 100644 --- a/plugin/business/chaining-policy/src/chaining_policy.cpp +++ b/plugin/business/chaining-policy/src/chaining_policy.cpp @@ -144,7 +144,7 @@ struct chaining_policy_enforcer *chaining_policy_enforcer_create(void *logger) { int ret = 0; struct chaining_policy_enforcer *enforcer = ALLOC(struct chaining_policy_enforcer, 1); - enforcer->maat = (struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT); + enforcer->maat = tfe_get_maat_handle(); enforcer->logger = logger; enforcer->table_id = maat_get_table_id(enforcer->maat, "SERVICE_CHAINING_COMPILE"); if (enforcer->table_id < 0) diff --git a/plugin/business/doh/src/doh.cpp b/plugin/business/doh/src/doh.cpp index 4103f69..15032b4 100644 --- a/plugin/business/doh/src/doh.cpp +++ b/plugin/business/doh/src/doh.cpp @@ -385,7 +385,7 @@ static void doh_maat_scan(const struct tfe_stream *stream, const struct tfe_http static int doh_maat_init(const char *profile, const char *section) { - g_doh_conf->maat = (struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT); + g_doh_conf->maat = tfe_get_maat_handle(); MESA_load_profile_string_def(profile, section, "table_appid", g_doh_conf->tables[TYPE_APPID].name, TFE_STRING_MAX, "ATTR_APP_ID"); MESA_load_profile_string_def(profile, section, "table_qname", g_doh_conf->tables[TYPE_QNAME].name, TFE_STRING_MAX, "ATTR_DOH_QNAME"); MESA_load_profile_string_def(profile, section, "table_host", g_doh_conf->tables[TYPE_HOST].name, TFE_STRING_MAX, "ATTR_SERVER_FQDN"); @@ -824,7 +824,7 @@ int doh_on_data(const struct tfe_stream *stream, const struct tfe_http_session * void doh_send_metric_log(const struct tfe_stream * stream, struct doh_ctx *ctx, unsigned int thread_id) { size_t c2s_byte_num = 0, s2c_byte_num =0; - struct tfe_fieldstat_metric_t *fieldstat = (struct tfe_fieldstat_metric_t *)tfe_bussiness_resouce_get(DYNAMIC_FIELDSTAT); + struct tfe_fieldstat_metric_t *fieldstat = tfe_get_fieldstat_handle(); fieldstat->tags[thread_id][TAG_VSYS_ID].value_int = ctx->result->vsys_id; fieldstat->tags[thread_id][TAG_RULE_ID].value_int = ctx->result->config_id; diff --git a/plugin/business/doh/src/logger.cpp b/plugin/business/doh/src/logger.cpp index a2567c0..279915c 100644 --- a/plugin/business/doh/src/logger.cpp +++ b/plugin/business/doh/src/logger.cpp @@ -1,4 +1,5 @@ #include "logger.h" +#include "kafka.h" struct json_spec { @@ -287,14 +288,6 @@ int doh_kafka_init(const char *profile, struct doh_conf *conf) { return 0; } - conf->device_id = (const char *)tfe_bussiness_resouce_get(DEVICE_ID); - conf->effective_device_tag = (const char *)tfe_bussiness_resouce_get(EFFECTIVE_DEVICE_TAG); - conf->kafka_logger = (tfe_kafka_logger_t *)tfe_bussiness_resouce_get(KAFKA_LOGGER); - if (conf->kafka_logger && !conf->kafka_logger->enable) - { - TFE_LOG_ERROR(conf->local_logger, "Doh sendlog ENABLE, but tfe kafka logger DISABLED."); - return -1; - } return 0; } @@ -357,7 +350,6 @@ int doh_send_log(struct doh_conf *handle, const struct tfe_http_session *http, c const char *tmp_val = NULL; cJSON *common_obj = NULL, *per_hit_obj = NULL; char *log_payload = NULL; - int kafka_status = 0; int send_cnt = 0; struct timeval cur_time; char src_ip_str[MAX(INET6_ADDRSTRLEN, INET_ADDRSTRLEN)] = {0}; @@ -450,18 +442,18 @@ int doh_send_log(struct doh_conf *handle, const struct tfe_http_session *http, c cJSON_AddStringToObject(common_obj, "ip_protocol", "tcp"); cJSON_AddNumberToObject(common_obj, "out_link_id", 0); cJSON_AddNumberToObject(common_obj, "in_link_id", 0); - cJSON_AddStringToObject(common_obj, "sled_ip", handle->kafka_logger->local_ip_str); - cJSON_AddNumberToObject(common_obj, "t_vsys_id", handle->kafka_logger->t_vsys_id); + cJSON_AddStringToObject(common_obj, "sled_ip", tfe_get_sled_ip()); + cJSON_AddNumberToObject(common_obj, "t_vsys_id", tfe_get_vsys_id()); cJSON_AddNumberToObject(common_obj, "vsys_id", ctx->vsys_id); - cJSON_AddStringToObject(common_obj, "device_id", handle->device_id); + cJSON_AddStringToObject(common_obj, "device_id", tfe_get_device_id()); cJSON_AddNumberToObject(common_obj, "sent_bytes", c2s_byte_num); cJSON_AddNumberToObject(common_obj, "received_bytes", s2c_byte_num); cJSON_AddStringToObject(common_obj, "doh_url", http->req->req_spec.url); doh_add_host_to_object(common_obj, http->req->req_spec.host); - if(handle->effective_device_tag) + if(tfe_get_device_tag()) { - cJSON_AddStringToObject(common_obj, "device_tag", handle->effective_device_tag); + cJSON_AddStringToObject(common_obj, "device_tag", tfe_get_device_tag()); } for (size_t i = 0; i < sizeof(req_fields) / sizeof(struct json_spec); i++) @@ -524,19 +516,12 @@ int doh_send_log(struct doh_conf *handle, const struct tfe_http_session *http, c log_payload = cJSON_PrintUnformatted(per_hit_obj); - TFE_LOG_DEBUG(handle->local_logger, "%s", log_payload); - - kafka_status = tfe_kafka_logger_send(handle->kafka_logger, TOPIC_LOGGER, log_payload, strlen(log_payload)); - free(log_payload); - cJSON_Delete(per_hit_obj); - if (kafka_status < 0) - { - TFE_LOG_ERROR(handle->local_logger, "Kafka produce failed: %s", rd_kafka_err2name(rd_kafka_last_error())); - } - else + if (kafka_send(tfe_get_kafka_handle(), TOPIC_PROXY_EVENT, log_payload, strlen(log_payload)) == 0) { send_cnt++; } + free(log_payload); + cJSON_Delete(per_hit_obj); } cJSON_Delete(common_obj); diff --git a/plugin/business/doh/src/pub.h b/plugin/business/doh/src/pub.h index 491bfe2..a0b7d5b 100644 --- a/plugin/business/doh/src/pub.h +++ b/plugin/business/doh/src/pub.h @@ -13,7 +13,6 @@ extern "C" #include <tfe_plugin.h> #include <MESA/maat.h> #include <MESA/MESA_prof_load.h> -#include <tfe_kafka_logger.h> #include "dns.h" @@ -57,9 +56,7 @@ struct doh_conf int entry_id; int en_sendlog; - const char *device_id; const char *effective_device_tag; - tfe_kafka_logger_t *kafka_logger; int fs_id[DOH_STAT_MAX]; long long stat_val[DOH_STAT_MAX]; diff --git a/plugin/business/ssl-policy/src/ssl_policy.cpp b/plugin/business/ssl-policy/src/ssl_policy.cpp index 625ba48..93dc152 100644 --- a/plugin/business/ssl-policy/src/ssl_policy.cpp +++ b/plugin/business/ssl-policy/src/ssl_policy.cpp @@ -171,7 +171,7 @@ struct ssl_policy_enforcer* ssl_policy_enforcer_create(void* logger) { UNUSED int ret=0; struct ssl_policy_enforcer* enforcer=ALLOC(struct ssl_policy_enforcer, 1); - enforcer->maat=(struct maat*)tfe_bussiness_resouce_get(STATIC_MAAT);; + enforcer->maat=tfe_get_maat_handle(); enforcer->logger=logger; enforcer->profile_table_id=maat_get_table_id(enforcer->maat, "PXY_PROFILE_DECRYPTION"); assert(enforcer->profile_table_id >= 0); diff --git a/plugin/business/tcp-policy/src/tcp_policy.cpp b/plugin/business/tcp-policy/src/tcp_policy.cpp index d5360b0..bba0297 100644 --- a/plugin/business/tcp-policy/src/tcp_policy.cpp +++ b/plugin/business/tcp-policy/src/tcp_policy.cpp @@ -209,7 +209,7 @@ struct tcp_policy_enforcer *tcp_policy_enforcer_create(void *logger) { int ret = 0; struct tcp_policy_enforcer *enforcer = ALLOC(struct tcp_policy_enforcer, 1); - enforcer->maat = (struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT); + enforcer->maat = tfe_get_maat_handle(); enforcer->logger = logger; enforcer->table_id = maat_get_table_id(enforcer->maat, "PXY_PROFILE_TCP_OPTION"); if (enforcer->table_id < 0) diff --git a/plugin/business/tsg-http/src/tsg_http.cpp b/plugin/business/tsg-http/src/tsg_http.cpp index d07d529..e30d25a 100644 --- a/plugin/business/tsg-http/src/tsg_http.cpp +++ b/plugin/business/tsg-http/src/tsg_http.cpp @@ -1004,7 +1004,7 @@ int maat_table_ex_init(int profile_idx, int proxy_policy_init(const char* profile_path, const char* static_section, const char* dynamic_section) { int ret = 0; - g_proxy_rt->feather = (struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT); + g_proxy_rt->feather = tfe_get_maat_handle(); const char * table_name[__SCAN_TABLE_MAX]; table_name[PXY_CTRL_HTTP_URL] = "ATTR_HTTP_URL"; @@ -1404,7 +1404,7 @@ void proxy_send_metric_log(const struct tfe_stream * stream, struct proxy_http_c proxy_action_map[PX_ACTION_REJECT]="deny"; proxy_action_map[PX_ACTION_WHITELIST]="allow"; const char *manipulate_action_map[]= {"redirect","block","replace","hijack","insert","edit_element","run_script"}; - struct tfe_fieldstat_metric_t *fieldstat = (struct tfe_fieldstat_metric_t *)tfe_bussiness_resouce_get(DYNAMIC_FIELDSTAT); + struct tfe_fieldstat_metric_t *fieldstat = tfe_get_fieldstat_handle(); for(i=0; i< ctx->n_enforce; i++) { diff --git a/plugin/business/tsg-http/src/tsg_logger.cpp b/plugin/business/tsg-http/src/tsg_logger.cpp index 51cefe9..348e011 100644 --- a/plugin/business/tsg-http/src/tsg_logger.cpp +++ b/plugin/business/tsg-http/src/tsg_logger.cpp @@ -1,12 +1,12 @@ #include <cjson/cJSON.h> #include <MESA/MESA_prof_load.h> -#include <tfe_kafka_logger.h> #include <tfe_utils.h> #include <tfe_resource.h> #include <event2/event.h> #include <event.h> +#include "kafka.h" #include "mpack.h" #include "tsg_proxy_logger.h" @@ -19,15 +19,12 @@ struct proxy_logger { int entry_id; unsigned int en_sendlog; - const char *device_id; - const char *effective_device_tag; void* local_logger; unsigned long long send_cnt; unsigned long long random_drop; unsigned long long user_abort; char local_log_path[TFE_STRING_MAX]; - tfe_kafka_logger_t *kafka_logger; struct cache_evbase_instance * log_file_upload_instance; }; @@ -59,7 +56,6 @@ void get_http_body_uuid(char *uuid) size_t file_bucket_upload_once(struct proxy_logger* handle, char *uuid, struct evbuffer *http_body) { - int kafka_status=0; mpack_writer_t writer; char *mpack_data=NULL, *data=NULL; size_t mpack_size=0, datalen=0; @@ -94,11 +90,7 @@ size_t file_bucket_upload_once(struct proxy_logger* handle, char *uuid, struct e { TFE_LOG_ERROR(handle->local_logger, "Mpack writer destroy is error(%s), uuid: %s", mpack_error_to_string(errorno), uuid); } - kafka_status = tfe_kafka_logger_send(handle->kafka_logger, TOPIC_BUCKET, mpack_data, mpack_size); - if(kafka_status<0) - { - TFE_LOG_ERROR(handle->local_logger, "Kafka produce failed: %s", rd_kafka_err2name(rd_kafka_last_error())); - } + kafka_send(tfe_get_kafka_handle(), TOPIC_FILE_STREAM, mpack_data, mpack_size); free(mpack_data); mpack_data = NULL; @@ -112,29 +104,10 @@ struct proxy_logger* proxy_log_handle_create(const char* profile, const char* se struct proxy_logger* instance=ALLOC(struct proxy_logger,1); instance->local_logger=local_logger; - TFE_LOG_INFO(local_logger,"Tsg-Pxy log is inititating from %s section %s.", profile, section); MESA_load_profile_uint_def(profile, section, "en_sendlog", &instance->en_sendlog, 1); TFE_LOG_INFO(local_logger, "Tsg-Pxy sendlog : %s", instance->en_sendlog ? "ENABLE" : "DISABLE"); - if (!instance->en_sendlog) - { - return instance; - } - - instance->device_id = (const char *)tfe_bussiness_resouce_get(DEVICE_ID); - instance->effective_device_tag = (const char *)tfe_bussiness_resouce_get(EFFECTIVE_DEVICE_TAG); - instance->kafka_logger = (tfe_kafka_logger_t *)tfe_bussiness_resouce_get(KAFKA_LOGGER); - if (instance->kafka_logger && !instance->kafka_logger->enable) - { - TFE_LOG_ERROR(local_logger, "Tsg-Pxy sendlog ENABLE, but tfe kafka logger DISABLED."); - goto error_out; - } - return instance; - -error_out: - free(instance); - return NULL; } static int get_ip_client_geolocation(struct tfe_cmsg * cmsg, cJSON *per_hit_obj) @@ -193,7 +166,6 @@ int proxy_send_log(struct proxy_logger* handle, const struct proxy_log* log_msg) const char* tmp_val=NULL; cJSON *common_obj=NULL, *per_hit_obj=NULL; char* log_payload=NULL; - int kafka_status=0; int send_cnt=0; struct timeval cur_time; char src_ip_str[MAX(INET6_ADDRSTRLEN,INET_ADDRSTRLEN)] = {0}; @@ -305,17 +277,17 @@ int proxy_send_log(struct proxy_logger* handle, const struct proxy_log* log_msg) cJSON_AddStringToObject(common_obj, "ip_protocol", "tcp"); cJSON_AddNumberToObject(common_obj, "out_link_id", 0); cJSON_AddNumberToObject(common_obj, "in_link_id", 0); - cJSON_AddStringToObject(common_obj, "sled_ip", handle->kafka_logger->local_ip_str); - cJSON_AddNumberToObject(common_obj, "t_vsys_id", handle->kafka_logger->t_vsys_id); - cJSON_AddStringToObject(common_obj, "device_id", handle->device_id); + cJSON_AddStringToObject(common_obj, "sled_ip", tfe_get_sled_ip()); + cJSON_AddNumberToObject(common_obj, "t_vsys_id", tfe_get_vsys_id()); + cJSON_AddStringToObject(common_obj, "device_id", tfe_get_device_id()); cJSON_AddNumberToObject(common_obj, "sent_bytes", c2s_byte_num); cJSON_AddNumberToObject(common_obj, "received_bytes", s2c_byte_num); cJSON_AddStringToObject(common_obj, "http_url", http->req->req_spec.url); proxy_add_host_to_object(common_obj, http->req->req_spec.host); - if(handle->effective_device_tag) + if (tfe_get_device_tag()) { - cJSON_AddStringToObject(common_obj, "device_tag", handle->effective_device_tag); + cJSON_AddStringToObject(common_obj, "device_tag", tfe_get_device_tag()); } for(size_t i=0;i<sizeof(req_fields)/sizeof(struct json_spec);i++) @@ -459,14 +431,12 @@ int proxy_send_log(struct proxy_logger* handle, const struct proxy_log* log_msg) TFE_LOG_DEBUG(handle->local_logger, "%s", log_payload); - kafka_status = tfe_kafka_logger_send(handle->kafka_logger, TOPIC_LOGGER, log_payload, strlen(log_payload)); - free(log_payload); - cJSON_Delete(per_hit_obj); - if(kafka_status<0) + if (kafka_send(tfe_get_kafka_handle(), TOPIC_PROXY_EVENT, log_payload, strlen(log_payload)) == 0) { - TFE_LOG_ERROR(handle->local_logger, "Kafka produce failed: %s", rd_kafka_err2name(rd_kafka_last_error())); + send_cnt++; } - send_cnt++; + free(log_payload); + cJSON_Delete(per_hit_obj); } cJSON_Delete(common_obj); |
