summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author童宗振 <[email protected]>2024-05-24 03:29:34 +0000
committer童宗振 <[email protected]>2024-05-24 03:29:34 +0000
commit1cd860ff387362537b2c92f0731ef752cb9107c8 (patch)
tree9df602f2d95d6bae59a745cdc568317bca2fdc69
parent5752ca8b51651a942542922bcb2c699f226f041c (diff)
parent80f72348cc78709f57f177424021402dd514f366 (diff)
Merge branch 'kafka_queue_from_conf' into 'master'
Kafka queue from conf See merge request tsg/dp_telemetry_app!43
-rw-r--r--etc/dp_trace.conf1
-rw-r--r--include/config.h1
-rw-r--r--include/kafka.h3
-rw-r--r--src/config.c3
-rw-r--r--src/kafka.c9
-rw-r--r--src/mocking.c4
-rw-r--r--src/trace_output.c3
7 files changed, 17 insertions, 7 deletions
diff --git a/etc/dp_trace.conf b/etc/dp_trace.conf
index a4d990b..82348c6 100644
--- a/etc/dp_trace.conf
+++ b/etc/dp_trace.conf
@@ -16,6 +16,7 @@ borker_list="192.168.44.12:9094"
topic_name="DATAPATH-TELEMETRY-RECORD"
sasl_username=admin
sasl_password=galaxy2019
+queue_size=100000
[maat]
maat_log_level=3
diff --git a/include/config.h b/include/config.h
index 2ef5440..8de481e 100644
--- a/include/config.h
+++ b/include/config.h
@@ -34,6 +34,7 @@ struct config
char broker_list[1024];
char sasl_username[MR_SYMBOL_MAX];
char sasl_password[MR_SYMBOL_MAX];
+ uint32_t kafka_queue_size;
// maat
unsigned int maat_log_level;
diff --git a/include/kafka.h b/include/kafka.h
index 197c4f8..5e61e3a 100644
--- a/include/kafka.h
+++ b/include/kafka.h
@@ -1,6 +1,7 @@
#pragma once
#include <librdkafka/rdkafka.h>
-rd_kafka_t * kafka_handle_create(const char * brokerlist, const char * sasl_username, const char * sasl_passwd);
+rd_kafka_t * kafka_handle_create(const char * brokerlist, const char * sasl_username, const char * sasl_passwd,
+ uint32_t kafka_queue_size);
rd_kafka_topic_t * kafka_topic_new(rd_kafka_t * rk, const char * topic, rd_kafka_topic_conf_t * conf);
int kafka_produce(rd_kafka_topic_t * rkt, void * payload, size_t len); \ No newline at end of file
diff --git a/src/config.c b/src/config.c
index df50762..b866d2f 100644
--- a/src/config.c
+++ b/src/config.c
@@ -56,7 +56,7 @@ void config_load()
int nr_io_cores = 0;
nr_io_cores =
MESA_load_profile_uint_range(config_path, "global", "iocore", sizeof(io_cores) / sizeof(io_cores[0]), io_cores);
- for (unsigned int i = 0; i < nr_io_cores; i++)
+ for (int i = 0; i < nr_io_cores; i++)
{
CPU_SET(io_cores[i], &g_conf->cpu_set_io);
}
@@ -103,6 +103,7 @@ void config_load()
sizeof(g_conf->sasl_username), "");
MESA_load_profile_string_def(config_path, "kafka", "sasl_password", g_conf->sasl_password,
sizeof(g_conf->sasl_password), "");
+ MESA_load_profile_uint_def(config_path, "kafka", "queue_size", &g_conf->kafka_queue_size, 100000);
MESA_load_profile_int_def(config_path, "maat", "maat_log_level", &(g_conf->maat_log_level), LOG_LEVEL_FATAL);
MESA_load_profile_int_def(config_path, "maat", "maat_input_mode", &(g_conf->maat_input_mode), 0);
diff --git a/src/kafka.c b/src/kafka.c
index 2a857f9..bd426d0 100644
--- a/src/kafka.c
+++ b/src/kafka.c
@@ -2,16 +2,21 @@
#include "common.h"
#include <errno.h>
-rd_kafka_t * kafka_handle_create(const char * brokerlist, const char * sasl_username, const char * sasl_passwd)
+rd_kafka_t * kafka_handle_create(const char * brokerlist, const char * sasl_username, const char * sasl_passwd,
+ uint32_t kafka_queue_size)
{
int ret;
char kafka_errstr[1024] = {0};
rd_kafka_t * handle = NULL;
rd_kafka_conf_t * rconf = NULL;
+ char kafka_queue_size_str[10];
+ snprintf(kafka_queue_size_str, sizeof(kafka_queue_size_str), "%u", kafka_queue_size);
+
rconf = rd_kafka_conf_new();
- ret = rd_kafka_conf_set(rconf, "queue.buffering.max.messages", "100000", kafka_errstr, sizeof(kafka_errstr));
+ ret = rd_kafka_conf_set(rconf, "queue.buffering.max.messages", kafka_queue_size_str, kafka_errstr,
+ sizeof(kafka_errstr));
if (ret != RD_KAFKA_CONF_OK)
{
dzlog_error("Error to set kafka \"queue.buffering.max.messages\", %s.", kafka_errstr);
diff --git a/src/mocking.c b/src/mocking.c
index 5c7e3bf..79a49a9 100644
--- a/src/mocking.c
+++ b/src/mocking.c
@@ -78,7 +78,7 @@ int kafka_dump_to_log(zlog_category_t * logger, const void * payload, size_t len
mpack_node_t measurements_val = mpack_node_map_cstr(root, "measurements");
packet.measurements_num = mpack_node_array_length(mpack_node_map_cstr(root, "measurements"));
- for (unsigned int i = 0; i < packet.measurements_num; i++)
+ for (int i = 0; i < packet.measurements_num; i++)
{
if (i >= 128)
{
@@ -130,7 +130,7 @@ int kafka_dump_to_log(zlog_category_t * logger, const void * payload, size_t len
zlog_debug(logger, "measurements num is zero");
}
- for (unsigned int i = 0; i < packet.measurements_num; i++)
+ for (int i = 0; i < packet.measurements_num; i++)
{
zlog_debug(logger, "record %u:", i);
zlog_debug(logger, "tv_sec %d", packet.record[i].tv_sec);
diff --git a/src/trace_output.c b/src/trace_output.c
index ffc1961..d32bf65 100644
--- a/src/trace_output.c
+++ b/src/trace_output.c
@@ -87,7 +87,8 @@ void dp_trace_output_init()
DP_TRACE_VERIFY(ret == 0, "pthread_mutex_init failed(ret = % d)", ret);
}
- kafka_handle = kafka_handle_create(conf->broker_list, conf->sasl_username, conf->sasl_password);
+ kafka_handle =
+ kafka_handle_create(conf->broker_list, conf->sasl_username, conf->sasl_password, conf->kafka_queue_size);
kafka_topic = kafka_topic_new(kafka_handle, conf->topic_name, NULL);
for (unsigned int i = 0; i < TELEMETRY_DIM(dp_trace_output); i++)