summaryrefslogtreecommitdiff
path: root/src/trace_output.c
diff options
context:
space:
mode:
author童宗振 <[email protected]>2024-04-22 11:38:10 +0000
committer童宗振 <[email protected]>2024-04-22 11:38:10 +0000
commit2443bf8e3fb4f1c9297d260b9829ddabcc7e8cfc (patch)
tree51663d262cc8d30f40bea3c40ba9f0e828ef8a5d /src/trace_output.c
parent56e4a85b7f277ffd739335c0866388405b209726 (diff)
Port pcapng
Diffstat (limited to 'src/trace_output.c')
-rw-r--r--src/trace_output.c56
1 files changed, 35 insertions, 21 deletions
diff --git a/src/trace_output.c b/src/trace_output.c
index 1a2c7d5..1cdab09 100644
--- a/src/trace_output.c
+++ b/src/trace_output.c
@@ -5,10 +5,12 @@
#include "kafka.h"
#include "mocking.h"
#include "monit.h"
+#include "pcapng.h"
#include <mpack.h>
#include <errno.h>
+#include <fcntl.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
@@ -35,8 +37,11 @@ struct dp_trace_output
char * file_path;
char * file_bak_path;
char * file_middle_path;
- pcapng_file_t * pcapng;
+ struct pcapng_t * pcapng;
pthread_mutex_t file_mutex;
+ unsigned int comment_max_size;
+ char * comment;
+ struct pcapng_enhance_packet_block * epbs[BURST_MAX];
};
static struct dp_trace_output dp_trace_output[DP_TRACE_JOB_NUM_MAX] = {};
@@ -66,6 +71,14 @@ void dp_trace_output_init()
kafka_handle = kafka_handle_create(conf->broker_list, conf->sasl_password, conf->sasl_username);
kafka_topic = kafka_topic_new(kafka_handle, conf->topic_name, NULL);
+
+ for (unsigned int i = 0; i < TELEMETRY_DIM(dp_trace_output); i++)
+ {
+ dp_trace_output[i].comment_max_size = 8192;
+ dp_trace_output[i].comment = malloc(dp_trace_output[i].comment_max_size);
+ DP_TRACE_VERIFY(dp_trace_output[i].comment != NULL, "malloc failed in dp_trace_output_init:%s",
+ strerror(errno));
+ }
}
void * dp_trace_process_thread(void * arg)
@@ -191,9 +204,8 @@ int dp_trace_classification(struct mr_instance * instance, marsio_buff_t * mbufs
void cli_job_mbufs_write_process(marsio_buff_t * mbufs[], int nr_mbufs, job_bitmap_t job_id)
{
int ret = 0;
- char * comment = NULL;
- unsigned int copy_cnt = 0;
- marsio_buff_t * pcapng_pkt[nr_mbufs];
+ unsigned int nb_epb = 0;
+ struct pcapng_enhance_packet_block * epbs[nr_mbufs];
if (dp_trace_file_mutex_lock(job_id) < 0)
{
@@ -221,7 +233,7 @@ void cli_job_mbufs_write_process(marsio_buff_t * mbufs[], int nr_mbufs, job_bitm
remove(dp_trace_output[index].file_bak_path);
}
- pcapng_file_t * pcapng = marsio_dp_trace_pcapng_fopen(mr_instance, dp_trace_output[index].file_path);
+ struct pcapng_t * pcapng = pcapng_open(dp_trace_output[index].file_path);
if (pcapng == NULL)
{
saving_stat->save_to_file_failed_at_pcapng_open += nr_mbufs;
@@ -230,13 +242,10 @@ void cli_job_mbufs_write_process(marsio_buff_t * mbufs[], int nr_mbufs, job_bitm
dp_trace_output[index].pcapng = pcapng;
}
- // todo: optimization
- unsigned int comment_max_size = 8192;
- comment = malloc(comment_max_size);
- DP_TRACE_VERIFY(comment, "malloc fail, insufficient memory");
-
for (unsigned int i = 0; i < nr_mbufs; i++)
{
+ char * comment = dp_trace_output[index].comment;
+ unsigned int comment_max_size = dp_trace_output[index].comment_max_size;
ret = dp_trace_record_decode_to_str(mbufs[i], comment, comment_max_size);
if (ret < 0)
{
@@ -246,10 +255,10 @@ void cli_job_mbufs_write_process(marsio_buff_t * mbufs[], int nr_mbufs, job_bitm
struct dp_trace_buffer_telemetry trace_buff_info;
marsio_dp_trace_buffer_info_get(mbufs[i], &trace_buff_info);
- marsio_buff_t * pkt = marsio_dp_trace_pcapng_copy(mr_instance, mbufs[i], trace_buff_info.snaplen, comment);
- if (pkt != NULL)
+ ret = pcapng_copy(mbufs[i], trace_buff_info.snaplen, comment, &epbs[nb_epb]);
+ if (ret >= 0)
{
- pcapng_pkt[copy_cnt++] = pkt;
+ nb_epb++;
}
else
{
@@ -257,8 +266,9 @@ void cli_job_mbufs_write_process(marsio_buff_t * mbufs[], int nr_mbufs, job_bitm
}
}
- marsio_dp_trace_pcapng_write_and_free(mr_instance, dp_trace_output[index].pcapng, pcapng_pkt, copy_cnt,
- saving_stat);
+ ret = pcapng_write_packets(dp_trace_output[index].pcapng, epbs, nb_epb);
+ saving_stat->save_to_file_success += ret;
+ saving_stat->save_to_file_failed_at_write_to_disk += (nb_epb - ret);
if (dp_trace_file_reach_max_size(job_id))
{
@@ -269,7 +279,10 @@ unlock:
dp_trace_file_mutex_unlock(job_id);
end:
- free(comment);
+ for (unsigned int i = 0; i < nb_epb; i++)
+ {
+ free(epbs[i]);
+ }
return;
}
@@ -315,7 +328,7 @@ void dp_trace_file_rollbak(job_bitmap_t job_id)
unsigned int index = job_id_to_index(job_id);
- marsio_dp_trace_pcapng_fclose(mr_instance, dp_trace_output[index].pcapng);
+ pcapng_close(dp_trace_output[index].pcapng);
dp_trace_output[index].pcapng = NULL;
const char * file_path = dp_trace_output[index].file_path;
@@ -325,7 +338,8 @@ void dp_trace_file_rollbak(job_bitmap_t job_id)
dzlog_error("rename %s to %s failed. error info: %s", file_path, file_bak_path, strerror(errno));
}
- pcapng_file_t * pcapng = marsio_dp_trace_pcapng_fopen(mr_instance, dp_trace_output[index].file_path);
+ struct pcapng_t * pcapng = pcapng_open(file_path);
+
if (pcapng == NULL)
{
goto unlock;
@@ -348,7 +362,7 @@ void dp_trace_pcapng_merger(job_bitmap_t job_id)
goto end;
}
- marsio_dp_trace_pcapng_fclose(mr_instance, dp_trace_output[index].pcapng);
+ pcapng_close(dp_trace_output[index].pcapng);
dp_trace_output[index].pcapng = NULL;
const char * file_path = dp_trace_output[index].file_path;
@@ -493,7 +507,7 @@ static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** dat
const char * comment = cur + sizeof(struct dp_trace_record_header);
const unsigned int comment_len = record_header->recode_len;
- if ((record_header->type == DP_TRACE_RECORD_TYPE_TELEMETRY) != 0)
+ if ((record_header->type == DP_TRACE_MEASUREMENT_TYPE_TELEMETRY) != 0)
{
mpack_start_map(&writer, 4);
@@ -544,7 +558,7 @@ static int dp_trace_record_decode_to_str(marsio_buff_t * mr_mbuf, char * data, u
const char * str = cur + sizeof(struct dp_trace_record_header);
const unsigned int str_len = record_header->recode_len;
- if (record_header->type == DP_TRACE_RECORD_TYPE_TRACE)
+ if (record_header->type == DP_TRACE_MEASUREMENT_TYPE_TRACE)
{
int n = snprintf(data, size, "[%s:%s:] %ld.%ld ", record_header->appsym, record_header->module,
record_header->ts.tv_sec, record_header->ts.tv_nsec);