summaryrefslogtreecommitdiff
path: root/common
diff options
context:
space:
mode:
authorfengweihao <[email protected]>2024-07-12 11:13:56 +0800
committerfengweihao <[email protected]>2024-07-12 11:13:56 +0800
commit88a7a8c5c48f681cdd0478a809652ac94c2647ee (patch)
treeb0b56b4faaa0a5d667112dd1a3868ef0bc6c05a1 /common
parent5eccfbb882852a9c4779ee4d7039b1ae7e6a9bbb (diff)
TSG-21787 Proxy初始化kafka topic时增加client.id参数v4.9.7-20240712
TSG-21730 Proxy优化Manipulation日志中host和fqdn字段的格式
Diffstat (limited to 'common')
-rw-r--r--common/include/tfe_kafka_logger.h6
-rw-r--r--common/src/tfe_kafka_logger.cpp69
-rw-r--r--common/src/tfe_resource.cpp38
3 files changed, 70 insertions, 43 deletions
diff --git a/common/include/tfe_kafka_logger.h b/common/include/tfe_kafka_logger.h
index 82db3d7..0ea0d20 100644
--- a/common/include/tfe_kafka_logger.h
+++ b/common/include/tfe_kafka_logger.h
@@ -28,12 +28,12 @@ typedef struct tfe_kafka_logger_s
char topic_name[TOPIC_MAX][TFE_STRING_MAX];
char broker_list[TFE_STRING_MAX];
- rd_kafka_t *kafka_handle;
+ rd_kafka_t *kafka_handle[TOPIC_MAX];
rd_kafka_topic_t *kafka_topic[TOPIC_MAX];
} tfe_kafka_logger_t;
-tfe_kafka_logger_t *tfe_kafka_logger_create(int enable, const char *nic_name, const char *brokerlist, const char *sasl_username, const char *sasl_passwd, void *local_logger);
-int tfe_kafka_logger_topic_new(tfe_kafka_logger_t *logger, const char *topic_name, int topic_id, void *local_logger);
+tfe_kafka_logger_t *tfe_kafka_logger_create(int enable, const char *nic_name, const char *brokerlist, void *local_logger);
+int tfe_logger_create_kafka_topic(tfe_kafka_logger_t *logger, const char *sasl_username, const char *sasl_passwd, const char *topic_name, int topic_id, void *local_logger);
void tfe_kafka_logger_destroy(tfe_kafka_logger_t *logger);
int tfe_kafka_logger_send(tfe_kafka_logger_t *logger, int topic_id, const char *data, int len);
diff --git a/common/src/tfe_kafka_logger.cpp b/common/src/tfe_kafka_logger.cpp
index 26cfea6..28f4ce8 100644
--- a/common/src/tfe_kafka_logger.cpp
+++ b/common/src/tfe_kafka_logger.cpp
@@ -34,7 +34,7 @@ error:
return INADDR_NONE;
}
-static rd_kafka_t *create_kafka_handle(const char *brokerlist, const char *sasl_username, const char *sasl_passwd, void *local_logger)
+static rd_kafka_t *create_kafka_handle(const char *brokerlist, const char *sasl_username, const char *sasl_passwd, const char *topic_name, void *local_logger)
{
int ret;
char kafka_errstr[1024] = {0};
@@ -64,6 +64,13 @@ static rd_kafka_t *create_kafka_handle(const char *brokerlist, const char *sasl_
rd_kafka_conf_destroy(rconf);
return NULL;
}
+ ret = rd_kafka_conf_set(rconf, "client.id", topic_name, kafka_errstr, sizeof(kafka_errstr));
+ if (ret != RD_KAFKA_CONF_OK)
+ {
+ TFE_LOG_ERROR(local_logger, "Error to set kafka \"client.id\", %s.", kafka_errstr);
+ rd_kafka_conf_destroy(rconf);
+ return NULL;
+ }
if (strlen(sasl_username) > 0 && strlen(sasl_passwd) > 0)
{
@@ -109,11 +116,11 @@ int tfe_kafka_logger_topic_new(tfe_kafka_logger_t *logger, const char *topic_nam
if(logger && logger->enable)
{
strncpy(logger->topic_name[topic_id], topic_name, sizeof(logger->topic_name[topic_id])-1);
- logger->kafka_topic[topic_id] = rd_kafka_topic_new(logger->kafka_handle, topic_name, NULL);
+ logger->kafka_topic[topic_id] = rd_kafka_topic_new(logger->kafka_handle[topic_id], topic_name, NULL);
if (logger->kafka_topic[topic_id] == NULL)
{
TFE_LOG_ERROR(local_logger, "Error to creat kafka topic: %s.", topic_name);
- rd_kafka_destroy(logger->kafka_handle);
+ rd_kafka_destroy(logger->kafka_handle[topic_id]);
free(logger);
return -1;
}
@@ -121,22 +128,27 @@ int tfe_kafka_logger_topic_new(tfe_kafka_logger_t *logger, const char *topic_nam
return 0;
}
-tfe_kafka_logger_t *tfe_kafka_logger_create(int enable, const char *nic_name, const char *brokerlist, const char *sasl_username, const char *sasl_passwd, void *local_logger)
+tfe_kafka_logger_t *tfe_kafka_logger_create(int enable, const char *nic_name, const char *brokerlist, void *local_logger)
{
char *override_sled_ip=NULL;
+
tfe_kafka_logger_t *logger = (tfe_kafka_logger_t *)calloc(1, sizeof(tfe_kafka_logger_t));
if (!logger)
- return NULL;
+ {
+ return NULL;
+ }
logger->enable = enable;
if (!logger->enable)
- return logger;
+ {
+ return logger;
+ }
override_sled_ip = getenv("OVERRIDE_SLED_IP");
if(override_sled_ip != NULL)
{
strncpy(logger->local_ip_str, override_sled_ip, sizeof(logger->local_ip_str)-1);
- goto create_kafka;
+ goto finish;
}
logger->local_ip_num = get_ip_by_eth_name(nic_name);
@@ -147,32 +159,45 @@ tfe_kafka_logger_t *tfe_kafka_logger_create(int enable, const char *nic_name, co
return NULL;
}
inet_ntop(AF_INET, &(logger->local_ip_num), logger->local_ip_str, sizeof(logger->local_ip_str));
+finish:
+ strncpy(logger->broker_list, brokerlist, sizeof(logger->broker_list)-1);
+ return logger;
+}
-create_kafka:
- strncpy(logger->broker_list, brokerlist, sizeof(logger->broker_list)-1);
- logger->kafka_handle = create_kafka_handle(logger->broker_list, sasl_username, sasl_passwd, local_logger);
- if (logger->kafka_handle == NULL)
+int tfe_logger_create_kafka_topic(tfe_kafka_logger_t *logger, const char *sasl_username, const char *sasl_passwd, const char *topic_name, int topic_id, void *local_logger)
+{
+ if(!logger->enable)
+ {
+ return 0;
+ }
+
+ logger->kafka_handle[topic_id] = create_kafka_handle(logger->broker_list, sasl_username, sasl_passwd, topic_name, local_logger);
+ if (logger->kafka_handle[topic_id] == NULL)
{
TFE_LOG_ERROR(local_logger, "Error to creat kafka handler with brokerlist: %s.", logger->broker_list);
free(logger);
- return NULL;
+ return -1;
}
- return logger;
+ tfe_kafka_logger_topic_new(logger, topic_name, topic_id, logger);
+ return 0;
}
void tfe_kafka_logger_destroy(tfe_kafka_logger_t *logger)
{
if (logger)
{
- if (logger->kafka_handle)
- rd_kafka_destroy(logger->kafka_handle);
-
- if (logger->kafka_topic[TOPIC_LOGGER])
- rd_kafka_topic_destroy(logger->kafka_topic[TOPIC_LOGGER]);
-
- if (logger->kafka_topic[TOPIC_BUCKET])
- rd_kafka_topic_destroy(logger->kafka_topic[TOPIC_BUCKET]);
-
+ for(int i=0; i<TOPIC_MAX; i++)
+ {
+ if(logger->kafka_topic[i])
+ {
+ rd_kafka_topic_destroy(logger->kafka_topic[i]);
+ }
+
+ if(logger->kafka_handle[i])
+ {
+ rd_kafka_destroy(logger->kafka_handle[i]);
+ }
+ }
free(logger);
logger = NULL;
}
diff --git a/common/src/tfe_resource.cpp b/common/src/tfe_resource.cpp
index d696ecf..c0c4e6d 100644
--- a/common/src/tfe_resource.cpp
+++ b/common/src/tfe_resource.cpp
@@ -175,7 +175,7 @@ error_out:
static tfe_kafka_logger_t *create_kafka_logger(const char *profile, const char *section, void *logger)
{
- int enable = 0, vsystem_id = 0;
+ int ret=0, enable=0, vsystem_id=0;
char nic_name[TFE_SYMBOL_MAX] = {0};
char brokerlist[TFE_STRING_MAX] = {0};
char logger_topic[TFE_STRING_MAX] = {0};
@@ -199,24 +199,26 @@ static tfe_kafka_logger_t *create_kafka_logger(const char *profile, const char *
return NULL;
}
- kafka_logger = tfe_kafka_logger_create(enable, nic_name, brokerlist, sasl_username, sasl_passwd, logger);
- if (kafka_logger == NULL)
- {
- TFE_LOG_ERROR(logger, "tfe kafka init failed, error to create kafka logger.");
- return NULL;
- }
+ kafka_logger = tfe_kafka_logger_create(enable, nic_name, brokerlist, logger);
+ if (kafka_logger == NULL)
+ {
+ TFE_LOG_ERROR(logger, "tfe kafka init failed, error to create kafka logger.");
+ return NULL;
+ }
- int ret = tfe_kafka_logger_topic_new(kafka_logger, logger_topic, TOPIC_LOGGER, logger);
- if(ret < 0)
- {
- return NULL;
- }
-
- ret = tfe_kafka_logger_topic_new(kafka_logger, bucket_topic, TOPIC_BUCKET, logger);
- if(ret < 0)
- {
- return NULL;
- }
+ ret = tfe_logger_create_kafka_topic(kafka_logger, sasl_username, sasl_passwd, logger_topic, TOPIC_LOGGER, logger);
+ if(ret < 0)
+ {
+ TFE_LOG_ERROR(logger, "tfe kafka init failed, error to create %s topic.", logger_topic);
+ return NULL;
+ }
+
+ ret = tfe_logger_create_kafka_topic(kafka_logger, sasl_username, sasl_passwd, bucket_topic, TOPIC_BUCKET, logger);
+ if(ret < 0)
+ {
+ TFE_LOG_ERROR(logger, "tfe kafka init failed, error to create %s topic.", bucket_topic);
+ return NULL;
+ }
kafka_logger->t_vsys_id=vsystem_id;
TFE_LOG_INFO(logger, "tfe kafka logger : %s", enable ? "ENABLE" : "DISABLE");