diff options
| author | 刘畅 <[email protected]> | 2023-05-31 03:06:52 +0000 |
|---|---|---|
| committer | 刘畅 <[email protected]> | 2023-05-31 03:06:52 +0000 |
| commit | 7361fcee06190b68bcfb1d5a93c85df8062bb30d (patch) | |
| tree | c0c9f864d8e940e4df076e5674808630241667f4 | |
| parent | 3e7e3ba55af41456a879b9cf644ac809e9d6c766 (diff) | |
| parent | bec95db1ec0ba5af97a96c4b0c0177bd73c81c69 (diff) | |
Merge branch 'rel' into 'add_global_metrics_table'
# Conflicts:
# shaping/src/shaper.cpp
| -rw-r--r-- | common/include/addr_tuple4.h | 1 | ||||
| -rw-r--r-- | common/src/addr_tuple4.cpp | 21 | ||||
| -rw-r--r-- | common/test/gtest_addr_tuple4.cpp | 24 | ||||
| -rw-r--r-- | shaping/include/shaper.h | 15 | ||||
| -rw-r--r-- | shaping/include/shaper_maat.h | 3 | ||||
| -rw-r--r-- | shaping/src/shaper.cpp | 53 | ||||
| -rw-r--r-- | shaping/src/shaper_maat.cpp | 50 | ||||
| -rw-r--r-- | shaping/src/shaper_session.cpp | 13 | ||||
| -rw-r--r-- | shaping/src/shaper_stat.cpp | 6 | ||||
| -rw-r--r-- | shaping/test/gtest_shaper_maat.cpp | 4 | ||||
| -rw-r--r-- | shaping/test/stub.cpp | 32 |
11 files changed, 170 insertions, 52 deletions
diff --git a/common/include/addr_tuple4.h b/common/include/addr_tuple4.h index 5efcce1..22caa1d 100644 --- a/common/include/addr_tuple4.h +++ b/common/include/addr_tuple4.h @@ -58,6 +58,7 @@ extern "C" char *addr_tuple4_to_str(const struct addr_tuple4 *addr); void addr_tuple4_reverse(const struct addr_tuple4 *orin, struct addr_tuple4 *out); + char *addr_src_ip_to_str(const struct addr_tuple4 *addr); #ifdef __cpluscplus } diff --git a/common/src/addr_tuple4.cpp b/common/src/addr_tuple4.cpp index fd60f0d..7e294f8 100644 --- a/common/src/addr_tuple4.cpp +++ b/common/src/addr_tuple4.cpp @@ -33,6 +33,27 @@ char *addr_tuple4_to_str(const struct addr_tuple4 *addr) return str_ret; } +char *addr_src_ip_to_str(const struct addr_tuple4 *addr)//TODO: self test +{ + char *str_ret = NULL; + + if (addr->addr_type == ADDR_TUPLE4_TYPE_V4) + { + char src_addr[INET_ADDRSTRLEN] = {0}; + inet_ntop(AF_INET, &addr->addr_v4.src_addr, src_addr, sizeof(src_addr)); + asprintf(&str_ret, "%s", src_addr); + } + + if (addr->addr_type == ADDR_TUPLE4_TYPE_V6) + { + char src_addr[INET6_ADDRSTRLEN] = {0}; + inet_ntop(AF_INET6, &addr->addr_v6.src_addr, src_addr, sizeof(src_addr)); + asprintf(&str_ret, "%s", src_addr); + } + + return str_ret; +} + void addr_tuple4_reverse(const struct addr_tuple4 *orin, struct addr_tuple4 *out) { memset(out, 0, sizeof(struct addr_tuple4)); diff --git a/common/test/gtest_addr_tuple4.cpp b/common/test/gtest_addr_tuple4.cpp index ad9b246..e81996c 100644 --- a/common/test/gtest_addr_tuple4.cpp +++ b/common/test/gtest_addr_tuple4.cpp @@ -41,6 +41,30 @@ TEST(ADDR_TUPLE4, IPV6) free(ret_str); } +TEST(ADDR_TUPLE4, SRC_IPV4_TO_STRING) +{ + char *ret_str = NULL; + + INIT_ADDR_V4(orin_addr, "1.2.3.4", 12345, "4.3.2.1", 23456) + + ret_str = addr_src_ip_to_str(&orin_addr); + EXPECT_TRUE(ret_str != nullptr); + EXPECT_STREQ(ret_str, "1.2.3.4"); + free(ret_str); +} + +TEST(ADDR_TUPLE4, SRC_IPV6_TO_STRING) +{ + char *ret_str = NULL; + + INIT_ADDR_V6(orin_addr, "1:2::3", 12345, "a:b::c", 23456); + + ret_str = addr_src_ip_to_str(&orin_addr); + EXPECT_TRUE(ret_str != nullptr); + EXPECT_STREQ(ret_str, "1:2::3"); + free(ret_str); +} + int main(int argc, char **argv) { ::testing::InitGoogleTest(&argc, argv); diff --git a/shaping/include/shaper.h b/shaping/include/shaper.h index eb0ca65..cc7c608 100644 --- a/shaping/include/shaper.h +++ b/shaping/include/shaper.h @@ -67,13 +67,21 @@ enum shaping_packet_action { SHAPING_DROP }; +enum shaping_profile_type_in_rule { + PROFILE_IN_RULE_TYPE_PRIMARY = 0, + PROFILE_IN_RULE_TYPE_BORROW +}; + enum shaping_profile_type { - SHAPING_PROFILE_TYPE_PRIMARY = 0, - SHAPING_PROFILE_TYPE_BORROW + PROFILE_TYPE_GENERIC, + PROFILE_TYPE_HOST_FARINESS, + PROFILE_TYPE_MAX_MIN_HOST_FAIRNESS, + PROFILE_TYPE_SPLIT_BY_LOCAL_HOST }; struct shaping_profile_info { int id;//profile_id + enum shaping_profile_type type; int priority; int in_deposit_token; int out_deposit_token; @@ -88,6 +96,7 @@ struct shaping_profile_info { struct shaping_rule_info { int vsys_id; int id;//rule_id + int fair_factor; struct shaping_profile_info primary; struct shaping_profile_info borrowing[SHAPING_REF_PROFILE_NUM_MAX]; int borrowing_num; @@ -119,6 +128,8 @@ struct metadata struct shaping_flow { struct addr_tuple4 tuple4; + char *src_ip_str; + size_t src_ip_str_len; struct delay_queue packet_queue; struct shaping_rule_info matched_rule_infos[SHAPING_RULE_NUM_MAX]; int priority; diff --git a/shaping/include/shaper_maat.h b/shaping/include/shaper_maat.h index 9390161..df3558f 100644 --- a/shaping/include/shaper_maat.h +++ b/shaping/include/shaper_maat.h @@ -18,8 +18,7 @@ struct shaping_rule { struct shaping_profile { int id; - int split; - int host_fairness; + enum shaping_profile_type type; int in_limit_bandwidth; int out_limit_bandwidth; int valid; diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp index 10ca792..69c691a 100644 --- a/shaping/src/shaper.cpp +++ b/shaping/src/shaper.cpp @@ -115,6 +115,10 @@ static void shaping_node_free(struct shaping_node *s_node) } } + if (s_node->shaping_flow.src_ip_str) { + free(s_node->shaping_flow.src_ip_str); + } + if (s_node->shaping_flow.ctrl_meta.raw_data) { free(s_node->shaping_flow.ctrl_meta.raw_data); } @@ -452,7 +456,7 @@ static int shaper_deposit_token_is_enough(struct shaping_profile_info *pf_info, static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *pf_info, int profile_type, int req_token_bits, unsigned char direction) { - struct shaping_async_cb_arg *arg; + struct shaping_async_cb_arg *arg = NULL; char key[32] = {0}; __atomic_add_fetch(&pf_info->async_token_ref_count, 1, __ATOMIC_SEQ_CST); @@ -466,14 +470,31 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct arg->direction = direction; shaper_global_stat_async_invoke_inc(ctx->global_stat); - swarmkv_tconsume(ctx->swarmkv_db, key, strlen(key), req_token_bits, shaper_token_get_cb, arg); + switch (pf_info->type) { + case PROFILE_TYPE_GENERIC: + swarmkv_tconsume(ctx->swarmkv_db, key, strlen(key), req_token_bits, shaper_token_get_cb, arg); + break; + case PROFILE_TYPE_HOST_FARINESS: + case PROFILE_TYPE_MAX_MIN_HOST_FAIRNESS: + swarmkv_ftconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, sf->matched_rule_infos[sf->anchor].fair_factor, req_token_bits, shaper_token_get_cb, arg); + break; + case PROFILE_TYPE_SPLIT_BY_LOCAL_HOST: + swarmkv_btconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, req_token_bits, shaper_token_get_cb, arg); + break; + default: + if (arg) { + free(arg); + } + break; + } + if (__atomic_load_n(&pf_info->async_token_ref_count, __ATOMIC_SEQ_CST) != 0) {//has async operation not completed shaper_deposit_token_sub(pf_info, req_token_bits, direction); return 0; } if (pf_info->is_invalid) { - if (profile_type == SHAPING_PROFILE_TYPE_PRIMARY) {//for primary, means this rule don't need get token + if (profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) {//for primary, means this rule don't need get token return 0; } else {//for borrowing, means this profile has no token to borrow return -1; @@ -573,14 +594,14 @@ int shaper_profile_get(struct shaping_rule_info *s_rule_info, int priority, stru if (priority == SHAPING_PRIORITY_NUM_MAX - 1) {//priority 9 allow multi profiles for one priority if (s_rule_info->primary.priority == priority) { - pf_container[num].pf_type = SHAPING_PROFILE_TYPE_PRIMARY; + pf_container[num].pf_type = PROFILE_IN_RULE_TYPE_PRIMARY; pf_container[num].pf_info = &s_rule_info->primary; num++; } for (int i = 0; i < s_rule_info->borrowing_num; i++) { if (s_rule_info->borrowing[i].priority == priority) { - pf_container[num].pf_type = SHAPING_PROFILE_TYPE_BORROW; + pf_container[num].pf_type = PROFILE_IN_RULE_TYPE_BORROW; pf_container[num].pf_info = &s_rule_info->borrowing[i]; num++; } @@ -589,14 +610,14 @@ int shaper_profile_get(struct shaping_rule_info *s_rule_info, int priority, stru return num; } else { if (s_rule_info->primary.priority == priority) { - pf_container[0].pf_type = SHAPING_PROFILE_TYPE_PRIMARY; + pf_container[0].pf_type = PROFILE_IN_RULE_TYPE_PRIMARY; pf_container[0].pf_info = &s_rule_info->primary; return 1; } for (int i = 0; i < s_rule_info->borrowing_num; i++) { if (s_rule_info->borrowing[i].priority == priority) { - pf_container[0].pf_type = SHAPING_PROFILE_TYPE_BORROW; + pf_container[0].pf_type = PROFILE_IN_RULE_TYPE_BORROW; pf_container[0].pf_info = &s_rule_info->borrowing[i]; return 1; } @@ -676,7 +697,7 @@ static enum shaping_packet_action shaper_pkt_action_decide_queueing(struct shapi static enum shaping_packet_action shaper_pkt_action_decide_no_queue(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_profile_info *profile) { - int profile_type = SHAPING_PROFILE_TYPE_PRIMARY; + int profile_type = PROFILE_IN_RULE_TYPE_PRIMARY; struct shaping_rule_info *rule = NULL; struct shaping_packet_wrapper *pkt_wrapper = NULL; struct timespec curr_time; @@ -835,7 +856,13 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index); shaper_global_stat_drop_inc(ctx->global_stat, meta->raw_len); shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, meta->raw_len); - LOG_ERROR("%s: shaping enqueue packet failed while queue empty for session: %s", LOG_TAG_SHAPING, addr_tuple4_to_str(&sf->tuple4)); + + char *addr_str = addr_tuple4_to_str(&sf->tuple4); + LOG_ERROR("%s: shaping enqueue packet failed while queue empty for session: %s", LOG_TAG_SHAPING, addr_str); + if (addr_str) { + free(addr_str); + } + goto END; } @@ -868,8 +895,14 @@ END: shaper_stat_refresh(ctx->stat, sf, ctx->thread_index, 0); if(sf->flag & SESSION_CLOSE) { if (shaper_queue_empty(sf)) { + char *addr_str = addr_tuple4_to_str(&sf->tuple4); + LOG_DEBUG("%s: shaping free a shaping_flow for session: %s", LOG_TAG_SHAPING, addr_str); + shaping_flow_free(ctx, sf); - LOG_DEBUG("%s: shaping free a shaping_flow for session: %s", LOG_TAG_SHAPING, addr_tuple4_to_str(&sf->tuple4)); + + if (addr_str) { + free(addr_str); + } } } diff --git a/shaping/src/shaper_maat.cpp b/shaping/src/shaper_maat.cpp index 9993142..5578338 100644 --- a/shaping/src/shaper_maat.cpp +++ b/shaping/src/shaper_maat.cpp @@ -8,6 +8,7 @@ #include <MESA/MESA_handle_logger.h> #include "log.h" +#include "shaper.h" #include "shaper_maat.h" #include "shaper_stat.h" #include "utils.h" @@ -154,7 +155,8 @@ void shaper_profile_ex_new(const char *table_name, int table_id, const char *key cJSON *json=NULL; cJSON *tmp_array_obj = NULL; cJSON *tmp_obj = NULL; - char split_by[64] = {0}; + char profile_type[64] = {0}; + char type_arg[64] = {0}; char limits[128] = {0}; char aqm_options[64] = {0}; char dscp_marking[64] = {0}; @@ -170,17 +172,41 @@ void shaper_profile_ex_new(const char *table_name, int table_id, const char *key s_pf = (struct shaping_profile*)calloc(1, sizeof(struct shaping_profile)); s_pf->ref_cnt = 1; - ret = sscanf(table_line, "%d\t%63s\t%127s\t%63s\t%63s\t%63s\t%d", - &s_pf->id, split_by, limits, aqm_options, dscp_marking, volume_based_shaping, &s_pf->valid); - if (ret != 7) { + ret = sscanf(table_line, "%d\t%63s\t%63s\t%127s\t%63s\t%63s\t%63s\t%d", + &s_pf->id, profile_type, type_arg, limits, aqm_options, dscp_marking, volume_based_shaping, &s_pf->valid); + if (ret != 8) { LOG_ERROR("%s: sscanf parse failed for profile line %s", LOG_TAG_MAAT, table_line); goto END; } + + if (strcmp(profile_type, "generic") == 0) { + s_pf->type = PROFILE_TYPE_GENERIC; + } else if (strcmp(profile_type, "fair_share") == 0) { + if (strcmp(type_arg, "host_fairness") == 0) { + s_pf->type = PROFILE_TYPE_HOST_FARINESS; + } else if (strcmp(type_arg, "max_min_host_fairness") == 0) { + s_pf->type = PROFILE_TYPE_MAX_MIN_HOST_FAIRNESS; + } else { + LOG_ERROR("%s: profile type argument wrong for profile line %s", LOG_TAG_MAAT, table_line); + goto END; + } + } else if (strcmp(profile_type, "split_by") == 0) { + if (strcmp(type_arg, "local_host") == 0) { + s_pf->type = PROFILE_TYPE_SPLIT_BY_LOCAL_HOST; + } else { + LOG_ERROR("%s: profile type argument wrong for profile line %s", LOG_TAG_MAAT, table_line); + goto END; + } + } else { + LOG_ERROR("%s: profile type wrong for profile line %s", LOG_TAG_MAAT, table_line); + goto END; + } + //parse limits of profile json = cJSON_Parse(limits); if (!json) { - LOG_ERROR("%s: json parse profile failed for profile id %d, line %s", LOG_TAG_MAAT, s_pf->id, table_line); + LOG_ERROR("%s: json parse profile limits failed for profile id %d, line %s", LOG_TAG_MAAT, s_pf->id, table_line); goto END; } @@ -206,12 +232,6 @@ void shaper_profile_ex_new(const char *table_name, int table_id, const char *key } } - cJSON_Delete(json); - json = NULL; - - - //TODO:parse other argumens of profile - END: *ad = s_pf; if (json) { @@ -254,6 +274,7 @@ void shaper_profile_ex_free(int table_id, void **ad, long argl, void *argp) void shaper_profile_update(struct shaping_profile_info *s_pf_info, struct shaping_profile *s_pf_ex) { s_pf_info->id = s_pf_ex->id; + s_pf_info->type = s_pf_ex->type; shaper_profile_ex_free(0, (void **)&s_pf_ex, 0, NULL); @@ -272,6 +293,7 @@ static void shaper_rule_update(struct shaping_thread_ctx *ctx, struct shaping_fl goto END; } s_rule_info->id = s_rule->id; + s_rule_info->fair_factor = s_rule->fair_factor; s_rule_info->vsys_id = s_rule->vsys_id; snprintf(pf_id_key, sizeof(pf_id_key), "%d", s_rule->primary_pf_id); @@ -331,7 +353,11 @@ void shaper_rules_update(struct shaping_thread_ctx *ctx, struct shaping_flow *sf int priority_changed = 0; if (sf->rule_num + rule_num > SHAPING_RULE_NUM_MAX) { - LOG_ERROR("%s: shaping exceed maat rule num limit %d for flow: %s", LOG_TAG_MAAT, SHAPING_RULE_NUM_MAX, addr_tuple4_to_str(&sf->tuple4)); + char *addr_str = addr_tuple4_to_str(&sf->tuple4); + LOG_ERROR("%s: shaping exceed maat rule num limit %d for flow: %s", LOG_TAG_MAAT, SHAPING_RULE_NUM_MAX, addr_str); + if (addr_str) { + free(addr_str); + } return; } diff --git a/shaping/src/shaper_session.cpp b/shaping/src/shaper_session.cpp index dc4bc08..27a8c51 100644 --- a/shaping/src/shaper_session.cpp +++ b/shaping/src/shaper_session.cpp @@ -1,3 +1,4 @@ +#include "addr_tuple4.h" #include "session_table.h" #include "raw_packet.h" #include "utils.h" @@ -18,12 +19,22 @@ struct shaping_flow* shaper_session_opening(struct shaping_thread_ctx *ctx, stru node = session_table_search_by_id(ctx->session_table, meta->session_id); if (node) { sf = (struct shaping_flow *)node->val_data; - LOG_ERROR("%s: session id %lu for %s has already exist", LOG_TAG_SHAPING, meta->session_id, addr_tuple4_to_str(&sf->tuple4)); + + char *addr_str = addr_tuple4_to_str(&sf->tuple4); + LOG_ERROR("%s: session id %lu for %s has already exist", LOG_TAG_SHAPING, meta->session_id, addr_str); + + if (addr_str) { + free(addr_str); + } + return NULL; } sf = shaping_flow_new(); raw_packet_parser_get_most_inner_tuple4(raw_parser, &sf->tuple4); + sf->src_ip_str = addr_src_ip_to_str(&sf->tuple4); + sf->src_ip_str_len = strlen(sf->src_ip_str); + //shaper_rules_update(ctx, sf, ctrl_data->shaping_rule_ids, ctrl_data->shaping_rule_num); shaper_marsio_metadata_deep_copy(&sf->ctrl_meta, meta); diff --git a/shaping/src/shaper_stat.cpp b/shaping/src/shaper_stat.cpp index 74da337..0685e01 100644 --- a/shaping/src/shaper_stat.cpp +++ b/shaping/src/shaper_stat.cpp @@ -137,7 +137,7 @@ static void shaper_stat_tags_build(int vsys_id, int rule_id, int profile_id, int tags[TAG_PRIORITY_IDX].value_int = priority; - if (profile_type == SHAPING_PROFILE_TYPE_PRIMARY) { + if (profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) { tags[TAG_PROFILE_TYPE_IDX].value_str = "primary"; } else { tags[TAG_PROFILE_TYPE_IDX].value_str = "borrow"; @@ -214,10 +214,10 @@ void shaper_stat_refresh(struct shaping_stat *stat, struct shaping_flow *sf, int for (int i = 0; i < sf->rule_num; i++) { rule = &sf->matched_rule_infos[i]; - shaper_stat_profile_metirc_refresh(stat, rule->vsys_id, thread_id, rule->id, &rule->primary, SHAPING_PROFILE_TYPE_PRIMARY, need_update_guage); + shaper_stat_profile_metirc_refresh(stat, rule->vsys_id, thread_id, rule->id, &rule->primary, PROFILE_IN_RULE_TYPE_PRIMARY, need_update_guage); for (int j = 0; j < rule->borrowing_num; j++) { - shaper_stat_profile_metirc_refresh(stat, rule->vsys_id, thread_id, rule->id, &rule->borrowing[j], SHAPING_PROFILE_TYPE_BORROW, need_update_guage); + shaper_stat_profile_metirc_refresh(stat, rule->vsys_id, thread_id, rule->id, &rule->borrowing[j], PROFILE_IN_RULE_TYPE_BORROW, need_update_guage); } } diff --git a/shaping/test/gtest_shaper_maat.cpp b/shaping/test/gtest_shaper_maat.cpp index 79c6a94..80b91de 100644 --- a/shaping/test/gtest_shaper_maat.cpp +++ b/shaping/test/gtest_shaper_maat.cpp @@ -41,7 +41,8 @@ TEST(shaping_rule, parse) TEST(shaping_profile, parse) { const char *data = "1\t\ - {\"value\":\"local_host\",\"host_fairness\":1}\t\ + fair_share\t\ + max_min_host_fairness\t\ [{\"direction\":\"incoming\",\"bandwidth\":1024},{\"direction\":\"outgoing\",\"bandwidth\":2048}]\t\ {\"enabled\":1,\"algorithm\":\"codel\"}\t\ null\t\ @@ -54,6 +55,7 @@ TEST(shaping_profile, parse) EXPECT_EQ(s_pf->id, 1); EXPECT_EQ(s_pf->in_limit_bandwidth, 1024); EXPECT_EQ(s_pf->out_limit_bandwidth, 2048); + EXPECT_EQ(s_pf->type, PROFILE_TYPE_MAX_MIN_HOST_FAIRNESS); EXPECT_EQ(s_pf->ref_cnt, 1); shaper_profile_ex_dup(0, (void**)&s_pf_dup, (void**)&s_pf, 0, NULL); diff --git a/shaping/test/stub.cpp b/shaping/test/stub.cpp index c04ee13..fc2a53a 100644 --- a/shaping/test/stub.cpp +++ b/shaping/test/stub.cpp @@ -14,6 +14,7 @@ #include <assert.h> #include <pthread.h> #include "stub.h" +#include "shaper_maat.h" using namespace std; @@ -22,27 +23,6 @@ using namespace std; #define MAX_STUB_RULE_NUM 8 #define MAX_STUB_PROFILE_NUM 8 -struct shaping_rule { - int vsys_id; - int id; - int priority; - int primary_pf_id; - int borrow_pf_id_array[MAX_STUB_PROFILE_NUM]; - int borrow_pf_num; - int fair_factor; - int ref_cnt; -}; - -struct shaping_profile { - int id; - int split; - int host_fairness; - int in_limit_bandwidth; - int out_limit_bandwidth; - int valid; - int ref_cnt; -}; - struct stub_matched_rules { struct shaping_rule rules[MAX_STUB_RULE_NUM]; int rule_num; @@ -421,6 +401,16 @@ void swarmkv_tconsume(struct swarmkv * db, const char * key, size_t keylen, long return; } +void swarmkv_ftconsume(struct swarmkv * db, const char * key, size_t keylen, const char * member, size_t member_len, long long weight, long long tokens, swarmkv_on_reply_callback_t *cb, void *cb_arg) +{ + return; +} + +void swarmkv_btconsume(struct swarmkv * db, const char * key, size_t keylen, const char * member, size_t member_len, long long tokens, swarmkv_on_reply_callback_t *cb, void *cb_arg) +{ + return; +} + void swarmkv_close(struct swarmkv * db) { if (db) { |
