summaryrefslogtreecommitdiff
path: root/platform/src
diff options
context:
space:
mode:
authorluwenpeng <[email protected]>2024-07-19 18:20:04 +0800
committerluwenpeng <[email protected]>2024-07-19 18:57:57 +0800
commit2045d517cabbf7559e18367fa33b3a170143ef79 (patch)
tree15c70ad522c455c6b51622181442e12cea75f3e7 /platform/src
parent88a7a8c5c48f681cdd0478a809652ac94c2647ee (diff)
feature: TSG-21853 Refactoring TFE Kafka infrastructure
Diffstat (limited to 'platform/src')
-rw-r--r--platform/src/acceptor_kni_v4.cpp4
-rw-r--r--platform/src/proxy.cpp4
-rw-r--r--platform/src/ssl_fetch_cert.cpp70
-rw-r--r--platform/src/ssl_service_cache.cpp21
-rw-r--r--platform/src/ssl_stream.cpp5
5 files changed, 20 insertions, 84 deletions
diff --git a/platform/src/acceptor_kni_v4.cpp b/platform/src/acceptor_kni_v4.cpp
index 49ad2ba..ab73596 100644
--- a/platform/src/acceptor_kni_v4.cpp
+++ b/platform/src/acceptor_kni_v4.cpp
@@ -171,7 +171,7 @@ static void *worker_thread_cycle(void *arg)
io_uring_set_read_cb(io_uring_on_tap_s, handle_decryption_packet_from_tap, thread_ctx);
}
- TFE_LOG_INFO(logger, "%s: worker thread %d is running", "LOG_TAG_KNI", thread_index);
+ TFE_LOG_INFO(logger, "worker thread %d is running", thread_index);
while (1)
{
@@ -216,7 +216,7 @@ static void *worker_thread_cycle(void *arg)
}
error_out:
- TFE_LOG_ERROR(logger, "%s: worker thread %d exiting", LOG_TAG_SCE, thread_index);
+ TFE_LOG_ERROR(logger, "worker thread %d exiting", thread_index);
return (void *)NULL;
}
diff --git a/platform/src/proxy.cpp b/platform/src/proxy.cpp
index ddfe5c4..746ba29 100644
--- a/platform/src/proxy.cpp
+++ b/platform/src/proxy.cpp
@@ -272,7 +272,7 @@ static void __signal_handler_cb(evutil_socket_t fd, short what, void * arg)
TFE_LOG_ERROR(ctx->logger, "recv SIGHUP, reload zlog.conf");
MESA_handle_runtime_log_reconstruction(NULL);
MESA_load_profile_int_def(profile_path, "maat", "log_level", &(log_level), LOG_LEVEL_FATAL);
- maat = (struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT);
+ maat = tfe_get_maat_handle();
if(maat)
{
maat_reload_log_level(maat, (enum log_level)log_level);
@@ -688,7 +688,7 @@ int main(int argc, char * argv[])
CHECK_OR_EXIT(g_default_proxy->key_keeper_handler, "Failed at init Key keeper. Exit.");
/* RESOURCE INIT */
- ret = tfe_bussiness_resouce_init();
+ ret = tfe_env_init();
CHECK_OR_EXIT(ret == 0, "TFE bussiness resource init failed. Exit.");
/* SSL INIT */
diff --git a/platform/src/ssl_fetch_cert.cpp b/platform/src/ssl_fetch_cert.cpp
index 7a4a0de..a206260 100644
--- a/platform/src/ssl_fetch_cert.cpp
+++ b/platform/src/ssl_fetch_cert.cpp
@@ -1,12 +1,9 @@
-//
-// Created by lwp on 2019/10/16.
-//
-
#include <assert.h>
#include <cjson/cJSON.h>
+#include "kafka.h"
+#include "tfe_utils.h"
#include <ssl_utils.h>
-#include <tfe_kafka_logger.h>
#include <tfe_resource.h>
#include <MESA/MESA_prof_load.h>
@@ -36,51 +33,8 @@ static char cert_type_desc[MAX_TYPE][64] = {
{"Root certificate"},
};
-struct ssl_mid_cert_ctx
-{
- int enable;
- tfe_kafka_logger_t *g_kafka_logger;
-};
-struct ssl_mid_cert_ctx mid_cert_ctx;
-
-int ssl_mid_cert_kafka_logger_create(const char *profile, const char *section)
-{
- int ret=0;
- char topic_name[TFE_SYMBOL_MAX] = {0};
- char sasl_username[TFE_STRING_MAX] = {0};
- char sasl_passwd[TFE_STRING_MAX] = {0};
-
- MESA_load_profile_int_def(profile, section, "mc_cache_enable", &mid_cert_ctx.enable, 0);
- MESA_load_profile_string_def(profile, "kafka", "mc_cache_topic", topic_name, sizeof(topic_name), "PXY-EXCH-INTERMEDIA-CERT");
- MESA_load_profile_string_def(profile, "kafka", "SASL_USERNAME", sasl_username, sizeof(sasl_username), "");
- MESA_load_profile_string_def(profile, "kafka", "SASL_PASSWD", sasl_passwd, sizeof(sasl_passwd), "");
-
- if(mid_cert_ctx.enable == 0)
- {
- return 0;
- }
-
- mid_cert_ctx.g_kafka_logger = (tfe_kafka_logger_t *)tfe_bussiness_resouce_get(KAFKA_LOGGER);
- if(!mid_cert_ctx.g_kafka_logger)
- {
- return -1;
- }
-
- ret = tfe_logger_create_kafka_topic(mid_cert_ctx.g_kafka_logger, sasl_username, sasl_passwd, topic_name, TOPIC_MC_CACHE, g_default_logger);
- if(ret < 0)
- {
- return -1;
- }
- return 0;
-}
-
-static void ssl_mid_cert_kafka_logger_send(const char *sni, const char *fingerprint, const char *cert)
+static void send_cert_to_kafka(const char *sni, const char *fingerprint, const char *cert)
{
- if (mid_cert_ctx.g_kafka_logger->enable == 0)
- {
- return;
- }
-
cJSON *obj = NULL;
cJSON *dup = NULL;
char *msg = NULL;
@@ -88,13 +42,12 @@ static void ssl_mid_cert_kafka_logger_send(const char *sni, const char *fingerpr
obj = cJSON_CreateObject();
cJSON_AddStringToObject(obj, "sni", sni);
cJSON_AddStringToObject(obj, "fingerprint", fingerprint);
- cJSON_AddNumberToObject(obj, "vsys_id", mid_cert_ctx.g_kafka_logger->t_vsys_id);
+ cJSON_AddNumberToObject(obj, "vsys_id", tfe_get_vsys_id());
cJSON_AddStringToObject(obj, "cert", cert);
- cJSON_AddStringToObject(obj, "tfe_ip", mid_cert_ctx.g_kafka_logger->local_ip_str);
+ cJSON_AddStringToObject(obj, "tfe_ip", tfe_get_sled_ip());
dup = cJSON_Duplicate(obj, 1);
msg = cJSON_PrintUnformatted(dup);
- TFE_LOG_DEBUG(g_default_logger, "log to [%s] msg:%s", mid_cert_ctx.g_kafka_logger->topic_name[TOPIC_MC_CACHE], msg);
- tfe_kafka_logger_send(mid_cert_ctx.g_kafka_logger, TOPIC_MC_CACHE, msg, strlen(msg));
+ kafka_send(tfe_get_kafka_handle(), TOPIC_EXCH_CERT, msg, strlen(msg));
free(msg);
cJSON_Delete(dup);
@@ -142,10 +95,6 @@ void ssl_fetch_trusted_cert_from_chain(STACK_OF(X509) * cert_chain, X509_STORE *
char *fingerprint = NULL;
X509 *cert = NULL;
X509_OBJECT *obj = NULL;
- if (!mid_cert_ctx.g_kafka_logger || !mid_cert_ctx.enable)
- {
- return;
- }
deep = sk_X509_num(cert_chain);
for (int i = 0; i < deep; i++)
@@ -197,13 +146,12 @@ void ssl_fetch_trusted_cert_from_chain(STACK_OF(X509) * cert_chain, X509_STORE *
if (!in_store && fingerprint && pem)
{
- ssl_mid_cert_kafka_logger_send(hostname, fingerprint, pem);
+ send_cert_to_kafka(hostname, fingerprint, pem);
}
end:
- TFE_LOG_DEBUG(g_default_logger, "[dep:%d/%d] is %s, in_trusted_store:%d, sin:%s; subject:(%s); issuer:(%s); fingerprint:%s; cert:%s",
- i, deep, cert_type_desc[type], in_store, (hostname ? hostname : "NULL"), (subj ? subj : "NULL"), (issuer ? issuer : "NULL"), (fingerprint ? fingerprint : "NULL"),
- ((pem && mid_cert_ctx.g_kafka_logger->enable == 0x10) ? pem : " ..."));
+ TFE_LOG_DEBUG(g_default_logger, "[dep:%d/%d] is %s, in_trusted_store:%d, sin:%s; subject:(%s); issuer:(%s); fingerprint:%s",
+ i, deep, cert_type_desc[type], in_store, (hostname ? hostname : "NULL"), (subj ? subj : "NULL"), (issuer ? issuer : "NULL"), (fingerprint ? fingerprint : "NULL"));
if (pem)
{
free(pem);
diff --git a/platform/src/ssl_service_cache.cpp b/platform/src/ssl_service_cache.cpp
index 0561f43..13c7adc 100644
--- a/platform/src/ssl_service_cache.cpp
+++ b/platform/src/ssl_service_cache.cpp
@@ -3,12 +3,6 @@
#include <tfe_resource.h>
#include <ssl_stream.h>
-struct ssl_ja3_enforcer
-{
- struct maat *maat;
- int table_id;
-};
-
struct ssl_svc_ja3
{
char ja3_hash[33];
@@ -26,7 +20,7 @@ struct ssl_svc_addr
const char *dport;
};
-static struct ssl_ja3_enforcer g_static_enforcer = {0};
+static int table_id = 0;
static void ssl_svc_ja3_param_dup_cb(int table_id, void **to, void **from, long argl, void *argp)
{
@@ -93,24 +87,23 @@ static void ssl_svc_ja3_param_free(struct ssl_svc_ja3 *param)
static int ssl_svc_ja3_init(const char *table_name)
{
- g_static_enforcer.maat = (struct maat *)tfe_bussiness_resouce_get(STATIC_MAAT);
- g_static_enforcer.table_id = maat_get_table_id(g_static_enforcer.maat, table_name);
- if (g_static_enforcer.table_id < 0)
+ table_id = maat_get_table_id(tfe_get_maat_handle(), table_name);
+ if (table_id < 0)
{
TFE_LOG_ERROR(g_default_logger, "Maat table %s register failed.", table_name);
return 0;
}
- int ret = maat_plugin_table_ex_schema_register(g_static_enforcer.maat,
+ int ret = maat_plugin_table_ex_schema_register(tfe_get_maat_handle(),
table_name,
ssl_svc_ja3_param_new_cb,
ssl_svc_ja3_param_free_cb,
ssl_svc_ja3_param_dup_cb,
0,
- &g_static_enforcer);
+ NULL);
if (ret < 0)
{
TFE_LOG_ERROR(g_default_logger, "failed at Maat_plugin_EX_register(%s), table_id = %d, ret = %d",
- table_name, g_static_enforcer.table_id, ret);
+ table_name, table_id, ret);
return 0;
}
@@ -122,7 +115,7 @@ enum ssl_ja3_pinning_status ssl_svc_ja3_scan(char *ja3_hash, const char *addr_st
enum ssl_ja3_pinning_status ret = JA3_PINNING_STATUS_UNKNOWN;
struct ssl_svc_ja3 *param = NULL;
- param = (struct ssl_svc_ja3 *)maat_plugin_table_get_ex_data(g_static_enforcer.maat, g_static_enforcer.table_id, ja3_hash, strlen(ja3_hash));
+ param = (struct ssl_svc_ja3 *)maat_plugin_table_get_ex_data(tfe_get_maat_handle(), table_id, ja3_hash, strlen(ja3_hash));
if (param == NULL)
{
ret = JA3_PINNING_STATUS_UNKNOWN;
diff --git a/platform/src/ssl_stream.cpp b/platform/src/ssl_stream.cpp
index 46fb62a..fbbf6d3 100644
--- a/platform/src/ssl_stream.cpp
+++ b/platform/src/ssl_stream.cpp
@@ -636,11 +636,6 @@ struct ssl_mgr * ssl_manager_init(const char * ini_profile, const char * section
mgr->logger = logger;
mgr->ev_base_gc=ev_base_gc;
- if (ssl_mid_cert_kafka_logger_create(ini_profile, section))
- {
- goto error_out;
- }
-
MESA_load_profile_uint_def(ini_profile, section, "ssl_debug", &(ssl_debug), 0);
MESA_load_profile_string_def(ini_profile, section, "ssl_min_version", version_str, sizeof(version_str), "ssl3");
mgr->ssl_min_version = sslver_str2num(version_str);