summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorliuchang <[email protected]>2023-05-08 06:54:16 +0000
committerliuchang <[email protected]>2023-05-08 06:54:16 +0000
commit36db46774880dceab9abfb1965f0d228257ec4d5 (patch)
treed4b3f818288c75cbb509bd3f62e75e3e6550331c
parentd8c5e552a9ee59f5d94df49ca71f9c5927107713 (diff)
send log wrapped by mpack
-rw-r--r--shaping/include/shaper.h20
-rw-r--r--shaping/include/shaper_marsio.h15
-rw-r--r--shaping/include/shaper_session.h4
-rw-r--r--shaping/src/shaper.cpp6
-rw-r--r--shaping/src/shaper_session.cpp90
-rw-r--r--shaping/test/CMakeLists.txt9
-rw-r--r--shaping/test/gtest_shaper_send_log.cpp148
-rw-r--r--shaping/test/stub.cpp20
8 files changed, 273 insertions, 39 deletions
diff --git a/shaping/include/shaper.h b/shaping/include/shaper.h
index 5197267..5bf3feb 100644
--- a/shaping/include/shaper.h
+++ b/shaping/include/shaper.h
@@ -1,10 +1,10 @@
-#ifndef _SHAPER_H
-#define _SHAPER_H
+#pragma once
#include <sys/queue.h>
#include <marsio.h>
#include "uthash.h"
#include "session_table.h"
+#include "utils.h"
#define SHAPING_DIR_IN 0x1
#define SHAPING_DIR_OUT 0x2
@@ -98,6 +98,19 @@ struct shaping_packet_wrapper {
};
TAILQ_HEAD(delay_queue, shaping_packet_wrapper);
+struct metadata
+{
+ uint64_t session_id;
+ char *raw_data;
+ int raw_len;
+ int dir;
+ int is_tcp_pure_ctrl;
+ int is_ctrl_pkt;
+ uint16_t l7_offset; // only control packet set l7_offset
+ struct sids sids;
+ struct route_ctx route_ctx;
+};
+
struct shaping_flow {
struct addr_tuple4 tuple4;
struct delay_queue packet_queue;
@@ -107,7 +120,7 @@ struct shaping_flow {
int ref_count;
unsigned int queue_len;
unsigned int flag;
- struct metadata *ctrl_meta;
+ struct metadata ctrl_meta;
};
struct shaper_flow_instance {
@@ -141,4 +154,3 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
struct shaping_ctx *shaping_engine_init();
void shaping_engine_destroy(struct shaping_ctx *ctx);
-#endif \ No newline at end of file
diff --git a/shaping/include/shaper_marsio.h b/shaping/include/shaper_marsio.h
index ca12671..fdc9c86 100644
--- a/shaping/include/shaper_marsio.h
+++ b/shaping/include/shaper_marsio.h
@@ -1,3 +1,5 @@
+#pragma once
+
#include <marsio.h>
#include "shaper.h"
#include "utils.h"
@@ -27,19 +29,6 @@ struct shaping_marsio_info
int rx_brust_max;
};
-struct metadata
-{
- uint64_t session_id;
- char *raw_data;
- int raw_len;
- int dir;
- int is_tcp_pure_ctrl;
- int is_ctrl_pkt;
- uint16_t l7_offset; // only control packet set l7_offset
- struct sids sids;
- struct route_ctx route_ctx;
-};
-
enum session_state
{
SESSION_STATE_OPENING = 1,
diff --git a/shaping/include/shaper_session.h b/shaping/include/shaper_session.h
index 1cfdf6d..98af727 100644
--- a/shaping/include/shaper_session.h
+++ b/shaping/include/shaper_session.h
@@ -4,4 +4,6 @@ struct shaping_flow* shaper_session_opening(struct shaping_thread_ctx *ctx, stru
struct shaping_flow* shaper_session_close(struct shaping_thread_ctx *ctx, struct metadata *meta);
struct shaping_flow* shaper_session_active(struct shaping_thread_ctx *ctx, struct metadata *meta, struct ctrl_pkt_data *ctrl_data, struct raw_pkt_parser *raw_parser);
struct shaping_flow* shaper_session_reset_all(struct shaping_thread_ctx *ctx, struct metadata *meta);
-void shaper_session_data_free_cb(void *session_data, void *data); \ No newline at end of file
+void shaper_session_data_free_cb(void *session_data, void *data);
+
+void shaper_session_log_prepare(struct shaping_flow *sf, char **mpack_data, size_t *mpack_size); \ No newline at end of file
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp
index b3e0825..6d1ec51 100644
--- a/shaping/src/shaper.cpp
+++ b/shaping/src/shaper.cpp
@@ -113,9 +113,11 @@ static void shaping_node_free(struct shaping_node *s_node)
avl_tree_node_free(s_node->avl_node[i]);
}
}
- if (s_node->shaping_flow.ctrl_meta) {
- free(s_node->shaping_flow.ctrl_meta);
+
+ if (s_node->shaping_flow.ctrl_meta.raw_data) {
+ free(s_node->shaping_flow.ctrl_meta.raw_data);
}
+
free(s_node);
}
diff --git a/shaping/src/shaper_session.cpp b/shaping/src/shaper_session.cpp
index ae3dfae..ec7b9d0 100644
--- a/shaping/src/shaper_session.cpp
+++ b/shaping/src/shaper_session.cpp
@@ -8,6 +8,7 @@
#include "shaper_stat.h"
#include "shaper_global_stat.h"
#include "shaper.h"
+#include "mpack.h"
struct shaping_flow* shaper_session_opening(struct shaping_thread_ctx *ctx, struct metadata *meta, struct ctrl_pkt_data *ctrl_data, struct raw_pkt_parser *raw_parser)
{
@@ -24,7 +25,7 @@ struct shaping_flow* shaper_session_opening(struct shaping_thread_ctx *ctx, stru
sf = shaping_flow_new();
raw_packet_parser_get_most_inner_tuple4(raw_parser, &sf->tuple4);
//shaper_rules_update(ctx, sf, ctrl_data->shaping_rule_ids, ctrl_data->shaping_rule_num);
- shaper_marsio_metadata_deep_copy(sf->ctrl_meta, meta);
+ shaper_marsio_metadata_deep_copy(&sf->ctrl_meta, meta);
session_table_insert(ctx->session_table, meta->session_id, &sf->tuple4, sf, NULL);
@@ -33,53 +34,105 @@ struct shaping_flow* shaper_session_opening(struct shaping_thread_ctx *ctx, stru
return sf;
}
-static void shaper_session_log_send(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
+void shaper_session_log_prepare(struct shaping_flow *sf, char **mpack_data, size_t *mpack_size)
{
- if (!sf->ctrl_meta) {
- LOG_ERROR("%s: ctrl meta for session %s doesn't exist", LOG_TAG_SHAPING, addr_tuple4_to_str(&sf->tuple4));
- return;
+ mpack_writer_t writer;
+ struct shaping_rule_info *rule_info;
+
+ mpack_writer_init_growable(&writer, mpack_data, mpack_size);
+ mpack_build_map(&writer);
+
+ //tsync
+ mpack_write_cstr(&writer, "tsync");
+ mpack_write_cstr(&writer, "2.0");
+
+ //session_id
+ mpack_write_cstr(&writer, "session_id");
+ mpack_write_u64(&writer, sf->ctrl_meta.session_id);
+
+ //state
+ mpack_write_cstr(&writer, "state");
+ mpack_write_cstr(&writer, "active");
+
+ //method
+ mpack_write_cstr(&writer, "method");
+ mpack_write_cstr(&writer, "log_update");
+
+ //params
+ mpack_write_cstr(&writer, "params");
+ mpack_build_map(&writer);
+ mpack_write_cstr(&writer, "shaper");
+ mpack_build_array(&writer);
+ for (int i = 0; i < sf->rule_num; i++) {
+ rule_info = &sf->matched_rule_infos[i];
+
+ mpack_build_map(&writer);
+ mpack_write_cstr(&writer, "rule_id");
+ mpack_write_i64(&writer, rule_info->id);
+ mpack_write_cstr(&writer, "profile_ids");
+ mpack_build_array(&writer);
+ mpack_write_i64(&writer, rule_info->primary.id);
+ for (int j = 0; j < rule_info->borrowing_num; j++) {
+ mpack_write_i64(&writer, rule_info->borrowing[j].id);
+ }
+ mpack_complete_array(&writer);//end build array for profile_ids
+ mpack_complete_map(&writer);
}
+ mpack_complete_array(&writer);//end build array for shaper
+ mpack_complete_map(&writer);//end build map for params
+ mpack_complete_map(&writer);//end build map at the beginning
- /************************/
- //TODO: add content to msgpack
- /************************/
+ mpack_writer_destroy(&writer);
+ return;
+}
+
+static void shaper_session_log_send(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
+{
+ char *mpack_data = NULL;
+ size_t mpack_size = 0;
+
+ shaper_session_log_prepare(sf, &mpack_data, &mpack_size);
marsio_buff_t *tx_buff;
- char *pkt_header_data = sf->ctrl_meta->raw_data;
- int pkt_header_len = sf->ctrl_meta->l7_offset;
- struct metadata *ctrl_meta = sf->ctrl_meta;
+ char *pkt_header_data = sf->ctrl_meta.raw_data;
+ int pkt_header_len = sf->ctrl_meta.l7_offset;
+ struct metadata *ctrl_meta = &sf->ctrl_meta;
struct sids sids;
marsio_buff_malloc_global(ctx->marsio_info->instance, &tx_buff, 1, 0, ctx->thread_index);
- char *dst = marsio_buff_append(tx_buff, pkt_header_len + msgpack_len);
+ char *dst = marsio_buff_append(tx_buff, pkt_header_len + mpack_size);
memcpy(dst, pkt_header_data, pkt_header_len);
- memcpy(dst + pkt_header_len, msgpack_data, msgpack_len);
+ memcpy(dst + pkt_header_len, mpack_data, mpack_size);
if (marsio_buff_set_metadata(tx_buff, MR_BUFF_SESSION_ID, &ctrl_meta->session_id, sizeof(ctrl_meta->session_id)) != 0) {
LOG_ERROR("%s: marsio_buff_set_metadata MR_BUFF_SESSION_ID failed for session %s", LOG_TAG_SHAPING, addr_tuple4_to_str(&sf->tuple4));
- return;
+ goto END;
}
if (marsio_buff_set_metadata(tx_buff, MR_BUFF_PAYLOAD_OFFSET, &pkt_header_len, sizeof(pkt_header_len)) != 0) {
LOG_ERROR("%s: marsio_buff_set_metadata MR_BUFF_PAYLOAD_OFFSET failed for session %s", LOG_TAG_SHAPING, addr_tuple4_to_str(&sf->tuple4));
- return;
+ goto END;
}
sids.elems[0] = ctx->firewall_sid;
sids.num = 1;
if (marsio_buff_set_sid_list(tx_buff, sids.elems, sids.num) != 0) {
LOG_ERROR("%s: marsio_buff_set_sid_list failed for session %s", LOG_TAG_SHAPING, addr_tuple4_to_str(&sf->tuple4));
- return;
+ goto END;
}
if (marsio_buff_set_metadata(tx_buff, MR_BUFF_ROUTE_CTX, ctrl_meta->route_ctx.data, ctrl_meta->route_ctx.len) != 0) {
LOG_ERROR("%s: marsio_buff_set_metadata MR_BUFF_ROUTE_CTX failed for session %s", LOG_TAG_SHAPING, addr_tuple4_to_str(&sf->tuple4));
- return;
+ goto END;
}
marsio_send_burst(ctx->marsio_info->mr_path, ctx->thread_index, &tx_buff, 1);
shaper_global_stat_session_log_send_num_inc(ctx->global_stat);
+END:
+ if (mpack_data) {
+ free(mpack_data);
+ }
return;
}
@@ -96,8 +149,6 @@ struct shaping_flow* shaper_session_close(struct shaping_thread_ctx *ctx, struct
sf = (struct shaping_flow *)session_node->val_data;
sf->flag |= STREAM_CLOSE;
- shaper_session_log_send(ctx, sf);
-
session_table_delete_by_id(ctx->session_table, meta->session_id);
shaper_global_stat_curr_session_dec(ctx->global_stat);
@@ -117,6 +168,7 @@ struct shaping_flow* shaper_session_active(struct shaping_thread_ctx *ctx, struc
}
shaper_rules_update(ctx, sf, ctrl_data->shaping_rule_ids, ctrl_data->shaping_rule_num);
+ shaper_session_log_send(ctx, sf);//send log of rules and profiles when receive new matched rules
return sf;
}
diff --git a/shaping/test/CMakeLists.txt b/shaping/test/CMakeLists.txt
index 1b527b8..4a9436f 100644
--- a/shaping/test/CMakeLists.txt
+++ b/shaping/test/CMakeLists.txt
@@ -9,6 +9,15 @@ target_include_directories(gtest_shaper_maat PUBLIC ${CMAKE_SOURCE_DIR}/shaping/
target_link_libraries(gtest_shaper_maat common shaper pthread gtest)
###############################################################################
+# gtest_shaper_maat
+###############################################################################
+
+add_executable(gtest_shaper_send_log gtest_shaper_send_log.cpp stub.cpp)
+target_include_directories(gtest_shaper_send_log PUBLIC ${CMAKE_SOURCE_DIR}/common/include)
+target_include_directories(gtest_shaper_send_log PUBLIC ${CMAKE_SOURCE_DIR}/shaping/include)
+target_link_libraries(gtest_shaper_send_log common shaper pthread gtest)
+
+###############################################################################
# gtest_shaper
###############################################################################
diff --git a/shaping/test/gtest_shaper_send_log.cpp b/shaping/test/gtest_shaper_send_log.cpp
new file mode 100644
index 0000000..effd7bf
--- /dev/null
+++ b/shaping/test/gtest_shaper_send_log.cpp
@@ -0,0 +1,148 @@
+#include <gtest/gtest.h>
+#include "mpack.h"
+#include "shaper.h"
+#include "shaper_session.h"
+#include "shaper_marsio.h"
+
+static void gtest_shaper_log_parse(struct shaping_flow *sf, const char *data, size_t length)
+{
+ int shaping_rule_array_size;
+ char temp_str[16] = {0};
+ mpack_tree_t tree;
+
+ mpack_node_t root;
+ mpack_node_t tmp_node;
+ mpack_node_t tmp_rule_node;
+ mpack_node_t tmp_profile_node;
+
+ mpack_tree_init_data(&tree, data, length);
+ mpack_tree_parse(&tree);
+ root = mpack_tree_root(&tree);
+
+ //tsync
+ tmp_node = mpack_node_map_cstr(root, "tsync");
+ EXPECT_EQ(mpack_type_str, mpack_node_type(tmp_node));
+ memcpy(temp_str, mpack_node_str(tmp_node), mpack_node_strlen(tmp_node));
+ EXPECT_STREQ("2.0", temp_str);
+
+ //session_id
+ tmp_node = mpack_node_map_cstr(root, "session_id");
+ EXPECT_EQ(mpack_type_uint, mpack_node_type(tmp_node));
+ sf->ctrl_meta.session_id = mpack_node_u64(tmp_node);
+
+ //state
+ tmp_node = mpack_node_map_cstr(root, "state");
+ EXPECT_EQ(mpack_type_str, mpack_node_type(tmp_node));
+ memset(temp_str, 0, sizeof(temp_str));
+ memcpy(temp_str, mpack_node_str(tmp_node), mpack_node_strlen(tmp_node));
+ EXPECT_STREQ("active", temp_str);
+
+ //method
+ tmp_node = mpack_node_map_cstr(root, "method");
+ EXPECT_EQ(mpack_type_str, mpack_node_type(tmp_node));
+ memset(temp_str, 0, sizeof(temp_str));
+ memcpy(temp_str, mpack_node_str(tmp_node), mpack_node_strlen(tmp_node));
+ EXPECT_STREQ("log_update", temp_str);
+
+ //shaping rules
+ tmp_node = mpack_node_map_cstr(root, "params");
+ EXPECT_EQ(mpack_type_map, mpack_node_type(tmp_node));
+
+ tmp_node = mpack_node_map_cstr(tmp_node, "shaper");
+ EXPECT_EQ(mpack_type_array, mpack_node_type(tmp_node));
+
+ shaping_rule_array_size = mpack_node_array_length(tmp_node);
+ sf->rule_num = MIN(shaping_rule_array_size, SHAPING_RULE_NUM_MAX);
+ for (int i = 0; i < sf->rule_num; i++) {
+ ASSERT_EQ(mpack_type_map, mpack_node_type(mpack_node_array_at(tmp_node, i)) );
+ tmp_rule_node = mpack_node_map_cstr(mpack_node_array_at(tmp_node, i), "rule_id");
+ EXPECT_EQ(mpack_type_uint, mpack_node_type(tmp_rule_node));
+ sf->matched_rule_infos[i].id = mpack_node_u64(tmp_rule_node);
+
+ tmp_profile_node = mpack_node_map_cstr(mpack_node_array_at(tmp_node, i), "profile_ids");
+ ASSERT_EQ(mpack_type_array, mpack_node_type(tmp_profile_node));
+ int profile_array_len = mpack_node_array_length(tmp_profile_node);
+ sf->matched_rule_infos[i].borrowing_num = profile_array_len - 1;
+ for (int j = 0; j < profile_array_len; j++) {
+ ASSERT_EQ(mpack_type_uint, mpack_node_type(mpack_node_array_at(tmp_profile_node, j)) );
+ if (j == 0) {
+ sf->matched_rule_infos[i].primary.id = mpack_node_u64(mpack_node_array_at(tmp_profile_node, j));
+ } else {
+ sf->matched_rule_infos[i].borrowing[j - 1].id = mpack_node_u64(mpack_node_array_at(tmp_profile_node, j));
+ }
+ }
+ }
+
+ mpack_tree_destroy(&tree);
+
+ return;
+}
+
+TEST(MPACK_LOG, PARSE)
+{
+ struct shaping_flow sf_in;
+ struct shaping_flow sf_out;
+ char *mpack_data = NULL;
+ size_t mpack_size = 0;
+
+ sf_in.ctrl_meta.session_id = 12345678;
+ sf_in.rule_num = 3;
+
+ //rule_id 0, primary profile id 0, borrow profile id 1
+ sf_in.matched_rule_infos[0].id = 0;
+ sf_in.matched_rule_infos[0].primary.id = 0;
+ sf_in.matched_rule_infos[0].borrowing_num = 1;
+ sf_in.matched_rule_infos[0].borrowing[0].id = 1;
+
+ //rule id 1, primary profile id 2, borrow profile id 3,4
+ sf_in.matched_rule_infos[1].id = 1;
+ sf_in.matched_rule_infos[1].primary.id = 2;
+ sf_in.matched_rule_infos[1].borrowing_num = 2;
+ sf_in.matched_rule_infos[1].borrowing[0].id = 3;
+ sf_in.matched_rule_infos[1].borrowing[1].id = 4;
+
+ //rule id 2, primary profile id 5, borrow profile id 6,7,8
+ sf_in.matched_rule_infos[2].id = 2;
+ sf_in.matched_rule_infos[2].primary.id = 5;
+ sf_in.matched_rule_infos[2].borrowing_num = 3;
+ sf_in.matched_rule_infos[2].borrowing[0].id = 6;
+ sf_in.matched_rule_infos[2].borrowing[1].id = 7;
+ sf_in.matched_rule_infos[2].borrowing[2].id = 8;
+
+ shaper_session_log_prepare(&sf_in, &mpack_data, &mpack_size);
+ gtest_shaper_log_parse(&sf_out, mpack_data, mpack_size);
+
+ EXPECT_EQ(sf_out.ctrl_meta.session_id, 12345678);
+ EXPECT_EQ(sf_out.rule_num, 3);
+
+ //rule_id 0, primary profile id 0, borrow profile id 1
+ EXPECT_EQ(sf_out.matched_rule_infos[0].id, 0);
+ EXPECT_EQ(sf_out.matched_rule_infos[0].primary.id, 0);
+ EXPECT_EQ(sf_out.matched_rule_infos[0].borrowing_num, 1);
+ EXPECT_EQ(sf_out.matched_rule_infos[0].borrowing[0].id, 1);
+
+ //rule id 1, primary profile id 2, borrow profile id 3,4
+ EXPECT_EQ(sf_out.matched_rule_infos[1].id, 1);
+ EXPECT_EQ(sf_out.matched_rule_infos[1].primary.id, 2);
+ EXPECT_EQ(sf_out.matched_rule_infos[1].borrowing_num, 2);
+ EXPECT_EQ(sf_out.matched_rule_infos[1].borrowing[0].id, 3);
+ EXPECT_EQ(sf_out.matched_rule_infos[1].borrowing[1].id, 4);
+
+ //rule id 2, primary profile id 5, borrow profile id 6,7,8
+ EXPECT_EQ(sf_out.matched_rule_infos[2].id, 2);
+ EXPECT_EQ(sf_out.matched_rule_infos[2].primary.id, 5);
+ EXPECT_EQ(sf_out.matched_rule_infos[2].borrowing_num, 3);
+ EXPECT_EQ(sf_out.matched_rule_infos[2].borrowing[0].id, 6);
+ EXPECT_EQ(sf_out.matched_rule_infos[2].borrowing[1].id, 7);
+ EXPECT_EQ(sf_out.matched_rule_infos[2].borrowing[2].id, 8);
+
+ if (mpack_data) {
+ free(mpack_data);
+ }
+}
+
+int main(int argc, char **argv)
+{
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+} \ No newline at end of file
diff --git a/shaping/test/stub.cpp b/shaping/test/stub.cpp
index 67ba815..b2d8cc9 100644
--- a/shaping/test/stub.cpp
+++ b/shaping/test/stub.cpp
@@ -604,4 +604,24 @@ char * marsio_buff_mtod(marsio_buff_t * m)
{
return NULL;
}
+
+int marsio_buff_malloc_global(struct mr_instance * instance, marsio_buff_t * marsio_buff[], unsigned int nr_mbufs, int socket_id, int thread_id)
+{
+ return 0;
+}
+
+char * marsio_buff_append(marsio_buff_t * m, uint16_t len)
+{
+ return NULL;
+}
+
+int marsio_buff_set_metadata(marsio_buff_t * m, enum mr_buff_metadata_type type, void * data, unsigned int sz_data)
+{
+ return 0;
+}
+
+int marsio_buff_set_sid_list(marsio_buff_t * m, sid_t * slist, uint8_t sz_slist)
+{
+ return 0;
+}
/***************************************************/ \ No newline at end of file