diff options
| author | tongzongzhen <[email protected]> | 2024-04-18 16:22:56 +0800 |
|---|---|---|
| committer | tongzongzhen <[email protected]> | 2024-04-18 16:22:56 +0800 |
| commit | da470aef89ff01d435132bdee50fedede6dfab5a (patch) | |
| tree | eba1a3dbfe8cd0bce866949fd4133d37f5b653b2 /src/trace_output.c | |
| parent | a9ab020ae7acdcef516e100c26633a16a179ad9d (diff) | |
add monit
Diffstat (limited to 'src/trace_output.c')
| -rw-r--r-- | src/trace_output.c | 51 |
1 files changed, 41 insertions, 10 deletions
diff --git a/src/trace_output.c b/src/trace_output.c index ac0a63b..54631b8 100644 --- a/src/trace_output.c +++ b/src/trace_output.c @@ -4,6 +4,7 @@ #include "job_ctx.h" #include "kafka.h" #include "mocking.h" +#include "monit.h" #include <mpack.h> @@ -14,9 +15,9 @@ #define BURST_MAX 64 -extern struct mr_instance * mr_instance; static rd_kafka_t * kafka_handle = NULL; static rd_kafka_topic_t * kafka_topic = NULL; +__thread struct record_saving_stat * saving_stat = NULL; int dp_trace_file_mutex_lock(job_bitmap_t job_id); int dp_trace_file_mutex_unlock(job_bitmap_t job_id); @@ -69,9 +70,11 @@ void dp_trace_output_init() void * dp_trace_process_thread(void * arg) { + uintptr_t thread_id = (uintptr_t)arg; const struct config * conf = global_config_get(); + saving_stat = record_saving_stat_point_get(thread_id); - uintptr_t qid = (uintptr_t)arg; + uintptr_t qid = thread_id % DP_TRACE_RING_NUM; marsio_buff_t * rx_buff[BURST_MAX]; marsio_buff_t * tx_buff[BURST_MAX]; @@ -89,6 +92,7 @@ void * dp_trace_process_thread(void * arg) sleep(1); continue; } + saving_stat->recv_success += nr_recv; dp_trace_classification(mr_instance, rx_buff, nr_recv, class_mbufs, nr_jobs_mbufs); @@ -114,16 +118,31 @@ void * dp_trace_process_thread(void * arg) { for (unsigned int j = 0; j < nr_mbufs; j++) { - char * data; + char * data = NULL; size_t size; job_bitmap_t job_id = index_to_job_id(i); dp_trace_decode_to_message_pack(class_mbufs[i][j], &data, &size, job_id); - kafka_produce(kafka_topic, (void *)data, size); + if (data == NULL) + { + saving_stat->save_to_kafka_failed_at_decode_messagepack++; + continue; + } + int ret = kafka_produce(kafka_topic, (void *)data, size); + if (ret != 0) + { + saving_stat->save_to_kafka_failed_at_send++; + } + else + { + saving_stat->save_to_kafka_success++; + } } } else { - dzlog_info("The job has been deleted. The trace content corresponding to the job has been discarded."); + saving_stat->save_failed_at_job_deleted += nr_mbufs; + // dzlog_info("The job has been deleted. The trace content corresponding to the job has been + // discarded."); } marsio_dp_trace_free(mr_instance, class_mbufs[i], nr_mbufs); @@ -176,14 +195,16 @@ void cli_job_mbufs_write_process(marsio_buff_t * mbufs[], int nr_mbufs, job_bitm unsigned int copy_cnt = 0; marsio_buff_t * pcapng_pkt[nr_mbufs]; - if (is_job_id_used(job_id) == 0) + if (dp_trace_file_mutex_lock(job_id) < 0) { - // After the job stops, the remaining data packets in the ring are discarded. + saving_stat->save_failed_at_mutex_lock; goto end; } - if (dp_trace_file_mutex_lock(job_id) < 0) + if (is_job_id_used(job_id) == 0) { + // After the job stops, the remaining data packets in the ring are discarded. + saving_stat->save_failed_at_job_deleted += nr_mbufs; goto end; } @@ -203,6 +224,7 @@ void cli_job_mbufs_write_process(marsio_buff_t * mbufs[], int nr_mbufs, job_bitm pcapng_file_t * pcapng = marsio_dp_trace_pcapng_fopen(mr_instance, dp_trace_output[index].file_path); if (pcapng == NULL) { + saving_stat->save_to_file_failed_at_pcapng_open += nr_mbufs; goto unlock; } dp_trace_output[index].pcapng = pcapng; @@ -218,16 +240,25 @@ void cli_job_mbufs_write_process(marsio_buff_t * mbufs[], int nr_mbufs, job_bitm ret = dp_trace_record_decode_to_str(mbufs[i], comment, comment_max_size); if (ret < 0) { + saving_stat->save_to_file_failed_at_decode_to_str++; continue; } struct dp_trace_buffer_telemetry trace_buff_info; marsio_dp_trace_buffer_info_get(mbufs[i], &trace_buff_info); marsio_buff_t * pkt = marsio_dp_trace_pcapng_copy(mr_instance, mbufs[i], trace_buff_info.snaplen, comment); - pcapng_pkt[copy_cnt++] = pkt; + if (pkt != NULL) + { + pcapng_pkt[copy_cnt++] = pkt; + } + else + { + saving_stat->save_to_file_failed_at_pcapng_format++; + } } - marsio_dp_trace_pcapng_write_and_free(mr_instance, dp_trace_output[index].pcapng, pcapng_pkt, copy_cnt); + marsio_dp_trace_pcapng_write_and_free(mr_instance, dp_trace_output[index].pcapng, pcapng_pkt, copy_cnt, + saving_stat); if (dp_trace_file_reach_max_size(job_id)) { |
