summaryrefslogtreecommitdiff
path: root/common/src/tfe_kafka_logger.cpp
diff options
context:
space:
mode:
authorfengweihao <[email protected]>2023-12-14 15:08:19 +0800
committerfengweihao <[email protected]>2023-12-14 15:08:19 +0800
commitb3700966fd3931763ee8e09aa9a72d87c3ae8d9e (patch)
tree8d764ca7e674cf9887355b4d2c242b5f32ae2e52 /common/src/tfe_kafka_logger.cpp
parentb801ca9d3b652c9aa494cb3b3fb2e0a5fbe31731 (diff)
TSG-17862 Proxy支持Internal IP address和Exteral IP Address的扫描, 支持以Topic方式上传HTTP请求体/应答体v4.8.51-20231215
Diffstat (limited to 'common/src/tfe_kafka_logger.cpp')
-rw-r--r--common/src/tfe_kafka_logger.cpp34
1 files changed, 26 insertions, 8 deletions
diff --git a/common/src/tfe_kafka_logger.cpp b/common/src/tfe_kafka_logger.cpp
index 3a59595..d34a332 100644
--- a/common/src/tfe_kafka_logger.cpp
+++ b/common/src/tfe_kafka_logger.cpp
@@ -104,6 +104,21 @@ static rd_kafka_t *create_kafka_handle(const char *brokerlist, const char *sasl_
return handle;
}
+int tfe_kafka_logger_topic_new(tfe_kafka_logger_t *logger, const char *topic_name, void *local_logger)
+{
+ strncpy(logger->topic_name[TOPIC_BUCKET], topic_name, sizeof(logger->topic_name[TOPIC_BUCKET])-1);
+ logger->kafka_topic[TOPIC_BUCKET] = rd_kafka_topic_new(logger->kafka_handle, topic_name, NULL);
+ if (logger->kafka_topic[TOPIC_BUCKET] == NULL)
+ {
+ TFE_LOG_ERROR(local_logger, "Error to creat kafka topic: %s.", topic_name);
+ rd_kafka_destroy(logger->kafka_handle);
+ free(logger);
+ return 0;
+ }
+
+ return 1;
+}
+
tfe_kafka_logger_t *tfe_kafka_logger_create(int enable, const char *nic_name, const char *brokerlist, const char *topic_name, const char *sasl_username, const char *sasl_passwd, void *local_logger)
{
char *override_sled_ip=NULL;
@@ -141,11 +156,11 @@ create_kafka:
return NULL;
}
- strncpy(logger->topic_name, topic_name, sizeof(logger->topic_name)-1);
- logger->kafka_topic = rd_kafka_topic_new(logger->kafka_handle, logger->topic_name, NULL);
- if (logger->kafka_topic == NULL)
+ strncpy(logger->topic_name[TOPIC_LOGGER], topic_name, sizeof(logger->topic_name[TOPIC_LOGGER])-1);
+ logger->kafka_topic[TOPIC_LOGGER] = rd_kafka_topic_new(logger->kafka_handle, topic_name, NULL);
+ if (logger->kafka_topic[TOPIC_LOGGER] == NULL)
{
- TFE_LOG_ERROR(local_logger, "Error to creat kafka topic: %s.", logger->topic_name);
+ TFE_LOG_ERROR(local_logger, "Error to creat kafka topic: %s.", logger->topic_name[TOPIC_LOGGER]);
rd_kafka_destroy(logger->kafka_handle);
free(logger);
return NULL;
@@ -161,18 +176,21 @@ void tfe_kafka_logger_destroy(tfe_kafka_logger_t *logger)
if (logger->kafka_handle)
rd_kafka_destroy(logger->kafka_handle);
- if (logger->kafka_topic)
- rd_kafka_topic_destroy(logger->kafka_topic);
+ if (logger->kafka_topic[TOPIC_LOGGER])
+ rd_kafka_topic_destroy(logger->kafka_topic[TOPIC_LOGGER]);
+
+ if (logger->kafka_topic[TOPIC_BUCKET])
+ rd_kafka_topic_destroy(logger->kafka_topic[TOPIC_BUCKET]);
free(logger);
logger = NULL;
}
}
-int tfe_kafka_logger_send(tfe_kafka_logger_t *logger, const char *data, int len)
+int tfe_kafka_logger_send(tfe_kafka_logger_t *logger, int topic_id, const char *data, int len)
{
if (logger && logger->enable)
- return rd_kafka_produce(logger->kafka_topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, (void *)data, len, NULL, 0, NULL);
+ return rd_kafka_produce(logger->kafka_topic[topic_id], RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_COPY, (void *)data, len, NULL, 0, NULL);
else
return 0;
}