summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author童宗振 <[email protected]>2024-05-08 14:45:52 +0000
committer童宗振 <[email protected]>2024-05-08 14:45:52 +0000
commit118dbbfcfda78fbed2ca13dccec2f8fc701df9a1 (patch)
tree8858894d7d4f1c14e5e0f3d755e84549e14f4ea2
parent5389605c85d95ebe2fbfdebcb8a9f9f097eafa84 (diff)
(TSG-21101)dp_trace support device_group and traffic_link_id
-rw-r--r--etc/dp_telemetry_rules.json2
-rw-r--r--etc/dp_trace.conf2
-rw-r--r--include/common.h2
-rw-r--r--include/maat.h4
-rw-r--r--src/config.c2
-rw-r--r--src/job_ctx.c92
-rw-r--r--src/maat.c147
-rw-r--r--test/CMakeLists.txt1
8 files changed, 205 insertions, 47 deletions
diff --git a/etc/dp_telemetry_rules.json b/etc/dp_telemetry_rules.json
index 8a67a73..7a3b115 100644
--- a/etc/dp_telemetry_rules.json
+++ b/etc/dp_telemetry_rules.json
@@ -3,7 +3,7 @@
{
"table_name": "DATAPATH_TELEMETRY_JOB",
"table_content": [
- "72694b1c-6833-4c46-acde-52e2d6409314\t100\tether\\bhost\\b00:15:5d:b8:10:a6\t60\t2\t200\t0\t{}\t[]\t1"
+ "72694b1c-6833-4c46-acde-52e2d6409314\t100000\thost\\b192.168.64.11\t60\t1\t200\t0\t{\"tag_sets\":[[{\"value\":[\"device-abc\",\"device-xxx\"],\"tag\":\"data_center\"}]]}\t[1000,10001]\t1"
]
}
]
diff --git a/etc/dp_trace.conf b/etc/dp_trace.conf
index b44ea96..cb79fdd 100644
--- a/etc/dp_trace.conf
+++ b/etc/dp_trace.conf
@@ -2,7 +2,7 @@
iocore=4,5,6,7
zlog_config_path=../etc/dp_trace_zlog.conf
dp_trace_dir=./
-device_group=
+device_group="device-xxx"
monit_file_path=/var/run/mrzcpd/mrmonit.app.dp_trace_telemetry.saving
[http_server]
diff --git a/include/common.h b/include/common.h
index 12c71a0..613d337 100644
--- a/include/common.h
+++ b/include/common.h
@@ -11,7 +11,7 @@
#include <unistd.h>
#ifndef MR_SYMBOL_MAX
-#define MR_SYMBOL_MAX 64
+#define MR_SYMBOL_MAX 128
#endif
#ifndef likely
diff --git a/include/maat.h b/include/maat.h
index ca9ea7d..2d81cf8 100644
--- a/include/maat.h
+++ b/include/maat.h
@@ -1,6 +1,8 @@
#pragma once
+#include "job_ctx.h"
#define MAAT_INPUT_JSON 0
#define MAAT_INPUT_REDIS 1
-void dp_trace_maat_init(); \ No newline at end of file
+void dp_trace_maat_init();
+bool maat_rule_parse(const char * table_line, struct dp_trace_telemetry_desc * telemetry_desc); \ No newline at end of file
diff --git a/src/config.c b/src/config.c
index c556ec5..376d266 100644
--- a/src/config.c
+++ b/src/config.c
@@ -77,7 +77,7 @@ void config_load()
"/var/run/mrzcpd/mrmonit.app.dp_trace_telemetry.saving");
MESA_load_profile_string_def(config_path, "global", "device_group", g_conf->device_group,
- sizeof(g_conf->device_group), "unknow");
+ sizeof(g_conf->device_group), "");
MESA_load_profile_string_def(config_path, "http_server", "listen_addr", g_conf->str_listen_addr,
sizeof(g_conf->str_listen_addr), "127.0.0.1");
diff --git a/src/job_ctx.c b/src/job_ctx.c
index 57ea593..778dc29 100644
--- a/src/job_ctx.c
+++ b/src/job_ctx.c
@@ -1,5 +1,7 @@
#include "job_ctx.h"
#include "common.h"
+#include "config.h"
+#include "maat.h"
#include "trace_output.h"
#include <MESA/maat.h>
@@ -75,60 +77,25 @@ void job_rule_apply(struct dp_trace_job_desc desc[], unsigned int nr_desc)
void telemetry_job_add_cb(const char * table_name, int table_id, const char * key, const char * table_line, void ** ad,
long argl, void * argp)
{
- int ret = 0;
struct dp_trace_telemetry_desc telemetry_desc = {};
struct dp_trace_job_desc * job_desc = &telemetry_desc.job_desc;
- char uuid[40] = {};
- unsigned int pkt_cnt_max = 0;
- char bpf_expr[MR_BPF_EXPRESSION_MAX];
- unsigned int timeout = 0;
- unsigned int sampling = 0;
- unsigned int snaplen = 0;
- unsigned int with_packet_capture = 0;
- char device_group[512];
- char traffic_link_id[1024];
- unsigned int is_valid = 0;
-
- dzlog_debug("telemetry add maat parse config rule:%s", table_line);
- ret = sscanf(table_line, "%s\t%u\t%s\t%u\t%u\t%u\t%u\t%512s\t%1024s\t%u", uuid, &pkt_cnt_max, bpf_expr, &timeout,
- &sampling, &snaplen, &with_packet_capture, device_group, traffic_link_id, &is_valid);
- if (ret != 10)
- {
- dzlog_warn("maat parse config failed. Not enough fields:%s", table_line);
- // return;
- }
-
- if (is_valid == 0)
+ bool valid_rule = maat_rule_parse(table_line, &telemetry_desc);
+ if (valid_rule == false)
{
- dzlog_warn("rule is not valid:%s", table_line);
- // return;
- }
-
- ret = uuid_parse(uuid, telemetry_desc.uuid);
- if (ret != 0)
- {
- dzlog_error("uuid parsing failed: %s", uuid);
+ dzlog_warn("parse maat rule failed.");
return;
}
- backspace_remove(bpf_expr, job_desc->bpf_expr);
-
- job_desc->pkt_cnt_max = pkt_cnt_max;
- job_desc->sampling = sampling;
- job_desc->snaplen = snaplen;
- job_desc->measurement_type = DP_TRACE_MEASUREMENT_TYPE_TELEMETRY;
-
int index = telemetry_unused_job_index_get();
if (index < 0)
{
- dzlog_warn("no enough job for bpf_expr:", job_desc->bpf_expr);
+ dzlog_warn("no enough job for current rule:%s", table_line);
return;
}
job_desc->rule_index = index;
job_desc->enable = true;
-
memcpy(&telemetry_descs[index], &telemetry_desc, sizeof(struct dp_trace_telemetry_desc));
job_rule_apply(&telemetry_descs[index].job_desc, 1);
@@ -201,11 +168,54 @@ static int telemetry_unused_job_index_get()
static void job_desc_dump(struct dp_trace_job_desc * desc)
{
dzlog_info("dp trace job desc dump");
+ dzlog_info("rule_index:%u", desc->rule_index);
dzlog_info("enable:%u", desc->enable);
dzlog_info("measurement_type:%u", desc->measurement_type);
- dzlog_info("rule_index:%u", desc->rule_index);
dzlog_info("bpf_expr:%s", desc->bpf_expr);
dzlog_info("pkt_cnt_max:%u", desc->pkt_cnt_max);
dzlog_info("sampling:%u", desc->sampling);
dzlog_info("snaplen:%u", desc->snaplen);
-} \ No newline at end of file
+
+ dzlog_info("traffic_link_id_cnt:%u", desc->traffic_link_id_cnt);
+ char traffic_link_id_str[512];
+ int len = snprintf(traffic_link_id_str, sizeof(traffic_link_id_str), "%s", "[");
+ for (unsigned int i = 0; i < desc->traffic_link_id_cnt; i++)
+ {
+ char * normal_format = "%u,";
+ char * last_format = "%u";
+
+ char * fromat = normal_format;
+ if (i == desc->traffic_link_id_cnt - 1)
+ {
+ fromat = last_format;
+ }
+ len +=
+ snprintf(traffic_link_id_str + len, sizeof(traffic_link_id_str) - len, fromat, desc->traffic_link_ids[i]);
+ }
+ len += snprintf(traffic_link_id_str + len, sizeof(traffic_link_id_str) - len, "%s", "]");
+ dzlog_info("traffic_link_ids:%s", traffic_link_id_str);
+}
+
+#if 0
+int traffic_link_id_extract(char * traffic_str, uint16_t traffic_link_ids[], unsigned int size)
+{
+ unsigned int traffic_link_id_cnt = 0;
+ const char * delim = "[], ";
+ char * token = strtok(traffic_str, delim);
+
+ for (unsigned int i = 0; i < size; i++)
+ {
+ if (token != NULL)
+ {
+ char * endptr;
+ uint16_t id = (uint16_t)strtol(token, &endptr, 10);
+ if (endptr != token)
+ {
+ traffic_link_ids[traffic_link_id_cnt++] = id;
+ }
+ }
+ token = strtok(NULL, delim);
+ }
+ return traffic_link_id_cnt;
+}
+#endif \ No newline at end of file
diff --git a/src/maat.c b/src/maat.c
index e0b4ea1..4fc741e 100644
--- a/src/maat.c
+++ b/src/maat.c
@@ -1,8 +1,8 @@
#include "maat.h"
#include "config.h"
-#include "job_ctx.h"
#include <MESA/maat.h>
+#include <cjson/cJSON.h>
static struct maat * dp_trace_maat_instance_create()
{
@@ -88,4 +88,149 @@ void dp_trace_maat_init()
telemetry_job_del_cb, NULL, 0, NULL);
DP_TRACE_VERIFY(ret == 0, "failed at register callback of DATAPATH_TELEMETRY_JOB.");
dzlog_info("data path trace maat init end");
+}
+
+bool maat_rule_parse(const char * table_line, struct dp_trace_telemetry_desc * telemetry_desc)
+{
+ struct dp_trace_job_desc * job_desc = &telemetry_desc->job_desc;
+ bool valid_rule = false;
+
+ const struct config * conf = global_config_get();
+
+ char uuid[40] = {};
+ unsigned int pkt_cnt_max = 0;
+ char bpf_expr[MR_BPF_EXPRESSION_MAX] = "";
+ unsigned int timeout = 0;
+ unsigned int sampling = 0;
+ unsigned int snaplen = 0;
+ unsigned int with_packet_capture = 0;
+ char device_group[1024] = "";
+ char traffic_link_id[128] = "";
+ unsigned int is_valid = 1;
+
+ cJSON * device_group_json = NULL;
+ cJSON * traffic_json = NULL;
+ int ret = 0;
+
+ dzlog_info("telemetry add maat parse config rule:%s", table_line);
+ ret = sscanf(table_line, "%s\t%u\t%127s\t%u\t%u\t%u\t%u\t%1023s\t%127s\t%u", uuid, &pkt_cnt_max, bpf_expr, &timeout,
+ &sampling, &snaplen, &with_packet_capture, device_group, traffic_link_id, &is_valid);
+ if (ret != 10)
+ {
+ dzlog_warn("maat parse config failed. Not enough fields:%s", table_line);
+ goto end;
+ }
+
+ bool device_group_match = false;
+ device_group_json = cJSON_Parse(device_group);
+ if (device_group_json == NULL)
+ {
+ dzlog_error("parse to json failed:%s", device_group);
+ goto end;
+ }
+
+ cJSON * tag_sets_item = cJSON_GetObjectItem(device_group_json, "tag_sets");
+ if (!cJSON_IsArray(tag_sets_item))
+ {
+ dzlog_warn("tag_sets value is not array");
+ goto device_group_match_end;
+ }
+
+ int tag_sets_array_size = cJSON_GetArraySize(tag_sets_item);
+ for (int i = 0; i < tag_sets_array_size; i++)
+ {
+ cJSON * tag_sets_item_i = cJSON_GetArrayItem(tag_sets_item, i);
+ if (!cJSON_IsArray(tag_sets_item_i))
+ {
+ dzlog_warn("tag_sets value %d is not array", i);
+ goto device_group_match_end;
+ }
+
+ int tag_sets_item_i_array_size = cJSON_GetArraySize(tag_sets_item_i);
+ for (int j = 0; j < tag_sets_item_i_array_size; j++)
+ {
+ cJSON * inner_item = cJSON_GetArrayItem(tag_sets_item_i, j);
+ cJSON * inner_item_value = cJSON_GetObjectItem(inner_item, "value");
+ if (!cJSON_IsArray(inner_item_value))
+ {
+ dzlog_warn("tag_sets %d-%d 'value' is not array", i, j);
+ goto device_group_match_end;
+ }
+
+ int value_array_size = cJSON_GetArraySize(inner_item_value);
+ for (int k = 0; k < value_array_size; k++)
+ {
+ cJSON * value_item_k = cJSON_GetArrayItem(inner_item_value, k);
+ if (cJSON_IsString(value_item_k))
+ {
+ if (strcasecmp(conf->device_group, value_item_k->valuestring) == 0)
+ {
+ device_group_match = true;
+ goto device_group_match_end;
+ }
+ }
+ }
+ }
+ }
+
+device_group_match_end:
+ if (device_group_match == false)
+ {
+ dzlog_info("This rule does not apply to the current device(%s)", conf->device_group);
+ goto end;
+ }
+
+ if (is_valid == 0)
+ {
+ dzlog_warn("rule is not valid:%s", table_line);
+ goto end;
+ }
+
+ ret = uuid_parse(uuid, telemetry_desc->uuid);
+ if (ret != 0)
+ {
+ dzlog_error("uuid parsing failed: %s", uuid);
+ goto end;
+ }
+
+ backspace_remove(bpf_expr, job_desc->bpf_expr);
+
+ traffic_json = cJSON_Parse(traffic_link_id);
+ if (traffic_json == NULL || !cJSON_IsArray(traffic_json))
+ {
+ dzlog_error("parse traffic link id string to json failed. the string is:%s", traffic_link_id);
+ goto end;
+ }
+
+ int traffic_obj_array_size = cJSON_GetArraySize(traffic_json);
+ int id_num_max = (traffic_obj_array_size < DP_TRACE_TRAFFIC_LINK_ID_ARRAY_SIZE_MAX)
+ ? traffic_obj_array_size
+ : DP_TRACE_TRAFFIC_LINK_ID_ARRAY_SIZE_MAX;
+ for (int i = 0; i < id_num_max; i++)
+ {
+ cJSON * traffic_obj_item_i = cJSON_GetArrayItem(traffic_json, i);
+ if (traffic_obj_item_i != NULL && cJSON_IsNumber(traffic_obj_item_i))
+ {
+ uint16_t value = (uint16_t)traffic_obj_item_i->valueint;
+ job_desc->traffic_link_ids[job_desc->traffic_link_id_cnt++] = value;
+ }
+ }
+
+ job_desc->pkt_cnt_max = pkt_cnt_max;
+ job_desc->sampling = sampling;
+ job_desc->snaplen = snaplen;
+ job_desc->measurement_type = DP_TRACE_MEASUREMENT_TYPE_TELEMETRY;
+
+ valid_rule = true;
+
+end:
+ if (device_group_json)
+ {
+ cJSON_Delete(device_group_json);
+ }
+ if (traffic_json)
+ {
+ cJSON_Delete(traffic_json);
+ }
+ return valid_rule;
} \ No newline at end of file
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 3c175f2..2098f58 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -6,6 +6,7 @@ set(DP_TRACE_TELEMETRY_SOURCES
${CMAKE_SOURCE_DIR}/src/job_ctx.c
${CMAKE_SOURCE_DIR}/src/trace_output.c
${CMAKE_SOURCE_DIR}/src/kafka.c
+ ${CMAKE_SOURCE_DIR}/src/maat.c
${CMAKE_SOURCE_DIR}/src/mocking.c
${CMAKE_SOURCE_DIR}/src/monit.c
${CMAKE_SOURCE_DIR}/src/http_serv.c