summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authortongzongzhen <[email protected]>2024-05-06 18:42:40 +0800
committertongzongzhen <[email protected]>2024-05-06 18:42:40 +0800
commitd717475982e08f95f64306601689efbf374dbafa (patch)
tree71297cd81ce4936efeb90a3577fe8014108aebb7
parentfadf6b0309f0cd56ae1f8ad6a034043c8bdc00fc (diff)
When the program starts, clear old packets;dump ctx; Use the type in marsio.h to replace the custom role;
-rw-r--r--README.md6
-rw-r--r--doc/dp_telemetry_maat_job.json9
-rw-r--r--include/common.h2
-rw-r--r--include/job_ctx.h2
-rw-r--r--include/monit.h20
-rw-r--r--include/trace_output.h21
-rw-r--r--src/config.c2
-rw-r--r--src/job_ctx.c25
-rw-r--r--src/main.c11
-rw-r--r--src/monit.c4
-rw-r--r--src/trace_output.c25
11 files changed, 81 insertions, 46 deletions
diff --git a/README.md b/README.md
index 51cba91..05abd75 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,9 @@
-## 设计稿
+## 文档
-见: [Datapath Trace and Telemetry方案设计](https://docs.geedge.net/pages/viewpage.action?pageId=124754302)
+1. [Datapath Trace and Telemetry方案设计](https://docs.geedge.net/pages/viewpage.action?pageId=124754302)
+2. [Datapath Telemetry Job -- maat字段](https://docs.geedge.net/display/TSG/Datapath+Telemetry+Job)
+3. [Datapath Telemetry Record -- kafka发送内容](https://docs.geedge.net/pages/viewpage.action?pageId=129088240)
## 编译
diff --git a/doc/dp_telemetry_maat_job.json b/doc/dp_telemetry_maat_job.json
deleted file mode 100644
index 8abe0b8..0000000
--- a/doc/dp_telemetry_maat_job.json
+++ /dev/null
@@ -1,9 +0,0 @@
-{
- "job_id": "499a49aa-b4b9-4e8e-a66c-80d4cd40257b",
- "job_name": "test_2",
- "bpf_expr": "ether host 00:15:5d:b8:10:a6",
- "pkt_cnt_max": 100,
- "timeout": 120,
- "sampling": 2,
- "snaplen": 100
-} \ No newline at end of file
diff --git a/include/common.h b/include/common.h
index da3884f..12c71a0 100644
--- a/include/common.h
+++ b/include/common.h
@@ -43,8 +43,6 @@ extern unsigned int zlog_env_is_init;
} while (0)
#define DP_TRACE_NO_ROLE 0
-#define DP_TRACE_ROLE 1
-#define DP_TELEMETRY_ROLE 2
extern struct mr_instance * mr_instance;
diff --git a/include/job_ctx.h b/include/job_ctx.h
index aed399c..57a79e8 100644
--- a/include/job_ctx.h
+++ b/include/job_ctx.h
@@ -8,7 +8,7 @@ struct dp_trace_telemetry_desc
struct dp_trace_job_desc job_desc;
};
-void job_rule_apply(struct dp_trace_job_desc desc[], unsigned int nr_desc, uint8_t role);
+void job_rule_apply(struct dp_trace_job_desc desc[], unsigned int nr_desc);
int job_id_to_index(job_bitmap_t job_id);
job_bitmap_t index_to_job_id(unsigned int index);
int is_job_id_used(job_bitmap_t job_id);
diff --git a/include/monit.h b/include/monit.h
index 94abc02..d833832 100644
--- a/include/monit.h
+++ b/include/monit.h
@@ -1,6 +1,24 @@
#pragma once
#include "common.h"
-#include "trace_output.h"
+
+struct record_saving_stat
+{
+ uint64_t recv_success;
+ uint64_t init_old_packet_drop;
+
+ uint64_t save_failed_at_job_deleted;
+ uint64_t save_failed_at_mutex_lock;
+
+ uint64_t save_to_kafka_failed_at_decode_messagepack;
+ uint64_t save_to_kafka_failed_at_send;
+ uint64_t save_to_kafka_success;
+
+ uint64_t save_to_file_failed_at_decode_to_str;
+ uint64_t save_to_file_failed_at_pcapng_open;
+ uint64_t save_to_file_failed_at_pcapng_format;
+ uint64_t save_to_file_failed_at_write_to_disk;
+ uint64_t save_to_file_success;
+};
struct monit
{
diff --git a/include/trace_output.h b/include/trace_output.h
index b1fbc9b..bf454a2 100644
--- a/include/trace_output.h
+++ b/include/trace_output.h
@@ -1,23 +1,8 @@
#pragma once
#include "common.h"
-
-struct record_saving_stat
-{
- uint64_t recv_success;
- uint64_t save_failed_at_job_deleted;
- uint64_t save_failed_at_mutex_lock;
-
- uint64_t save_to_kafka_failed_at_decode_messagepack;
- uint64_t save_to_kafka_failed_at_send;
- uint64_t save_to_kafka_success;
-
- uint64_t save_to_file_failed_at_decode_to_str;
- uint64_t save_to_file_failed_at_pcapng_open;
- uint64_t save_to_file_failed_at_pcapng_format;
- uint64_t save_to_file_failed_at_write_to_disk;
- uint64_t save_to_file_success;
-};
+#include "monit.h"
void dp_trace_output_init();
void * dp_trace_process_thread(void * arg);
-void dp_trace_pcapng_merger(job_bitmap_t job_id); \ No newline at end of file
+void dp_trace_pcapng_merger(job_bitmap_t job_id);
+void dp_trace_ring_clear(); \ No newline at end of file
diff --git a/src/config.c b/src/config.c
index 88600fc..c556ec5 100644
--- a/src/config.c
+++ b/src/config.c
@@ -191,7 +191,7 @@ void dynamic_config_load()
void dynamic_config_load_and_apply()
{
dynamic_config_load();
- job_rule_apply(g_conf->desc, g_conf->nr_desc, DP_TRACE_ROLE);
+ job_rule_apply(g_conf->desc, g_conf->nr_desc);
}
//////////////////////// helper function /////////////////////////
diff --git a/src/job_ctx.c b/src/job_ctx.c
index 75175de..57ea593 100644
--- a/src/job_ctx.c
+++ b/src/job_ctx.c
@@ -17,12 +17,15 @@ static struct dp_trace_job_occupy dp_trace_job_occupy[DP_TRACE_JOB_NUM_MAX] = {}
static struct dp_trace_telemetry_desc telemetry_descs[DP_TRACE_JOB_NUM_MAX] = {};
static int telemetry_unused_job_index_get();
+static void job_desc_dump(struct dp_trace_job_desc * desc);
-void job_rule_apply(struct dp_trace_job_desc desc[], unsigned int nr_desc, uint8_t role)
+void job_rule_apply(struct dp_trace_job_desc desc[], unsigned int nr_desc)
{
int ret = 0;
for (unsigned int i = 0; i < nr_desc; i++)
{
+ job_desc_dump(&desc[i]);
+
uint8_t rule_index = desc[i].rule_index;
job_bitmap_t cur_job_id = index_to_job_id(rule_index);
uint8_t cur_job_uesd = is_job_id_used(cur_job_id);
@@ -38,7 +41,7 @@ void job_rule_apply(struct dp_trace_job_desc desc[], unsigned int nr_desc, uint8
else
{
dp_trace_job_occupy[rule_index].uesd = 1;
- dp_trace_job_occupy[rule_index].role = role;
+ dp_trace_job_occupy[rule_index].role = desc[i].measurement_type;
dzlog_info("add rule %u successfully. bpf_expr is: %s", rule_index, desc[i].bpf_expr);
}
continue;
@@ -58,7 +61,7 @@ void job_rule_apply(struct dp_trace_job_desc desc[], unsigned int nr_desc, uint8
dp_trace_job_occupy[rule_index].uesd = 0;
dp_trace_job_occupy[rule_index].role = DP_TRACE_NO_ROLE;
- if (tmp_role == DP_TRACE_ROLE)
+ if (tmp_role == DP_TRACE_MEASUREMENT_TYPE_TRACE)
{
dp_trace_pcapng_merger(cur_job_id);
}
@@ -128,7 +131,7 @@ void telemetry_job_add_cb(const char * table_name, int table_id, const char * ke
memcpy(&telemetry_descs[index], &telemetry_desc, sizeof(struct dp_trace_telemetry_desc));
- job_rule_apply(&telemetry_descs[index].job_desc, 1, DP_TELEMETRY_ROLE);
+ job_rule_apply(&telemetry_descs[index].job_desc, 1);
*ad = &telemetry_descs[index];
}
@@ -138,7 +141,7 @@ void telemetry_job_del_cb(int table_id, void ** ad, long argl, void * argp)
struct dp_trace_telemetry_desc * telemetry_desc = *ad;
struct dp_trace_job_desc * job_desc = &telemetry_desc->job_desc;
job_desc->enable = false;
- job_rule_apply(job_desc, 1, DP_TELEMETRY_ROLE);
+ job_rule_apply(job_desc, 1);
return;
}
@@ -193,4 +196,16 @@ static int telemetry_unused_job_index_get()
}
return ret;
+}
+
+static void job_desc_dump(struct dp_trace_job_desc * desc)
+{
+ dzlog_info("dp trace job desc dump");
+ dzlog_info("enable:%u", desc->enable);
+ dzlog_info("measurement_type:%u", desc->measurement_type);
+ dzlog_info("rule_index:%u", desc->rule_index);
+ dzlog_info("bpf_expr:%s", desc->bpf_expr);
+ dzlog_info("pkt_cnt_max:%u", desc->pkt_cnt_max);
+ dzlog_info("sampling:%u", desc->sampling);
+ dzlog_info("snaplen:%u", desc->snaplen);
} \ No newline at end of file
diff --git a/src/main.c b/src/main.c
index d0c78e6..83d3c6c 100644
--- a/src/main.c
+++ b/src/main.c
@@ -125,8 +125,6 @@ int main(int argc, char * argv[])
http_serv_init();
- dp_trace_output_init();
-
mr_instance = marsio_create();
cpu_set_t cpu_set_io = conf->cpu_set_io;
@@ -136,19 +134,22 @@ int main(int argc, char * argv[])
ret = marsio_init(mr_instance, appsym);
DP_TRACE_VERIFY(ret >= 0, "marsio init failed.");
+ unsigned int nr_thread = CPU_COUNT(&conf->cpu_set_io);
+ monit_init(nr_thread);
+
+ dp_trace_ring_clear();
+ dp_trace_output_init();
+
dynamic_config_load_and_apply();
dp_trace_maat_init();
signal_event_init();
- unsigned int nr_thread = CPU_COUNT(&conf->cpu_set_io);
unsigned int ring_num = DP_TRACE_RING_NUM;
dzlog_info("thread count = %u", nr_thread);
dzlog_info("ring num = %u", ring_num);
- monit_init(nr_thread);
-
pthread_t tmp_pid[nr_thread];
for (int i = 0; i < nr_thread; i++)
diff --git a/src/monit.c b/src/monit.c
index f71361d..5441c5b 100644
--- a/src/monit.c
+++ b/src/monit.c
@@ -33,6 +33,8 @@ static void monit_dump()
for (unsigned int i = 0; i < monit->nr_stat; i++)
{
total_stat.recv_success += monit->savint_stats[i].recv_success;
+ total_stat.init_old_packet_drop += monit->savint_stats[i].init_old_packet_drop;
+
total_stat.save_failed_at_job_deleted += monit->savint_stats[i].save_failed_at_job_deleted;
total_stat.save_failed_at_mutex_lock += monit->savint_stats[i].save_failed_at_mutex_lock;
@@ -50,6 +52,8 @@ static void monit_dump()
struct cJSON * json_root = cJSON_CreateObject();
cJSON_AddNumberToObject(json_root, "recv_success", total_stat.recv_success);
+ cJSON_AddNumberToObject(json_root, "init_old_packet_drop", total_stat.init_old_packet_drop);
+
cJSON_AddNumberToObject(json_root, "save_failed_at_job_deleted", total_stat.save_failed_at_job_deleted);
cJSON_AddNumberToObject(json_root, "save_failed_at_mutex_lock", total_stat.save_failed_at_mutex_lock);
diff --git a/src/trace_output.c b/src/trace_output.c
index 98e57cd..73bbc75 100644
--- a/src/trace_output.c
+++ b/src/trace_output.c
@@ -156,7 +156,7 @@ void * dp_trace_process_thread(void * arg)
}
uint8_t role = job_id_role_get(index_to_job_id(i));
- if (role == DP_TRACE_ROLE)
+ if (role == DP_TRACE_MEASUREMENT_TYPE_TRACE)
{
for (unsigned int j = 0; j < nr_mbufs; j++)
{
@@ -165,7 +165,7 @@ void * dp_trace_process_thread(void * arg)
job_bitmap_t job_id = index_to_job_id(i);
cli_job_mbufs_write_process(tx_buff, nr_mbufs, job_id);
}
- else if (role == DP_TELEMETRY_ROLE)
+ else if (role == DP_TRACE_MEASUREMENT_TYPE_TELEMETRY)
{
for (unsigned int j = 0; j < nr_mbufs; j++)
{
@@ -750,4 +750,25 @@ static void thread_id_to_ring_id_calculate(unsigned int nr_thread, unsigned int
qids[i] = (++prefix_index);
(*nr_qids)++;
}
+}
+
+void dp_trace_ring_clear()
+{
+ unsigned int ring_clear_cnt = 0;
+ unsigned int nr_ring = DP_TRACE_RING_NUM;
+ marsio_buff_t * rx_buff[1024];
+
+ saving_stat = record_saving_stat_point_get(0);
+
+ for (unsigned int i = 0; i < nr_ring; i++)
+ {
+ unsigned int nr_recv = marsio_dp_trace_mbuf_recv_burst(mr_instance, i, rx_buff, TELEMETRY_DIM(rx_buff));
+ saving_stat->recv_success += nr_recv;
+ saving_stat->init_old_packet_drop += nr_recv;
+
+ marsio_dp_trace_mbuf_free(mr_instance, rx_buff, nr_recv);
+ ring_clear_cnt += nr_recv;
+ }
+
+ dzlog_info("The program starts and clears %u mbufs", ring_clear_cnt);
} \ No newline at end of file