diff options
| author | 童宗振 <[email protected]> | 2024-04-22 11:38:10 +0000 |
|---|---|---|
| committer | 童宗振 <[email protected]> | 2024-04-22 11:38:10 +0000 |
| commit | 2443bf8e3fb4f1c9297d260b9829ddabcc7e8cfc (patch) | |
| tree | 51663d262cc8d30f40bea3c40ba9f0e828ef8a5d /src/trace_output.c | |
| parent | 56e4a85b7f277ffd739335c0866388405b209726 (diff) | |
Port pcapng
Diffstat (limited to 'src/trace_output.c')
| -rw-r--r-- | src/trace_output.c | 56 |
1 files changed, 35 insertions, 21 deletions
diff --git a/src/trace_output.c b/src/trace_output.c index 1a2c7d5..1cdab09 100644 --- a/src/trace_output.c +++ b/src/trace_output.c @@ -5,10 +5,12 @@ #include "kafka.h" #include "mocking.h" #include "monit.h" +#include "pcapng.h" #include <mpack.h> #include <errno.h> +#include <fcntl.h> #include <pthread.h> #include <stdlib.h> #include <unistd.h> @@ -35,8 +37,11 @@ struct dp_trace_output char * file_path; char * file_bak_path; char * file_middle_path; - pcapng_file_t * pcapng; + struct pcapng_t * pcapng; pthread_mutex_t file_mutex; + unsigned int comment_max_size; + char * comment; + struct pcapng_enhance_packet_block * epbs[BURST_MAX]; }; static struct dp_trace_output dp_trace_output[DP_TRACE_JOB_NUM_MAX] = {}; @@ -66,6 +71,14 @@ void dp_trace_output_init() kafka_handle = kafka_handle_create(conf->broker_list, conf->sasl_password, conf->sasl_username); kafka_topic = kafka_topic_new(kafka_handle, conf->topic_name, NULL); + + for (unsigned int i = 0; i < TELEMETRY_DIM(dp_trace_output); i++) + { + dp_trace_output[i].comment_max_size = 8192; + dp_trace_output[i].comment = malloc(dp_trace_output[i].comment_max_size); + DP_TRACE_VERIFY(dp_trace_output[i].comment != NULL, "malloc failed in dp_trace_output_init:%s", + strerror(errno)); + } } void * dp_trace_process_thread(void * arg) @@ -191,9 +204,8 @@ int dp_trace_classification(struct mr_instance * instance, marsio_buff_t * mbufs void cli_job_mbufs_write_process(marsio_buff_t * mbufs[], int nr_mbufs, job_bitmap_t job_id) { int ret = 0; - char * comment = NULL; - unsigned int copy_cnt = 0; - marsio_buff_t * pcapng_pkt[nr_mbufs]; + unsigned int nb_epb = 0; + struct pcapng_enhance_packet_block * epbs[nr_mbufs]; if (dp_trace_file_mutex_lock(job_id) < 0) { @@ -221,7 +233,7 @@ void cli_job_mbufs_write_process(marsio_buff_t * mbufs[], int nr_mbufs, job_bitm remove(dp_trace_output[index].file_bak_path); } - pcapng_file_t * pcapng = marsio_dp_trace_pcapng_fopen(mr_instance, dp_trace_output[index].file_path); + struct pcapng_t * pcapng = pcapng_open(dp_trace_output[index].file_path); if (pcapng == NULL) { saving_stat->save_to_file_failed_at_pcapng_open += nr_mbufs; @@ -230,13 +242,10 @@ void cli_job_mbufs_write_process(marsio_buff_t * mbufs[], int nr_mbufs, job_bitm dp_trace_output[index].pcapng = pcapng; } - // todo: optimization - unsigned int comment_max_size = 8192; - comment = malloc(comment_max_size); - DP_TRACE_VERIFY(comment, "malloc fail, insufficient memory"); - for (unsigned int i = 0; i < nr_mbufs; i++) { + char * comment = dp_trace_output[index].comment; + unsigned int comment_max_size = dp_trace_output[index].comment_max_size; ret = dp_trace_record_decode_to_str(mbufs[i], comment, comment_max_size); if (ret < 0) { @@ -246,10 +255,10 @@ void cli_job_mbufs_write_process(marsio_buff_t * mbufs[], int nr_mbufs, job_bitm struct dp_trace_buffer_telemetry trace_buff_info; marsio_dp_trace_buffer_info_get(mbufs[i], &trace_buff_info); - marsio_buff_t * pkt = marsio_dp_trace_pcapng_copy(mr_instance, mbufs[i], trace_buff_info.snaplen, comment); - if (pkt != NULL) + ret = pcapng_copy(mbufs[i], trace_buff_info.snaplen, comment, &epbs[nb_epb]); + if (ret >= 0) { - pcapng_pkt[copy_cnt++] = pkt; + nb_epb++; } else { @@ -257,8 +266,9 @@ void cli_job_mbufs_write_process(marsio_buff_t * mbufs[], int nr_mbufs, job_bitm } } - marsio_dp_trace_pcapng_write_and_free(mr_instance, dp_trace_output[index].pcapng, pcapng_pkt, copy_cnt, - saving_stat); + ret = pcapng_write_packets(dp_trace_output[index].pcapng, epbs, nb_epb); + saving_stat->save_to_file_success += ret; + saving_stat->save_to_file_failed_at_write_to_disk += (nb_epb - ret); if (dp_trace_file_reach_max_size(job_id)) { @@ -269,7 +279,10 @@ unlock: dp_trace_file_mutex_unlock(job_id); end: - free(comment); + for (unsigned int i = 0; i < nb_epb; i++) + { + free(epbs[i]); + } return; } @@ -315,7 +328,7 @@ void dp_trace_file_rollbak(job_bitmap_t job_id) unsigned int index = job_id_to_index(job_id); - marsio_dp_trace_pcapng_fclose(mr_instance, dp_trace_output[index].pcapng); + pcapng_close(dp_trace_output[index].pcapng); dp_trace_output[index].pcapng = NULL; const char * file_path = dp_trace_output[index].file_path; @@ -325,7 +338,8 @@ void dp_trace_file_rollbak(job_bitmap_t job_id) dzlog_error("rename %s to %s failed. error info: %s", file_path, file_bak_path, strerror(errno)); } - pcapng_file_t * pcapng = marsio_dp_trace_pcapng_fopen(mr_instance, dp_trace_output[index].file_path); + struct pcapng_t * pcapng = pcapng_open(file_path); + if (pcapng == NULL) { goto unlock; @@ -348,7 +362,7 @@ void dp_trace_pcapng_merger(job_bitmap_t job_id) goto end; } - marsio_dp_trace_pcapng_fclose(mr_instance, dp_trace_output[index].pcapng); + pcapng_close(dp_trace_output[index].pcapng); dp_trace_output[index].pcapng = NULL; const char * file_path = dp_trace_output[index].file_path; @@ -493,7 +507,7 @@ static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** dat const char * comment = cur + sizeof(struct dp_trace_record_header); const unsigned int comment_len = record_header->recode_len; - if ((record_header->type == DP_TRACE_RECORD_TYPE_TELEMETRY) != 0) + if ((record_header->type == DP_TRACE_MEASUREMENT_TYPE_TELEMETRY) != 0) { mpack_start_map(&writer, 4); @@ -544,7 +558,7 @@ static int dp_trace_record_decode_to_str(marsio_buff_t * mr_mbuf, char * data, u const char * str = cur + sizeof(struct dp_trace_record_header); const unsigned int str_len = record_header->recode_len; - if (record_header->type == DP_TRACE_RECORD_TYPE_TRACE) + if (record_header->type == DP_TRACE_MEASUREMENT_TYPE_TRACE) { int n = snprintf(data, size, "[%s:%s:] %ld.%ld ", record_header->appsym, record_header->module, record_header->ts.tv_sec, record_header->ts.tv_nsec); |
