diff options
| author | liuchang <[email protected]> | 2023-05-08 06:54:16 +0000 |
|---|---|---|
| committer | liuchang <[email protected]> | 2023-05-08 06:54:16 +0000 |
| commit | 36db46774880dceab9abfb1965f0d228257ec4d5 (patch) | |
| tree | d4b3f818288c75cbb509bd3f62e75e3e6550331c | |
| parent | d8c5e552a9ee59f5d94df49ca71f9c5927107713 (diff) | |
send log wrapped by mpack
| -rw-r--r-- | shaping/include/shaper.h | 20 | ||||
| -rw-r--r-- | shaping/include/shaper_marsio.h | 15 | ||||
| -rw-r--r-- | shaping/include/shaper_session.h | 4 | ||||
| -rw-r--r-- | shaping/src/shaper.cpp | 6 | ||||
| -rw-r--r-- | shaping/src/shaper_session.cpp | 90 | ||||
| -rw-r--r-- | shaping/test/CMakeLists.txt | 9 | ||||
| -rw-r--r-- | shaping/test/gtest_shaper_send_log.cpp | 148 | ||||
| -rw-r--r-- | shaping/test/stub.cpp | 20 |
8 files changed, 273 insertions, 39 deletions
diff --git a/shaping/include/shaper.h b/shaping/include/shaper.h index 5197267..5bf3feb 100644 --- a/shaping/include/shaper.h +++ b/shaping/include/shaper.h @@ -1,10 +1,10 @@ -#ifndef _SHAPER_H -#define _SHAPER_H +#pragma once #include <sys/queue.h> #include <marsio.h> #include "uthash.h" #include "session_table.h" +#include "utils.h" #define SHAPING_DIR_IN 0x1 #define SHAPING_DIR_OUT 0x2 @@ -98,6 +98,19 @@ struct shaping_packet_wrapper { }; TAILQ_HEAD(delay_queue, shaping_packet_wrapper); +struct metadata +{ + uint64_t session_id; + char *raw_data; + int raw_len; + int dir; + int is_tcp_pure_ctrl; + int is_ctrl_pkt; + uint16_t l7_offset; // only control packet set l7_offset + struct sids sids; + struct route_ctx route_ctx; +}; + struct shaping_flow { struct addr_tuple4 tuple4; struct delay_queue packet_queue; @@ -107,7 +120,7 @@ struct shaping_flow { int ref_count; unsigned int queue_len; unsigned int flag; - struct metadata *ctrl_meta; + struct metadata ctrl_meta; }; struct shaper_flow_instance { @@ -141,4 +154,3 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu struct shaping_ctx *shaping_engine_init(); void shaping_engine_destroy(struct shaping_ctx *ctx); -#endif
\ No newline at end of file diff --git a/shaping/include/shaper_marsio.h b/shaping/include/shaper_marsio.h index ca12671..fdc9c86 100644 --- a/shaping/include/shaper_marsio.h +++ b/shaping/include/shaper_marsio.h @@ -1,3 +1,5 @@ +#pragma once + #include <marsio.h> #include "shaper.h" #include "utils.h" @@ -27,19 +29,6 @@ struct shaping_marsio_info int rx_brust_max; }; -struct metadata -{ - uint64_t session_id; - char *raw_data; - int raw_len; - int dir; - int is_tcp_pure_ctrl; - int is_ctrl_pkt; - uint16_t l7_offset; // only control packet set l7_offset - struct sids sids; - struct route_ctx route_ctx; -}; - enum session_state { SESSION_STATE_OPENING = 1, diff --git a/shaping/include/shaper_session.h b/shaping/include/shaper_session.h index 1cfdf6d..98af727 100644 --- a/shaping/include/shaper_session.h +++ b/shaping/include/shaper_session.h @@ -4,4 +4,6 @@ struct shaping_flow* shaper_session_opening(struct shaping_thread_ctx *ctx, stru struct shaping_flow* shaper_session_close(struct shaping_thread_ctx *ctx, struct metadata *meta); struct shaping_flow* shaper_session_active(struct shaping_thread_ctx *ctx, struct metadata *meta, struct ctrl_pkt_data *ctrl_data, struct raw_pkt_parser *raw_parser); struct shaping_flow* shaper_session_reset_all(struct shaping_thread_ctx *ctx, struct metadata *meta); -void shaper_session_data_free_cb(void *session_data, void *data);
\ No newline at end of file +void shaper_session_data_free_cb(void *session_data, void *data); + +void shaper_session_log_prepare(struct shaping_flow *sf, char **mpack_data, size_t *mpack_size);
\ No newline at end of file diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp index b3e0825..6d1ec51 100644 --- a/shaping/src/shaper.cpp +++ b/shaping/src/shaper.cpp @@ -113,9 +113,11 @@ static void shaping_node_free(struct shaping_node *s_node) avl_tree_node_free(s_node->avl_node[i]); } } - if (s_node->shaping_flow.ctrl_meta) { - free(s_node->shaping_flow.ctrl_meta); + + if (s_node->shaping_flow.ctrl_meta.raw_data) { + free(s_node->shaping_flow.ctrl_meta.raw_data); } + free(s_node); } diff --git a/shaping/src/shaper_session.cpp b/shaping/src/shaper_session.cpp index ae3dfae..ec7b9d0 100644 --- a/shaping/src/shaper_session.cpp +++ b/shaping/src/shaper_session.cpp @@ -8,6 +8,7 @@ #include "shaper_stat.h" #include "shaper_global_stat.h" #include "shaper.h" +#include "mpack.h" struct shaping_flow* shaper_session_opening(struct shaping_thread_ctx *ctx, struct metadata *meta, struct ctrl_pkt_data *ctrl_data, struct raw_pkt_parser *raw_parser) { @@ -24,7 +25,7 @@ struct shaping_flow* shaper_session_opening(struct shaping_thread_ctx *ctx, stru sf = shaping_flow_new(); raw_packet_parser_get_most_inner_tuple4(raw_parser, &sf->tuple4); //shaper_rules_update(ctx, sf, ctrl_data->shaping_rule_ids, ctrl_data->shaping_rule_num); - shaper_marsio_metadata_deep_copy(sf->ctrl_meta, meta); + shaper_marsio_metadata_deep_copy(&sf->ctrl_meta, meta); session_table_insert(ctx->session_table, meta->session_id, &sf->tuple4, sf, NULL); @@ -33,53 +34,105 @@ struct shaping_flow* shaper_session_opening(struct shaping_thread_ctx *ctx, stru return sf; } -static void shaper_session_log_send(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) +void shaper_session_log_prepare(struct shaping_flow *sf, char **mpack_data, size_t *mpack_size) { - if (!sf->ctrl_meta) { - LOG_ERROR("%s: ctrl meta for session %s doesn't exist", LOG_TAG_SHAPING, addr_tuple4_to_str(&sf->tuple4)); - return; + mpack_writer_t writer; + struct shaping_rule_info *rule_info; + + mpack_writer_init_growable(&writer, mpack_data, mpack_size); + mpack_build_map(&writer); + + //tsync + mpack_write_cstr(&writer, "tsync"); + mpack_write_cstr(&writer, "2.0"); + + //session_id + mpack_write_cstr(&writer, "session_id"); + mpack_write_u64(&writer, sf->ctrl_meta.session_id); + + //state + mpack_write_cstr(&writer, "state"); + mpack_write_cstr(&writer, "active"); + + //method + mpack_write_cstr(&writer, "method"); + mpack_write_cstr(&writer, "log_update"); + + //params + mpack_write_cstr(&writer, "params"); + mpack_build_map(&writer); + mpack_write_cstr(&writer, "shaper"); + mpack_build_array(&writer); + for (int i = 0; i < sf->rule_num; i++) { + rule_info = &sf->matched_rule_infos[i]; + + mpack_build_map(&writer); + mpack_write_cstr(&writer, "rule_id"); + mpack_write_i64(&writer, rule_info->id); + mpack_write_cstr(&writer, "profile_ids"); + mpack_build_array(&writer); + mpack_write_i64(&writer, rule_info->primary.id); + for (int j = 0; j < rule_info->borrowing_num; j++) { + mpack_write_i64(&writer, rule_info->borrowing[j].id); + } + mpack_complete_array(&writer);//end build array for profile_ids + mpack_complete_map(&writer); } + mpack_complete_array(&writer);//end build array for shaper + mpack_complete_map(&writer);//end build map for params + mpack_complete_map(&writer);//end build map at the beginning - /************************/ - //TODO: add content to msgpack - /************************/ + mpack_writer_destroy(&writer); + return; +} + +static void shaper_session_log_send(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) +{ + char *mpack_data = NULL; + size_t mpack_size = 0; + + shaper_session_log_prepare(sf, &mpack_data, &mpack_size); marsio_buff_t *tx_buff; - char *pkt_header_data = sf->ctrl_meta->raw_data; - int pkt_header_len = sf->ctrl_meta->l7_offset; - struct metadata *ctrl_meta = sf->ctrl_meta; + char *pkt_header_data = sf->ctrl_meta.raw_data; + int pkt_header_len = sf->ctrl_meta.l7_offset; + struct metadata *ctrl_meta = &sf->ctrl_meta; struct sids sids; marsio_buff_malloc_global(ctx->marsio_info->instance, &tx_buff, 1, 0, ctx->thread_index); - char *dst = marsio_buff_append(tx_buff, pkt_header_len + msgpack_len); + char *dst = marsio_buff_append(tx_buff, pkt_header_len + mpack_size); memcpy(dst, pkt_header_data, pkt_header_len); - memcpy(dst + pkt_header_len, msgpack_data, msgpack_len); + memcpy(dst + pkt_header_len, mpack_data, mpack_size); if (marsio_buff_set_metadata(tx_buff, MR_BUFF_SESSION_ID, &ctrl_meta->session_id, sizeof(ctrl_meta->session_id)) != 0) { LOG_ERROR("%s: marsio_buff_set_metadata MR_BUFF_SESSION_ID failed for session %s", LOG_TAG_SHAPING, addr_tuple4_to_str(&sf->tuple4)); - return; + goto END; } if (marsio_buff_set_metadata(tx_buff, MR_BUFF_PAYLOAD_OFFSET, &pkt_header_len, sizeof(pkt_header_len)) != 0) { LOG_ERROR("%s: marsio_buff_set_metadata MR_BUFF_PAYLOAD_OFFSET failed for session %s", LOG_TAG_SHAPING, addr_tuple4_to_str(&sf->tuple4)); - return; + goto END; } sids.elems[0] = ctx->firewall_sid; sids.num = 1; if (marsio_buff_set_sid_list(tx_buff, sids.elems, sids.num) != 0) { LOG_ERROR("%s: marsio_buff_set_sid_list failed for session %s", LOG_TAG_SHAPING, addr_tuple4_to_str(&sf->tuple4)); - return; + goto END; } if (marsio_buff_set_metadata(tx_buff, MR_BUFF_ROUTE_CTX, ctrl_meta->route_ctx.data, ctrl_meta->route_ctx.len) != 0) { LOG_ERROR("%s: marsio_buff_set_metadata MR_BUFF_ROUTE_CTX failed for session %s", LOG_TAG_SHAPING, addr_tuple4_to_str(&sf->tuple4)); - return; + goto END; } marsio_send_burst(ctx->marsio_info->mr_path, ctx->thread_index, &tx_buff, 1); shaper_global_stat_session_log_send_num_inc(ctx->global_stat); +END: + if (mpack_data) { + free(mpack_data); + } return; } @@ -96,8 +149,6 @@ struct shaping_flow* shaper_session_close(struct shaping_thread_ctx *ctx, struct sf = (struct shaping_flow *)session_node->val_data; sf->flag |= STREAM_CLOSE; - shaper_session_log_send(ctx, sf); - session_table_delete_by_id(ctx->session_table, meta->session_id); shaper_global_stat_curr_session_dec(ctx->global_stat); @@ -117,6 +168,7 @@ struct shaping_flow* shaper_session_active(struct shaping_thread_ctx *ctx, struc } shaper_rules_update(ctx, sf, ctrl_data->shaping_rule_ids, ctrl_data->shaping_rule_num); + shaper_session_log_send(ctx, sf);//send log of rules and profiles when receive new matched rules return sf; } diff --git a/shaping/test/CMakeLists.txt b/shaping/test/CMakeLists.txt index 1b527b8..4a9436f 100644 --- a/shaping/test/CMakeLists.txt +++ b/shaping/test/CMakeLists.txt @@ -9,6 +9,15 @@ target_include_directories(gtest_shaper_maat PUBLIC ${CMAKE_SOURCE_DIR}/shaping/ target_link_libraries(gtest_shaper_maat common shaper pthread gtest) ############################################################################### +# gtest_shaper_maat +############################################################################### + +add_executable(gtest_shaper_send_log gtest_shaper_send_log.cpp stub.cpp) +target_include_directories(gtest_shaper_send_log PUBLIC ${CMAKE_SOURCE_DIR}/common/include) +target_include_directories(gtest_shaper_send_log PUBLIC ${CMAKE_SOURCE_DIR}/shaping/include) +target_link_libraries(gtest_shaper_send_log common shaper pthread gtest) + +############################################################################### # gtest_shaper ############################################################################### diff --git a/shaping/test/gtest_shaper_send_log.cpp b/shaping/test/gtest_shaper_send_log.cpp new file mode 100644 index 0000000..effd7bf --- /dev/null +++ b/shaping/test/gtest_shaper_send_log.cpp @@ -0,0 +1,148 @@ +#include <gtest/gtest.h> +#include "mpack.h" +#include "shaper.h" +#include "shaper_session.h" +#include "shaper_marsio.h" + +static void gtest_shaper_log_parse(struct shaping_flow *sf, const char *data, size_t length) +{ + int shaping_rule_array_size; + char temp_str[16] = {0}; + mpack_tree_t tree; + + mpack_node_t root; + mpack_node_t tmp_node; + mpack_node_t tmp_rule_node; + mpack_node_t tmp_profile_node; + + mpack_tree_init_data(&tree, data, length); + mpack_tree_parse(&tree); + root = mpack_tree_root(&tree); + + //tsync + tmp_node = mpack_node_map_cstr(root, "tsync"); + EXPECT_EQ(mpack_type_str, mpack_node_type(tmp_node)); + memcpy(temp_str, mpack_node_str(tmp_node), mpack_node_strlen(tmp_node)); + EXPECT_STREQ("2.0", temp_str); + + //session_id + tmp_node = mpack_node_map_cstr(root, "session_id"); + EXPECT_EQ(mpack_type_uint, mpack_node_type(tmp_node)); + sf->ctrl_meta.session_id = mpack_node_u64(tmp_node); + + //state + tmp_node = mpack_node_map_cstr(root, "state"); + EXPECT_EQ(mpack_type_str, mpack_node_type(tmp_node)); + memset(temp_str, 0, sizeof(temp_str)); + memcpy(temp_str, mpack_node_str(tmp_node), mpack_node_strlen(tmp_node)); + EXPECT_STREQ("active", temp_str); + + //method + tmp_node = mpack_node_map_cstr(root, "method"); + EXPECT_EQ(mpack_type_str, mpack_node_type(tmp_node)); + memset(temp_str, 0, sizeof(temp_str)); + memcpy(temp_str, mpack_node_str(tmp_node), mpack_node_strlen(tmp_node)); + EXPECT_STREQ("log_update", temp_str); + + //shaping rules + tmp_node = mpack_node_map_cstr(root, "params"); + EXPECT_EQ(mpack_type_map, mpack_node_type(tmp_node)); + + tmp_node = mpack_node_map_cstr(tmp_node, "shaper"); + EXPECT_EQ(mpack_type_array, mpack_node_type(tmp_node)); + + shaping_rule_array_size = mpack_node_array_length(tmp_node); + sf->rule_num = MIN(shaping_rule_array_size, SHAPING_RULE_NUM_MAX); + for (int i = 0; i < sf->rule_num; i++) { + ASSERT_EQ(mpack_type_map, mpack_node_type(mpack_node_array_at(tmp_node, i)) ); + tmp_rule_node = mpack_node_map_cstr(mpack_node_array_at(tmp_node, i), "rule_id"); + EXPECT_EQ(mpack_type_uint, mpack_node_type(tmp_rule_node)); + sf->matched_rule_infos[i].id = mpack_node_u64(tmp_rule_node); + + tmp_profile_node = mpack_node_map_cstr(mpack_node_array_at(tmp_node, i), "profile_ids"); + ASSERT_EQ(mpack_type_array, mpack_node_type(tmp_profile_node)); + int profile_array_len = mpack_node_array_length(tmp_profile_node); + sf->matched_rule_infos[i].borrowing_num = profile_array_len - 1; + for (int j = 0; j < profile_array_len; j++) { + ASSERT_EQ(mpack_type_uint, mpack_node_type(mpack_node_array_at(tmp_profile_node, j)) ); + if (j == 0) { + sf->matched_rule_infos[i].primary.id = mpack_node_u64(mpack_node_array_at(tmp_profile_node, j)); + } else { + sf->matched_rule_infos[i].borrowing[j - 1].id = mpack_node_u64(mpack_node_array_at(tmp_profile_node, j)); + } + } + } + + mpack_tree_destroy(&tree); + + return; +} + +TEST(MPACK_LOG, PARSE) +{ + struct shaping_flow sf_in; + struct shaping_flow sf_out; + char *mpack_data = NULL; + size_t mpack_size = 0; + + sf_in.ctrl_meta.session_id = 12345678; + sf_in.rule_num = 3; + + //rule_id 0, primary profile id 0, borrow profile id 1 + sf_in.matched_rule_infos[0].id = 0; + sf_in.matched_rule_infos[0].primary.id = 0; + sf_in.matched_rule_infos[0].borrowing_num = 1; + sf_in.matched_rule_infos[0].borrowing[0].id = 1; + + //rule id 1, primary profile id 2, borrow profile id 3,4 + sf_in.matched_rule_infos[1].id = 1; + sf_in.matched_rule_infos[1].primary.id = 2; + sf_in.matched_rule_infos[1].borrowing_num = 2; + sf_in.matched_rule_infos[1].borrowing[0].id = 3; + sf_in.matched_rule_infos[1].borrowing[1].id = 4; + + //rule id 2, primary profile id 5, borrow profile id 6,7,8 + sf_in.matched_rule_infos[2].id = 2; + sf_in.matched_rule_infos[2].primary.id = 5; + sf_in.matched_rule_infos[2].borrowing_num = 3; + sf_in.matched_rule_infos[2].borrowing[0].id = 6; + sf_in.matched_rule_infos[2].borrowing[1].id = 7; + sf_in.matched_rule_infos[2].borrowing[2].id = 8; + + shaper_session_log_prepare(&sf_in, &mpack_data, &mpack_size); + gtest_shaper_log_parse(&sf_out, mpack_data, mpack_size); + + EXPECT_EQ(sf_out.ctrl_meta.session_id, 12345678); + EXPECT_EQ(sf_out.rule_num, 3); + + //rule_id 0, primary profile id 0, borrow profile id 1 + EXPECT_EQ(sf_out.matched_rule_infos[0].id, 0); + EXPECT_EQ(sf_out.matched_rule_infos[0].primary.id, 0); + EXPECT_EQ(sf_out.matched_rule_infos[0].borrowing_num, 1); + EXPECT_EQ(sf_out.matched_rule_infos[0].borrowing[0].id, 1); + + //rule id 1, primary profile id 2, borrow profile id 3,4 + EXPECT_EQ(sf_out.matched_rule_infos[1].id, 1); + EXPECT_EQ(sf_out.matched_rule_infos[1].primary.id, 2); + EXPECT_EQ(sf_out.matched_rule_infos[1].borrowing_num, 2); + EXPECT_EQ(sf_out.matched_rule_infos[1].borrowing[0].id, 3); + EXPECT_EQ(sf_out.matched_rule_infos[1].borrowing[1].id, 4); + + //rule id 2, primary profile id 5, borrow profile id 6,7,8 + EXPECT_EQ(sf_out.matched_rule_infos[2].id, 2); + EXPECT_EQ(sf_out.matched_rule_infos[2].primary.id, 5); + EXPECT_EQ(sf_out.matched_rule_infos[2].borrowing_num, 3); + EXPECT_EQ(sf_out.matched_rule_infos[2].borrowing[0].id, 6); + EXPECT_EQ(sf_out.matched_rule_infos[2].borrowing[1].id, 7); + EXPECT_EQ(sf_out.matched_rule_infos[2].borrowing[2].id, 8); + + if (mpack_data) { + free(mpack_data); + } +} + +int main(int argc, char **argv) +{ + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}
\ No newline at end of file diff --git a/shaping/test/stub.cpp b/shaping/test/stub.cpp index 67ba815..b2d8cc9 100644 --- a/shaping/test/stub.cpp +++ b/shaping/test/stub.cpp @@ -604,4 +604,24 @@ char * marsio_buff_mtod(marsio_buff_t * m) { return NULL; } + +int marsio_buff_malloc_global(struct mr_instance * instance, marsio_buff_t * marsio_buff[], unsigned int nr_mbufs, int socket_id, int thread_id) +{ + return 0; +} + +char * marsio_buff_append(marsio_buff_t * m, uint16_t len) +{ + return NULL; +} + +int marsio_buff_set_metadata(marsio_buff_t * m, enum mr_buff_metadata_type type, void * data, unsigned int sz_data) +{ + return 0; +} + +int marsio_buff_set_sid_list(marsio_buff_t * m, sid_t * slist, uint8_t sz_slist) +{ + return 0; +} /***************************************************/
\ No newline at end of file |
