diff options
| author | luwenpeng <[email protected]> | 2024-07-19 18:20:04 +0800 |
|---|---|---|
| committer | luwenpeng <[email protected]> | 2024-07-19 18:57:57 +0800 |
| commit | 2045d517cabbf7559e18367fa33b3a170143ef79 (patch) | |
| tree | 15c70ad522c455c6b51622181442e12cea75f3e7 /platform/src | |
| parent | 88a7a8c5c48f681cdd0478a809652ac94c2647ee (diff) | |
featureļ¼ TSG-21853 Refactoring TFE Kafka infrastructure
Diffstat (limited to 'platform/src')
| -rw-r--r-- | platform/src/acceptor_kni_v4.cpp | 4 | ||||
| -rw-r--r-- | platform/src/proxy.cpp | 4 | ||||
| -rw-r--r-- | platform/src/ssl_fetch_cert.cpp | 70 | ||||
| -rw-r--r-- | platform/src/ssl_service_cache.cpp | 21 | ||||
| -rw-r--r-- | platform/src/ssl_stream.cpp | 5 |
5 files changed, 20 insertions, 84 deletions
diff --git a/platform/src/acceptor_kni_v4.cpp b/platform/src/acceptor_kni_v4.cpp index 49ad2ba..ab73596 100644 --- a/platform/src/acceptor_kni_v4.cpp +++ b/platform/src/acceptor_kni_v4.cpp @@ -171,7 +171,7 @@ static void *worker_thread_cycle(void *arg) io_uring_set_read_cb(io_uring_on_tap_s, handle_decryption_packet_from_tap, thread_ctx); } - TFE_LOG_INFO(logger, "%s: worker thread %d is running", "LOG_TAG_KNI", thread_index); + TFE_LOG_INFO(logger, "worker thread %d is running", thread_index); while (1) { @@ -216,7 +216,7 @@ static void *worker_thread_cycle(void *arg) } error_out: - TFE_LOG_ERROR(logger, "%s: worker thread %d exiting", LOG_TAG_SCE, thread_index); + TFE_LOG_ERROR(logger, "worker thread %d exiting", thread_index); return (void *)NULL; } diff --git a/platform/src/proxy.cpp b/platform/src/proxy.cpp index ddfe5c4..746ba29 100644 --- a/platform/src/proxy.cpp +++ b/platform/src/proxy.cpp @@ -272,7 +272,7 @@ static void __signal_handler_cb(evutil_socket_t fd, short what, void * arg) TFE_LOG_ERROR(ctx->logger, "recv SIGHUP, reload zlog.conf"); MESA_handle_runtime_log_reconstruction(NULL); MESA_load_profile_int_def(profile_path, "maat", "log_level", &(log_level), LOG_LEVEL_FATAL); - maat = (struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT); + maat = tfe_get_maat_handle(); if(maat) { maat_reload_log_level(maat, (enum log_level)log_level); @@ -688,7 +688,7 @@ int main(int argc, char * argv[]) CHECK_OR_EXIT(g_default_proxy->key_keeper_handler, "Failed at init Key keeper. Exit."); /* RESOURCE INIT */ - ret = tfe_bussiness_resouce_init(); + ret = tfe_env_init(); CHECK_OR_EXIT(ret == 0, "TFE bussiness resource init failed. Exit."); /* SSL INIT */ diff --git a/platform/src/ssl_fetch_cert.cpp b/platform/src/ssl_fetch_cert.cpp index 7a4a0de..a206260 100644 --- a/platform/src/ssl_fetch_cert.cpp +++ b/platform/src/ssl_fetch_cert.cpp @@ -1,12 +1,9 @@ -// -// Created by lwp on 2019/10/16. -// - #include <assert.h> #include <cjson/cJSON.h> +#include "kafka.h" +#include "tfe_utils.h" #include <ssl_utils.h> -#include <tfe_kafka_logger.h> #include <tfe_resource.h> #include <MESA/MESA_prof_load.h> @@ -36,51 +33,8 @@ static char cert_type_desc[MAX_TYPE][64] = { {"Root certificate"}, }; -struct ssl_mid_cert_ctx -{ - int enable; - tfe_kafka_logger_t *g_kafka_logger; -}; -struct ssl_mid_cert_ctx mid_cert_ctx; - -int ssl_mid_cert_kafka_logger_create(const char *profile, const char *section) -{ - int ret=0; - char topic_name[TFE_SYMBOL_MAX] = {0}; - char sasl_username[TFE_STRING_MAX] = {0}; - char sasl_passwd[TFE_STRING_MAX] = {0}; - - MESA_load_profile_int_def(profile, section, "mc_cache_enable", &mid_cert_ctx.enable, 0); - MESA_load_profile_string_def(profile, "kafka", "mc_cache_topic", topic_name, sizeof(topic_name), "PXY-EXCH-INTERMEDIA-CERT"); - MESA_load_profile_string_def(profile, "kafka", "SASL_USERNAME", sasl_username, sizeof(sasl_username), ""); - MESA_load_profile_string_def(profile, "kafka", "SASL_PASSWD", sasl_passwd, sizeof(sasl_passwd), ""); - - if(mid_cert_ctx.enable == 0) - { - return 0; - } - - mid_cert_ctx.g_kafka_logger = (tfe_kafka_logger_t *)tfe_bussiness_resouce_get(KAFKA_LOGGER); - if(!mid_cert_ctx.g_kafka_logger) - { - return -1; - } - - ret = tfe_logger_create_kafka_topic(mid_cert_ctx.g_kafka_logger, sasl_username, sasl_passwd, topic_name, TOPIC_MC_CACHE, g_default_logger); - if(ret < 0) - { - return -1; - } - return 0; -} - -static void ssl_mid_cert_kafka_logger_send(const char *sni, const char *fingerprint, const char *cert) +static void send_cert_to_kafka(const char *sni, const char *fingerprint, const char *cert) { - if (mid_cert_ctx.g_kafka_logger->enable == 0) - { - return; - } - cJSON *obj = NULL; cJSON *dup = NULL; char *msg = NULL; @@ -88,13 +42,12 @@ static void ssl_mid_cert_kafka_logger_send(const char *sni, const char *fingerpr obj = cJSON_CreateObject(); cJSON_AddStringToObject(obj, "sni", sni); cJSON_AddStringToObject(obj, "fingerprint", fingerprint); - cJSON_AddNumberToObject(obj, "vsys_id", mid_cert_ctx.g_kafka_logger->t_vsys_id); + cJSON_AddNumberToObject(obj, "vsys_id", tfe_get_vsys_id()); cJSON_AddStringToObject(obj, "cert", cert); - cJSON_AddStringToObject(obj, "tfe_ip", mid_cert_ctx.g_kafka_logger->local_ip_str); + cJSON_AddStringToObject(obj, "tfe_ip", tfe_get_sled_ip()); dup = cJSON_Duplicate(obj, 1); msg = cJSON_PrintUnformatted(dup); - TFE_LOG_DEBUG(g_default_logger, "log to [%s] msg:%s", mid_cert_ctx.g_kafka_logger->topic_name[TOPIC_MC_CACHE], msg); - tfe_kafka_logger_send(mid_cert_ctx.g_kafka_logger, TOPIC_MC_CACHE, msg, strlen(msg)); + kafka_send(tfe_get_kafka_handle(), TOPIC_EXCH_CERT, msg, strlen(msg)); free(msg); cJSON_Delete(dup); @@ -142,10 +95,6 @@ void ssl_fetch_trusted_cert_from_chain(STACK_OF(X509) * cert_chain, X509_STORE * char *fingerprint = NULL; X509 *cert = NULL; X509_OBJECT *obj = NULL; - if (!mid_cert_ctx.g_kafka_logger || !mid_cert_ctx.enable) - { - return; - } deep = sk_X509_num(cert_chain); for (int i = 0; i < deep; i++) @@ -197,13 +146,12 @@ void ssl_fetch_trusted_cert_from_chain(STACK_OF(X509) * cert_chain, X509_STORE * if (!in_store && fingerprint && pem) { - ssl_mid_cert_kafka_logger_send(hostname, fingerprint, pem); + send_cert_to_kafka(hostname, fingerprint, pem); } end: - TFE_LOG_DEBUG(g_default_logger, "[dep:%d/%d] is %s, in_trusted_store:%d, sin:%s; subject:(%s); issuer:(%s); fingerprint:%s; cert:%s", - i, deep, cert_type_desc[type], in_store, (hostname ? hostname : "NULL"), (subj ? subj : "NULL"), (issuer ? issuer : "NULL"), (fingerprint ? fingerprint : "NULL"), - ((pem && mid_cert_ctx.g_kafka_logger->enable == 0x10) ? pem : " ...")); + TFE_LOG_DEBUG(g_default_logger, "[dep:%d/%d] is %s, in_trusted_store:%d, sin:%s; subject:(%s); issuer:(%s); fingerprint:%s", + i, deep, cert_type_desc[type], in_store, (hostname ? hostname : "NULL"), (subj ? subj : "NULL"), (issuer ? issuer : "NULL"), (fingerprint ? fingerprint : "NULL")); if (pem) { free(pem); diff --git a/platform/src/ssl_service_cache.cpp b/platform/src/ssl_service_cache.cpp index 0561f43..13c7adc 100644 --- a/platform/src/ssl_service_cache.cpp +++ b/platform/src/ssl_service_cache.cpp @@ -3,12 +3,6 @@ #include <tfe_resource.h> #include <ssl_stream.h> -struct ssl_ja3_enforcer -{ - struct maat *maat; - int table_id; -}; - struct ssl_svc_ja3 { char ja3_hash[33]; @@ -26,7 +20,7 @@ struct ssl_svc_addr const char *dport; }; -static struct ssl_ja3_enforcer g_static_enforcer = {0}; +static int table_id = 0; static void ssl_svc_ja3_param_dup_cb(int table_id, void **to, void **from, long argl, void *argp) { @@ -93,24 +87,23 @@ static void ssl_svc_ja3_param_free(struct ssl_svc_ja3 *param) static int ssl_svc_ja3_init(const char *table_name) { - g_static_enforcer.maat = (struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT); - g_static_enforcer.table_id = maat_get_table_id(g_static_enforcer.maat, table_name); - if (g_static_enforcer.table_id < 0) + table_id = maat_get_table_id(tfe_get_maat_handle(), table_name); + if (table_id < 0) { TFE_LOG_ERROR(g_default_logger, "Maat table %s register failed.", table_name); return 0; } - int ret = maat_plugin_table_ex_schema_register(g_static_enforcer.maat, + int ret = maat_plugin_table_ex_schema_register(tfe_get_maat_handle(), table_name, ssl_svc_ja3_param_new_cb, ssl_svc_ja3_param_free_cb, ssl_svc_ja3_param_dup_cb, 0, - &g_static_enforcer); + NULL); if (ret < 0) { TFE_LOG_ERROR(g_default_logger, "failed at Maat_plugin_EX_register(%s), table_id = %d, ret = %d", - table_name, g_static_enforcer.table_id, ret); + table_name, table_id, ret); return 0; } @@ -122,7 +115,7 @@ enum ssl_ja3_pinning_status ssl_svc_ja3_scan(char *ja3_hash, const char *addr_st enum ssl_ja3_pinning_status ret = JA3_PINNING_STATUS_UNKNOWN; struct ssl_svc_ja3 *param = NULL; - param = (struct ssl_svc_ja3 *)maat_plugin_table_get_ex_data(g_static_enforcer.maat, g_static_enforcer.table_id, ja3_hash, strlen(ja3_hash)); + param = (struct ssl_svc_ja3 *)maat_plugin_table_get_ex_data(tfe_get_maat_handle(), table_id, ja3_hash, strlen(ja3_hash)); if (param == NULL) { ret = JA3_PINNING_STATUS_UNKNOWN; diff --git a/platform/src/ssl_stream.cpp b/platform/src/ssl_stream.cpp index 46fb62a..fbbf6d3 100644 --- a/platform/src/ssl_stream.cpp +++ b/platform/src/ssl_stream.cpp @@ -636,11 +636,6 @@ struct ssl_mgr * ssl_manager_init(const char * ini_profile, const char * section mgr->logger = logger; mgr->ev_base_gc=ev_base_gc; - if (ssl_mid_cert_kafka_logger_create(ini_profile, section)) - { - goto error_out; - } - MESA_load_profile_uint_def(ini_profile, section, "ssl_debug", &(ssl_debug), 0); MESA_load_profile_string_def(ini_profile, section, "ssl_min_version", version_str, sizeof(version_str), "ssl3"); mgr->ssl_min_version = sslver_str2num(version_str); |
