author     童宗振 <[email protected]>   2024-04-12 06:24:39 +0000
committer  童宗振 <[email protected]>   2024-04-12 06:24:39 +0000
commit     862045d3aaf61bfaf763e6320f5894f7b55c41ed (patch)
tree       ff316b05c1aca525e25c61c40b48a3e9a4333997 /src
parent     4d678720f09744a381f458f4305694feefff5eec (diff)
Add zlog
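
This commit replaces the syslog calls throughout src/ with zlog. For readers unfamiliar with the library, below is a minimal, self-contained sketch of the dzlog API the patch relies on; it is not part of this commit, and the config path shown is illustrative (the daemon takes it from its own configuration, see config.c below). Build with -lzlog.

    /* minimal dzlog usage sketch -- illustrative only, not part of this commit */
    #include <zlog.h>
    #include <stdio.h>

    int main(void)
    {
        /* the second argument is the default category used by the dzlog_* macros */
        if (dzlog_init("/etc/zlog.conf", "default_zlog_category") != 0)
        {
            /* zlog writes its own startup errors to the file named by ZLOG_PROFILE_ERROR */
            printf("dzlog_init failed\n");
            return 1;
        }
        dzlog_info("zlog initialized");
        dzlog_error("error-level message: %s", "details");
        zlog_fini(); /* flush buffers and release zlog resources */
        return 0;
    }
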
Diffstat (limited to 'src')
-rw-r--r--  src/CMakeLists.txt  |  2
-rw-r--r--  src/common.h        |  5
-rw-r--r--  src/config.c        | 27
-rw-r--r--  src/config.h        |  4
-rw-r--r--  src/job_ctx.c       |  2
-rw-r--r--  src/kafka.c         | 12
-rw-r--r--  src/main.c          | 43
-rw-r--r--  src/mocking.c       | 30
-rw-r--r--  src/trace_output.c  | 29
9 files changed, 83 insertions, 71 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index 1c7e109..15e9066 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -12,7 +12,7 @@ set(DP_TELEMETRY_SRC
${CMAKE_SOURCE_DIR}/support/mpack/mpack.c)
add_executable(${PROJECT_NAME} ${DP_TELEMETRY_SRC})
-target_link_libraries(${PROJECT_NAME} libmarsio maatframe MESA_prof_load libevent-static rdkafka libcmocka uuid pthread)
+target_link_libraries(${PROJECT_NAME} libmarsio maatframe MESA_prof_load libevent-static rdkafka libcmocka libzlog uuid pthread)
if(ENABLE_DEVELOP_MOCKING)
target_link_options(${PROJECT_NAME} PRIVATE -Wl,--wrap=kafka_handle_create,--wrap=kafka_topic_new,--wrap=kafka_produce)
endif()
diff --git a/src/common.h b/src/common.h
index 2d45664..56a5fbf 100644
--- a/src/common.h
+++ b/src/common.h
@@ -1,11 +1,12 @@
#pragma once
#include "marsio.h"
+#include <zlog.h>
+
#include <assert.h>
#include <stdint.h>
#include <string.h>
#include <sys/stat.h>
-#include <syslog.h>
#include <unistd.h>
#ifndef MR_SYMBOL_MAX
@@ -27,7 +28,7 @@
{ \
if (!(condition)) \
{ \
- syslog(LOG_ERR, fmt, ##__VA_ARGS__); \
+ dzlog_error(fmt, ##__VA_ARGS__); \
exit(EXIT_FAILURE); \
} \
} while (0)
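
After this hunk the CHECK_OR_EXIT macro in common.h aborts through zlog instead of syslog. Only the tail of the definition appears in the diff context; below is a hedged reconstruction of how the whole macro reads after the change. The macro header and the do keyword are inferred from the visible body and from call sites such as CHECK_OR_EXIT(conf, "calloc failed: %s", strerror(errno)) in config.c; they are not shown by the hunk itself.

    /* reconstructed for readability; the first two lines are inferred, not part of the hunk */
    #define CHECK_OR_EXIT(condition, fmt, ...)    \
        do                                        \
        {                                         \
            if (!(condition))                     \
            {                                     \
                dzlog_error(fmt, ##__VA_ARGS__);  \
                exit(EXIT_FAILURE);               \
            }                                     \
        } while (0)
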
diff --git a/src/config.c b/src/config.c
index a9f653e..1834353 100644
--- a/src/config.c
+++ b/src/config.c
@@ -14,11 +14,14 @@
extern struct mr_instance * mr_instance;
static struct config * g_conf = NULL;
-struct config * config_create()
+const struct config * config_create(const char * config_path, const char * dy_config_path)
{
struct config * conf = calloc(1, sizeof(struct config));
CHECK_OR_EXIT(conf, "calloc failed: %s", strerror(errno));
+ snprintf(conf->config_path, sizeof(conf->config_path), "%s", config_path);
+ snprintf(conf->dy_config_path, sizeof(conf->dy_config_path), "%s", dy_config_path);
+
g_conf = conf;
return conf;
}
@@ -38,7 +41,7 @@ void config_load()
{
if (access(g_conf->config_path, R_OK) != 0)
{
- syslog(LOG_ERR, "Configure File %s is not existed. Failed. ", g_conf->config_path);
+ dzlog_error("Configuration file %s does not exist.", g_conf->config_path);
exit(EXIT_FAILURE);
}
@@ -52,14 +55,8 @@ void config_load()
CPU_SET(io_cores[i], &g_conf->cpu_set_io);
}
- unsigned int loglevel = LOG_ERR;
- MESA_load_profile_uint_def(g_conf->config_path, "global", "loglevel", &loglevel, loglevel);
- if (!(loglevel <= LOG_DEBUG && loglevel >= LOG_EMERG))
- {
- syslog(LOG_ERR, "global loglevel Must between LOG_DEBUG(7) and LOG_EMERG(0)");
- exit(EXIT_FAILURE);
- }
- g_conf->log_level = loglevel;
+ MESA_load_profile_string_nodef(g_conf->config_path, "global", "zlog_config_path", g_conf->zlog_config_path,
+ sizeof(g_conf->zlog_config_path));
char dp_trace_dir[PATH_MAX] = "/tmp";
MESA_load_profile_string_nodef(g_conf->config_path, "global", "dp_trace_dir", dp_trace_dir, sizeof(dp_trace_dir));
@@ -75,7 +72,7 @@ void config_load()
g_conf->sled_ip = getenv("SLED_IP");
if (g_conf->sled_ip == NULL)
{
- syslog(LOG_WARNING, "SLED_IP environment variable does not exist.");
+ dzlog_warn("SLED_IP environment variable does not exist.");
}
MESA_load_profile_string_def(g_conf->config_path, "kafka", "borker_list", g_conf->broker_list,
@@ -92,12 +89,12 @@ void dynamic_config_load()
{
if (access(g_conf->dy_config_path, R_OK) != 0)
{
- syslog(LOG_ERR, "Dynamic configure file %s is not existed. Do not load dynamic configuration files. ",
- g_conf->dy_config_path);
+ dzlog_error("Dynamic configuration file %s does not exist; skipping dynamic configuration load.",
+ g_conf->dy_config_path);
return;
}
- syslog(LOG_INFO, "Loading data path trace configuration file...");
+ dzlog_info("Loading data path trace configuration file...");
int ret = 0;
@@ -148,7 +145,7 @@ void dynamic_config_load()
desc_i->snaplen = (snaplen == 0) ? UINT32_MAX : snaplen;
}
- syslog(LOG_INFO, "Loading data path trace configuration file is completed.");
+ dzlog_info("Loading data path trace configuration file is completed.");
}
void dynamic_config_load_and_apply()
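
With the numeric loglevel key gone, the [global] section of the telemetry profile now has to point at a zlog configuration file instead. A hedged sketch of the relevant entries follows; the section and key names mirror the MESA_load_profile_* calls above, while the values themselves are illustrative.

    [global]
    zlog_config_path = /etc/dp_trace_telemetry/zlog.conf
    dp_trace_dir = /tmp
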
diff --git a/src/config.h b/src/config.h
index ae8cc13..e69bba9 100644
--- a/src/config.h
+++ b/src/config.h
@@ -12,9 +12,9 @@ struct config
{
char config_path[PATH_MAX];
char dy_config_path[PATH_MAX];
+ char zlog_config_path[PATH_MAX];
cpu_set_t cpu_set_io;
- uint8_t log_level;
// device Information
char * sled_ip;
@@ -33,7 +33,7 @@ struct config
struct dp_trace_job_desc desc[DP_TRACE_JOB_NUM_MAX];
};
-struct config * config_create();
+const struct config * config_create(const char * config_path, const char * dy_config_path);
const struct config * global_config_get();
void global_config_destroy();
void config_load();
diff --git a/src/job_ctx.c b/src/job_ctx.c
index 6630368..f42afc2 100644
--- a/src/job_ctx.c
+++ b/src/job_ctx.c
@@ -52,7 +52,7 @@ void job_rule_apply(struct dp_trace_job_desc desc[], unsigned int nr_desc, uint8
int ret = marsio_dp_trace_job_id_uesd_get(mr_instance, &jobs_id_used);
if (ret < 0)
{
- syslog(LOG_ERR, "marsio_dp_trace_job_id_uesd_get faild");
+ dzlog_error("marsio_dp_trace_job_id_uesd_get failed");
return;
}
diff --git a/src/kafka.c b/src/kafka.c
index cf3a4d2..7801ee1 100644
--- a/src/kafka.c
+++ b/src/kafka.c
@@ -13,13 +13,13 @@ rd_kafka_t * kafka_handle_create(const char * brokerlist, const char * sasl_user
ret = rd_kafka_conf_set(rconf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr));
if (ret != RD_KAFKA_CONF_OK)
{
- syslog(LOG_ERR, "Error to set kafka \"queue.buffering.max.messages\", %s.", kafka_errstr);
+ dzlog_error("Failed to set kafka \"queue.buffering.max.messages\", %s.", kafka_errstr);
goto error;
}
ret = rd_kafka_conf_set(rconf, "topic.metadata.refresh.interval.ms", "600000", kafka_errstr, sizeof(kafka_errstr));
if (ret != RD_KAFKA_CONF_OK)
{
- syslog(LOG_ERR, "Error to set kafka \"topic.metadata.refresh.interval.ms\", %s.", kafka_errstr);
+ dzlog_error("Failed to set kafka \"topic.metadata.refresh.interval.ms\", %s.", kafka_errstr);
goto error;
}
@@ -31,13 +31,13 @@ rd_kafka_t * kafka_handle_create(const char * brokerlist, const char * sasl_user
ret = rd_kafka_conf_set(rconf, "sasl.username", sasl_username, kafka_errstr, sizeof(kafka_errstr));
if (ret != RD_KAFKA_CONF_OK)
{
- syslog(LOG_ERR, "Error to set kafka \"sasl.username\", %s.", kafka_errstr);
+ dzlog_error("Failed to set kafka \"sasl.username\", %s.", kafka_errstr);
goto error;
}
ret = rd_kafka_conf_set(rconf, "sasl.password", sasl_passwd, kafka_errstr, sizeof(kafka_errstr));
if (ret != RD_KAFKA_CONF_OK)
{
- syslog(LOG_ERR, "Error to set kafka \"sasl.password\", %s.", kafka_errstr);
+ dzlog_error("Failed to set kafka \"sasl.password\", %s.", kafka_errstr);
goto error;
}
@@ -46,13 +46,13 @@ rd_kafka_t * kafka_handle_create(const char * brokerlist, const char * sasl_user
rconf = NULL;
if (handle == NULL)
{
- syslog(LOG_ERR, "Error to new kafka, %s.", kafka_errstr);
+ dzlog_error("Failed to create kafka handle, %s.", kafka_errstr);
goto error;
}
if (rd_kafka_brokers_add(handle, brokerlist) == 0)
{
- syslog(LOG_ERR, "Error to add kakfa bokers.");
+ dzlog_error("Failed to add kafka brokers.");
goto error;
}
diff --git a/src/main.c b/src/main.c
index 4588c63..1f318df 100644
--- a/src/main.c
+++ b/src/main.c
@@ -23,7 +23,7 @@ static void signal_handler(evutil_socket_t fd, short what, void * arg)
case SIGHUP:
dynamic_config_load_and_apply();
default:
- syslog(LOG_WARNING, "Received unexpected signal: %d", fd);
+ dzlog_warn("Received unexpected signal: %d", fd);
}
}
@@ -54,7 +54,7 @@ void signal_event_init()
pthread_t tid;
if (pthread_create(&tid, NULL, signal_event_thread_dispatch, evbase) != 0)
{
- syslog(LOG_ERR, "failed to create thread for event_base dispatch");
+ dzlog_error("failed to create thread for event_base dispatch");
exit(EXIT_FAILURE);
}
}
@@ -76,11 +76,10 @@ void usage()
int main(int argc, char * argv[])
{
- openlog("dp_trace_telemetry", LOG_CONS, 0);
- syslog(LOG_DEBUG, "dp_trace_telemetry start.");
+ int ret = 0;
- // Specify configuration file location
- struct config * conf = config_create();
+ char config_path[PATH_MAX] = {0}; /* zero-initialize so config_create() gets empty strings when -c/-d are omitted */
+ char dy_config_path[PATH_MAX] = {0};
int opt = 0;
while ((opt = getopt(argc, argv, "c:d:h")) != -1)
{
@@ -92,11 +91,11 @@ int main(int argc, char * argv[])
break;
}
case 'c': {
- snprintf(conf->config_path, sizeof(conf->config_path), "%s", optarg);
+ snprintf(config_path, sizeof(config_path), "%s", optarg);
break;
}
case 'd': {
- snprintf(conf->dy_config_path, sizeof(conf->dy_config_path), "%s", optarg);
+ snprintf(dy_config_path, sizeof(dy_config_path), "%s", optarg);
break;
}
default:
@@ -105,15 +104,31 @@ int main(int argc, char * argv[])
}
}
+ config_create(config_path, dy_config_path);
+
// Load configuration file
config_load();
- dp_trace_output_init();
- setlogmask(LOG_UPTO(conf->log_level));
+ const struct config * conf = global_config_get();
+
+ ret = dzlog_init(conf->zlog_config_path, "default_zlog_category");
+ if (ret != 0)
+ {
+ char * zlog_profile_error = getenv("ZLOG_PROFILE_ERROR");
+ printf("dzlog_init failed.\n");
+ if (zlog_profile_error != NULL)
+ {
+ printf("The zlog error log is recorded in: %s\n", zlog_profile_error);
+ }
+ return 0;
+ }
+
+ dp_trace_output_init();
mr_instance = marsio_create();
- marsio_option_set(mr_instance, MARSIO_OPT_THREAD_MASK_IN_CPUSET, &conf->cpu_set_io, sizeof(conf->cpu_set_io));
+ cpu_set_t cpu_set_io = conf->cpu_set_io;
+ marsio_option_set(mr_instance, MARSIO_OPT_THREAD_MASK_IN_CPUSET, &cpu_set_io, sizeof(cpu_set_io));
marsio_init(mr_instance, appsym);
@@ -125,12 +140,12 @@ int main(int argc, char * argv[])
if (nr_thread != 4)
{
- syslog(LOG_EMERG, "Currently, four threads must be created to read the data. This restriction will be removed "
- "later");
+ dzlog_error("Currently, four threads must be created to read the data. This restriction will be removed "
+ "later");
return 0;
}
- syslog(LOG_INFO, "Thread Count = %d\n", nr_thread);
+ dzlog_info("Thread Count = %d", nr_thread);
pthread_t tmp_pid[nr_thread];
for (int i = 0; i < nr_thread; i++)
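
main() now initializes zlog with the category name default_zlog_category and, when dzlog_init() fails, directs the operator to ZLOG_PROFILE_ERROR, the file zlog writes its own startup diagnostics to when that environment variable is set. A hedged sketch of a zlog.conf the new code would accept is shown below; only the category name comes from the patch, the format string and log path are assumptions.

    # illustrative zlog.conf -- adjust format and output path to taste
    [formats]
    simple = "%d %V [%p] %F:%L %m%n"

    [rules]
    default_zlog_category.*    "/var/log/dp_trace_telemetry.log"; simple
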
diff --git a/src/mocking.c b/src/mocking.c
index 63788a4..5419e36 100644
--- a/src/mocking.c
+++ b/src/mocking.c
@@ -73,7 +73,7 @@ int __wrap_kafka_produce(rd_kafka_topic_t * rkt, void * payload, size_t len)
{
if (i >= 128)
{
- printf("too many measurements...");
+ dzlog_debug("too many measurements...");
continue;
}
mpack_node_t measurement = mpack_node_array_at(measurements_val, i);
@@ -86,22 +86,22 @@ int __wrap_kafka_produce(rd_kafka_topic_t * rkt, void * payload, size_t len)
}
// print
- printf("microseconds %ld\n", packet.microseconds);
- printf("job_id %s\n", packet.job_id_str);
- printf("sled_ip %s\n", packet.sled_ip);
- printf("device_group %s\n", packet.device_group);
- printf("source_ip %s\n", packet.source_ip);
- printf("source_port %d\n", packet.source_port);
- printf("server_ip %s\n", packet.server_ip);
- printf("server_port %d\n", packet.server_port);
- printf("packet_length %d\n", packet.packet_length);
+ dzlog_debug("microseconds %ld", packet.microseconds);
+ dzlog_debug("job_id %s", packet.job_id_str);
+ dzlog_debug("sled_ip %s", packet.sled_ip);
+ dzlog_debug("device_group %s", packet.device_group);
+ dzlog_debug("source_ip %s", packet.source_ip);
+ dzlog_debug("source_port %d", packet.source_port);
+ dzlog_debug("server_ip %s", packet.server_ip);
+ dzlog_debug("server_port %d", packet.server_port);
+ dzlog_debug("packet_length %d", packet.packet_length);
for (unsigned int i = 0; i < packet.measurements_num; i++)
{
- printf("record %u:\n", i);
- printf("tv_sec %d\n", packet.record[i].tv_sec);
- printf("tv_nsec %d\n", packet.record[i].tv_nsec);
- printf("app %.*s\n", packet.record[i].app_len, packet.record[i].app);
- printf("comments %.*s\n", packet.record[i].comments_len, packet.record[i].comments);
+ dzlog_debug("record %u:", i);
+ dzlog_debug("tv_sec %d", packet.record[i].tv_sec);
+ dzlog_debug("tv_nsec %d", packet.record[i].tv_nsec);
+ dzlog_debug("app %.*s", packet.record[i].app_len, packet.record[i].app);
+ dzlog_debug("comments %.*s", packet.record[i].comments_len, packet.record[i].comments);
}
free(payload);
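
Since the mocking dump now goes through dzlog_debug instead of printf, it only appears when the zlog rules admit DEBUG for the category. A hedged one-rule example, reusing the simple format from the sketch above (the stdout target is illustrative):

    # enable the mocked-produce dump on stdout; drop or tighten for production
    default_zlog_category.DEBUG    >stdout; simple
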
diff --git a/src/trace_output.c b/src/trace_output.c
index 79e9d14..a135d59 100644
--- a/src/trace_output.c
+++ b/src/trace_output.c
@@ -49,7 +49,7 @@ void dp_trace_output_init()
int ret = mkdir(dp_trace_dir, 0755);
if (ret != 0 && errno != EEXIST)
{
- syslog(LOG_ERR, "Failed to create directory:%s.%s", dp_trace_dir, strerror(errno));
+ dzlog_error("Failed to create directory %s: %s", dp_trace_dir, strerror(errno));
exit(EXIT_FAILURE);
}
@@ -69,7 +69,7 @@ void dp_trace_output_init()
ret = pthread_mutex_init(&dp_trace_output[i].file_mutex, &attr);
if (ret != 0)
{
- syslog(LOG_ERR, "pthread_mutex_init failed(ret=%d)", ret);
+ dzlog_error("pthread_mutex_init failed(ret=%d)", ret);
exit(EXIT_FAILURE);
}
}
@@ -133,8 +133,7 @@ void * dp_trace_process_thread(void * arg)
}
else
{
- syslog(LOG_INFO,
- "The job has been deleted. The trace content corresponding to the job has been discarded.");
+ dzlog_info("The job has been deleted. The trace content corresponding to the job has been discarded.");
marsio_dp_trace_free(mr_instance, class_mbufs[i], nr_mbufs);
}
}
@@ -272,7 +271,7 @@ bool dp_trace_file_reach_max_size(job_bitmap_t job_id)
struct stat file_stat;
if (unlikely(stat(dp_trace_output[index].file_path, &file_stat) == -1))
{
- syslog(LOG_ERR, "Failed to obtain data path trace file status.");
+ dzlog_error("Failed to obtain data path trace file status.");
ret = true;
goto end;
}
@@ -302,7 +301,7 @@ void dp_trace_file_rollbak(job_bitmap_t job_id)
const char * file_bak_path = dp_trace_output[index].file_bak_path;
if (rename(file_path, file_bak_path) < 0)
{
- syslog(LOG_ERR, "rename %s to %s failed. error info: %s", file_path, file_bak_path, strerror(errno));
+ dzlog_error("rename %s to %s failed. error info: %s", file_path, file_bak_path, strerror(errno));
}
pcapng_file_t * pcapng = marsio_dp_trace_pcapng_fopen(mr_instance, dp_trace_output[index].file_path);
@@ -337,39 +336,39 @@ void dp_trace_pcapng_merger(job_bitmap_t job_id)
if (rename(file_path, file_middle_path) < 0)
{
- syslog(LOG_ERR, "rename %s to %s failed. error info: %s", file_path, file_middle_path, strerror(errno));
+ dzlog_error("rename %s to %s failed. error info: %s", file_path, file_middle_path, strerror(errno));
goto end;
}
char command[2 * PATH_MAX];
snprintf(command, sizeof(command), "timeout -v %us mergecap -w %s %s %s 2>&1",
global_config_get()->dp_trace_merge_timeout, file_path, file_middle_path, file_bak_path);
- syslog(LOG_INFO, "merge trace file: %s", command);
+ dzlog_info("merge trace file: %s", command);
FILE * fp;
char buffer[1024];
fp = popen(command, "r");
if (fp == NULL)
{
- syslog(LOG_ERR, "open pipe failed: %s", strerror(errno));
+ dzlog_error("open pipe failed: %s", strerror(errno));
goto end;
}
while (fgets(buffer, sizeof(buffer), fp) != NULL)
{
- syslog(LOG_ERR, "merge trace file output: %s", buffer);
+ dzlog_error("merge trace file output: %s", buffer);
}
pclose(fp);
if (remove(file_middle_path) < 0)
{
- syslog(LOG_ERR, "remove %s failed. error info: %s", file_middle_path, strerror(errno));
+ dzlog_error("remove %s failed. error info: %s", file_middle_path, strerror(errno));
}
if (remove(file_bak_path) < 0)
{
- syslog(LOG_ERR, "remove %s failed. error info: %s", file_bak_path, strerror(errno));
+ dzlog_error("remove %s failed. error info: %s", file_bak_path, strerror(errno));
}
end:
@@ -386,13 +385,13 @@ int dp_trace_file_mutex_lock(job_bitmap_t job_id)
ret = pthread_mutex_unlock(&dp_trace_output[index].file_mutex);
if (ret != 0)
{
- syslog(LOG_ERR, "EOWNERDEAD -> job ctx unlock failed");
+ dzlog_error("EOWNERDEAD -> job ctx unlock failed");
return -1;
}
}
else if (ret != 0)
{
- syslog(LOG_ERR, "job ctx lock failed");
+ dzlog_error("job ctx lock failed");
return -1;
}
return 0;
@@ -501,7 +500,7 @@ static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** dat
/* finish writing */
if (mpack_writer_destroy(&writer) != mpack_ok)
{
- syslog(LOG_ERR, "An error occurred during the data path decode to message pack!\n");
+ dzlog_error("An error occurred during the data path decode to message pack!");
}
}