diff options
| author | 童宗振 <[email protected]> | 2024-04-10 02:15:46 +0000 |
|---|---|---|
| committer | 童宗振 <[email protected]> | 2024-04-10 02:15:46 +0000 |
| commit | d0e21dbd01b6701cbc26af6da0b3e43955872423 (patch) | |
| tree | 725aa6d2cf45f5d0c5d176e28ef234635538fb48 /src/trace_output.c | |
| parent | e84923c693a5a29928ee6a4d36403350aef75233 (diff) | |
Adjust interface position
Diffstat (limited to 'src/trace_output.c')
| -rw-r--r-- | src/trace_output.c | 139 |
1 files changed, 136 insertions, 3 deletions
diff --git a/src/trace_output.c b/src/trace_output.c index 5cfa370..4f44070 100644 --- a/src/trace_output.c +++ b/src/trace_output.c @@ -3,6 +3,8 @@ #include "config.h" #include "job_ctx.h" +#include <mpack.h> + #include <errno.h> #include <pthread.h> #include <stdlib.h> @@ -15,7 +17,9 @@ int dp_trace_file_mutex_unlock(job_bitmap_t job_id); bool dp_trace_file_reach_max_size(job_bitmap_t job_id); void cli_job_mbufs_write_process(marsio_buff_t * mbufs[], int nr_mbufs, job_bitmap_t job_id); void dp_trace_file_rollbak(job_bitmap_t job_id); - +int dp_trace_classification(struct mr_instance * instance, marsio_buff_t * mbufs[], int nr_mbufs, + marsio_buff_t * class_mbufs[], int nr_class_mbufs, int nr_jobs_mbufs[DP_TRACE_JOB_NUM_MAX]); +static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** data, size_t * size); static rd_kafka_t * kafka_handle_create(const char * brokerlist, const char * sasl_username, const char * sasl_passwd); struct dp_trace_output @@ -90,7 +94,7 @@ void * dp_trace_process_thread(void * arg) continue; } - marsio_dp_trace_classification(mr_instance, rx_buff, nr_recv, class_mbufs, nr_class_mbufs, nr_jobs_mbufs); + dp_trace_classification(mr_instance, rx_buff, nr_recv, class_mbufs, nr_class_mbufs, nr_jobs_mbufs); for (unsigned int i = 0; i < 8; i++) { @@ -124,7 +128,8 @@ void * dp_trace_process_thread(void * arg) { char * data; size_t size; - marsio_dp_trace_decode_to_message_pack(mr_instance, rx_buff[j], &data, &size); + dp_trace_decode_to_message_pack(rx_buff[j], &data, &size); + printf("%.*s\n", data, size); #if 0 rd_kafka_produce(g_conf->kafka_topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_FREE, (void *)data, size, NULL, 0, NULL); @@ -134,6 +139,49 @@ void * dp_trace_process_thread(void * arg) } } +int dp_trace_classification(struct mr_instance * instance, marsio_buff_t * mbufs[], int nr_mbufs, + marsio_buff_t * class_mbufs[], int nr_class_mbufs, int nr_jobs_mbufs[DP_TRACE_JOB_NUM_MAX]) +{ + assert(nr_class_mbufs >= DP_TRACE_JOB_NUM_MAX * nr_mbufs); + if (unlikely(nr_class_mbufs < DP_TRACE_JOB_NUM_MAX * nr_mbufs)) + { + return -1; + } + + memset((void *)nr_jobs_mbufs, 0, DP_TRACE_JOB_NUM_MAX * sizeof(int)); + + for (unsigned int i = 0; i < nr_mbufs; i++) + { + struct dp_trace_buffer_telemetry info; + marsio_dp_trace_buffer_info_get((struct rte_mbuf *)mbufs[i], &info); + + int refcnt = -1; + + job_bitmap_t jobs_id = info.jobs_id; + refcnt += __builtin_popcount(jobs_id & UINT16_MAX); + if (refcnt > 0) + { + marsio_dp_trace_mbuf_refcnt_update(mbufs[i], refcnt); + } + + for (unsigned int j = 0; j < DP_TRACE_JOB_NUM_MAX; j++) + { + uint16_t job_id_mask = 1 << j; + uint16_t job_id = jobs_id & job_id_mask; + if (job_id == 0) + { + continue; + } + + unsigned int nr_job = nr_jobs_mbufs[j]; + class_mbufs[j * nr_mbufs + nr_job] = mbufs[i]; + nr_jobs_mbufs[j]++; + } + } + + return 0; +} + void cli_job_mbufs_write_process(marsio_buff_t * mbufs[], int nr_mbufs, job_bitmap_t job_id) { if (is_job_id_used(job_id) == 0) @@ -334,6 +382,91 @@ int dp_trace_file_mutex_unlock(job_bitmap_t job_id) return pthread_mutex_unlock(&dp_trace_output[index].file_mutex); } +static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** data, size_t * size) +{ + struct dp_trace_buffer_telemetry trace_buff_info; + marsio_dp_trace_buffer_info_get(mr_mbuf, &trace_buff_info); + + unsigned int position = 0; + + struct timeval tv; + gettimeofday(&tv, NULL); + uint64_t microseconds = (uint64_t)(tv.tv_sec) * 1000000 + tv.tv_usec; + + job_bitmap_t job_id = trace_buff_info.jobs_id; + + struct pkt_inner_ip_port pkt_info; + marsio_pkt_inner_ip_port_get(mr_mbuf, &pkt_info); + + mpack_writer_t writer; + mpack_writer_init_growable(&writer, data, size); + mpack_build_map(&writer); + + mpack_write_cstr(&writer, "timestamp"); + mpack_write_u64(&writer, microseconds); + + char job_id_str[10]; + snprintf(job_id_str, sizeof(job_id_str), "%d", job_id); + mpack_write_cstr(&writer, "job_id"); + mpack_write_cstr(&writer, job_id_str); + + mpack_write_cstr(&writer, "src_ip"); + mpack_write_cstr(&writer, pkt_info.src_addr_str); + + mpack_write_cstr(&writer, "src_port"); + mpack_write_i32(&writer, pkt_info.src_port); + + mpack_write_cstr(&writer, "server_ip"); + mpack_write_cstr(&writer, pkt_info.dst_addr_str); + + mpack_write_cstr(&writer, "server_port"); + mpack_write_i32(&writer, pkt_info.dst_port); + + mpack_write_cstr(&writer, "packet"); + mpack_write_bin(&writer, marsio_buff_mtod(mr_mbuf), marsio_buff_datalen(mr_mbuf)); + + mpack_write_cstr(&writer, "measurements"); + mpack_build_array(&writer); + while (position < trace_buff_info.buffer_used) + { + char * cur = trace_buff_info.buffer + position; + + const struct dp_trace_record_header * record_header = (struct dp_trace_record_header *)(cur); + const char * comment = cur + sizeof(struct dp_trace_record_header); + const unsigned int comment_len = record_header->recode_len; + + if ((record_header->tag == DP_TRACE_RECORD_TYPE_TELEMETRY) != 0) + { + mpack_start_map(&writer, 4); + + mpack_write_cstr(&writer, "tv_sec"); + mpack_write_i32(&writer, record_header->ts.tv_sec); + + mpack_write_cstr(&writer, "tv_nsec"); + mpack_write_i32(&writer, record_header->ts.tv_nsec); + + mpack_write_cstr(&writer, "app"); + mpack_write_cstr(&writer, record_header->appsym); + + mpack_write_cstr(&writer, "comments"); + mpack_write_str(&writer, comment, comment_len); + + mpack_finish_map(&writer); + } + + position += sizeof(struct dp_trace_record_header) + comment_len; + } + mpack_complete_array(&writer); + + mpack_complete_map(&writer); + + /* finish writing */ + if (mpack_writer_destroy(&writer) != mpack_ok) + { + syslog(LOG_ERR, "An error occurred during the data path decode to message pack!\n"); + } +} + static rd_kafka_t * kafka_handle_create(const char * brokerlist, const char * sasl_username, const char * sasl_passwd) { int ret; |
