diff options
| author | luwenpeng <[email protected]> | 2024-07-19 18:20:04 +0800 |
|---|---|---|
| committer | luwenpeng <[email protected]> | 2024-07-19 18:57:57 +0800 |
| commit | 2045d517cabbf7559e18367fa33b3a170143ef79 (patch) | |
| tree | 15c70ad522c455c6b51622181442e12cea75f3e7 /common/src/tfe_kafka_logger.cpp | |
| parent | 88a7a8c5c48f681cdd0478a809652ac94c2647ee (diff) | |
featureļ¼ TSG-21853 Refactoring TFE Kafka infrastructure
Diffstat (limited to 'common/src/tfe_kafka_logger.cpp')
| -rw-r--r-- | common/src/tfe_kafka_logger.cpp | 212 |
1 files changed, 0 insertions, 212 deletions
diff --git a/common/src/tfe_kafka_logger.cpp b/common/src/tfe_kafka_logger.cpp deleted file mode 100644 index 28f4ce8..0000000 --- a/common/src/tfe_kafka_logger.cpp +++ /dev/null @@ -1,212 +0,0 @@ -#include <sys/ioctl.h> -#include <unistd.h> -#include <arpa/inet.h> -#include <net/if.h> - -#include <tfe_kafka_logger.h> - -// return INADDR_NONE if error occur -static unsigned int get_ip_by_eth_name(const char *ifname) -{ - int sockfd = -1; - struct ifreq ifr; - unsigned int ip; - - sockfd = socket(AF_INET, SOCK_DGRAM, 0); - if (-1 == sockfd) - { - goto error; - } - - strcpy(ifr.ifr_name, ifname); - if (ioctl(sockfd, SIOCGIFADDR, &ifr) == -1) - { - goto error; - } - close(sockfd); - - ip = ((struct sockaddr_in *)&(ifr.ifr_addr))->sin_addr.s_addr; - return ip; - -error: - if (sockfd > 0) - close(sockfd); - return INADDR_NONE; -} - -static rd_kafka_t *create_kafka_handle(const char *brokerlist, const char *sasl_username, const char *sasl_passwd, const char *topic_name, void *local_logger) -{ - int ret; - char kafka_errstr[1024] = {0}; - rd_kafka_t *handle = NULL; - rd_kafka_conf_t *rconf = NULL; - - rconf = rd_kafka_conf_new(); - - ret = rd_kafka_conf_set(rconf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr)); - if (ret != RD_KAFKA_CONF_OK) - { - TFE_LOG_ERROR(local_logger, "Error to set kafka \"queue.buffering.max.messages\", %s.", kafka_errstr); - rd_kafka_conf_destroy(rconf); - return NULL; - } - ret = rd_kafka_conf_set(rconf, "topic.metadata.refresh.interval.ms", "600000", kafka_errstr, sizeof(kafka_errstr)); - if (ret != RD_KAFKA_CONF_OK) - { - TFE_LOG_ERROR(local_logger, "Error to set kafka \"topic.metadata.refresh.interval.ms\", %s.", kafka_errstr); - rd_kafka_conf_destroy(rconf); - return NULL; - } - ret = rd_kafka_conf_set(rconf, "security.protocol", "plaintext", kafka_errstr, sizeof(kafka_errstr)); - if (ret != RD_KAFKA_CONF_OK) - { - TFE_LOG_ERROR(local_logger, "Error to set kafka \"security.protocol\", %s.", kafka_errstr); - rd_kafka_conf_destroy(rconf); - return NULL; - } - ret = rd_kafka_conf_set(rconf, "client.id", topic_name, kafka_errstr, sizeof(kafka_errstr)); - if (ret != RD_KAFKA_CONF_OK) - { - TFE_LOG_ERROR(local_logger, "Error to set kafka \"client.id\", %s.", kafka_errstr); - rd_kafka_conf_destroy(rconf); - return NULL; - } - - if (strlen(sasl_username) > 0 && strlen(sasl_passwd) > 0) - { - rd_kafka_conf_set(rconf, "security.protocol", "sasl_plaintext", kafka_errstr, sizeof(kafka_errstr)); - rd_kafka_conf_set(rconf, "sasl.mechanisms", "PLAIN", kafka_errstr, sizeof(kafka_errstr)); - ret = rd_kafka_conf_set(rconf, "sasl.username", sasl_username, kafka_errstr, sizeof(kafka_errstr)); - if (ret != RD_KAFKA_CONF_OK) - { - TFE_LOG_ERROR(local_logger, "Error to set kafka \"sasl.username\", %s.", kafka_errstr); - rd_kafka_conf_destroy(rconf); - return NULL; - } - ret = rd_kafka_conf_set(rconf, "sasl.password", sasl_passwd, kafka_errstr, sizeof(kafka_errstr)); - if (ret != RD_KAFKA_CONF_OK) - { - TFE_LOG_ERROR(local_logger, "Error to set kafka \"sasl.password\", %s.", kafka_errstr); - rd_kafka_conf_destroy(rconf); - return NULL; - } - } - - //The conf object is freed by this function and must not be used or destroyed by the application sub-sequently. - handle = rd_kafka_new(RD_KAFKA_PRODUCER, rconf, kafka_errstr, sizeof(kafka_errstr)); - rconf = NULL; - if (handle == NULL) - { - TFE_LOG_ERROR(local_logger, "Error to new kafka, %s.", kafka_errstr); - return NULL; - } - - if (rd_kafka_brokers_add(handle, brokerlist) == 0) - { - TFE_LOG_ERROR(local_logger, "Error to add kakfa bokers."); - rd_kafka_destroy(handle); - return NULL; - } - - return handle; -} - -int tfe_kafka_logger_topic_new(tfe_kafka_logger_t *logger, const char *topic_name, int topic_id, void *local_logger) -{ - if(logger && logger->enable) - { - strncpy(logger->topic_name[topic_id], topic_name, sizeof(logger->topic_name[topic_id])-1); - logger->kafka_topic[topic_id] = rd_kafka_topic_new(logger->kafka_handle[topic_id], topic_name, NULL); - if (logger->kafka_topic[topic_id] == NULL) - { - TFE_LOG_ERROR(local_logger, "Error to creat kafka topic: %s.", topic_name); - rd_kafka_destroy(logger->kafka_handle[topic_id]); - free(logger); - return -1; - } - } - return 0; -} - -tfe_kafka_logger_t *tfe_kafka_logger_create(int enable, const char *nic_name, const char *brokerlist, void *local_logger) -{ - char *override_sled_ip=NULL; - - tfe_kafka_logger_t *logger = (tfe_kafka_logger_t *)calloc(1, sizeof(tfe_kafka_logger_t)); - if (!logger) - { - return NULL; - } - - logger->enable = enable; - if (!logger->enable) - { - return logger; - } - - override_sled_ip = getenv("OVERRIDE_SLED_IP"); - if(override_sled_ip != NULL) - { - strncpy(logger->local_ip_str, override_sled_ip, sizeof(logger->local_ip_str)-1); - goto finish; - } - - logger->local_ip_num = get_ip_by_eth_name(nic_name); - if (logger->local_ip_num == INADDR_NONE) - { - TFE_LOG_ERROR(local_logger, "Error to get NIC_NAME: %s.", nic_name); - free(logger); - return NULL; - } - inet_ntop(AF_INET, &(logger->local_ip_num), logger->local_ip_str, sizeof(logger->local_ip_str)); -finish: - strncpy(logger->broker_list, brokerlist, sizeof(logger->broker_list)-1); - return logger; -} - -int tfe_logger_create_kafka_topic(tfe_kafka_logger_t *logger, const char *sasl_username, const char *sasl_passwd, const char *topic_name, int topic_id, void *local_logger) -{ - if(!logger->enable) - { - return 0; - } - - logger->kafka_handle[topic_id] = create_kafka_handle(logger->broker_list, sasl_username, sasl_passwd, topic_name, local_logger); - if (logger->kafka_handle[topic_id] == NULL) - { - TFE_LOG_ERROR(local_logger, "Error to creat kafka handler with brokerlist: %s.", logger->broker_list); - free(logger); - return -1; - } - tfe_kafka_logger_topic_new(logger, topic_name, topic_id, logger); - return 0; -} - -void tfe_kafka_logger_destroy(tfe_kafka_logger_t *logger) -{ - if (logger) - { - for(int i=0; i<TOPIC_MAX; i++) - { - if(logger->kafka_topic[i]) - { - rd_kafka_topic_destroy(logger->kafka_topic[i]); - } - - if(logger->kafka_handle[i]) - { - rd_kafka_destroy(logger->kafka_handle[i]); - } - } - free(logger); - logger = NULL; - } -} - -int tfe_kafka_logger_send(tfe_kafka_logger_t *logger, int topic_id, const char *data, int len) -{ - if (logger && logger->enable) - return rd_kafka_produce(logger->kafka_topic[topic_id], RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, (void *)data, len, NULL, 0, NULL); - else - return 0; -} |
