diff options
| author | 童宗振 <[email protected]> | 2024-04-18 08:39:56 +0000 |
|---|---|---|
| committer | 童宗振 <[email protected]> | 2024-04-18 08:39:56 +0000 |
| commit | 02248a63b89645d2a355943761b4835a2636d938 (patch) | |
| tree | eba1a3dbfe8cd0bce866949fd4133d37f5b653b2 | |
| parent | a9ab020ae7acdcef516e100c26633a16a179ad9d (diff) | |
| parent | da470aef89ff01d435132bdee50fedede6dfab5a (diff) | |
Merge branch 'add_monit' into 'master'v0.1.1-20240418
add monit
See merge request tsg/dp_telemetry_app!15
| -rw-r--r-- | etc/dp_trace.conf | 1 | ||||
| -rw-r--r-- | include/config.h | 1 | ||||
| -rw-r--r-- | include/monit.h | 11 | ||||
| -rw-r--r-- | src/CMakeLists.txt | 1 | ||||
| -rw-r--r-- | src/config.c | 5 | ||||
| -rw-r--r-- | src/job_ctx.c | 3 | ||||
| -rw-r--r-- | src/main.c | 11 | ||||
| -rw-r--r-- | src/monit.c | 95 | ||||
| -rw-r--r-- | src/trace_output.c | 51 | ||||
| -rw-r--r-- | test/CMakeLists.txt | 1 |
10 files changed, 164 insertions, 16 deletions
diff --git a/etc/dp_trace.conf b/etc/dp_trace.conf index da9d05a..9fad227 100644 --- a/etc/dp_trace.conf +++ b/etc/dp_trace.conf @@ -3,6 +3,7 @@ iocore=4,5,6,7 zlog_config_path=../etc/dp_trace_zlog.conf dp_trace_dir=./ device_group= +monit_file_path=/var/run/mrzcpd/mrmonit.app.dp_trace_telemetry.saving [kafka] borker_list="192.168.44.12" diff --git a/include/config.h b/include/config.h index 594832a..7bd942c 100644 --- a/include/config.h +++ b/include/config.h @@ -15,6 +15,7 @@ struct config char config_path[PATH_MAX]; char dy_config_path[PATH_MAX]; char zlog_config_path[PATH_MAX]; + char monit_file_path[PATH_MAX]; cpu_set_t cpu_set_io; diff --git a/include/monit.h b/include/monit.h new file mode 100644 index 0000000..b304410 --- /dev/null +++ b/include/monit.h @@ -0,0 +1,11 @@ +#pragma once +#include "common.h" + +struct monit +{ + unsigned nr_stat; + struct record_saving_stat savint_stats[0]; +}; + +void monit_init(unsigned int thread_num); +struct record_saving_stat * record_saving_stat_point_get(unsigned int thread_id);
\ No newline at end of file diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index dbcdbf0..645d2dc 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -9,6 +9,7 @@ set(DP_TELEMETRY_SRC ${CMAKE_CURRENT_SOURCE_DIR}/trace_output.c ${CMAKE_CURRENT_SOURCE_DIR}/kafka.c ${CMAKE_CURRENT_SOURCE_DIR}/maat.c + ${CMAKE_CURRENT_SOURCE_DIR}/monit.c ${CMAKE_CURRENT_SOURCE_DIR}/mocking.c ${CMAKE_SOURCE_DIR}/support/mpack/mpack.c) diff --git a/src/config.c b/src/config.c index 14f474f..6f970a2 100644 --- a/src/config.c +++ b/src/config.c @@ -11,7 +11,6 @@ #define MAX_LCORE 128 -extern struct mr_instance * mr_instance; static struct config * g_conf = NULL; static void main_program_realpath_get(char * absolute_path, size_t size); @@ -73,6 +72,10 @@ void config_load() } snprintf(g_conf->dp_trace_dir, sizeof(g_conf->dp_trace_dir), "%s", tmp_path); + MESA_load_profile_string_def(config_path, "global", "monit_file_path", g_conf->monit_file_path, + sizeof(g_conf->monit_file_path), + "/var/run/mrzcpd/mrmonit.app.dp_trace_telemetry.saving"); + MESA_load_profile_string_def(g_conf->device_group, "global", "device_group", g_conf->device_group, sizeof(g_conf->device_group), "unknow"); diff --git a/src/job_ctx.c b/src/job_ctx.c index ec69bd9..8c5b0be 100644 --- a/src/job_ctx.c +++ b/src/job_ctx.c @@ -114,9 +114,12 @@ void telemetry_job_add_cb(const char * table_name, int table_id, const char * ke DP_TRACE_VERIFY(cJSON_IsString(bpf_expr_obj), "bpf expr is not string"); snprintf(job_desc->bpf_expr, sizeof(job_desc->bpf_expr), "%s", bpf_expr_obj->valuestring); +#if 0 cJSON * pkt_cnt_max_obj = cJSON_GetObjectItem(json, "pkt_cnt_max"); DP_TRACE_VERIFY(cJSON_IsNumber(pkt_cnt_max_obj), "pkt_cnt_max is not number"); job_desc->pkt_cnt_max = pkt_cnt_max_obj->valueint; +#endif + job_desc->pkt_cnt_max = 0; cJSON * sampling_obj = cJSON_GetObjectItem(json, "sampling"); DP_TRACE_VERIFY(cJSON_IsNumber(sampling_obj), "sampling is not number"); @@ -1,6 +1,7 @@ #include "common.h" #include "config.h" #include "maat.h" +#include "monit.h" #include "trace_output.h" #include <getopt.h> @@ -148,15 +149,15 @@ int main(int argc, char * argv[]) signal_event_init(); unsigned int nr_thread = CPU_COUNT(&conf->cpu_set_io); - - if (nr_thread != 4) + if (nr_thread < DP_TRACE_RING_NUM) { - dzlog_error("Currently, four threads must be created to read the data. This restriction will be removed " - "later"); + dzlog_error("The number of cores must be greater than %u", DP_TRACE_RING_NUM); return 0; } - dzlog_info("Thread Count = %d", nr_thread); + + monit_init(nr_thread); + pthread_t tmp_pid[nr_thread]; for (int i = 0; i < nr_thread; i++) diff --git a/src/monit.c b/src/monit.c new file mode 100644 index 0000000..f71361d --- /dev/null +++ b/src/monit.c @@ -0,0 +1,95 @@ +#include "monit.h" +#include "config.h" +#include <cjson/cJSON.h> +#include <pthread.h> + +static struct monit * monit = NULL; + +static void monit_dump(); +static void * monit_loop(void * args); + +void monit_init(unsigned int thread_num) +{ + monit = calloc(1, sizeof(struct monit) + thread_num * sizeof(struct record_saving_stat)); + DP_TRACE_VERIFY(monit != NULL, "calloc failed."); + monit->nr_stat = thread_num; + + pthread_t tid; + int ret = pthread_create(&tid, NULL, monit_loop, NULL); + DP_TRACE_VERIFY(ret == 0, "failed to create thread for monit_loop.return value:%d", ret); +} + +struct record_saving_stat * record_saving_stat_point_get(unsigned int thread_id) +{ + assert(thread_id < monit->nr_stat); + return &monit->savint_stats[thread_id]; +} + +static void monit_dump() +{ + const struct config * conf = global_config_get(); + + struct record_saving_stat total_stat = {}; + for (unsigned int i = 0; i < monit->nr_stat; i++) + { + total_stat.recv_success += monit->savint_stats[i].recv_success; + total_stat.save_failed_at_job_deleted += monit->savint_stats[i].save_failed_at_job_deleted; + total_stat.save_failed_at_mutex_lock += monit->savint_stats[i].save_failed_at_mutex_lock; + + total_stat.save_to_kafka_failed_at_decode_messagepack += + monit->savint_stats[i].save_to_kafka_failed_at_decode_messagepack; + total_stat.save_to_kafka_failed_at_send += monit->savint_stats[i].save_to_kafka_failed_at_send; + total_stat.save_to_kafka_success += monit->savint_stats[i].save_to_kafka_success; + + total_stat.save_to_file_failed_at_decode_to_str += monit->savint_stats[i].save_to_file_failed_at_decode_to_str; + total_stat.save_to_file_failed_at_pcapng_open += monit->savint_stats[i].save_to_file_failed_at_pcapng_open; + total_stat.save_to_file_failed_at_pcapng_format += monit->savint_stats[i].save_to_file_failed_at_pcapng_format; + total_stat.save_to_file_failed_at_write_to_disk += monit->savint_stats[i].save_to_file_failed_at_write_to_disk; + total_stat.save_to_file_success += monit->savint_stats[i].save_to_file_success; + } + + struct cJSON * json_root = cJSON_CreateObject(); + cJSON_AddNumberToObject(json_root, "recv_success", total_stat.recv_success); + cJSON_AddNumberToObject(json_root, "save_failed_at_job_deleted", total_stat.save_failed_at_job_deleted); + cJSON_AddNumberToObject(json_root, "save_failed_at_mutex_lock", total_stat.save_failed_at_mutex_lock); + + cJSON_AddNumberToObject(json_root, "save_to_kafka_failed_at_decode_messagepack", + total_stat.save_to_kafka_failed_at_decode_messagepack); + cJSON_AddNumberToObject(json_root, "save_to_kafka_failed_at_send", total_stat.save_to_kafka_failed_at_send); + cJSON_AddNumberToObject(json_root, "save_to_kafka_success", total_stat.save_to_kafka_success); + + cJSON_AddNumberToObject(json_root, "save_to_file_failed_at_decode_to_str", + total_stat.save_to_file_failed_at_decode_to_str); + cJSON_AddNumberToObject(json_root, "save_to_file_failed_at_pcapng_open", + total_stat.save_to_file_failed_at_pcapng_open); + cJSON_AddNumberToObject(json_root, "save_to_file_failed_at_pcapng_format", + total_stat.save_to_file_failed_at_pcapng_format); + cJSON_AddNumberToObject(json_root, "save_to_file_failed_at_write_to_disk", + total_stat.save_to_file_failed_at_write_to_disk); + cJSON_AddNumberToObject(json_root, "save_to_file_success", total_stat.save_to_file_success); + + char * str_json_print = cJSON_Print(json_root); + FILE * fp_monit = fopen(conf->monit_file_path, "w"); + if (fp_monit == NULL) + { + dzlog_warn("monit file %s open failed, cannot dump program stat info : %s", conf->monit_file_path, + strerror(errno)); + return; + } + + fprintf(fp_monit, "%s", str_json_print); + cJSON_Delete(json_root); + free(str_json_print); + fclose(fp_monit); + return; +} + +static void * monit_loop(void * args) +{ + pthread_detach(pthread_self()); + while (1) + { + monit_dump(); + sleep(1); + } +}
\ No newline at end of file diff --git a/src/trace_output.c b/src/trace_output.c index ac0a63b..54631b8 100644 --- a/src/trace_output.c +++ b/src/trace_output.c @@ -4,6 +4,7 @@ #include "job_ctx.h" #include "kafka.h" #include "mocking.h" +#include "monit.h" #include <mpack.h> @@ -14,9 +15,9 @@ #define BURST_MAX 64 -extern struct mr_instance * mr_instance; static rd_kafka_t * kafka_handle = NULL; static rd_kafka_topic_t * kafka_topic = NULL; +__thread struct record_saving_stat * saving_stat = NULL; int dp_trace_file_mutex_lock(job_bitmap_t job_id); int dp_trace_file_mutex_unlock(job_bitmap_t job_id); @@ -69,9 +70,11 @@ void dp_trace_output_init() void * dp_trace_process_thread(void * arg) { + uintptr_t thread_id = (uintptr_t)arg; const struct config * conf = global_config_get(); + saving_stat = record_saving_stat_point_get(thread_id); - uintptr_t qid = (uintptr_t)arg; + uintptr_t qid = thread_id % DP_TRACE_RING_NUM; marsio_buff_t * rx_buff[BURST_MAX]; marsio_buff_t * tx_buff[BURST_MAX]; @@ -89,6 +92,7 @@ void * dp_trace_process_thread(void * arg) sleep(1); continue; } + saving_stat->recv_success += nr_recv; dp_trace_classification(mr_instance, rx_buff, nr_recv, class_mbufs, nr_jobs_mbufs); @@ -114,16 +118,31 @@ void * dp_trace_process_thread(void * arg) { for (unsigned int j = 0; j < nr_mbufs; j++) { - char * data; + char * data = NULL; size_t size; job_bitmap_t job_id = index_to_job_id(i); dp_trace_decode_to_message_pack(class_mbufs[i][j], &data, &size, job_id); - kafka_produce(kafka_topic, (void *)data, size); + if (data == NULL) + { + saving_stat->save_to_kafka_failed_at_decode_messagepack++; + continue; + } + int ret = kafka_produce(kafka_topic, (void *)data, size); + if (ret != 0) + { + saving_stat->save_to_kafka_failed_at_send++; + } + else + { + saving_stat->save_to_kafka_success++; + } } } else { - dzlog_info("The job has been deleted. The trace content corresponding to the job has been discarded."); + saving_stat->save_failed_at_job_deleted += nr_mbufs; + // dzlog_info("The job has been deleted. The trace content corresponding to the job has been + // discarded."); } marsio_dp_trace_free(mr_instance, class_mbufs[i], nr_mbufs); @@ -176,14 +195,16 @@ void cli_job_mbufs_write_process(marsio_buff_t * mbufs[], int nr_mbufs, job_bitm unsigned int copy_cnt = 0; marsio_buff_t * pcapng_pkt[nr_mbufs]; - if (is_job_id_used(job_id) == 0) + if (dp_trace_file_mutex_lock(job_id) < 0) { - // After the job stops, the remaining data packets in the ring are discarded. + saving_stat->save_failed_at_mutex_lock; goto end; } - if (dp_trace_file_mutex_lock(job_id) < 0) + if (is_job_id_used(job_id) == 0) { + // After the job stops, the remaining data packets in the ring are discarded. + saving_stat->save_failed_at_job_deleted += nr_mbufs; goto end; } @@ -203,6 +224,7 @@ void cli_job_mbufs_write_process(marsio_buff_t * mbufs[], int nr_mbufs, job_bitm pcapng_file_t * pcapng = marsio_dp_trace_pcapng_fopen(mr_instance, dp_trace_output[index].file_path); if (pcapng == NULL) { + saving_stat->save_to_file_failed_at_pcapng_open += nr_mbufs; goto unlock; } dp_trace_output[index].pcapng = pcapng; @@ -218,16 +240,25 @@ void cli_job_mbufs_write_process(marsio_buff_t * mbufs[], int nr_mbufs, job_bitm ret = dp_trace_record_decode_to_str(mbufs[i], comment, comment_max_size); if (ret < 0) { + saving_stat->save_to_file_failed_at_decode_to_str++; continue; } struct dp_trace_buffer_telemetry trace_buff_info; marsio_dp_trace_buffer_info_get(mbufs[i], &trace_buff_info); marsio_buff_t * pkt = marsio_dp_trace_pcapng_copy(mr_instance, mbufs[i], trace_buff_info.snaplen, comment); - pcapng_pkt[copy_cnt++] = pkt; + if (pkt != NULL) + { + pcapng_pkt[copy_cnt++] = pkt; + } + else + { + saving_stat->save_to_file_failed_at_pcapng_format++; + } } - marsio_dp_trace_pcapng_write_and_free(mr_instance, dp_trace_output[index].pcapng, pcapng_pkt, copy_cnt); + marsio_dp_trace_pcapng_write_and_free(mr_instance, dp_trace_output[index].pcapng, pcapng_pkt, copy_cnt, + saving_stat); if (dp_trace_file_reach_max_size(job_id)) { diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index accbbfa..1a6b3bd 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -7,6 +7,7 @@ set(DP_TRACE_TELEMETRY_SOURCES ${CMAKE_SOURCE_DIR}/src/trace_output.c ${CMAKE_SOURCE_DIR}/src/kafka.c ${CMAKE_SOURCE_DIR}/src/mocking.c + ${CMAKE_SOURCE_DIR}/src/monit.c ${CMAKE_SOURCE_DIR}/support/mpack/mpack.c) add_executable(cmocka_test cmocka_test.c ${DP_TRACE_TELEMETRY_SOURCES}) |
