diff options
| author | 童宗振 <[email protected]> | 2024-10-17 09:12:49 +0000 |
|---|---|---|
| committer | 童宗振 <[email protected]> | 2024-10-17 09:12:49 +0000 |
| commit | 358be31c7e4e4561e121729d9a6c604731a7063a (patch) | |
| tree | 1a5afdf48ab30242febc0502ec13f4d2bdfa7ea9 | |
| parent | c48d240bc4ec7c8aad7348c9b996a18b7565cd92 (diff) | |
| parent | fc85421d8628f3297e636c23f8b1710716188d08 (diff) | |
Merge branch 'adapt_maat_json' into 'dev-0.3'
(TSG-22350)Telemetry is adapted to MAAT, and the distribution format of Policy is changed...
See merge request tsg/dp_telemetry_app!48
| -rw-r--r-- | README.md | 5 | ||||
| -rw-r--r-- | doc/ToolUtil.java | 172 | ||||
| -rw-r--r-- | etc/dp_telemetry_rules.json | 36 | ||||
| -rw-r--r-- | etc/table_schema.json | 3 | ||||
| -rw-r--r-- | include/common.h | 25 | ||||
| -rw-r--r-- | include/job_ctx.h | 6 | ||||
| -rw-r--r-- | src/job_ctx.c | 7 | ||||
| -rw-r--r-- | src/maat.c | 135 |
8 files changed, 152 insertions, 237 deletions
@@ -1,10 +1,7 @@ ## 文档 -1. [Datapath Trace and Telemetry方案设计](https://docs.geedge.net/pages/viewpage.action?pageId=124754302) -2. [Datapath Telemetry Job -- maat字段](https://docs.geedge.net/display/TSG/Datapath+Telemetry+Job) -3. [Datapath Telemetry Record -- kafka发送内容](https://docs.geedge.net/pages/viewpage.action?pageId=129088240) -4. [telemetry入门指北](https://docs.geedge.net/pages/viewpage.action?pageId=129100805) +1. [telemetry入门](https://docs.geedge.net/pages/viewpage.action?pageId=129100805) ## 编译 diff --git a/doc/ToolUtil.java b/doc/ToolUtil.java deleted file mode 100644 index edc3987..0000000 --- a/doc/ToolUtil.java +++ /dev/null @@ -1,172 +0,0 @@ -/**
- * Copyright (c) 2015-2016, Chill Zhuang 庄骞 ([email protected]).
- * <p>
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package cn.nis.ntc.utils;
-
-
-import cn.hutool.core.collection.CollectionUtil;
-import cn.hutool.core.util.ObjectUtil;
-import cn.hutool.core.util.StrUtil;
-import cn.hutool.log.Log;
-import cn.nis.ntc.utils.bean.LocationPage;
-import cn.nis.ntc.utils.exception.NtcException;
-import com.baomidou.mybatisplus.annotation.TableField;
-import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
-import com.csvreader.CsvReader;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import jakarta.servlet.http.HttpServletRequest;
-import jakarta.validation.ValidationException;
-import org.apache.commons.beanutils.ConvertUtils;
-import org.eclipse.collections.impl.list.fixed.ArrayAdapter;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.lang.annotation.Annotation;
-import java.lang.reflect.Array;
-import java.lang.reflect.Field;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.Proxy;
-import java.math.BigDecimal;
-import java.net.URISyntaxException;
-import java.net.URLDecoder;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.security.SecureRandom;
-import java.util.*;
-import java.util.Map.Entry;
-import java.util.function.*;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-
-import static cn.nis.ntc.utils.DPs.log;
-
-
-/**
- * 高频方法集合类
- */
-public class ToolUtil {
-
- private static final Log logger = Log.get();
-
- private static final Pattern POINT_PATTERN = Pattern.compile("(\\s)(\\.)(\\s)|(\\.)(\\s)|(\\s)(\\.)");
-
- private static final Pattern POINT_PATTERN_DEEP = Pattern.compile("(\\.)");
-
- private static final Pattern humpPattern = Pattern.compile("[A-Z]");
-
- /**
- * 入库字段\处理查询入库
- *
- * @param cfgKeywords
- * @return
- */
- public static String strGetEscape(String cfgKeywords) {
- if (StringUtils.isNotEmpty(cfgKeywords)) {
- cfgKeywords = cfgKeywords.trim();// 首先去掉空白符号
- // 不能输入不可见字符
- if (containsInvisibleChar(cfgKeywords)) {
- throw new NtcException(Code.InvisibleChar.getCode(), Code.InvisibleChar.getDesc() + ": " + cfgKeywords);
- }
- // \\ 需要第一个替换
-// String[] fbsArr = { "\\","$","(",")","*","+",".","?","^","|","'","%","&"};
- cfgKeywords = cfgKeywords.replace("\\", "\\\\")
- .replace("&", "\\&")
- .replace(" ", "\\b");
- }
- return cfgKeywords;
- }
-
- /**
- * 反转义特殊字符 出库
- *
- * @param cfgKeywords
- * @return
- */
- public static String strUnEscape(String cfgKeywords, Boolean... flag) {
- if (StringUtils.isNotEmpty(cfgKeywords)) {
- // 不转译特殊字符
- cfgKeywords = cfgKeywords.trim();// 首先去掉首尾空格
- if (flag.length > 0 && flag[0]) {
- /**因为入库前空格要转换成了\b;但如果输入的是\b而不是空格,入库后就是\\b,这时在反转时不需要处理,
- * 在查询的时候要将\b转反换回空格,需要对\\b提前处理,否则会误转
- * * split本身对\需要转义,最终\\b写成了8个\
- */
- String[] cfgKeywordAry = cfgKeywords.split("\\\\\\\\b", -1);
-// String tempKeywords = "";
- StringBuilder tempKeywords = new StringBuilder();
- for (String str : cfgKeywordAry) {
- str = str.replace("\\b", " ");
-// tempKeywords+=str+"\\\\b";
- tempKeywords.append(str);
- tempKeywords.append("\\\\b");
- }
- if (StringUtils.endsWith(tempKeywords.toString(), "\\\\b")) {
- cfgKeywords = tempKeywords.substring(0, tempKeywords.lastIndexOf("\\\\b"));
- }
- cfgKeywords = cfgKeywords.indexOf("\\\\b") == -1 ? cfgKeywords.replace("\\b", " ") : cfgKeywords;
- } else {
- String[] tempStr = cfgKeywords.split("\\\\\\\\b", -1);
-// String tempKwd = "";
- StringBuilder tempKwd = new StringBuilder();
- for (String str : tempStr) {
- str = str.replace("\\b", "\b");
-// tempKwd += str+"\\\\b";
- tempKwd.append(str);
- tempKwd.append("\\\\b");
- }
- if (StringUtils.endsWith(tempKwd.toString(), "\\\\b")) {
- cfgKeywords = tempKwd.substring(0, tempKwd.lastIndexOf("\\\\b"));
- }
- char[] chars = cfgKeywords.toCharArray();
- StringBuilder sb = new StringBuilder();
- for (char aChar : chars) {
- if (aChar == '\b') {
- sb.append(" ");
- } else {
- sb.append(aChar);
- }
- }
- cfgKeywords = sb.toString();
-
- }
- cfgKeywords = cfgKeywords.replace("\\\\", "\\");
- cfgKeywords = cfgKeywords.replace("\\&", "&");
- }
- return cfgKeywords;
- }
-
- /**
- * 字符串是否包含不可见字符
- * 包含:true
- * 不包含:false
- *
- * @param content
- * @return Boolean
- */
- public static Boolean containsInvisibleChar(String content) {
- if (content != null && content.length() > 0) {
- char[] contentCharArr = content.toCharArray();
- for (int i = 0; i < contentCharArr.length; i++) {
- if ((contentCharArr[i] <= 0x1F) || contentCharArr[i] == 0x7F) {
- return true;
- }
- }
- return false;
- }
- return false;
- }
-}
diff --git a/etc/dp_telemetry_rules.json b/etc/dp_telemetry_rules.json index 51d1aee..aa024c8 100644 --- a/etc/dp_telemetry_rules.json +++ b/etc/dp_telemetry_rules.json @@ -3,7 +3,41 @@ { "table_name": "DATAPATH_TELEMETRY_JOB", "table_content": [ - "72694b1c-6833-4c46-acde-52e2d6409314\t100000\thost\\b192.168.64.11\t60\t1\t200\t0\t{\"tag_sets\":[[{\"value\":[\"group-xxg-tsgx-virtual1\",\"group-xxg-tsgx-virtual2\"],\"tag\":\"device_group\"}],[{\"value\":[\"center-nyj\",\"center-nyj-one\"],\"tag\":\"data_center\"}]]}\t[1000,10001]\t1" + { + "job_id": "72694b1c-6833-4c46-acde-52e2d6409314", + "max_packet": 100000, + "filter_rule": "host 192.168.64.11", + "timeout": 60, + "sampling": 1, + "snaplen": 200, + "with_packet_capture": 0, + "device_group": { + "tag_sets": [ + [ + { + "value": [ + "center-nyj", + "center-nyj-one" + ], + "tag": "data_center" + }, + { + "value": [ + "group-xxg-tsgx-virtual1", + "group-xxg-tsgx-virtual2" + ], + "tag": "device_group" + } + ] + ] + }, + "traffic_link_id": [ + 1000, + 1001 + ], + "is_valid": 1, + "modified_time": "1716531859000000" + } ] } ] diff --git a/etc/table_schema.json b/etc/table_schema.json index ed500c9..84d826e 100644 --- a/etc/table_schema.json +++ b/etc/table_schema.json @@ -3,9 +3,8 @@ "table_id": 1, "table_name": "DATAPATH_TELEMETRY_JOB", "table_type": "plugin", - "valid_column": 10, "custom": { - "key": 1, + "key_name": "job_id", "key_type": "pointer" } } diff --git a/include/common.h b/include/common.h index 6f75b8e..ec6ff45 100644 --- a/include/common.h +++ b/include/common.h @@ -125,6 +125,31 @@ static inline void bpf_str_unescape_for_cm(const char * input, char * output) i += 1; output[j++] = '&'; } + else if (input[i] == '\\' && input[i + 1] == '^') + { + i += 1; + output[j++] = '^'; + } + else if (input[i] == '\\' && input[i + 1] == '$') + { + i += 1; + output[j++] = '$'; + } + else if (input[i] == '\\' && input[i + 1] == '|') + { + i += 1; + output[j++] = '|'; + } + else if (input[i] == '\\' && input[i + 1] == '(') + { + i += 1; + output[j++] = '('; + } + else if (input[i] == '\\' && input[i + 1] == ')') + { + i += 1; + output[j++] = ')'; + } else { output[j++] = input[i]; diff --git a/include/job_ctx.h b/include/job_ctx.h index fbb1f95..95febb7 100644 --- a/include/job_ctx.h +++ b/include/job_ctx.h @@ -14,9 +14,9 @@ job_bitmap_t index_to_job_id(unsigned int index); int is_job_id_used(job_bitmap_t job_id); uint8_t job_id_role_get(job_bitmap_t job_id); -void telemetry_job_add_cb(const char * table_name, int table_id, const char * key, const char * table_line, void ** ad, - long argl, void * argp); -void telemetry_job_del_cb(int table_id, void ** ad, long argl, void * argp); +void telemetry_job_add_cb(const char * table_name, const char * key, const char * table_line, void ** ad, long argl, + void * argp); +void telemetry_job_del_cb(const char * table_name, void ** ad, long argl, void * argp); void telemetry_job_uuid_get(job_bitmap_t job_id, unsigned char * uu); void job_desc_dump(const struct dp_trace_job_desc * desc);
\ No newline at end of file diff --git a/src/job_ctx.c b/src/job_ctx.c index b54cf0e..45e1354 100644 --- a/src/job_ctx.c +++ b/src/job_ctx.c @@ -71,8 +71,9 @@ void job_rule_apply(struct dp_trace_job_desc desc[], unsigned int nr_desc) } } -void telemetry_job_add_cb(const char * table_name, int table_id, const char * key, const char * table_line, void ** ad, - long argl, void * argp) +void telemetry_job_add_cb(const char * table_name, const char * key, const char * table_line, void ** ad, long argl, + void * argp) + { struct dp_trace_telemetry_desc telemetry_desc = {}; struct dp_trace_job_desc * job_desc = &telemetry_desc.job_desc; @@ -102,7 +103,7 @@ void telemetry_job_add_cb(const char * table_name, int table_id, const char * ke *ad = &telemetry_descs[index]; } -void telemetry_job_del_cb(int table_id, void ** ad, long argl, void * argp) +void telemetry_job_del_cb(const char * table_name, void ** ad, long argl, void * argp) { struct dp_trace_telemetry_desc * telemetry_desc = *ad; struct dp_trace_job_desc * job_desc = &telemetry_desc->job_desc; @@ -103,40 +103,83 @@ bool maat_rule_parse(const char * table_line, struct dp_trace_telemetry_desc * t unsigned int timeout = 0; unsigned int sampling = 0; unsigned int snaplen = 0; - unsigned int with_packet_capture = 0; char device_group[4096] = ""; - char traffic_link_id[128] = ""; - unsigned int is_valid = 1; - cJSON * device_group_json = NULL; - cJSON * traffic_json = NULL; + cJSON * table_json = NULL; int ret = 0; - dzlog_info("telemetry add maat parse config rule:%s", table_line); - ret = sscanf(table_line, "%s\t%u\t%127s\t%u\t%u\t%u\t%u\t%4095s\t%127s\t%u", uuid, &pkt_cnt_max, bpf_expr, &timeout, - &sampling, &snaplen, &with_packet_capture, device_group, traffic_link_id, &is_valid); - if (ret != 10) + dzlog_info("telemetry parse config rule:%s", table_line); + + table_json = cJSON_Parse(table_line); + if (table_json == NULL) { - dzlog_warn("maat parse config failed. Not enough fields:%s", table_line); + dzlog_error("Parse to json failed. table_json fields are missing."); + valid_rule = false; goto end; } - bool device_group_or_date_center_match = false; - if (strcasecmp(device_group, "{}") == 0) + cJSON * uuid_item = cJSON_GetObjectItem(table_json, "job_id"); + if (uuid_item == NULL) { - dzlog_info("When device_group is empty, all devices are matched"); - device_group_or_date_center_match = true; - goto device_group_or_date_center_match_end; + dzlog_error("Parse to json failed. job_id fields are missing."); + valid_rule = false; + goto end; + } + snprintf(uuid, sizeof(uuid), "%s", uuid_item->valuestring); + + cJSON * max_packet_item = cJSON_GetObjectItem(table_json, "max_packet"); + if (max_packet_item == NULL) + { + dzlog_error("Parse to json failed. max_packet fields are missing."); + valid_rule = false; + goto end; + } + pkt_cnt_max = max_packet_item->valueint; + + cJSON * filter_rule_item = cJSON_GetObjectItem(table_json, "filter_rule"); + if (filter_rule_item == NULL) + { + dzlog_error("Parse to json failed. filter_rule fields are missing."); + valid_rule = false; + goto end; + } + snprintf(bpf_expr, sizeof(bpf_expr), "%s", filter_rule_item->valuestring); + + cJSON * sampling_item = cJSON_GetObjectItem(table_json, "sampling"); + if (sampling_item == NULL) + { + dzlog_error("Parse to json failed. sampling fields are missing."); + valid_rule = false; + goto end; + } + sampling = sampling_item->valueint; + + cJSON * snaplen_item = cJSON_GetObjectItem(table_json, "snaplen"); + if (snaplen_item == NULL) + { + dzlog_error("Parse to json failed. snaplen fields are missing."); + valid_rule = false; + goto end; + } + snaplen = snaplen_item->valueint; + + cJSON * device_group_item = cJSON_GetObjectItem(table_json, "device_group"); + if (device_group_item == NULL) + { + dzlog_error("Parse to json failed. device_group fields are missing."); + valid_rule = false; + goto end; } - device_group_json = cJSON_Parse(device_group); - if (device_group_json == NULL) + bool device_group_or_date_center_match = false; + if (cJSON_GetArraySize(device_group_item) == 0) { - dzlog_error("parse to json failed:%s", device_group); + dzlog_info("When device_group is empty, all devices are matched"); + device_group_or_date_center_match = true; goto device_group_or_date_center_match_end; } - cJSON * tag_sets_item = cJSON_GetObjectItem(device_group_json, "tag_sets"); + cJSON * tag_sets_item = cJSON_GetObjectItem(device_group_item, "tag_sets"); if (!cJSON_IsArray(tag_sets_item)) { dzlog_warn("tag_sets value is not array"); @@ -146,27 +189,28 @@ bool maat_rule_parse(const char * table_line, struct dp_trace_telemetry_desc * t int tag_sets_array_size = cJSON_GetArraySize(tag_sets_item); for (int i = 0; i < tag_sets_array_size; i++) { - cJSON * tag_sets_item_i = cJSON_GetArrayItem(tag_sets_item, i); - if (!cJSON_IsArray(tag_sets_item_i)) + cJSON * tag_set_item = cJSON_GetArrayItem(tag_sets_item, i); + if (!cJSON_IsArray(tag_set_item)) { dzlog_warn("tag_sets value %d is not array", i); goto device_group_or_date_center_match_end; } - int tag_sets_item_i_array_size = cJSON_GetArraySize(tag_sets_item_i); - for (int j = 0; j < tag_sets_item_i_array_size; j++) + int tag_set_item_array_size = cJSON_GetArraySize(tag_set_item); + for (int j = 0; j < tag_set_item_array_size; j++) { - cJSON * inner_item = cJSON_GetArrayItem(tag_sets_item_i, j); + cJSON * tag_set_j_item = cJSON_GetArrayItem(tag_set_item, j); - cJSON * tag_item = cJSON_GetObjectItem(inner_item, "tag"); - const char * tag_key_str = cJSON_GetStringValue(tag_item); - const char * tag_val_str = NULL; - if (tag_key_str == NULL) + cJSON * tag_item = cJSON_GetObjectItem(tag_set_j_item, "tag"); + if (tag_item == NULL) { dzlog_warn("tag_sets %d no 'tag'", i); continue; } - else if (strcasecmp(tag_key_str, "device_group") == 0) + + const char * tag_key_str = cJSON_GetStringValue(tag_item); + const char * tag_val_str = NULL; + if (strcasecmp(tag_key_str, "device_group") == 0) { tag_val_str = conf->device_group; } @@ -175,17 +219,17 @@ bool maat_rule_parse(const char * table_line, struct dp_trace_telemetry_desc * t tag_val_str = conf->data_center; } - cJSON * inner_item_value = cJSON_GetObjectItem(inner_item, "value"); - if (!cJSON_IsArray(inner_item_value)) + cJSON * valuer_item = cJSON_GetObjectItem(tag_set_j_item, "value"); + if (!cJSON_IsArray(valuer_item)) { dzlog_warn("tag_sets %d-%d 'value' is not array", i, j); continue; } - int value_array_size = cJSON_GetArraySize(inner_item_value); + int value_array_size = cJSON_GetArraySize(valuer_item); for (int k = 0; k < value_array_size; k++) { - cJSON * value_item_k = cJSON_GetArrayItem(inner_item_value, k); + cJSON * value_item_k = cJSON_GetArrayItem(valuer_item, k); if (cJSON_IsString(value_item_k)) { if (strcasecmp(tag_val_str, value_item_k->valuestring) == 0) @@ -206,12 +250,6 @@ device_group_or_date_center_match_end: goto end; } - if (is_valid == 0) - { - dzlog_warn("rule is not valid:%s", table_line); - goto end; - } - ret = uuid_parse(uuid, telemetry_desc->uuid); if (ret != 0) { @@ -221,20 +259,20 @@ device_group_or_date_center_match_end: bpf_str_unescape_for_cm(bpf_expr, job_desc->bpf_expr); - traffic_json = cJSON_Parse(traffic_link_id); - if (traffic_json == NULL || !cJSON_IsArray(traffic_json)) + cJSON * traffic_link_id_item = cJSON_GetObjectItem(table_json, "traffic_link_id"); + if (traffic_link_id_item == NULL || !cJSON_IsArray(traffic_link_id_item)) { - dzlog_error("parse traffic link id string to json failed. the string is:%s", traffic_link_id); + dzlog_error("parse traffic_link_id field failed."); goto end; } - int traffic_obj_array_size = cJSON_GetArraySize(traffic_json); + int traffic_obj_array_size = cJSON_GetArraySize(traffic_link_id_item); int id_num_max = (traffic_obj_array_size < DP_TRACE_TRAFFIC_LINK_ID_ARRAY_SIZE_MAX) ? traffic_obj_array_size : DP_TRACE_TRAFFIC_LINK_ID_ARRAY_SIZE_MAX; for (int i = 0; i < id_num_max; i++) { - cJSON * traffic_obj_item_i = cJSON_GetArrayItem(traffic_json, i); + cJSON * traffic_obj_item_i = cJSON_GetArrayItem(traffic_link_id_item, i); if (traffic_obj_item_i != NULL && cJSON_IsNumber(traffic_obj_item_i)) { uint16_t value = (uint16_t)traffic_obj_item_i->valueint; @@ -250,13 +288,6 @@ device_group_or_date_center_match_end: valid_rule = true; end: - if (device_group_json) - { - cJSON_Delete(device_group_json); - } - if (traffic_json) - { - cJSON_Delete(traffic_json); - } + cJSON_Delete(table_json); return valid_rule; }
\ No newline at end of file |
