summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author童宗振 <[email protected]>2024-04-27 01:39:14 +0000
committer童宗振 <[email protected]>2024-04-27 01:39:14 +0000
commit42c89e380c3da4b0d40d0ffc0a2e39e45ffe8356 (patch)
tree458a640a953d1293643fda81a303e4098ebdd52e
parent7803faff8547c007e93e211fe787e0a1fe01ada0 (diff)
refactor data path trace for telemetryv4.8.4-20240427
-rw-r--r--app/src/dp_trace.c264
-rw-r--r--app/src/mrb.c11
-rw-r--r--app/src/rawio.c17
-rw-r--r--app/src/version.map11
-rw-r--r--examples/l2fwd-nf.c22
-rw-r--r--include/external/marsio.h113
-rw-r--r--include/internal/mrb_define.h7
-rw-r--r--infra/include/common.h4
-rw-r--r--infra/include/dp_trace.h106
-rw-r--r--infra/include/ldbc.h38
-rw-r--r--infra/src/dp_trace.c835
-rw-r--r--infra/test/TestDataPathTrace.cc146
-rw-r--r--service/include/sc_app.h1
-rw-r--r--service/include/sc_trace.h93
-rw-r--r--service/src/app.c2
-rw-r--r--service/src/dp_trace.c261
-rw-r--r--service/src/node_bfd.c6
-rw-r--r--service/src/node_bridge.c11
-rw-r--r--service/src/node_classifier.c8
-rw-r--r--service/src/node_eth_egress.c11
-rw-r--r--service/src/node_eth_ingress.c7
-rw-r--r--service/src/node_etherfabric.c17
-rw-r--r--service/src/node_forwarder.c6
-rw-r--r--service/src/node_health_check.c24
-rw-r--r--service/src/node_lb.c5
-rw-r--r--service/src/node_phydev.c25
-rw-r--r--service/src/node_shmdev.c7
-rw-r--r--service/src/node_tera.c11
-rw-r--r--service/src/node_vwire.c16
29 files changed, 852 insertions, 1233 deletions
diff --git a/app/src/dp_trace.c b/app/src/dp_trace.c
index 3c220e3..77d162b 100644
--- a/app/src/dp_trace.c
+++ b/app/src/dp_trace.c
@@ -1,6 +1,18 @@
#include "mrapp.h"
#include "mrdp_trace.h"
+#include <libgen.h>
+#include <mpack.h>
+#include <rte_ip.h>
+#include <rte_pcapng.h>
+#include <rte_tcp.h>
+#include <rte_udp.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/stat.h>
+#include <sys/utsname.h>
+#include <unistd.h>
+
extern __thread struct mr_thread_info thread_info;
int marsio_dp_trace_init(struct mr_instance * mr_instance)
@@ -13,13 +25,14 @@ int marsio_dp_trace_init(struct mr_instance * mr_instance)
int marsio_dp_trace_record_can_emit(const marsio_buff_t * mbuf)
{
- return dp_trace_record_can_emit((struct rte_mbuf *)mbuf);
+ return dp_trace_record_can_emit((struct rte_mbuf *)mbuf, DP_TRACE_MEASUREMENT_TYPE_TRACE);
}
int marsio_dp_trace_record_emit_str(struct mr_instance * mr_instance, marsio_buff_t * mbuf, const char * module,
const char * str)
{
- struct dp_trace_record_meta meta = {.module = module, .appsym = mr_instance->appsym};
+ struct dp_trace_record_meta meta = {
+ .measurement_type = DP_TRACE_MEASUREMENT_TYPE_TRACE, .module = module, .appsym = mr_instance->appsym};
return dp_trace_record_emit_str(mr_instance->trace, (struct rte_mbuf *)mbuf, marsio_thread_id_get(), &meta, str);
}
@@ -41,7 +54,47 @@ int marsio_dp_trace_record_emit_fmt(struct mr_instance * mr_instance, marsio_buf
return -1;
}
- struct dp_trace_record_meta meta = {.module = module, .appsym = mr_instance->appsym};
+ struct dp_trace_record_meta meta = {
+ .measurement_type = DP_TRACE_MEASUREMENT_TYPE_TRACE, .module = module, .appsym = mr_instance->appsym};
+
+ ret = dp_trace_record_emit_str(trace, (struct rte_mbuf *)mbuf, marsio_thread_id_get(), &meta, buffer);
+ return ret;
+}
+
+int marsio_dp_trace_measurements_can_emit(__rte_unused struct mr_instance * instance, const marsio_buff_t * mbuf,
+ uint8_t measurement_type)
+{
+ return dp_trace_record_can_emit((struct rte_mbuf *)mbuf, measurement_type);
+}
+
+int marsio_dp_trace_measurement_emit_str(struct mr_instance * mr_instance, marsio_buff_t * mbuf,
+ uint8_t measurement_type, const char * module, const char * str)
+{
+ struct dp_trace_record_meta meta = {
+ .measurement_type = measurement_type, .module = module, .appsym = mr_instance->appsym};
+ return dp_trace_record_emit_str(mr_instance->trace, (struct rte_mbuf *)mbuf, marsio_thread_id_get(), &meta, str);
+}
+
+int marsio_dp_trace_measurement_emit_fmt(struct mr_instance * mr_instance, marsio_buff_t * mbuf,
+ uint8_t measurement_type, const char * module, const char * format, ...)
+{
+ struct dp_trace_process * trace = mr_instance->trace;
+
+ char buffer[512];
+ va_list args;
+ va_start(args, format);
+ int ret = vsnprintf(buffer, sizeof(buffer), format, args);
+ va_end(args);
+
+ if (unlikely(ret < 0 || ret >= sizeof(buffer)))
+ {
+ thread_id_t thread_id = marsio_thread_id_get();
+ trace->statistics[thread_id].record_emit_failed_trace_oversize++;
+ return -1;
+ }
+
+ struct dp_trace_record_meta meta = {
+ .measurement_type = measurement_type, .module = module, .appsym = mr_instance->appsym};
ret = dp_trace_record_emit_str(trace, (struct rte_mbuf *)mbuf, marsio_thread_id_get(), &meta, buffer);
return ret;
@@ -79,7 +132,7 @@ void marsio_rte_mempool_generic_put(struct rte_mempool * mp, void * const * obj_
thread_id_t marsio_thread_id_get()
{
- return thread_info.thread_id;
+ return thread_info.thread_id % RTE_MAX_LCORE;
}
cJSON * marsio_dp_trace_monit_loop(struct mr_instance * instance)
@@ -88,11 +141,15 @@ cJSON * marsio_dp_trace_monit_loop(struct mr_instance * instance)
struct dp_trace_process * trace = instance->trace;
struct dp_trace_stat monit_statistics = {};
+ uint64_t filter_exec_hit = 0;
for (unsigned int i = 0; i < RTE_MAX_LCORE; i++)
{
struct dp_trace_stat * statistics_i = &trace->statistics[i];
- monit_statistics.filter_exec_hit += statistics_i->filter_exec_hit;
+ for (unsigned int j = 0; j < DP_TRACE_JOB_NUM_MAX; j++)
+ {
+ filter_exec_hit += statistics_i->filter_exec_hit[j];
+ }
monit_statistics.filter_exec_miss += statistics_i->filter_exec_miss;
monit_statistics.reach_pkt_cnt_limit += statistics_i->reach_pkt_cnt_limit;
monit_statistics.record_buf_alloc_failed_no_mem += statistics_i->record_buf_alloc_failed_no_mem;
@@ -102,11 +159,12 @@ cJSON * marsio_dp_trace_monit_loop(struct mr_instance * instance)
monit_statistics.record_emit_failed_trace_oversize += statistics_i->record_emit_failed_trace_oversize;
monit_statistics.record_emit_failed_no_space_in_buf += statistics_i->record_emit_failed_no_space_in_buf;
monit_statistics.record_emit_success += statistics_i->record_emit_success;
+ monit_statistics.record_buf_free += statistics_i->record_buf_free;
}
cJSON_AddBoolToObject(json_root, "trace_enable", trace->inst->enable);
- cJSON_AddNumberToObject(json_root, "filter_exec_hit", monit_statistics.filter_exec_hit);
+ cJSON_AddNumberToObject(json_root, "filter_exec_hit", filter_exec_hit);
cJSON_AddNumberToObject(json_root, "filter_exec_miss", monit_statistics.filter_exec_miss);
cJSON_AddNumberToObject(json_root, "reach_pkt_cnt_limit", monit_statistics.reach_pkt_cnt_limit);
@@ -124,5 +182,199 @@ cJSON * marsio_dp_trace_monit_loop(struct mr_instance * instance)
monit_statistics.record_emit_failed_no_space_in_buf);
cJSON_AddNumberToObject(json_root, "record_emit_success", monit_statistics.record_emit_success);
+ cJSON_AddNumberToObject(json_root, "record_buf_free", monit_statistics.record_buf_free);
+
return json_root;
+}
+
+int marsio_dp_trace_job_id_uesd_get(__rte_unused struct mr_instance * instance, job_bitmap_t * jobs_id)
+{
+ // Initiate a request and add a job. After the addition is successful, the job id is returned.
+ struct rte_mp_msg req_msg = {};
+ int ret = -1;
+
+ snprintf(req_msg.name, sizeof(req_msg.name), "%s", DP_TRACE_MP_MSG_NAME);
+ struct dp_trace_req * dp_trace_req = (struct dp_trace_req *)req_msg.param;
+ req_msg.len_param = sizeof(struct dp_trace_req);
+
+ dp_trace_req->action = DP_TRACE_JOB_ID_USED_GET;
+
+ struct rte_mp_reply mp_reply;
+ const struct timespec wait_timespec = {
+ .tv_nsec = 0,
+ .tv_sec = 30,
+ };
+
+ int tmp = rte_mp_request_sync(&req_msg, &mp_reply, &wait_timespec);
+ if (tmp < 0)
+ {
+ MR_WARNING("Failed to execute rte_mp_request_sync in marsio_dp_trace_job_get:%s", rte_strerror(rte_errno));
+ goto end;
+ }
+
+ struct dp_trace_resp * dp_trace_resp = (struct dp_trace_resp *)mp_reply.msgs->param;
+
+ if (dp_trace_resp->errcode == DP_TRACE_SUCCESS)
+ {
+ *jobs_id = dp_trace_resp->jobs_id_used_get;
+ ret = 1;
+ }
+ else
+ {
+ MR_ERROR("%s", dp_trace_strerror(dp_trace_resp->errcode));
+ }
+
+end:
+ free(mp_reply.msgs);
+ return ret;
+}
+
+job_bitmap_t marsio_dp_trace_job_add(struct mr_instance * instance, const struct dp_trace_job_desc * desc)
+{
+ // Initiate a request and add a job. After the addition is successful, the job id is returned.
+ struct rte_mp_msg req_msg = {};
+ job_bitmap_t job_id = 0;
+
+ snprintf(req_msg.name, sizeof(req_msg.name), "%s", DP_TRACE_MP_MSG_NAME);
+ struct dp_trace_req * dp_trace_req = (struct dp_trace_req *)req_msg.param;
+ req_msg.len_param = sizeof(struct dp_trace_req);
+
+ snprintf(dp_trace_req->appsym, sizeof(dp_trace_req->appsym), "%s", instance->appsym);
+ dp_trace_req->action = DP_TRACE_JOB_ADD;
+ struct dp_trace_job_desc * desc_copy = ZMALLOC(sizeof(struct dp_trace_job_desc));
+ MR_VERIFY_MALLOC(desc_copy);
+ memcpy(desc_copy, desc, sizeof(struct dp_trace_job_desc));
+ dp_trace_req->desc = desc_copy;
+
+ struct rte_mp_reply mp_reply;
+ const struct timespec wait_timespec = {
+ .tv_nsec = 0,
+ .tv_sec = 30,
+ };
+
+ int ret = rte_mp_request_sync(&req_msg, &mp_reply, &wait_timespec);
+ if (ret < 0)
+ {
+ MR_WARNING("Failed to execute rte_mp_request_sync in marsio_dp_trace_job_add:%s", rte_strerror(rte_errno));
+ goto end;
+ }
+
+ struct dp_trace_resp * dp_trace_resp = (struct dp_trace_resp *)mp_reply.msgs->param;
+
+ if (dp_trace_resp->errcode == DP_TRACE_SUCCESS)
+ {
+ job_id = dp_trace_resp->job_id_add;
+ }
+ else
+ {
+ MR_ERROR("%s", dp_trace_strerror(dp_trace_resp->errcode));
+ }
+
+end:
+ FREE(desc_copy);
+ free(mp_reply.msgs);
+ return job_id;
+}
+
+int marsio_dp_trace_job_del(__rte_unused struct mr_instance * instance, job_bitmap_t jobs_id)
+{
+ int ret = 0;
+
+ struct rte_mp_msg req_msg = {};
+ snprintf(req_msg.name, sizeof(req_msg.name), "%s", DP_TRACE_MP_MSG_NAME);
+ struct dp_trace_req * dp_trace_req = (struct dp_trace_req *)req_msg.param;
+ req_msg.len_param = sizeof(struct dp_trace_req);
+
+ dp_trace_req->action = DP_TRACE_JOB_DESTROY;
+ dp_trace_req->jobs_id_destroy = jobs_id;
+
+ struct rte_mp_reply mp_reply;
+ const struct timespec wait_timespec = {
+ .tv_nsec = 0,
+ .tv_sec = 30,
+ };
+
+ if (rte_mp_request_sync(&req_msg, &mp_reply, &wait_timespec) < 0)
+ {
+ MR_WARNING("Failed to execute rte_mp_request_sync in marsio_dp_trace_job_del:%s", rte_strerror(rte_errno));
+ goto end;
+ }
+
+end:
+ free(mp_reply.msgs);
+ return ret;
+}
+
+int marsio_dp_trace_mbuf_recv_burst(struct mr_instance * instance, queue_id_t qid, marsio_buff_t * mbufs[],
+ int nr_mbufs)
+{
+ struct dp_trace_process * dp_trace_process = instance->trace;
+ assert(qid < dp_trace_process->nr_ring);
+
+ int n = rte_ring_dequeue_burst(dp_trace_process->ring[qid], mbufs, nr_mbufs, NULL);
+ return n;
+}
+
+void marsio_dp_trace_mbuf_free(struct mr_instance * instance, marsio_buff_t * mbufs[], int nr_mbufs)
+{
+ struct dp_trace_process * dp_trace_process = instance->trace;
+
+ for (unsigned int i = 0; i < nr_mbufs; i++)
+ {
+ struct rte_mbuf * mbuf = mbufs[i];
+ struct mrb_metadata * mrb_meta = rte_mbuf_to_priv(mbuf);
+ struct dp_trace_buffer * dp_trace_buffer = mrb_meta->dp_trace_buffer;
+ dp_trace_buffer->buffer_refcnt--;
+ if (dp_trace_buffer->buffer_refcnt == 0)
+ {
+ rte_mempool_put(dp_trace_process->inst->pool, (void *)dp_trace_buffer);
+ thread_id_t thread_id = marsio_thread_id_get();
+ dp_trace_process->statistics[thread_id].record_buf_free++;
+ }
+ }
+
+ rte_pktmbuf_free_bulk((struct rte_mbuf **)mbufs, nr_mbufs);
+}
+
+int marsio_dp_trace_buffer_info_get(const marsio_buff_t * mbuf, struct dp_trace_buffer_telemetry * info)
+{
+ struct mrb_metadata * mrb_meta = rte_mbuf_to_priv((struct rte_mbuf *)mbuf);
+ struct dp_trace_buffer * dp_trace_buffer = mrb_meta->dp_trace_buffer;
+ assert(dp_trace_buffer != NULL);
+
+ info->jobs_id = dp_trace_buffer->jobs;
+ info->snaplen = dp_trace_buffer->snaplen;
+ info->buffer = dp_trace_buffer->buffer;
+ info->buffer_len = dp_trace_buffer->buffer_len;
+ info->buffer_used = dp_trace_buffer->buffer_used;
+
+ return 0;
+}
+
+int marsio_dp_trace_mbuf_refcnt_update(const marsio_buff_t * mbuf, int16_t value)
+{
+ struct mrb_metadata * mrb_meta = rte_mbuf_to_priv((struct rte_mbuf *)mbuf);
+ struct dp_trace_buffer * dp_trace_buffer = mrb_meta->dp_trace_buffer;
+ assert(dp_trace_buffer != NULL);
+
+ dp_trace_buffer->buffer_refcnt += value;
+ rte_mbuf_refcnt_update((struct rte_mbuf *)mbuf, value);
+ return 0;
+}
+
+void * marsio_pkt_jump_to_innermost_layer(const marsio_buff_t * mbuf, enum complex_layer_type_id layer_id)
+{
+ struct mrb_metadata * mrb_meta = rte_mbuf_to_priv((struct rte_mbuf *)mbuf);
+ struct pkt_parser_result * pkt_parser_result = &mrb_meta->pkt_parser_result;
+
+ void * ret = NULL;
+ for (int i = pkt_parser_result->nr_layers - 1; i >= 0; i--)
+ {
+ if (pkt_parser_result->layers[i].type_id == layer_id)
+ {
+ ret = rte_pktmbuf_mtod((struct rte_mbuf *)mbuf, void *) + pkt_parser_result->layers[i].offset;
+ }
+ }
+
+ return ret;
} \ No newline at end of file
diff --git a/app/src/mrb.c b/app/src/mrb.c
index 31c1102..f7eb885 100644
--- a/app/src/mrb.c
+++ b/app/src/mrb.c
@@ -29,12 +29,12 @@ void marsio_buff_ctrlzone_reset(marsio_buff_t * mr_buff)
{
struct mrb_metadata * mrb_meta = mrbuf_cz_data(mr_buff, MR_NODE_CTRLZONE_ID);
struct dp_trace_buffer * dp_trace_buffer_ptr = mrb_meta->dp_trace_buffer;
- int8_t dp_trace_can_emit = mrb_meta->dp_trace_can_emit;
+ int8_t measurement_type = mrb_meta->measurement_type;
/* reset the ctrlzone but not dp_trace_buffer */
memset(mrbuf_cz_data(mr_buff, MR_NODE_CTRLZONE_ID), 0, sizeof(struct mrb_metadata));
mrb_meta->dp_trace_buffer = dp_trace_buffer_ptr;
- mrb_meta->dp_trace_can_emit = dp_trace_can_emit;
+ mrb_meta->measurement_type = measurement_type;
}
void * mr_buffer_ctrlzone(struct rte_mbuf * mr_buff, uint8_t id)
@@ -120,11 +120,12 @@ void marsio_buff_free_v2(struct mr_instance * instance, marsio_buff_t * buff[],
struct rte_mbuf * m = (struct rte_mbuf *)buff[i];
struct rte_mbuf * m_next = NULL;
- if (marsio_dp_trace_record_can_emit(m))
+ if (marsio_dp_trace_measurements_can_emit(instance, m, DP_TRACE_MEASUREMENT_TYPE_TRACE))
{
- marsio_dp_trace_record_emit_str(instance, m, NULL, "packet dropped by application");
- dp_trace_record_write(instance->trace, m, marsio_thread_id_get());
+ marsio_dp_trace_measurement_emit_str(instance, m, DP_TRACE_MEASUREMENT_TYPE_TRACE, NULL,
+ "packet dropped by application");
}
+ dp_trace_record_write(instance->trace, m, marsio_thread_id_get());
__rte_mbuf_sanity_check(m, 1);
while (m != NULL)
diff --git a/app/src/rawio.c b/app/src/rawio.c
index d322b49..4a17e4f 100644
--- a/app/src/rawio.c
+++ b/app/src/rawio.c
@@ -23,8 +23,8 @@ static inline unsigned int packet_total_len(struct rte_mbuf * mbufs[], unsigned
return total_len;
}
-int mrapp_packet_fast_send_burst(struct mr_instance * instance,
- struct vdev_instance * vdi, queue_id_t qid, struct rte_mbuf * mbufs[], int nr_mbufs)
+int mrapp_packet_fast_send_burst(struct mr_instance * instance, struct vdev_instance * vdi, queue_id_t qid,
+ struct rte_mbuf * mbufs[], int nr_mbufs)
{
hash_t mbufs_hash[MR_BURST_MAX];
for (int i = 0; i < nr_mbufs; i++)
@@ -103,10 +103,10 @@ int marsio_recv_burst(struct mr_vdev * vdev, queue_id_t qid, marsio_buff_t * mbu
{
mbufs[i] = rx_buffer->mbufs[rx_buffer->curser + i];
- if (unlikely(marsio_dp_trace_record_can_emit(mbufs[i])))
+ if (unlikely(marsio_dp_trace_measurements_can_emit(vdev->instance, mbufs[i], DP_TRACE_MEASUREMENT_TYPE_TRACE)))
{
- marsio_dp_trace_record_emit_fmt(vdev->instance, mbufs[i], "marsio_recv", "packet rx, dev=%s, qid=%u",
- vdev->devsym, qid);
+ marsio_dp_trace_measurement_emit_fmt(vdev->instance, mbufs[i], DP_TRACE_MEASUREMENT_TYPE_TRACE,
+ "marsio_recv", "packet rx, dev=%s, qid=%u", vdev->devsym, qid);
}
}
@@ -142,10 +142,11 @@ int marsio_send_buffer_flush(struct mr_vdev * vdev, queue_id_t sid)
for (int i = 0; i < tx_buffer->length; i++)
{
hash[i] = tx_buffer->mbufs[i]->hash.usr;
- if (marsio_dp_trace_record_can_emit(tx_buffer->mbufs[i]))
+ if (marsio_dp_trace_measurements_can_emit(instance, tx_buffer->mbufs[i], DP_TRACE_MEASUREMENT_TYPE_TRACE))
{
- marsio_dp_trace_record_emit_fmt(vdev->instance, tx_buffer->mbufs[i], "marsio_send",
- "packet tx, dev=%s , qid=%u, hash=%u", vdev->devsym, sid, hash[i]);
+ marsio_dp_trace_measurement_emit_fmt(vdev->instance, tx_buffer->mbufs[i], DP_TRACE_MEASUREMENT_TYPE_TRACE,
+ "marsio_send", "packet tx, dev=%s , qid=%u, hash=%u", vdev->devsym,
+ sid, hash[i]);
}
}
diff --git a/app/src/version.map b/app/src/version.map
index 49adbfe..07bc6eb 100644
--- a/app/src/version.map
+++ b/app/src/version.map
@@ -88,6 +88,17 @@ global:
marsio_dp_trace_record_can_emit;
marsio_dp_trace_record_emit_str;
marsio_dp_trace_record_emit_fmt;
+ marsio_dp_trace_measurements_can_emit;
+ marsio_dp_trace_measurement_emit_str;
+ marsio_dp_trace_measurement_emit_fmt;
+ marsio_dp_trace_job_id_uesd_get;
+ marsio_dp_trace_job_add;
+ marsio_dp_trace_job_del;
+ marsio_dp_trace_mbuf_recv_burst;
+ marsio_dp_trace_mbuf_free;
+ marsio_dp_trace_buffer_info_get;
+ marsio_pkt_jump_to_innermost_layer;
+ marsio_dp_trace_mbuf_refcnt_update;
local: *;
};
diff --git a/examples/l2fwd-nf.c b/examples/l2fwd-nf.c
index 903a0b7..6c245e2 100644
--- a/examples/l2fwd-nf.c
+++ b/examples/l2fwd-nf.c
@@ -56,10 +56,20 @@ void * l2fwd_loop(void * arg)
for (int i = 0; i < ret; i++)
{
- if (marsio_dp_trace_record_can_emit(rx_buff[i]))
+ if (marsio_dp_trace_measurements_can_emit(mr_instance, rx_buff[i], DP_TRACE_MEASUREMENT_TYPE_TELEMETRY))
{
- marsio_dp_trace_record_emit_str(mr_instance, rx_buff[i], "I2fwd-nf", "test data path trace function");
- marsio_dp_trace_record_emit_fmt(mr_instance, rx_buff[i], "I2fwd-nf", "test format recod %s", "hello");
+ marsio_dp_trace_measurement_emit_str(mr_instance, rx_buff[i], DP_TRACE_MEASUREMENT_TYPE_TELEMETRY,
+ "I2fwd-nf", "test data path trace telemetry");
+ marsio_dp_trace_measurement_emit_str(mr_instance, rx_buff[i], DP_TRACE_MEASUREMENT_TYPE_TELEMETRY,
+ "I2fwd-nf", "test data path trace telemetry 2");
+ }
+
+ if (marsio_dp_trace_measurements_can_emit(mr_instance, rx_buff[i], DP_TRACE_MEASUREMENT_TYPE_TRACE))
+ {
+ marsio_dp_trace_measurement_emit_fmt(mr_instance, rx_buff[i], DP_TRACE_MEASUREMENT_TYPE_TRACE,
+ "I2fwd-nf", "test format recod %s", "hello");
+ marsio_dp_trace_measurement_emit_fmt(mr_instance, rx_buff[i], DP_TRACE_MEASUREMENT_TYPE_TRACE,
+ "I2fwd-nf", "test format recod %s", "hello 2");
}
if (opt_dump_packet_metadata)
@@ -101,10 +111,10 @@ void * l2fwd_loop(void * arg)
marsio_buff_set_sid_list(deep_copy_buff, sids, nr_sids);
marsio_dp_trace_filter_exec(mr_instance, deep_copy_buff);
- if (marsio_dp_trace_record_can_emit(deep_copy_buff))
+ if (marsio_dp_trace_measurements_can_emit(mr_instance, deep_copy_buff, DP_TRACE_MEASUREMENT_TYPE_TRACE))
{
- marsio_dp_trace_record_emit_str(mr_instance, deep_copy_buff, "I2fwd-nf",
- "test for sending message directly");
+ marsio_dp_trace_measurement_emit_str(mr_instance, deep_copy_buff, DP_TRACE_MEASUREMENT_TYPE_TRACE,
+ "I2fwd-nf", "test for sending message directly");
}
tx_buff[i] = deep_copy_buff;
diff --git a/include/external/marsio.h b/include/external/marsio.h
index a928f00..0a54c20 100644
--- a/include/external/marsio.h
+++ b/include/external/marsio.h
@@ -118,6 +118,25 @@ enum mr_buff_metadata_type
MR_BUFF_USER_0 = 254
};
+enum complex_layer_type_id
+{
+ LAYER_TYPE_ID_ETHER,
+ LAYER_TYPE_ID_PPP,
+ LAYER_TYPE_ID_HDLC,
+ LAYER_TYPE_ID_VLAN,
+ LAYER_TYPE_ID_PPPOE,
+ LAYER_TYPE_ID_MPLS,
+ LAYER_TYPE_ID_IPV4,
+ LAYER_TYPE_ID_IPV6,
+ LAYER_TYPE_ID_UDP,
+ LAYER_TYPE_ID_TCP,
+ LAYER_TYPE_ID_ICMP,
+ LAYER_TYPE_ID_ICMP6,
+ LAYER_TYPE_ID_GRE,
+ LAYER_TYPE_ID_G_VXLAN,
+ LAYER_TYPE_ID_GTPV1_U,
+};
+
#ifdef __cplusplus
extern "C"
{
@@ -215,9 +234,6 @@ void marsio_buff_chain_pkt(marsio_buff_t * pkt, marsio_buff_t * next);
uint16_t marsio_buff_headroom(const marsio_buff_t * m);
uint16_t marsio_buff_tailroom(const marsio_buff_t * m);
-marsio_buff_t * marsio_buff_getnext_seg(marsio_buff_t * m);
-marsio_buff_t * marsio_buff_getnext_pkt(marsio_buff_t * m);
-
char * marsio_buff_mtod(marsio_buff_t * m);
uint32_t marsio_buff_buflen(marsio_buff_t * m);
@@ -298,12 +314,93 @@ int marsio_buff_prepend_sid_list(marsio_buff_t * m, sid_t * slist, uint8_t sz_sl
int marsio_buff_get_current_sid(marsio_buff_t * m, sid_t * sid);
/******************************** data path trace**************************************/
+__attribute__((deprecated(
+ "use marsio_dp_trace_measurements_can_emit replace.the current interface will be removed in 24.06."))) int
+marsio_dp_trace_record_can_emit(const marsio_buff_t * mbuf);
+
+__attribute__((
+ deprecated("use marsio_dp_trace_measurement_emit_str replace.the current interface will be removed in 24.06."))) int
+marsio_dp_trace_record_emit_str(struct mr_instance * instance, marsio_buff_t * mbuf, const char * module,
+ const char * str);
+__attribute__((
+ deprecated("use marsio_dp_trace_measurement_emit_fmt replace.the current interface will be removed in 24.06."))) int
+marsio_dp_trace_record_emit_fmt(struct mr_instance * instance, marsio_buff_t * mbuf, const char * module,
+ const char * format, ...);
+
+#define DP_TRACE_MEASUREMENT_TYPE_TRACE (1 << 0)
+#define DP_TRACE_MEASUREMENT_TYPE_TELEMETRY (1 << 1)
+
void marsio_dp_trace_filter_exec(struct mr_instance * instance, marsio_buff_t * mbuf);
-int marsio_dp_trace_record_can_emit(const marsio_buff_t * mbuf);
-int marsio_dp_trace_record_emit_str(struct mr_instance * instance, marsio_buff_t * mbuf, const char * module,
- const char * str);
-int marsio_dp_trace_record_emit_fmt(struct mr_instance * instance, marsio_buff_t * mbuf, const char * module,
- const char * format, ...);
+
+int marsio_dp_trace_measurements_can_emit(struct mr_instance * instance, const marsio_buff_t * mbuf,
+ uint8_t measurement_type);
+
+int marsio_dp_trace_measurement_emit_str(struct mr_instance * instance, marsio_buff_t * mbuf, uint8_t measurement_type,
+ const char * module, const char * str);
+
+int marsio_dp_trace_measurement_emit_fmt(struct mr_instance * instance, marsio_buff_t * mbuf, uint8_t measurement_type,
+ const char * module, const char * format, ...);
+
+/////////////////////// only for data path trace telemetry ////////////
+#ifndef MR_BPF_EXPRESSION_MAX
+#define MR_BPF_EXPRESSION_MAX 128
+#endif
+
+#define DP_TRACE_JOB_NUM_MAX 16
+
+#define DP_TRACE_RING_NUM 4
+
+typedef uint16_t job_bitmap_t;
+typedef void pcapng_file_t;
+
+struct dp_trace_buffer_telemetry
+{
+ job_bitmap_t jobs_id;
+ unsigned int snaplen;
+ char * buffer;
+ uint16_t buffer_len;
+ uint16_t buffer_used;
+};
+
+struct dp_trace_record_header
+{
+ uint8_t measurement_type;
+ char appsym[16];
+ char module[16];
+ struct timespec ts;
+ uint16_t recode_len;
+};
+
+struct dp_trace_job_desc
+{
+ bool enable;
+ uint8_t measurement_type;
+ int8_t rule_index;
+ char bpf_expr[MR_BPF_EXPRESSION_MAX];
+ unsigned int pkt_cnt_max;
+ unsigned int sampling;
+ unsigned int snaplen;
+};
+
+void * marsio_pkt_jump_to_innermost_layer(const marsio_buff_t * mbuf, enum complex_layer_type_id layer_id);
+
+int marsio_dp_trace_job_id_uesd_get(struct mr_instance * instance, job_bitmap_t * jobs_id);
+
+job_bitmap_t marsio_dp_trace_job_add(struct mr_instance * instance, const struct dp_trace_job_desc * desc);
+
+int marsio_dp_trace_job_del(struct mr_instance * instance, job_bitmap_t job_id);
+
+int marsio_dp_trace_mbuf_recv_burst(struct mr_instance * instance, queue_id_t qid, marsio_buff_t * mbufs[],
+ int nr_mbufs);
+
+void marsio_dp_trace_mbuf_free(struct mr_instance * instance, marsio_buff_t * mbufs[], int nr_mbufs);
+
+int marsio_dp_trace_buffer_info_get(const marsio_buff_t * mbuf, struct dp_trace_buffer_telemetry * info);
+
+int marsio_dp_trace_mbuf_refcnt_update(const marsio_buff_t * mbuf, int16_t value);
+
+/////////////////////// only for data path trace telemetry ////////////
+
/***********************************************************************************/
#ifdef __cplusplus
diff --git a/include/internal/mrb_define.h b/include/internal/mrb_define.h
index 78057cf..cad8c44 100644
--- a/include/internal/mrb_define.h
+++ b/include/internal/mrb_define.h
@@ -18,7 +18,7 @@ struct mrb_zone_idx
struct mrb_metadata
{
- RTE_MARKER cacheline0;
+ RTE_MARKER cacheline0;
/* status */
uint8_t dir : 1;
@@ -49,10 +49,9 @@ struct mrb_metadata
uint16_t cur_sid;
struct sid_list sid_list;
- int8_t dp_trace_can_emit; // -1(cant emit), 0(unknow), 1(can emit)
+ uint8_t measurement_type;
-
- RTE_MARKER cacheline1 __rte_cache_min_aligned;
+ RTE_MARKER cacheline1 __rte_cache_min_aligned;
/* pkt_parser result */
struct pkt_parser_result pkt_parser_result;
diff --git a/infra/include/common.h b/infra/include/common.h
index ae982ff..2a0c919 100644
--- a/infra/include/common.h
+++ b/infra/include/common.h
@@ -77,10 +77,6 @@ extern "C"
#define MR_MEMPOOL_COUNT_MAX 64
#endif
-#ifndef MR_BPF_EXPRESSION_MAX
-#define MR_BPF_EXPRESSION_MAX 128
-#endif
-
typedef uint64_t cpu_mask_t;
typedef uint32_t port_id_t;
typedef uint32_t queue_id_t;
diff --git a/infra/include/dp_trace.h b/infra/include/dp_trace.h
index 1049421..689023d 100644
--- a/infra/include/dp_trace.h
+++ b/infra/include/dp_trace.h
@@ -13,7 +13,6 @@
* todo: introduce data path trace design and usage
*/
-#define DP_TRACE_JOB_NUM_MAX 16
#define DP_TRACE_RING_SIZE_MAX 4096
#define DP_TRACE_POOL_NAME "dp_trace_pool"
@@ -25,15 +24,41 @@
#define DP_TRACE_ALL_JOBS UINT16_MAX
-typedef uint16_t job_bitmap_t;
+#define DP_TRACE_MP_MSG_NAME "data_path_trace"
-struct dp_trace_job_desc
+#define DP_TRACE_SUCCESS 0
+#define DP_TRACE_ERROR_NO_ENOUGH_JOB_ID 1
+#define DP_TRACE_ERROR_JOB_ID_IN_USED 2
+#define DP_TRACE_ERROR_ILLEGAL_JOB_ID 3
+#define DP_TRACE_ERROR_ILLEGAL_BPF_EXPR 4
+#define DP_TRACE_ERROR_MAX 5
+
+#define DP_TRACE_MEASUREMENT_TYPE_UNKNOW (0)
+#define DP_TRACE_MEASUREMENT_TYPE_UNMATCH (1 << 3)
+#define DP_TRACE_MEASUREMENT_TYPE_MATCHED (DP_TRACE_MEASUREMENT_TYPE_TRACE | DP_TRACE_MEASUREMENT_TYPE_TELEMETRY)
+
+enum dp_trace_req_action
{
- bool enable;
- char bpf_expr[MR_BPF_EXPRESSION_MAX];
- unsigned int pkt_cnt_max;
- unsigned int sampling;
- unsigned int snaplen;
+ DP_TRACE_INSTANCE_GET,
+ DP_TRACE_JOB_ID_USED_GET,
+ DP_TRACE_JOB_ADD,
+ DP_TRACE_JOB_DESTROY
+};
+
+struct dp_trace_req
+{
+ char appsym[MR_SYMBOL_MAX];
+ enum dp_trace_req_action action;
+ const struct dp_trace_job_desc * desc;
+ job_bitmap_t jobs_id_destroy;
+};
+
+struct dp_trace_resp
+{
+ int16_t errcode;
+ void * trace_instance;
+ job_bitmap_t jobs_id_used_get;
+ job_bitmap_t job_id_add;
};
struct dp_trace_job_ctx
@@ -46,13 +71,14 @@ struct dp_trace_job_ctx
struct dp_trace_stat
{
- uint64_t filter_exec_hit;
+ uint64_t filter_exec_hit[DP_TRACE_JOB_NUM_MAX];
uint64_t filter_exec_miss;
uint64_t reach_pkt_cnt_limit;
uint64_t record_buf_alloc_failed_no_mem;
uint64_t record_buf_alloc_success;
+ uint64_t record_buf_free;
uint64_t record_emit_failed_no_space_in_buf;
uint64_t record_emit_failed_trace_oversize;
@@ -64,43 +90,26 @@ struct dp_trace_stat
// uint64_t uncategorized_failed;
} __rte_cache_aligned;
-struct dp_trace_saving_stat
-{
- uint64_t save_to_file_failed_other;
- uint64_t save_to_file_failed_at_pcapng_format;
- uint64_t save_to_file_failed_at_write_to_disk;
- uint64_t save_to_file_success;
-} __rte_cache_aligned;
-
struct dp_trace_instance
{
- bool enable;
- struct dp_trace_job_ctx job_ctx[DP_TRACE_JOB_NUM_MAX];
- struct rte_ring * ring;
+ uint16_t enable;
+ uint16_t nr_ring;
+ struct rte_ring * ring[DP_TRACE_RING_NUM];
struct rte_mempool * pool;
struct rte_mempool * dump_pool;
-
- pthread_mutex_t trace_file_mutex;
- unsigned int trace_file_max_size;
- char trace_file_path[PATH_MAX];
- char trace_file_bak_path[PATH_MAX];
- rte_pcapng_t * pcapng;
- unsigned int trace_merge_timeout;
-};
+ uint16_t nr_job_ctx;
+ struct dp_trace_job_ctx job_ctx[DP_TRACE_JOB_NUM_MAX];
+} __rte_cache_aligned;
// Memory alignment is very important here; it can avoid cache misses; do not remove
struct dp_trace_process
{
struct dp_trace_instance * inst;
- struct rte_ring * ring;
-
- rte_atomic16_t save_thread_still_run;
- pthread_t save_trace_file_thread;
+ struct rte_ring * ring[DP_TRACE_RING_NUM];
+ uint16_t nr_ring;
RTE_MARKER cacheline1 __rte_cache_min_aligned;
- sem_t sem;
- struct dp_trace_saving_stat saving_statistics;
struct dp_trace_stat statistics[RTE_MAX_LCORE];
} __rte_cache_aligned;
@@ -122,31 +131,22 @@ struct dp_trace_buffer
/* record buffer*/
uint16_t buffer_len;
uint16_t buffer_used;
+ uint16_t buffer_refcnt;
char buffer[0]; // Arrays of Length Zero. Other members must be above buffer[0]
};
struct dp_trace_record_meta
{
+ uint8_t measurement_type;
const char * appsym;
const char * module;
const char * function;
};
-struct dp_trace_record_header
-{
- char appsym[16];
- char module[16];
- uint32_t custom_id_1;
- uint32_t custom_id_2;
-
- struct timespec ts;
- uint16_t recode_len;
-};
-
-static inline int dp_trace_record_can_emit(const struct rte_mbuf * mbuf)
+static inline int dp_trace_record_can_emit(const struct rte_mbuf * mbuf, uint8_t measurement_type)
{
struct mrb_metadata * mrb_meta = (struct mrb_metadata *)mrbuf_cz_data(mbuf, 1);
- if (unlikely(mrb_meta->dp_trace_can_emit == 1))
+ if (unlikely(mrb_meta->measurement_type & measurement_type))
{
return 1;
}
@@ -158,7 +158,7 @@ static inline int dp_trace_record_can_emit(const struct rte_mbuf * mbuf)
struct dp_trace_process * dp_trace_process_create(enum dp_trace_process_type process_tpye);
// use dp_trace_des initialize instance
-int dp_trace_job_add(struct dp_trace_process * trace, const struct dp_trace_job_desc * desc, unsigned int job_index);
+int dp_trace_job_add(struct dp_trace_process * trace, const struct dp_trace_job_desc * desc);
// Check whether the current packet matches the bpf expression
// Note: This function can only be called once for an rte_mbuf package.
@@ -169,7 +169,6 @@ job_bitmap_t dp_trace_job_id_bitmap_get(struct dp_trace_process * trace, struct
// When constructing the string parameters of dp_trace_record_emit_str is time-consuming, before calling
// dp_trace_record_emit_str, you can call this function to determine whether dp_trace_record_emit_str is valid.
-// int dp_trace_record_can_emit(const struct rte_mbuf * mbuf);
// Record information to different jobs buffer
int dp_trace_record_emit_str(struct dp_trace_process * trace, struct rte_mbuf * mbuf, unsigned int lcore_id,
@@ -180,20 +179,13 @@ int dp_trace_record_emit_fmt(struct dp_trace_process * trace, struct rte_mbuf *
void dp_trace_record_write(struct dp_trace_process * trace, struct rte_mbuf * mbuf, unsigned int lcore_id);
-// Save the contents of the buffer to a file
-int dp_trace_record_flush(struct dp_trace_process * trace);
-
// Clear instance. If there is a trace that is not saved, it will be saved automatically.
int dp_trace_jobs_destroy(struct dp_trace_process * trace, job_bitmap_t jobs);
// for unit test
int dp_trace_record_encode(struct rte_mbuf * mbuf, const struct dp_trace_record_meta * meta, const char * record);
-int dp_trace_record_decode(struct rte_mbuf * mbuf, char * dst, unsigned int size);
// only for infra
void infra_rte_pktmbuf_free(struct rte_mbuf * mbuf);
-void infra_rte_pktmbuf_free_bulk(struct rte_mbuf ** mbufs, unsigned int count);
-// for config
-bool dp_trace_start(struct dp_trace_process * trace);
-void dp_trace_stop(struct dp_trace_process * trace); \ No newline at end of file
+const char * dp_trace_strerror(unsigned int err); \ No newline at end of file
diff --git a/infra/include/ldbc.h b/infra/include/ldbc.h
index cc115e1..f039e9c 100644
--- a/infra/include/ldbc.h
+++ b/infra/include/ldbc.h
@@ -1,5 +1,5 @@
#pragma once
-
+#include "marsio.h"
#include <assert.h>
#include <rte_mbuf.h>
#include <stdint.h>
@@ -37,25 +37,6 @@ enum e_hash_mode
LDBC_HASH_MAX
};
-enum complex_layer_type_id
-{
- LAYER_TYPE_ID_ETHER,
- LAYER_TYPE_ID_PPP,
- LAYER_TYPE_ID_HDLC,
- LAYER_TYPE_ID_VLAN,
- LAYER_TYPE_ID_PPPOE,
- LAYER_TYPE_ID_MPLS,
- LAYER_TYPE_ID_IPV4,
- LAYER_TYPE_ID_IPV6,
- LAYER_TYPE_ID_UDP,
- LAYER_TYPE_ID_TCP,
- LAYER_TYPE_ID_ICMP,
- LAYER_TYPE_ID_ICMP6,
- LAYER_TYPE_ID_GRE,
- LAYER_TYPE_ID_G_VXLAN,
- LAYER_TYPE_ID_GTPV1_U,
-};
-
enum complex_layer_type_mask
{
/* 数据链路层 */
@@ -90,8 +71,8 @@ enum complex_layer_type_mask
LAYER_TYPE_GTPV1_U = 1 << 14,
/* ALL */
- LAYER_TYPE_ALL =(LAYER_TYPE_L2 | LAYER_TYPE_L2_TUN | LAYER_TYPE_L3 |
- LAYER_TYPE_L4 | LAYER_TYPE_G_VXLAN | LAYER_TYPE_GTPV1_U),
+ LAYER_TYPE_ALL =
+ (LAYER_TYPE_L2 | LAYER_TYPE_L2_TUN | LAYER_TYPE_L3 | LAYER_TYPE_L4 | LAYER_TYPE_G_VXLAN | LAYER_TYPE_GTPV1_U),
};
#define MR_PKT_PARSER_LAYERS_MAX 16
@@ -144,8 +125,7 @@ struct distributer
};
static inline void pkt_parser_init(struct pkt_parser * pkt_parser, struct pkt_parser_result * result,
- enum complex_layer_type_mask expect_layer_type,
- unsigned int nr_expect_results)
+ enum complex_layer_type_mask expect_layer_type, unsigned int nr_expect_results)
{
/* read only parameters */
pkt_parser->expect_layer_mask = expect_layer_type;
@@ -164,14 +144,14 @@ struct distributer * distributer_create(enum e_dist_mode distmode, enum e_hash_m
int distributer_rss_key_setup(struct distributer * dist_object, const uint8_t * key);
static inline int distributer_calculate_from_parser_results(struct distributer * dist_object, struct rte_mbuf * mbufs[],
- struct pkt_parser_result * parser_results[],
- unsigned int nr_mbufs)
+ struct pkt_parser_result * parser_results[],
+ unsigned int nr_mbufs)
{
return dist_object->fn_distributer(dist_object, mbufs, parser_results, nr_mbufs);
}
static inline int distributer_calculate(struct distributer * dist_object, struct rte_mbuf * mbufs[],
- unsigned int nr_mbufs)
+ unsigned int nr_mbufs)
{
struct pkt_parser parser_handlers[nr_mbufs];
struct pkt_parser_result parser_results[nr_mbufs];
@@ -244,8 +224,8 @@ static inline void * complex_layer_jump_to_innermost(struct pkt_parser_result *
return NULL;
}
-static inline int complex_layer_type_expect(struct pkt_parser_result * pkt_parser_result, const uint16_t * expect_layers,
- unsigned int nr_expect_layers)
+static inline int complex_layer_type_expect(struct pkt_parser_result * pkt_parser_result,
+ const uint16_t * expect_layers, unsigned int nr_expect_layers)
{
unsigned int iter_pkt_result;
unsigned int iter_expect_layers;
diff --git a/infra/src/dp_trace.c b/infra/src/dp_trace.c
index 94b2b8b..ea0a748 100644
--- a/infra/src/dp_trace.c
+++ b/infra/src/dp_trace.c
@@ -1,7 +1,6 @@
#include "dp_trace.h"
#include "common.h"
#include "mrb_define.h"
-#include "pcapng_proto.h"
#include <libgen.h>
#include <linux/if_ether.h>
#include <rte_common.h>
@@ -12,35 +11,8 @@
#include <sys/utsname.h>
#include <unistd.h>
-static void * dp_trace_save_thread(void * arg);
-static struct dp_trace_instance * dp_trace_instance_create();
-static int trace_file_mutex_lock(struct dp_trace_process * trace);
-static int dp_trace_file_mutex_unlock(struct dp_trace_process * trace);
static inline bool dp_trace_is_disable(struct dp_trace_process * trace);
-/* copy from dpdk-23.11.After upgrading dpdk, you can delete the following content */
-static struct rte_mbuf * dpdk_23_rte_pcapng_copy(uint16_t port_id, uint32_t queue, const struct rte_mbuf * md,
- struct rte_mempool * mp, uint32_t length,
- enum rte_pcapng_direction direction, const char * comment);
-static int dpdk_23_pcapng_vlan_insert(struct rte_mbuf * m, uint16_t ether_type, uint16_t tci);
-static char * os_info_get(void);
-static void dp_trace_file_merge(struct dp_trace_process * trace);
-
-#if 0
-static bool is_directory_exists(const char * path)
-{
- struct stat info;
- if (stat(path, &info) != 0)
- return false;
- return S_ISDIR(info.st_mode) == 1;
-}
-#endif
-
-static bool is_file_exists(const char * path)
-{
- return access(path, F_OK) == 0;
-}
-
static struct dp_trace_instance * dp_trace_instance_create()
{
static_assert(DP_TRACE_JOB_NUM_MAX <= 16, "DP_TRACE_JOB_NUM_MAX must be no greater than 16");
@@ -59,21 +31,21 @@ static struct dp_trace_instance * dp_trace_instance_create()
MR_VERIFY_MALLOC(instance->pool);
MR_VERIFY_MALLOC(instance->dump_pool);
- pthread_mutexattr_t attr;
- pthread_mutexattr_init(&attr);
- pthread_mutexattr_setrobust(&attr, PTHREAD_MUTEX_ROBUST);
- pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
- pthread_mutex_init(&instance->trace_file_mutex, &attr);
-
for (unsigned int i = 0; i < DP_TRACE_JOB_NUM_MAX; i++)
{
instance->job_ctx[i].job_id = 1 << (i);
}
- instance->ring = rte_ring_create(DP_TRACE_RING_NAME, DP_TRACE_RING_SIZE_MAX, rte_socket_id(), RING_F_SC_DEQ);
- MR_VERIFY_MALLOC(instance->ring);
+ instance->nr_ring = DP_TRACE_RING_NUM;
+ for (unsigned int i = 0; i < instance->nr_ring; i++)
+ {
+ char ring_name[64];
+ snprintf(ring_name, sizeof(ring_name), "%s_%u", DP_TRACE_RING_NAME, i);
+ instance->ring[i] = rte_ring_create(ring_name, DP_TRACE_RING_SIZE_MAX, rte_socket_id(), 0);
+ MR_VERIFY_MALLOC(instance->ring[i]);
+ }
- instance->enable = false;
+ instance->enable = 0;
return instance;
}
@@ -83,21 +55,6 @@ struct dp_trace_process * dp_trace_process_create_in_serv(void)
struct dp_trace_process * trace = ZMALLOC(sizeof(struct dp_trace_process));
MR_VERIFY_MALLOC(trace);
trace->inst = dp_trace_instance_create();
-
- if (sem_init(&trace->sem, 0, 0) < 0)
- {
- MR_ERROR("sem init fail.");
- return NULL;
- }
-
- rte_atomic16_set(&trace->save_thread_still_run, 1);
-
- if (pthread_create(&trace->save_trace_file_thread, NULL, dp_trace_save_thread, trace) != 0)
- {
- MR_ERROR("Failed to create thread to save trace: %s", strerror(errno));
- return NULL;
- }
-
return trace;
}
@@ -106,24 +63,28 @@ struct dp_trace_process * dp_trace_process_create_in_app(void)
struct dp_trace_process * trace = ZMALLOC(sizeof(struct dp_trace_process));
MR_VERIFY_MALLOC(trace);
+ struct rte_mp_msg req_msg = {};
+ snprintf(req_msg.name, sizeof(req_msg.name), "%s", DP_TRACE_MP_MSG_NAME);
+ struct dp_trace_req * dp_trace_req = (struct dp_trace_req *)req_msg.param;
+ req_msg.len_param = sizeof(struct dp_trace_req);
+
+ dp_trace_req->action = DP_TRACE_INSTANCE_GET;
+
struct rte_mp_reply mp_reply;
const struct timespec wait_timespec = {
.tv_nsec = 0,
.tv_sec = 30,
};
- struct rte_mp_msg req = {};
- snprintf(req.name, sizeof(req.name), "%s", "data_path_trace");
- int ret = rte_mp_request_sync(&req, &mp_reply, &wait_timespec);
+ int ret = rte_mp_request_sync(&req_msg, &mp_reply, &wait_timespec);
if (ret < 0)
{
MR_WARNING("Failed to execute rte_mp_request_sync:%s", rte_strerror(rte_errno));
goto err;
}
- uintptr_t stored_ptr_address;
- memcpy(&stored_ptr_address, mp_reply.msgs->param, sizeof(uintptr_t));
- trace->inst = (struct dp_trace_instance *)stored_ptr_address;
+ struct dp_trace_resp * dp_trace_resp = (struct dp_trace_resp *)mp_reply.msgs->param;
+ trace->inst = dp_trace_resp->trace_instance;
free(mp_reply.msgs);
return trace;
@@ -134,10 +95,10 @@ err:
return NULL;
}
-struct dp_trace_process * dp_trace_process_create(enum dp_trace_process_type process_tpye)
+struct dp_trace_process * dp_trace_process_create(enum dp_trace_process_type process_type)
{
struct dp_trace_process * trace;
- if (process_tpye == DP_TRACE_PROCESS_MARSIO)
+ if (process_type == DP_TRACE_PROCESS_MARSIO)
{
trace = dp_trace_process_create_in_serv();
}
@@ -153,7 +114,11 @@ struct dp_trace_process * dp_trace_process_create(enum dp_trace_process_type pro
}
// Currently, all traces use the same ring
- trace->ring = trace->inst->ring;
+ trace->nr_ring = trace->inst->nr_ring;
+ for (unsigned int i = 0; i < trace->nr_ring; i++)
+ {
+ trace->ring[i] = trace->inst->ring[i];
+ }
return trace;
@@ -183,7 +148,7 @@ void dp_trace_job_clean(struct dp_trace_job_ctx * ctx)
}
}
-int dp_trace_job_add(struct dp_trace_process * trace, const struct dp_trace_job_desc * desc, unsigned int job_index)
+int dp_trace_job_add(struct dp_trace_process * trace, const struct dp_trace_job_desc * desc)
{
// Find a job that is not being used.pkt_cnt
// Recycle the jobs added that have been terminated
@@ -192,15 +157,24 @@ int dp_trace_job_add(struct dp_trace_process * trace, const struct dp_trace_job_
struct dp_trace_job_ctx * ctx = NULL;
pcap_t * pcap_handle = NULL;
struct bpf_program fp = {};
+ int16_t err_code = DP_TRACE_SUCCESS;
- if (instance->job_ctx[job_index].used == false)
+ unsigned int rule_index = desc->rule_index;
+ if (rule_index >= DP_TRACE_JOB_NUM_MAX)
{
- ctx = &instance->job_ctx[job_index];
+ err_code = DP_TRACE_ERROR_ILLEGAL_JOB_ID;
+ goto err;
+ }
+
+ if (instance->job_ctx[rule_index].used == false)
+ {
+ ctx = &instance->job_ctx[rule_index];
}
if (ctx == NULL)
{
MR_ERROR("Not enough job id");
+ err_code = DP_TRACE_ERROR_JOB_ID_IN_USED;
goto err;
}
@@ -208,6 +182,7 @@ int dp_trace_job_add(struct dp_trace_process * trace, const struct dp_trace_job_
if (pcap_compile(pcap_handle, &fp, desc->bpf_expr, 0, PCAP_NETMASK_UNKNOWN) < 0)
{
MR_ERROR("pcap_compile execution failed: %s", pcap_geterr(pcap_handle));
+ err_code = DP_TRACE_ERROR_ILLEGAL_BPF_EXPR;
goto err;
}
copy_bpf_prog(ctx, fp);
@@ -215,21 +190,24 @@ int dp_trace_job_add(struct dp_trace_process * trace, const struct dp_trace_job_
pcap_close(pcap_handle);
ctx->desc.enable = desc->enable;
+ ctx->desc.measurement_type = desc->measurement_type;
snprintf(ctx->desc.bpf_expr, sizeof(ctx->desc.bpf_expr), "%s", desc->bpf_expr);
ctx->desc.pkt_cnt_max = desc->pkt_cnt_max;
ctx->desc.sampling = desc->sampling;
ctx->desc.snaplen = desc->snaplen;
ctx->used = true;
+ instance->nr_job_ctx++;
+ MR_INFO("[add job:%u] bpf_expr: %s type: %u", rule_index, ctx->desc.bpf_expr, ctx->desc.measurement_type);
- return 0;
+ return err_code;
err:
pcap_freecode(&fp);
if (pcap_handle != NULL)
pcap_close(pcap_handle);
dp_trace_job_clean(ctx);
- return -1;
+ return err_code;
}
uint16_t dp_trace_filter_exec_jobs_get(struct dp_trace_process * trace, struct rte_mbuf * mbuf, unsigned int offset,
@@ -237,6 +215,7 @@ uint16_t dp_trace_filter_exec_jobs_get(struct dp_trace_process * trace, struct r
{
assert(lcore_id < RTE_MAX_LCORE);
+ struct mrb_metadata * mrb_meta = rte_mbuf_to_priv(mbuf);
struct dp_trace_stat * statistics = &trace->statistics[lcore_id];
struct dp_trace_instance * instance = trace->inst;
@@ -266,12 +245,12 @@ uint16_t dp_trace_filter_exec_jobs_get(struct dp_trace_process * trace, struct r
// packet.
// unlimit: ctx->desc.pkt_cnt_max == 0
target_packet = true;
- if (ctx->desc.pkt_cnt_max != 0 && statistics->filter_exec_hit >= ctx->desc.pkt_cnt_max)
+ if (ctx->desc.pkt_cnt_max != 0 && statistics->filter_exec_hit[i] >= ctx->desc.pkt_cnt_max)
{
statistics->reach_pkt_cnt_limit++;
continue;
}
- if (statistics->filter_exec_hit % ctx->desc.sampling == 0)
+ if (statistics->filter_exec_hit[i] % ctx->desc.sampling == 0)
{
// match every sampling packet
match_jobs = match_jobs | ctx->job_id;
@@ -280,26 +259,28 @@ uint16_t dp_trace_filter_exec_jobs_get(struct dp_trace_process * trace, struct r
{
*snaplen = ctx->desc.snaplen;
}
+ mrb_meta->measurement_type |= ctx->desc.measurement_type;
}
+ statistics->filter_exec_hit[i]++;
}
}
- if (unlikely(target_packet))
- {
- statistics->filter_exec_hit++;
- }
- else
+ if (unlikely(target_packet == false))
{
statistics->filter_exec_miss++;
}
+ if (likely(match_jobs == 0))
+ {
+ mrb_meta->measurement_type = DP_TRACE_MEASUREMENT_TYPE_UNMATCH;
+ }
return match_jobs;
}
job_bitmap_t dp_trace_job_id_bitmap_get(struct dp_trace_process * trace, struct rte_mbuf * mbuf)
{
RTE_SET_USED(trace);
- struct mrb_metadata * mrb_meta = (struct mrb_metadata *)mrbuf_cz_data(mbuf, 1);
+ struct mrb_metadata * mrb_meta = rte_mbuf_to_priv(mbuf);
struct dp_trace_buffer * dp_trace_buffer = mrb_meta->dp_trace_buffer;
if (dp_trace_buffer == NULL)
@@ -316,21 +297,28 @@ void dp_trace_filter_exec(struct dp_trace_process * trace, struct rte_mbuf * mbu
assert(lcore_id < RTE_MAX_LCORE);
struct dp_trace_stat * statistics = &trace->statistics[lcore_id];
- struct mrb_metadata * mrb_meta = (struct mrb_metadata *)mrbuf_cz_data(mbuf, 1);
- if (mrb_meta->dp_trace_can_emit != 0)
+ struct mrb_metadata * mrb_meta = rte_mbuf_to_priv(mbuf);
+ if (mrb_meta->measurement_type != DP_TRACE_MEASUREMENT_TYPE_UNKNOW)
+ {
+ return;
+ }
+
+ // Optimization: When there is no job, return directly
+ if (likely(trace->inst->nr_job_ctx == 0))
{
+ mrb_meta->measurement_type = DP_TRACE_MEASUREMENT_TYPE_UNMATCH;
return;
}
- if (likely(dp_trace_is_disable(trace)))
+ if (unlikely(dp_trace_is_disable(trace)))
{
- mrb_meta->dp_trace_can_emit = -1;
+ mrb_meta->measurement_type = DP_TRACE_MEASUREMENT_TYPE_UNMATCH;
return;
}
struct pkt_parser_result * pkt_parser_result = &mrb_meta->pkt_parser_result;
job_bitmap_t match_jobs = 0;
- mrb_meta->dp_trace_can_emit = -1;
+ mrb_meta->measurement_type = DP_TRACE_MEASUREMENT_TYPE_UNKNOW;
if (unlikely(pkt_parser_result->nr_layers == 0))
{
@@ -361,12 +349,12 @@ void dp_trace_filter_exec(struct dp_trace_process * trace, struct rte_mbuf * mbu
struct dp_trace_buffer * dp_trace_buffer = mrb_meta->dp_trace_buffer;
unsigned header_size = sizeof(struct dp_trace_buffer);
memset(dp_trace_buffer, 0, header_size);
+ dp_trace_buffer->buffer_refcnt = 1;
dp_trace_buffer->buffer_len = DP_TRACE_RECORD_SIZE - header_size;
dp_trace_buffer->inst = trace->inst;
dp_trace_buffer->jobs = match_jobs;
dp_trace_buffer->snaplen = snaplen;
statistics->record_buf_alloc_success++;
- mrb_meta->dp_trace_can_emit = 1;
}
}
}
@@ -374,13 +362,9 @@ void dp_trace_filter_exec(struct dp_trace_process * trace, struct rte_mbuf * mbu
int dp_trace_record_emit_str(struct dp_trace_process * trace, struct rte_mbuf * mbuf, unsigned int lcore_id,
const struct dp_trace_record_meta * meta, const char * str)
{
- struct mrb_metadata * mrb_meta = (struct mrb_metadata *)mrbuf_cz_data(mbuf, 1);
- if (mrb_meta->dp_trace_can_emit == 0)
- {
- dp_trace_filter_exec(trace, mbuf, 0, lcore_id);
- }
+ struct mrb_metadata * mrb_meta = rte_mbuf_to_priv(mbuf);
- if (mrb_meta->dp_trace_can_emit == -1)
+ if ((mrb_meta->measurement_type & DP_TRACE_MEASUREMENT_TYPE_MATCHED) == 0)
{
return 0;
}
@@ -425,14 +409,14 @@ int dp_trace_record_emit_fmt(struct dp_trace_process * trace, struct rte_mbuf *
void dp_trace_record_write(struct dp_trace_process * trace, struct rte_mbuf * mbuf, unsigned int lcore_id)
{
- // infra_dp_trace_record_write(mbuf);
- struct mrb_metadata * mrb_meta = (struct mrb_metadata *)mrbuf_cz_data(mbuf, 1);
+ struct mrb_metadata * mrb_meta = rte_mbuf_to_priv(mbuf);
struct dp_trace_buffer * dp_trace_buffer = mrb_meta->dp_trace_buffer;
if (dp_trace_buffer != NULL)
{
rte_mbuf_refcnt_update(mbuf, 1);
- int ret = rte_ring_enqueue(trace->ring, (void *)mbuf);
+ uint16_t ring_id = mbuf->hash.usr % trace->nr_ring;
+ int ret = rte_ring_enqueue(trace->ring[ring_id], (void *)mbuf);
assert(lcore_id < RTE_MAX_LCORE);
struct dp_trace_stat * statistics = &trace->statistics[lcore_id];
@@ -452,384 +436,6 @@ void dp_trace_record_write(struct dp_trace_process * trace, struct rte_mbuf * mb
}
}
-#if 0
-static void trace_filename_generate(struct dp_trace_process * trace)
-{
- // get trace file name
- time_t current_time;
- time(&current_time);
- char time_str[20];
- snprintf(time_str, sizeof(time_str), "%ld", current_time);
- char trace_file_name[256];
- snprintf(trace_file_name, sizeof(trace_file_name), "%s", time_str);
-
- // get trace path
- unsigned int trace_dir_len = strlen(trace->inst->trace_dir);
- if (trace->inst->trace_dir[trace_dir_len - 1] == '/')
- {
- trace->inst->trace_dir[trace_dir_len - 1] = '\0';
- trace_dir_len = strlen(trace->inst->trace_dir);
- }
-
- snprintf(trace->trace_file_path, sizeof(trace->trace_file_path), "%s/dp_trace_%s.pcapng", trace->inst->trace_dir,
- trace_file_name);
-}
-#endif
-
-static bool dp_trace_file_open(struct dp_trace_process * trace)
-{
- trace_file_mutex_lock(trace);
- if (likely(trace->inst->pcapng == NULL))
- {
- char * trace_file_path = strdup(trace->inst->trace_file_path);
- char * trace_file_dir = dirname(trace_file_path);
- int ret = mkdir(trace_file_dir, 0755);
- free(trace_file_path);
- if (ret != 0 && errno != EEXIST)
- {
- MR_ERROR("Failed to create directory:%s. errno is %d.", trace_file_dir, errno);
- goto end;
- }
-
- unlink(trace->inst->trace_file_path);
-
- int dumpfile_fd = open(trace->inst->trace_file_path, O_WRONLY | O_CREAT, 0640);
- char * os_info = os_info_get();
- trace->inst->pcapng = rte_pcapng_fdopen(dumpfile_fd, os_info, NULL, NULL, NULL);
- free(os_info);
- if (trace->inst->pcapng == NULL)
- {
- MR_ERROR("Failed to create data path trace file.");
- goto end;
- }
- }
- dp_trace_file_mutex_unlock(trace);
- return true;
-
-end:
- dp_trace_file_mutex_unlock(trace);
- return false;
-}
-
-static void dp_trace_file_close(struct dp_trace_process * trace)
-{
- trace_file_mutex_lock(trace);
- if (likely(trace->inst->pcapng != NULL))
- {
- rte_pcapng_close(trace->inst->pcapng);
- trace->inst->pcapng = NULL;
- }
- dp_trace_file_mutex_unlock(trace);
-}
-
-bool dp_trace_start(struct dp_trace_process * trace)
-{
- bool ret = false;
- trace_file_mutex_lock(trace);
- if (dp_trace_file_open(trace))
- {
- for (unsigned int i = 0; i < RTE_MAX_LCORE; i++)
- {
- struct dp_trace_stat * statistics = &trace->statistics[i];
- statistics->filter_exec_hit = 0;
- statistics->filter_exec_miss = 0;
- statistics->reach_pkt_cnt_limit = 0;
- statistics->record_buf_alloc_failed_no_mem = 0;
- statistics->record_buf_alloc_success = 0;
- statistics->record_emit_failed_trace_oversize = 0;
- statistics->record_emit_failed_no_space_in_buf = 0;
- statistics->record_emit_success = 0;
- statistics->ring_enqueue_failed = 0;
- statistics->ring_enqueue_success = 0;
- }
-
- struct dp_trace_saving_stat * saving_statistics = &trace->saving_statistics;
- saving_statistics->save_to_file_failed_at_pcapng_format = 0;
- saving_statistics->save_to_file_failed_at_write_to_disk = 0;
- saving_statistics->save_to_file_failed_other = 0;
- saving_statistics->save_to_file_success = 0;
-
- if (remove(trace->inst->trace_file_bak_path) < 0)
- {
- MR_ERROR("remove %s failed. error info: %s", trace->inst->trace_file_bak_path, strerror(errno));
- }
-
- trace->inst->enable = true;
- ret = true;
- }
- dp_trace_file_mutex_unlock(trace);
- return ret;
-}
-
-static inline bool dp_trace_is_disable(struct dp_trace_process * trace)
-{
- return trace->inst->enable == false;
-}
-
-void dp_trace_stop(struct dp_trace_process * trace)
-{
- trace->inst->enable = false;
-
- trace_file_mutex_lock(trace);
- dp_trace_file_close(trace);
- dp_trace_file_merge(trace);
- dp_trace_file_mutex_unlock(trace);
-}
-
-#if 0
-void new_filename_generate(const char * old_filename, char * new_filename, unsigned int buf_len, const char * suffix)
-{
- assert(strlen(old_filename) <= PATH_MAX);
-
- char * dot = strrchr(old_filename, '.');
- if (dot == NULL)
- {
- snprintf(new_filename, buf_len, "%s_%s", old_filename, suffix);
- }
- else
- {
- unsigned int name_len = dot - old_filename;
- assert(buf_len > name_len);
-
- strncpy(new_filename, old_filename, name_len);
- snprintf(new_filename + name_len, buf_len - name_len, "_%s%s", suffix, dot);
- }
-}
-#endif
-
-static void dp_trace_file_merge(struct dp_trace_process * trace)
-{
- if (!is_file_exists(trace->inst->trace_file_bak_path))
- {
- // Only one file, no need to merge
- return;
- }
-
- trace_file_mutex_lock(trace);
-
- char tmp_name[PATH_MAX];
- snprintf(tmp_name, sizeof(tmp_name), "%s.2", trace->inst->trace_file_path);
-
- dp_trace_file_close(trace);
-
- // If newpath already exists, it will be atomically replaced
- if (rename(trace->inst->trace_file_path, tmp_name) < 0)
- {
- MR_ERROR("rename %s to %s failed. error info: %s", trace->inst->trace_file_path, tmp_name, strerror(errno));
- }
-
- char command[2 * PATH_MAX];
- snprintf(command, sizeof(command), "timeout -v %us mergecap -w %s %s %s 2>&1", trace->inst->trace_merge_timeout,
- trace->inst->trace_file_path, tmp_name, trace->inst->trace_file_bak_path);
- MR_INFO("merge trace file: %s", command);
-
- FILE * fp;
- char buffer[1024];
- fp = popen(command, "r");
- if (fp == NULL)
- {
- MR_ERROR("open pipe failed: %s", strerror(errno));
- goto err;
- }
-
- while (fgets(buffer, sizeof(buffer), fp) != NULL)
- {
- MR_ERROR("merge trace file output: %s", buffer);
- }
-
- pclose(fp);
-
- if (remove(tmp_name) < 0)
- {
- MR_ERROR("remove %s failed. error info: %s", tmp_name, strerror(errno));
- }
-
- if (remove(trace->inst->trace_file_bak_path) < 0)
- {
- MR_ERROR("remove %s failed. error info: %s", trace->inst->trace_file_bak_path, strerror(errno));
- }
-
- dp_trace_file_mutex_unlock(trace);
- return;
-
-err:
- if (rename(tmp_name, trace->inst->trace_file_path) < 0)
- {
- MR_ERROR("rename %s to %s failed. error info: %s", tmp_name, trace->inst->trace_file_path, strerror(errno));
- }
-
- dp_trace_file_mutex_unlock(trace);
- return;
-}
-
-static void dp_trace_file_rollbak(struct dp_trace_process * trace)
-{
- trace_file_mutex_lock(trace);
-
- const char * cur_filename = trace->inst->trace_file_path;
- char * bak_filename = trace->inst->trace_file_bak_path;
- unsigned int bak_filename_len = sizeof(trace->inst->trace_file_bak_path);
-
- dp_trace_file_close(trace);
-
- snprintf(bak_filename, bak_filename_len, "%s.1", cur_filename);
-
- if (rename(cur_filename, bak_filename) < 0)
- {
- MR_ERROR("rename %s to %s failed. error info: %s", cur_filename, bak_filename, strerror(errno));
- }
-
- dp_trace_file_open(trace);
-
- dp_trace_file_mutex_unlock(trace);
-}
-
-static bool dp_trace_file_reach_max_size(struct dp_trace_process * trace)
-{
- // max_size == 0 : unlimit write
- trace_file_mutex_lock(trace);
-
- unsigned int max_size = trace->inst->trace_file_max_size / 2;
-
- if (max_size == 0)
- {
- dp_trace_file_mutex_unlock(trace);
- return false;
- }
-
- struct stat file_stat;
- if (unlikely(stat(trace->inst->trace_file_path, &file_stat) == -1))
- {
- MR_ERROR("Failed to obtain data path trace file status.");
- dp_trace_file_mutex_unlock(trace);
- return true;
- }
-
- // unit is B -> KB
- if ((file_stat.st_size >> 10) >= max_size)
- {
- dp_trace_file_mutex_unlock(trace);
- return true;
- }
-
- dp_trace_file_mutex_unlock(trace);
- return false;
-}
-
-static void * dp_trace_save_thread(void * arg)
-{
- struct dp_trace_process * trace = (struct dp_trace_process *)arg;
- mr_thread_setname(pthread_self(), "dp_trace_save");
- MR_DEBUG("start data path trace save thread");
- char * decode_trace_buf = ZMALLOC(DP_TRACE_RECORD_SIZE);
- MR_VERIFY_MALLOC(decode_trace_buf);
-
- while (rte_atomic16_read(&trace->save_thread_still_run) == 1)
- {
- sem_wait(&trace->sem);
-
- struct dp_trace_saving_stat * saving_statistics = &trace->saving_statistics;
-
- if (rte_atomic16_read(&trace->save_thread_still_run) == 0)
- {
- break;
- }
-
- struct rte_mbuf * records[DP_TRACE_RING_SIZE_MAX];
- struct rte_mbuf * pcapng_records[DP_TRACE_RING_SIZE_MAX];
- int n = 0;
-
- if (dp_trace_is_disable(trace) &&
- (n = rte_ring_dequeue_burst(trace->ring, (void **)records, DP_TRACE_RING_SIZE_MAX, NULL)) != 0)
- {
- // Consume the semaphore; avoid reacquiring the lock;
- // Discard the trace data that entered the ring after stop
- for (unsigned int i = 0; i < n; i++)
- {
- struct rte_mbuf * mbuf = records[i];
- struct mrb_metadata * mrb_meta = (struct mrb_metadata *)mrbuf_cz_data(mbuf, 1);
- struct dp_trace_buffer * dp_trace_buffer = mrb_meta->dp_trace_buffer;
-
- rte_mempool_put(trace->inst->pool, (void *)dp_trace_buffer);
- saving_statistics->save_to_file_failed_other++;
- }
- rte_pktmbuf_free_bulk(records, n);
- continue;
- }
-
- trace_file_mutex_lock(trace);
- while ((n = rte_ring_dequeue_burst(trace->ring, (void **)records, DP_TRACE_RING_SIZE_MAX, NULL)) != 0)
- {
- int copy_cnt = 0;
-
- for (unsigned int i = 0; i < n; i++)
- {
- int ret = 0;
- struct rte_mbuf * mbuf = records[i];
- struct mrb_metadata * mrb_meta = (struct mrb_metadata *)mrbuf_cz_data(mbuf, 1);
- struct dp_trace_buffer * dp_trace_buffer = mrb_meta->dp_trace_buffer;
-
- ret = dp_trace_record_decode(mbuf, decode_trace_buf, DP_TRACE_RECORD_SIZE);
- if (unlikely(ret < 0))
- {
- saving_statistics->save_to_file_failed_other++;
- goto release_buffer;
- }
-
- struct rte_mbuf * pkt =
- dpdk_23_rte_pcapng_copy(0, 0, mbuf, trace->inst->dump_pool, dp_trace_buffer->snaplen,
- RTE_PCAPNG_DIRECTION_UNKNOWN, decode_trace_buf);
- if (pkt == NULL)
- {
- saving_statistics->save_to_file_failed_at_pcapng_format++;
- }
- else
- {
- pcapng_records[copy_cnt++] = pkt;
- }
-
- release_buffer:
- rte_mempool_put(trace->inst->pool, (void *)dp_trace_buffer);
- }
- rte_pktmbuf_free_bulk(records, n);
-
- if (trace->inst->pcapng != NULL &&
- rte_pcapng_write_packets(trace->inst->pcapng, pcapng_records, copy_cnt) >= 0)
- {
- saving_statistics->save_to_file_success += copy_cnt;
- }
- else
- {
- saving_statistics->save_to_file_failed_at_write_to_disk += copy_cnt;
- }
- // doc say: The mbuf's in pkts are always freed
- // But in fact rte_pcapng_write_packets does not release mbuf
- rte_pktmbuf_free_bulk(pcapng_records, copy_cnt);
-
- // When the file is full, stop tagging data packets, close the file
- // OR in the configuration file, stop trace
- if (dp_trace_file_reach_max_size(trace))
- {
- dp_trace_file_rollbak(trace);
- }
- }
- dp_trace_file_mutex_unlock(trace);
- }
- FREE(decode_trace_buf);
- pthread_exit(NULL);
-}
-
-int dp_trace_record_flush(struct dp_trace_process * trace)
-{
- if (rte_ring_empty(trace->ring) == 1)
- {
- return 0;
- }
-
- sem_post(&trace->sem);
- return 0;
-}
-
int dp_trace_jobs_destroy(struct dp_trace_process * trace, job_bitmap_t jobs)
{
struct dp_trace_job_ctx * ctx;
@@ -839,6 +445,8 @@ int dp_trace_jobs_destroy(struct dp_trace_process * trace, job_bitmap_t jobs)
if (ctx->used && ((ctx->job_id & jobs) != 0))
{
dp_trace_job_clean(ctx);
+ trace->inst->nr_job_ctx--;
+ MR_INFO("[destroy job:%u] bpf_expr: %s", i, ctx->desc.bpf_expr);
}
}
@@ -847,7 +455,7 @@ int dp_trace_jobs_destroy(struct dp_trace_process * trace, job_bitmap_t jobs)
int dp_trace_record_encode(struct rte_mbuf * mbuf, const struct dp_trace_record_meta * meta, const char * str)
{
- struct mrb_metadata * mrb_meta = (struct mrb_metadata *)mrbuf_cz_data(mbuf, 1);
+ struct mrb_metadata * mrb_meta = rte_mbuf_to_priv(mbuf);
struct dp_trace_buffer * dp_trace_buffer = mrb_meta->dp_trace_buffer;
unsigned int left = dp_trace_buffer->buffer_len - dp_trace_buffer->buffer_used;
@@ -863,6 +471,7 @@ int dp_trace_record_encode(struct rte_mbuf * mbuf, const struct dp_trace_record_
struct dp_trace_record_header * record_header =
(struct dp_trace_record_header *)(dp_trace_buffer->buffer + dp_trace_buffer->buffer_used);
+ record_header->measurement_type = meta->measurement_type;
snprintf(record_header->appsym, sizeof(record_header->appsym), "%s", meta->appsym);
snprintf(record_header->module, sizeof(record_header->module), "%s", meta->module);
@@ -879,286 +488,19 @@ int dp_trace_record_encode(struct rte_mbuf * mbuf, const struct dp_trace_record_
return 0;
}
-int dp_trace_record_decode(struct rte_mbuf * mbuf, char * dst, unsigned int size)
-{
- struct mrb_metadata * mrb_meta = (struct mrb_metadata *)mrbuf_cz_data(mbuf, 1);
- struct dp_trace_buffer * dp_trace_buffer = mrb_meta->dp_trace_buffer;
- unsigned int position = 0;
-
-#ifndef NDEBUG
- unsigned int comment_cnt = 0;
-#endif
-
- while (position < dp_trace_buffer->buffer_used)
- {
- char * cur = dp_trace_buffer->buffer + position;
-
- const struct dp_trace_record_header * record_header = (struct dp_trace_record_header *)(cur);
- const char * str = cur + sizeof(struct dp_trace_record_header);
- const unsigned int str_len = record_header->recode_len;
-
- int n = snprintf(dst, size, "[%s:%s:] %ld.%ld ", record_header->appsym, record_header->module,
- record_header->ts.tv_sec, record_header->ts.tv_nsec);
- if (unlikely(n < 0 || n >= size))
- return -1;
- size -= n;
- dst += n;
-
- if (unlikely(size - 2 < str_len))
- return -1;
- memcpy(dst, str, str_len);
- size -= str_len;
- dst += str_len;
-
- *dst = '\n';
- size--;
- dst++;
-
- position += sizeof(struct dp_trace_record_header) + str_len;
-#ifndef NDEBUG
- comment_cnt++;
-#endif
- }
-
- if (size < 1)
- return -1;
-
-#ifndef NDEBUG
- uint16_t avali = dp_trace_buffer->buffer_len - dp_trace_buffer->buffer_used;
- snprintf(dst, size, "used: %u, avali: %u, comment: %u", dp_trace_buffer->buffer_used, avali, comment_cnt);
-#else
- *dst = '\0';
-#endif
-
- return 0;
-}
-
-int trace_file_mutex_lock(struct dp_trace_process * trace)
-{
- int ret = pthread_mutex_lock(&trace->inst->trace_file_mutex);
- if (ret == EOWNERDEAD)
- {
- ret = pthread_mutex_consistent(&trace->inst->trace_file_mutex);
- ret = pthread_mutex_unlock(&trace->inst->trace_file_mutex);
- if (ret != 0)
- {
- MR_ERROR("EOWNERDEAD -> job ctx unlock failed");
- return -1;
- }
- }
- else if (ret != 0)
- {
- MR_ERROR("job ctx lock failed");
- return -1;
- }
- return 0;
-}
-
-int dp_trace_file_mutex_unlock(struct dp_trace_process * trace)
-{
- return pthread_mutex_unlock(&trace->inst->trace_file_mutex);
-}
-
-/* copy from pdump.c */
-/* length of option including padding */
-static uint16_t pcapng_optlen(uint16_t len)
-{
- return RTE_ALIGN(sizeof(struct pcapng_option) + len, sizeof(uint32_t));
-}
-
-static struct pcapng_option * pcapng_add_option(struct pcapng_option * popt, uint16_t code, const void * data,
- uint16_t len)
-{
- popt->code = code;
- popt->length = len;
- memcpy(popt->data, data, len);
-
- return (struct pcapng_option *)((uint8_t *)popt + pcapng_optlen(len));
-}
-
-static char * os_info_get(void)
-{
- struct utsname uts;
- char * os_name = NULL;
-
- if (uname(&uts) < 0)
- return NULL;
-
- if (asprintf(&os_name, "%s %s", uts.sysname, uts.release) == -1)
- return NULL;
-
- return os_name;
-}
-
-/* Make a copy of original mbuf with pcapng header and options */
-static struct rte_mbuf * dpdk_23_rte_pcapng_copy(uint16_t port_id, uint32_t queue, const struct rte_mbuf * md,
- struct rte_mempool * mp, uint32_t length,
- enum rte_pcapng_direction direction, const char * comment)
-{
- struct pcapng_enhance_packet_block * epb;
- uint32_t orig_len, data_len, padding, flags;
- struct pcapng_option * opt;
- uint64_t timestamp;
- uint16_t optlen;
- struct rte_mbuf * mc;
- bool rss_hash;
-
-#ifdef RTE_LIBRTE_ETHDEV_DEBUG
- RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, NULL);
-#endif
- orig_len = rte_pktmbuf_pkt_len(md);
-
- /* Take snapshot of the data */
- mc = rte_pktmbuf_copy(md, mp, 0, length);
- if (unlikely(mc == NULL))
- return NULL;
-
- /* Expand any offloaded VLAN information */
- if ((direction == RTE_PCAPNG_DIRECTION_IN && (md->ol_flags & RTE_MBUF_F_RX_VLAN_STRIPPED)) ||
- (direction == RTE_PCAPNG_DIRECTION_OUT && (md->ol_flags & RTE_MBUF_F_TX_VLAN)))
- {
- if (dpdk_23_pcapng_vlan_insert(mc, RTE_ETHER_TYPE_VLAN, md->vlan_tci) != 0)
- goto fail;
- }
-
- if ((direction == RTE_PCAPNG_DIRECTION_IN && (md->ol_flags & RTE_MBUF_F_RX_QINQ_STRIPPED)) ||
- (direction == RTE_PCAPNG_DIRECTION_OUT && (md->ol_flags & RTE_MBUF_F_TX_QINQ)))
- {
- if (dpdk_23_pcapng_vlan_insert(mc, RTE_ETHER_TYPE_QINQ, md->vlan_tci_outer) != 0)
- goto fail;
- }
-
- /* record HASH on incoming packets */
- rss_hash = (direction == RTE_PCAPNG_DIRECTION_IN && (md->ol_flags & RTE_MBUF_F_RX_RSS_HASH));
-
- /* pad the packet to 32 bit boundary */
- data_len = rte_pktmbuf_data_len(mc);
- padding = RTE_ALIGN(data_len, sizeof(uint32_t)) - data_len;
- if (padding > 0)
- {
- void * tail = rte_pktmbuf_append(mc, padding);
-
- if (tail == NULL)
- goto fail;
- memset(tail, 0, padding);
- }
-
- optlen = pcapng_optlen(sizeof(flags));
- optlen += pcapng_optlen(sizeof(queue));
- if (rss_hash)
- optlen += pcapng_optlen(sizeof(uint8_t) + sizeof(uint32_t));
-
- if (comment)
- optlen += pcapng_optlen(strlen(comment));
-
- /* reserve trailing options and block length */
- opt = (struct pcapng_option *)rte_pktmbuf_append(mc, optlen + sizeof(uint32_t));
- if (unlikely(opt == NULL))
- goto fail;
-
- switch (direction)
- {
- case RTE_PCAPNG_DIRECTION_IN:
- flags = PCAPNG_IFB_INBOUND;
- break;
- case RTE_PCAPNG_DIRECTION_OUT:
- flags = PCAPNG_IFB_OUTBOUND;
- break;
- default:
- flags = 0;
- }
-
- opt = pcapng_add_option(opt, PCAPNG_EPB_FLAGS, &flags, sizeof(flags));
-
- opt = pcapng_add_option(opt, PCAPNG_EPB_QUEUE, &queue, sizeof(queue));
-
- if (rss_hash)
- {
- uint8_t hash_opt[5];
-
- /* The algorithm could be something else if
- * used rte_flow_action_rss; but the current API does not
- * have a way for ethdev to report this on a per-packet basis.
- */
- hash_opt[0] = PCAPNG_HASH_TOEPLITZ;
-
- memcpy(&hash_opt[1], &md->hash.rss, sizeof(uint32_t));
- opt = pcapng_add_option(opt, PCAPNG_EPB_HASH, &hash_opt, sizeof(hash_opt));
- }
-
- if (comment)
- opt = pcapng_add_option(opt, PCAPNG_OPT_COMMENT, comment, strlen(comment));
-
- /* Note: END_OPT necessary here. Wireshark doesn't do it. */
-
- /* Add PCAPNG packet header */
- epb = (struct pcapng_enhance_packet_block *)rte_pktmbuf_prepend(mc, sizeof(*epb));
- if (unlikely(epb == NULL))
- goto fail;
-
- epb->block_type = PCAPNG_ENHANCED_PACKET_BLOCK;
- epb->block_length = rte_pktmbuf_data_len(mc);
-
- /* Interface index is filled in later during write */
- mc->port = port_id;
-
- /* Put timestamp in cycles here - adjust in packet write */
- // timestamp = rte_get_tsc_cycles();
-
- // Modification: use system time
- struct timespec current_time;
- clock_gettime(CLOCK_REALTIME, &current_time);
- timestamp = (uint64_t)current_time.tv_sec * 1000000000 + current_time.tv_nsec;
-
- epb->timestamp_hi = timestamp >> 32;
- epb->timestamp_lo = (uint32_t)timestamp;
- epb->capture_length = data_len;
- epb->original_length = orig_len;
-
- /* set trailer of block length */
- *(uint32_t *)opt = epb->block_length;
-
- return mc;
-
-fail:
- rte_pktmbuf_free(mc);
- return NULL;
-}
-
-static int dpdk_23_pcapng_vlan_insert(struct rte_mbuf * m, uint16_t ether_type, uint16_t tci)
+static inline bool dp_trace_is_disable(struct dp_trace_process * trace)
{
- struct rte_ether_hdr *nh, *oh;
- struct rte_vlan_hdr * vh;
-
- if (!RTE_MBUF_DIRECT(m) || rte_mbuf_refcnt_read(m) > 1)
- return -EINVAL;
-
- if (rte_pktmbuf_data_len(m) < sizeof(*oh))
- return -EINVAL;
-
- oh = rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
- nh = (struct rte_ether_hdr *)rte_pktmbuf_prepend(m, sizeof(struct rte_vlan_hdr));
- if (nh == NULL)
- return -ENOSPC;
-
- memmove(nh, oh, 2 * RTE_ETHER_ADDR_LEN);
- nh->ether_type = rte_cpu_to_be_16(ether_type);
-
- vh = (struct rte_vlan_hdr *)(nh + 1);
- vh->vlan_tci = rte_cpu_to_be_16(tci);
-
- return 0;
+ return trace->inst->enable == 0;
}
static void dp_trace_buffer_free(struct rte_mbuf * mbuf)
{
if (unlikely(mbuf == NULL))
return;
- struct mrb_metadata * mrb_meta = (struct mrb_metadata *)mrbuf_cz_data(mbuf, 1);
+ struct mrb_metadata * mrb_meta = rte_mbuf_to_priv(mbuf);
struct dp_trace_buffer * dp_trace_buffer = mrb_meta->dp_trace_buffer;
if (dp_trace_buffer != NULL)
{
- // rte_atomic64_inc(&dp_trace_buffer->inst->statistics.uncategorized_failed);
rte_mempool_put(dp_trace_buffer->inst->pool, (void *)dp_trace_buffer);
}
}
@@ -1169,6 +511,7 @@ void infra_rte_pktmbuf_free(struct rte_mbuf * mbuf)
rte_pktmbuf_free(mbuf);
}
+#if 0
void infra_rte_pktmbuf_free_bulk(struct rte_mbuf ** mbufs, unsigned int count)
{
for (unsigned int idx = 0; idx < count; idx++)
@@ -1176,4 +519,20 @@ void infra_rte_pktmbuf_free_bulk(struct rte_mbuf ** mbufs, unsigned int count)
dp_trace_buffer_free(mbufs[idx]);
}
rte_pktmbuf_free_bulk(mbufs, count);
+}
+#endif
+
+const char * dp_trace_strerror(unsigned int err)
+{
+ const static char * errlist[] = {"data path trace: operate successfully",
+ "data path trace: no enough job id",
+ "data path trace: job id is being used",
+ "data path trace: job id is illegal",
+ "data path trace: encountered an illegal expression",
+ "data path trace: unkonw error"};
+ if (err < DP_TRACE_ERROR_MAX)
+ {
+ return errlist[err];
+ }
+ return errlist[DP_TRACE_ERROR_MAX];
} \ No newline at end of file
diff --git a/infra/test/TestDataPathTrace.cc b/infra/test/TestDataPathTrace.cc
index 8b34a22..6306e42 100644
--- a/infra/test/TestDataPathTrace.cc
+++ b/infra/test/TestDataPathTrace.cc
@@ -85,8 +85,7 @@ class DataPathTraceTest : public testing::Test
{
// create trace
trace = dp_trace_process_create(DP_TRACE_PROCESS_MARSIO);
- snprintf(trace->inst->trace_file_path, sizeof(trace->inst->trace_file_path), "./dp_trace.pcapng");
- dp_trace_start(trace);
+ trace->inst->enable = true;
}
void TearDown() override
@@ -99,51 +98,27 @@ class DataPathTraceTest : public testing::Test
rte_eal_cleanup();
}
- void wait_save_trace_thread_close(struct dp_trace_process * trace)
- {
- rte_atomic16_set(&trace->save_thread_still_run, 0);
- sem_post(&trace->sem);
- pthread_join(trace->save_trace_file_thread, NULL);
- }
-
void dp_trace_process_destroy(struct dp_trace_process * trace, enum dp_trace_process_type process_tpye)
{
// This function is for unit testing only. This instance created by marsio or app should not be deleted
// manually. It accompanies the entire life cycle of the process.
- dp_trace_stop(trace);
- wait_save_trace_thread_close(trace);
if (process_tpye == DP_TRACE_PROCESS_MARSIO)
{
- sem_destroy(&trace->sem);
rte_mempool_free(trace->inst->pool);
rte_mempool_free(trace->inst->dump_pool);
- rte_ring_free(trace->inst->ring);
+ for (unsigned int i = 0; i < trace->inst->nr_ring; i++)
+ {
+ rte_ring_free(trace->inst->ring[i]);
+ }
FREE(trace->inst);
FREE(trace);
}
else
{
- sem_destroy(&trace->sem);
FREE(trace);
}
}
- bool file_contains_string(const std::string & filename, const std::string & searchString)
- {
- std::ifstream file(filename.c_str());
- if (!file.is_open())
- return false;
-
- std::string line;
- while (std::getline(file, line))
- {
- if (line.find(searchString) != std::string::npos)
- return true;
- }
-
- return false;
- }
-
struct rte_mbuf * mbuf_construct(const unsigned char * pkt, unsigned int len)
{
if (trace == NULL)
@@ -151,7 +126,6 @@ class DataPathTraceTest : public testing::Test
struct rte_mbuf * pkt_mbuf = rte_pktmbuf_alloc(trace->inst->dump_pool);
unsigned int sz_align_priv = RTE_ALIGN(sizeof(struct mrb_metadata), RTE_MBUF_PRIV_ALIGN);
- // memset(((unsigned char *)pkt_mbuf + sizeof(struct rte_mbuf)), 0, sz_align_priv);
uint16_t data_off = sizeof(struct rte_mbuf) + sz_align_priv;
unsigned char * data_addr = (unsigned char *)pkt_mbuf->buf_addr + data_off;
@@ -177,7 +151,6 @@ class DataPathTraceTest : public testing::Test
}
struct dp_trace_process * trace;
- pthread_t save_trace_file_thread;
};
TEST_F(DataPathTraceTest, PackageConstruct)
@@ -214,24 +187,22 @@ TEST_F(DataPathTraceTest, InstanceCreate)
TEST_F(DataPathTraceTest, JobInit)
{
int n;
- struct dp_trace_job_desc desc = {true, "ether host 64:f6:9d:5f:b9:76", 10, 1};
- n = dp_trace_job_add(trace, &desc, 0);
+ struct dp_trace_job_desc desc = {true, DP_TRACE_MEASUREMENT_TYPE_TRACE, 0, "ether host 64:f6:9d:5f:b9:76", 10, 1};
+ n = dp_trace_job_add(trace, &desc);
EXPECT_GE(n, 0);
EXPECT_EQ(trace->inst->job_ctx[0].used, true);
}
TEST_F(DataPathTraceTest, JobDestroy)
{
- struct dp_trace_job_desc desc = {true, "ether host 64:f6:9d:5f:b9:76", 10, 1};
- dp_trace_job_add(trace, &desc, 0);
+ struct dp_trace_job_desc desc = {true, DP_TRACE_MEASUREMENT_TYPE_TRACE, 0, "ether host 64:f6:9d:5f:b9:76", 10, 1};
+ dp_trace_job_add(trace, &desc);
- struct dp_trace_job_desc desc_2 = {true, "vlan && ip src 125.33.49.137", 10, 1};
- dp_trace_job_add(trace, &desc_2, 1);
+ struct dp_trace_job_desc desc_2 = {true, DP_TRACE_MEASUREMENT_TYPE_TRACE, 1, "vlan && ip src 125.33.49.137", 10, 1};
+ dp_trace_job_add(trace, &desc_2);
dp_trace_jobs_destroy(trace, 1 | 1 << 1);
- wait_save_trace_thread_close(trace);
-
EXPECT_EQ(trace->inst->job_ctx[0].used, false);
EXPECT_EQ(trace->inst->job_ctx[1].used, false);
}
@@ -242,13 +213,12 @@ TEST_F(DataPathTraceTest, BPFMatch)
struct mrb_metadata * mrb_meta = (struct mrb_metadata *)mrbuf_cz_data(pkt73_mbuf, 1);
unsigned int offset = mrb_meta->pkt_parser_result.layers[4].offset;
- struct dp_trace_job_desc desc = {true, "ether host 64:f6:9d:5f:b9:76", 10, 1};
- dp_trace_job_add(trace, &desc, 0);
+ struct dp_trace_job_desc desc = {true, DP_TRACE_MEASUREMENT_TYPE_TRACE, 0, "ether host 64:f6:9d:5f:b9:76", 10, 1};
+ dp_trace_job_add(trace, &desc);
- struct dp_trace_job_desc desc_2 = {true, "vlan && ip src 125.33.49.137", 10, 1};
- dp_trace_job_add(trace, &desc_2, 1);
+ struct dp_trace_job_desc desc_2 = {true, DP_TRACE_MEASUREMENT_TYPE_TRACE, 1, "vlan && ip src 125.33.49.137", 10, 1};
+ dp_trace_job_add(trace, &desc_2);
- ;
dp_trace_filter_exec(trace, pkt73_mbuf, offset, 0);
u_int16_t ret = dp_trace_job_id_bitmap_get(trace, pkt73_mbuf);
EXPECT_EQ(ret, 3);
@@ -264,8 +234,8 @@ TEST_F(DataPathTraceTest, BPFMatchNet)
{
struct rte_mbuf * pkt1_buf = mbuf_construct(pkt1, sizeof(pkt1));
- struct dp_trace_job_desc desc = {true, "net 0.0.0.0/0", 10, 1};
- dp_trace_job_add(trace, &desc, 0);
+ struct dp_trace_job_desc desc = {true, DP_TRACE_MEASUREMENT_TYPE_TRACE, 0, "net 0.0.0.0/0", 10, 1};
+ dp_trace_job_add(trace, &desc);
dp_trace_filter_exec(trace, pkt1_buf, 0, 0);
u_int16_t ret = dp_trace_job_id_bitmap_get(trace, pkt1_buf);
@@ -285,11 +255,11 @@ TEST_F(DataPathTraceTest, BPFMatchExternalAndInternal)
struct rte_mbuf * pkt73_mbuf = mbuf_construct(pkt73, sizeof(pkt73));
struct mrb_metadata * mrb_meta = (struct mrb_metadata *)mrbuf_cz_data(pkt73_mbuf, 1);
- struct dp_trace_job_desc desc = {true, "ip src 1.1.15.100", 10, 1};
- dp_trace_job_add(trace, &desc, 0);
+ struct dp_trace_job_desc desc = {true, DP_TRACE_MEASUREMENT_TYPE_TRACE, 0, "ip src 1.1.15.100", 10, 1};
+ dp_trace_job_add(trace, &desc);
- struct dp_trace_job_desc desc_2 = {true, "vlan && ip src 125.33.49.137", 10, 1};
- dp_trace_job_add(trace, &desc_2, 1);
+ struct dp_trace_job_desc desc_2 = {true, DP_TRACE_MEASUREMENT_TYPE_TRACE, 1, "vlan && ip src 125.33.49.137", 10, 1};
+ dp_trace_job_add(trace, &desc_2);
dp_trace_filter_exec(trace, pkt73_mbuf, 0, 0);
u_int16_t ret = dp_trace_job_id_bitmap_get(trace, pkt73_mbuf);
@@ -308,8 +278,8 @@ TEST_F(DataPathTraceTest, BPFUnmatch)
struct mrb_metadata * mrb_meta = (struct mrb_metadata *)mrbuf_cz_data(pkt73_mbuf, 1);
unsigned int offset = mrb_meta->pkt_parser_result.layers[4].offset;
- struct dp_trace_job_desc desc = {true, "vlan && ip src 127.0.0.1", 10, 1};
- dp_trace_job_add(trace, &desc, 0);
+ struct dp_trace_job_desc desc = {true, DP_TRACE_MEASUREMENT_TYPE_TRACE, 0, "vlan && ip src 127.0.0.1", 10, 1};
+ dp_trace_job_add(trace, &desc);
dp_trace_filter_exec(trace, pkt73_mbuf, offset, 0);
u_int16_t ret = dp_trace_job_id_bitmap_get(trace, pkt73_mbuf);
@@ -325,31 +295,37 @@ TEST_F(DataPathTraceTest, EmitTrace)
struct rte_mbuf * pkt73_mbuf = mbuf_construct(pkt73, sizeof(pkt73));
struct mrb_metadata * mrb_meta = (struct mrb_metadata *)mrbuf_cz_data(pkt73_mbuf, 1);
- struct dp_trace_job_desc desc = {true, "ether host 64:f6:9d:5f:b9:76", 10, 1};
- dp_trace_job_add(trace, &desc, 1);
+ struct dp_trace_job_desc desc = {true, DP_TRACE_MEASUREMENT_TYPE_TRACE, 1, "ether host 64:f6:9d:5f:b9:76", 10, 1};
+ dp_trace_job_add(trace, &desc);
mrb_meta->packet_create_from_nf = 1;
memset(&mrb_meta->pkt_parser_result, 0, sizeof(mrb_meta->pkt_parser_result));
- struct dp_trace_record_meta meta = {"test", "emit", NULL};
+ dp_trace_filter_exec(trace, pkt73_mbuf, 0, rte_lcore_id());
+
+ struct dp_trace_record_meta meta = {DP_TRACE_MEASUREMENT_TYPE_TRACE, "test", "emit", NULL};
dp_trace_record_emit_str(trace, pkt73_mbuf, rte_lcore_id(), &meta, "abc");
- struct dp_trace_record_meta meta_2 = {"test", "emit", NULL};
+ struct dp_trace_record_meta meta_2 = {DP_TRACE_MEASUREMENT_TYPE_TRACE, "test", "emit", NULL};
dp_trace_record_emit_str(trace, pkt73_mbuf, rte_lcore_id(), &meta_2, "def");
- char decode_record[256];
- dp_trace_record_decode(pkt73_mbuf, decode_record, 256);
- std::string str(decode_record);
- std::cout << "decode emit:" << str << std::endl;
+ struct dp_trace_buffer * dp_trace_buffer = (struct dp_trace_buffer *)mrb_meta->dp_trace_buffer;
+
+ char * cur = dp_trace_buffer->buffer;
+ struct dp_trace_record_header * record_header = (struct dp_trace_record_header *)(cur);
+ char * str = cur + sizeof(struct dp_trace_record_header);
+ unsigned int str_len = record_header->recode_len;
+ EXPECT_TRUE(std::string(str, str_len).find("abc") != std::string::npos);
- EXPECT_TRUE(str.find("abc") != std::string::npos);
- EXPECT_TRUE(str.find("def") != std::string::npos);
+ cur += sizeof(struct dp_trace_record_header) + str_len;
+ record_header = (struct dp_trace_record_header *)(cur);
+ str = cur + sizeof(struct dp_trace_record_header);
+ EXPECT_TRUE(std::string(str, str_len).find("def") != std::string::npos);
// Excessive content will not cause buffer overflow
std::string big_buf(DP_TRACE_RECORD_SIZE, 'a');
dp_trace_record_emit_str(trace, pkt73_mbuf, rte_lcore_id(), &meta_2, big_buf.c_str());
- struct dp_trace_buffer * dp_trace_buffer = (struct dp_trace_buffer *)mrb_meta->dp_trace_buffer;
EXPECT_TRUE(dp_trace_buffer->buffer_used <= DP_TRACE_RECORD_SIZE);
infra_rte_pktmbuf_free(pkt73_mbuf);
@@ -361,42 +337,22 @@ TEST_F(DataPathTraceTest, StripToRing)
struct mrb_metadata * mrb_meta = (struct mrb_metadata *)mrbuf_cz_data(pkt73_mbuf, 1);
unsigned int offset = mrb_meta->pkt_parser_result.layers[4].offset;
- struct dp_trace_job_desc desc = {true, "ether host 64:f6:9d:5f:b9:76", 10, 1};
- dp_trace_job_add(trace, &desc, 0);
+ struct dp_trace_job_desc desc = {true, DP_TRACE_MEASUREMENT_TYPE_TRACE, 0, "ether host 64:f6:9d:5f:b9:76", 10, 1};
+ dp_trace_job_add(trace, &desc);
dp_trace_filter_exec(trace, pkt73_mbuf, offset, rte_lcore_id());
- struct dp_trace_record_meta meta = {"test", "emit", NULL};
+ struct dp_trace_record_meta meta = {DP_TRACE_MEASUREMENT_TYPE_TRACE, "test", "emit", NULL};
dp_trace_record_emit_str(trace, pkt73_mbuf, rte_lcore_id(), &meta, "abc");
dp_trace_record_write(trace, pkt73_mbuf, rte_lcore_id());
- EXPECT_TRUE(rte_ring_count(trace->ring) == 1);
-
- infra_rte_pktmbuf_free(pkt73_mbuf);
-}
-
-TEST_F(DataPathTraceTest, SaveTraceToFile)
-{
- struct rte_mbuf * pkt73_mbuf = mbuf_construct(pkt73, sizeof(pkt73));
- struct mrb_metadata * mrb_meta = (struct mrb_metadata *)mrbuf_cz_data(pkt73_mbuf, 1);
- unsigned int offset = mrb_meta->pkt_parser_result.layers[4].offset;
-
- struct dp_trace_job_desc desc = {true, "ether host 64:f6:9d:5f:b9:76", 10, 1};
- dp_trace_job_add(trace, &desc, 0);
-
- dp_trace_filter_exec(trace, pkt73_mbuf, offset, rte_lcore_id());
-
- struct dp_trace_record_meta meta = {"test", "emit", NULL};
- dp_trace_record_emit_str(trace, pkt73_mbuf, rte_lcore_id(), &meta, "abc");
-
- dp_trace_record_write(trace, pkt73_mbuf, rte_lcore_id());
-
- dp_trace_record_flush(trace);
-
- sleep(1);
-
- EXPECT_TRUE(file_contains_string(trace->inst->trace_file_path, "test:emit"));
+ unsigned int trace_record_cnt = 0;
+ for (unsigned int i = 0; i < trace->nr_ring; i++)
+ {
+ trace_record_cnt += rte_ring_count(trace->ring[i]);
+ }
+ EXPECT_TRUE(trace_record_cnt == 1);
infra_rte_pktmbuf_free(pkt73_mbuf);
}
@@ -405,8 +361,8 @@ TEST_F(DataPathTraceTest, MaxRecordCount)
{
unsigned int lcore_id = rte_lcore_id();
- struct dp_trace_job_desc desc = {true, "ether host 64:f6:9d:5f:b9:76", 2, 1};
- dp_trace_job_add(trace, &desc, 0);
+ struct dp_trace_job_desc desc = {true, DP_TRACE_MEASUREMENT_TYPE_TRACE, 0, "ether host 64:f6:9d:5f:b9:76", 2, 1};
+ dp_trace_job_add(trace, &desc);
struct rte_mbuf * pkt73_mbuf = mbuf_construct(pkt73, sizeof(pkt73));
dp_trace_filter_exec(trace, pkt73_mbuf, 0, 0);
diff --git a/service/include/sc_app.h b/service/include/sc_app.h
index c03fa6b..b5cb9de 100644
--- a/service/include/sc_app.h
+++ b/service/include/sc_app.h
@@ -20,6 +20,7 @@ struct app
char mntfile[MR_STRING_MAX];
/* 监控app是否存活*/
int fd;
+ uint8_t job_id_used;
/* 虚设备管理模块在APP中的保存的上下文 */
void * pme_vdev_main;
};
diff --git a/service/include/sc_trace.h b/service/include/sc_trace.h
index 084eae2..c3d3f2d 100644
--- a/service/include/sc_trace.h
+++ b/service/include/sc_trace.h
@@ -6,6 +6,7 @@
#include "dp_trace.h"
#include "sc_app.h"
#include "sc_common.h"
+#include <adapter_define.h>
#include <mrb_define.h>
#define MR_TRACE_APPSYM "mrzcpd"
@@ -18,8 +19,6 @@ void hook_rte_pktmbuf_free_bulk(struct rte_mbuf ** mbufs, unsigned int count);
/* Generate and store the trace information for pkt parser */
static inline void gen_store_trace_info_pkt_parser(struct rte_node * node, struct rte_mbuf * mbuf)
{
- struct dp_trace_record_meta meta = {.appsym=MR_TRACE_APPSYM, .module=node->name};
-
struct mrb_metadata * mrb_meta = mrbuf_cz_data(mbuf, MR_NODE_CTRLZONE_ID);
struct pkt_parser_result * pkt_parser_result = &mrb_meta->pkt_parser_result;
@@ -39,14 +38,14 @@ static inline void gen_store_trace_info_pkt_parser(struct rte_node * node, struc
len += snprintf(str_record + len, sizeof(str_record) - len, "]");
/* Emit the trace record */
+ struct dp_trace_record_meta meta = {
+ .measurement_type = DP_TRACE_MEASUREMENT_TYPE_TRACE, .appsym = MR_TRACE_APPSYM, .module = node->name};
dp_trace_record_emit_str(sc_main_get()->trace, mbuf, rte_lcore_id(), &meta, str_record);
}
/* Generate and store the trace information for rte mbuf */
static inline void gen_store_trace_info_rte_mbuf(struct rte_node * node, struct rte_mbuf * mbuf)
{
- struct dp_trace_record_meta meta = {.appsym=MR_TRACE_APPSYM, .module=node->name};
-
/* Populate the rte mbuf information */
char str_record[MR_STRING_MAX];
@@ -59,14 +58,14 @@ static inline void gen_store_trace_info_rte_mbuf(struct rte_node * node, struct
#endif
/* Emit the trace record */
+ struct dp_trace_record_meta meta = {
+ .measurement_type = DP_TRACE_MEASUREMENT_TYPE_TRACE, .appsym = MR_TRACE_APPSYM, .module = node->name};
dp_trace_record_emit_str(sc_main_get()->trace, mbuf, rte_lcore_id(), &meta, str_record);
}
/* Generate and store the trace information for Sid list */
static inline void gen_store_trace_info_sid_list(struct rte_node * node, struct rte_mbuf * mbuf)
{
- struct dp_trace_record_meta meta = {.appsym=MR_TRACE_APPSYM, .module=node->name};
-
/* Populate the Sid list */
struct mrb_metadata * mrb_meta = mrbuf_cz_data(mbuf, MR_NODE_CTRLZONE_ID);
uint16_t sids[RTE_DIM(mrb_meta->sid_list.sids)];
@@ -85,6 +84,8 @@ static inline void gen_store_trace_info_sid_list(struct rte_node * node, struct
len += snprintf(str_record + len, sizeof(str_record) - len, "]]");
/* Emit the trace record */
+ struct dp_trace_record_meta meta = {
+ .measurement_type = DP_TRACE_MEASUREMENT_TYPE_TRACE, .appsym = MR_TRACE_APPSYM, .module = node->name};
dp_trace_record_emit_str(sc_main_get()->trace, mbuf, rte_lcore_id(), &meta, str_record);
}
@@ -109,25 +110,14 @@ static inline int embed_sid_info(struct rte_mbuf * mbuf, char * str_record, int
return len;
}
-
/* Generate and store the trace information for rx */
static inline void gen_store_trace_info_rx(struct rte_node * node, struct rte_mbuf * mbuf,
struct mr_dev_desc * dev_desc, uint16_t queue_id)
{
- struct dp_trace_record_meta meta = {.appsym=MR_TRACE_APPSYM, .module=node->name};
-
/* Populate the next node information */
char str_record[MR_STRING_MAX];
- int len = snprintf(str_record, sizeof(str_record), "next node:%s", node->nodes[0]->name);
-
- /* Populate the core id information */
- len += snprintf(str_record + len, sizeof(str_record) - len, ", core:%u", rte_lcore_id());
-
- /* Populate the queue information */
- len += snprintf(str_record + len, sizeof(str_record) - len, ", qid:%u", queue_id);
-
- /* Populate the hash information */
- len += snprintf(str_record + len, sizeof(str_record) - len, ", hash:%u", mbuf->hash.usr);
+ int len = snprintf(str_record, sizeof(str_record), "next node:%s, core:%u, qid:%u, hash:%u", node->nodes[0]->name,
+ rte_lcore_id(), queue_id, mbuf->hash.usr);
/* Populate the nf create infomation */
struct mrb_metadata * mrb_meta = mrbuf_cz_data(mbuf, MR_NODE_CTRLZONE_ID);
@@ -137,6 +127,22 @@ static inline void gen_store_trace_info_rx(struct rte_node * node, struct rte_mb
}
/* Emit the trace record */
+ struct dp_trace_record_meta meta = {
+ .measurement_type = DP_TRACE_MEASUREMENT_TYPE_TRACE, .appsym = MR_TRACE_APPSYM, .module = node->name};
+ dp_trace_record_emit_str(sc_main_get()->trace, mbuf, rte_lcore_id(), &meta, str_record);
+}
+
+/* Generate and store the telemetry information for rx */
+static inline void gen_store_telemetry_info_rx(struct rte_node * node, struct rte_mbuf * mbuf,
+ struct mr_dev_desc * dev_desc, uint16_t queue_id)
+{
+ /* Populate the next node information */
+ char str_record[MR_STRING_MAX];
+ snprintf(str_record, sizeof(str_record), "interface(rx) = %s, qid = %u", dev_desc->symbol, queue_id);
+
+ /* Emit the trace record */
+ struct dp_trace_record_meta meta = {.measurement_type = DP_TRACE_MEASUREMENT_TYPE_TELEMETRY,
+ .appsym = MR_TRACE_APPSYM};
dp_trace_record_emit_str(sc_main_get()->trace, mbuf, rte_lcore_id(), &meta, str_record);
}
@@ -144,21 +150,52 @@ static inline void gen_store_trace_info_rx(struct rte_node * node, struct rte_mb
static inline void gen_store_trace_info_tx(struct rte_node * node, struct rte_mbuf * mbuf,
struct mr_dev_desc * dev_desc, uint16_t queue_id)
{
- struct dp_trace_record_meta meta = {.appsym=MR_TRACE_APPSYM, .module=node->name};
-
/* Populate the port information */
char str_record[MR_STRING_MAX];
- int len = snprintf(str_record, sizeof(str_record), "tx:%u,%s", dev_desc->port_id, dev_desc->symbol);
+ snprintf(str_record, sizeof(str_record), "tx:%u,%s, core:%u, qid:%u, hash:%u", dev_desc->port_id, dev_desc->symbol,
+ rte_lcore_id(), queue_id, mbuf->hash.usr);
+
+ /* Emit the trace record */
+ struct dp_trace_record_meta meta = {
+ .measurement_type = DP_TRACE_MEASUREMENT_TYPE_TRACE, .appsym = MR_TRACE_APPSYM, .module = node->name};
+ dp_trace_record_emit_str(sc_main_get()->trace, mbuf, rte_lcore_id(), &meta, str_record);
+}
- /* Populate the core id information */
- len += snprintf(str_record + len, sizeof(str_record) - len, ", core:%u", rte_lcore_id());
+/* Generate and store the telemetry information for tx */
+static inline void gen_store_telemetry_info_tx(struct rte_node * node, struct rte_mbuf * mbuf,
+ struct mr_dev_desc * dev_desc, uint16_t queue_id)
+{
+ /* Populate the port information */
+ char str_record[MR_STRING_MAX];
+ snprintf(str_record, sizeof(str_record), "interface(tx) = %s, qid = %u", dev_desc->symbol, queue_id);
- /* Populate the queue information */
- len += snprintf(str_record + len, sizeof(str_record) - len, ", qid:%u", queue_id);
+ /* Emit the trace record */
+ struct dp_trace_record_meta meta = {.measurement_type = DP_TRACE_MEASUREMENT_TYPE_TELEMETRY,
+ .appsym = MR_TRACE_APPSYM};
+ dp_trace_record_emit_str(sc_main_get()->trace, mbuf, rte_lcore_id(), &meta, str_record);
+}
- /* Populate the hash information */
- len += snprintf(str_record + len, sizeof(str_record) - len, ", hash:%u", mbuf->hash.usr);
+/* Generate and store the telemetry information */
+static __rte_always_inline void gen_store_telemetry_info_adapter(struct rte_mbuf * mbuf)
+{
+ char str_record[MR_STRING_MAX];
+ struct mrb_metadata * mrb_meta = mrbuf_cz_data(mbuf, MR_NODE_CTRLZONE_ID);
+ switch (mrb_meta->adapter_type)
+ {
+ case ADAPTER_TYPE_VWIRE:
+ snprintf(str_record, sizeof(str_record), "vwire_id = %u, traffic link id = %u", mrb_meta->adapter_id,
+ mrb_meta->traffic_link_id);
+ break;
+ case ADAPTER_TYPE_EF:
+ snprintf(str_record, sizeof(str_record), "ef_id = %u, traffic link id = %u", mrb_meta->adapter_id,
+ mrb_meta->traffic_link_id);
+ break;
+ default:
+ break;
+ }
/* Emit the trace record */
+ struct dp_trace_record_meta meta = {.measurement_type = DP_TRACE_MEASUREMENT_TYPE_TELEMETRY,
+ .appsym = MR_TRACE_APPSYM};
dp_trace_record_emit_str(sc_main_get()->trace, mbuf, rte_lcore_id(), &meta, str_record);
}
diff --git a/service/src/app.c b/service/src/app.c
index 764549a..e21d1a3 100644
--- a/service/src/app.c
+++ b/service/src/app.c
@@ -186,6 +186,8 @@ int __instance_alive_handler(const struct rte_mp_msg * msg, const void * peer)
epoll_add_event(app_main->epoll_fd, app_object->fd, EPOLLIN);
__send_app_register_response(msg, peer, (const char *)(reg_msg->symbol), true, NULL);
+
+ MR_INFO("Application %s registe successfully.", app_object->symbol);
return 0;
error:
diff --git a/service/src/dp_trace.c b/service/src/dp_trace.c
index e229b68..35949d0 100644
--- a/service/src/dp_trace.c
+++ b/service/src/dp_trace.c
@@ -8,79 +8,17 @@
struct dp_trace_config
{
- bool enable;
- char file_path[PATH_MAX];
- unsigned int file_max_size; // KB
- unsigned int trace_merge_timeout;
-
- struct dp_trace_job_desc desc[DP_TRACE_JOB_NUM_MAX];
+ uint16_t enable;
};
-int __data_trace_path_handler(const struct rte_mp_msg * msg, const void * peer);
+int __dp_trace_path_handler(const struct rte_mp_msg * msg, const void * peer);
+static void dp_trace_telemetry_unregister(struct app_main * app_main, struct app * app, void * arg);
void load_dp_trace_config(struct sc_main * sc, struct dp_trace_config * dp_trace_config)
{
- int ret = 0;
-
- unsigned int enable = 0;
+ unsigned int enable = 1;
MESA_load_profile_uint_nodef(sc->local_dyfile, "dp_trace_rule", "enable", &enable);
- dp_trace_config->enable = (enable == 0) ? false : true;
-
- char dp_trace_file_path[MR_SYMBOL_MAX] = "/tmp/dp_trace.pcapng";
- MESA_load_profile_string_nodef(sc->local_dyfile, "dp_trace_rule", "dp_trace_file_path", dp_trace_file_path,
- sizeof(dp_trace_file_path));
- snprintf(dp_trace_config->file_path, sizeof(dp_trace_config->file_path), "%s", dp_trace_file_path);
-
- unsigned int dp_trace_file_max_size_in_KB = 256 * 1024;
- MESA_load_profile_uint_nodef(sc->local_dyfile, "dp_trace_rule", "dp_trace_file_max_size_in_KB",
- &dp_trace_file_max_size_in_KB);
- dp_trace_config->file_max_size = dp_trace_file_max_size_in_KB;
-
- unsigned int dp_trace_merge_timeout = 30;
- MESA_load_profile_uint_nodef(sc->local_dyfile, "dp_trace_rule", "dp_trace_merge_timeout", &dp_trace_merge_timeout);
- dp_trace_config->trace_merge_timeout = dp_trace_merge_timeout;
-
- for (int rule_index = 0; rule_index < DP_TRACE_JOB_NUM_MAX; rule_index++)
- {
- unsigned int enable = 1;
- char bpf_expr[MR_BPF_EXPRESSION_MAX] = {};
- unsigned int pkt_cnt_max = 0;
- unsigned int sampling = 1;
- unsigned int snaplen = UINT32_MAX;
-
- char dp_trace_section[MR_SYMBOL_MAX] = {};
- snprintf(dp_trace_section, sizeof(dp_trace_section), "dp_trace_rule:%d", rule_index);
-
- MESA_load_profile_uint_nodef(sc->local_dyfile, dp_trace_section, "enable", &enable);
-
- ret =
- MESA_load_profile_string_nodef(sc->local_dyfile, dp_trace_section, "bpf_expr", bpf_expr, sizeof(bpf_expr));
- if (ret < 0)
- {
- continue;
- }
-
- MESA_load_profile_uint_nodef(sc->local_dyfile, dp_trace_section, "pkt_cnt_max", &pkt_cnt_max);
- MESA_load_profile_uint_nodef(sc->local_dyfile, dp_trace_section, "sampling", &sampling);
- MESA_load_profile_uint_nodef(sc->local_dyfile, dp_trace_section, "snaplen", &snaplen);
-
- struct dp_trace_job_desc * desc = (struct dp_trace_job_desc *)&dp_trace_config->desc;
- struct dp_trace_job_desc * desc_i = &desc[rule_index];
- desc_i->enable = (enable == 0) ? false : true;
- snprintf(desc_i->bpf_expr, sizeof(desc_i->bpf_expr), "%s", bpf_expr);
-
- // The maximum number of packets is limited to each core
- // assert(sc->nr_io_thread != 0);
- // pkt_cnt_max = pkt_cnt_max / sc->nr_io_thread;
- // if (pkt_cnt_max % sc->nr_io_thread != 0)
- // {
- // pkt_cnt_max++;
- // }
-
- desc_i->pkt_cnt_max = pkt_cnt_max;
- desc_i->sampling = (sampling == 0) ? 1 : sampling;
- desc_i->snaplen = (snaplen == 0) ? UINT32_MAX : snaplen;
- }
+ dp_trace_config->enable = enable;
}
void dp_trace_config_apply(struct sc_main * sc)
@@ -89,98 +27,15 @@ void dp_trace_config_apply(struct sc_main * sc)
return;
struct dp_trace_config dp_trace_config = {};
-
- struct dp_trace_job_desc * desc = (struct dp_trace_job_desc *)&dp_trace_config.desc;
-
MR_INFO("Loading data path trace configuration file...");
- load_dp_trace_config(sc, &dp_trace_config);
-
- if (sc->trace->inst->enable == false && dp_trace_config.enable == false)
- {
- MR_INFO("continue to disable data path trace");
- goto end;
- }
- else if (sc->trace->inst->enable == true && dp_trace_config.enable == false)
- {
- MR_INFO("disable data path trace");
- dp_trace_stop(sc->trace);
- dp_trace_jobs_destroy(sc->trace, DP_TRACE_ALL_JOBS);
- goto end;
- }
- if (sc->trace->inst->enable == false && dp_trace_config.enable == true)
+ load_dp_trace_config(sc, &dp_trace_config);
+ if (sc->trace->inst->enable != dp_trace_config.enable)
{
- struct dp_trace_instance * inst = sc->trace->inst;
- snprintf(inst->trace_file_path, sizeof(inst->trace_file_path), "%s", dp_trace_config.file_path);
- inst->trace_file_max_size = dp_trace_config.file_max_size;
- inst->trace_merge_timeout = dp_trace_config.trace_merge_timeout;
-
- if (!dp_trace_start(sc->trace))
- {
- MR_INFO("failed to enable data path trace");
- goto end;
- }
- MR_INFO("enable data path trace");
+ MR_INFO("Modify trace status from %u to %u", sc->trace->inst->enable, dp_trace_config.enable);
+ sc->trace->inst->enable = dp_trace_config.enable;
}
- struct dp_trace_job_ctx * job_ctx = sc->trace->inst->job_ctx;
- for (unsigned int i = 0; i < DP_TRACE_JOB_NUM_MAX; i++)
- {
- // The job that was not in the system originally is not in the configuration now.
- if (job_ctx[i].used == false && (desc[i].enable == false || strlen(desc[i].bpf_expr) == 0))
- {
- continue;
- }
-
- // The job that was not originally in the system is now included in the configuration.
- if (job_ctx[i].used == false && desc[i].enable && strlen(desc[i].bpf_expr) != 0)
- {
- MR_INFO(" [add or enable rule %u] bpf_expr: %s;", i, desc[i].bpf_expr);
- dp_trace_job_add(sc->trace, &desc[i], i);
- continue;
- }
-
- // The job that was originally in the system is also included in the configuration now,
- // but the content is different.
- if (job_ctx[i].used == true && desc[i].enable && strlen(desc[i].bpf_expr) != 0)
- {
- struct dp_trace_job_desc * job_ctx_i_desc = &job_ctx[i].desc;
- if (strcmp(job_ctx_i_desc->bpf_expr, desc[i].bpf_expr) != 0)
- {
- MR_INFO(" [mod rule %u] bpf_expr: %s;", i, desc[i].bpf_expr);
- dp_trace_jobs_destroy(sc->trace, job_ctx[i].job_id);
- dp_trace_job_add(sc->trace, &desc[i], i);
- }
- else
- {
- if (job_ctx_i_desc->pkt_cnt_max != desc[i].pkt_cnt_max)
- {
- job_ctx_i_desc->pkt_cnt_max = desc[i].pkt_cnt_max;
- }
-
- if (job_ctx_i_desc->sampling != desc[i].sampling)
- {
- job_ctx_i_desc->sampling = desc[i].sampling;
- }
-
- if (job_ctx_i_desc->snaplen != desc[i].snaplen)
- {
- job_ctx_i_desc->snaplen = desc[i].snaplen;
- }
- }
- continue;
- }
-
- // The job that originally existed in the system is not in the configuration now.
- if (job_ctx[i].used == true && (desc[i].enable == false || strlen(desc[i].bpf_expr) == 0))
- {
- MR_INFO(" [del or disable rule %u] bpf_expr: %s;", i, job_ctx[i].desc.bpf_expr);
- dp_trace_jobs_destroy(sc->trace, job_ctx[i].job_id);
- continue;
- }
- }
-
-end:
MR_INFO("Loading data path trace configuration file is completed.");
}
@@ -199,21 +54,73 @@ int mr_dp_trace_init(struct sc_main * sc)
dp_trace_config_apply(sc);
- rte_mp_action_register("data_path_trace", __data_trace_path_handler);
+ rte_mp_action_register(DP_TRACE_MP_MSG_NAME, __dp_trace_path_handler);
+ app_event_handler_register(sc->app_main, APP_EV_TYPE_UNREGISTER, dp_trace_telemetry_unregister, NULL);
return 0;
}
-int __data_trace_path_handler(const struct rte_mp_msg * msg, const void * peer)
+int __dp_trace_path_handler(const struct rte_mp_msg * msg, const void * peer)
{
- struct rte_mp_msg resp;
- snprintf(resp.name, sizeof(resp.name), "%s", "data_path_trace");
+ struct dp_trace_instance * inst = sc_main_get()->trace->inst;
+ struct dp_trace_job_ctx * job_ctx = inst->job_ctx;
+ int ret = 0;
+
+ struct rte_mp_msg resp = {};
+ snprintf(resp.name, sizeof(resp.name), "%s", DP_TRACE_MP_MSG_NAME);
+
+ struct dp_trace_resp * dp_trace_resp = (struct dp_trace_resp *)resp.param;
+ resp.len_param = sizeof(struct dp_trace_resp);
- uintptr_t ptr_address = (uintptr_t)sc_main_get()->trace->inst;
- resp.len_param = sizeof(ptr_address);
- memcpy(resp.param, &ptr_address, sizeof(ptr_address));
+ struct dp_trace_req * dp_trace_req = (struct dp_trace_req *)msg->param;
+ if (dp_trace_req->action == DP_TRACE_INSTANCE_GET)
+ {
+ dp_trace_resp->errcode = DP_TRACE_SUCCESS;
+ dp_trace_resp->trace_instance = inst;
+ goto send_resp;
+ }
+
+ if (dp_trace_req->action == DP_TRACE_JOB_ID_USED_GET)
+ {
+ job_bitmap_t jobs_id_uesd = 0;
+ for (unsigned int i = 0; i < DP_TRACE_JOB_NUM_MAX; i++)
+ {
+ if (job_ctx[i].used == true)
+ {
+ jobs_id_uesd |= job_ctx[i].job_id;
+ }
+ }
+
+ dp_trace_resp->jobs_id_used_get = jobs_id_uesd;
+ dp_trace_resp->errcode = DP_TRACE_SUCCESS;
+ goto send_resp;
+ }
- int ret = rte_mp_reply(&resp, peer);
+ if (dp_trace_req->action == DP_TRACE_JOB_ADD)
+ {
+ dp_trace_resp->errcode = dp_trace_job_add(sc_main_get()->trace, dp_trace_req->desc);
+ if (dp_trace_resp->errcode == DP_TRACE_SUCCESS)
+ {
+ unsigned int rule_index = dp_trace_req->desc->rule_index;
+ dp_trace_resp->job_id_add = job_ctx[rule_index].job_id;
+
+ struct app * app_object = app_lookup_by_symbol(sc_main_get(), (const char *)dp_trace_req->appsym);
+ assert(app_object != NULL);
+ app_object->job_id_used = 1;
+ }
+ goto send_resp;
+ }
+
+ if (dp_trace_req->action == DP_TRACE_JOB_DESTROY)
+ {
+ job_bitmap_t jobs_id = dp_trace_req->jobs_id_destroy;
+ jobs_id = jobs_id & 0xffff;
+ dp_trace_jobs_destroy(sc_main_get()->trace, jobs_id);
+ dp_trace_resp->errcode = DP_TRACE_SUCCESS;
+ }
+
+send_resp:
+ ret = rte_mp_reply(&resp, peer);
if (ret < 0)
{
MR_WARNING("Failed to execute rte_mp_reply:%s", rte_strerror(rte_errno));
@@ -222,17 +129,32 @@ int __data_trace_path_handler(const struct rte_mp_msg * msg, const void * peer)
return 0;
}
+static void dp_trace_telemetry_unregister(struct app_main * app_main, struct app * app, void * arg)
+{
+ if (app->job_id_used == 0)
+ {
+ return;
+ }
+
+ struct dp_trace_process * dp_trace_process = sc_main_get()->trace;
+ dp_trace_jobs_destroy(dp_trace_process, DP_TRACE_ALL_JOBS);
+}
+
cJSON * dp_trace_monit_loop(struct sc_main * sc)
{
cJSON * json_root = cJSON_CreateObject();
struct dp_trace_process * trace = sc->trace;
struct dp_trace_stat monit_statistics = {};
+ uint64_t filter_exec_hit = 0;
for (unsigned int i = 0; i < RTE_MAX_LCORE; i++)
{
struct dp_trace_stat * statistics_i = &trace->statistics[i];
- monit_statistics.filter_exec_hit += statistics_i->filter_exec_hit;
+ for (unsigned int j = 0; j < DP_TRACE_JOB_NUM_MAX; j++)
+ {
+ filter_exec_hit += statistics_i->filter_exec_hit[j];
+ }
monit_statistics.filter_exec_miss += statistics_i->filter_exec_miss;
monit_statistics.reach_pkt_cnt_limit += statistics_i->reach_pkt_cnt_limit;
monit_statistics.record_buf_alloc_failed_no_mem += statistics_i->record_buf_alloc_failed_no_mem;
@@ -244,11 +166,9 @@ cJSON * dp_trace_monit_loop(struct sc_main * sc)
monit_statistics.record_emit_success += statistics_i->record_emit_success;
}
- struct dp_trace_saving_stat * saving_statistics = &trace->saving_statistics;
-
cJSON_AddBoolToObject(json_root, "trace_enable", trace->inst->enable);
- cJSON_AddNumberToObject(json_root, "filter_exec_hit", monit_statistics.filter_exec_hit);
+ cJSON_AddNumberToObject(json_root, "filter_exec_hit", filter_exec_hit);
cJSON_AddNumberToObject(json_root, "filter_exec_miss", monit_statistics.filter_exec_miss);
cJSON_AddNumberToObject(json_root, "reach_pkt_cnt_limit", monit_statistics.reach_pkt_cnt_limit);
@@ -266,13 +186,6 @@ cJSON * dp_trace_monit_loop(struct sc_main * sc)
monit_statistics.record_emit_failed_no_space_in_buf);
cJSON_AddNumberToObject(json_root, "record_emit_success", monit_statistics.record_emit_success);
- cJSON_AddNumberToObject(json_root, "save_to_file_failed_other", saving_statistics->save_to_file_failed_other);
- cJSON_AddNumberToObject(json_root, "save_to_file_failed_at_pcapng_format",
- saving_statistics->save_to_file_failed_at_pcapng_format);
- cJSON_AddNumberToObject(json_root, "save_to_file_failed_at_write_to_disk",
- saving_statistics->save_to_file_failed_at_write_to_disk);
- cJSON_AddNumberToObject(json_root, "save_to_file_success", saving_statistics->save_to_file_success);
-
return json_root;
}
@@ -282,7 +195,6 @@ void hook_rte_pktmbuf_free(struct rte_mbuf * m)
return;
struct dp_trace_process * trace = sc_main_get()->trace;
dp_trace_record_write(trace, m, rte_lcore_id());
- dp_trace_record_flush(trace);
rte_pktmbuf_free(m);
}
@@ -295,6 +207,5 @@ void hook_rte_pktmbuf_free_bulk(struct rte_mbuf ** mbufs, unsigned int count)
continue;
dp_trace_record_write(trace, mbufs[idx], rte_lcore_id());
}
- dp_trace_record_flush(trace);
rte_pktmbuf_free_bulk(mbufs, count);
} \ No newline at end of file
diff --git a/service/src/node_bfd.c b/service/src/node_bfd.c
index d039ad2..4d35f33 100644
--- a/service/src/node_bfd.c
+++ b/service/src/node_bfd.c
@@ -152,8 +152,6 @@ static __rte_always_inline void gen_store_trace_info(struct rte_node * node, str
uint16_t next_node_index, int expect_result,
uint16_t bfd_session_id, uint8_t bfd_state)
{
- struct dp_trace_record_meta meta = {.appsym = MR_TRACE_APPSYM, .module = node->name};
-
/* Populate the next node infomation */
char str_record[MR_STRING_MAX];
int len = snprintf(str_record, sizeof(str_record), "next node:%s", node->nodes[next_node_index]->name);
@@ -175,6 +173,8 @@ static __rte_always_inline void gen_store_trace_info(struct rte_node * node, str
len += snprintf(str_record + len, sizeof(str_record) - len, ", bfd state:%u", bfd_state);
/* Emit the trace record */
+ struct dp_trace_record_meta meta = {
+ .measurement_type = DP_TRACE_MEASUREMENT_TYPE_TRACE, .appsym = MR_TRACE_APPSYM, .module = node->name};
dp_trace_record_emit_str(sc_main_get()->trace, mbuf, rte_lcore_id(), &meta, str_record);
}
@@ -251,7 +251,7 @@ static __rte_always_inline uint16_t bfd_node_process(struct rte_graph * graph, s
node_enqueue:
/* Check if tracing is enabled for the current Mbuf */
- if (unlikely(dp_trace_record_can_emit(mbuf)))
+ if (unlikely(dp_trace_record_can_emit(mbuf, DP_TRACE_MEASUREMENT_TYPE_TRACE)))
{
gen_store_trace_info(node, mbuf, next_node_index, expect_result, bfd_session_id, bfd_state);
}
diff --git a/service/src/node_bridge.c b/service/src/node_bridge.c
index 2790ae6..cf018a3 100644
--- a/service/src/node_bridge.c
+++ b/service/src/node_bridge.c
@@ -227,7 +227,6 @@ static __rte_always_inline void gen_store_trace_info_original(struct rte_node *
{
/* Populate the next node infomation */
char str_record[MR_STRING_MAX];
-
assert(match_result < BRIDGE_MATCH_MAX);
int len = snprintf(str_record, sizeof(str_record), ", rsn:%s", bridge_match_result_str[match_result]);
@@ -239,7 +238,8 @@ static __rte_always_inline void gen_store_trace_info_original(struct rte_node *
}
/* Emit the trace record */
- struct dp_trace_record_meta meta = {.appsym = MR_TRACE_APPSYM, .module = node->name};
+ struct dp_trace_record_meta meta = {
+ .measurement_type = DP_TRACE_MEASUREMENT_TYPE_TRACE, .appsym = MR_TRACE_APPSYM, .module = node->name};
dp_trace_record_emit_str(sc_main_get()->trace, mbuf, rte_lcore_id(), &meta, str_record);
}
@@ -247,7 +247,6 @@ static __rte_always_inline void gen_store_trace_info_original(struct rte_node *
static __rte_always_inline void gen_store_trace_info_clone(struct rte_node * node, struct rte_mbuf * mbuf,
uint16_t next_node_index)
{
- struct dp_trace_record_meta meta = {.appsym = MR_TRACE_APPSYM, .module = node->name};
/* Populate the next node infomation */
char str_record[MR_STRING_MAX];
@@ -256,6 +255,8 @@ static __rte_always_inline void gen_store_trace_info_clone(struct rte_node * nod
node->nodes[next_node_index]->name, mrb_meta->port_egress);
/* Emit the trace record */
+ struct dp_trace_record_meta meta = {
+ .measurement_type = DP_TRACE_MEASUREMENT_TYPE_TRACE, .appsym = MR_TRACE_APPSYM, .module = node->name};
dp_trace_record_emit_str(sc_main_get()->trace, mbuf, rte_lcore_id(), &meta, str_record);
}
@@ -365,7 +366,7 @@ static __rte_always_inline uint16_t bridge_node_process(struct rte_graph * graph
dp_trace_filter_exec(sc_main_get()->trace, mbuf_clone, 0, rte_lcore_id());
/* Check if tracing is enabled for the current Mbuf */
- if (unlikely(dp_trace_record_can_emit(mbuf)))
+ if (unlikely(dp_trace_record_can_emit(mbuf, DP_TRACE_MEASUREMENT_TYPE_TRACE)))
{
gen_store_trace_info_clone(node, mbuf_clone, BRIDGE_NEXT_ETH_EGRESS);
}
@@ -376,7 +377,7 @@ static __rte_always_inline uint16_t bridge_node_process(struct rte_graph * graph
node_enqueue:
/* Check if tracing is enabled for the current Mbuf */
- if (unlikely(dp_trace_record_can_emit(mbuf)))
+ if (unlikely(dp_trace_record_can_emit(mbuf, DP_TRACE_MEASUREMENT_TYPE_TRACE)))
{
gen_store_trace_info_original(node, mbuf, next_node_index, &stats, match_result);
}
diff --git a/service/src/node_classifier.c b/service/src/node_classifier.c
index 88b65ee..d4e87e3 100644
--- a/service/src/node_classifier.c
+++ b/service/src/node_classifier.c
@@ -364,8 +364,6 @@ static __rte_always_inline void gen_store_trace_info(struct rte_node * node, str
struct match_result_engine * result, uint8_t ignore_icmp,
enum cls_excpt_reason excpt_reason)
{
- struct dp_trace_record_meta meta = {.appsym = MR_TRACE_APPSYM, .module = node->name};
-
/* Populate the next node infomation */
char str_record[MR_STRING_MAX];
int len = snprintf(str_record, sizeof(str_record), "next node:%s", node->nodes[next_node_index]->name);
@@ -401,6 +399,8 @@ static __rte_always_inline void gen_store_trace_info(struct rte_node * node, str
}
/* Emit the trace record */
+ struct dp_trace_record_meta meta = {
+ .measurement_type = DP_TRACE_MEASUREMENT_TYPE_TRACE, .appsym = MR_TRACE_APPSYM, .module = node->name};
dp_trace_record_emit_str(sc_main_get()->trace, mbuf, rte_lcore_id(), &meta, str_record);
}
@@ -515,7 +515,7 @@ static __rte_always_inline uint16_t classifier_node_process(struct rte_graph * g
}
/* Check if tracing is enabled for the current Mbuf */
- if (unlikely(dp_trace_record_can_emit(pending_mbufs[pkt_idx])))
+ if (unlikely(dp_trace_record_can_emit(pending_mbufs[pkt_idx], DP_TRACE_MEASUREMENT_TYPE_TRACE)))
{
gen_store_trace_info(node, pending_mbufs[pkt_idx], CLASSIFIER_NEXT_FORWARDER, &stats, &results[pkt_idx],
ignore_icmp_pkts[pkt_idx], excpt_reason);
@@ -600,7 +600,7 @@ static __rte_always_inline uint16_t classifier_node_process(struct rte_graph * g
}
/* Check if tracing is enabled for the current Mbuf */
- if (unlikely(dp_trace_record_can_emit(mbuf0)))
+ if (unlikely(dp_trace_record_can_emit(mbuf0, DP_TRACE_MEASUREMENT_TYPE_TRACE)))
{
gen_store_trace_info(node, mbuf0, CLASSIFIER_NEXT_FORWARDER, &stats, &result, ignore_icmp_pkt,
excpt_reason);
diff --git a/service/src/node_eth_egress.c b/service/src/node_eth_egress.c
index 7614cff..dd77595 100644
--- a/service/src/node_eth_egress.c
+++ b/service/src/node_eth_egress.c
@@ -81,17 +81,14 @@ int node_eth_egress_init(struct node_manager_main * node_mgr_main)
static __rte_always_inline void gen_store_trace_info_egress(struct rte_node * node, struct rte_mbuf * mbuf,
uint16_t next_node_index)
{
- struct dp_trace_record_meta meta = {.appsym = MR_TRACE_APPSYM, .module = node->name};
-
/* Populate the next node infomation */
char str_record[MR_STRING_MAX];
- int len = snprintf(str_record, sizeof(str_record), "next node:%s", node->nodes[next_node_index]->name);
-
- /* Populate the egress port information */
struct mrb_metadata * mrb_meta = mrbuf_cz_data(mbuf, MR_NODE_CTRLZONE_ID);
- len += snprintf(str_record + len, sizeof(str_record) - len, ", tx:%u", mrb_meta->port_egress);
+ snprintf(str_record, sizeof(str_record), "next node:%s, tx:%u", node->nodes[next_node_index]->name,
+ mrb_meta->port_egress);
/* Emit the trace record */
+ struct dp_trace_record_meta meta = {.appsym = MR_TRACE_APPSYM, .module = node->name};
dp_trace_record_emit_str(sc_main_get()->trace, mbuf, rte_lcore_id(), &meta, str_record);
}
@@ -124,7 +121,7 @@ static __rte_always_inline uint16_t eth_egress_node_process(struct rte_graph * g
#if 0
/* Check if tracing is enabled for the current Mbuf */
- if (unlikely(dp_trace_record_can_emit(mbuf)))
+ if (unlikely(dp_trace_record_can_emit(mbuf, DP_TRACE_MEASUREMENT_TYPE_TRACE)))
{
gen_store_trace_info_egress(node, mbuf, next_node_index);
}
diff --git a/service/src/node_eth_ingress.c b/service/src/node_eth_ingress.c
index ad7468a..bf3f444 100644
--- a/service/src/node_eth_ingress.c
+++ b/service/src/node_eth_ingress.c
@@ -409,14 +409,11 @@ static __rte_always_inline void gen_store_trace_info_ingress(struct rte_node * n
struct mr_dev_desc * dev_desc,
enum eth_ingress_drop_reason drop_reason)
{
- struct dp_trace_record_meta meta = {.appsym = MR_TRACE_APPSYM, .module = node->name};
-
/* Populate the next node infomation */
char str_record[MR_STRING_MAX];
int len = snprintf(str_record, sizeof(str_record), "next node:%s", node->nodes[next_node_index]->name);
/* Populate the reason for next node */
-
switch (next_node_index)
{
case ETH_INGRESS_NEXT_PKT_DROP: {
@@ -456,6 +453,8 @@ static __rte_always_inline void gen_store_trace_info_ingress(struct rte_node * n
}
/* Emit the trace record */
+ struct dp_trace_record_meta meta = {
+ .measurement_type = DP_TRACE_MEASUREMENT_TYPE_TRACE, .appsym = MR_TRACE_APPSYM, .module = node->name};
dp_trace_record_emit_str(sc_main_get()->trace, mbuf, rte_lcore_id(), &meta, str_record);
}
@@ -536,7 +535,7 @@ static __rte_always_inline uint16_t eth_ingress_node_process(struct rte_graph *
node_enqueue:
#if 1
/* Check if tracing is enabled for the current Mbuf */
- if (unlikely(dp_trace_record_can_emit(mbuf)))
+ if (unlikely(dp_trace_record_can_emit(mbuf, DP_TRACE_MEASUREMENT_TYPE_TRACE)))
{
gen_store_trace_info_ingress(node, mbuf, next_node_index, &stats, dev_desc, drop_reason);
}
diff --git a/service/src/node_etherfabric.c b/service/src/node_etherfabric.c
index da856b7..1174e6a 100644
--- a/service/src/node_etherfabric.c
+++ b/service/src/node_etherfabric.c
@@ -498,8 +498,6 @@ static __rte_always_inline void gen_store_trace_info_ingress(struct rte_node * n
uint16_t next_node_index, uint16_t prepend_sid,
enum ef_ingress_drop_reason drop_reason)
{
- struct dp_trace_record_meta meta = {.appsym = MR_TRACE_APPSYM, .module = node->name};
-
/* Populate the next node infomation */
char str_record[MR_STRING_MAX];
struct mrb_metadata * mrb_meta = mrbuf_cz_data(mbuf, MR_NODE_CTRLZONE_ID);
@@ -522,6 +520,8 @@ static __rte_always_inline void gen_store_trace_info_ingress(struct rte_node * n
}
/* Emit the trace record */
+ struct dp_trace_record_meta meta = {
+ .measurement_type = DP_TRACE_MEASUREMENT_TYPE_TRACE, .appsym = MR_TRACE_APPSYM, .module = node->name};
dp_trace_record_emit_str(sc_main_get()->trace, mbuf, rte_lcore_id(), &meta, str_record);
}
@@ -619,13 +619,18 @@ static __rte_always_inline uint16_t ef_ingress_node_process(struct rte_graph * g
node_enqueue:
/* Check if tracing is enabled for the current Mbuf */
- if (unlikely(dp_trace_record_can_emit(mbuf)))
+ if (unlikely(dp_trace_record_can_emit(mbuf, DP_TRACE_MEASUREMENT_TYPE_TRACE)))
{
gen_store_trace_info_ingress(node, mbuf, next_node_index, prepend_sid, drop_reason);
// gen_store_trace_info_sid_list(node, mbuf);
// gen_store_trace_info_rte_mbuf(node, mbuf);
}
+ if (unlikely(dp_trace_record_can_emit(mbuf, DP_TRACE_MEASUREMENT_TYPE_TELEMETRY)))
+ {
+ gen_store_telemetry_info_adapter(mbuf);
+ }
+
/* Judge the next index whether to change */
if (unlikely(batch_next_node_index != next_node_index))
{
@@ -750,8 +755,6 @@ static __rte_always_inline void gen_store_trace_info_egress(struct rte_node * no
uint16_t next_node_index,
enum ef_egress_drop_reason drop_reason)
{
- struct dp_trace_record_meta meta = {.appsym = MR_TRACE_APPSYM, .module = node->name};
-
/* Populate the next node infomation */
char str_record[MR_STRING_MAX];
int len = snprintf(str_record, sizeof(str_record), "next node:%s", node->nodes[next_node_index]->name);
@@ -778,6 +781,8 @@ static __rte_always_inline void gen_store_trace_info_egress(struct rte_node * no
}
/* Emit the trace record */
+ struct dp_trace_record_meta meta = {
+ .measurement_type = DP_TRACE_MEASUREMENT_TYPE_TRACE, .appsym = MR_TRACE_APPSYM, .module = node->name};
dp_trace_record_emit_str(sc_main_get()->trace, mbuf, rte_lcore_id(), &meta, str_record);
}
@@ -861,7 +866,7 @@ static __rte_always_inline uint16_t ef_egress_node_process(struct rte_graph * gr
}
/* Check if tracing is enabled for the current Mbuf */
- if (unlikely(dp_trace_record_can_emit(mbuf)))
+ if (unlikely(dp_trace_record_can_emit(mbuf, DP_TRACE_MEASUREMENT_TYPE_TRACE)))
{
gen_store_trace_info_egress(node, mbuf, next_node_index, drop_reason);
// gen_store_trace_info_rte_mbuf(node, mbuf);
diff --git a/service/src/node_forwarder.c b/service/src/node_forwarder.c
index 41bcf5a..ba5aee3 100644
--- a/service/src/node_forwarder.c
+++ b/service/src/node_forwarder.c
@@ -106,8 +106,6 @@ static __rte_always_inline void gen_store_trace_info(struct rte_node * node, str
uint16_t next_node_index, struct forwarder_stats * stats,
enum forwarder_drop_reason drop_reason)
{
- struct dp_trace_record_meta meta = {.appsym = MR_TRACE_APPSYM, .module = node->name};
-
/* Populate the next node infomation */
char str_record[MR_STRING_MAX];
int len = snprintf(str_record, sizeof(str_record), "next node:%s", node->nodes[next_node_index]->name);
@@ -130,6 +128,8 @@ static __rte_always_inline void gen_store_trace_info(struct rte_node * node, str
len += embed_sid_info(mbuf, str_record + len, sizeof(str_record) - len);
/* Emit the trace record */
+ struct dp_trace_record_meta meta = {
+ .measurement_type = DP_TRACE_MEASUREMENT_TYPE_TRACE, .appsym = MR_TRACE_APPSYM, .module = node->name};
dp_trace_record_emit_str(sc_main_get()->trace, mbuf, rte_lcore_id(), &meta, str_record);
}
@@ -206,7 +206,7 @@ static __rte_always_inline uint16_t forwarder_node_process(struct rte_graph * gr
node_enqueue:
/* Check if tracing is enabled for the current Mbuf */
- if (unlikely(dp_trace_record_can_emit(mbuf)))
+ if (unlikely(dp_trace_record_can_emit(mbuf, DP_TRACE_MEASUREMENT_TYPE_TRACE)))
{
gen_store_trace_info(node, mbuf, next_node_index, &stats, drop_reason);
// gen_store_trace_info_sid_list(node, mbuf);
diff --git a/service/src/node_health_check.c b/service/src/node_health_check.c
index 3a5662c..f8f9afd 100644
--- a/service/src/node_health_check.c
+++ b/service/src/node_health_check.c
@@ -649,21 +649,14 @@ static __rte_always_inline void gen_store_trace_info_ask_node(struct rte_node *
uint16_t next_node_index,
struct health_check_session * session)
{
- struct dp_trace_record_meta meta = {.appsym = MR_TRACE_APPSYM, .module = node->name};
-
/* Populate the next node infomation */
char str_record[MR_STRING_MAX];
- int len = snprintf(str_record, sizeof(str_record), "next node:%s", node->nodes[next_node_index]->name);
-
- /* Populate the core id */
- len += snprintf(str_record + len, sizeof(str_record) - len, ", core:%u", rte_lcore_id());
-
- /* Populate the health check session infomation */
- len += snprintf(str_record + len, sizeof(str_record) - len, ", session name:%s", session->name);
- len += snprintf(str_record + len, sizeof(str_record) - len, ", listening dev:%u,%s", session->port_id,
- session->device);
+ snprintf(str_record, sizeof(str_record), "next node:%s, core:%u, session name:%s, listening dev:%u,%s",
+ node->nodes[next_node_index]->name, rte_lcore_id(), session->name, session->port_id, session->device);
/* Emit the trace record */
+ struct dp_trace_record_meta meta = {
+ .measurement_type = DP_TRACE_MEASUREMENT_TYPE_TRACE, .appsym = MR_TRACE_APPSYM, .module = node->name};
dp_trace_record_emit_str(sc_main_get()->trace, mbuf, rte_lcore_id(), &meta, str_record);
}
@@ -708,7 +701,7 @@ static __rte_always_inline uint16_t health_check_ask_node_process(struct rte_gra
dp_trace_filter_exec(sc_main_get()->trace, mbuf, 0, rte_lcore_id());
/* Check if tracing is enabled for the current Mbuf */
- if (unlikely(dp_trace_record_can_emit(mbuf)))
+ if (unlikely(dp_trace_record_can_emit(mbuf, DP_TRACE_MEASUREMENT_TYPE_TRACE)))
{
gen_store_trace_info_ask_node(node, mbuf, HEALTH_CHECK_ASK_NEXT_ETH_EGRESS, session);
}
@@ -726,7 +719,7 @@ static __rte_always_inline uint16_t health_check_ask_node_process(struct rte_gra
for (int i = 0; i < nr_mbufs; i++)
{
struct rte_mbuf * mbuf = (struct rte_mbuf *)node->objs[i];
- if (unlikely(dp_trace_record_can_emit(mbuf)))
+ if (unlikely(dp_trace_record_can_emit(mbuf, DP_TRACE_MEASUREMENT_TYPE_TRACE)))
gen_store_trace_info_rte_mbuf(node, mbuf);
}
@@ -786,7 +779,8 @@ static __rte_always_inline void gen_store_trace_info_answer_node(struct rte_node
}
/* Emit the trace record */
- struct dp_trace_record_meta meta = {.appsym = MR_TRACE_APPSYM, .module = node->name};
+ struct dp_trace_record_meta meta = {
+ .measurement_type = DP_TRACE_MEASUREMENT_TYPE_TRACE, .appsym = MR_TRACE_APPSYM, .module = node->name};
dp_trace_record_emit_str(sc_main_get()->trace, mbuf, rte_lcore_id(), &meta, str_record);
}
@@ -834,7 +828,7 @@ static __rte_always_inline uint16_t health_check_deal_answer_node_process(struct
}
/* Check if tracing is enabled for the current Mbuf */
- if (unlikely(dp_trace_record_can_emit(mbuf)))
+ if (unlikely(dp_trace_record_can_emit(mbuf, DP_TRACE_MEASUREMENT_TYPE_TRACE)))
{
gen_store_trace_info_answer_node(node, mbuf, HEALTH_CHECK_DEAL_ANSWER_NEXT_PKT_DROP, &stats, session,
excpt_reason);
diff --git a/service/src/node_lb.c b/service/src/node_lb.c
index 552200f..f86fcbe 100644
--- a/service/src/node_lb.c
+++ b/service/src/node_lb.c
@@ -454,7 +454,6 @@ int lb_init(struct sc_main * sc)
static __rte_always_inline void gen_store_trace_info(struct rte_node * node, struct rte_mbuf * mbuf,
uint16_t next_node_index, struct lb_group * lb_group)
{
- struct dp_trace_record_meta meta = {.appsym = MR_TRACE_APPSYM, .module = node->name};
/* Populate the next node infomation */
char str_record[MR_STRING_MAX];
@@ -477,6 +476,8 @@ static __rte_always_inline void gen_store_trace_info(struct rte_node * node, str
snprintf(str_record + len, sizeof(str_record) - len, ", cur sid:%u, lb id:%u", lb_group->sid, lb_group->index);
/* Emit the trace record */
+ struct dp_trace_record_meta meta = {
+ .measurement_type = DP_TRACE_MEASUREMENT_TYPE_TRACE, .appsym = MR_TRACE_APPSYM, .module = node->name};
dp_trace_record_emit_str(sc_main_get()->trace, mbuf, rte_lcore_id(), &meta, str_record);
}
@@ -537,7 +538,7 @@ static __rte_always_inline uint16_t lb_node_process(struct rte_graph * graph, st
}
/* Check if tracing is enabled for the current Mbuf */
- if (unlikely(dp_trace_record_can_emit(mbuf)))
+ if (unlikely(dp_trace_record_can_emit(mbuf, DP_TRACE_MEASUREMENT_TYPE_TRACE)))
{
gen_store_trace_info(node, mbuf, next_node_index, lb_group);
}
diff --git a/service/src/node_phydev.c b/service/src/node_phydev.c
index ba5b219..39c16f0 100644
--- a/service/src/node_phydev.c
+++ b/service/src/node_phydev.c
@@ -5,6 +5,7 @@
#include <rte_graph_worker.h>
#include <rte_mbuf.h>
+#include <cJSON.h>
#include <metadata_define.h>
#include <rte_memcpy.h>
#include <sc_node.h>
@@ -423,7 +424,7 @@ static __rte_always_inline uint16_t dpdk_msgpack_dev_rx_node_process(struct rte_
dp_trace_filter_exec(sc_main_get()->trace, mbuf, 0);
/* Check if tracing is enabled for the current Mbuf */
- if (unlikely(dp_trace_record_can_emit(mbuf)))
+ if (unlikely(dp_trace_record_can_emit(mbuf, DP_TRACE_MEASUREMENT_TYPE_TRACE)))
{
gen_store_trace_info_rx(node, mbuf, ctx->dev_desc, graph->id);
}
@@ -530,11 +531,16 @@ static __rte_always_inline uint16_t dpdk_dev_rx_node_process(struct rte_graph *
dp_trace_filter_exec(sc_main_get()->trace, mbuf, 0, rte_lcore_id());
/* Check if tracing is enabled for the current Mbuf */
- if (unlikely(dp_trace_record_can_emit(mbuf)))
+ if (unlikely(dp_trace_record_can_emit(mbuf, DP_TRACE_MEASUREMENT_TYPE_TRACE)))
{
gen_store_trace_info_rx(node, mbuf, dev_desc, qid);
gen_store_trace_info_pkt_parser(node, mbuf);
}
+
+ if (unlikely(dp_trace_record_can_emit(mbuf, DP_TRACE_MEASUREMENT_TYPE_TELEMETRY)))
+ {
+ gen_store_telemetry_info_rx(node, mbuf, dev_desc, qid);
+ }
}
/* for test */
@@ -544,7 +550,7 @@ static __rte_always_inline uint16_t dpdk_dev_rx_node_process(struct rte_graph *
for (unsigned int i = 0; i < node->idx; i++)
{
struct rte_mbuf * mbuf = (struct rte_mbuf *)node->objs[i];
- if (unlikely(dp_trace_record_can_emit(mbuf)))
+ if (unlikely(dp_trace_record_can_emit(mbuf, DP_TRACE_MEASUREMENT_TYPE_TRACE)))
{
gen_store_trace_info_rte_mbuf(node, mbuf);
}
@@ -597,7 +603,7 @@ static __rte_always_inline uint16_t dpdk_msgpack_dev_tx_node_process(struct rte_
struct rte_mbuf * mbuf = (struct rte_mbuf *)node->objs[i];
/* Check if tracing is enabled for the current Mbuf */
- if (unlikely(dp_trace_record_can_emit(mbuf)))
+ if (unlikely(dp_trace_record_can_emit(mbuf, DP_TRACE_MEASUREMENT_TYPE_TRACE)))
{
gen_store_trace_info_tx(node, mbuf, dev_desc, graph->id);
}
@@ -647,15 +653,18 @@ static __rte_always_inline uint16_t dpdk_dev_tx_node_process(struct rte_graph *
struct rte_mbuf * mbuf = (struct rte_mbuf *)objs[i];
/* Check if tracing is enabled for the current Mbuf */
- if (unlikely(dp_trace_record_can_emit(mbuf)))
+ if (unlikely(dp_trace_record_can_emit(mbuf, DP_TRACE_MEASUREMENT_TYPE_TRACE)))
{
gen_store_trace_info_tx(node, mbuf, dev_desc, graph->id);
// gen_store_trace_info_rte_mbuf(node, mbuf);
- dp_trace_record_write(sc_main_get()->trace, mbuf, rte_lcore_id());
}
- }
- dp_trace_record_flush(sc_main_get()->trace);
+ if (unlikely(dp_trace_record_can_emit(mbuf, DP_TRACE_MEASUREMENT_TYPE_TELEMETRY)))
+ {
+ gen_store_telemetry_info_tx(node, mbuf, dev_desc, graph->id);
+ }
+ dp_trace_record_write(sc_main_get()->trace, mbuf, rte_lcore_id());
+ }
/* do CF calculate */
#if MR_PHYDEV_ENABLE_DF_CALCULATE
diff --git a/service/src/node_shmdev.c b/service/src/node_shmdev.c
index 1f02d68..d840866 100644
--- a/service/src/node_shmdev.c
+++ b/service/src/node_shmdev.c
@@ -46,8 +46,6 @@ enum packet_direction
static __rte_always_inline void gen_store_trace_info(struct rte_node * node, struct rte_mbuf * mbuf,
struct vdev * shm_dev_desc, enum packet_direction direction)
{
- struct dp_trace_record_meta meta = {.appsym = MR_TRACE_APPSYM, .module = node->name};
-
char str_record[MR_STRING_MAX];
int len = snprintf(str_record, sizeof(str_record), "port:%u,%s", shm_dev_desc->port_id, shm_dev_desc->symbol);
@@ -57,6 +55,7 @@ static __rte_always_inline void gen_store_trace_info(struct rte_node * node, str
}
/* Emit the trace record */
+ struct dp_trace_record_meta meta = {.appsym = MR_TRACE_APPSYM, .module = node->name};
dp_trace_record_emit_str(sc_main_get()->trace, mbuf, rte_lcore_id(), &meta, str_record);
}
@@ -126,7 +125,7 @@ uint16_t shmdev_rx_node_process(struct rte_graph * graph, struct rte_node * node
mrb_meta->port_ingress = dev_desc->port_id;
/* Check if tracing is enabled for the current Mbuf */
- if (unlikely(dp_trace_record_can_emit(mbuf0)))
+ if (unlikely(dp_trace_record_can_emit(mbuf0, DP_TRACE_MEASUREMENT_TYPE_TRACE)))
{
gen_store_trace_info_rx(node, mbuf0, dev_desc, qid);
@@ -163,7 +162,7 @@ uint16_t shmdev_tx_node_process(struct rte_graph * graph, struct rte_node * node
for (unsigned int i = 0; i < nr_pkts; i++)
{
/* Check if tracing is enabled for the current Mbuf */
- if (unlikely(dp_trace_record_can_emit(mbufs[i])))
+ if (unlikely(dp_trace_record_can_emit(mbufs[i], DP_TRACE_MEASUREMENT_TYPE_TRACE)))
{
gen_store_trace_info_tx(node, mbufs[i], dev_desc, graph->id);
// gen_store_trace_info_rte_mbuf(node, mbufs[i]);
diff --git a/service/src/node_tera.c b/service/src/node_tera.c
index 3453631..a0e4a77 100644
--- a/service/src/node_tera.c
+++ b/service/src/node_tera.c
@@ -351,7 +351,8 @@ static __rte_always_inline void gen_store_trace_info_ingress(struct rte_node * n
}
/* Emit the trace record */
- struct dp_trace_record_meta meta = {.appsym = MR_TRACE_APPSYM, .module = node->name};
+ struct dp_trace_record_meta meta = {
+ .measurement_type = DP_TRACE_MEASUREMENT_TYPE_TRACE, .appsym = MR_TRACE_APPSYM, .module = node->name};
dp_trace_record_emit_str(sc_main_get()->trace, mbuf, rte_lcore_id(), &meta, str_record);
}
@@ -459,7 +460,7 @@ static __rte_always_inline uint16_t tera_ingress_node_process(struct rte_graph *
node_enqueue:
/* Check if tracing is enabled for the current Mbuf */
- if (unlikely(dp_trace_record_can_emit(mbuf)))
+ if (unlikely(dp_trace_record_can_emit(mbuf, DP_TRACE_MEASUREMENT_TYPE_TRACE)))
{
gen_store_trace_info_ingress(node, mbuf, next_node_index, &stats, vlan_id, prev_vlan_couple,
prev_adapter->mac_flipping, prepend_sid, drop_reason);
@@ -519,16 +520,16 @@ RTE_NODE_REGISTER(tera_ingress_node_base);
static __rte_always_inline void gen_store_trace_info_egress(struct rte_node * node, struct rte_mbuf * mbuf,
uint16_t next_node_index)
{
- struct dp_trace_record_meta meta = {.appsym = MR_TRACE_APPSYM, .module = node->name};
/* Populate the next node infomation */
char str_record[MR_STRING_MAX];
struct mrb_metadata * mrb_meta = mrbuf_cz_data(mbuf, MR_NODE_CTRLZONE_ID);
-
snprintf(str_record, sizeof(str_record), "next node:%s, tera id:%u, tx:%u", node->nodes[next_node_index]->name,
mrb_meta->adapter_id, mrb_meta->port_egress);
/* Emit the trace record */
+ struct dp_trace_record_meta meta = {
+ .measurement_type = DP_TRACE_MEASUREMENT_TYPE_TRACE, .appsym = MR_TRACE_APPSYM, .module = node->name};
dp_trace_record_emit_str(sc_main_get()->trace, mbuf, rte_lcore_id(), &meta, str_record);
}
@@ -579,7 +580,7 @@ static __rte_always_inline uint16_t tera_egress_node_process(struct rte_graph *
mrb_meta->port_egress = prev_port_id;
/* Check if tracing is enabled for the current Mbuf */
- if (unlikely(dp_trace_record_can_emit(mbuf)))
+ if (unlikely(dp_trace_record_can_emit(mbuf, DP_TRACE_MEASUREMENT_TYPE_TRACE)))
{
gen_store_trace_info_egress(node, mbuf, TERA_EGRESS_NEXT_ETH_EGRESS);
}
diff --git a/service/src/node_vwire.c b/service/src/node_vwire.c
index 5a64e9d..01fb74d 100644
--- a/service/src/node_vwire.c
+++ b/service/src/node_vwire.c
@@ -1,5 +1,6 @@
#include "sc_trace.h"
#include <cJSON.h>
+#include <rte_branch_prediction.h>
#include <rte_debug.h>
#include <rte_ethdev.h>
#include <rte_ether.h>
@@ -131,7 +132,8 @@ static __rte_always_inline void gen_store_trace_info_ingress(struct rte_node * n
}
/* Emit the trace record */
- struct dp_trace_record_meta meta = {.appsym = MR_TRACE_APPSYM, .module = node->name};
+ struct dp_trace_record_meta meta = {
+ .measurement_type = DP_TRACE_MEASUREMENT_TYPE_TRACE, .appsym = MR_TRACE_APPSYM, .module = node->name};
dp_trace_record_emit_str(sc_main_get()->trace, mbuf, rte_lcore_id(), &meta, str_record);
}
@@ -199,12 +201,17 @@ static __rte_always_inline uint16_t vwire_ingress_node_process(struct rte_graph
}
/* Check if tracing is enabled for the current Mbuf */
- if (unlikely(dp_trace_record_can_emit(mbuf0)))
+ if (unlikely(dp_trace_record_can_emit(mbuf0, DP_TRACE_MEASUREMENT_TYPE_TRACE)))
{
gen_store_trace_info_ingress(node, mbuf0, next0, prepend_sid);
// gen_store_trace_info_sid_list(node, mbuf0);
}
+ if (unlikely(dp_trace_record_can_emit(mbuf0, DP_TRACE_MEASUREMENT_TYPE_TELEMETRY)))
+ {
+ gen_store_telemetry_info_adapter(mbuf0);
+ }
+
if (unlikely(next_index ^ next0))
{
/* Copy things successfully speculated till now */
@@ -265,7 +272,8 @@ static __rte_always_inline void gen_store_trace_info_egress(struct rte_node * no
}
/* Emit the trace record */
- struct dp_trace_record_meta meta = {.appsym = MR_TRACE_APPSYM, .module = node->name};
+ struct dp_trace_record_meta meta = {
+ .measurement_type = DP_TRACE_MEASUREMENT_TYPE_TRACE, .appsym = MR_TRACE_APPSYM, .module = node->name};
dp_trace_record_emit_str(sc_main_get()->trace, mbuf, rte_lcore_id(), &meta, str_record);
}
@@ -329,7 +337,7 @@ static __rte_always_inline uint16_t vwire_egress_node_process(struct rte_graph *
}
/* Check if tracing is enabled for the current Mbuf */
- if (unlikely(dp_trace_record_can_emit(mbuf0)))
+ if (unlikely(dp_trace_record_can_emit(mbuf0, DP_TRACE_MEASUREMENT_TYPE_TRACE)))
{
gen_store_trace_info_egress(node, mbuf0, next0);
}