diff options
| author | luwenpeng <[email protected]> | 2024-07-19 18:20:04 +0800 |
|---|---|---|
| committer | luwenpeng <[email protected]> | 2024-07-19 18:57:57 +0800 |
| commit | 2045d517cabbf7559e18367fa33b3a170143ef79 (patch) | |
| tree | 15c70ad522c455c6b51622181442e12cea75f3e7 /plugin/business/doh | |
| parent | 88a7a8c5c48f681cdd0478a809652ac94c2647ee (diff) | |
featureļ¼ TSG-21853 Refactoring TFE Kafka infrastructure
Diffstat (limited to 'plugin/business/doh')
| -rw-r--r-- | plugin/business/doh/src/doh.cpp | 4 | ||||
| -rw-r--r-- | plugin/business/doh/src/logger.cpp | 33 | ||||
| -rw-r--r-- | plugin/business/doh/src/pub.h | 3 |
3 files changed, 11 insertions, 29 deletions
diff --git a/plugin/business/doh/src/doh.cpp b/plugin/business/doh/src/doh.cpp index 4103f69..15032b4 100644 --- a/plugin/business/doh/src/doh.cpp +++ b/plugin/business/doh/src/doh.cpp @@ -385,7 +385,7 @@ static void doh_maat_scan(const struct tfe_stream *stream, const struct tfe_http static int doh_maat_init(const char *profile, const char *section) { - g_doh_conf->maat = (struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT); + g_doh_conf->maat = tfe_get_maat_handle(); MESA_load_profile_string_def(profile, section, "table_appid", g_doh_conf->tables[TYPE_APPID].name, TFE_STRING_MAX, "ATTR_APP_ID"); MESA_load_profile_string_def(profile, section, "table_qname", g_doh_conf->tables[TYPE_QNAME].name, TFE_STRING_MAX, "ATTR_DOH_QNAME"); MESA_load_profile_string_def(profile, section, "table_host", g_doh_conf->tables[TYPE_HOST].name, TFE_STRING_MAX, "ATTR_SERVER_FQDN"); @@ -824,7 +824,7 @@ int doh_on_data(const struct tfe_stream *stream, const struct tfe_http_session * void doh_send_metric_log(const struct tfe_stream * stream, struct doh_ctx *ctx, unsigned int thread_id) { size_t c2s_byte_num = 0, s2c_byte_num =0; - struct tfe_fieldstat_metric_t *fieldstat = (struct tfe_fieldstat_metric_t *)tfe_bussiness_resouce_get(DYNAMIC_FIELDSTAT); + struct tfe_fieldstat_metric_t *fieldstat = tfe_get_fieldstat_handle(); fieldstat->tags[thread_id][TAG_VSYS_ID].value_int = ctx->result->vsys_id; fieldstat->tags[thread_id][TAG_RULE_ID].value_int = ctx->result->config_id; diff --git a/plugin/business/doh/src/logger.cpp b/plugin/business/doh/src/logger.cpp index a2567c0..279915c 100644 --- a/plugin/business/doh/src/logger.cpp +++ b/plugin/business/doh/src/logger.cpp @@ -1,4 +1,5 @@ #include "logger.h" +#include "kafka.h" struct json_spec { @@ -287,14 +288,6 @@ int doh_kafka_init(const char *profile, struct doh_conf *conf) { return 0; } - conf->device_id = (const char *)tfe_bussiness_resouce_get(DEVICE_ID); - conf->effective_device_tag = (const char *)tfe_bussiness_resouce_get(EFFECTIVE_DEVICE_TAG); - conf->kafka_logger = (tfe_kafka_logger_t *)tfe_bussiness_resouce_get(KAFKA_LOGGER); - if (conf->kafka_logger && !conf->kafka_logger->enable) - { - TFE_LOG_ERROR(conf->local_logger, "Doh sendlog ENABLE, but tfe kafka logger DISABLED."); - return -1; - } return 0; } @@ -357,7 +350,6 @@ int doh_send_log(struct doh_conf *handle, const struct tfe_http_session *http, c const char *tmp_val = NULL; cJSON *common_obj = NULL, *per_hit_obj = NULL; char *log_payload = NULL; - int kafka_status = 0; int send_cnt = 0; struct timeval cur_time; char src_ip_str[MAX(INET6_ADDRSTRLEN, INET_ADDRSTRLEN)] = {0}; @@ -450,18 +442,18 @@ int doh_send_log(struct doh_conf *handle, const struct tfe_http_session *http, c cJSON_AddStringToObject(common_obj, "ip_protocol", "tcp"); cJSON_AddNumberToObject(common_obj, "out_link_id", 0); cJSON_AddNumberToObject(common_obj, "in_link_id", 0); - cJSON_AddStringToObject(common_obj, "sled_ip", handle->kafka_logger->local_ip_str); - cJSON_AddNumberToObject(common_obj, "t_vsys_id", handle->kafka_logger->t_vsys_id); + cJSON_AddStringToObject(common_obj, "sled_ip", tfe_get_sled_ip()); + cJSON_AddNumberToObject(common_obj, "t_vsys_id", tfe_get_vsys_id()); cJSON_AddNumberToObject(common_obj, "vsys_id", ctx->vsys_id); - cJSON_AddStringToObject(common_obj, "device_id", handle->device_id); + cJSON_AddStringToObject(common_obj, "device_id", tfe_get_device_id()); cJSON_AddNumberToObject(common_obj, "sent_bytes", c2s_byte_num); cJSON_AddNumberToObject(common_obj, "received_bytes", s2c_byte_num); cJSON_AddStringToObject(common_obj, "doh_url", http->req->req_spec.url); doh_add_host_to_object(common_obj, http->req->req_spec.host); - if(handle->effective_device_tag) + if(tfe_get_device_tag()) { - cJSON_AddStringToObject(common_obj, "device_tag", handle->effective_device_tag); + cJSON_AddStringToObject(common_obj, "device_tag", tfe_get_device_tag()); } for (size_t i = 0; i < sizeof(req_fields) / sizeof(struct json_spec); i++) @@ -524,19 +516,12 @@ int doh_send_log(struct doh_conf *handle, const struct tfe_http_session *http, c log_payload = cJSON_PrintUnformatted(per_hit_obj); - TFE_LOG_DEBUG(handle->local_logger, "%s", log_payload); - - kafka_status = tfe_kafka_logger_send(handle->kafka_logger, TOPIC_LOGGER, log_payload, strlen(log_payload)); - free(log_payload); - cJSON_Delete(per_hit_obj); - if (kafka_status < 0) - { - TFE_LOG_ERROR(handle->local_logger, "Kafka produce failed: %s", rd_kafka_err2name(rd_kafka_last_error())); - } - else + if (kafka_send(tfe_get_kafka_handle(), TOPIC_PROXY_EVENT, log_payload, strlen(log_payload)) == 0) { send_cnt++; } + free(log_payload); + cJSON_Delete(per_hit_obj); } cJSON_Delete(common_obj); diff --git a/plugin/business/doh/src/pub.h b/plugin/business/doh/src/pub.h index 491bfe2..a0b7d5b 100644 --- a/plugin/business/doh/src/pub.h +++ b/plugin/business/doh/src/pub.h @@ -13,7 +13,6 @@ extern "C" #include <tfe_plugin.h> #include <MESA/maat.h> #include <MESA/MESA_prof_load.h> -#include <tfe_kafka_logger.h> #include "dns.h" @@ -57,9 +56,7 @@ struct doh_conf int entry_id; int en_sendlog; - const char *device_id; const char *effective_device_tag; - tfe_kafka_logger_t *kafka_logger; int fs_id[DOH_STAT_MAX]; long long stat_val[DOH_STAT_MAX]; |
