summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/CMakeLists.txt11
-rw-r--r--src/trace_output.c139
2 files changed, 146 insertions, 4 deletions
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index ccb93af..829cbb6 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -1,4 +1,13 @@
include_directories(${CMAKE_CURRENT_SOURCE_DIR})
-add_executable(${PROJECT_NAME} main.c config.c job_ctx.c trace_output.c)
+include_directories(${CMAKE_SOURCE_DIR}/support/mpack)
+
+set(DP_TELEMETRY_SRC
+ ${CMAKE_CURRENT_SOURCE_DIR}/main.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/config.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/job_ctx.c
+ ${CMAKE_CURRENT_SOURCE_DIR}/trace_output.c
+ ${CMAKE_SOURCE_DIR}/support/mpack/mpack.c)
+
+add_executable(${PROJECT_NAME} ${DP_TELEMETRY_SRC})
target_link_libraries(${PROJECT_NAME} libmarsio maatframe MESA_prof_load libevent-static rdkafka uuid pthread) \ No newline at end of file
diff --git a/src/trace_output.c b/src/trace_output.c
index 5cfa370..4f44070 100644
--- a/src/trace_output.c
+++ b/src/trace_output.c
@@ -3,6 +3,8 @@
#include "config.h"
#include "job_ctx.h"
+#include <mpack.h>
+
#include <errno.h>
#include <pthread.h>
#include <stdlib.h>
@@ -15,7 +17,9 @@ int dp_trace_file_mutex_unlock(job_bitmap_t job_id);
bool dp_trace_file_reach_max_size(job_bitmap_t job_id);
void cli_job_mbufs_write_process(marsio_buff_t * mbufs[], int nr_mbufs, job_bitmap_t job_id);
void dp_trace_file_rollbak(job_bitmap_t job_id);
-
+int dp_trace_classification(struct mr_instance * instance, marsio_buff_t * mbufs[], int nr_mbufs,
+ marsio_buff_t * class_mbufs[], int nr_class_mbufs, int nr_jobs_mbufs[DP_TRACE_JOB_NUM_MAX]);
+static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** data, size_t * size);
static rd_kafka_t * kafka_handle_create(const char * brokerlist, const char * sasl_username, const char * sasl_passwd);
struct dp_trace_output
@@ -90,7 +94,7 @@ void * dp_trace_process_thread(void * arg)
continue;
}
- marsio_dp_trace_classification(mr_instance, rx_buff, nr_recv, class_mbufs, nr_class_mbufs, nr_jobs_mbufs);
+ dp_trace_classification(mr_instance, rx_buff, nr_recv, class_mbufs, nr_class_mbufs, nr_jobs_mbufs);
for (unsigned int i = 0; i < 8; i++)
{
@@ -124,7 +128,8 @@ void * dp_trace_process_thread(void * arg)
{
char * data;
size_t size;
- marsio_dp_trace_decode_to_message_pack(mr_instance, rx_buff[j], &data, &size);
+ dp_trace_decode_to_message_pack(rx_buff[j], &data, &size);
+ printf("%.*s\n", data, size);
#if 0
rd_kafka_produce(g_conf->kafka_topic, RD_KAFKA_PARTITION_UA, RD_KAFKA_MSG_F_FREE, (void *)data, size,
NULL, 0, NULL);
@@ -134,6 +139,49 @@ void * dp_trace_process_thread(void * arg)
}
}
+int dp_trace_classification(struct mr_instance * instance, marsio_buff_t * mbufs[], int nr_mbufs,
+ marsio_buff_t * class_mbufs[], int nr_class_mbufs, int nr_jobs_mbufs[DP_TRACE_JOB_NUM_MAX])
+{
+ assert(nr_class_mbufs >= DP_TRACE_JOB_NUM_MAX * nr_mbufs);
+ if (unlikely(nr_class_mbufs < DP_TRACE_JOB_NUM_MAX * nr_mbufs))
+ {
+ return -1;
+ }
+
+ memset((void *)nr_jobs_mbufs, 0, DP_TRACE_JOB_NUM_MAX * sizeof(int));
+
+ for (unsigned int i = 0; i < nr_mbufs; i++)
+ {
+ struct dp_trace_buffer_telemetry info;
+ marsio_dp_trace_buffer_info_get((struct rte_mbuf *)mbufs[i], &info);
+
+ int refcnt = -1;
+
+ job_bitmap_t jobs_id = info.jobs_id;
+ refcnt += __builtin_popcount(jobs_id & UINT16_MAX);
+ if (refcnt > 0)
+ {
+ marsio_dp_trace_mbuf_refcnt_update(mbufs[i], refcnt);
+ }
+
+ for (unsigned int j = 0; j < DP_TRACE_JOB_NUM_MAX; j++)
+ {
+ uint16_t job_id_mask = 1 << j;
+ uint16_t job_id = jobs_id & job_id_mask;
+ if (job_id == 0)
+ {
+ continue;
+ }
+
+ unsigned int nr_job = nr_jobs_mbufs[j];
+ class_mbufs[j * nr_mbufs + nr_job] = mbufs[i];
+ nr_jobs_mbufs[j]++;
+ }
+ }
+
+ return 0;
+}
+
void cli_job_mbufs_write_process(marsio_buff_t * mbufs[], int nr_mbufs, job_bitmap_t job_id)
{
if (is_job_id_used(job_id) == 0)
@@ -334,6 +382,91 @@ int dp_trace_file_mutex_unlock(job_bitmap_t job_id)
return pthread_mutex_unlock(&dp_trace_output[index].file_mutex);
}
+static void dp_trace_decode_to_message_pack(marsio_buff_t * mr_mbuf, char ** data, size_t * size)
+{
+ struct dp_trace_buffer_telemetry trace_buff_info;
+ marsio_dp_trace_buffer_info_get(mr_mbuf, &trace_buff_info);
+
+ unsigned int position = 0;
+
+ struct timeval tv;
+ gettimeofday(&tv, NULL);
+ uint64_t microseconds = (uint64_t)(tv.tv_sec) * 1000000 + tv.tv_usec;
+
+ job_bitmap_t job_id = trace_buff_info.jobs_id;
+
+ struct pkt_inner_ip_port pkt_info;
+ marsio_pkt_inner_ip_port_get(mr_mbuf, &pkt_info);
+
+ mpack_writer_t writer;
+ mpack_writer_init_growable(&writer, data, size);
+ mpack_build_map(&writer);
+
+ mpack_write_cstr(&writer, "timestamp");
+ mpack_write_u64(&writer, microseconds);
+
+ char job_id_str[10];
+ snprintf(job_id_str, sizeof(job_id_str), "%d", job_id);
+ mpack_write_cstr(&writer, "job_id");
+ mpack_write_cstr(&writer, job_id_str);
+
+ mpack_write_cstr(&writer, "src_ip");
+ mpack_write_cstr(&writer, pkt_info.src_addr_str);
+
+ mpack_write_cstr(&writer, "src_port");
+ mpack_write_i32(&writer, pkt_info.src_port);
+
+ mpack_write_cstr(&writer, "server_ip");
+ mpack_write_cstr(&writer, pkt_info.dst_addr_str);
+
+ mpack_write_cstr(&writer, "server_port");
+ mpack_write_i32(&writer, pkt_info.dst_port);
+
+ mpack_write_cstr(&writer, "packet");
+ mpack_write_bin(&writer, marsio_buff_mtod(mr_mbuf), marsio_buff_datalen(mr_mbuf));
+
+ mpack_write_cstr(&writer, "measurements");
+ mpack_build_array(&writer);
+ while (position < trace_buff_info.buffer_used)
+ {
+ char * cur = trace_buff_info.buffer + position;
+
+ const struct dp_trace_record_header * record_header = (struct dp_trace_record_header *)(cur);
+ const char * comment = cur + sizeof(struct dp_trace_record_header);
+ const unsigned int comment_len = record_header->recode_len;
+
+ if ((record_header->tag == DP_TRACE_RECORD_TYPE_TELEMETRY) != 0)
+ {
+ mpack_start_map(&writer, 4);
+
+ mpack_write_cstr(&writer, "tv_sec");
+ mpack_write_i32(&writer, record_header->ts.tv_sec);
+
+ mpack_write_cstr(&writer, "tv_nsec");
+ mpack_write_i32(&writer, record_header->ts.tv_nsec);
+
+ mpack_write_cstr(&writer, "app");
+ mpack_write_cstr(&writer, record_header->appsym);
+
+ mpack_write_cstr(&writer, "comments");
+ mpack_write_str(&writer, comment, comment_len);
+
+ mpack_finish_map(&writer);
+ }
+
+ position += sizeof(struct dp_trace_record_header) + comment_len;
+ }
+ mpack_complete_array(&writer);
+
+ mpack_complete_map(&writer);
+
+ /* finish writing */
+ if (mpack_writer_destroy(&writer) != mpack_ok)
+ {
+ syslog(LOG_ERR, "An error occurred during the data path decode to message pack!\n");
+ }
+}
+
static rd_kafka_t * kafka_handle_create(const char * brokerlist, const char * sasl_username, const char * sasl_passwd)
{
int ret;