summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorfengweihao <[email protected]>2021-08-19 16:24:19 +0800
committerfengweihao <[email protected]>2021-08-19 16:24:19 +0800
commitc41a67ca2b410c13bfdc651af4992d8f8bc824e4 (patch)
treef0eb6ca13e54be4bdeae3664fdc310a9da2fe443
parent2a729a48b318faaa0ab1285a73b00e2cfed344c7 (diff)
TSG-7471 Proxy连接kafka时增加认证信息
-rw-r--r--common/include/tfe_kafka_logger.h3
-rw-r--r--common/src/tfe_kafka_logger.cpp26
-rw-r--r--common/src/tfe_resource.cpp12
-rw-r--r--conf/tfe/tfe.conf4
-rw-r--r--platform/src/ssl_fetch_cert.cpp7
5 files changed, 46 insertions, 6 deletions
diff --git a/common/include/tfe_kafka_logger.h b/common/include/tfe_kafka_logger.h
index 85cac25..0ccd223 100644
--- a/common/include/tfe_kafka_logger.h
+++ b/common/include/tfe_kafka_logger.h
@@ -23,7 +23,8 @@ extern "C"
rd_kafka_topic_t *kafka_topic;
} tfe_kafka_logger_t;
- tfe_kafka_logger_t *tfe_kafka_logger_create(int enable, const char *nic_name, const char *brokerlist, const char *topic_name, void *local_logger);
+ tfe_kafka_logger_t *tfe_kafka_logger_create(int enable, const char *nic_name, const char *brokerlist, const char *topic_name,
+ const char *sasl_username, const char *sasl_passwd, void *local_logger);
void tfe_kafka_logger_destroy(tfe_kafka_logger_t *logger);
int tfe_kafka_logger_send(tfe_kafka_logger_t *logger, const char *data, int len);
diff --git a/common/src/tfe_kafka_logger.cpp b/common/src/tfe_kafka_logger.cpp
index 549f9a8..59a90f2 100644
--- a/common/src/tfe_kafka_logger.cpp
+++ b/common/src/tfe_kafka_logger.cpp
@@ -34,7 +34,7 @@ error:
return INADDR_NONE;
}
-static rd_kafka_t *create_kafka_handle(const char *brokerlist, void *local_logger)
+static rd_kafka_t *create_kafka_handle(const char *brokerlist, const char *sasl_username, const char *sasl_passwd, void *local_logger)
{
int ret;
char kafka_errstr[1024] = {0};
@@ -65,6 +65,26 @@ static rd_kafka_t *create_kafka_handle(const char *brokerlist, void *local_logge
return NULL;
}
+ if (strlen(sasl_username) > 0 && strlen(sasl_passwd) > 0)
+ {
+ rd_kafka_conf_set(rconf, "security.protocol", "sasl_plaintext", kafka_errstr, sizeof(kafka_errstr));
+ rd_kafka_conf_set(rconf, "sasl.mechanisms", "PLAIN", kafka_errstr, sizeof(kafka_errstr));
+ ret = rd_kafka_conf_set(rconf, "sasl.username", sasl_username, kafka_errstr, sizeof(kafka_errstr));
+ if (ret != RD_KAFKA_CONF_OK)
+ {
+ TFE_LOG_ERROR(local_logger, "Error to set kafka \"sasl.username\", %s.", kafka_errstr);
+ rd_kafka_conf_destroy(rconf);
+ return NULL;
+ }
+ ret = rd_kafka_conf_set(rconf, "sasl.password", sasl_passwd, kafka_errstr, sizeof(kafka_errstr));
+ if (ret != RD_KAFKA_CONF_OK)
+ {
+ TFE_LOG_ERROR(local_logger, "Error to set kafka \"sasl.password\", %s.", kafka_errstr);
+ rd_kafka_conf_destroy(rconf);
+ return NULL;
+ }
+ }
+
//The conf object is freed by this function and must not be used or destroyed by the application sub-sequently.
handle = rd_kafka_new(RD_KAFKA_PRODUCER, rconf, kafka_errstr, sizeof(kafka_errstr));
rconf = NULL;
@@ -84,7 +104,7 @@ static rd_kafka_t *create_kafka_handle(const char *brokerlist, void *local_logge
return handle;
}
-tfe_kafka_logger_t *tfe_kafka_logger_create(int enable, const char *nic_name, const char *brokerlist, const char *topic_name, void *local_logger)
+tfe_kafka_logger_t *tfe_kafka_logger_create(int enable, const char *nic_name, const char *brokerlist, const char *topic_name, const char *sasl_username, const char *sasl_passwd, void *local_logger)
{
tfe_kafka_logger_t *logger = (tfe_kafka_logger_t *)calloc(1, sizeof(tfe_kafka_logger_t));
if (!logger)
@@ -104,7 +124,7 @@ tfe_kafka_logger_t *tfe_kafka_logger_create(int enable, const char *nic_name, co
inet_ntop(AF_INET, &(logger->local_ip_num), logger->local_ip_str, sizeof(logger->local_ip_str));
strncpy(logger->broker_list, brokerlist, strlen(brokerlist));
- logger->kafka_handle = create_kafka_handle(logger->broker_list, local_logger);
+ logger->kafka_handle = create_kafka_handle(logger->broker_list, sasl_username, sasl_passwd, local_logger);
if (logger->kafka_handle == NULL)
{
TFE_LOG_ERROR(local_logger, "Error to creat kafka handler with brokerlist: %s.", logger->broker_list);
diff --git a/common/src/tfe_resource.cpp b/common/src/tfe_resource.cpp
index c1b6e71..1d1c8ad 100644
--- a/common/src/tfe_resource.cpp
+++ b/common/src/tfe_resource.cpp
@@ -159,12 +159,16 @@ static tfe_kafka_logger_t *create_kafka_logger(const char *profile, const char *
char nic_name[64] = {0};
char brokerlist[TFE_STRING_MAX] = {0};
char topic_name[TFE_STRING_MAX] = {0};
+ char sasl_username[TFE_STRING_MAX] = {0};
+ char sasl_passwd[TFE_STRING_MAX] = {0};
tfe_kafka_logger_t *kafka_logger = NULL;
MESA_load_profile_int_def(profile, section, "enable", &enable, 1);
MESA_load_profile_string_def(profile, section, "NIC_NAME", nic_name, sizeof(nic_name), "eth0");
MESA_load_profile_string_def(profile, section, "KAFKA_BROKERLIST", brokerlist, sizeof(brokerlist), "");
MESA_load_profile_string_def(profile, section, "KAFKA_TOPIC", topic_name, sizeof(topic_name), "POLICY-EVENT-LOG");
+ MESA_load_profile_string_def(profile, section, "SASL_USERNAME", sasl_username, sizeof(sasl_username), "");
+ MESA_load_profile_string_def(profile, section, "SASL_PASSWD", sasl_passwd, sizeof(sasl_passwd), "");
if (!strlen(brokerlist))
{
@@ -172,7 +176,7 @@ static tfe_kafka_logger_t *create_kafka_logger(const char *profile, const char *
return NULL;
}
- kafka_logger = tfe_kafka_logger_create(enable, nic_name, brokerlist, topic_name, logger);
+ kafka_logger = tfe_kafka_logger_create(enable, nic_name, brokerlist, topic_name, sasl_username, sasl_passwd, logger);
if (kafka_logger == NULL)
{
TFE_LOG_ERROR(logger, "tfe kafka init failed, error to create kafka logger.");
@@ -183,6 +187,12 @@ static tfe_kafka_logger_t *create_kafka_logger(const char *profile, const char *
TFE_LOG_INFO(logger, "tfe kafka topic : %s", topic_name);
TFE_LOG_INFO(logger, "tfe kafka brokerlist : %s", brokerlist);
+ if (strlen(sasl_username) > 0 && strlen(sasl_passwd) > 0)
+ {
+ TFE_LOG_INFO(logger, "tfe kafka sasl_username : %s", sasl_username);
+ TFE_LOG_INFO(logger, "tfe kafka sasl_passwd : %s", sasl_passwd);
+ }
+
return kafka_logger;
}
diff --git a/conf/tfe/tfe.conf b/conf/tfe/tfe.conf
index e5ca8ea..9577012 100644
--- a/conf/tfe/tfe.conf
+++ b/conf/tfe/tfe.conf
@@ -90,6 +90,8 @@ mc_cache_enable=1
mc_cache_eth=eth0
mc_cache_broker_list=192.168.40.224:9092
mc_cache_topic=PXY-EXCH-INTERMEDIA-CERT
+sasl_username=admin
+sasl_passwd=galaxy2019
[key_keeper]
#Mode: debug - generate cert with ca_path, normal - generate cert with cert store
@@ -159,6 +161,8 @@ enable=1
NIC_NAME=enp2s0
kafka_brokerlist=192.168.40.224:9092
kafka_topic=PROXY-EVENT-LOG
+sasl_username=admin
+sasl_passwd=galaxy2019
device_id_filepath=/opt/tsg/etc/tsg_sn.json
[maat]
diff --git a/platform/src/ssl_fetch_cert.cpp b/platform/src/ssl_fetch_cert.cpp
index c2e48b5..61e97df 100644
--- a/platform/src/ssl_fetch_cert.cpp
+++ b/platform/src/ssl_fetch_cert.cpp
@@ -48,10 +48,15 @@ int ssl_mid_cert_kafka_logger_create(const char *profile, const char *section)
char nic_name[64] = {0};
char broker_list[TFE_SYMBOL_MAX] = {0};
char topic_name[TFE_SYMBOL_MAX] = {0};
+ char sasl_username[TFE_STRING_MAX] = {0};
+ char sasl_passwd[TFE_STRING_MAX] = {0};
MESA_load_profile_int_def(profile, section, "mc_cache_enable", &enable, 0);
MESA_load_profile_string_def(profile, section, "mc_cache_eth", nic_name, sizeof(nic_name), "eth0");
MESA_load_profile_string_def(profile, section, "mc_cache_topic", topic_name, sizeof(topic_name), "PXY-EXCH-INTERMEDIA-CERT");
+ MESA_load_profile_string_def(profile, section, "SASL_USERNAME", sasl_username, sizeof(sasl_username), "");
+ MESA_load_profile_string_def(profile, section, "SASL_PASSWD", sasl_passwd, sizeof(sasl_passwd), "");
+
if (!enable)
goto skip;
if (MESA_load_profile_string_def(profile, section, "mc_cache_broker_list", broker_list, sizeof(broker_list), NULL) < 0)
@@ -60,7 +65,7 @@ int ssl_mid_cert_kafka_logger_create(const char *profile, const char *section)
return -1;
}
skip:
- g_kafka_logger = tfe_kafka_logger_create(enable, nic_name, broker_list, topic_name, g_default_logger);
+ g_kafka_logger = tfe_kafka_logger_create(enable, nic_name, broker_list, topic_name, sasl_username, sasl_passwd, g_default_logger);
if (g_kafka_logger)
return 0;
else