summaryrefslogtreecommitdiff
path: root/common/src/tfe_kafka_logger.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'common/src/tfe_kafka_logger.cpp')
-rw-r--r--common/src/tfe_kafka_logger.cpp35
1 files changed, 13 insertions, 22 deletions
diff --git a/common/src/tfe_kafka_logger.cpp b/common/src/tfe_kafka_logger.cpp
index d34a332..26cfea6 100644
--- a/common/src/tfe_kafka_logger.cpp
+++ b/common/src/tfe_kafka_logger.cpp
@@ -104,22 +104,24 @@ static rd_kafka_t *create_kafka_handle(const char *brokerlist, const char *sasl_
return handle;
}
-int tfe_kafka_logger_topic_new(tfe_kafka_logger_t *logger, const char *topic_name, void *local_logger)
+int tfe_kafka_logger_topic_new(tfe_kafka_logger_t *logger, const char *topic_name, int topic_id, void *local_logger)
{
- strncpy(logger->topic_name[TOPIC_BUCKET], topic_name, sizeof(logger->topic_name[TOPIC_BUCKET])-1);
- logger->kafka_topic[TOPIC_BUCKET] = rd_kafka_topic_new(logger->kafka_handle, topic_name, NULL);
- if (logger->kafka_topic[TOPIC_BUCKET] == NULL)
+ if(logger && logger->enable)
{
- TFE_LOG_ERROR(local_logger, "Error to creat kafka topic: %s.", topic_name);
- rd_kafka_destroy(logger->kafka_handle);
- free(logger);
- return 0;
+ strncpy(logger->topic_name[topic_id], topic_name, sizeof(logger->topic_name[topic_id])-1);
+ logger->kafka_topic[topic_id] = rd_kafka_topic_new(logger->kafka_handle, topic_name, NULL);
+ if (logger->kafka_topic[topic_id] == NULL)
+ {
+ TFE_LOG_ERROR(local_logger, "Error to creat kafka topic: %s.", topic_name);
+ rd_kafka_destroy(logger->kafka_handle);
+ free(logger);
+ return -1;
+ }
}
-
- return 1;
+ return 0;
}
-tfe_kafka_logger_t *tfe_kafka_logger_create(int enable, const char *nic_name, const char *brokerlist, const char *topic_name, const char *sasl_username, const char *sasl_passwd, void *local_logger)
+tfe_kafka_logger_t *tfe_kafka_logger_create(int enable, const char *nic_name, const char *brokerlist, const char *sasl_username, const char *sasl_passwd, void *local_logger)
{
char *override_sled_ip=NULL;
tfe_kafka_logger_t *logger = (tfe_kafka_logger_t *)calloc(1, sizeof(tfe_kafka_logger_t));
@@ -155,17 +157,6 @@ create_kafka:
free(logger);
return NULL;
}
-
- strncpy(logger->topic_name[TOPIC_LOGGER], topic_name, sizeof(logger->topic_name[TOPIC_LOGGER])-1);
- logger->kafka_topic[TOPIC_LOGGER] = rd_kafka_topic_new(logger->kafka_handle, topic_name, NULL);
- if (logger->kafka_topic[TOPIC_LOGGER] == NULL)
- {
- TFE_LOG_ERROR(local_logger, "Error to creat kafka topic: %s.", logger->topic_name[TOPIC_LOGGER]);
- rd_kafka_destroy(logger->kafka_handle);
- free(logger);
- return NULL;
- }
-
return logger;
}