diff options
| author | fengweihao <[email protected]> | 2024-07-12 11:13:56 +0800 |
|---|---|---|
| committer | fengweihao <[email protected]> | 2024-07-12 11:13:56 +0800 |
| commit | 88a7a8c5c48f681cdd0478a809652ac94c2647ee (patch) | |
| tree | b0b56b4faaa0a5d667112dd1a3868ef0bc6c05a1 /common | |
| parent | 5eccfbb882852a9c4779ee4d7039b1ae7e6a9bbb (diff) | |
TSG-21787 Proxy初始化kafka topic时增加client.id参数v4.9.7-20240712
TSG-21730 Proxy优化Manipulation日志中host和fqdn字段的格式
Diffstat (limited to 'common')
| -rw-r--r-- | common/include/tfe_kafka_logger.h | 6 | ||||
| -rw-r--r-- | common/src/tfe_kafka_logger.cpp | 69 | ||||
| -rw-r--r-- | common/src/tfe_resource.cpp | 38 |
3 files changed, 70 insertions, 43 deletions
diff --git a/common/include/tfe_kafka_logger.h b/common/include/tfe_kafka_logger.h index 82db3d7..0ea0d20 100644 --- a/common/include/tfe_kafka_logger.h +++ b/common/include/tfe_kafka_logger.h @@ -28,12 +28,12 @@ typedef struct tfe_kafka_logger_s char topic_name[TOPIC_MAX][TFE_STRING_MAX]; char broker_list[TFE_STRING_MAX]; - rd_kafka_t *kafka_handle; + rd_kafka_t *kafka_handle[TOPIC_MAX]; rd_kafka_topic_t *kafka_topic[TOPIC_MAX]; } tfe_kafka_logger_t; -tfe_kafka_logger_t *tfe_kafka_logger_create(int enable, const char *nic_name, const char *brokerlist, const char *sasl_username, const char *sasl_passwd, void *local_logger); -int tfe_kafka_logger_topic_new(tfe_kafka_logger_t *logger, const char *topic_name, int topic_id, void *local_logger); +tfe_kafka_logger_t *tfe_kafka_logger_create(int enable, const char *nic_name, const char *brokerlist, void *local_logger); +int tfe_logger_create_kafka_topic(tfe_kafka_logger_t *logger, const char *sasl_username, const char *sasl_passwd, const char *topic_name, int topic_id, void *local_logger); void tfe_kafka_logger_destroy(tfe_kafka_logger_t *logger); int tfe_kafka_logger_send(tfe_kafka_logger_t *logger, int topic_id, const char *data, int len); diff --git a/common/src/tfe_kafka_logger.cpp b/common/src/tfe_kafka_logger.cpp index 26cfea6..28f4ce8 100644 --- a/common/src/tfe_kafka_logger.cpp +++ b/common/src/tfe_kafka_logger.cpp @@ -34,7 +34,7 @@ error: return INADDR_NONE; } -static rd_kafka_t *create_kafka_handle(const char *brokerlist, const char *sasl_username, const char *sasl_passwd, void *local_logger) +static rd_kafka_t *create_kafka_handle(const char *brokerlist, const char *sasl_username, const char *sasl_passwd, const char *topic_name, void *local_logger) { int ret; char kafka_errstr[1024] = {0}; @@ -64,6 +64,13 @@ static rd_kafka_t *create_kafka_handle(const char *brokerlist, const char *sasl_ rd_kafka_conf_destroy(rconf); return NULL; } + ret = rd_kafka_conf_set(rconf, "client.id", topic_name, kafka_errstr, sizeof(kafka_errstr)); + if (ret != RD_KAFKA_CONF_OK) + { + TFE_LOG_ERROR(local_logger, "Error to set kafka \"client.id\", %s.", kafka_errstr); + rd_kafka_conf_destroy(rconf); + return NULL; + } if (strlen(sasl_username) > 0 && strlen(sasl_passwd) > 0) { @@ -109,11 +116,11 @@ int tfe_kafka_logger_topic_new(tfe_kafka_logger_t *logger, const char *topic_nam if(logger && logger->enable) { strncpy(logger->topic_name[topic_id], topic_name, sizeof(logger->topic_name[topic_id])-1); - logger->kafka_topic[topic_id] = rd_kafka_topic_new(logger->kafka_handle, topic_name, NULL); + logger->kafka_topic[topic_id] = rd_kafka_topic_new(logger->kafka_handle[topic_id], topic_name, NULL); if (logger->kafka_topic[topic_id] == NULL) { TFE_LOG_ERROR(local_logger, "Error to creat kafka topic: %s.", topic_name); - rd_kafka_destroy(logger->kafka_handle); + rd_kafka_destroy(logger->kafka_handle[topic_id]); free(logger); return -1; } @@ -121,22 +128,27 @@ int tfe_kafka_logger_topic_new(tfe_kafka_logger_t *logger, const char *topic_nam return 0; } -tfe_kafka_logger_t *tfe_kafka_logger_create(int enable, const char *nic_name, const char *brokerlist, const char *sasl_username, const char *sasl_passwd, void *local_logger) +tfe_kafka_logger_t *tfe_kafka_logger_create(int enable, const char *nic_name, const char *brokerlist, void *local_logger) { char *override_sled_ip=NULL; + tfe_kafka_logger_t *logger = (tfe_kafka_logger_t *)calloc(1, sizeof(tfe_kafka_logger_t)); if (!logger) - return NULL; + { + return NULL; + } logger->enable = enable; if (!logger->enable) - return logger; + { + return logger; + } override_sled_ip = getenv("OVERRIDE_SLED_IP"); if(override_sled_ip != NULL) { strncpy(logger->local_ip_str, override_sled_ip, sizeof(logger->local_ip_str)-1); - goto create_kafka; + goto finish; } logger->local_ip_num = get_ip_by_eth_name(nic_name); @@ -147,32 +159,45 @@ tfe_kafka_logger_t *tfe_kafka_logger_create(int enable, const char *nic_name, co return NULL; } inet_ntop(AF_INET, &(logger->local_ip_num), logger->local_ip_str, sizeof(logger->local_ip_str)); +finish: + strncpy(logger->broker_list, brokerlist, sizeof(logger->broker_list)-1); + return logger; +} -create_kafka: - strncpy(logger->broker_list, brokerlist, sizeof(logger->broker_list)-1); - logger->kafka_handle = create_kafka_handle(logger->broker_list, sasl_username, sasl_passwd, local_logger); - if (logger->kafka_handle == NULL) +int tfe_logger_create_kafka_topic(tfe_kafka_logger_t *logger, const char *sasl_username, const char *sasl_passwd, const char *topic_name, int topic_id, void *local_logger) +{ + if(!logger->enable) + { + return 0; + } + + logger->kafka_handle[topic_id] = create_kafka_handle(logger->broker_list, sasl_username, sasl_passwd, topic_name, local_logger); + if (logger->kafka_handle[topic_id] == NULL) { TFE_LOG_ERROR(local_logger, "Error to creat kafka handler with brokerlist: %s.", logger->broker_list); free(logger); - return NULL; + return -1; } - return logger; + tfe_kafka_logger_topic_new(logger, topic_name, topic_id, logger); + return 0; } void tfe_kafka_logger_destroy(tfe_kafka_logger_t *logger) { if (logger) { - if (logger->kafka_handle) - rd_kafka_destroy(logger->kafka_handle); - - if (logger->kafka_topic[TOPIC_LOGGER]) - rd_kafka_topic_destroy(logger->kafka_topic[TOPIC_LOGGER]); - - if (logger->kafka_topic[TOPIC_BUCKET]) - rd_kafka_topic_destroy(logger->kafka_topic[TOPIC_BUCKET]); - + for(int i=0; i<TOPIC_MAX; i++) + { + if(logger->kafka_topic[i]) + { + rd_kafka_topic_destroy(logger->kafka_topic[i]); + } + + if(logger->kafka_handle[i]) + { + rd_kafka_destroy(logger->kafka_handle[i]); + } + } free(logger); logger = NULL; } diff --git a/common/src/tfe_resource.cpp b/common/src/tfe_resource.cpp index d696ecf..c0c4e6d 100644 --- a/common/src/tfe_resource.cpp +++ b/common/src/tfe_resource.cpp @@ -175,7 +175,7 @@ error_out: static tfe_kafka_logger_t *create_kafka_logger(const char *profile, const char *section, void *logger) { - int enable = 0, vsystem_id = 0; + int ret=0, enable=0, vsystem_id=0; char nic_name[TFE_SYMBOL_MAX] = {0}; char brokerlist[TFE_STRING_MAX] = {0}; char logger_topic[TFE_STRING_MAX] = {0}; @@ -199,24 +199,26 @@ static tfe_kafka_logger_t *create_kafka_logger(const char *profile, const char * return NULL; } - kafka_logger = tfe_kafka_logger_create(enable, nic_name, brokerlist, sasl_username, sasl_passwd, logger); - if (kafka_logger == NULL) - { - TFE_LOG_ERROR(logger, "tfe kafka init failed, error to create kafka logger."); - return NULL; - } + kafka_logger = tfe_kafka_logger_create(enable, nic_name, brokerlist, logger); + if (kafka_logger == NULL) + { + TFE_LOG_ERROR(logger, "tfe kafka init failed, error to create kafka logger."); + return NULL; + } - int ret = tfe_kafka_logger_topic_new(kafka_logger, logger_topic, TOPIC_LOGGER, logger); - if(ret < 0) - { - return NULL; - } - - ret = tfe_kafka_logger_topic_new(kafka_logger, bucket_topic, TOPIC_BUCKET, logger); - if(ret < 0) - { - return NULL; - } + ret = tfe_logger_create_kafka_topic(kafka_logger, sasl_username, sasl_passwd, logger_topic, TOPIC_LOGGER, logger); + if(ret < 0) + { + TFE_LOG_ERROR(logger, "tfe kafka init failed, error to create %s topic.", logger_topic); + return NULL; + } + + ret = tfe_logger_create_kafka_topic(kafka_logger, sasl_username, sasl_passwd, bucket_topic, TOPIC_BUCKET, logger); + if(ret < 0) + { + TFE_LOG_ERROR(logger, "tfe kafka init failed, error to create %s topic.", bucket_topic); + return NULL; + } kafka_logger->t_vsys_id=vsystem_id; TFE_LOG_INFO(logger, "tfe kafka logger : %s", enable ? "ENABLE" : "DISABLE"); |
