diff options
| author | root <[email protected]> | 2024-01-26 06:36:19 +0000 |
|---|---|---|
| committer | root <[email protected]> | 2024-01-26 06:36:19 +0000 |
| commit | f9cd8219dc43b5d19da8f421c19c08d65240683d (patch) | |
| tree | e8da8950e9a90c2c5da4103ae0fd848173e9a427 /shaping | |
| parent | 4bc81cc24f2989b84670c54252585c5403acbc01 (diff) | |
optimize performance
Diffstat (limited to 'shaping')
| -rw-r--r-- | shaping/include/shaper.h | 15 | ||||
| -rw-r--r-- | shaping/include/shaper_global_stat.h | 3 | ||||
| -rw-r--r-- | shaping/include/shaper_stat.h | 3 | ||||
| -rw-r--r-- | shaping/include/shaper_swarmkv.h | 3 | ||||
| -rw-r--r-- | shaping/src/shaper.cpp | 69 | ||||
| -rw-r--r-- | shaping/src/shaper_global_stat.cpp | 9 | ||||
| -rw-r--r-- | shaping/src/shaper_maat.cpp | 34 | ||||
| -rw-r--r-- | shaping/src/shaper_stat.cpp | 44 | ||||
| -rw-r--r-- | shaping/src/shaper_swarmkv.cpp | 30 | ||||
| -rw-r--r-- | shaping/test/gtest_shaper.cpp | 10 |
10 files changed, 98 insertions, 122 deletions
diff --git a/shaping/include/shaper.h b/shaping/include/shaper.h index 20fc9b1..b17dd40 100644 --- a/shaping/include/shaper.h +++ b/shaping/include/shaper.h @@ -59,8 +59,6 @@ struct shaping_thread_ctx { struct shaping_stat *stat; struct shaping_marsio_info *marsio_info; struct swarmkv *swarmkv_db;//handle of swarmkv - int swarmkv_aqm_prob; - time_t swarmkv_aqm_update_time; struct shaping_maat_info *maat_info; struct session_table *session_table; struct timeouts *expires; @@ -112,8 +110,8 @@ struct shaper_aqm_blue_para { struct shaping_profile_hash_node { int id; enum shaper_aqm_type aqm_type; - int in_deposit_token_bits[SHAPING_PRIORITY_NUM_MAX]; - int out_deposit_token_bits[SHAPING_PRIORITY_NUM_MAX]; + long long in_deposit_token_bits[SHAPING_PRIORITY_NUM_MAX]; + long long out_deposit_token_bits[SHAPING_PRIORITY_NUM_MAX]; long long last_failed_get_token_ms; long long last_hmget_ms; long long queue_len[SHAPING_PRIORITY_NUM_MAX]; @@ -124,6 +122,7 @@ struct shaping_profile_hash_node { int tconsume_ref_cnt; struct shaper_aqm_blue_para aqm_blue_para; unsigned char is_invalid; + struct timeout timeout_handle; UT_hash_handle hh; }; @@ -131,8 +130,8 @@ struct shaping_profile_info { int id;//profile_id enum shaping_profile_type type; int priority; - int in_deposit_token_bits; - int out_deposit_token_bits; + long long in_deposit_token_bits; + long long out_deposit_token_bits; long long last_failed_get_token_ms; unsigned long long enqueue_time_us;//to calculate max latency struct shaping_stat_for_profile stat; @@ -190,8 +189,6 @@ struct shaping_flow { unsigned long long processed_pkts; unsigned long long stat_update_time_us; time_t check_rule_time; - struct timeout timeout_handle; - time_t last_update_timeout_sec; }; struct shaper_flow_instance { @@ -234,7 +231,7 @@ struct shaping_packet_wrapper* shaper_first_pkt_get(struct shaping_flow *sf); void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx); int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_ins[], int priority, int max_sf_num); -void shaper_profile_hash_node_update(struct shaping_profile_info *profile); +void shaper_profile_hash_node_update(struct shaping_thread_ctx *ctx, struct shaping_profile_info *profile); int shaper_global_conf_init(struct shaping_system_conf *conf); diff --git a/shaping/include/shaper_global_stat.h b/shaping/include/shaper_global_stat.h index 940745e..4c100a7 100644 --- a/shaping/include/shaper_global_stat.h +++ b/shaping/include/shaper_global_stat.h @@ -38,6 +38,7 @@ enum shaping_global_stat_column_index { HIT_POLICY_RX_BYTES_IDX, HIT_POLICY_TX_PKTS_IDX, HIT_POLICY_TX_BYTES_IDX, + HIT_POLICY_TX_SYN_ACK_PKTS_IDX, HIT_POLICY_DROP_PKTS_IDX, HIT_POLICY_DROP_BYTES_IDX, @@ -49,6 +50,7 @@ struct shaping_global_stat_traffic_data { long long rx_bytes; long long tx_pkts; long long tx_bytes; + long long tx_syn_ack_pkts; long long drop_pkts; long long drop_bytes; }; @@ -127,6 +129,7 @@ void shaper_global_stat_throughput_tx_inc(struct shaping_global_stat_data *threa void shaper_global_stat_hit_policy_throughput_rx_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len); void shaper_global_stat_hit_policy_throughput_tx_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len); +void shaper_global_stat_hit_policy_throughput_tx_syn_ack_inc(struct shaping_global_stat_data *thread_global_stat); void shaper_global_stat_hit_policy_drop_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len); void shaper_global_stat_refresh(struct shaping_ctx *ctx);
\ No newline at end of file diff --git a/shaping/include/shaper_stat.h b/shaping/include/shaper_stat.h index d9ca979..132049c 100644 --- a/shaping/include/shaper_stat.h +++ b/shaping/include/shaper_stat.h @@ -57,4 +57,5 @@ void shaper_stat_forward_all_rule_inc(struct shaping_stat *stat, struct shaping_ void shaper_stat_drop_inc(struct shaping_stat_for_profile *profile_stat, unsigned char direction, int thread_id); void shaper_stat_max_latency_update(struct shaping_stat_for_profile *profile_stat, unsigned char direction, unsigned long long latency, int thread_id); -void shaper_stat_refresh(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int thread_id, int force);
\ No newline at end of file +void shaper_stat_refresh(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int force); +void shaper_stat_priority_queue_len_refresh_all(struct shaping_thread_ctx *ctx, struct shaping_profile_hash_node *profile_hash_node);
\ No newline at end of file diff --git a/shaping/include/shaper_swarmkv.h b/shaping/include/shaper_swarmkv.h index 963fff0..e533802 100644 --- a/shaping/include/shaper_swarmkv.h +++ b/shaping/include/shaper_swarmkv.h @@ -2,5 +2,4 @@ struct swarmkv* shaper_swarmkv_init(int caller_thread_num); void shaper_swarmkv_destroy(struct swarmkv* swarmkv_db); -void swarmkv_reload_log_level(); -int shaper_swarmkv_pending_queue_aqm_drop(struct shaping_thread_ctx *ctx);
\ No newline at end of file +void swarmkv_reload_log_level();
\ No newline at end of file diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp index e38af34..f1b963a 100644 --- a/shaping/src/shaper.cpp +++ b/shaping/src/shaper.cpp @@ -24,7 +24,7 @@ extern "C" { #include "shaper_global_stat.h" #include "shaper_aqm.h" -#define TOKEN_ENLARGE_TIMES 10 +#define TOKEN_ENLARGE_TIMES 10//TODO #define TOKEN_GET_FAILED_INTERVAL_MS 1 #define HMGET_REQUEST_INTERVAL_MS 10 #define PRIORITY_BLOCK_MIN_TIME_MS 500 @@ -138,8 +138,6 @@ struct shaping_flow* shaping_flow_new(struct shaping_thread_ctx *ctx) TAILQ_INIT(&s_node->shaping_flow.packet_queue); s_node->shaping_flow.priority = SHAPING_PRIORITY_NUM_MAX - 1; - timeout_init(&s_node->shaping_flow.timeout_handle, TIMEOUT_ABS); - timeouts_add(ctx->expires, &s_node->shaping_flow.timeout_handle, time(NULL) + SHAPING_STAT_REFRESH_INTERVAL_SEC); s_node->shaping_flow.ref_cnt = 1; @@ -159,8 +157,7 @@ void shaping_flow_free(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) struct shaping_node *s_node = (struct shaping_node*)sf; - timeouts_del(ctx->expires, &sf->timeout_handle); - shaper_stat_refresh(ctx, sf, ctx->thread_index, 1); + shaper_stat_refresh(ctx, sf, 1); shaping_node_free(s_node); return; @@ -182,7 +179,7 @@ static int shaper_packet_enqueue(struct shaping_thread_ctx *ctx, struct shaping_ struct shaping_packet_wrapper *s_pkt = NULL; struct timespec curr_time; - if (sf->queue_len == ctx->conf.session_queue_len_max) { + if (sf->queue_len == ctx->conf.session_queue_len_max) {//TODO: profile queue_len??? return -1; } @@ -386,7 +383,7 @@ int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_i static void shaper_deposit_token_add(struct shaping_profile_info *profile, int req_token_bits, unsigned char direction, int priority) { - int *deposit_token; + long long *deposit_token; struct shaping_profile_hash_node *pf_hash_node = profile->hash_node; switch (profile->type) { @@ -431,7 +428,7 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg shaper_global_stat_async_callback_inc(&ctx->thread_global_stat); shaper_global_stat_tconsume_callback_inc(&ctx->thread_global_stat); - LOG_INFO("Swarmkv reply type =%d, profile_id %d, direction =%d, integer =%llu",reply->type, profile->id, arg->direction, reply->integer); + LOG_INFO("Swarmkv reply type =%d, profile_id %d, profile_consume_ref = %d, direction =%d, integer =%llu",reply->type, profile->id, pf_hash_node->tconsume_ref_cnt, arg->direction, reply->integer); if (reply->type != SWARMKV_REPLY_INTEGER) { shaper_global_stat_async_tconsume_failed_inc(&ctx->thread_global_stat); @@ -482,7 +479,7 @@ END: static void shaper_deposit_token_sub(struct shaping_profile_info *profile, int req_token_bits, unsigned char direction, int priority) { - int *deposit_token; + long long *deposit_token; struct shaping_profile_hash_node *pf_hash_node = profile->hash_node; switch (profile->type) { @@ -551,23 +548,10 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct struct shaping_tconsume_cb_arg *arg = NULL; char key[32] = {0}; - switch (pf_info->type) { - case PROFILE_TYPE_GENERIC: - if (pf_info->hash_node->tconsume_ref_cnt > 0) { - goto END; - } - break; - case PROFILE_TYPE_HOST_FARINESS: - case PROFILE_TYPE_MAX_MIN_HOST_FAIRNESS: - case PROFILE_TYPE_SPLIT_BY_LOCAL_HOST: - if (sf->ref_cnt > 1) { - goto END; - } - break; - default: - break; + if (pf_info->hash_node->tconsume_ref_cnt > 0) { + goto END; } - + snprintf(key, sizeof(key), "tsg-shaping-%d-%s", pf_info->id, direction == SHAPING_DIR_OUT ? "outgoing" : "incoming"); arg = (struct shaping_tconsume_cb_arg *)calloc(1, sizeof(struct shaping_tconsume_cb_arg)); arg->ctx = ctx; @@ -580,10 +564,10 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct sheper_global_stat_tconsume_invoke_inc(&ctx->thread_global_stat); sf->ref_cnt++; + pf_info->hash_node->tconsume_ref_cnt++; switch (pf_info->type) { case PROFILE_TYPE_GENERIC: - pf_info->hash_node->tconsume_ref_cnt++; swarmkv_tconsume(ctx->swarmkv_db, key, strlen(key), req_token_bits * TOKEN_ENLARGE_TIMES, shaper_token_get_cb, arg); break; case PROFILE_TYPE_HOST_FARINESS: @@ -711,7 +695,7 @@ END: } } -void shaper_profile_hash_node_update(struct shaping_profile_info *profile) +void shaper_profile_hash_node_update(struct shaping_thread_ctx *ctx, struct shaping_profile_info *profile) { if (profile->hash_node == NULL) { struct shaping_profile_hash_node *hash_node = NULL; @@ -722,6 +706,8 @@ void shaper_profile_hash_node_update(struct shaping_profile_info *profile) profile->hash_node = (struct shaping_profile_hash_node*)calloc(1, sizeof(struct shaping_profile_hash_node)); profile->hash_node->id = profile->id; HASH_ADD_INT(thread_sp_hashtbl, id, profile->hash_node); + timeout_init(&profile->hash_node->timeout_handle, TIMEOUT_ABS); + timeouts_add(ctx->expires, &profile->hash_node->timeout_handle, time(NULL) + SHAPING_STAT_REFRESH_INTERVAL_SEC); } } @@ -787,11 +773,6 @@ static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_f return SHAPER_TOKEN_GET_FAILED; } - if (shaper_swarmkv_pending_queue_aqm_drop(ctx) == 1) { - profile->hash_node->last_failed_get_token_ms = curr_time_ms; - return SHAPER_TOKEN_GET_FAILED; - } - if (shaper_profile_is_priority_blocked(ctx, sf, profile, &curr_timespec, curr_time_ms)) { return SHAPER_TOKEN_GET_FAILED; } else { @@ -1018,7 +999,7 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_ break; } - shaper_stat_refresh(ctx, sf, ctx->thread_index, 0); + shaper_stat_refresh(ctx, sf, 0); if (shaper_queue_empty(sf)) { if (sf->flag & SESSION_CLOSE) { @@ -1068,6 +1049,7 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu marsio_send_burst(marsio_info->mr_path, ctx->thread_index, &rx_buff, 1); shaper_global_stat_throughput_tx_inc(&ctx->thread_global_stat, meta->raw_len); shaper_global_stat_hit_policy_throughput_tx_inc(&ctx->thread_global_stat, meta->raw_len); + shaper_global_stat_hit_policy_throughput_tx_syn_ack_inc(&ctx->thread_global_stat); shaper_stat_forward_all_rule_inc(stat, sf, meta->dir, meta->raw_len, ctx->thread_index); goto END;//for tcp pure control pkt, transmit it directly } @@ -1109,12 +1091,7 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu } END: - shaper_stat_refresh(ctx, sf, ctx->thread_index, 0); - time_t curr_time = time(NULL); - if (curr_time > sf->last_update_timeout_sec) { - timeouts_add(ctx->expires, &sf->timeout_handle, curr_time + SHAPING_STAT_REFRESH_INTERVAL_SEC);//timeouts_add will delete if sf exist, then add - sf->last_update_timeout_sec = curr_time; - } + shaper_stat_refresh(ctx, sf, 0); if(sf->flag & SESSION_CLOSE) { if (shaper_queue_empty(sf)) { @@ -1137,11 +1114,11 @@ void polling_entry(struct shaper *sp, struct shaping_stat *stat, struct shaping_ { swarmkv_caller_loop(ctx->swarmkv_db, SWARMKV_LOOP_NONBLOCK, NULL); - struct shaping_flow *sf = NULL; + struct shaping_profile_hash_node *hash_node = NULL; time_t curr_time = time(NULL); int cnt = 0; - /*if (curr_time > ctx->last_update_timeout_sec) { + if (curr_time > ctx->last_update_timeout_sec) { timeouts_update(ctx->expires, curr_time); ctx->last_update_timeout_sec = curr_time; } @@ -1153,11 +1130,11 @@ void polling_entry(struct shaper *sp, struct shaping_stat *stat, struct shaping_ break; } - sf = container_of(t, struct shaping_flow, timeout_handle); - shaper_stat_refresh(ctx, sf, ctx->thread_index, 0); - timeouts_add(ctx->expires, &sf->timeout_handle, time(NULL) + SHAPING_STAT_REFRESH_INTERVAL_SEC);//timeouts_get will delete sf from queue, add it back + hash_node = container_of(t, struct shaping_profile_hash_node, timeout_handle); + shaper_stat_priority_queue_len_refresh_all(ctx, hash_node); + timeouts_add(ctx->expires, &hash_node->timeout_handle, time(NULL) + SHAPING_STAT_REFRESH_INTERVAL_SEC);//timeouts_get will delete item from queue, add it back cnt++; - }*/ + } if (shaper_global_stat_queueing_pkts_get(&ctx->thread_global_stat) == 0) { return; @@ -1488,4 +1465,4 @@ struct shaping_ctx *shaping_engine_init() ERROR: shaping_engine_destroy(ctx); return NULL; -}
\ No newline at end of file +} diff --git a/shaping/src/shaper_global_stat.cpp b/shaping/src/shaper_global_stat.cpp index 7144658..d59b290 100644 --- a/shaping/src/shaper_global_stat.cpp +++ b/shaping/src/shaper_global_stat.cpp @@ -63,6 +63,7 @@ static void shaper_global_stat_fieldstat_reg(struct shaping_global_stat *stat) stat->column_ids[HIT_POLICY_RX_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "shape_rx_bytes", NULL, 0); stat->column_ids[HIT_POLICY_TX_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "shape_tx_pkts", NULL, 0); stat->column_ids[HIT_POLICY_TX_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "shape_tx_bytes", NULL, 0); + stat->column_ids[HIT_POLICY_TX_SYN_ACK_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "shape_tx_syn_ack_pkts", NULL, 0); stat->column_ids[HIT_POLICY_DROP_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "shape_drop_pkts", NULL, 0); stat->column_ids[HIT_POLICY_DROP_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "shape_drop_bytes", NULL, 0); @@ -344,6 +345,12 @@ void shaper_global_stat_hit_policy_throughput_tx_inc(struct shaping_global_stat_ data->tx_bytes += pkt_len; } +void shaper_global_stat_hit_policy_throughput_tx_syn_ack_inc(struct shaping_global_stat_data *thread_global_stat) +{ + struct shaping_global_stat_traffic_data *data = &thread_global_stat->hit_policy_traffic; + data->tx_syn_ack_pkts++; +} + void shaper_global_stat_hit_policy_drop_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len) { struct shaping_global_stat_traffic_data *data = &thread_global_stat->hit_policy_traffic; @@ -402,6 +409,7 @@ void shaper_global_stat_refresh(struct shaping_ctx *ctx) sum.hit_policy_traffic.rx_bytes += stat_data[i].hit_policy_traffic.rx_bytes; sum.hit_policy_traffic.tx_pkts += stat_data[i].hit_policy_traffic.tx_pkts; sum.hit_policy_traffic.tx_bytes += stat_data[i].hit_policy_traffic.tx_bytes; + sum.hit_policy_traffic.tx_syn_ack_pkts += stat_data[i].hit_policy_traffic.tx_syn_ack_pkts; sum.hit_policy_traffic.drop_pkts += stat_data[i].hit_policy_traffic.drop_pkts; sum.hit_policy_traffic.drop_bytes += stat_data[i].hit_policy_traffic.drop_bytes; } @@ -446,6 +454,7 @@ void shaper_global_stat_refresh(struct shaping_ctx *ctx) fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_RX_BYTES_IDX], hit_policy_traffic_data->rx_bytes); fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_TX_PKTS_IDX], hit_policy_traffic_data->tx_pkts); fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_TX_BYTES_IDX], hit_policy_traffic_data->tx_bytes); + fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_TX_SYN_ACK_PKTS_IDX], hit_policy_traffic_data->tx_syn_ack_pkts); fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_DROP_PKTS_IDX], hit_policy_traffic_data->drop_pkts); fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_DROP_BYTES_IDX], hit_policy_traffic_data->drop_bytes); diff --git a/shaping/src/shaper_maat.cpp b/shaping/src/shaper_maat.cpp index 4e07439..ec7b59a 100644 --- a/shaping/src/shaper_maat.cpp +++ b/shaping/src/shaper_maat.cpp @@ -253,16 +253,16 @@ void shaper_profile_ex_free(int table_id, void **ad, long argl, void *argp) return; } -void shaper_profile_update(struct shaping_profile_info *s_pf_info, struct shaping_profile *s_pf_ex) +void shaper_profile_update(struct shaping_thread_ctx *ctx, struct shaping_profile_info *s_pf_info, struct shaping_profile *s_pf_ex) { s_pf_info->id = s_pf_ex->id; s_pf_info->type = s_pf_ex->type; - shaper_profile_hash_node_update(s_pf_info); + shaper_profile_hash_node_update(ctx, s_pf_info); return; } -static void shaper_rule_update(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_rule_info *s_rule_info, long long rule_compile_id, int *priority_changed) +static int shaper_rule_update(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_rule_info *s_rule_info, long long rule_compile_id, int *priority_changed) { struct shaping_rule *s_rule = NULL; struct shaping_profile *s_pf = NULL; @@ -271,7 +271,7 @@ static void shaper_rule_update(struct shaping_thread_ctx *ctx, struct shaping_fl s_rule = (struct shaping_rule*)maat_plugin_table_get_ex_data(g_maat_instance, ctx->maat_info->rule_table_id, (char *)&rule_compile_id, sizeof(rule_compile_id)); if (!s_rule) { LOG_ERROR("%s maat_plugin_table_get_ex_data get rule failed for compile id %lld", LOG_TAG_MAAT, rule_compile_id); - goto END; + return -1; } s_rule_info->id = s_rule->id; s_rule_info->fair_factor = s_rule->fair_factor; @@ -283,9 +283,9 @@ static void shaper_rule_update(struct shaping_thread_ctx *ctx, struct shaping_fl s_pf = (struct shaping_profile *)maat_plugin_table_get_ex_data(g_maat_instance, ctx->maat_info->profile_table_id, pf_id_key, strlen(pf_id_key)); if (!s_pf) { LOG_ERROR("%s maat_plugin_table_get_ex_data get profile failed for key %s", LOG_TAG_MAAT, pf_id_key); - goto END; + return -1; } - shaper_profile_update(&s_rule_info->primary, s_pf); + shaper_profile_update(ctx, &s_rule_info->primary, s_pf); if (sf->processed_pkts <= CONFIRM_PRIORITY_PKTS) { if (sf->priority > s_rule->priority) { @@ -295,7 +295,7 @@ static void shaper_rule_update(struct shaping_thread_ctx *ctx, struct shaping_fl } if (s_rule->borrow_pf_num == 0) { - goto END; + return 0; } for (int i = 0; i < s_rule->borrow_pf_num; i++) { @@ -304,15 +304,14 @@ static void shaper_rule_update(struct shaping_thread_ctx *ctx, struct shaping_fl s_pf = (struct shaping_profile *)maat_plugin_table_get_ex_data(g_maat_instance, ctx->maat_info->profile_table_id, pf_id_key, strlen(pf_id_key)); if (!s_pf) { LOG_ERROR("%s maat_plugin_table_get_ex_data get profile failed for key %s", LOG_TAG_MAAT, pf_id_key); - goto END; + return -1; } - shaper_profile_update(&s_rule_info->borrowing[i], s_pf); + shaper_profile_update(ctx, &s_rule_info->borrowing[i], s_pf); s_rule_info->borrowing_num++; } -END: - return; + return 0; } static void shaper_profiles_priority_update(struct shaping_flow *sf) @@ -361,10 +360,10 @@ static int shaper_rules_dup_remove(struct shaping_flow *sf, long long *rule_comp void shaper_rules_update(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, long long *rule_compile_ids, int rule_num) { - int i, j; int priority_changed = 0; long long rule_ids_remove_dup[SHAPING_RULE_NUM_MAX] = {0}; int rule_num_remove_dup = 0; + int old_rule_num = sf->rule_num; if (rule_num > SHAPING_RULE_NUM_MAX) { char *addr_str = addr_tuple4_to_str(&sf->tuple4); @@ -389,15 +388,16 @@ void shaper_rules_update(struct shaping_thread_ctx *ctx, struct shaping_flow *sf return; } - for (i = sf->rule_num, j = 0; i < sf->rule_num + rule_num_remove_dup; i++, j++) { - shaper_rule_update(ctx, sf, &sf->matched_rule_infos[i], rule_ids_remove_dup[j], &priority_changed); + for (int i = 0; rule_num_remove_dup; i++) { + if (shaper_rule_update(ctx, sf, &sf->matched_rule_infos[sf->rule_num], rule_ids_remove_dup[i], &priority_changed) == 0) { + sf->rule_num++; + } } - if (sf->rule_num > 0 && priority_changed) { - shaper_stat_refresh(ctx, sf, ctx->thread_index, 1); + if (old_rule_num > 0 && priority_changed) { + shaper_stat_refresh(ctx, sf, 1); } - sf->rule_num += rule_num_remove_dup; shaper_profiles_priority_update(sf); return; diff --git a/shaping/src/shaper_stat.cpp b/shaping/src/shaper_stat.cpp index 75db668..1e36a35 100644 --- a/shaping/src/shaper_stat.cpp +++ b/shaping/src/shaper_stat.cpp @@ -165,9 +165,13 @@ static void shaper_stat_swarmkv_hincrby_cb(const struct swarmkv_reply *reply, vo return; } -static void shaper_stat_priority_queue_len_refresh(struct shaping_thread_ctx *ctx, struct shaping_profile_info *profile, int priority, long long curr_time_us) +static void shaper_stat_priority_queue_len_refresh(struct shaping_thread_ctx *ctx, struct shaping_profile_hash_node *profile_hash_node, int priority, long long curr_time_us) { - if (curr_time_us - profile->hash_node->local_queue_len_update_time_us[priority] < SHAPER_STAT_REFRESH_TIME_US) { + if (profile_hash_node->queue_len[priority] == 0) { + return; + } + + if (curr_time_us - profile_hash_node->local_queue_len_update_time_us[priority] < SHAPER_STAT_REFRESH_TIME_US) { return; } @@ -175,24 +179,40 @@ static void shaper_stat_priority_queue_len_refresh(struct shaping_thread_ctx *ct arg->ctx = ctx; arg->start_time_us = curr_time_us; - arg->profile_id = profile->id; + arg->profile_id = profile_hash_node->id; arg->priority = priority; - arg->queue_len = profile->hash_node->queue_len[priority]; + arg->queue_len = profile_hash_node->queue_len[priority]; shaper_global_stat_async_invoke_inc(&ctx->thread_global_stat); shaper_global_stat_hincrby_invoke_inc(&ctx->thread_global_stat); swarmkv_async_command(ctx->swarmkv_db, shaper_stat_swarmkv_hincrby_cb, arg, "HINCRBY tsg-shaping-%d priority-%d %lld", arg->profile_id, arg->priority, arg->queue_len); - profile->hash_node->local_queue_len_update_time_us[priority] = curr_time_us; - profile->hash_node->queue_len[priority] = 0; + profile_hash_node->local_queue_len_update_time_us[priority] = curr_time_us; + profile_hash_node->queue_len[priority] = 0; + + return; +} + +void shaper_stat_priority_queue_len_refresh_all(struct shaping_thread_ctx *ctx, struct shaping_profile_hash_node *profile_hash_node) +{ + struct timespec curr_time; + long long curr_time_us; + + clock_gettime(CLOCK_MONOTONIC, &curr_time); + curr_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC; + + for (int i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) { + shaper_stat_priority_queue_len_refresh(ctx, profile_hash_node, i, curr_time_us); + } return; } -static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, struct shaping_rule_info *rule, int thread_id, struct shaping_profile_info *profile, int profile_type, int need_update_guage, long long curr_time_us) +static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, struct shaping_rule_info *rule, struct shaping_profile_info *profile, int profile_type, int need_update_guage, long long curr_time_us) { struct shaping_stat_for_profile *profile_stat = &profile->stat; struct shaping_stat *stat = ctx->stat; int priority = profile->priority; + int thread_id = ctx->thread_index; unsigned long long old_latency; shaper_stat_tags_build(rule->vsys_id, rule->id, profile->id, priority, profile_type); @@ -221,7 +241,7 @@ static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, s } profile->hash_node->queue_len[priority] += profile_stat->in.queue_len + profile_stat->out.queue_len; - shaper_stat_priority_queue_len_refresh(ctx, profile, priority, curr_time_us); + shaper_stat_priority_queue_len_refresh(ctx, profile->hash_node, priority, curr_time_us); memset(profile_stat, 0, sizeof(struct shaping_stat_for_profile)); } else { @@ -239,7 +259,7 @@ static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, s return; } -void shaper_stat_refresh(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int thread_id, int force) +void shaper_stat_refresh(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int force) { struct shaping_rule_info *rule; struct timespec curr_time; @@ -258,7 +278,7 @@ void shaper_stat_refresh(struct shaping_thread_ctx *ctx, struct shaping_flow *sf } } - if (!need_refresh) { + if (!need_refresh) {//TODO: add queue_len to profile??? return; } @@ -266,10 +286,10 @@ void shaper_stat_refresh(struct shaping_thread_ctx *ctx, struct shaping_flow *sf for (int i = 0; i < sf->rule_num; i++) { rule = &sf->matched_rule_infos[i]; - shaper_stat_profile_metirc_refresh(ctx, rule, thread_id, &rule->primary, PROFILE_IN_RULE_TYPE_PRIMARY, need_update_guage, curr_time_us); + shaper_stat_profile_metirc_refresh(ctx, rule, &rule->primary, PROFILE_IN_RULE_TYPE_PRIMARY, need_update_guage, curr_time_us); for (int j = 0; j < rule->borrowing_num; j++) { - shaper_stat_profile_metirc_refresh(ctx, rule, thread_id, &rule->borrowing[j], PROFILE_IN_RULE_TYPE_BORROW, need_update_guage, curr_time_us); + shaper_stat_profile_metirc_refresh(ctx, rule, &rule->borrowing[j], PROFILE_IN_RULE_TYPE_BORROW, need_update_guage, curr_time_us); } } diff --git a/shaping/src/shaper_swarmkv.cpp b/shaping/src/shaper_swarmkv.cpp index 6d5ddc0..6ac1db5 100644 --- a/shaping/src/shaper_swarmkv.cpp +++ b/shaping/src/shaper_swarmkv.cpp @@ -106,36 +106,6 @@ void swarmkv_reload_log_level() return; } -int shaper_swarmkv_pending_queue_aqm_drop(struct shaping_thread_ctx *ctx) -{ - long long pending_queue_len = swarmkv_caller_get_pending_commands(ctx->swarmkv_db); - time_t now = time(NULL); - - if (now - ctx->swarmkv_aqm_update_time < FREEZE_TIME) { - goto END; - } - - if (pending_queue_len > PENDING_QUEUE_LEN_MAX) { - if (ctx->swarmkv_aqm_prob < PROBABILITY_MAX) { - ctx->swarmkv_aqm_prob += INCREMENT; - } - LOG_DEBUG("%s: shaping pending queue len %lld, aqm prob %d", LOG_TAG_SWARMKV, pending_queue_len, ctx->swarmkv_aqm_prob); - } else { - if (ctx->swarmkv_aqm_prob >= DECREMENT) { - ctx->swarmkv_aqm_prob -= DECREMENT; - } - LOG_DEBUG("%s: shaping pending queue len %lld, aqm prob %d", LOG_TAG_SWARMKV, pending_queue_len, ctx->swarmkv_aqm_prob); - } - ctx->swarmkv_aqm_update_time = now; - -END: - if (rand() % PROBABILITY_MAX < ctx->swarmkv_aqm_prob) { - return 1; - } - - return 0; -} - struct swarmkv* shaper_swarmkv_init(int caller_thread_num) { struct swarmkv_options *swarmkv_opts = NULL; diff --git a/shaping/test/gtest_shaper.cpp b/shaping/test/gtest_shaper.cpp index f262412..8d792f3 100644 --- a/shaping/test/gtest_shaper.cpp +++ b/shaping/test/gtest_shaper.cpp @@ -419,7 +419,7 @@ TEST(single_session, tcp_tx_in_order) /***********send stat data here********************/ - shaper_stat_refresh(&ctx->thread_ctx[0], sf, ctx->thread_ctx[0].thread_index, 1); + shaper_stat_refresh(&ctx->thread_ctx[0], sf, 1); fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy sleep(2);//wait telegraf generate metric @@ -1199,7 +1199,7 @@ TEST(two_session_diff_priority_same_profile, udp_borrow_in_order) ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 10)); ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); } - shaper_stat_refresh(&ctx->thread_ctx[0], sf2, 0, 1);//refresh stat, to ensure priority queue_len in swarmkv is correct + shaper_stat_refresh(&ctx->thread_ctx[0], sf2, 1);//refresh stat, to ensure priority queue_len in swarmkv is correct stub_curr_time_s_inc(1);//inc time to refresh hmget interval while (!TAILQ_EMPTY(&expec_tx_queue1)) {//last 90 delay packets @@ -1373,7 +1373,7 @@ TEST(two_session_diff_priority_same_profile, two_thread_udp_tx_in_order) ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 10)); ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue)); - shaper_stat_refresh(&ctx->thread_ctx[0], sf1, 0, 1);//刷新线程0中的优先级队列长度到swarmkv中 + shaper_stat_refresh(&ctx->thread_ctx[0], sf1, 1);//刷新线程0中的优先级队列长度到swarmkv中 stub_curr_time_s_inc(1);//inc time to refresh hmget interval for (int i = 0; i < 10; i++) {//线程1中的session优先级为2,被线程0中优先级为1的session阻断 stub_refresh_token_bucket(0); @@ -1388,7 +1388,7 @@ TEST(two_session_diff_priority_same_profile, two_thread_udp_tx_in_order) ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1));//sf1 priority 1 } - shaper_stat_refresh(&ctx->thread_ctx[0], sf1, 0, 1);//刷新线程0中的优先级队列长度到swarmkv中 + shaper_stat_refresh(&ctx->thread_ctx[0], sf1, 1);//刷新线程0中的优先级队列长度到swarmkv中 stub_curr_time_s_inc(1);//inc time to refresh hmget interval while (!TAILQ_EMPTY(&expec_tx_queue2)) { stub_refresh_token_bucket(0); @@ -1837,7 +1837,7 @@ TEST(statistics, udp_queueing_pkt) /***********send stat data here********************/ - shaper_stat_refresh(&ctx->thread_ctx[0], sf, ctx->thread_ctx[0].thread_index, 1); + shaper_stat_refresh(&ctx->thread_ctx[0], sf, 1); fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy shaper_global_stat_refresh(ctx); sleep(2);//wait telegraf generate metric |
