summaryrefslogtreecommitdiff
path: root/infra
diff options
context:
space:
mode:
author童宗振 <[email protected]>2024-04-27 01:39:14 +0000
committer童宗振 <[email protected]>2024-04-27 01:39:14 +0000
commit42c89e380c3da4b0d40d0ffc0a2e39e45ffe8356 (patch)
tree458a640a953d1293643fda81a303e4098ebdd52e /infra
parent7803faff8547c007e93e211fe787e0a1fe01ada0 (diff)
refactor data path trace for telemetryv4.8.4-20240427
Diffstat (limited to 'infra')
-rw-r--r--infra/include/common.h4
-rw-r--r--infra/include/dp_trace.h106
-rw-r--r--infra/include/ldbc.h38
-rw-r--r--infra/src/dp_trace.c835
-rw-r--r--infra/test/TestDataPathTrace.cc146
5 files changed, 206 insertions, 923 deletions
diff --git a/infra/include/common.h b/infra/include/common.h
index ae982ff..2a0c919 100644
--- a/infra/include/common.h
+++ b/infra/include/common.h
@@ -77,10 +77,6 @@ extern "C"
#define MR_MEMPOOL_COUNT_MAX 64
#endif
-#ifndef MR_BPF_EXPRESSION_MAX
-#define MR_BPF_EXPRESSION_MAX 128
-#endif
-
typedef uint64_t cpu_mask_t;
typedef uint32_t port_id_t;
typedef uint32_t queue_id_t;
diff --git a/infra/include/dp_trace.h b/infra/include/dp_trace.h
index 1049421..689023d 100644
--- a/infra/include/dp_trace.h
+++ b/infra/include/dp_trace.h
@@ -13,7 +13,6 @@
* todo: introduce data path trace design and usage
*/
-#define DP_TRACE_JOB_NUM_MAX 16
#define DP_TRACE_RING_SIZE_MAX 4096
#define DP_TRACE_POOL_NAME "dp_trace_pool"
@@ -25,15 +24,41 @@
#define DP_TRACE_ALL_JOBS UINT16_MAX
-typedef uint16_t job_bitmap_t;
+#define DP_TRACE_MP_MSG_NAME "data_path_trace"
-struct dp_trace_job_desc
+#define DP_TRACE_SUCCESS 0
+#define DP_TRACE_ERROR_NO_ENOUGH_JOB_ID 1
+#define DP_TRACE_ERROR_JOB_ID_IN_USED 2
+#define DP_TRACE_ERROR_ILLEGAL_JOB_ID 3
+#define DP_TRACE_ERROR_ILLEGAL_BPF_EXPR 4
+#define DP_TRACE_ERROR_MAX 5
+
+#define DP_TRACE_MEASUREMENT_TYPE_UNKNOW (0)
+#define DP_TRACE_MEASUREMENT_TYPE_UNMATCH (1 << 3)
+#define DP_TRACE_MEASUREMENT_TYPE_MATCHED (DP_TRACE_MEASUREMENT_TYPE_TRACE | DP_TRACE_MEASUREMENT_TYPE_TELEMETRY)
+
+enum dp_trace_req_action
{
- bool enable;
- char bpf_expr[MR_BPF_EXPRESSION_MAX];
- unsigned int pkt_cnt_max;
- unsigned int sampling;
- unsigned int snaplen;
+ DP_TRACE_INSTANCE_GET,
+ DP_TRACE_JOB_ID_USED_GET,
+ DP_TRACE_JOB_ADD,
+ DP_TRACE_JOB_DESTROY
+};
+
+struct dp_trace_req
+{
+ char appsym[MR_SYMBOL_MAX];
+ enum dp_trace_req_action action;
+ const struct dp_trace_job_desc * desc;
+ job_bitmap_t jobs_id_destroy;
+};
+
+struct dp_trace_resp
+{
+ int16_t errcode;
+ void * trace_instance;
+ job_bitmap_t jobs_id_used_get;
+ job_bitmap_t job_id_add;
};
struct dp_trace_job_ctx
@@ -46,13 +71,14 @@ struct dp_trace_job_ctx
struct dp_trace_stat
{
- uint64_t filter_exec_hit;
+ uint64_t filter_exec_hit[DP_TRACE_JOB_NUM_MAX];
uint64_t filter_exec_miss;
uint64_t reach_pkt_cnt_limit;
uint64_t record_buf_alloc_failed_no_mem;
uint64_t record_buf_alloc_success;
+ uint64_t record_buf_free;
uint64_t record_emit_failed_no_space_in_buf;
uint64_t record_emit_failed_trace_oversize;
@@ -64,43 +90,26 @@ struct dp_trace_stat
// uint64_t uncategorized_failed;
} __rte_cache_aligned;
-struct dp_trace_saving_stat
-{
- uint64_t save_to_file_failed_other;
- uint64_t save_to_file_failed_at_pcapng_format;
- uint64_t save_to_file_failed_at_write_to_disk;
- uint64_t save_to_file_success;
-} __rte_cache_aligned;
-
struct dp_trace_instance
{
- bool enable;
- struct dp_trace_job_ctx job_ctx[DP_TRACE_JOB_NUM_MAX];
- struct rte_ring * ring;
+ uint16_t enable;
+ uint16_t nr_ring;
+ struct rte_ring * ring[DP_TRACE_RING_NUM];
struct rte_mempool * pool;
struct rte_mempool * dump_pool;
-
- pthread_mutex_t trace_file_mutex;
- unsigned int trace_file_max_size;
- char trace_file_path[PATH_MAX];
- char trace_file_bak_path[PATH_MAX];
- rte_pcapng_t * pcapng;
- unsigned int trace_merge_timeout;
-};
+ uint16_t nr_job_ctx;
+ struct dp_trace_job_ctx job_ctx[DP_TRACE_JOB_NUM_MAX];
+} __rte_cache_aligned;
// Memory alignment is very important here; it can avoid cache misses; do not remove
struct dp_trace_process
{
struct dp_trace_instance * inst;
- struct rte_ring * ring;
-
- rte_atomic16_t save_thread_still_run;
- pthread_t save_trace_file_thread;
+ struct rte_ring * ring[DP_TRACE_RING_NUM];
+ uint16_t nr_ring;
RTE_MARKER cacheline1 __rte_cache_min_aligned;
- sem_t sem;
- struct dp_trace_saving_stat saving_statistics;
struct dp_trace_stat statistics[RTE_MAX_LCORE];
} __rte_cache_aligned;
@@ -122,31 +131,22 @@ struct dp_trace_buffer
/* record buffer*/
uint16_t buffer_len;
uint16_t buffer_used;
+ uint16_t buffer_refcnt;
char buffer[0]; // Arrays of Length Zero. Other members must be above buffer[0]
};
struct dp_trace_record_meta
{
+ uint8_t measurement_type;
const char * appsym;
const char * module;
const char * function;
};
-struct dp_trace_record_header
-{
- char appsym[16];
- char module[16];
- uint32_t custom_id_1;
- uint32_t custom_id_2;
-
- struct timespec ts;
- uint16_t recode_len;
-};
-
-static inline int dp_trace_record_can_emit(const struct rte_mbuf * mbuf)
+static inline int dp_trace_record_can_emit(const struct rte_mbuf * mbuf, uint8_t measurement_type)
{
struct mrb_metadata * mrb_meta = (struct mrb_metadata *)mrbuf_cz_data(mbuf, 1);
- if (unlikely(mrb_meta->dp_trace_can_emit == 1))
+ if (unlikely(mrb_meta->measurement_type & measurement_type))
{
return 1;
}
@@ -158,7 +158,7 @@ static inline int dp_trace_record_can_emit(const struct rte_mbuf * mbuf)
struct dp_trace_process * dp_trace_process_create(enum dp_trace_process_type process_tpye);
// use dp_trace_des initialize instance
-int dp_trace_job_add(struct dp_trace_process * trace, const struct dp_trace_job_desc * desc, unsigned int job_index);
+int dp_trace_job_add(struct dp_trace_process * trace, const struct dp_trace_job_desc * desc);
// Check whether the current packet matches the bpf expression
// Note: This function can only be called once for an rte_mbuf package.
@@ -169,7 +169,6 @@ job_bitmap_t dp_trace_job_id_bitmap_get(struct dp_trace_process * trace, struct
// When constructing the string parameters of dp_trace_record_emit_str is time-consuming, before calling
// dp_trace_record_emit_str, you can call this function to determine whether dp_trace_record_emit_str is valid.
-// int dp_trace_record_can_emit(const struct rte_mbuf * mbuf);
// Record information to different jobs buffer
int dp_trace_record_emit_str(struct dp_trace_process * trace, struct rte_mbuf * mbuf, unsigned int lcore_id,
@@ -180,20 +179,13 @@ int dp_trace_record_emit_fmt(struct dp_trace_process * trace, struct rte_mbuf *
void dp_trace_record_write(struct dp_trace_process * trace, struct rte_mbuf * mbuf, unsigned int lcore_id);
-// Save the contents of the buffer to a file
-int dp_trace_record_flush(struct dp_trace_process * trace);
-
// Clear instance. If there is a trace that is not saved, it will be saved automatically.
int dp_trace_jobs_destroy(struct dp_trace_process * trace, job_bitmap_t jobs);
// for unit test
int dp_trace_record_encode(struct rte_mbuf * mbuf, const struct dp_trace_record_meta * meta, const char * record);
-int dp_trace_record_decode(struct rte_mbuf * mbuf, char * dst, unsigned int size);
// only for infra
void infra_rte_pktmbuf_free(struct rte_mbuf * mbuf);
-void infra_rte_pktmbuf_free_bulk(struct rte_mbuf ** mbufs, unsigned int count);
-// for config
-bool dp_trace_start(struct dp_trace_process * trace);
-void dp_trace_stop(struct dp_trace_process * trace); \ No newline at end of file
+const char * dp_trace_strerror(unsigned int err); \ No newline at end of file
diff --git a/infra/include/ldbc.h b/infra/include/ldbc.h
index cc115e1..f039e9c 100644
--- a/infra/include/ldbc.h
+++ b/infra/include/ldbc.h
@@ -1,5 +1,5 @@
#pragma once
-
+#include "marsio.h"
#include <assert.h>
#include <rte_mbuf.h>
#include <stdint.h>
@@ -37,25 +37,6 @@ enum e_hash_mode
LDBC_HASH_MAX
};
-enum complex_layer_type_id
-{
- LAYER_TYPE_ID_ETHER,
- LAYER_TYPE_ID_PPP,
- LAYER_TYPE_ID_HDLC,
- LAYER_TYPE_ID_VLAN,
- LAYER_TYPE_ID_PPPOE,
- LAYER_TYPE_ID_MPLS,
- LAYER_TYPE_ID_IPV4,
- LAYER_TYPE_ID_IPV6,
- LAYER_TYPE_ID_UDP,
- LAYER_TYPE_ID_TCP,
- LAYER_TYPE_ID_ICMP,
- LAYER_TYPE_ID_ICMP6,
- LAYER_TYPE_ID_GRE,
- LAYER_TYPE_ID_G_VXLAN,
- LAYER_TYPE_ID_GTPV1_U,
-};
-
enum complex_layer_type_mask
{
/* 数据链路层 */
@@ -90,8 +71,8 @@ enum complex_layer_type_mask
LAYER_TYPE_GTPV1_U = 1 << 14,
/* ALL */
- LAYER_TYPE_ALL =(LAYER_TYPE_L2 | LAYER_TYPE_L2_TUN | LAYER_TYPE_L3 |
- LAYER_TYPE_L4 | LAYER_TYPE_G_VXLAN | LAYER_TYPE_GTPV1_U),
+ LAYER_TYPE_ALL =
+ (LAYER_TYPE_L2 | LAYER_TYPE_L2_TUN | LAYER_TYPE_L3 | LAYER_TYPE_L4 | LAYER_TYPE_G_VXLAN | LAYER_TYPE_GTPV1_U),
};
#define MR_PKT_PARSER_LAYERS_MAX 16
@@ -144,8 +125,7 @@ struct distributer
};
static inline void pkt_parser_init(struct pkt_parser * pkt_parser, struct pkt_parser_result * result,
- enum complex_layer_type_mask expect_layer_type,
- unsigned int nr_expect_results)
+ enum complex_layer_type_mask expect_layer_type, unsigned int nr_expect_results)
{
/* read only parameters */
pkt_parser->expect_layer_mask = expect_layer_type;
@@ -164,14 +144,14 @@ struct distributer * distributer_create(enum e_dist_mode distmode, enum e_hash_m
int distributer_rss_key_setup(struct distributer * dist_object, const uint8_t * key);
static inline int distributer_calculate_from_parser_results(struct distributer * dist_object, struct rte_mbuf * mbufs[],
- struct pkt_parser_result * parser_results[],
- unsigned int nr_mbufs)
+ struct pkt_parser_result * parser_results[],
+ unsigned int nr_mbufs)
{
return dist_object->fn_distributer(dist_object, mbufs, parser_results, nr_mbufs);
}
static inline int distributer_calculate(struct distributer * dist_object, struct rte_mbuf * mbufs[],
- unsigned int nr_mbufs)
+ unsigned int nr_mbufs)
{
struct pkt_parser parser_handlers[nr_mbufs];
struct pkt_parser_result parser_results[nr_mbufs];
@@ -244,8 +224,8 @@ static inline void * complex_layer_jump_to_innermost(struct pkt_parser_result *
return NULL;
}
-static inline int complex_layer_type_expect(struct pkt_parser_result * pkt_parser_result, const uint16_t * expect_layers,
- unsigned int nr_expect_layers)
+static inline int complex_layer_type_expect(struct pkt_parser_result * pkt_parser_result,
+ const uint16_t * expect_layers, unsigned int nr_expect_layers)
{
unsigned int iter_pkt_result;
unsigned int iter_expect_layers;
diff --git a/infra/src/dp_trace.c b/infra/src/dp_trace.c
index 94b2b8b..ea0a748 100644
--- a/infra/src/dp_trace.c
+++ b/infra/src/dp_trace.c
@@ -1,7 +1,6 @@
#include "dp_trace.h"
#include "common.h"
#include "mrb_define.h"
-#include "pcapng_proto.h"
#include <libgen.h>
#include <linux/if_ether.h>
#include <rte_common.h>
@@ -12,35 +11,8 @@
#include <sys/utsname.h>
#include <unistd.h>
-static void * dp_trace_save_thread(void * arg);
-static struct dp_trace_instance * dp_trace_instance_create();
-static int trace_file_mutex_lock(struct dp_trace_process * trace);
-static int dp_trace_file_mutex_unlock(struct dp_trace_process * trace);
static inline bool dp_trace_is_disable(struct dp_trace_process * trace);
-/* copy from dpdk-23.11.After upgrading dpdk, you can delete the following content */
-static struct rte_mbuf * dpdk_23_rte_pcapng_copy(uint16_t port_id, uint32_t queue, const struct rte_mbuf * md,
- struct rte_mempool * mp, uint32_t length,
- enum rte_pcapng_direction direction, const char * comment);
-static int dpdk_23_pcapng_vlan_insert(struct rte_mbuf * m, uint16_t ether_type, uint16_t tci);
-static char * os_info_get(void);
-static void dp_trace_file_merge(struct dp_trace_process * trace);
-
-#if 0
-static bool is_directory_exists(const char * path)
-{
- struct stat info;
- if (stat(path, &info) != 0)
- return false;
- return S_ISDIR(info.st_mode) == 1;
-}
-#endif
-
-static bool is_file_exists(const char * path)
-{
- return access(path, F_OK) == 0;
-}
-
static struct dp_trace_instance * dp_trace_instance_create()
{
static_assert(DP_TRACE_JOB_NUM_MAX <= 16, "DP_TRACE_JOB_NUM_MAX must be no greater than 16");
@@ -59,21 +31,21 @@ static struct dp_trace_instance * dp_trace_instance_create()
MR_VERIFY_MALLOC(instance->pool);
MR_VERIFY_MALLOC(instance->dump_pool);
- pthread_mutexattr_t attr;
- pthread_mutexattr_init(&attr);
- pthread_mutexattr_setrobust(&attr, PTHREAD_MUTEX_ROBUST);
- pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
- pthread_mutex_init(&instance->trace_file_mutex, &attr);
-
for (unsigned int i = 0; i < DP_TRACE_JOB_NUM_MAX; i++)
{
instance->job_ctx[i].job_id = 1 << (i);
}
- instance->ring = rte_ring_create(DP_TRACE_RING_NAME, DP_TRACE_RING_SIZE_MAX, rte_socket_id(), RING_F_SC_DEQ);
- MR_VERIFY_MALLOC(instance->ring);
+ instance->nr_ring = DP_TRACE_RING_NUM;
+ for (unsigned int i = 0; i < instance->nr_ring; i++)
+ {
+ char ring_name[64];
+ snprintf(ring_name, sizeof(ring_name), "%s_%u", DP_TRACE_RING_NAME, i);
+ instance->ring[i] = rte_ring_create(ring_name, DP_TRACE_RING_SIZE_MAX, rte_socket_id(), 0);
+ MR_VERIFY_MALLOC(instance->ring[i]);
+ }
- instance->enable = false;
+ instance->enable = 0;
return instance;
}
@@ -83,21 +55,6 @@ struct dp_trace_process * dp_trace_process_create_in_serv(void)
struct dp_trace_process * trace = ZMALLOC(sizeof(struct dp_trace_process));
MR_VERIFY_MALLOC(trace);
trace->inst = dp_trace_instance_create();
-
- if (sem_init(&trace->sem, 0, 0) < 0)
- {
- MR_ERROR("sem init fail.");
- return NULL;
- }
-
- rte_atomic16_set(&trace->save_thread_still_run, 1);
-
- if (pthread_create(&trace->save_trace_file_thread, NULL, dp_trace_save_thread, trace) != 0)
- {
- MR_ERROR("Failed to create thread to save trace: %s", strerror(errno));
- return NULL;
- }
-
return trace;
}
@@ -106,24 +63,28 @@ struct dp_trace_process * dp_trace_process_create_in_app(void)
struct dp_trace_process * trace = ZMALLOC(sizeof(struct dp_trace_process));
MR_VERIFY_MALLOC(trace);
+ struct rte_mp_msg req_msg = {};
+ snprintf(req_msg.name, sizeof(req_msg.name), "%s", DP_TRACE_MP_MSG_NAME);
+ struct dp_trace_req * dp_trace_req = (struct dp_trace_req *)req_msg.param;
+ req_msg.len_param = sizeof(struct dp_trace_req);
+
+ dp_trace_req->action = DP_TRACE_INSTANCE_GET;
+
struct rte_mp_reply mp_reply;
const struct timespec wait_timespec = {
.tv_nsec = 0,
.tv_sec = 30,
};
- struct rte_mp_msg req = {};
- snprintf(req.name, sizeof(req.name), "%s", "data_path_trace");
- int ret = rte_mp_request_sync(&req, &mp_reply, &wait_timespec);
+ int ret = rte_mp_request_sync(&req_msg, &mp_reply, &wait_timespec);
if (ret < 0)
{
MR_WARNING("Failed to execute rte_mp_request_sync:%s", rte_strerror(rte_errno));
goto err;
}
- uintptr_t stored_ptr_address;
- memcpy(&stored_ptr_address, mp_reply.msgs->param, sizeof(uintptr_t));
- trace->inst = (struct dp_trace_instance *)stored_ptr_address;
+ struct dp_trace_resp * dp_trace_resp = (struct dp_trace_resp *)mp_reply.msgs->param;
+ trace->inst = dp_trace_resp->trace_instance;
free(mp_reply.msgs);
return trace;
@@ -134,10 +95,10 @@ err:
return NULL;
}
-struct dp_trace_process * dp_trace_process_create(enum dp_trace_process_type process_tpye)
+struct dp_trace_process * dp_trace_process_create(enum dp_trace_process_type process_type)
{
struct dp_trace_process * trace;
- if (process_tpye == DP_TRACE_PROCESS_MARSIO)
+ if (process_type == DP_TRACE_PROCESS_MARSIO)
{
trace = dp_trace_process_create_in_serv();
}
@@ -153,7 +114,11 @@ struct dp_trace_process * dp_trace_process_create(enum dp_trace_process_type pro
}
// Currently, all traces use the same ring
- trace->ring = trace->inst->ring;
+ trace->nr_ring = trace->inst->nr_ring;
+ for (unsigned int i = 0; i < trace->nr_ring; i++)
+ {
+ trace->ring[i] = trace->inst->ring[i];
+ }
return trace;
@@ -183,7 +148,7 @@ void dp_trace_job_clean(struct dp_trace_job_ctx * ctx)
}
}
-int dp_trace_job_add(struct dp_trace_process * trace, const struct dp_trace_job_desc * desc, unsigned int job_index)
+int dp_trace_job_add(struct dp_trace_process * trace, const struct dp_trace_job_desc * desc)
{
// Find a job that is not being used.pkt_cnt
// Recycle the jobs added that have been terminated
@@ -192,15 +157,24 @@ int dp_trace_job_add(struct dp_trace_process * trace, const struct dp_trace_job_
struct dp_trace_job_ctx * ctx = NULL;
pcap_t * pcap_handle = NULL;
struct bpf_program fp = {};
+ int16_t err_code = DP_TRACE_SUCCESS;
- if (instance->job_ctx[job_index].used == false)
+ unsigned int rule_index = desc->rule_index;
+ if (rule_index >= DP_TRACE_JOB_NUM_MAX)
{
- ctx = &instance->job_ctx[job_index];
+ err_code = DP_TRACE_ERROR_ILLEGAL_JOB_ID;
+ goto err;
+ }
+
+ if (instance->job_ctx[rule_index].used == false)
+ {
+ ctx = &instance->job_ctx[rule_index];
}
if (ctx == NULL)
{
MR_ERROR("Not enough job id");
+ err_code = DP_TRACE_ERROR_JOB_ID_IN_USED;
goto err;
}
@@ -208,6 +182,7 @@ int dp_trace_job_add(struct dp_trace_process * trace, const struct dp_trace_job_
if (pcap_compile(pcap_handle, &fp, desc->bpf_expr, 0, PCAP_NETMASK_UNKNOWN) < 0)
{
MR_ERROR("pcap_compile execution failed: %s", pcap_geterr(pcap_handle));
+ err_code = DP_TRACE_ERROR_ILLEGAL_BPF_EXPR;
goto err;
}
copy_bpf_prog(ctx, fp);
@@ -215,21 +190,24 @@ int dp_trace_job_add(struct dp_trace_process * trace, const struct dp_trace_job_
pcap_close(pcap_handle);
ctx->desc.enable = desc->enable;
+ ctx->desc.measurement_type = desc->measurement_type;
snprintf(ctx->desc.bpf_expr, sizeof(ctx->desc.bpf_expr), "%s", desc->bpf_expr);
ctx->desc.pkt_cnt_max = desc->pkt_cnt_max;
ctx->desc.sampling = desc->sampling;
ctx->desc.snaplen = desc->snaplen;
ctx->used = true;
+ instance->nr_job_ctx++;
+ MR_INFO("[add job:%u] bpf_expr: %s type: %u", rule_index, ctx->desc.bpf_expr, ctx->desc.measurement_type);
- return 0;
+ return err_code;
err:
pcap_freecode(&fp);
if (pcap_handle != NULL)
pcap_close(pcap_handle);
dp_trace_job_clean(ctx);
- return -1;
+ return err_code;
}
uint16_t dp_trace_filter_exec_jobs_get(struct dp_trace_process * trace, struct rte_mbuf * mbuf, unsigned int offset,
@@ -237,6 +215,7 @@ uint16_t dp_trace_filter_exec_jobs_get(struct dp_trace_process * trace, struct r
{
assert(lcore_id < RTE_MAX_LCORE);
+ struct mrb_metadata * mrb_meta = rte_mbuf_to_priv(mbuf);
struct dp_trace_stat * statistics = &trace->statistics[lcore_id];
struct dp_trace_instance * instance = trace->inst;
@@ -266,12 +245,12 @@ uint16_t dp_trace_filter_exec_jobs_get(struct dp_trace_process * trace, struct r
// packet.
// unlimit: ctx->desc.pkt_cnt_max == 0
target_packet = true;
- if (ctx->desc.pkt_cnt_max != 0 && statistics->filter_exec_hit >= ctx->desc.pkt_cnt_max)
+ if (ctx->desc.pkt_cnt_max != 0 && statistics->filter_exec_hit[i] >= ctx->desc.pkt_cnt_max)
{
statistics->reach_pkt_cnt_limit++;
continue;
}
- if (statistics->filter_exec_hit % ctx->desc.sampling == 0)
+ if (statistics->filter_exec_hit[i] % ctx->desc.sampling == 0)
{
// match every sampling packet
match_jobs = match_jobs | ctx->job_id;
@@ -280,26 +259,28 @@ uint16_t dp_trace_filter_exec_jobs_get(struct dp_trace_process * trace, struct r
{
*snaplen = ctx->desc.snaplen;
}
+ mrb_meta->measurement_type |= ctx->desc.measurement_type;
}
+ statistics->filter_exec_hit[i]++;
}
}
- if (unlikely(target_packet))
- {
- statistics->filter_exec_hit++;
- }
- else
+ if (unlikely(target_packet == false))
{
statistics->filter_exec_miss++;
}
+ if (likely(match_jobs == 0))
+ {
+ mrb_meta->measurement_type = DP_TRACE_MEASUREMENT_TYPE_UNMATCH;
+ }
return match_jobs;
}
job_bitmap_t dp_trace_job_id_bitmap_get(struct dp_trace_process * trace, struct rte_mbuf * mbuf)
{
RTE_SET_USED(trace);
- struct mrb_metadata * mrb_meta = (struct mrb_metadata *)mrbuf_cz_data(mbuf, 1);
+ struct mrb_metadata * mrb_meta = rte_mbuf_to_priv(mbuf);
struct dp_trace_buffer * dp_trace_buffer = mrb_meta->dp_trace_buffer;
if (dp_trace_buffer == NULL)
@@ -316,21 +297,28 @@ void dp_trace_filter_exec(struct dp_trace_process * trace, struct rte_mbuf * mbu
assert(lcore_id < RTE_MAX_LCORE);
struct dp_trace_stat * statistics = &trace->statistics[lcore_id];
- struct mrb_metadata * mrb_meta = (struct mrb_metadata *)mrbuf_cz_data(mbuf, 1);
- if (mrb_meta->dp_trace_can_emit != 0)
+ struct mrb_metadata * mrb_meta = rte_mbuf_to_priv(mbuf);
+ if (mrb_meta->measurement_type != DP_TRACE_MEASUREMENT_TYPE_UNKNOW)
+ {
+ return;
+ }
+
+ // Optimization: When there is no job, return directly
+ if (likely(trace->inst->nr_job_ctx == 0))
{
+ mrb_meta->measurement_type = DP_TRACE_MEASUREMENT_TYPE_UNMATCH;
return;
}
- if (likely(dp_trace_is_disable(trace)))
+ if (unlikely(dp_trace_is_disable(trace)))
{
- mrb_meta->dp_trace_can_emit = -1;
+ mrb_meta->measurement_type = DP_TRACE_MEASUREMENT_TYPE_UNMATCH;
return;
}
struct pkt_parser_result * pkt_parser_result = &mrb_meta->pkt_parser_result;
job_bitmap_t match_jobs = 0;
- mrb_meta->dp_trace_can_emit = -1;
+ mrb_meta->measurement_type = DP_TRACE_MEASUREMENT_TYPE_UNKNOW;
if (unlikely(pkt_parser_result->nr_layers == 0))
{
@@ -361,12 +349,12 @@ void dp_trace_filter_exec(struct dp_trace_process * trace, struct rte_mbuf * mbu
struct dp_trace_buffer * dp_trace_buffer = mrb_meta->dp_trace_buffer;
unsigned header_size = sizeof(struct dp_trace_buffer);
memset(dp_trace_buffer, 0, header_size);
+ dp_trace_buffer->buffer_refcnt = 1;
dp_trace_buffer->buffer_len = DP_TRACE_RECORD_SIZE - header_size;
dp_trace_buffer->inst = trace->inst;
dp_trace_buffer->jobs = match_jobs;
dp_trace_buffer->snaplen = snaplen;
statistics->record_buf_alloc_success++;
- mrb_meta->dp_trace_can_emit = 1;
}
}
}
@@ -374,13 +362,9 @@ void dp_trace_filter_exec(struct dp_trace_process * trace, struct rte_mbuf * mbu
int dp_trace_record_emit_str(struct dp_trace_process * trace, struct rte_mbuf * mbuf, unsigned int lcore_id,
const struct dp_trace_record_meta * meta, const char * str)
{
- struct mrb_metadata * mrb_meta = (struct mrb_metadata *)mrbuf_cz_data(mbuf, 1);
- if (mrb_meta->dp_trace_can_emit == 0)
- {
- dp_trace_filter_exec(trace, mbuf, 0, lcore_id);
- }
+ struct mrb_metadata * mrb_meta = rte_mbuf_to_priv(mbuf);
- if (mrb_meta->dp_trace_can_emit == -1)
+ if ((mrb_meta->measurement_type & DP_TRACE_MEASUREMENT_TYPE_MATCHED) == 0)
{
return 0;
}
@@ -425,14 +409,14 @@ int dp_trace_record_emit_fmt(struct dp_trace_process * trace, struct rte_mbuf *
void dp_trace_record_write(struct dp_trace_process * trace, struct rte_mbuf * mbuf, unsigned int lcore_id)
{
- // infra_dp_trace_record_write(mbuf);
- struct mrb_metadata * mrb_meta = (struct mrb_metadata *)mrbuf_cz_data(mbuf, 1);
+ struct mrb_metadata * mrb_meta = rte_mbuf_to_priv(mbuf);
struct dp_trace_buffer * dp_trace_buffer = mrb_meta->dp_trace_buffer;
if (dp_trace_buffer != NULL)
{
rte_mbuf_refcnt_update(mbuf, 1);
- int ret = rte_ring_enqueue(trace->ring, (void *)mbuf);
+ uint16_t ring_id = mbuf->hash.usr % trace->nr_ring;
+ int ret = rte_ring_enqueue(trace->ring[ring_id], (void *)mbuf);
assert(lcore_id < RTE_MAX_LCORE);
struct dp_trace_stat * statistics = &trace->statistics[lcore_id];
@@ -452,384 +436,6 @@ void dp_trace_record_write(struct dp_trace_process * trace, struct rte_mbuf * mb
}
}
-#if 0
-static void trace_filename_generate(struct dp_trace_process * trace)
-{
- // get trace file name
- time_t current_time;
- time(&current_time);
- char time_str[20];
- snprintf(time_str, sizeof(time_str), "%ld", current_time);
- char trace_file_name[256];
- snprintf(trace_file_name, sizeof(trace_file_name), "%s", time_str);
-
- // get trace path
- unsigned int trace_dir_len = strlen(trace->inst->trace_dir);
- if (trace->inst->trace_dir[trace_dir_len - 1] == '/')
- {
- trace->inst->trace_dir[trace_dir_len - 1] = '\0';
- trace_dir_len = strlen(trace->inst->trace_dir);
- }
-
- snprintf(trace->trace_file_path, sizeof(trace->trace_file_path), "%s/dp_trace_%s.pcapng", trace->inst->trace_dir,
- trace_file_name);
-}
-#endif
-
-static bool dp_trace_file_open(struct dp_trace_process * trace)
-{
- trace_file_mutex_lock(trace);
- if (likely(trace->inst->pcapng == NULL))
- {
- char * trace_file_path = strdup(trace->inst->trace_file_path);
- char * trace_file_dir = dirname(trace_file_path);
- int ret = mkdir(trace_file_dir, 0755);
- free(trace_file_path);
- if (ret != 0 && errno != EEXIST)
- {
- MR_ERROR("Failed to create directory:%s. errno is %d.", trace_file_dir, errno);
- goto end;
- }
-
- unlink(trace->inst->trace_file_path);
-
- int dumpfile_fd = open(trace->inst->trace_file_path, O_WRONLY | O_CREAT, 0640);
- char * os_info = os_info_get();
- trace->inst->pcapng = rte_pcapng_fdopen(dumpfile_fd, os_info, NULL, NULL, NULL);
- free(os_info);
- if (trace->inst->pcapng == NULL)
- {
- MR_ERROR("Failed to create data path trace file.");
- goto end;
- }
- }
- dp_trace_file_mutex_unlock(trace);
- return true;
-
-end:
- dp_trace_file_mutex_unlock(trace);
- return false;
-}
-
-static void dp_trace_file_close(struct dp_trace_process * trace)
-{
- trace_file_mutex_lock(trace);
- if (likely(trace->inst->pcapng != NULL))
- {
- rte_pcapng_close(trace->inst->pcapng);
- trace->inst->pcapng = NULL;
- }
- dp_trace_file_mutex_unlock(trace);
-}
-
-bool dp_trace_start(struct dp_trace_process * trace)
-{
- bool ret = false;
- trace_file_mutex_lock(trace);
- if (dp_trace_file_open(trace))
- {
- for (unsigned int i = 0; i < RTE_MAX_LCORE; i++)
- {
- struct dp_trace_stat * statistics = &trace->statistics[i];
- statistics->filter_exec_hit = 0;
- statistics->filter_exec_miss = 0;
- statistics->reach_pkt_cnt_limit = 0;
- statistics->record_buf_alloc_failed_no_mem = 0;
- statistics->record_buf_alloc_success = 0;
- statistics->record_emit_failed_trace_oversize = 0;
- statistics->record_emit_failed_no_space_in_buf = 0;
- statistics->record_emit_success = 0;
- statistics->ring_enqueue_failed = 0;
- statistics->ring_enqueue_success = 0;
- }
-
- struct dp_trace_saving_stat * saving_statistics = &trace->saving_statistics;
- saving_statistics->save_to_file_failed_at_pcapng_format = 0;
- saving_statistics->save_to_file_failed_at_write_to_disk = 0;
- saving_statistics->save_to_file_failed_other = 0;
- saving_statistics->save_to_file_success = 0;
-
- if (remove(trace->inst->trace_file_bak_path) < 0)
- {
- MR_ERROR("remove %s failed. error info: %s", trace->inst->trace_file_bak_path, strerror(errno));
- }
-
- trace->inst->enable = true;
- ret = true;
- }
- dp_trace_file_mutex_unlock(trace);
- return ret;
-}
-
-static inline bool dp_trace_is_disable(struct dp_trace_process * trace)
-{
- return trace->inst->enable == false;
-}
-
-void dp_trace_stop(struct dp_trace_process * trace)
-{
- trace->inst->enable = false;
-
- trace_file_mutex_lock(trace);
- dp_trace_file_close(trace);
- dp_trace_file_merge(trace);
- dp_trace_file_mutex_unlock(trace);
-}
-
-#if 0
-void new_filename_generate(const char * old_filename, char * new_filename, unsigned int buf_len, const char * suffix)
-{
- assert(strlen(old_filename) <= PATH_MAX);
-
- char * dot = strrchr(old_filename, '.');
- if (dot == NULL)
- {
- snprintf(new_filename, buf_len, "%s_%s", old_filename, suffix);
- }
- else
- {
- unsigned int name_len = dot - old_filename;
- assert(buf_len > name_len);
-
- strncpy(new_filename, old_filename, name_len);
- snprintf(new_filename + name_len, buf_len - name_len, "_%s%s", suffix, dot);
- }
-}
-#endif
-
-static void dp_trace_file_merge(struct dp_trace_process * trace)
-{
- if (!is_file_exists(trace->inst->trace_file_bak_path))
- {
- // Only one file, no need to merge
- return;
- }
-
- trace_file_mutex_lock(trace);
-
- char tmp_name[PATH_MAX];
- snprintf(tmp_name, sizeof(tmp_name), "%s.2", trace->inst->trace_file_path);
-
- dp_trace_file_close(trace);
-
- // If newpath already exists, it will be atomically replaced
- if (rename(trace->inst->trace_file_path, tmp_name) < 0)
- {
- MR_ERROR("rename %s to %s failed. error info: %s", trace->inst->trace_file_path, tmp_name, strerror(errno));
- }
-
- char command[2 * PATH_MAX];
- snprintf(command, sizeof(command), "timeout -v %us mergecap -w %s %s %s 2>&1", trace->inst->trace_merge_timeout,
- trace->inst->trace_file_path, tmp_name, trace->inst->trace_file_bak_path);
- MR_INFO("merge trace file: %s", command);
-
- FILE * fp;
- char buffer[1024];
- fp = popen(command, "r");
- if (fp == NULL)
- {
- MR_ERROR("open pipe failed: %s", strerror(errno));
- goto err;
- }
-
- while (fgets(buffer, sizeof(buffer), fp) != NULL)
- {
- MR_ERROR("merge trace file output: %s", buffer);
- }
-
- pclose(fp);
-
- if (remove(tmp_name) < 0)
- {
- MR_ERROR("remove %s failed. error info: %s", tmp_name, strerror(errno));
- }
-
- if (remove(trace->inst->trace_file_bak_path) < 0)
- {
- MR_ERROR("remove %s failed. error info: %s", trace->inst->trace_file_bak_path, strerror(errno));
- }
-
- dp_trace_file_mutex_unlock(trace);
- return;
-
-err:
- if (rename(tmp_name, trace->inst->trace_file_path) < 0)
- {
- MR_ERROR("rename %s to %s failed. error info: %s", tmp_name, trace->inst->trace_file_path, strerror(errno));
- }
-
- dp_trace_file_mutex_unlock(trace);
- return;
-}
-
-static void dp_trace_file_rollbak(struct dp_trace_process * trace)
-{
- trace_file_mutex_lock(trace);
-
- const char * cur_filename = trace->inst->trace_file_path;
- char * bak_filename = trace->inst->trace_file_bak_path;
- unsigned int bak_filename_len = sizeof(trace->inst->trace_file_bak_path);
-
- dp_trace_file_close(trace);
-
- snprintf(bak_filename, bak_filename_len, "%s.1", cur_filename);
-
- if (rename(cur_filename, bak_filename) < 0)
- {
- MR_ERROR("rename %s to %s failed. error info: %s", cur_filename, bak_filename, strerror(errno));
- }
-
- dp_trace_file_open(trace);
-
- dp_trace_file_mutex_unlock(trace);
-}
-
-static bool dp_trace_file_reach_max_size(struct dp_trace_process * trace)
-{
- // max_size == 0 : unlimit write
- trace_file_mutex_lock(trace);
-
- unsigned int max_size = trace->inst->trace_file_max_size / 2;
-
- if (max_size == 0)
- {
- dp_trace_file_mutex_unlock(trace);
- return false;
- }
-
- struct stat file_stat;
- if (unlikely(stat(trace->inst->trace_file_path, &file_stat) == -1))
- {
- MR_ERROR("Failed to obtain data path trace file status.");
- dp_trace_file_mutex_unlock(trace);
- return true;
- }
-
- // unit is B -> KB
- if ((file_stat.st_size >> 10) >= max_size)
- {
- dp_trace_file_mutex_unlock(trace);
- return true;
- }
-
- dp_trace_file_mutex_unlock(trace);
- return false;
-}
-
-static void * dp_trace_save_thread(void * arg)
-{
- struct dp_trace_process * trace = (struct dp_trace_process *)arg;
- mr_thread_setname(pthread_self(), "dp_trace_save");
- MR_DEBUG("start data path trace save thread");
- char * decode_trace_buf = ZMALLOC(DP_TRACE_RECORD_SIZE);
- MR_VERIFY_MALLOC(decode_trace_buf);
-
- while (rte_atomic16_read(&trace->save_thread_still_run) == 1)
- {
- sem_wait(&trace->sem);
-
- struct dp_trace_saving_stat * saving_statistics = &trace->saving_statistics;
-
- if (rte_atomic16_read(&trace->save_thread_still_run) == 0)
- {
- break;
- }
-
- struct rte_mbuf * records[DP_TRACE_RING_SIZE_MAX];
- struct rte_mbuf * pcapng_records[DP_TRACE_RING_SIZE_MAX];
- int n = 0;
-
- if (dp_trace_is_disable(trace) &&
- (n = rte_ring_dequeue_burst(trace->ring, (void **)records, DP_TRACE_RING_SIZE_MAX, NULL)) != 0)
- {
- // Consume the semaphore; avoid reacquiring the lock;
- // Discard the trace data that entered the ring after stop
- for (unsigned int i = 0; i < n; i++)
- {
- struct rte_mbuf * mbuf = records[i];
- struct mrb_metadata * mrb_meta = (struct mrb_metadata *)mrbuf_cz_data(mbuf, 1);
- struct dp_trace_buffer * dp_trace_buffer = mrb_meta->dp_trace_buffer;
-
- rte_mempool_put(trace->inst->pool, (void *)dp_trace_buffer);
- saving_statistics->save_to_file_failed_other++;
- }
- rte_pktmbuf_free_bulk(records, n);
- continue;
- }
-
- trace_file_mutex_lock(trace);
- while ((n = rte_ring_dequeue_burst(trace->ring, (void **)records, DP_TRACE_RING_SIZE_MAX, NULL)) != 0)
- {
- int copy_cnt = 0;
-
- for (unsigned int i = 0; i < n; i++)
- {
- int ret = 0;
- struct rte_mbuf * mbuf = records[i];
- struct mrb_metadata * mrb_meta = (struct mrb_metadata *)mrbuf_cz_data(mbuf, 1);
- struct dp_trace_buffer * dp_trace_buffer = mrb_meta->dp_trace_buffer;
-
- ret = dp_trace_record_decode(mbuf, decode_trace_buf, DP_TRACE_RECORD_SIZE);
- if (unlikely(ret < 0))
- {
- saving_statistics->save_to_file_failed_other++;
- goto release_buffer;
- }
-
- struct rte_mbuf * pkt =
- dpdk_23_rte_pcapng_copy(0, 0, mbuf, trace->inst->dump_pool, dp_trace_buffer->snaplen,
- RTE_PCAPNG_DIRECTION_UNKNOWN, decode_trace_buf);
- if (pkt == NULL)
- {
- saving_statistics->save_to_file_failed_at_pcapng_format++;
- }
- else
- {
- pcapng_records[copy_cnt++] = pkt;
- }
-
- release_buffer:
- rte_mempool_put(trace->inst->pool, (void *)dp_trace_buffer);
- }
- rte_pktmbuf_free_bulk(records, n);
-
- if (trace->inst->pcapng != NULL &&
- rte_pcapng_write_packets(trace->inst->pcapng, pcapng_records, copy_cnt) >= 0)
- {
- saving_statistics->save_to_file_success += copy_cnt;
- }
- else
- {
- saving_statistics->save_to_file_failed_at_write_to_disk += copy_cnt;
- }
- // doc say: The mbuf's in pkts are always freed
- // But in fact rte_pcapng_write_packets does not release mbuf
- rte_pktmbuf_free_bulk(pcapng_records, copy_cnt);
-
- // When the file is full, stop tagging data packets, close the file
- // OR in the configuration file, stop trace
- if (dp_trace_file_reach_max_size(trace))
- {
- dp_trace_file_rollbak(trace);
- }
- }
- dp_trace_file_mutex_unlock(trace);
- }
- FREE(decode_trace_buf);
- pthread_exit(NULL);
-}
-
-int dp_trace_record_flush(struct dp_trace_process * trace)
-{
- if (rte_ring_empty(trace->ring) == 1)
- {
- return 0;
- }
-
- sem_post(&trace->sem);
- return 0;
-}
-
int dp_trace_jobs_destroy(struct dp_trace_process * trace, job_bitmap_t jobs)
{
struct dp_trace_job_ctx * ctx;
@@ -839,6 +445,8 @@ int dp_trace_jobs_destroy(struct dp_trace_process * trace, job_bitmap_t jobs)
if (ctx->used && ((ctx->job_id & jobs) != 0))
{
dp_trace_job_clean(ctx);
+ trace->inst->nr_job_ctx--;
+ MR_INFO("[destroy job:%u] bpf_expr: %s", i, ctx->desc.bpf_expr);
}
}
@@ -847,7 +455,7 @@ int dp_trace_jobs_destroy(struct dp_trace_process * trace, job_bitmap_t jobs)
int dp_trace_record_encode(struct rte_mbuf * mbuf, const struct dp_trace_record_meta * meta, const char * str)
{
- struct mrb_metadata * mrb_meta = (struct mrb_metadata *)mrbuf_cz_data(mbuf, 1);
+ struct mrb_metadata * mrb_meta = rte_mbuf_to_priv(mbuf);
struct dp_trace_buffer * dp_trace_buffer = mrb_meta->dp_trace_buffer;
unsigned int left = dp_trace_buffer->buffer_len - dp_trace_buffer->buffer_used;
@@ -863,6 +471,7 @@ int dp_trace_record_encode(struct rte_mbuf * mbuf, const struct dp_trace_record_
struct dp_trace_record_header * record_header =
(struct dp_trace_record_header *)(dp_trace_buffer->buffer + dp_trace_buffer->buffer_used);
+ record_header->measurement_type = meta->measurement_type;
snprintf(record_header->appsym, sizeof(record_header->appsym), "%s", meta->appsym);
snprintf(record_header->module, sizeof(record_header->module), "%s", meta->module);
@@ -879,286 +488,19 @@ int dp_trace_record_encode(struct rte_mbuf * mbuf, const struct dp_trace_record_
return 0;
}
-int dp_trace_record_decode(struct rte_mbuf * mbuf, char * dst, unsigned int size)
-{
- struct mrb_metadata * mrb_meta = (struct mrb_metadata *)mrbuf_cz_data(mbuf, 1);
- struct dp_trace_buffer * dp_trace_buffer = mrb_meta->dp_trace_buffer;
- unsigned int position = 0;
-
-#ifndef NDEBUG
- unsigned int comment_cnt = 0;
-#endif
-
- while (position < dp_trace_buffer->buffer_used)
- {
- char * cur = dp_trace_buffer->buffer + position;
-
- const struct dp_trace_record_header * record_header = (struct dp_trace_record_header *)(cur);
- const char * str = cur + sizeof(struct dp_trace_record_header);
- const unsigned int str_len = record_header->recode_len;
-
- int n = snprintf(dst, size, "[%s:%s:] %ld.%ld ", record_header->appsym, record_header->module,
- record_header->ts.tv_sec, record_header->ts.tv_nsec);
- if (unlikely(n < 0 || n >= size))
- return -1;
- size -= n;
- dst += n;
-
- if (unlikely(size - 2 < str_len))
- return -1;
- memcpy(dst, str, str_len);
- size -= str_len;
- dst += str_len;
-
- *dst = '\n';
- size--;
- dst++;
-
- position += sizeof(struct dp_trace_record_header) + str_len;
-#ifndef NDEBUG
- comment_cnt++;
-#endif
- }
-
- if (size < 1)
- return -1;
-
-#ifndef NDEBUG
- uint16_t avali = dp_trace_buffer->buffer_len - dp_trace_buffer->buffer_used;
- snprintf(dst, size, "used: %u, avali: %u, comment: %u", dp_trace_buffer->buffer_used, avali, comment_cnt);
-#else
- *dst = '\0';
-#endif
-
- return 0;
-}
-
-int trace_file_mutex_lock(struct dp_trace_process * trace)
-{
- int ret = pthread_mutex_lock(&trace->inst->trace_file_mutex);
- if (ret == EOWNERDEAD)
- {
- ret = pthread_mutex_consistent(&trace->inst->trace_file_mutex);
- ret = pthread_mutex_unlock(&trace->inst->trace_file_mutex);
- if (ret != 0)
- {
- MR_ERROR("EOWNERDEAD -> job ctx unlock failed");
- return -1;
- }
- }
- else if (ret != 0)
- {
- MR_ERROR("job ctx lock failed");
- return -1;
- }
- return 0;
-}
-
-int dp_trace_file_mutex_unlock(struct dp_trace_process * trace)
-{
- return pthread_mutex_unlock(&trace->inst->trace_file_mutex);
-}
-
-/* copy from pdump.c */
-/* length of option including padding */
-static uint16_t pcapng_optlen(uint16_t len)
-{
- return RTE_ALIGN(sizeof(struct pcapng_option) + len, sizeof(uint32_t));
-}
-
-static struct pcapng_option * pcapng_add_option(struct pcapng_option * popt, uint16_t code, const void * data,
- uint16_t len)
-{
- popt->code = code;
- popt->length = len;
- memcpy(popt->data, data, len);
-
- return (struct pcapng_option *)((uint8_t *)popt + pcapng_optlen(len));
-}
-
-static char * os_info_get(void)
-{
- struct utsname uts;
- char * os_name = NULL;
-
- if (uname(&uts) < 0)
- return NULL;
-
- if (asprintf(&os_name, "%s %s", uts.sysname, uts.release) == -1)
- return NULL;
-
- return os_name;
-}
-
-/* Make a copy of original mbuf with pcapng header and options */
-static struct rte_mbuf * dpdk_23_rte_pcapng_copy(uint16_t port_id, uint32_t queue, const struct rte_mbuf * md,
- struct rte_mempool * mp, uint32_t length,
- enum rte_pcapng_direction direction, const char * comment)
-{
- struct pcapng_enhance_packet_block * epb;
- uint32_t orig_len, data_len, padding, flags;
- struct pcapng_option * opt;
- uint64_t timestamp;
- uint16_t optlen;
- struct rte_mbuf * mc;
- bool rss_hash;
-
-#ifdef RTE_LIBRTE_ETHDEV_DEBUG
- RTE_ETH_VALID_PORTID_OR_ERR_RET(port_id, NULL);
-#endif
- orig_len = rte_pktmbuf_pkt_len(md);
-
- /* Take snapshot of the data */
- mc = rte_pktmbuf_copy(md, mp, 0, length);
- if (unlikely(mc == NULL))
- return NULL;
-
- /* Expand any offloaded VLAN information */
- if ((direction == RTE_PCAPNG_DIRECTION_IN && (md->ol_flags & RTE_MBUF_F_RX_VLAN_STRIPPED)) ||
- (direction == RTE_PCAPNG_DIRECTION_OUT && (md->ol_flags & RTE_MBUF_F_TX_VLAN)))
- {
- if (dpdk_23_pcapng_vlan_insert(mc, RTE_ETHER_TYPE_VLAN, md->vlan_tci) != 0)
- goto fail;
- }
-
- if ((direction == RTE_PCAPNG_DIRECTION_IN && (md->ol_flags & RTE_MBUF_F_RX_QINQ_STRIPPED)) ||
- (direction == RTE_PCAPNG_DIRECTION_OUT && (md->ol_flags & RTE_MBUF_F_TX_QINQ)))
- {
- if (dpdk_23_pcapng_vlan_insert(mc, RTE_ETHER_TYPE_QINQ, md->vlan_tci_outer) != 0)
- goto fail;
- }
-
- /* record HASH on incoming packets */
- rss_hash = (direction == RTE_PCAPNG_DIRECTION_IN && (md->ol_flags & RTE_MBUF_F_RX_RSS_HASH));
-
- /* pad the packet to 32 bit boundary */
- data_len = rte_pktmbuf_data_len(mc);
- padding = RTE_ALIGN(data_len, sizeof(uint32_t)) - data_len;
- if (padding > 0)
- {
- void * tail = rte_pktmbuf_append(mc, padding);
-
- if (tail == NULL)
- goto fail;
- memset(tail, 0, padding);
- }
-
- optlen = pcapng_optlen(sizeof(flags));
- optlen += pcapng_optlen(sizeof(queue));
- if (rss_hash)
- optlen += pcapng_optlen(sizeof(uint8_t) + sizeof(uint32_t));
-
- if (comment)
- optlen += pcapng_optlen(strlen(comment));
-
- /* reserve trailing options and block length */
- opt = (struct pcapng_option *)rte_pktmbuf_append(mc, optlen + sizeof(uint32_t));
- if (unlikely(opt == NULL))
- goto fail;
-
- switch (direction)
- {
- case RTE_PCAPNG_DIRECTION_IN:
- flags = PCAPNG_IFB_INBOUND;
- break;
- case RTE_PCAPNG_DIRECTION_OUT:
- flags = PCAPNG_IFB_OUTBOUND;
- break;
- default:
- flags = 0;
- }
-
- opt = pcapng_add_option(opt, PCAPNG_EPB_FLAGS, &flags, sizeof(flags));
-
- opt = pcapng_add_option(opt, PCAPNG_EPB_QUEUE, &queue, sizeof(queue));
-
- if (rss_hash)
- {
- uint8_t hash_opt[5];
-
- /* The algorithm could be something else if
- * used rte_flow_action_rss; but the current API does not
- * have a way for ethdev to report this on a per-packet basis.
- */
- hash_opt[0] = PCAPNG_HASH_TOEPLITZ;
-
- memcpy(&hash_opt[1], &md->hash.rss, sizeof(uint32_t));
- opt = pcapng_add_option(opt, PCAPNG_EPB_HASH, &hash_opt, sizeof(hash_opt));
- }
-
- if (comment)
- opt = pcapng_add_option(opt, PCAPNG_OPT_COMMENT, comment, strlen(comment));
-
- /* Note: END_OPT necessary here. Wireshark doesn't do it. */
-
- /* Add PCAPNG packet header */
- epb = (struct pcapng_enhance_packet_block *)rte_pktmbuf_prepend(mc, sizeof(*epb));
- if (unlikely(epb == NULL))
- goto fail;
-
- epb->block_type = PCAPNG_ENHANCED_PACKET_BLOCK;
- epb->block_length = rte_pktmbuf_data_len(mc);
-
- /* Interface index is filled in later during write */
- mc->port = port_id;
-
- /* Put timestamp in cycles here - adjust in packet write */
- // timestamp = rte_get_tsc_cycles();
-
- // Modification: use system time
- struct timespec current_time;
- clock_gettime(CLOCK_REALTIME, &current_time);
- timestamp = (uint64_t)current_time.tv_sec * 1000000000 + current_time.tv_nsec;
-
- epb->timestamp_hi = timestamp >> 32;
- epb->timestamp_lo = (uint32_t)timestamp;
- epb->capture_length = data_len;
- epb->original_length = orig_len;
-
- /* set trailer of block length */
- *(uint32_t *)opt = epb->block_length;
-
- return mc;
-
-fail:
- rte_pktmbuf_free(mc);
- return NULL;
-}
-
-static int dpdk_23_pcapng_vlan_insert(struct rte_mbuf * m, uint16_t ether_type, uint16_t tci)
+static inline bool dp_trace_is_disable(struct dp_trace_process * trace)
{
- struct rte_ether_hdr *nh, *oh;
- struct rte_vlan_hdr * vh;
-
- if (!RTE_MBUF_DIRECT(m) || rte_mbuf_refcnt_read(m) > 1)
- return -EINVAL;
-
- if (rte_pktmbuf_data_len(m) < sizeof(*oh))
- return -EINVAL;
-
- oh = rte_pktmbuf_mtod(m, struct rte_ether_hdr *);
- nh = (struct rte_ether_hdr *)rte_pktmbuf_prepend(m, sizeof(struct rte_vlan_hdr));
- if (nh == NULL)
- return -ENOSPC;
-
- memmove(nh, oh, 2 * RTE_ETHER_ADDR_LEN);
- nh->ether_type = rte_cpu_to_be_16(ether_type);
-
- vh = (struct rte_vlan_hdr *)(nh + 1);
- vh->vlan_tci = rte_cpu_to_be_16(tci);
-
- return 0;
+ return trace->inst->enable == 0;
}
static void dp_trace_buffer_free(struct rte_mbuf * mbuf)
{
if (unlikely(mbuf == NULL))
return;
- struct mrb_metadata * mrb_meta = (struct mrb_metadata *)mrbuf_cz_data(mbuf, 1);
+ struct mrb_metadata * mrb_meta = rte_mbuf_to_priv(mbuf);
struct dp_trace_buffer * dp_trace_buffer = mrb_meta->dp_trace_buffer;
if (dp_trace_buffer != NULL)
{
- // rte_atomic64_inc(&dp_trace_buffer->inst->statistics.uncategorized_failed);
rte_mempool_put(dp_trace_buffer->inst->pool, (void *)dp_trace_buffer);
}
}
@@ -1169,6 +511,7 @@ void infra_rte_pktmbuf_free(struct rte_mbuf * mbuf)
rte_pktmbuf_free(mbuf);
}
+#if 0
void infra_rte_pktmbuf_free_bulk(struct rte_mbuf ** mbufs, unsigned int count)
{
for (unsigned int idx = 0; idx < count; idx++)
@@ -1176,4 +519,20 @@ void infra_rte_pktmbuf_free_bulk(struct rte_mbuf ** mbufs, unsigned int count)
dp_trace_buffer_free(mbufs[idx]);
}
rte_pktmbuf_free_bulk(mbufs, count);
+}
+#endif
+
+const char * dp_trace_strerror(unsigned int err)
+{
+ const static char * errlist[] = {"data path trace: operate successfully",
+ "data path trace: no enough job id",
+ "data path trace: job id is being used",
+ "data path trace: job id is illegal",
+ "data path trace: encountered an illegal expression",
+ "data path trace: unkonw error"};
+ if (err < DP_TRACE_ERROR_MAX)
+ {
+ return errlist[err];
+ }
+ return errlist[DP_TRACE_ERROR_MAX];
} \ No newline at end of file
diff --git a/infra/test/TestDataPathTrace.cc b/infra/test/TestDataPathTrace.cc
index 8b34a22..6306e42 100644
--- a/infra/test/TestDataPathTrace.cc
+++ b/infra/test/TestDataPathTrace.cc
@@ -85,8 +85,7 @@ class DataPathTraceTest : public testing::Test
{
// create trace
trace = dp_trace_process_create(DP_TRACE_PROCESS_MARSIO);
- snprintf(trace->inst->trace_file_path, sizeof(trace->inst->trace_file_path), "./dp_trace.pcapng");
- dp_trace_start(trace);
+ trace->inst->enable = true;
}
void TearDown() override
@@ -99,51 +98,27 @@ class DataPathTraceTest : public testing::Test
rte_eal_cleanup();
}
- void wait_save_trace_thread_close(struct dp_trace_process * trace)
- {
- rte_atomic16_set(&trace->save_thread_still_run, 0);
- sem_post(&trace->sem);
- pthread_join(trace->save_trace_file_thread, NULL);
- }
-
void dp_trace_process_destroy(struct dp_trace_process * trace, enum dp_trace_process_type process_tpye)
{
// This function is for unit testing only. This instance created by marsio or app should not be deleted
// manually. It accompanies the entire life cycle of the process.
- dp_trace_stop(trace);
- wait_save_trace_thread_close(trace);
if (process_tpye == DP_TRACE_PROCESS_MARSIO)
{
- sem_destroy(&trace->sem);
rte_mempool_free(trace->inst->pool);
rte_mempool_free(trace->inst->dump_pool);
- rte_ring_free(trace->inst->ring);
+ for (unsigned int i = 0; i < trace->inst->nr_ring; i++)
+ {
+ rte_ring_free(trace->inst->ring[i]);
+ }
FREE(trace->inst);
FREE(trace);
}
else
{
- sem_destroy(&trace->sem);
FREE(trace);
}
}
- bool file_contains_string(const std::string & filename, const std::string & searchString)
- {
- std::ifstream file(filename.c_str());
- if (!file.is_open())
- return false;
-
- std::string line;
- while (std::getline(file, line))
- {
- if (line.find(searchString) != std::string::npos)
- return true;
- }
-
- return false;
- }
-
struct rte_mbuf * mbuf_construct(const unsigned char * pkt, unsigned int len)
{
if (trace == NULL)
@@ -151,7 +126,6 @@ class DataPathTraceTest : public testing::Test
struct rte_mbuf * pkt_mbuf = rte_pktmbuf_alloc(trace->inst->dump_pool);
unsigned int sz_align_priv = RTE_ALIGN(sizeof(struct mrb_metadata), RTE_MBUF_PRIV_ALIGN);
- // memset(((unsigned char *)pkt_mbuf + sizeof(struct rte_mbuf)), 0, sz_align_priv);
uint16_t data_off = sizeof(struct rte_mbuf) + sz_align_priv;
unsigned char * data_addr = (unsigned char *)pkt_mbuf->buf_addr + data_off;
@@ -177,7 +151,6 @@ class DataPathTraceTest : public testing::Test
}
struct dp_trace_process * trace;
- pthread_t save_trace_file_thread;
};
TEST_F(DataPathTraceTest, PackageConstruct)
@@ -214,24 +187,22 @@ TEST_F(DataPathTraceTest, InstanceCreate)
TEST_F(DataPathTraceTest, JobInit)
{
int n;
- struct dp_trace_job_desc desc = {true, "ether host 64:f6:9d:5f:b9:76", 10, 1};
- n = dp_trace_job_add(trace, &desc, 0);
+ struct dp_trace_job_desc desc = {true, DP_TRACE_MEASUREMENT_TYPE_TRACE, 0, "ether host 64:f6:9d:5f:b9:76", 10, 1};
+ n = dp_trace_job_add(trace, &desc);
EXPECT_GE(n, 0);
EXPECT_EQ(trace->inst->job_ctx[0].used, true);
}
TEST_F(DataPathTraceTest, JobDestroy)
{
- struct dp_trace_job_desc desc = {true, "ether host 64:f6:9d:5f:b9:76", 10, 1};
- dp_trace_job_add(trace, &desc, 0);
+ struct dp_trace_job_desc desc = {true, DP_TRACE_MEASUREMENT_TYPE_TRACE, 0, "ether host 64:f6:9d:5f:b9:76", 10, 1};
+ dp_trace_job_add(trace, &desc);
- struct dp_trace_job_desc desc_2 = {true, "vlan && ip src 125.33.49.137", 10, 1};
- dp_trace_job_add(trace, &desc_2, 1);
+ struct dp_trace_job_desc desc_2 = {true, DP_TRACE_MEASUREMENT_TYPE_TRACE, 1, "vlan && ip src 125.33.49.137", 10, 1};
+ dp_trace_job_add(trace, &desc_2);
dp_trace_jobs_destroy(trace, 1 | 1 << 1);
- wait_save_trace_thread_close(trace);
-
EXPECT_EQ(trace->inst->job_ctx[0].used, false);
EXPECT_EQ(trace->inst->job_ctx[1].used, false);
}
@@ -242,13 +213,12 @@ TEST_F(DataPathTraceTest, BPFMatch)
struct mrb_metadata * mrb_meta = (struct mrb_metadata *)mrbuf_cz_data(pkt73_mbuf, 1);
unsigned int offset = mrb_meta->pkt_parser_result.layers[4].offset;
- struct dp_trace_job_desc desc = {true, "ether host 64:f6:9d:5f:b9:76", 10, 1};
- dp_trace_job_add(trace, &desc, 0);
+ struct dp_trace_job_desc desc = {true, DP_TRACE_MEASUREMENT_TYPE_TRACE, 0, "ether host 64:f6:9d:5f:b9:76", 10, 1};
+ dp_trace_job_add(trace, &desc);
- struct dp_trace_job_desc desc_2 = {true, "vlan && ip src 125.33.49.137", 10, 1};
- dp_trace_job_add(trace, &desc_2, 1);
+ struct dp_trace_job_desc desc_2 = {true, DP_TRACE_MEASUREMENT_TYPE_TRACE, 1, "vlan && ip src 125.33.49.137", 10, 1};
+ dp_trace_job_add(trace, &desc_2);
- ;
dp_trace_filter_exec(trace, pkt73_mbuf, offset, 0);
u_int16_t ret = dp_trace_job_id_bitmap_get(trace, pkt73_mbuf);
EXPECT_EQ(ret, 3);
@@ -264,8 +234,8 @@ TEST_F(DataPathTraceTest, BPFMatchNet)
{
struct rte_mbuf * pkt1_buf = mbuf_construct(pkt1, sizeof(pkt1));
- struct dp_trace_job_desc desc = {true, "net 0.0.0.0/0", 10, 1};
- dp_trace_job_add(trace, &desc, 0);
+ struct dp_trace_job_desc desc = {true, DP_TRACE_MEASUREMENT_TYPE_TRACE, 0, "net 0.0.0.0/0", 10, 1};
+ dp_trace_job_add(trace, &desc);
dp_trace_filter_exec(trace, pkt1_buf, 0, 0);
u_int16_t ret = dp_trace_job_id_bitmap_get(trace, pkt1_buf);
@@ -285,11 +255,11 @@ TEST_F(DataPathTraceTest, BPFMatchExternalAndInternal)
struct rte_mbuf * pkt73_mbuf = mbuf_construct(pkt73, sizeof(pkt73));
struct mrb_metadata * mrb_meta = (struct mrb_metadata *)mrbuf_cz_data(pkt73_mbuf, 1);
- struct dp_trace_job_desc desc = {true, "ip src 1.1.15.100", 10, 1};
- dp_trace_job_add(trace, &desc, 0);
+ struct dp_trace_job_desc desc = {true, DP_TRACE_MEASUREMENT_TYPE_TRACE, 0, "ip src 1.1.15.100", 10, 1};
+ dp_trace_job_add(trace, &desc);
- struct dp_trace_job_desc desc_2 = {true, "vlan && ip src 125.33.49.137", 10, 1};
- dp_trace_job_add(trace, &desc_2, 1);
+ struct dp_trace_job_desc desc_2 = {true, DP_TRACE_MEASUREMENT_TYPE_TRACE, 1, "vlan && ip src 125.33.49.137", 10, 1};
+ dp_trace_job_add(trace, &desc_2);
dp_trace_filter_exec(trace, pkt73_mbuf, 0, 0);
u_int16_t ret = dp_trace_job_id_bitmap_get(trace, pkt73_mbuf);
@@ -308,8 +278,8 @@ TEST_F(DataPathTraceTest, BPFUnmatch)
struct mrb_metadata * mrb_meta = (struct mrb_metadata *)mrbuf_cz_data(pkt73_mbuf, 1);
unsigned int offset = mrb_meta->pkt_parser_result.layers[4].offset;
- struct dp_trace_job_desc desc = {true, "vlan && ip src 127.0.0.1", 10, 1};
- dp_trace_job_add(trace, &desc, 0);
+ struct dp_trace_job_desc desc = {true, DP_TRACE_MEASUREMENT_TYPE_TRACE, 0, "vlan && ip src 127.0.0.1", 10, 1};
+ dp_trace_job_add(trace, &desc);
dp_trace_filter_exec(trace, pkt73_mbuf, offset, 0);
u_int16_t ret = dp_trace_job_id_bitmap_get(trace, pkt73_mbuf);
@@ -325,31 +295,37 @@ TEST_F(DataPathTraceTest, EmitTrace)
struct rte_mbuf * pkt73_mbuf = mbuf_construct(pkt73, sizeof(pkt73));
struct mrb_metadata * mrb_meta = (struct mrb_metadata *)mrbuf_cz_data(pkt73_mbuf, 1);
- struct dp_trace_job_desc desc = {true, "ether host 64:f6:9d:5f:b9:76", 10, 1};
- dp_trace_job_add(trace, &desc, 1);
+ struct dp_trace_job_desc desc = {true, DP_TRACE_MEASUREMENT_TYPE_TRACE, 1, "ether host 64:f6:9d:5f:b9:76", 10, 1};
+ dp_trace_job_add(trace, &desc);
mrb_meta->packet_create_from_nf = 1;
memset(&mrb_meta->pkt_parser_result, 0, sizeof(mrb_meta->pkt_parser_result));
- struct dp_trace_record_meta meta = {"test", "emit", NULL};
+ dp_trace_filter_exec(trace, pkt73_mbuf, 0, rte_lcore_id());
+
+ struct dp_trace_record_meta meta = {DP_TRACE_MEASUREMENT_TYPE_TRACE, "test", "emit", NULL};
dp_trace_record_emit_str(trace, pkt73_mbuf, rte_lcore_id(), &meta, "abc");
- struct dp_trace_record_meta meta_2 = {"test", "emit", NULL};
+ struct dp_trace_record_meta meta_2 = {DP_TRACE_MEASUREMENT_TYPE_TRACE, "test", "emit", NULL};
dp_trace_record_emit_str(trace, pkt73_mbuf, rte_lcore_id(), &meta_2, "def");
- char decode_record[256];
- dp_trace_record_decode(pkt73_mbuf, decode_record, 256);
- std::string str(decode_record);
- std::cout << "decode emit:" << str << std::endl;
+ struct dp_trace_buffer * dp_trace_buffer = (struct dp_trace_buffer *)mrb_meta->dp_trace_buffer;
+
+ char * cur = dp_trace_buffer->buffer;
+ struct dp_trace_record_header * record_header = (struct dp_trace_record_header *)(cur);
+ char * str = cur + sizeof(struct dp_trace_record_header);
+ unsigned int str_len = record_header->recode_len;
+ EXPECT_TRUE(std::string(str, str_len).find("abc") != std::string::npos);
- EXPECT_TRUE(str.find("abc") != std::string::npos);
- EXPECT_TRUE(str.find("def") != std::string::npos);
+ cur += sizeof(struct dp_trace_record_header) + str_len;
+ record_header = (struct dp_trace_record_header *)(cur);
+ str = cur + sizeof(struct dp_trace_record_header);
+ EXPECT_TRUE(std::string(str, str_len).find("def") != std::string::npos);
// Excessive content will not cause buffer overflow
std::string big_buf(DP_TRACE_RECORD_SIZE, 'a');
dp_trace_record_emit_str(trace, pkt73_mbuf, rte_lcore_id(), &meta_2, big_buf.c_str());
- struct dp_trace_buffer * dp_trace_buffer = (struct dp_trace_buffer *)mrb_meta->dp_trace_buffer;
EXPECT_TRUE(dp_trace_buffer->buffer_used <= DP_TRACE_RECORD_SIZE);
infra_rte_pktmbuf_free(pkt73_mbuf);
@@ -361,42 +337,22 @@ TEST_F(DataPathTraceTest, StripToRing)
struct mrb_metadata * mrb_meta = (struct mrb_metadata *)mrbuf_cz_data(pkt73_mbuf, 1);
unsigned int offset = mrb_meta->pkt_parser_result.layers[4].offset;
- struct dp_trace_job_desc desc = {true, "ether host 64:f6:9d:5f:b9:76", 10, 1};
- dp_trace_job_add(trace, &desc, 0);
+ struct dp_trace_job_desc desc = {true, DP_TRACE_MEASUREMENT_TYPE_TRACE, 0, "ether host 64:f6:9d:5f:b9:76", 10, 1};
+ dp_trace_job_add(trace, &desc);
dp_trace_filter_exec(trace, pkt73_mbuf, offset, rte_lcore_id());
- struct dp_trace_record_meta meta = {"test", "emit", NULL};
+ struct dp_trace_record_meta meta = {DP_TRACE_MEASUREMENT_TYPE_TRACE, "test", "emit", NULL};
dp_trace_record_emit_str(trace, pkt73_mbuf, rte_lcore_id(), &meta, "abc");
dp_trace_record_write(trace, pkt73_mbuf, rte_lcore_id());
- EXPECT_TRUE(rte_ring_count(trace->ring) == 1);
-
- infra_rte_pktmbuf_free(pkt73_mbuf);
-}
-
-TEST_F(DataPathTraceTest, SaveTraceToFile)
-{
- struct rte_mbuf * pkt73_mbuf = mbuf_construct(pkt73, sizeof(pkt73));
- struct mrb_metadata * mrb_meta = (struct mrb_metadata *)mrbuf_cz_data(pkt73_mbuf, 1);
- unsigned int offset = mrb_meta->pkt_parser_result.layers[4].offset;
-
- struct dp_trace_job_desc desc = {true, "ether host 64:f6:9d:5f:b9:76", 10, 1};
- dp_trace_job_add(trace, &desc, 0);
-
- dp_trace_filter_exec(trace, pkt73_mbuf, offset, rte_lcore_id());
-
- struct dp_trace_record_meta meta = {"test", "emit", NULL};
- dp_trace_record_emit_str(trace, pkt73_mbuf, rte_lcore_id(), &meta, "abc");
-
- dp_trace_record_write(trace, pkt73_mbuf, rte_lcore_id());
-
- dp_trace_record_flush(trace);
-
- sleep(1);
-
- EXPECT_TRUE(file_contains_string(trace->inst->trace_file_path, "test:emit"));
+ unsigned int trace_record_cnt = 0;
+ for (unsigned int i = 0; i < trace->nr_ring; i++)
+ {
+ trace_record_cnt += rte_ring_count(trace->ring[i]);
+ }
+ EXPECT_TRUE(trace_record_cnt == 1);
infra_rte_pktmbuf_free(pkt73_mbuf);
}
@@ -405,8 +361,8 @@ TEST_F(DataPathTraceTest, MaxRecordCount)
{
unsigned int lcore_id = rte_lcore_id();
- struct dp_trace_job_desc desc = {true, "ether host 64:f6:9d:5f:b9:76", 2, 1};
- dp_trace_job_add(trace, &desc, 0);
+ struct dp_trace_job_desc desc = {true, DP_TRACE_MEASUREMENT_TYPE_TRACE, 0, "ether host 64:f6:9d:5f:b9:76", 2, 1};
+ dp_trace_job_add(trace, &desc);
struct rte_mbuf * pkt73_mbuf = mbuf_construct(pkt73, sizeof(pkt73));
dp_trace_filter_exec(trace, pkt73_mbuf, 0, 0);