diff options
| author | tongzongzhen <[email protected]> | 2024-05-06 18:42:40 +0800 |
|---|---|---|
| committer | tongzongzhen <[email protected]> | 2024-05-06 18:42:40 +0800 |
| commit | d717475982e08f95f64306601689efbf374dbafa (patch) | |
| tree | 71297cd81ce4936efeb90a3577fe8014108aebb7 | |
| parent | fadf6b0309f0cd56ae1f8ad6a034043c8bdc00fc (diff) | |
When the program starts, clear old packets;dump ctx; Use the type in marsio.h to replace the custom role;
| -rw-r--r-- | README.md | 6 | ||||
| -rw-r--r-- | doc/dp_telemetry_maat_job.json | 9 | ||||
| -rw-r--r-- | include/common.h | 2 | ||||
| -rw-r--r-- | include/job_ctx.h | 2 | ||||
| -rw-r--r-- | include/monit.h | 20 | ||||
| -rw-r--r-- | include/trace_output.h | 21 | ||||
| -rw-r--r-- | src/config.c | 2 | ||||
| -rw-r--r-- | src/job_ctx.c | 25 | ||||
| -rw-r--r-- | src/main.c | 11 | ||||
| -rw-r--r-- | src/monit.c | 4 | ||||
| -rw-r--r-- | src/trace_output.c | 25 |
11 files changed, 81 insertions, 46 deletions
@@ -1,7 +1,9 @@ -## 设计稿 +## 文档 -见: [Datapath Trace and Telemetry方案设计](https://docs.geedge.net/pages/viewpage.action?pageId=124754302) +1. [Datapath Trace and Telemetry方案设计](https://docs.geedge.net/pages/viewpage.action?pageId=124754302) +2. [Datapath Telemetry Job -- maat字段](https://docs.geedge.net/display/TSG/Datapath+Telemetry+Job) +3. [Datapath Telemetry Record -- kafka发送内容](https://docs.geedge.net/pages/viewpage.action?pageId=129088240) ## 编译 diff --git a/doc/dp_telemetry_maat_job.json b/doc/dp_telemetry_maat_job.json deleted file mode 100644 index 8abe0b8..0000000 --- a/doc/dp_telemetry_maat_job.json +++ /dev/null @@ -1,9 +0,0 @@ -{ - "job_id": "499a49aa-b4b9-4e8e-a66c-80d4cd40257b", - "job_name": "test_2", - "bpf_expr": "ether host 00:15:5d:b8:10:a6", - "pkt_cnt_max": 100, - "timeout": 120, - "sampling": 2, - "snaplen": 100 -}
\ No newline at end of file diff --git a/include/common.h b/include/common.h index da3884f..12c71a0 100644 --- a/include/common.h +++ b/include/common.h @@ -43,8 +43,6 @@ extern unsigned int zlog_env_is_init; } while (0) #define DP_TRACE_NO_ROLE 0 -#define DP_TRACE_ROLE 1 -#define DP_TELEMETRY_ROLE 2 extern struct mr_instance * mr_instance; diff --git a/include/job_ctx.h b/include/job_ctx.h index aed399c..57a79e8 100644 --- a/include/job_ctx.h +++ b/include/job_ctx.h @@ -8,7 +8,7 @@ struct dp_trace_telemetry_desc struct dp_trace_job_desc job_desc; }; -void job_rule_apply(struct dp_trace_job_desc desc[], unsigned int nr_desc, uint8_t role); +void job_rule_apply(struct dp_trace_job_desc desc[], unsigned int nr_desc); int job_id_to_index(job_bitmap_t job_id); job_bitmap_t index_to_job_id(unsigned int index); int is_job_id_used(job_bitmap_t job_id); diff --git a/include/monit.h b/include/monit.h index 94abc02..d833832 100644 --- a/include/monit.h +++ b/include/monit.h @@ -1,6 +1,24 @@ #pragma once #include "common.h" -#include "trace_output.h" + +struct record_saving_stat +{ + uint64_t recv_success; + uint64_t init_old_packet_drop; + + uint64_t save_failed_at_job_deleted; + uint64_t save_failed_at_mutex_lock; + + uint64_t save_to_kafka_failed_at_decode_messagepack; + uint64_t save_to_kafka_failed_at_send; + uint64_t save_to_kafka_success; + + uint64_t save_to_file_failed_at_decode_to_str; + uint64_t save_to_file_failed_at_pcapng_open; + uint64_t save_to_file_failed_at_pcapng_format; + uint64_t save_to_file_failed_at_write_to_disk; + uint64_t save_to_file_success; +}; struct monit { diff --git a/include/trace_output.h b/include/trace_output.h index b1fbc9b..bf454a2 100644 --- a/include/trace_output.h +++ b/include/trace_output.h @@ -1,23 +1,8 @@ #pragma once #include "common.h" - -struct record_saving_stat -{ - uint64_t recv_success; - uint64_t save_failed_at_job_deleted; - uint64_t save_failed_at_mutex_lock; - - uint64_t save_to_kafka_failed_at_decode_messagepack; - uint64_t save_to_kafka_failed_at_send; - uint64_t save_to_kafka_success; - - uint64_t save_to_file_failed_at_decode_to_str; - uint64_t save_to_file_failed_at_pcapng_open; - uint64_t save_to_file_failed_at_pcapng_format; - uint64_t save_to_file_failed_at_write_to_disk; - uint64_t save_to_file_success; -}; +#include "monit.h" void dp_trace_output_init(); void * dp_trace_process_thread(void * arg); -void dp_trace_pcapng_merger(job_bitmap_t job_id);
\ No newline at end of file +void dp_trace_pcapng_merger(job_bitmap_t job_id); +void dp_trace_ring_clear();
\ No newline at end of file diff --git a/src/config.c b/src/config.c index 88600fc..c556ec5 100644 --- a/src/config.c +++ b/src/config.c @@ -191,7 +191,7 @@ void dynamic_config_load() void dynamic_config_load_and_apply() { dynamic_config_load(); - job_rule_apply(g_conf->desc, g_conf->nr_desc, DP_TRACE_ROLE); + job_rule_apply(g_conf->desc, g_conf->nr_desc); } //////////////////////// helper function ///////////////////////// diff --git a/src/job_ctx.c b/src/job_ctx.c index 75175de..57ea593 100644 --- a/src/job_ctx.c +++ b/src/job_ctx.c @@ -17,12 +17,15 @@ static struct dp_trace_job_occupy dp_trace_job_occupy[DP_TRACE_JOB_NUM_MAX] = {} static struct dp_trace_telemetry_desc telemetry_descs[DP_TRACE_JOB_NUM_MAX] = {}; static int telemetry_unused_job_index_get(); +static void job_desc_dump(struct dp_trace_job_desc * desc); -void job_rule_apply(struct dp_trace_job_desc desc[], unsigned int nr_desc, uint8_t role) +void job_rule_apply(struct dp_trace_job_desc desc[], unsigned int nr_desc) { int ret = 0; for (unsigned int i = 0; i < nr_desc; i++) { + job_desc_dump(&desc[i]); + uint8_t rule_index = desc[i].rule_index; job_bitmap_t cur_job_id = index_to_job_id(rule_index); uint8_t cur_job_uesd = is_job_id_used(cur_job_id); @@ -38,7 +41,7 @@ void job_rule_apply(struct dp_trace_job_desc desc[], unsigned int nr_desc, uint8 else { dp_trace_job_occupy[rule_index].uesd = 1; - dp_trace_job_occupy[rule_index].role = role; + dp_trace_job_occupy[rule_index].role = desc[i].measurement_type; dzlog_info("add rule %u successfully. bpf_expr is: %s", rule_index, desc[i].bpf_expr); } continue; @@ -58,7 +61,7 @@ void job_rule_apply(struct dp_trace_job_desc desc[], unsigned int nr_desc, uint8 dp_trace_job_occupy[rule_index].uesd = 0; dp_trace_job_occupy[rule_index].role = DP_TRACE_NO_ROLE; - if (tmp_role == DP_TRACE_ROLE) + if (tmp_role == DP_TRACE_MEASUREMENT_TYPE_TRACE) { dp_trace_pcapng_merger(cur_job_id); } @@ -128,7 +131,7 @@ void telemetry_job_add_cb(const char * table_name, int table_id, const char * ke memcpy(&telemetry_descs[index], &telemetry_desc, sizeof(struct dp_trace_telemetry_desc)); - job_rule_apply(&telemetry_descs[index].job_desc, 1, DP_TELEMETRY_ROLE); + job_rule_apply(&telemetry_descs[index].job_desc, 1); *ad = &telemetry_descs[index]; } @@ -138,7 +141,7 @@ void telemetry_job_del_cb(int table_id, void ** ad, long argl, void * argp) struct dp_trace_telemetry_desc * telemetry_desc = *ad; struct dp_trace_job_desc * job_desc = &telemetry_desc->job_desc; job_desc->enable = false; - job_rule_apply(job_desc, 1, DP_TELEMETRY_ROLE); + job_rule_apply(job_desc, 1); return; } @@ -193,4 +196,16 @@ static int telemetry_unused_job_index_get() } return ret; +} + +static void job_desc_dump(struct dp_trace_job_desc * desc) +{ + dzlog_info("dp trace job desc dump"); + dzlog_info("enable:%u", desc->enable); + dzlog_info("measurement_type:%u", desc->measurement_type); + dzlog_info("rule_index:%u", desc->rule_index); + dzlog_info("bpf_expr:%s", desc->bpf_expr); + dzlog_info("pkt_cnt_max:%u", desc->pkt_cnt_max); + dzlog_info("sampling:%u", desc->sampling); + dzlog_info("snaplen:%u", desc->snaplen); }
\ No newline at end of file @@ -125,8 +125,6 @@ int main(int argc, char * argv[]) http_serv_init(); - dp_trace_output_init(); - mr_instance = marsio_create(); cpu_set_t cpu_set_io = conf->cpu_set_io; @@ -136,19 +134,22 @@ int main(int argc, char * argv[]) ret = marsio_init(mr_instance, appsym); DP_TRACE_VERIFY(ret >= 0, "marsio init failed."); + unsigned int nr_thread = CPU_COUNT(&conf->cpu_set_io); + monit_init(nr_thread); + + dp_trace_ring_clear(); + dp_trace_output_init(); + dynamic_config_load_and_apply(); dp_trace_maat_init(); signal_event_init(); - unsigned int nr_thread = CPU_COUNT(&conf->cpu_set_io); unsigned int ring_num = DP_TRACE_RING_NUM; dzlog_info("thread count = %u", nr_thread); dzlog_info("ring num = %u", ring_num); - monit_init(nr_thread); - pthread_t tmp_pid[nr_thread]; for (int i = 0; i < nr_thread; i++) diff --git a/src/monit.c b/src/monit.c index f71361d..5441c5b 100644 --- a/src/monit.c +++ b/src/monit.c @@ -33,6 +33,8 @@ static void monit_dump() for (unsigned int i = 0; i < monit->nr_stat; i++) { total_stat.recv_success += monit->savint_stats[i].recv_success; + total_stat.init_old_packet_drop += monit->savint_stats[i].init_old_packet_drop; + total_stat.save_failed_at_job_deleted += monit->savint_stats[i].save_failed_at_job_deleted; total_stat.save_failed_at_mutex_lock += monit->savint_stats[i].save_failed_at_mutex_lock; @@ -50,6 +52,8 @@ static void monit_dump() struct cJSON * json_root = cJSON_CreateObject(); cJSON_AddNumberToObject(json_root, "recv_success", total_stat.recv_success); + cJSON_AddNumberToObject(json_root, "init_old_packet_drop", total_stat.init_old_packet_drop); + cJSON_AddNumberToObject(json_root, "save_failed_at_job_deleted", total_stat.save_failed_at_job_deleted); cJSON_AddNumberToObject(json_root, "save_failed_at_mutex_lock", total_stat.save_failed_at_mutex_lock); diff --git a/src/trace_output.c b/src/trace_output.c index 98e57cd..73bbc75 100644 --- a/src/trace_output.c +++ b/src/trace_output.c @@ -156,7 +156,7 @@ void * dp_trace_process_thread(void * arg) } uint8_t role = job_id_role_get(index_to_job_id(i)); - if (role == DP_TRACE_ROLE) + if (role == DP_TRACE_MEASUREMENT_TYPE_TRACE) { for (unsigned int j = 0; j < nr_mbufs; j++) { @@ -165,7 +165,7 @@ void * dp_trace_process_thread(void * arg) job_bitmap_t job_id = index_to_job_id(i); cli_job_mbufs_write_process(tx_buff, nr_mbufs, job_id); } - else if (role == DP_TELEMETRY_ROLE) + else if (role == DP_TRACE_MEASUREMENT_TYPE_TELEMETRY) { for (unsigned int j = 0; j < nr_mbufs; j++) { @@ -750,4 +750,25 @@ static void thread_id_to_ring_id_calculate(unsigned int nr_thread, unsigned int qids[i] = (++prefix_index); (*nr_qids)++; } +} + +void dp_trace_ring_clear() +{ + unsigned int ring_clear_cnt = 0; + unsigned int nr_ring = DP_TRACE_RING_NUM; + marsio_buff_t * rx_buff[1024]; + + saving_stat = record_saving_stat_point_get(0); + + for (unsigned int i = 0; i < nr_ring; i++) + { + unsigned int nr_recv = marsio_dp_trace_mbuf_recv_burst(mr_instance, i, rx_buff, TELEMETRY_DIM(rx_buff)); + saving_stat->recv_success += nr_recv; + saving_stat->init_old_packet_drop += nr_recv; + + marsio_dp_trace_mbuf_free(mr_instance, rx_buff, nr_recv); + ring_clear_cnt += nr_recv; + } + + dzlog_info("The program starts and clears %u mbufs", ring_clear_cnt); }
\ No newline at end of file |
