summaryrefslogtreecommitdiff
path: root/common/src/tfe_kafka_logger.cpp
diff options
context:
space:
mode:
authorluwenpeng <[email protected]>2024-07-19 18:20:04 +0800
committerluwenpeng <[email protected]>2024-07-19 18:57:57 +0800
commit2045d517cabbf7559e18367fa33b3a170143ef79 (patch)
tree15c70ad522c455c6b51622181442e12cea75f3e7 /common/src/tfe_kafka_logger.cpp
parent88a7a8c5c48f681cdd0478a809652ac94c2647ee (diff)
feature: TSG-21853 Refactoring TFE Kafka infrastructure
Diffstat (limited to 'common/src/tfe_kafka_logger.cpp')
-rw-r--r--common/src/tfe_kafka_logger.cpp212
1 files changed, 0 insertions, 212 deletions
diff --git a/common/src/tfe_kafka_logger.cpp b/common/src/tfe_kafka_logger.cpp
deleted file mode 100644
index 28f4ce8..0000000
--- a/common/src/tfe_kafka_logger.cpp
+++ /dev/null
@@ -1,212 +0,0 @@
-#include <sys/ioctl.h>
-#include <unistd.h>
-#include <arpa/inet.h>
-#include <net/if.h>
-
-#include <tfe_kafka_logger.h>
-
-// return INADDR_NONE if error occur
-static unsigned int get_ip_by_eth_name(const char *ifname)
-{
- int sockfd = -1;
- struct ifreq ifr;
- unsigned int ip;
-
- sockfd = socket(AF_INET, SOCK_DGRAM, 0);
- if (-1 == sockfd)
- {
- goto error;
- }
-
- strcpy(ifr.ifr_name, ifname);
- if (ioctl(sockfd, SIOCGIFADDR, &ifr) == -1)
- {
- goto error;
- }
- close(sockfd);
-
- ip = ((struct sockaddr_in *)&(ifr.ifr_addr))->sin_addr.s_addr;
- return ip;
-
-error:
- if (sockfd > 0)
- close(sockfd);
- return INADDR_NONE;
-}
-
-static rd_kafka_t *create_kafka_handle(const char *brokerlist, const char *sasl_username, const char *sasl_passwd, const char *topic_name, void *local_logger)
-{
- int ret;
- char kafka_errstr[1024] = {0};
- rd_kafka_t *handle = NULL;
- rd_kafka_conf_t *rconf = NULL;
-
- rconf = rd_kafka_conf_new();
-
- ret = rd_kafka_conf_set(rconf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr));
- if (ret != RD_KAFKA_CONF_OK)
- {
- TFE_LOG_ERROR(local_logger, "Error to set kafka \"queue.buffering.max.messages\", %s.", kafka_errstr);
- rd_kafka_conf_destroy(rconf);
- return NULL;
- }
- ret = rd_kafka_conf_set(rconf, "topic.metadata.refresh.interval.ms", "600000", kafka_errstr, sizeof(kafka_errstr));
- if (ret != RD_KAFKA_CONF_OK)
- {
- TFE_LOG_ERROR(local_logger, "Error to set kafka \"topic.metadata.refresh.interval.ms\", %s.", kafka_errstr);
- rd_kafka_conf_destroy(rconf);
- return NULL;
- }
- ret = rd_kafka_conf_set(rconf, "security.protocol", "plaintext", kafka_errstr, sizeof(kafka_errstr));
- if (ret != RD_KAFKA_CONF_OK)
- {
- TFE_LOG_ERROR(local_logger, "Error to set kafka \"security.protocol\", %s.", kafka_errstr);
- rd_kafka_conf_destroy(rconf);
- return NULL;
- }
- ret = rd_kafka_conf_set(rconf, "client.id", topic_name, kafka_errstr, sizeof(kafka_errstr));
- if (ret != RD_KAFKA_CONF_OK)
- {
- TFE_LOG_ERROR(local_logger, "Error to set kafka \"client.id\", %s.", kafka_errstr);
- rd_kafka_conf_destroy(rconf);
- return NULL;
- }
-
- if (strlen(sasl_username) > 0 && strlen(sasl_passwd) > 0)
- {
- rd_kafka_conf_set(rconf, "security.protocol", "sasl_plaintext", kafka_errstr, sizeof(kafka_errstr));
- rd_kafka_conf_set(rconf, "sasl.mechanisms", "PLAIN", kafka_errstr, sizeof(kafka_errstr));
- ret = rd_kafka_conf_set(rconf, "sasl.username", sasl_username, kafka_errstr, sizeof(kafka_errstr));
- if (ret != RD_KAFKA_CONF_OK)
- {
- TFE_LOG_ERROR(local_logger, "Error to set kafka \"sasl.username\", %s.", kafka_errstr);
- rd_kafka_conf_destroy(rconf);
- return NULL;
- }
- ret = rd_kafka_conf_set(rconf, "sasl.password", sasl_passwd, kafka_errstr, sizeof(kafka_errstr));
- if (ret != RD_KAFKA_CONF_OK)
- {
- TFE_LOG_ERROR(local_logger, "Error to set kafka \"sasl.password\", %s.", kafka_errstr);
- rd_kafka_conf_destroy(rconf);
- return NULL;
- }
- }
-
- //The conf object is freed by this function and must not be used or destroyed by the application sub-sequently.
- handle = rd_kafka_new(RD_KAFKA_PRODUCER, rconf, kafka_errstr, sizeof(kafka_errstr));
- rconf = NULL;
- if (handle == NULL)
- {
- TFE_LOG_ERROR(local_logger, "Error to new kafka, %s.", kafka_errstr);
- return NULL;
- }
-
- if (rd_kafka_brokers_add(handle, brokerlist) == 0)
- {
- TFE_LOG_ERROR(local_logger, "Error to add kakfa bokers.");
- rd_kafka_destroy(handle);
- return NULL;
- }
-
- return handle;
-}
-
-int tfe_kafka_logger_topic_new(tfe_kafka_logger_t *logger, const char *topic_name, int topic_id, void *local_logger)
-{
- if(logger && logger->enable)
- {
- strncpy(logger->topic_name[topic_id], topic_name, sizeof(logger->topic_name[topic_id])-1);
- logger->kafka_topic[topic_id] = rd_kafka_topic_new(logger->kafka_handle[topic_id], topic_name, NULL);
- if (logger->kafka_topic[topic_id] == NULL)
- {
- TFE_LOG_ERROR(local_logger, "Error to creat kafka topic: %s.", topic_name);
- rd_kafka_destroy(logger->kafka_handle[topic_id]);
- free(logger);
- return -1;
- }
- }
- return 0;
-}
-
-tfe_kafka_logger_t *tfe_kafka_logger_create(int enable, const char *nic_name, const char *brokerlist, void *local_logger)
-{
- char *override_sled_ip=NULL;
-
- tfe_kafka_logger_t *logger = (tfe_kafka_logger_t *)calloc(1, sizeof(tfe_kafka_logger_t));
- if (!logger)
- {
- return NULL;
- }
-
- logger->enable = enable;
- if (!logger->enable)
- {
- return logger;
- }
-
- override_sled_ip = getenv("OVERRIDE_SLED_IP");
- if(override_sled_ip != NULL)
- {
- strncpy(logger->local_ip_str, override_sled_ip, sizeof(logger->local_ip_str)-1);
- goto finish;
- }
-
- logger->local_ip_num = get_ip_by_eth_name(nic_name);
- if (logger->local_ip_num == INADDR_NONE)
- {
- TFE_LOG_ERROR(local_logger, "Error to get NIC_NAME: %s.", nic_name);
- free(logger);
- return NULL;
- }
- inet_ntop(AF_INET, &(logger->local_ip_num), logger->local_ip_str, sizeof(logger->local_ip_str));
-finish:
- strncpy(logger->broker_list, brokerlist, sizeof(logger->broker_list)-1);
- return logger;
-}
-
-int tfe_logger_create_kafka_topic(tfe_kafka_logger_t *logger, const char *sasl_username, const char *sasl_passwd, const char *topic_name, int topic_id, void *local_logger)
-{
- if(!logger->enable)
- {
- return 0;
- }
-
- logger->kafka_handle[topic_id] = create_kafka_handle(logger->broker_list, sasl_username, sasl_passwd, topic_name, local_logger);
- if (logger->kafka_handle[topic_id] == NULL)
- {
- TFE_LOG_ERROR(local_logger, "Error to creat kafka handler with brokerlist: %s.", logger->broker_list);
- free(logger);
- return -1;
- }
- tfe_kafka_logger_topic_new(logger, topic_name, topic_id, logger);
- return 0;
-}
-
-void tfe_kafka_logger_destroy(tfe_kafka_logger_t *logger)
-{
- if (logger)
- {
- for(int i=0; i<TOPIC_MAX; i++)
- {
- if(logger->kafka_topic[i])
- {
- rd_kafka_topic_destroy(logger->kafka_topic[i]);
- }
-
- if(logger->kafka_handle[i])
- {
- rd_kafka_destroy(logger->kafka_handle[i]);
- }
- }
- free(logger);
- logger = NULL;
- }
-}
-
-int tfe_kafka_logger_send(tfe_kafka_logger_t *logger, int topic_id, const char *data, int len)
-{
- if (logger && logger->enable)
- return rd_kafka_produce(logger->kafka_topic[topic_id], RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, (void *)data, len, NULL, 0, NULL);
- else
- return 0;
-}