diff options
| author | 童宗振 <[email protected]> | 2024-04-12 06:24:39 +0000 |
|---|---|---|
| committer | 童宗振 <[email protected]> | 2024-04-12 06:24:39 +0000 |
| commit | 862045d3aaf61bfaf763e6320f5894f7b55c41ed (patch) | |
| tree | ff316b05c1aca525e25c61c40b48a3e9a4333997 /src | |
| parent | 4d678720f09744a381f458f4305694feefff5eec (diff) | |
Add zlog
Diffstat (limited to 'src')
| -rw-r--r-- | src/CMakeLists.txt | 2 | ||||
| -rw-r--r-- | src/common.h | 5 | ||||
| -rw-r--r-- | src/config.c | 27 | ||||
| -rw-r--r-- | src/config.h | 4 | ||||
| -rw-r--r-- | src/job_ctx.c | 2 | ||||
| -rw-r--r-- | src/kafka.c | 12 | ||||
| -rw-r--r-- | src/main.c | 43 | ||||
| -rw-r--r-- | src/mocking.c | 30 | ||||
| -rw-r--r-- | src/trace_output.c | 29 |
9 files changed, 83 insertions, 71 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 1c7e109..15e9066 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -12,7 +12,7 @@ set(DP_TELEMETRY_SRC ${CMAKE_SOURCE_DIR}/support/mpack/mpack.c) add_executable(${PROJECT_NAME} ${DP_TELEMETRY_SRC}) -target_link_libraries(${PROJECT_NAME} libmarsio maatframe MESA_prof_load libevent-static rdkafka libcmocka uuid pthread) +target_link_libraries(${PROJECT_NAME} libmarsio maatframe MESA_prof_load libevent-static rdkafka libcmocka libzlog uuid pthread) if(ENABLE_DEVELOP_MOCKING) target_link_options(${PROJECT_NAME} PRIVATE -Wl,--wrap=kafka_handle_create,--wrap=kafka_topic_new,--wrap=kafka_produce) endif() diff --git a/src/common.h b/src/common.h index 2d45664..56a5fbf 100644 --- a/src/common.h +++ b/src/common.h @@ -1,11 +1,12 @@ #pragma once #include "marsio.h" +#include <zlog.h> + #include <assert.h> #include <stdint.h> #include <string.h> #include <sys/stat.h> -#include <syslog.h> #include <unistd.h> #ifndef MR_SYMBOL_MAX @@ -27,7 +28,7 @@ { \ if (!(condition)) \ { \ - syslog(LOG_ERR, fmt, ##__VA_ARGS__); \ + dzlog_error(fmt, ##__VA_ARGS__); \ exit(EXIT_FAILURE); \ } \ } while (0) diff --git a/src/config.c b/src/config.c index a9f653e..1834353 100644 --- a/src/config.c +++ b/src/config.c @@ -14,11 +14,14 @@ extern struct mr_instance * mr_instance; static struct config * g_conf = NULL; -struct config * config_create() +const struct config * config_create(const char * config_path, const char * dy_config_path) { struct config * conf = calloc(1, sizeof(struct config)); CHECK_OR_EXIT(conf, "calloc failed: %s", strerror(errno)); + snprintf(conf->config_path, sizeof(conf->config_path), "%s", config_path); + snprintf(conf->dy_config_path, sizeof(conf->dy_config_path), "%s", dy_config_path); + g_conf = conf; return conf; } @@ -38,7 +41,7 @@ void config_load() { if (access(g_conf->config_path, R_OK) != 0) { - syslog(LOG_ERR, "Configure File %s is not existed. Failed. ", g_conf->config_path); + dzlog_error("Configure File %s is not existed. Failed. ", g_conf->config_path); exit(EXIT_FAILURE); } @@ -52,14 +55,8 @@ void config_load() CPU_SET(io_cores[i], &g_conf->cpu_set_io); } - unsigned int loglevel = LOG_ERR; - MESA_load_profile_uint_def(g_conf->config_path, "global", "loglevel", &loglevel, loglevel); - if (!(loglevel <= LOG_DEBUG && loglevel >= LOG_EMERG)) - { - syslog(LOG_ERR, "global loglevel Must between LOG_DEBUG(7) and LOG_EMERG(0)"); - exit(EXIT_FAILURE); - } - g_conf->log_level = loglevel; + MESA_load_profile_string_nodef(g_conf->config_path, "global", "zlog_config_path", g_conf->zlog_config_path, + sizeof(g_conf->zlog_config_path)); char dp_trace_dir[PATH_MAX] = "/tmp"; MESA_load_profile_string_nodef(g_conf->config_path, "global", "dp_trace_dir", dp_trace_dir, sizeof(dp_trace_dir)); @@ -75,7 +72,7 @@ void config_load() g_conf->sled_ip = getenv("SLED_IP"); if (g_conf->sled_ip == NULL) { - syslog(LOG_WARNING, "SLED_IP environment variable does not exist."); + dzlog_warn("SLED_IP environment variable does not exist."); } MESA_load_profile_string_def(g_conf->config_path, "kafka", "borker_list", g_conf->broker_list, @@ -92,12 +89,12 @@ void dynamic_config_load() { if (access(g_conf->dy_config_path, R_OK) != 0) { - syslog(LOG_ERR, "Dynamic configure file %s is not existed. Do not load dynamic configuration files. ", - g_conf->dy_config_path); + dzlog_error("Dynamic configure file %s is not existed. Do not load dynamic configuration files. ", + g_conf->dy_config_path); return; } - syslog(LOG_INFO, "Loading data path trace configuration file..."); + dzlog_info("Loading data path trace configuration file..."); int ret = 0; @@ -148,7 +145,7 @@ void dynamic_config_load() desc_i->snaplen = (snaplen == 0) ? UINT32_MAX : snaplen; } - syslog(LOG_INFO, "Loading data path trace configuration file is completed."); + dzlog_info("Loading data path trace configuration file is completed."); } void dynamic_config_load_and_apply() diff --git a/src/config.h b/src/config.h index ae8cc13..e69bba9 100644 --- a/src/config.h +++ b/src/config.h @@ -12,9 +12,9 @@ struct config { char config_path[PATH_MAX]; char dy_config_path[PATH_MAX]; + char zlog_config_path[PATH_MAX]; cpu_set_t cpu_set_io; - uint8_t log_level; // device Information char * sled_ip; @@ -33,7 +33,7 @@ struct config struct dp_trace_job_desc desc[DP_TRACE_JOB_NUM_MAX]; }; -struct config * config_create(); +const struct config * config_create(const char * config_path, const char * dy_config_path); const struct config * global_config_get(); void global_config_destroy(); void config_load(); diff --git a/src/job_ctx.c b/src/job_ctx.c index 6630368..f42afc2 100644 --- a/src/job_ctx.c +++ b/src/job_ctx.c @@ -52,7 +52,7 @@ void job_rule_apply(struct dp_trace_job_desc desc[], unsigned int nr_desc, uint8 int ret = marsio_dp_trace_job_id_uesd_get(mr_instance, &jobs_id_used); if (ret < 0) { - syslog(LOG_ERR, "marsio_dp_trace_job_id_uesd_get faild"); + dzlog_error("marsio_dp_trace_job_id_uesd_get faild"); return; } diff --git a/src/kafka.c b/src/kafka.c index cf3a4d2..7801ee1 100644 --- a/src/kafka.c +++ b/src/kafka.c @@ -13,13 +13,13 @@ rd_kafka_t * kafka_handle_create(const char * brokerlist, const char * sasl_user ret = rd_kafka_conf_set(rconf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr)); if (ret != RD_KAFKA_CONF_OK) { - syslog(LOG_ERR, "Error to set kafka \"queue.buffering.max.messages\", %s.", kafka_errstr); + dzlog_error("Error to set kafka \"queue.buffering.max.messages\", %s.", kafka_errstr); goto error; } ret = rd_kafka_conf_set(rconf, "topic.metadata.refresh.interval.ms", "600000", kafka_errstr, sizeof(kafka_errstr)); if (ret != RD_KAFKA_CONF_OK) { - syslog(LOG_ERR, "Error to set kafka \"topic.metadata.refresh.interval.ms\", %s.", kafka_errstr); + dzlog_error("Error to set kafka \"topic.metadata.refresh.interval.ms\", %s.", kafka_errstr); goto error; } @@ -31,13 +31,13 @@ rd_kafka_t * kafka_handle_create(const char * brokerlist, const char * sasl_user ret = rd_kafka_conf_set(rconf, "sasl.username", sasl_username, kafka_errstr, sizeof(kafka_errstr)); if (ret != RD_KAFKA_CONF_OK) { - syslog(LOG_ERR, "Error to set kafka \"sasl.username\", %s.", kafka_errstr); + dzlog_error("Error to set kafka \"sasl.username\", %s.", kafka_errstr); goto error; } ret = rd_kafka_conf_set(rconf, "sasl.password", sasl_passwd, kafka_errstr, sizeof(kafka_errstr)); if (ret != RD_KAFKA_CONF_OK) { - syslog(LOG_ERR, "Error to set kafka \"sasl.password\", %s.", kafka_errstr); + dzlog_error("Error to set kafka \"sasl.password\", %s.", kafka_errstr); goto error; } @@ -46,13 +46,13 @@ rd_kafka_t * kafka_handle_create(const char * brokerlist, const char * sasl_user rconf = NULL; if (handle == NULL) { - syslog(LOG_ERR, "Error to new kafka, %s.", kafka_errstr); + dzlog_error("Error to new kafka, %s.", kafka_errstr); goto error; } if (rd_kafka_brokers_add(handle, brokerlist) == 0) { - syslog(LOG_ERR, "Error to add kakfa bokers."); + dzlog_error("Error to add kakfa bokers."); goto error; } @@ -23,7 +23,7 @@ static void signal_handler(evutil_socket_t fd, short what, void * arg) case SIGHUP: dynamic_config_load_and_apply(); default: - syslog(LOG_WARNING, "Received unexpected signal: %d", fd); + dzlog_warn("Received unexpected signal: %d", fd); } } @@ -54,7 +54,7 @@ void signal_event_init() pthread_t tid; if (pthread_create(&tid, NULL, signal_event_thread_dispatch, evbase) != 0) { - syslog(LOG_ERR, "failed to create thread for event_base dispatch"); + dzlog_error("failed to create thread for event_base dispatch"); exit(EXIT_FAILURE); } } @@ -76,11 +76,10 @@ void usage() int main(int argc, char * argv[]) { - openlog("dp_trace_telemetry", LOG_CONS, 0); - syslog(LOG_DEBUG, "dp_trace_telemetry start."); + int ret = 0; - // Specify configuration file location - struct config * conf = config_create(); + char config_path[PATH_MAX]; + char dy_config_path[PATH_MAX]; int opt = 0; while ((opt = getopt(argc, argv, "c:d:h")) != -1) { @@ -92,11 +91,11 @@ int main(int argc, char * argv[]) break; } case 'c': { - snprintf(conf->config_path, sizeof(conf->config_path), "%s", optarg); + snprintf(config_path, sizeof(config_path), "%s", optarg); break; } case 'd': { - snprintf(conf->dy_config_path, sizeof(conf->dy_config_path), "%s", optarg); + snprintf(dy_config_path, sizeof(dy_config_path), "%s", optarg); break; } default: @@ -105,15 +104,31 @@ int main(int argc, char * argv[]) } } + config_create(config_path, dy_config_path); + // Load configuration file config_load(); - dp_trace_output_init(); - setlogmask(LOG_UPTO(conf->log_level)); + const struct config * conf = global_config_get(); + + ret = dzlog_init(conf->zlog_config_path, "default_zlog_category"); + if (ret != 0) + { + char * zlog_profile_error = getenv("ZLOG_PROFILE_ERROR"); + printf("fail in dzlog_int.\n"); + if (zlog_profile_error != NULL) + { + printf("The zlog error log is recorded in:%s\n", zlog_profile_error); + } + return 0; + } + + dp_trace_output_init(); mr_instance = marsio_create(); - marsio_option_set(mr_instance, MARSIO_OPT_THREAD_MASK_IN_CPUSET, &conf->cpu_set_io, sizeof(conf->cpu_set_io)); + cpu_set_t cpu_set_io = conf->cpu_set_io; + marsio_option_set(mr_instance, MARSIO_OPT_THREAD_MASK_IN_CPUSET, &cpu_set_io, sizeof(cpu_set_io)); marsio_init(mr_instance, appsym); @@ -125,12 +140,12 @@ int main(int argc, char * argv[]) if (nr_thread != 4) { - syslog(LOG_EMERG, "Currently, four threads must be created to read the data. This restriction will be removed " - "later"); + dzlog_error("Currently, four threads must be created to read the data. This restriction will be removed " + "later"); return 0; } - syslog(LOG_INFO, "Thread Count = %d\n", nr_thread); + dzlog_info("Thread Count = %d", nr_thread); pthread_t tmp_pid[nr_thread]; for (int i = 0; i < nr_thread; i++) diff --git a/src/mocking.c b/src/mocking.c index 63788a4..5419e36 100644 --- a/src/mocking.c +++ b/src/mocking.c @@ -73,7 +73,7 @@ int __wrap_kafka_produce(rd_kafka_topic_t * rkt, void * payload, size_t len) { if (i >= 128) { - printf("too many measurements..."); + dzlog_debug("too many measurements..."); continue; } mpack_node_t measurement = mpack_node_array_at(measurements_val, i); @@ -86,22 +86,22 @@ int __wrap_kafka_produce(rd_kafka_topic_t * rkt, void * payload, size_t len) } // print - printf("microseconds %ld\n", packet.microseconds); - printf("job_id %s\n", packet.job_id_str); - printf("sled_ip %s\n", packet.sled_ip); - printf("device_group %s\n", packet.device_group); - printf("source_ip %s\n", packet.source_ip); - printf("source_port %d\n", packet.source_port); - printf("server_ip %s\n", packet.server_ip); - printf("server_port %d\n", packet.server_port); - printf("packet_length %d\n", packet.packet_length); + dzlog_debug("microseconds %ld", packet.microseconds); + dzlog_debug("job_id %s", packet.job_id_str); + dzlog_debug("sled_ip %s", packet.sled_ip); + dzlog_debug("device_group %s", packet.device_group); + dzlog_debug("source_ip %s", packet.source_ip); + dzlog_debug("source_port %d", packet.source_port); + dzlog_debug("server_ip %s", packet.server_ip); + dzlog_debug("server_port %d", packet.server_port); + dzlog_debug("packet_length %d", packet.packet_length); for (unsigned int i = 0; i < packet.measurements_num; i++) { - printf("record %u:\n", i); - printf("tv_sec %d\n", packet.record[i].tv_sec); - printf("tv_nsec %d\n", packet.record[i].tv_nsec); - printf("app %.*s\n", packet.record[i].app_len, packet.record[i].app); - printf("comments %.*s\n", packet.record[i].comments_len, packet.record[i].comments); + dzlog_debug("record %u:", i); + dzlog_debug("tv_sec %d", packet.record[i].tv_sec); + dzlog_debug("tv_nsec %d", packet.record[i].tv_nsec); + dzlog_debug("app %.*s", packet.record[i].app_len, packet.record[i].app); + dzlog_debug("comments %.*s", packet.record[i].comments_len, packet.record[i].comments); } free(payload); diff --git a/src/trace_output.c b/src/trace_output.c index 79e9d14..a135d59 100644 --- a/src/trace_output.c +++ b/src/trace_output.c @@ -49,7 +49,7 @@ void dp_trace_output_init() int ret = mkdir(dp_trace_dir, 0755); if (ret != 0 && errno != EEXIST) { - syslog(LOG_ERR, "Failed to create directory:%s.%s", dp_trace_dir, strerror(errno)); + dzlog_error("Failed to create directory:%s.%s", dp_trace_dir, strerror(errno)); exit(EXIT_FAILURE); } @@ -69,7 +69,7 @@ void dp_trace_output_init() ret = pthread_mutex_init(&dp_trace_output[i].file_mutex, &attr); if (ret != 0) { - syslog(LOG_ERR, "pthread_mutex_init failed(ret=%d)", ret); + dzlog_error("pthread_mutex_init failed(ret=%d)", ret); exit(EXIT_FAILURE); } } @@ -133,8 +133,7 @@ void * dp_trace_process_thread(void * arg) } else { - syslog(LOG_INFO, - "The job has been deleted. The trace content corresponding to the job has been discarded."); + dzlog_info("The job has been deleted. The trace content corresponding to the job has been discarded."); marsio_dp_trace_free(mr_instance, class_mbufs[i], nr_mbufs); } } @@ -272,7 +271,7 @@ bool dp_trace_file_reach_max_size(job_bitmap_t job_id) struct stat file_stat; if (unlikely(stat(dp_trace_output[index].file_path, &file_stat) == -1)) { - syslog(LOG_ERR, "Failed to obtain data path trace file status."); + dzlog_error("Failed to obtain data path trace file status."); ret = true; goto end; } @@ -302,7 +301,7 @@ void dp_trace_file_rollbak(job_bitmap_t job_id) const char * file_bak_path = dp_trace_output[index].file_bak_path; if (rename(file_path, file_bak_path) < 0) { - syslog(LOG_ERR, "rename %s to %s failed. error info: %s", file_path, file_bak_path, strerror(errno)); + dzlog_error("rename %s to %s failed. error info: %s", file_path, file_bak_path, strerror(errno)); } pcapng_file_t * pcapng = marsio_dp_trace_pcapng_fopen(mr_instance, dp_trace_output[index].file_path); @@ -337,39 +336,39 @@ void dp_trace_pcapng_merger(job_bitmap_t job_id) if (rename(file_path, file_middle_path) < 0) { - syslog(LOG_ERR, "rename %s to %s failed. error info: %s", file_path, file_middle_path, strerror(errno)); + dzlog_error("rename %s to %s failed. error info: %s", file_path, file_middle_path, strerror(errno)); goto end; } char command[2 * PATH_MAX]; snprintf(command, sizeof(command), "timeout -v %us mergecap -w %s %s %s 2>&1", global_config_get()->dp_trace_merge_timeout, file_path, file_middle_path, file_bak_path); - syslog(LOG_INFO, "merge trace file: %s", command); + dzlog_info("merge trace file: %s", command); FILE * fp; char buffer[1024]; fp = popen(command, "r"); if (fp == NULL) { - syslog(LOG_ERR, "open pipe failed: %s", strerror(errno)); + dzlog_error("open pipe failed: %s", strerror(errno)); goto end; } while (fgets(buffer, sizeof(buffer), fp) != NULL) { - syslog(LOG_ERR, "merge trace file output: %s", buffer); + dzlog_error("merge trace file output: %s", buffer); } pclose(fp); if (remove(file_middle_path) < 0) { - syslog(LOG_ERR, "remove %s failed. error info: %s", file_middle_path, strerror(errno)); + dzlog_error("remove %s failed. error info: %s", file_middle_path, strerror(errno)); } if (remove(file_bak_path) < 0) { - syslog(LOG_ERR, "remove %s failed. error info: %s", file_bak_path, strerror(errno)); + dzlog_error("remove %s failed. error info: %s", file_bak_path, strerror(errno)); } end: @@ -386,13 +385,13 @@ int dp_trace_file_mutex_lock(job_bitmap_t job_id) ret = pthread_mutex_unlock(&dp_trace_output[index].file_mutex); if (ret != 0) { - syslog(LOG_ERR, "EOWNERDEAD -> job ctx unlock failed"); + dzlog_error("EOWNERDEAD -> job ctx unlock failed"); return -1; } } else if (ret != 0) { - syslog(LOG_ERR, "job ctx lock failed"); + dzlog_error("job ctx lock failed"); return -1; } return 0; @@ -501,7 +500,7 @@ static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** dat /* finish writing */ if (mpack_writer_destroy(&writer) != mpack_ok) { - syslog(LOG_ERR, "An error occurred during the data path decode to message pack!\n"); + dzlog_error("An error occurred during the data path decode to message pack!"); } } |
