diff options
| author | 童宗振 <[email protected]> | 2024-05-08 14:45:52 +0000 |
|---|---|---|
| committer | 童宗振 <[email protected]> | 2024-05-08 14:45:52 +0000 |
| commit | 93c60b86385ea8c1cb7c3f33a82978cee802bf65 (patch) | |
| tree | 8858894d7d4f1c14e5e0f3d755e84549e14f4ea2 | |
| parent | 5389605c85d95ebe2fbfdebcb8a9f9f097eafa84 (diff) | |
| parent | 118dbbfcfda78fbed2ca13dccec2f8fc701df9a1 (diff) | |
Merge branch 'add_traffic_link_id' into 'master'v0.1.7-20240508
(TSG-21101)dp_trace support device_group and traffic_link_id
See merge request tsg/dp_telemetry_app!28
| -rw-r--r-- | etc/dp_telemetry_rules.json | 2 | ||||
| -rw-r--r-- | etc/dp_trace.conf | 2 | ||||
| -rw-r--r-- | include/common.h | 2 | ||||
| -rw-r--r-- | include/maat.h | 4 | ||||
| -rw-r--r-- | src/config.c | 2 | ||||
| -rw-r--r-- | src/job_ctx.c | 92 | ||||
| -rw-r--r-- | src/maat.c | 147 | ||||
| -rw-r--r-- | test/CMakeLists.txt | 1 |
8 files changed, 205 insertions, 47 deletions
diff --git a/etc/dp_telemetry_rules.json b/etc/dp_telemetry_rules.json index 8a67a73..7a3b115 100644 --- a/etc/dp_telemetry_rules.json +++ b/etc/dp_telemetry_rules.json @@ -3,7 +3,7 @@ { "table_name": "DATAPATH_TELEMETRY_JOB", "table_content": [ - "72694b1c-6833-4c46-acde-52e2d6409314\t100\tether\\bhost\\b00:15:5d:b8:10:a6\t60\t2\t200\t0\t{}\t[]\t1" + "72694b1c-6833-4c46-acde-52e2d6409314\t100000\thost\\b192.168.64.11\t60\t1\t200\t0\t{\"tag_sets\":[[{\"value\":[\"device-abc\",\"device-xxx\"],\"tag\":\"data_center\"}]]}\t[1000,10001]\t1" ] } ] diff --git a/etc/dp_trace.conf b/etc/dp_trace.conf index b44ea96..cb79fdd 100644 --- a/etc/dp_trace.conf +++ b/etc/dp_trace.conf @@ -2,7 +2,7 @@ iocore=4,5,6,7 zlog_config_path=../etc/dp_trace_zlog.conf dp_trace_dir=./ -device_group= +device_group="device-xxx" monit_file_path=/var/run/mrzcpd/mrmonit.app.dp_trace_telemetry.saving [http_server] diff --git a/include/common.h b/include/common.h index 12c71a0..613d337 100644 --- a/include/common.h +++ b/include/common.h @@ -11,7 +11,7 @@ #include <unistd.h> #ifndef MR_SYMBOL_MAX -#define MR_SYMBOL_MAX 64 +#define MR_SYMBOL_MAX 128 #endif #ifndef likely diff --git a/include/maat.h b/include/maat.h index ca9ea7d..2d81cf8 100644 --- a/include/maat.h +++ b/include/maat.h @@ -1,6 +1,8 @@ #pragma once +#include "job_ctx.h" #define MAAT_INPUT_JSON 0 #define MAAT_INPUT_REDIS 1 -void dp_trace_maat_init();
\ No newline at end of file +void dp_trace_maat_init(); +bool maat_rule_parse(const char * table_line, struct dp_trace_telemetry_desc * telemetry_desc);
\ No newline at end of file diff --git a/src/config.c b/src/config.c index c556ec5..376d266 100644 --- a/src/config.c +++ b/src/config.c @@ -77,7 +77,7 @@ void config_load() "/var/run/mrzcpd/mrmonit.app.dp_trace_telemetry.saving"); MESA_load_profile_string_def(config_path, "global", "device_group", g_conf->device_group, - sizeof(g_conf->device_group), "unknow"); + sizeof(g_conf->device_group), ""); MESA_load_profile_string_def(config_path, "http_server", "listen_addr", g_conf->str_listen_addr, sizeof(g_conf->str_listen_addr), "127.0.0.1"); diff --git a/src/job_ctx.c b/src/job_ctx.c index 57ea593..778dc29 100644 --- a/src/job_ctx.c +++ b/src/job_ctx.c @@ -1,5 +1,7 @@ #include "job_ctx.h" #include "common.h" +#include "config.h" +#include "maat.h" #include "trace_output.h" #include <MESA/maat.h> @@ -75,60 +77,25 @@ void job_rule_apply(struct dp_trace_job_desc desc[], unsigned int nr_desc) void telemetry_job_add_cb(const char * table_name, int table_id, const char * key, const char * table_line, void ** ad, long argl, void * argp) { - int ret = 0; struct dp_trace_telemetry_desc telemetry_desc = {}; struct dp_trace_job_desc * job_desc = &telemetry_desc.job_desc; - char uuid[40] = {}; - unsigned int pkt_cnt_max = 0; - char bpf_expr[MR_BPF_EXPRESSION_MAX]; - unsigned int timeout = 0; - unsigned int sampling = 0; - unsigned int snaplen = 0; - unsigned int with_packet_capture = 0; - char device_group[512]; - char traffic_link_id[1024]; - unsigned int is_valid = 0; - - dzlog_debug("telemetry add maat parse config rule:%s", table_line); - ret = sscanf(table_line, "%s\t%u\t%s\t%u\t%u\t%u\t%u\t%512s\t%1024s\t%u", uuid, &pkt_cnt_max, bpf_expr, &timeout, - &sampling, &snaplen, &with_packet_capture, device_group, traffic_link_id, &is_valid); - if (ret != 10) - { - dzlog_warn("maat parse config failed. Not enough fields:%s", table_line); - // return; - } - - if (is_valid == 0) + bool valid_rule = maat_rule_parse(table_line, &telemetry_desc); + if (valid_rule == false) { - dzlog_warn("rule is not valid:%s", table_line); - // return; - } - - ret = uuid_parse(uuid, telemetry_desc.uuid); - if (ret != 0) - { - dzlog_error("uuid parsing failed: %s", uuid); + dzlog_warn("parse maat rule failed."); return; } - backspace_remove(bpf_expr, job_desc->bpf_expr); - - job_desc->pkt_cnt_max = pkt_cnt_max; - job_desc->sampling = sampling; - job_desc->snaplen = snaplen; - job_desc->measurement_type = DP_TRACE_MEASUREMENT_TYPE_TELEMETRY; - int index = telemetry_unused_job_index_get(); if (index < 0) { - dzlog_warn("no enough job for bpf_expr:", job_desc->bpf_expr); + dzlog_warn("no enough job for current rule:%s", table_line); return; } job_desc->rule_index = index; job_desc->enable = true; - memcpy(&telemetry_descs[index], &telemetry_desc, sizeof(struct dp_trace_telemetry_desc)); job_rule_apply(&telemetry_descs[index].job_desc, 1); @@ -201,11 +168,54 @@ static int telemetry_unused_job_index_get() static void job_desc_dump(struct dp_trace_job_desc * desc) { dzlog_info("dp trace job desc dump"); + dzlog_info("rule_index:%u", desc->rule_index); dzlog_info("enable:%u", desc->enable); dzlog_info("measurement_type:%u", desc->measurement_type); - dzlog_info("rule_index:%u", desc->rule_index); dzlog_info("bpf_expr:%s", desc->bpf_expr); dzlog_info("pkt_cnt_max:%u", desc->pkt_cnt_max); dzlog_info("sampling:%u", desc->sampling); dzlog_info("snaplen:%u", desc->snaplen); -}
\ No newline at end of file + + dzlog_info("traffic_link_id_cnt:%u", desc->traffic_link_id_cnt); + char traffic_link_id_str[512]; + int len = snprintf(traffic_link_id_str, sizeof(traffic_link_id_str), "%s", "["); + for (unsigned int i = 0; i < desc->traffic_link_id_cnt; i++) + { + char * normal_format = "%u,"; + char * last_format = "%u"; + + char * fromat = normal_format; + if (i == desc->traffic_link_id_cnt - 1) + { + fromat = last_format; + } + len += + snprintf(traffic_link_id_str + len, sizeof(traffic_link_id_str) - len, fromat, desc->traffic_link_ids[i]); + } + len += snprintf(traffic_link_id_str + len, sizeof(traffic_link_id_str) - len, "%s", "]"); + dzlog_info("traffic_link_ids:%s", traffic_link_id_str); +} + +#if 0 +int traffic_link_id_extract(char * traffic_str, uint16_t traffic_link_ids[], unsigned int size) +{ + unsigned int traffic_link_id_cnt = 0; + const char * delim = "[], "; + char * token = strtok(traffic_str, delim); + + for (unsigned int i = 0; i < size; i++) + { + if (token != NULL) + { + char * endptr; + uint16_t id = (uint16_t)strtol(token, &endptr, 10); + if (endptr != token) + { + traffic_link_ids[traffic_link_id_cnt++] = id; + } + } + token = strtok(NULL, delim); + } + return traffic_link_id_cnt; +} +#endif
\ No newline at end of file @@ -1,8 +1,8 @@ #include "maat.h" #include "config.h" -#include "job_ctx.h" #include <MESA/maat.h> +#include <cjson/cJSON.h> static struct maat * dp_trace_maat_instance_create() { @@ -88,4 +88,149 @@ void dp_trace_maat_init() telemetry_job_del_cb, NULL, 0, NULL); DP_TRACE_VERIFY(ret == 0, "failed at register callback of DATAPATH_TELEMETRY_JOB."); dzlog_info("data path trace maat init end"); +} + +bool maat_rule_parse(const char * table_line, struct dp_trace_telemetry_desc * telemetry_desc) +{ + struct dp_trace_job_desc * job_desc = &telemetry_desc->job_desc; + bool valid_rule = false; + + const struct config * conf = global_config_get(); + + char uuid[40] = {}; + unsigned int pkt_cnt_max = 0; + char bpf_expr[MR_BPF_EXPRESSION_MAX] = ""; + unsigned int timeout = 0; + unsigned int sampling = 0; + unsigned int snaplen = 0; + unsigned int with_packet_capture = 0; + char device_group[1024] = ""; + char traffic_link_id[128] = ""; + unsigned int is_valid = 1; + + cJSON * device_group_json = NULL; + cJSON * traffic_json = NULL; + int ret = 0; + + dzlog_info("telemetry add maat parse config rule:%s", table_line); + ret = sscanf(table_line, "%s\t%u\t%127s\t%u\t%u\t%u\t%u\t%1023s\t%127s\t%u", uuid, &pkt_cnt_max, bpf_expr, &timeout, + &sampling, &snaplen, &with_packet_capture, device_group, traffic_link_id, &is_valid); + if (ret != 10) + { + dzlog_warn("maat parse config failed. Not enough fields:%s", table_line); + goto end; + } + + bool device_group_match = false; + device_group_json = cJSON_Parse(device_group); + if (device_group_json == NULL) + { + dzlog_error("parse to json failed:%s", device_group); + goto end; + } + + cJSON * tag_sets_item = cJSON_GetObjectItem(device_group_json, "tag_sets"); + if (!cJSON_IsArray(tag_sets_item)) + { + dzlog_warn("tag_sets value is not array"); + goto device_group_match_end; + } + + int tag_sets_array_size = cJSON_GetArraySize(tag_sets_item); + for (int i = 0; i < tag_sets_array_size; i++) + { + cJSON * tag_sets_item_i = cJSON_GetArrayItem(tag_sets_item, i); + if (!cJSON_IsArray(tag_sets_item_i)) + { + dzlog_warn("tag_sets value %d is not array", i); + goto device_group_match_end; + } + + int tag_sets_item_i_array_size = cJSON_GetArraySize(tag_sets_item_i); + for (int j = 0; j < tag_sets_item_i_array_size; j++) + { + cJSON * inner_item = cJSON_GetArrayItem(tag_sets_item_i, j); + cJSON * inner_item_value = cJSON_GetObjectItem(inner_item, "value"); + if (!cJSON_IsArray(inner_item_value)) + { + dzlog_warn("tag_sets %d-%d 'value' is not array", i, j); + goto device_group_match_end; + } + + int value_array_size = cJSON_GetArraySize(inner_item_value); + for (int k = 0; k < value_array_size; k++) + { + cJSON * value_item_k = cJSON_GetArrayItem(inner_item_value, k); + if (cJSON_IsString(value_item_k)) + { + if (strcasecmp(conf->device_group, value_item_k->valuestring) == 0) + { + device_group_match = true; + goto device_group_match_end; + } + } + } + } + } + +device_group_match_end: + if (device_group_match == false) + { + dzlog_info("This rule does not apply to the current device(%s)", conf->device_group); + goto end; + } + + if (is_valid == 0) + { + dzlog_warn("rule is not valid:%s", table_line); + goto end; + } + + ret = uuid_parse(uuid, telemetry_desc->uuid); + if (ret != 0) + { + dzlog_error("uuid parsing failed: %s", uuid); + goto end; + } + + backspace_remove(bpf_expr, job_desc->bpf_expr); + + traffic_json = cJSON_Parse(traffic_link_id); + if (traffic_json == NULL || !cJSON_IsArray(traffic_json)) + { + dzlog_error("parse traffic link id string to json failed. the string is:%s", traffic_link_id); + goto end; + } + + int traffic_obj_array_size = cJSON_GetArraySize(traffic_json); + int id_num_max = (traffic_obj_array_size < DP_TRACE_TRAFFIC_LINK_ID_ARRAY_SIZE_MAX) + ? traffic_obj_array_size + : DP_TRACE_TRAFFIC_LINK_ID_ARRAY_SIZE_MAX; + for (int i = 0; i < id_num_max; i++) + { + cJSON * traffic_obj_item_i = cJSON_GetArrayItem(traffic_json, i); + if (traffic_obj_item_i != NULL && cJSON_IsNumber(traffic_obj_item_i)) + { + uint16_t value = (uint16_t)traffic_obj_item_i->valueint; + job_desc->traffic_link_ids[job_desc->traffic_link_id_cnt++] = value; + } + } + + job_desc->pkt_cnt_max = pkt_cnt_max; + job_desc->sampling = sampling; + job_desc->snaplen = snaplen; + job_desc->measurement_type = DP_TRACE_MEASUREMENT_TYPE_TELEMETRY; + + valid_rule = true; + +end: + if (device_group_json) + { + cJSON_Delete(device_group_json); + } + if (traffic_json) + { + cJSON_Delete(traffic_json); + } + return valid_rule; }
\ No newline at end of file diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt index 3c175f2..2098f58 100644 --- a/test/CMakeLists.txt +++ b/test/CMakeLists.txt @@ -6,6 +6,7 @@ set(DP_TRACE_TELEMETRY_SOURCES ${CMAKE_SOURCE_DIR}/src/job_ctx.c ${CMAKE_SOURCE_DIR}/src/trace_output.c ${CMAKE_SOURCE_DIR}/src/kafka.c + ${CMAKE_SOURCE_DIR}/src/maat.c ${CMAKE_SOURCE_DIR}/src/mocking.c ${CMAKE_SOURCE_DIR}/src/monit.c ${CMAKE_SOURCE_DIR}/src/http_serv.c |
