diff options
| author | 童宗振 <[email protected]> | 2024-04-29 13:46:57 +0000 |
|---|---|---|
| committer | 童宗振 <[email protected]> | 2024-04-29 13:46:57 +0000 |
| commit | 715df2c670ec5a39ba4fba8813c05ad1d9ea47e2 (patch) | |
| tree | a22383e4f6bcaf47d12826ec11caf407c464f8d0 | |
| parent | d56af082055fee1276efd2ad89cc1a143cf95eb3 (diff) | |
dump kafka to logger file
| -rw-r--r-- | etc/dp_trace_zlog.conf | 4 | ||||
| -rw-r--r-- | include/logger.h | 6 | ||||
| -rw-r--r-- | include/mocking.h | 3 | ||||
| -rw-r--r-- | src/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | src/logger.c | 42 | ||||
| -rw-r--r-- | src/main.c | 15 | ||||
| -rw-r--r-- | src/mocking.c | 57 | ||||
| -rw-r--r-- | src/trace_output.c | 5 | ||||
| -rw-r--r-- | test/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | test/cmocka_test.c | 1 |
10 files changed, 102 insertions, 33 deletions
diff --git a/etc/dp_trace_zlog.conf b/etc/dp_trace_zlog.conf index 25f5047..d5035e4 100644 --- a/etc/dp_trace_zlog.conf +++ b/etc/dp_trace_zlog.conf @@ -1,6 +1,8 @@ [global] strict init = true default format = "%V: %m%n" +rotate lock file = /tmp/dp_trace_zlog.lock [rules] -default_zlog_category.DEBUG >stdout
\ No newline at end of file +default_zlog_category.DEBUG >stdout +kafka_dump.DEBUG "/tmp/dp_trace_kafka_dump", 100MB
\ No newline at end of file diff --git a/include/logger.h b/include/logger.h new file mode 100644 index 0000000..4819d3a --- /dev/null +++ b/include/logger.h @@ -0,0 +1,6 @@ +#pragma once +#include "common.h" + +extern unsigned int zlog_env_is_init; +void logger_init(); +zlog_category_t * kafka_dump_logger_get();
\ No newline at end of file diff --git a/include/mocking.h b/include/mocking.h index 2121463..ed8400a 100644 --- a/include/mocking.h +++ b/include/mocking.h @@ -1,4 +1,5 @@ #pragma once #include <librdkafka/rdkafka.h> +#include <zlog.h> -int kafka_dump_to_log(rd_kafka_topic_t * rkt, const void * payload, size_t len);
\ No newline at end of file +int kafka_dump_to_log(zlog_category_t * logger, const void * payload, size_t len);
\ No newline at end of file diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 9d96289..e3f4365 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -12,6 +12,7 @@ set(DP_TELEMETRY_SRC ${CMAKE_CURRENT_SOURCE_DIR}/monit.c ${CMAKE_CURRENT_SOURCE_DIR}/mocking.c ${CMAKE_CURRENT_SOURCE_DIR}/http_serv.c + ${CMAKE_CURRENT_SOURCE_DIR}/logger.c ${CMAKE_SOURCE_DIR}/support/mpack/mpack.c) add_executable(${PROJECT_NAME} ${DP_TELEMETRY_SRC}) diff --git a/src/logger.c b/src/logger.c new file mode 100644 index 0000000..b6bb75e --- /dev/null +++ b/src/logger.c @@ -0,0 +1,42 @@ +#include "logger.h" +#include "config.h" + +unsigned int zlog_env_is_init = 0; + +void logger_init() +{ + const struct config * conf = global_config_get(); + + int ret = dzlog_init(conf->zlog_config_path, "default_zlog_category"); + if (ret != 0) + { + char * zlog_profile_error = getenv("ZLOG_PROFILE_ERROR"); + printf("fail in dzlog_int.\n"); + if (zlog_profile_error != NULL) + { + printf("The dzlog_init function error infomation is recorded in:%s\n", zlog_profile_error); + } + exit(EXIT_FAILURE); + } + zlog_env_is_init = 1; + + if (conf->kafka_dump_to_log) + { + ret = zlog_init(conf->zlog_config_path); + DP_TRACE_VERIFY(ret != 0, "zlog init failed."); + } +} + +zlog_category_t * kafka_dump_logger_get() +{ + const struct config * conf = global_config_get(); + + if (conf->kafka_dump_to_log) + { + zlog_category_t * logger = zlog_get_category("kafka_dump"); + DP_TRACE_VERIFY(logger != NULL, "kafka_dump logger is null."); + return logger; + } + + return NULL; +}
\ No newline at end of file @@ -1,6 +1,7 @@ #include "common.h" #include "config.h" #include "http_serv.h" +#include "logger.h" #include "maat.h" #include "monit.h" #include "trace_output.h" @@ -17,7 +18,6 @@ static const char appsym[64] = "dp_trace_telemetry"; struct mr_instance * mr_instance = NULL; -unsigned int zlog_env_is_init = 0; static void signal_handler(evutil_socket_t fd, short what, void * arg) { @@ -121,18 +121,7 @@ int main(int argc, char * argv[]) const struct config * conf = global_config_get(); - ret = dzlog_init(conf->zlog_config_path, "default_zlog_category"); - if (ret != 0) - { - char * zlog_profile_error = getenv("ZLOG_PROFILE_ERROR"); - printf("fail in dzlog_int.\n"); - if (zlog_profile_error != NULL) - { - printf("The dzlog_init function error infomation is recorded in:%s\n", zlog_profile_error); - } - exit(EXIT_FAILURE); - } - zlog_env_is_init = 1; + logger_init(); http_serv_init(); diff --git a/src/mocking.c b/src/mocking.c index d405489..1e26222 100644 --- a/src/mocking.c +++ b/src/mocking.c @@ -2,7 +2,7 @@ #include "common.h" #include <mpack.h> -int kafka_dump_to_log(rd_kafka_topic_t * rkt, const void * payload, size_t len) +int kafka_dump_to_log(zlog_category_t * logger, const void * payload, size_t len) { struct measurements { @@ -63,7 +63,7 @@ int kafka_dump_to_log(rd_kafka_topic_t * rkt, const void * payload, size_t len) { if (i >= 128) { - dzlog_debug("too many measurements..."); + zlog_debug(logger, "too many measurements..."); continue; } mpack_node_t measurement = mpack_node_array_at(measurements_val, i); @@ -76,22 +76,47 @@ int kafka_dump_to_log(rd_kafka_topic_t * rkt, const void * payload, size_t len) } // print - dzlog_debug("microseconds %ld", packet.microseconds); - dzlog_debug("job_id %s", packet.job_id_str); - dzlog_debug("sled_ip %s", packet.sled_ip); - dzlog_debug("device_group %s", packet.device_group); - dzlog_debug("source_ip %s", packet.source_ip); - dzlog_debug("source_port %d", packet.source_port); - dzlog_debug("server_ip %s", packet.server_ip); - dzlog_debug("server_port %d", packet.server_port); - dzlog_debug("packet_length %d", packet.packet_length); + zlog_debug(logger, "microseconds %ld", packet.microseconds); + zlog_debug(logger, "job_id %s", packet.job_id_str); + zlog_debug(logger, "sled_ip %s", packet.sled_ip); + zlog_debug(logger, "device_group %s", packet.device_group); + + if (strlen(packet.source_ip) == 0) + { + zlog_warn(logger, "source_ip is empty"); + } + else + { + zlog_debug(logger, "source_ip %s", packet.source_ip); + } + + zlog_debug(logger, "source_port %d", packet.source_port); + + if (strlen(packet.server_ip) == 0) + { + zlog_warn(logger, "server_ip is empty"); + } + else + { + zlog_debug(logger, "server_ip %s", packet.server_ip); + } + + zlog_debug(logger, "server_port %d", packet.server_port); + + zlog_debug(logger, "packet_length %d", packet.packet_length); + + if (packet.measurements_num == 0) + { + zlog_debug(logger, "measurements num is zero"); + } + for (unsigned int i = 0; i < packet.measurements_num; i++) { - dzlog_debug("record %u:", i); - dzlog_debug("tv_sec %d", packet.record[i].tv_sec); - dzlog_debug("tv_nsec %d", packet.record[i].tv_nsec); - dzlog_debug("app %.*s", packet.record[i].app_len, packet.record[i].app); - dzlog_debug("comments %.*s", packet.record[i].comments_len, packet.record[i].comments); + zlog_debug(logger, "record %u:", i); + zlog_debug(logger, "tv_sec %d", packet.record[i].tv_sec); + zlog_debug(logger, "tv_nsec %d", packet.record[i].tv_nsec); + zlog_debug(logger, "app %.*s", packet.record[i].app_len, packet.record[i].app); + zlog_debug(logger, "comments %.*s", packet.record[i].comments_len, packet.record[i].comments); } return 0; diff --git a/src/trace_output.c b/src/trace_output.c index c5b912d..33cd174 100644 --- a/src/trace_output.c +++ b/src/trace_output.c @@ -3,6 +3,7 @@ #include "config.h" #include "job_ctx.h" #include "kafka.h" +#include "logger.h" #include "mocking.h" #include "monit.h" #include "pcapng.h" @@ -121,6 +122,8 @@ void * dp_trace_process_thread(void * arg) marsio_thread_init(mr_instance); + zlog_category_t * kafak_dump_logger = kafka_dump_logger_get(); + unsigned int no_pkt_recv_cnt = 0; for (unsigned int qid_index = 0;; qid_index++) { @@ -178,7 +181,7 @@ void * dp_trace_process_thread(void * arg) if (conf->kafka_dump_to_log) { - kafka_dump_to_log(kafka_topic, (void *)data, size); + kafka_dump_to_log(kafak_dump_logger, (void *)data, size); } int ret = kafka_produce(kafka_topic, (void *)data, size); diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 5fb20e3..3c175f2 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -9,6 +9,7 @@ set(DP_TRACE_TELEMETRY_SOURCES ${CMAKE_SOURCE_DIR}/src/mocking.c ${CMAKE_SOURCE_DIR}/src/monit.c ${CMAKE_SOURCE_DIR}/src/http_serv.c + ${CMAKE_SOURCE_DIR}/src/logger.c ${CMAKE_SOURCE_DIR}/support/mpack/mpack.c) add_executable(cmocka_test cmocka_test.c ${DP_TRACE_TELEMETRY_SOURCES}) diff --git a/test/cmocka_test.c b/test/cmocka_test.c index b96a368..64681b3 100644 --- a/test/cmocka_test.c +++ b/test/cmocka_test.c @@ -10,7 +10,6 @@ #include <cmocka.h> struct mr_instance * mr_instance = NULL; -unsigned int zlog_env_is_init = 0; int setup(void ** state) { |
