summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author童宗振 <[email protected]>2024-04-29 13:46:57 +0000
committer童宗振 <[email protected]>2024-04-29 13:46:57 +0000
commit427e7e702c3eb91bcaa5bb90c710986dd30678e9 (patch)
treea22383e4f6bcaf47d12826ec11caf407c464f8d0
parentd56af082055fee1276efd2ad89cc1a143cf95eb3 (diff)
parent715df2c670ec5a39ba4fba8813c05ad1d9ea47e2 (diff)
Merge branch 'field_missing' into 'master'
dump kafka to logger file See merge request tsg/dp_telemetry_app!25
-rw-r--r--etc/dp_trace_zlog.conf4
-rw-r--r--include/logger.h6
-rw-r--r--include/mocking.h3
-rw-r--r--src/CMakeLists.txt1
-rw-r--r--src/logger.c42
-rw-r--r--src/main.c15
-rw-r--r--src/mocking.c57
-rw-r--r--src/trace_output.c5
-rw-r--r--test/CMakeLists.txt1
-rw-r--r--test/cmocka_test.c1
10 files changed, 102 insertions, 33 deletions
diff --git a/etc/dp_trace_zlog.conf b/etc/dp_trace_zlog.conf
index 25f5047..d5035e4 100644
--- a/etc/dp_trace_zlog.conf
+++ b/etc/dp_trace_zlog.conf
@@ -1,6 +1,8 @@
[global]
strict init = true
default format = "%V: %m%n"
+rotate lock file = /tmp/dp_trace_zlog.lock
[rules]
-default_zlog_category.DEBUG >stdout \ No newline at end of file
+default_zlog_category.DEBUG >stdout
+kafka_dump.DEBUG "/tmp/dp_trace_kafka_dump", 100MB \ No newline at end of file
diff --git a/include/logger.h b/include/logger.h
new file mode 100644
index 0000000..4819d3a
--- /dev/null
+++ b/include/logger.h
@@ -0,0 +1,6 @@
+#pragma once
+#include "common.h"
+
+extern unsigned int zlog_env_is_init;
+void logger_init();
+zlog_category_t * kafka_dump_logger_get(); \ No newline at end of file
diff --git a/include/mocking.h b/include/mocking.h
index 2121463..ed8400a 100644
--- a/include/mocking.h
+++ b/include/mocking.h
@@ -1,4 +1,5 @@
#pragma once
#include <librdkafka/rdkafka.h>
+#include <zlog.h>
-int kafka_dump_to_log(rd_kafka_topic_t * rkt, const void * payload, size_t len); \ No newline at end of file
+int kafka_dump_to_log(zlog_category_t * logger, const void * payload, size_t len); \ No newline at end of file
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 9d96289..e3f4365 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -12,6 +12,7 @@ set(DP_TELEMETRY_SRC
${CMAKE_CURRENT_SOURCE_DIR}/monit.c
${CMAKE_CURRENT_SOURCE_DIR}/mocking.c
${CMAKE_CURRENT_SOURCE_DIR}/http_serv.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/logger.c
${CMAKE_SOURCE_DIR}/support/mpack/mpack.c)
add_executable(${PROJECT_NAME} ${DP_TELEMETRY_SRC})
diff --git a/src/logger.c b/src/logger.c
new file mode 100644
index 0000000..b6bb75e
--- /dev/null
+++ b/src/logger.c
@@ -0,0 +1,42 @@
+#include "logger.h"
+#include "config.h"
+
+unsigned int zlog_env_is_init = 0;
+
+void logger_init()
+{
+ const struct config * conf = global_config_get();
+
+ int ret = dzlog_init(conf->zlog_config_path, "default_zlog_category");
+ if (ret != 0)
+ {
+ char * zlog_profile_error = getenv("ZLOG_PROFILE_ERROR");
+ printf("fail in dzlog_int.\n");
+ if (zlog_profile_error != NULL)
+ {
+ printf("The dzlog_init function error infomation is recorded in:%s\n", zlog_profile_error);
+ }
+ exit(EXIT_FAILURE);
+ }
+ zlog_env_is_init = 1;
+
+ if (conf->kafka_dump_to_log)
+ {
+ ret = zlog_init(conf->zlog_config_path);
+ DP_TRACE_VERIFY(ret != 0, "zlog init failed.");
+ }
+}
+
+zlog_category_t * kafka_dump_logger_get()
+{
+ const struct config * conf = global_config_get();
+
+ if (conf->kafka_dump_to_log)
+ {
+ zlog_category_t * logger = zlog_get_category("kafka_dump");
+ DP_TRACE_VERIFY(logger != NULL, "kafka_dump logger is null.");
+ return logger;
+ }
+
+ return NULL;
+} \ No newline at end of file
diff --git a/src/main.c b/src/main.c
index ee607b8..d0c78e6 100644
--- a/src/main.c
+++ b/src/main.c
@@ -1,6 +1,7 @@
#include "common.h"
#include "config.h"
#include "http_serv.h"
+#include "logger.h"
#include "maat.h"
#include "monit.h"
#include "trace_output.h"
@@ -17,7 +18,6 @@
static const char appsym[64] = "dp_trace_telemetry";
struct mr_instance * mr_instance = NULL;
-unsigned int zlog_env_is_init = 0;
static void signal_handler(evutil_socket_t fd, short what, void * arg)
{
@@ -121,18 +121,7 @@ int main(int argc, char * argv[])
const struct config * conf = global_config_get();
- ret = dzlog_init(conf->zlog_config_path, "default_zlog_category");
- if (ret != 0)
- {
- char * zlog_profile_error = getenv("ZLOG_PROFILE_ERROR");
- printf("fail in dzlog_int.\n");
- if (zlog_profile_error != NULL)
- {
- printf("The dzlog_init function error infomation is recorded in:%s\n", zlog_profile_error);
- }
- exit(EXIT_FAILURE);
- }
- zlog_env_is_init = 1;
+ logger_init();
http_serv_init();
diff --git a/src/mocking.c b/src/mocking.c
index d405489..1e26222 100644
--- a/src/mocking.c
+++ b/src/mocking.c
@@ -2,7 +2,7 @@
#include "common.h"
#include <mpack.h>
-int kafka_dump_to_log(rd_kafka_topic_t * rkt, const void * payload, size_t len)
+int kafka_dump_to_log(zlog_category_t * logger, const void * payload, size_t len)
{
struct measurements
{
@@ -63,7 +63,7 @@ int kafka_dump_to_log(rd_kafka_topic_t * rkt, const void * payload, size_t len)
{
if (i >= 128)
{
- dzlog_debug("too many measurements...");
+ zlog_debug(logger, "too many measurements...");
continue;
}
mpack_node_t measurement = mpack_node_array_at(measurements_val, i);
@@ -76,22 +76,47 @@ int kafka_dump_to_log(rd_kafka_topic_t * rkt, const void * payload, size_t len)
}
// print
- dzlog_debug("microseconds %ld", packet.microseconds);
- dzlog_debug("job_id %s", packet.job_id_str);
- dzlog_debug("sled_ip %s", packet.sled_ip);
- dzlog_debug("device_group %s", packet.device_group);
- dzlog_debug("source_ip %s", packet.source_ip);
- dzlog_debug("source_port %d", packet.source_port);
- dzlog_debug("server_ip %s", packet.server_ip);
- dzlog_debug("server_port %d", packet.server_port);
- dzlog_debug("packet_length %d", packet.packet_length);
+ zlog_debug(logger, "microseconds %ld", packet.microseconds);
+ zlog_debug(logger, "job_id %s", packet.job_id_str);
+ zlog_debug(logger, "sled_ip %s", packet.sled_ip);
+ zlog_debug(logger, "device_group %s", packet.device_group);
+
+ if (strlen(packet.source_ip) == 0)
+ {
+ zlog_warn(logger, "source_ip is empty");
+ }
+ else
+ {
+ zlog_debug(logger, "source_ip %s", packet.source_ip);
+ }
+
+ zlog_debug(logger, "source_port %d", packet.source_port);
+
+ if (strlen(packet.server_ip) == 0)
+ {
+ zlog_warn(logger, "server_ip is empty");
+ }
+ else
+ {
+ zlog_debug(logger, "server_ip %s", packet.server_ip);
+ }
+
+ zlog_debug(logger, "server_port %d", packet.server_port);
+
+ zlog_debug(logger, "packet_length %d", packet.packet_length);
+
+ if (packet.measurements_num == 0)
+ {
+ zlog_debug(logger, "measurements num is zero");
+ }
+
for (unsigned int i = 0; i < packet.measurements_num; i++)
{
- dzlog_debug("record %u:", i);
- dzlog_debug("tv_sec %d", packet.record[i].tv_sec);
- dzlog_debug("tv_nsec %d", packet.record[i].tv_nsec);
- dzlog_debug("app %.*s", packet.record[i].app_len, packet.record[i].app);
- dzlog_debug("comments %.*s", packet.record[i].comments_len, packet.record[i].comments);
+ zlog_debug(logger, "record %u:", i);
+ zlog_debug(logger, "tv_sec %d", packet.record[i].tv_sec);
+ zlog_debug(logger, "tv_nsec %d", packet.record[i].tv_nsec);
+ zlog_debug(logger, "app %.*s", packet.record[i].app_len, packet.record[i].app);
+ zlog_debug(logger, "comments %.*s", packet.record[i].comments_len, packet.record[i].comments);
}
return 0;
diff --git a/src/trace_output.c b/src/trace_output.c
index c5b912d..33cd174 100644
--- a/src/trace_output.c
+++ b/src/trace_output.c
@@ -3,6 +3,7 @@
#include "config.h"
#include "job_ctx.h"
#include "kafka.h"
+#include "logger.h"
#include "mocking.h"
#include "monit.h"
#include "pcapng.h"
@@ -121,6 +122,8 @@ void * dp_trace_process_thread(void * arg)
marsio_thread_init(mr_instance);
+ zlog_category_t * kafak_dump_logger = kafka_dump_logger_get();
+
unsigned int no_pkt_recv_cnt = 0;
for (unsigned int qid_index = 0;; qid_index++)
{
@@ -178,7 +181,7 @@ void * dp_trace_process_thread(void * arg)
if (conf->kafka_dump_to_log)
{
- kafka_dump_to_log(kafka_topic, (void *)data, size);
+ kafka_dump_to_log(kafak_dump_logger, (void *)data, size);
}
int ret = kafka_produce(kafka_topic, (void *)data, size);
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 5fb20e3..3c175f2 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -9,6 +9,7 @@ set(DP_TRACE_TELEMETRY_SOURCES
${CMAKE_SOURCE_DIR}/src/mocking.c
${CMAKE_SOURCE_DIR}/src/monit.c
${CMAKE_SOURCE_DIR}/src/http_serv.c
+ ${CMAKE_SOURCE_DIR}/src/logger.c
${CMAKE_SOURCE_DIR}/support/mpack/mpack.c)
add_executable(cmocka_test cmocka_test.c ${DP_TRACE_TELEMETRY_SOURCES})
diff --git a/test/cmocka_test.c b/test/cmocka_test.c
index b96a368..64681b3 100644
--- a/test/cmocka_test.c
+++ b/test/cmocka_test.c
@@ -10,7 +10,6 @@
#include <cmocka.h>
struct mr_instance * mr_instance = NULL;
-unsigned int zlog_env_is_init = 0;
int setup(void ** state)
{