summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author童宗振 <[email protected]>2024-04-18 08:39:56 +0000
committer童宗振 <[email protected]>2024-04-18 08:39:56 +0000
commit02248a63b89645d2a355943761b4835a2636d938 (patch)
treeeba1a3dbfe8cd0bce866949fd4133d37f5b653b2
parenta9ab020ae7acdcef516e100c26633a16a179ad9d (diff)
parentda470aef89ff01d435132bdee50fedede6dfab5a (diff)
Merge branch 'add_monit' into 'master' v0.1.1-20240418
add monit See merge request tsg/dp_telemetry_app!15
-rw-r--r--etc/dp_trace.conf1
-rw-r--r--include/config.h1
-rw-r--r--include/monit.h11
-rw-r--r--src/CMakeLists.txt1
-rw-r--r--src/config.c5
-rw-r--r--src/job_ctx.c3
-rw-r--r--src/main.c11
-rw-r--r--src/monit.c95
-rw-r--r--src/trace_output.c51
-rw-r--r--test/CMakeLists.txt1
10 files changed, 164 insertions, 16 deletions
diff --git a/etc/dp_trace.conf b/etc/dp_trace.conf
index da9d05a..9fad227 100644
--- a/etc/dp_trace.conf
+++ b/etc/dp_trace.conf
@@ -3,6 +3,7 @@ iocore=4,5,6,7
zlog_config_path=../etc/dp_trace_zlog.conf
dp_trace_dir=./
device_group=
+monit_file_path=/var/run/mrzcpd/mrmonit.app.dp_trace_telemetry.saving
[kafka]
borker_list="192.168.44.12"
diff --git a/include/config.h b/include/config.h
index 594832a..7bd942c 100644
--- a/include/config.h
+++ b/include/config.h
@@ -15,6 +15,7 @@ struct config
char config_path[PATH_MAX];
char dy_config_path[PATH_MAX];
char zlog_config_path[PATH_MAX];
+ char monit_file_path[PATH_MAX];
cpu_set_t cpu_set_io;
diff --git a/include/monit.h b/include/monit.h
new file mode 100644
index 0000000..b304410
--- /dev/null
+++ b/include/monit.h
@@ -0,0 +1,11 @@
+#pragma once
+#include "common.h"
+
+/*
+ * Header of the monitoring state: a count followed by that many
+ * record_saving_stat entries stored directly after the struct.
+ * The storage for the trailing entries must be allocated together with the
+ * struct (sizeof(struct monit) + nr_stat * sizeof(struct record_saving_stat)).
+ */
+struct monit
+{
+    unsigned nr_stat;                         /* number of entries in savint_stats */
+    struct record_saving_stat savint_stats[]; /* C99 flexible array member (was the GNU [0] form) */
+};
+
+void monit_init(unsigned int thread_num);
+struct record_saving_stat * record_saving_stat_point_get(unsigned int thread_id); \ No newline at end of file
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index dbcdbf0..645d2dc 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -9,6 +9,7 @@ set(DP_TELEMETRY_SRC
${CMAKE_CURRENT_SOURCE_DIR}/trace_output.c
${CMAKE_CURRENT_SOURCE_DIR}/kafka.c
${CMAKE_CURRENT_SOURCE_DIR}/maat.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/monit.c
${CMAKE_CURRENT_SOURCE_DIR}/mocking.c
${CMAKE_SOURCE_DIR}/support/mpack/mpack.c)
diff --git a/src/config.c b/src/config.c
index 14f474f..6f970a2 100644
--- a/src/config.c
+++ b/src/config.c
@@ -11,7 +11,6 @@
#define MAX_LCORE 128
-extern struct mr_instance * mr_instance;
static struct config * g_conf = NULL;
static void main_program_realpath_get(char * absolute_path, size_t size);
@@ -73,6 +72,10 @@ void config_load()
}
snprintf(g_conf->dp_trace_dir, sizeof(g_conf->dp_trace_dir), "%s", tmp_path);
+ MESA_load_profile_string_def(config_path, "global", "monit_file_path", g_conf->monit_file_path,
+ sizeof(g_conf->monit_file_path),
+ "/var/run/mrzcpd/mrmonit.app.dp_trace_telemetry.saving");
+
MESA_load_profile_string_def(g_conf->device_group, "global", "device_group", g_conf->device_group,
sizeof(g_conf->device_group), "unknow");
diff --git a/src/job_ctx.c b/src/job_ctx.c
index ec69bd9..8c5b0be 100644
--- a/src/job_ctx.c
+++ b/src/job_ctx.c
@@ -114,9 +114,12 @@ void telemetry_job_add_cb(const char * table_name, int table_id, const char * ke
DP_TRACE_VERIFY(cJSON_IsString(bpf_expr_obj), "bpf expr is not string");
snprintf(job_desc->bpf_expr, sizeof(job_desc->bpf_expr), "%s", bpf_expr_obj->valuestring);
+#if 0
cJSON * pkt_cnt_max_obj = cJSON_GetObjectItem(json, "pkt_cnt_max");
DP_TRACE_VERIFY(cJSON_IsNumber(pkt_cnt_max_obj), "pkt_cnt_max is not number");
job_desc->pkt_cnt_max = pkt_cnt_max_obj->valueint;
+#endif
+ job_desc->pkt_cnt_max = 0;
cJSON * sampling_obj = cJSON_GetObjectItem(json, "sampling");
DP_TRACE_VERIFY(cJSON_IsNumber(sampling_obj), "sampling is not number");
diff --git a/src/main.c b/src/main.c
index 4fae839..af1f978 100644
--- a/src/main.c
+++ b/src/main.c
@@ -1,6 +1,7 @@
#include "common.h"
#include "config.h"
#include "maat.h"
+#include "monit.h"
#include "trace_output.h"
#include <getopt.h>
@@ -148,15 +149,15 @@ int main(int argc, char * argv[])
signal_event_init();
unsigned int nr_thread = CPU_COUNT(&conf->cpu_set_io);
-
- if (nr_thread != 4)
+ if (nr_thread < DP_TRACE_RING_NUM)
{
- dzlog_error("Currently, four threads must be created to read the data. This restriction will be removed "
- "later");
+ dzlog_error("The number of cores must be greater than %u", DP_TRACE_RING_NUM);
return 0;
}
-
dzlog_info("Thread Count = %d", nr_thread);
+
+ monit_init(nr_thread);
+
pthread_t tmp_pid[nr_thread];
for (int i = 0; i < nr_thread; i++)
diff --git a/src/monit.c b/src/monit.c
new file mode 100644
index 0000000..f71361d
--- /dev/null
+++ b/src/monit.c
@@ -0,0 +1,95 @@
+#include "monit.h"
+#include "config.h"
+#include <cjson/cJSON.h>
+#include <pthread.h>
+
+static struct monit * monit = NULL;
+
+static void monit_dump();
+static void * monit_loop(void * args);
+
+/*
+ * Allocate the zero-filled global monit object with thread_num trailing
+ * record_saving_stat entries, record the entry count, and start a thread
+ * running monit_loop.
+ *
+ * thread_num: number of stat entries to reserve.
+ * Both the allocation and the thread creation are checked with DP_TRACE_VERIFY.
+ */
+void monit_init(unsigned int thread_num)
+{
+    size_t stats_bytes = thread_num * sizeof(struct record_saving_stat);
+
+    monit = calloc(1, sizeof(struct monit) + stats_bytes);
+    DP_TRACE_VERIFY(monit != NULL, "calloc failed.");
+    monit->nr_stat = thread_num;
+
+    /* The thread handle is not kept; monit_loop detaches itself. */
+    pthread_t loop_tid;
+    int ret = pthread_create(&loop_tid, NULL, monit_loop, NULL);
+    DP_TRACE_VERIFY(ret == 0, "failed to create thread for monit_loop.return value:%d", ret);
+}
+
+/*
+ * Return a pointer to the stat entry at index thread_id inside the global
+ * monit object. thread_id must be smaller than monit->nr_stat; this is
+ * enforced with assert only.
+ */
+struct record_saving_stat * record_saving_stat_point_get(unsigned int thread_id)
+{
+    assert(thread_id < monit->nr_stat);
+
+    struct record_saving_stat * slot = monit->savint_stats + thread_id;
+    return slot;
+}
+
+/*
+ * Sum the counters of every per-thread stat entry, render the totals as a
+ * JSON object and overwrite the file at conf->monit_file_path with the result.
+ *
+ * If the JSON object cannot be created, cannot be printed, or the file cannot
+ * be opened, a warning is logged and this dump is skipped. Every exit path
+ * releases the cJSON tree and the printed string, so a persistently
+ * unwritable monit file no longer leaks memory on each call.
+ */
+static void monit_dump()
+{
+    const struct config * conf = global_config_get();
+    struct cJSON * json_root = NULL;
+    char * str_json_print = NULL;
+    FILE * fp_monit = NULL;
+
+    /* Accumulate all per-thread counters into a single total. */
+    struct record_saving_stat total_stat = {};
+    for (unsigned int i = 0; i < monit->nr_stat; i++)
+    {
+        total_stat.recv_success += monit->savint_stats[i].recv_success;
+        total_stat.save_failed_at_job_deleted += monit->savint_stats[i].save_failed_at_job_deleted;
+        total_stat.save_failed_at_mutex_lock += monit->savint_stats[i].save_failed_at_mutex_lock;
+
+        total_stat.save_to_kafka_failed_at_decode_messagepack +=
+            monit->savint_stats[i].save_to_kafka_failed_at_decode_messagepack;
+        total_stat.save_to_kafka_failed_at_send += monit->savint_stats[i].save_to_kafka_failed_at_send;
+        total_stat.save_to_kafka_success += monit->savint_stats[i].save_to_kafka_success;
+
+        total_stat.save_to_file_failed_at_decode_to_str += monit->savint_stats[i].save_to_file_failed_at_decode_to_str;
+        total_stat.save_to_file_failed_at_pcapng_open += monit->savint_stats[i].save_to_file_failed_at_pcapng_open;
+        total_stat.save_to_file_failed_at_pcapng_format += monit->savint_stats[i].save_to_file_failed_at_pcapng_format;
+        total_stat.save_to_file_failed_at_write_to_disk += monit->savint_stats[i].save_to_file_failed_at_write_to_disk;
+        total_stat.save_to_file_success += monit->savint_stats[i].save_to_file_success;
+    }
+
+    json_root = cJSON_CreateObject();
+    if (json_root == NULL)
+    {
+        dzlog_warn("monit json object create failed, cannot dump program stat info");
+        goto cleanup;
+    }
+
+    cJSON_AddNumberToObject(json_root, "recv_success", total_stat.recv_success);
+    cJSON_AddNumberToObject(json_root, "save_failed_at_job_deleted", total_stat.save_failed_at_job_deleted);
+    cJSON_AddNumberToObject(json_root, "save_failed_at_mutex_lock", total_stat.save_failed_at_mutex_lock);
+
+    cJSON_AddNumberToObject(json_root, "save_to_kafka_failed_at_decode_messagepack",
+                            total_stat.save_to_kafka_failed_at_decode_messagepack);
+    cJSON_AddNumberToObject(json_root, "save_to_kafka_failed_at_send", total_stat.save_to_kafka_failed_at_send);
+    cJSON_AddNumberToObject(json_root, "save_to_kafka_success", total_stat.save_to_kafka_success);
+
+    cJSON_AddNumberToObject(json_root, "save_to_file_failed_at_decode_to_str",
+                            total_stat.save_to_file_failed_at_decode_to_str);
+    cJSON_AddNumberToObject(json_root, "save_to_file_failed_at_pcapng_open",
+                            total_stat.save_to_file_failed_at_pcapng_open);
+    cJSON_AddNumberToObject(json_root, "save_to_file_failed_at_pcapng_format",
+                            total_stat.save_to_file_failed_at_pcapng_format);
+    cJSON_AddNumberToObject(json_root, "save_to_file_failed_at_write_to_disk",
+                            total_stat.save_to_file_failed_at_write_to_disk);
+    cJSON_AddNumberToObject(json_root, "save_to_file_success", total_stat.save_to_file_success);
+
+    /* cJSON_Print returns a heap string owned by the caller, or NULL on failure. */
+    str_json_print = cJSON_Print(json_root);
+    if (str_json_print == NULL)
+    {
+        dzlog_warn("monit json print failed, cannot dump program stat info");
+        goto cleanup;
+    }
+
+    fp_monit = fopen(conf->monit_file_path, "w");
+    if (fp_monit == NULL)
+    {
+        dzlog_warn("monit file %s open failed, cannot dump program stat info : %s", conf->monit_file_path,
+                   strerror(errno));
+        goto cleanup;
+    }
+
+    fprintf(fp_monit, "%s", str_json_print);
+    fclose(fp_monit);
+
+cleanup:
+    /* Both calls accept NULL, so this is safe from every jump above. */
+    cJSON_Delete(json_root);
+    free(str_json_print);
+}
+
+/*
+ * Thread entry point used by monit_init. Detaches the calling thread, then
+ * calls monit_dump() in an endless loop, sleeping one second after each call.
+ * args is not used and the function never returns.
+ */
+static void * monit_loop(void * args)
+{
+    pthread_detach(pthread_self());
+    for (;;)
+    {
+        monit_dump();
+        sleep(1);
+    }
+} \ No newline at end of file
diff --git a/src/trace_output.c b/src/trace_output.c
index ac0a63b..54631b8 100644
--- a/src/trace_output.c
+++ b/src/trace_output.c
@@ -4,6 +4,7 @@
#include "job_ctx.h"
#include "kafka.h"
#include "mocking.h"
+#include "monit.h"
#include <mpack.h>
@@ -14,9 +15,9 @@
#define BURST_MAX 64
-extern struct mr_instance * mr_instance;
static rd_kafka_t * kafka_handle = NULL;
static rd_kafka_topic_t * kafka_topic = NULL;
+__thread struct record_saving_stat * saving_stat = NULL;
int dp_trace_file_mutex_lock(job_bitmap_t job_id);
int dp_trace_file_mutex_unlock(job_bitmap_t job_id);
@@ -69,9 +70,11 @@ void dp_trace_output_init()
void * dp_trace_process_thread(void * arg)
{
+ uintptr_t thread_id = (uintptr_t)arg;
const struct config * conf = global_config_get();
+ saving_stat = record_saving_stat_point_get(thread_id);
- uintptr_t qid = (uintptr_t)arg;
+ uintptr_t qid = thread_id % DP_TRACE_RING_NUM;
marsio_buff_t * rx_buff[BURST_MAX];
marsio_buff_t * tx_buff[BURST_MAX];
@@ -89,6 +92,7 @@ void * dp_trace_process_thread(void * arg)
sleep(1);
continue;
}
+ saving_stat->recv_success += nr_recv;
dp_trace_classification(mr_instance, rx_buff, nr_recv, class_mbufs, nr_jobs_mbufs);
@@ -114,16 +118,31 @@ void * dp_trace_process_thread(void * arg)
{
for (unsigned int j = 0; j < nr_mbufs; j++)
{
- char * data;
+ char * data = NULL;
size_t size;
job_bitmap_t job_id = index_to_job_id(i);
dp_trace_decode_to_message_pack(class_mbufs[i][j], &data, &size, job_id);
- kafka_produce(kafka_topic, (void *)data, size);
+ if (data == NULL)
+ {
+ saving_stat->save_to_kafka_failed_at_decode_messagepack++;
+ continue;
+ }
+ int ret = kafka_produce(kafka_topic, (void *)data, size);
+ if (ret != 0)
+ {
+ saving_stat->save_to_kafka_failed_at_send++;
+ }
+ else
+ {
+ saving_stat->save_to_kafka_success++;
+ }
}
}
else
{
- dzlog_info("The job has been deleted. The trace content corresponding to the job has been discarded.");
+ saving_stat->save_failed_at_job_deleted += nr_mbufs;
+ // dzlog_info("The job has been deleted. The trace content corresponding to the job has been
+ // discarded.");
}
marsio_dp_trace_free(mr_instance, class_mbufs[i], nr_mbufs);
@@ -176,14 +195,16 @@ void cli_job_mbufs_write_process(marsio_buff_t * mbufs[], int nr_mbufs, job_bitm
unsigned int copy_cnt = 0;
marsio_buff_t * pcapng_pkt[nr_mbufs];
- if (is_job_id_used(job_id) == 0)
+ if (dp_trace_file_mutex_lock(job_id) < 0)
{
- // After the job stops, the remaining data packets in the ring are discarded.
+ saving_stat->save_failed_at_mutex_lock;
goto end;
}
- if (dp_trace_file_mutex_lock(job_id) < 0)
+ if (is_job_id_used(job_id) == 0)
{
+ // After the job stops, the remaining data packets in the ring are discarded.
+ saving_stat->save_failed_at_job_deleted += nr_mbufs;
goto end;
}
@@ -203,6 +224,7 @@ void cli_job_mbufs_write_process(marsio_buff_t * mbufs[], int nr_mbufs, job_bitm
pcapng_file_t * pcapng = marsio_dp_trace_pcapng_fopen(mr_instance, dp_trace_output[index].file_path);
if (pcapng == NULL)
{
+ saving_stat->save_to_file_failed_at_pcapng_open += nr_mbufs;
goto unlock;
}
dp_trace_output[index].pcapng = pcapng;
@@ -218,16 +240,25 @@ void cli_job_mbufs_write_process(marsio_buff_t * mbufs[], int nr_mbufs, job_bitm
ret = dp_trace_record_decode_to_str(mbufs[i], comment, comment_max_size);
if (ret < 0)
{
+ saving_stat->save_to_file_failed_at_decode_to_str++;
continue;
}
struct dp_trace_buffer_telemetry trace_buff_info;
marsio_dp_trace_buffer_info_get(mbufs[i], &trace_buff_info);
marsio_buff_t * pkt = marsio_dp_trace_pcapng_copy(mr_instance, mbufs[i], trace_buff_info.snaplen, comment);
- pcapng_pkt[copy_cnt++] = pkt;
+ if (pkt != NULL)
+ {
+ pcapng_pkt[copy_cnt++] = pkt;
+ }
+ else
+ {
+ saving_stat->save_to_file_failed_at_pcapng_format++;
+ }
}
- marsio_dp_trace_pcapng_write_and_free(mr_instance, dp_trace_output[index].pcapng, pcapng_pkt, copy_cnt);
+ marsio_dp_trace_pcapng_write_and_free(mr_instance, dp_trace_output[index].pcapng, pcapng_pkt, copy_cnt,
+ saving_stat);
if (dp_trace_file_reach_max_size(job_id))
{
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index accbbfa..1a6b3bd 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -7,6 +7,7 @@ set(DP_TRACE_TELEMETRY_SOURCES
${CMAKE_SOURCE_DIR}/src/trace_output.c
${CMAKE_SOURCE_DIR}/src/kafka.c
${CMAKE_SOURCE_DIR}/src/mocking.c
+ ${CMAKE_SOURCE_DIR}/src/monit.c
${CMAKE_SOURCE_DIR}/support/mpack/mpack.c)
add_executable(cmocka_test cmocka_test.c ${DP_TRACE_TELEMETRY_SOURCES})