summaryrefslogtreecommitdiff
path: root/src/trace_output.c
diff options
context:
space:
mode:
authortongzongzhen <[email protected]>2024-04-18 16:22:56 +0800
committertongzongzhen <[email protected]>2024-04-18 16:22:56 +0800
commitda470aef89ff01d435132bdee50fedede6dfab5a (patch)
treeeba1a3dbfe8cd0bce866949fd4133d37f5b653b2 /src/trace_output.c
parenta9ab020ae7acdcef516e100c26633a16a179ad9d (diff)
add monit
Diffstat (limited to 'src/trace_output.c')
-rw-r--r--src/trace_output.c51
1 files changed, 41 insertions, 10 deletions
diff --git a/src/trace_output.c b/src/trace_output.c
index ac0a63b..54631b8 100644
--- a/src/trace_output.c
+++ b/src/trace_output.c
@@ -4,6 +4,7 @@
#include "job_ctx.h"
#include "kafka.h"
#include "mocking.h"
+#include "monit.h"
#include <mpack.h>
@@ -14,9 +15,9 @@
#define BURST_MAX 64
-extern struct mr_instance * mr_instance;
static rd_kafka_t * kafka_handle = NULL;
static rd_kafka_topic_t * kafka_topic = NULL;
+__thread struct record_saving_stat * saving_stat = NULL;
int dp_trace_file_mutex_lock(job_bitmap_t job_id);
int dp_trace_file_mutex_unlock(job_bitmap_t job_id);
@@ -69,9 +70,11 @@ void dp_trace_output_init()
void * dp_trace_process_thread(void * arg)
{
+ uintptr_t thread_id = (uintptr_t)arg;
const struct config * conf = global_config_get();
+ saving_stat = record_saving_stat_point_get(thread_id);
- uintptr_t qid = (uintptr_t)arg;
+ uintptr_t qid = thread_id % DP_TRACE_RING_NUM;
marsio_buff_t * rx_buff[BURST_MAX];
marsio_buff_t * tx_buff[BURST_MAX];
@@ -89,6 +92,7 @@ void * dp_trace_process_thread(void * arg)
sleep(1);
continue;
}
+ saving_stat->recv_success += nr_recv;
dp_trace_classification(mr_instance, rx_buff, nr_recv, class_mbufs, nr_jobs_mbufs);
@@ -114,16 +118,31 @@ void * dp_trace_process_thread(void * arg)
{
for (unsigned int j = 0; j < nr_mbufs; j++)
{
- char * data;
+ char * data = NULL;
size_t size;
job_bitmap_t job_id = index_to_job_id(i);
dp_trace_decode_to_message_pack(class_mbufs[i][j], &data, &size, job_id);
- kafka_produce(kafka_topic, (void *)data, size);
+ if (data == NULL)
+ {
+ saving_stat->save_to_kafka_failed_at_decode_messagepack++;
+ continue;
+ }
+ int ret = kafka_produce(kafka_topic, (void *)data, size);
+ if (ret != 0)
+ {
+ saving_stat->save_to_kafka_failed_at_send++;
+ }
+ else
+ {
+ saving_stat->save_to_kafka_success++;
+ }
}
}
else
{
- dzlog_info("The job has been deleted. The trace content corresponding to the job has been discarded.");
+ saving_stat->save_failed_at_job_deleted += nr_mbufs;
+ // dzlog_info("The job has been deleted. The trace content corresponding to the job has been
+ // discarded.");
}
marsio_dp_trace_free(mr_instance, class_mbufs[i], nr_mbufs);
@@ -176,14 +195,16 @@ void cli_job_mbufs_write_process(marsio_buff_t * mbufs[], int nr_mbufs, job_bitm
unsigned int copy_cnt = 0;
marsio_buff_t * pcapng_pkt[nr_mbufs];
- if (is_job_id_used(job_id) == 0)
+ if (dp_trace_file_mutex_lock(job_id) < 0)
{
- // After the job stops, the remaining data packets in the ring are discarded.
+ saving_stat->save_failed_at_mutex_lock;
goto end;
}
- if (dp_trace_file_mutex_lock(job_id) < 0)
+ if (is_job_id_used(job_id) == 0)
{
+ // After the job stops, the remaining data packets in the ring are discarded.
+ saving_stat->save_failed_at_job_deleted += nr_mbufs;
goto end;
}
@@ -203,6 +224,7 @@ void cli_job_mbufs_write_process(marsio_buff_t * mbufs[], int nr_mbufs, job_bitm
pcapng_file_t * pcapng = marsio_dp_trace_pcapng_fopen(mr_instance, dp_trace_output[index].file_path);
if (pcapng == NULL)
{
+ saving_stat->save_to_file_failed_at_pcapng_open += nr_mbufs;
goto unlock;
}
dp_trace_output[index].pcapng = pcapng;
@@ -218,16 +240,25 @@ void cli_job_mbufs_write_process(marsio_buff_t * mbufs[], int nr_mbufs, job_bitm
ret = dp_trace_record_decode_to_str(mbufs[i], comment, comment_max_size);
if (ret < 0)
{
+ saving_stat->save_to_file_failed_at_decode_to_str++;
continue;
}
struct dp_trace_buffer_telemetry trace_buff_info;
marsio_dp_trace_buffer_info_get(mbufs[i], &trace_buff_info);
marsio_buff_t * pkt = marsio_dp_trace_pcapng_copy(mr_instance, mbufs[i], trace_buff_info.snaplen, comment);
- pcapng_pkt[copy_cnt++] = pkt;
+ if (pkt != NULL)
+ {
+ pcapng_pkt[copy_cnt++] = pkt;
+ }
+ else
+ {
+ saving_stat->save_to_file_failed_at_pcapng_format++;
+ }
}
- marsio_dp_trace_pcapng_write_and_free(mr_instance, dp_trace_output[index].pcapng, pcapng_pkt, copy_cnt);
+ marsio_dp_trace_pcapng_write_and_free(mr_instance, dp_trace_output[index].pcapng, pcapng_pkt, copy_cnt,
+ saving_stat);
if (dp_trace_file_reach_max_size(job_id))
{