summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
author童宗振 <[email protected]>2024-04-16 07:02:53 +0000
committer童宗振 <[email protected]>2024-04-16 07:02:53 +0000
commita505ace34b763afb14c57cfcf7f41b0a89464abe (patch)
tree3a6170aff2d900d653b6dac6b7b6a04104429219 /src
parent4e5f500ec7f3db33a74d46154fae2fcb660a5a76 (diff)
Modify relative path
Diffstat (limited to 'src')
-rw-r--r--src/config.c75
-rw-r--r--src/config.h2
-rw-r--r--src/kafka.c2
-rw-r--r--src/maat.c1
-rw-r--r--src/mocking.c6
-rw-r--r--src/trace_output.c6
6 files changed, 72 insertions, 20 deletions
diff --git a/src/config.c b/src/config.c
index 78a9aa8..b42335e 100644
--- a/src/config.c
+++ b/src/config.c
@@ -14,6 +14,10 @@
extern struct mr_instance * mr_instance;
static struct config * g_conf = NULL;
+static void main_process_realpath_get(char * absolute_path, size_t size);
+static char * paths_combine(const char * restrict absolute_path, const char * restrict relative_path,
+ char * restrict resolve_path);
+
const struct config * config_create(const char * config_path, const char * dy_config_path)
{
struct config * conf = calloc(1, sizeof(struct config));
@@ -21,6 +25,8 @@ const struct config * config_create(const char * config_path, const char * dy_co
snprintf(conf->config_path, sizeof(conf->config_path), "%s", config_path);
snprintf(conf->dy_config_path, sizeof(conf->dy_config_path), "%s", dy_config_path);
+ main_process_realpath_get(conf->absolute_path, sizeof(conf->absolute_path));
+ printf("process absolute path %s\n", conf->absolute_path);
g_conf = conf;
return conf;
@@ -39,10 +45,12 @@ void global_config_destroy()
void config_load()
{
+ char tmp_path[PATH_MAX];
+
const char * config_path = g_conf->config_path;
if (access(config_path, R_OK) != 0)
{
- printf("Configure File(%s) load failed:%s ", config_path, strerror(errno));
+ printf("Configure File(%s) load failed:%s \n", config_path, strerror(errno));
exit(EXIT_FAILURE);
}
@@ -56,16 +64,16 @@ void config_load()
CPU_SET(io_cores[i], &g_conf->cpu_set_io);
}
- MESA_load_profile_string_nodef(config_path, "global", "zlog_config_path", g_conf->zlog_config_path,
- sizeof(g_conf->zlog_config_path));
+ MESA_load_profile_string_nodef(config_path, "global", "zlog_config_path", tmp_path, sizeof(tmp_path));
+ paths_combine(g_conf->absolute_path, tmp_path, g_conf->zlog_config_path);
+ printf("zlog config path: %s\n", g_conf->zlog_config_path);
- char dp_trace_dir[PATH_MAX] = "/tmp";
- MESA_load_profile_string_nodef(config_path, "global", "dp_trace_dir", dp_trace_dir, sizeof(dp_trace_dir));
- if (dp_trace_dir[strlen(dp_trace_dir) - 1] == '/')
+ MESA_load_profile_string_def(config_path, "global", "dp_trace_dir", tmp_path, sizeof(tmp_path), "/tmp");
+ if (tmp_path[strlen(tmp_path) - 1] == '/')
{
- dp_trace_dir[strlen(dp_trace_dir) - 1] = '\0';
+ tmp_path[strlen(tmp_path) - 1] = '\0';
}
- snprintf(g_conf->dp_trace_dir, sizeof(g_conf->dp_trace_dir), "%s", dp_trace_dir);
+ snprintf(g_conf->dp_trace_dir, sizeof(g_conf->dp_trace_dir), "%s", tmp_path);
MESA_load_profile_string_def(g_conf->device_group, "global", "device_group", g_conf->device_group,
sizeof(g_conf->device_group), "unknow");
@@ -73,7 +81,7 @@ void config_load()
g_conf->sled_ip = getenv("SLED_IP");
if (g_conf->sled_ip == NULL)
{
- printf("SLED_IP environment variable does not exist.");
+ printf("SLED_IP environment variable does not exist.\n");
}
MESA_load_profile_string_def(config_path, "kafka", "borker_list", g_conf->broker_list, sizeof(g_conf->broker_list),
@@ -87,10 +95,15 @@ void config_load()
MESA_load_profile_int_def(config_path, "maat", "maat_log_level", &(g_conf->maat_log_level), LOG_LEVEL_FATAL);
MESA_load_profile_int_def(config_path, "maat", "maat_input_mode", &(g_conf->maat_input_mode), 0);
- MESA_load_profile_string_def(config_path, "maat", "table_schema", g_conf->table_schema,
- sizeof(g_conf->table_schema), "");
- MESA_load_profile_string_def(config_path, "maat", "json_cfg_file", g_conf->json_cfg_file,
- sizeof(g_conf->json_cfg_file), "");
+
+ MESA_load_profile_string_def(config_path, "maat", "table_schema", tmp_path, sizeof(tmp_path), "");
+ paths_combine(g_conf->absolute_path, tmp_path, g_conf->table_schema);
+ printf("maat table_schema path: %s\n", g_conf->table_schema);
+
+ MESA_load_profile_string_def(config_path, "maat", "json_cfg_file", tmp_path, sizeof(tmp_path), "");
+ paths_combine(g_conf->absolute_path, tmp_path, g_conf->json_cfg_file);
+ printf("maat json config file path: %s\n", g_conf->json_cfg_file);
+
MESA_load_profile_string_def(config_path, "maat", "maat_redis_server", g_conf->redis_server,
sizeof(g_conf->redis_server), "");
MESA_load_profile_string_def(config_path, "maat", "maat_redis_port_range", g_conf->redis_port_range,
@@ -166,3 +179,39 @@ void dynamic_config_load_and_apply()
dynamic_config_load();
job_rule_apply(g_conf->desc, TELEMETRY_DIM(g_conf->desc), DP_TRACE_ROLE);
}
+
+//////////////////////// helper function /////////////////////////
+static void main_process_realpath_get(char * absolute_path, size_t size)
+{
+ // readlink requires "/proc/self/exe" on Linux and "/proc/curproc/file" on FreeBSD
+ // https://stackoverflow.com/questions/933850/how-do-i-find-the-location-of-the-executable-in-c
+ // this function only for linux
+ ssize_t len = readlink("/proc/self/exe", absolute_path, size - 1);
+ DP_TRACE_VERIFY(len != -1, "get main process realpath failed:%s", strerror(errno));
+ absolute_path[len] = '\0';
+}
+
+// Combine relative paths and absolute paths into a new absolute path
+static char * paths_combine(const char * restrict absolute_path, const char * restrict relative_path,
+ char * restrict resolve_path)
+{
+ if (relative_path[0] == '/')
+ {
+ strcpy(resolve_path, relative_path);
+ return resolve_path;
+ }
+
+ char combine_path[PATH_MAX];
+ strcpy(combine_path, absolute_path);
+
+ char * last_slash = strrchr(combine_path, '/');
+ DP_TRACE_VERIFY(last_slash != NULL, "Invalid absolute path.");
+ *last_slash = '\0';
+
+ strcat(combine_path, "/");
+ strcat(combine_path, relative_path);
+
+ realpath(combine_path, resolve_path);
+
+ return resolve_path;
+} \ No newline at end of file
diff --git a/src/config.h b/src/config.h
index 8f81e59..de8778d 100644
--- a/src/config.h
+++ b/src/config.h
@@ -10,6 +10,8 @@
struct config
{
+ char absolute_path[PATH_MAX];
+
char config_path[PATH_MAX];
char dy_config_path[PATH_MAX];
char zlog_config_path[PATH_MAX];
diff --git a/src/kafka.c b/src/kafka.c
index 7801ee1..60146f7 100644
--- a/src/kafka.c
+++ b/src/kafka.c
@@ -10,7 +10,7 @@ rd_kafka_t * kafka_handle_create(const char * brokerlist, const char * sasl_user
rconf = rd_kafka_conf_new();
- ret = rd_kafka_conf_set(rconf, "queue.buffering.max.messages", "1000000", kafka_errstr, sizeof(kafka_errstr));
+ ret = rd_kafka_conf_set(rconf, "queue.buffering.max.messages", "1000", kafka_errstr, sizeof(kafka_errstr));
if (ret != RD_KAFKA_CONF_OK)
{
dzlog_error("Error to set kafka \"queue.buffering.max.messages\", %s.", kafka_errstr);
diff --git a/src/maat.c b/src/maat.c
index 9062321..836b8ae 100644
--- a/src/maat.c
+++ b/src/maat.c
@@ -148,6 +148,7 @@ static void telemetry_job_add_cb(const char * table_name, int table_id, const ch
}
job_desc->rule_index = index;
+ job_desc->sampling = 1;
job_desc->enable = true;
memcpy(&telemetry_descs[index], &telemetry_desc, sizeof(struct dp_trace_telemetry_desc));
diff --git a/src/mocking.c b/src/mocking.c
index 5419e36..c3b232c 100644
--- a/src/mocking.c
+++ b/src/mocking.c
@@ -47,7 +47,7 @@ int __wrap_kafka_produce(rd_kafka_topic_t * rkt, void * payload, size_t len)
mpack_tree_parse(&tree);
mpack_node_t root = mpack_tree_root(&tree);
- packet.microseconds = mpack_node_i64(mpack_node_map_cstr(root, "timestamp"));
+ packet.microseconds = mpack_node_i64(mpack_node_map_cstr(root, "timestamp_us"));
mpack_node_copy_cstr(mpack_node_map_cstr(root, "job_id"), packet.job_id_str, sizeof(packet.job_id_str));
@@ -59,9 +59,9 @@ int __wrap_kafka_produce(rd_kafka_topic_t * rkt, void * payload, size_t len)
packet.source_port = mpack_node_i32(mpack_node_map_cstr(root, "source_port"));
- mpack_node_copy_cstr(mpack_node_map_cstr(root, "server_ip"), packet.server_ip, sizeof(packet.server_ip));
+ mpack_node_copy_cstr(mpack_node_map_cstr(root, "destination_ip"), packet.server_ip, sizeof(packet.server_ip));
- packet.server_port = mpack_node_i32(mpack_node_map_cstr(root, "server_port"));
+ packet.server_port = mpack_node_i32(mpack_node_map_cstr(root, "destination_port"));
packet.packet = mpack_node_bin_data(mpack_node_map_cstr(root, "packet"));
diff --git a/src/trace_output.c b/src/trace_output.c
index 96288ae..b984390 100644
--- a/src/trace_output.c
+++ b/src/trace_output.c
@@ -423,7 +423,7 @@ static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** dat
mpack_writer_init_growable(&writer, data, size);
mpack_build_map(&writer);
- mpack_write_cstr(&writer, "timestamp");
+ mpack_write_cstr(&writer, "timestamp_us");
mpack_write_i64(&writer, microseconds);
char job_id_str[10];
@@ -450,10 +450,10 @@ static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** dat
mpack_write_cstr(&writer, "source_port");
mpack_write_i32(&writer, pkt_info.src_port);
- mpack_write_cstr(&writer, "server_ip");
+ mpack_write_cstr(&writer, "destination_ip");
mpack_write_cstr(&writer, pkt_info.dst_addr_str);
- mpack_write_cstr(&writer, "server_port");
+ mpack_write_cstr(&writer, "destination_port");
mpack_write_i32(&writer, pkt_info.dst_port);
mpack_write_cstr(&writer, "packet");