diff options
| author | tongzongzhen <[email protected]> | 2024-05-24 11:27:47 +0800 |
|---|---|---|
| committer | tongzongzhen <[email protected]> | 2024-05-24 11:27:47 +0800 |
| commit | 80f72348cc78709f57f177424021402dd514f366 (patch) | |
| tree | 9df602f2d95d6bae59a745cdc568317bca2fdc69 | |
| parent | 5752ca8b51651a942542922bcb2c699f226f041c (diff) | |
add kafka queue size as config
| -rw-r--r-- | etc/dp_trace.conf | 1 | ||||
| -rw-r--r-- | include/config.h | 1 | ||||
| -rw-r--r-- | include/kafka.h | 3 | ||||
| -rw-r--r-- | src/config.c | 3 | ||||
| -rw-r--r-- | src/kafka.c | 9 | ||||
| -rw-r--r-- | src/mocking.c | 4 | ||||
| -rw-r--r-- | src/trace_output.c | 3 |
7 files changed, 17 insertions, 7 deletions
diff --git a/etc/dp_trace.conf b/etc/dp_trace.conf index a4d990b..82348c6 100644 --- a/etc/dp_trace.conf +++ b/etc/dp_trace.conf @@ -16,6 +16,7 @@ borker_list="192.168.44.12:9094" topic_name="DATAPATH-TELEMETRY-RECORD" sasl_username=admin sasl_password=galaxy2019 +queue_size=100000 [maat] maat_log_level=3 diff --git a/include/config.h b/include/config.h index 2ef5440..8de481e 100644 --- a/include/config.h +++ b/include/config.h @@ -34,6 +34,7 @@ struct config char broker_list[1024]; char sasl_username[MR_SYMBOL_MAX]; char sasl_password[MR_SYMBOL_MAX]; + uint32_t kafka_queue_size; // maat unsigned int maat_log_level; diff --git a/include/kafka.h b/include/kafka.h index 197c4f8..5e61e3a 100644 --- a/include/kafka.h +++ b/include/kafka.h @@ -1,6 +1,7 @@ #pragma once #include <librdkafka/rdkafka.h> -rd_kafka_t * kafka_handle_create(const char * brokerlist, const char * sasl_username, const char * sasl_passwd); +rd_kafka_t * kafka_handle_create(const char * brokerlist, const char * sasl_username, const char * sasl_passwd, + uint32_t kafka_queue_size); rd_kafka_topic_t * kafka_topic_new(rd_kafka_t * rk, const char * topic, rd_kafka_topic_conf_t * conf); int kafka_produce(rd_kafka_topic_t * rkt, void * payload, size_t len);
\ No newline at end of file diff --git a/src/config.c b/src/config.c index df50762..b866d2f 100644 --- a/src/config.c +++ b/src/config.c @@ -56,7 +56,7 @@ void config_load() int nr_io_cores = 0; nr_io_cores = MESA_load_profile_uint_range(config_path, "global", "iocore", sizeof(io_cores) / sizeof(io_cores[0]), io_cores); - for (unsigned int i = 0; i < nr_io_cores; i++) + for (int i = 0; i < nr_io_cores; i++) { CPU_SET(io_cores[i], &g_conf->cpu_set_io); } @@ -103,6 +103,7 @@ void config_load() sizeof(g_conf->sasl_username), ""); MESA_load_profile_string_def(config_path, "kafka", "sasl_password", g_conf->sasl_password, sizeof(g_conf->sasl_password), ""); + MESA_load_profile_uint_def(config_path, "kafka", "queue_size", &g_conf->kafka_queue_size, 100000); MESA_load_profile_int_def(config_path, "maat", "maat_log_level", &(g_conf->maat_log_level), LOG_LEVEL_FATAL); MESA_load_profile_int_def(config_path, "maat", "maat_input_mode", &(g_conf->maat_input_mode), 0); diff --git a/src/kafka.c b/src/kafka.c index 2a857f9..bd426d0 100644 --- a/src/kafka.c +++ b/src/kafka.c @@ -2,16 +2,21 @@ #include "common.h" #include <errno.h> -rd_kafka_t * kafka_handle_create(const char * brokerlist, const char * sasl_username, const char * sasl_passwd) +rd_kafka_t * kafka_handle_create(const char * brokerlist, const char * sasl_username, const char * sasl_passwd, + uint32_t kafka_queue_size) { int ret; char kafka_errstr[1024] = {0}; rd_kafka_t * handle = NULL; rd_kafka_conf_t * rconf = NULL; + char kafka_queue_size_str[10]; + snprintf(kafka_queue_size_str, sizeof(kafka_queue_size_str), "%u", kafka_queue_size); + rconf = rd_kafka_conf_new(); - ret = rd_kafka_conf_set(rconf, "queue.buffering.max.messages", "100000", kafka_errstr, sizeof(kafka_errstr)); + ret = rd_kafka_conf_set(rconf, "queue.buffering.max.messages", kafka_queue_size_str, kafka_errstr, + sizeof(kafka_errstr)); if (ret != RD_KAFKA_CONF_OK) { dzlog_error("Error to set kafka \"queue.buffering.max.messages\", %s.", kafka_errstr); diff --git a/src/mocking.c b/src/mocking.c index 5c7e3bf..79a49a9 100644 --- a/src/mocking.c +++ b/src/mocking.c @@ -78,7 +78,7 @@ int kafka_dump_to_log(zlog_category_t * logger, const void * payload, size_t len mpack_node_t measurements_val = mpack_node_map_cstr(root, "measurements"); packet.measurements_num = mpack_node_array_length(mpack_node_map_cstr(root, "measurements")); - for (unsigned int i = 0; i < packet.measurements_num; i++) + for (int i = 0; i < packet.measurements_num; i++) { if (i >= 128) { @@ -130,7 +130,7 @@ int kafka_dump_to_log(zlog_category_t * logger, const void * payload, size_t len zlog_debug(logger, "measurements num is zero"); } - for (unsigned int i = 0; i < packet.measurements_num; i++) + for (int i = 0; i < packet.measurements_num; i++) { zlog_debug(logger, "record %u:", i); zlog_debug(logger, "tv_sec %d", packet.record[i].tv_sec); diff --git a/src/trace_output.c b/src/trace_output.c index ffc1961..d32bf65 100644 --- a/src/trace_output.c +++ b/src/trace_output.c @@ -87,7 +87,8 @@ void dp_trace_output_init() DP_TRACE_VERIFY(ret == 0, "pthread_mutex_init failed(ret = % d)", ret); } - kafka_handle = kafka_handle_create(conf->broker_list, conf->sasl_username, conf->sasl_password); + kafka_handle = + kafka_handle_create(conf->broker_list, conf->sasl_username, conf->sasl_password, conf->kafka_queue_size); kafka_topic = kafka_topic_new(kafka_handle, conf->topic_name, NULL); for (unsigned int i = 0; i < TELEMETRY_DIM(dp_trace_output); i++) |
