summaryrefslogtreecommitdiff
path: root/shaping
diff options
context:
space:
mode:
authorroot <[email protected]>2024-01-26 06:36:19 +0000
committerroot <[email protected]>2024-01-26 06:36:19 +0000
commitf9cd8219dc43b5d19da8f421c19c08d65240683d (patch)
treee8da8950e9a90c2c5da4103ae0fd848173e9a427 /shaping
parent4bc81cc24f2989b84670c54252585c5403acbc01 (diff)
optimize performance
Diffstat (limited to 'shaping')
-rw-r--r--shaping/include/shaper.h15
-rw-r--r--shaping/include/shaper_global_stat.h3
-rw-r--r--shaping/include/shaper_stat.h3
-rw-r--r--shaping/include/shaper_swarmkv.h3
-rw-r--r--shaping/src/shaper.cpp69
-rw-r--r--shaping/src/shaper_global_stat.cpp9
-rw-r--r--shaping/src/shaper_maat.cpp34
-rw-r--r--shaping/src/shaper_stat.cpp44
-rw-r--r--shaping/src/shaper_swarmkv.cpp30
-rw-r--r--shaping/test/gtest_shaper.cpp10
10 files changed, 98 insertions, 122 deletions
diff --git a/shaping/include/shaper.h b/shaping/include/shaper.h
index 20fc9b1..b17dd40 100644
--- a/shaping/include/shaper.h
+++ b/shaping/include/shaper.h
@@ -59,8 +59,6 @@ struct shaping_thread_ctx {
struct shaping_stat *stat;
struct shaping_marsio_info *marsio_info;
struct swarmkv *swarmkv_db;//handle of swarmkv
- int swarmkv_aqm_prob;
- time_t swarmkv_aqm_update_time;
struct shaping_maat_info *maat_info;
struct session_table *session_table;
struct timeouts *expires;
@@ -112,8 +110,8 @@ struct shaper_aqm_blue_para {
struct shaping_profile_hash_node {
int id;
enum shaper_aqm_type aqm_type;
- int in_deposit_token_bits[SHAPING_PRIORITY_NUM_MAX];
- int out_deposit_token_bits[SHAPING_PRIORITY_NUM_MAX];
+ long long in_deposit_token_bits[SHAPING_PRIORITY_NUM_MAX];
+ long long out_deposit_token_bits[SHAPING_PRIORITY_NUM_MAX];
long long last_failed_get_token_ms;
long long last_hmget_ms;
long long queue_len[SHAPING_PRIORITY_NUM_MAX];
@@ -124,6 +122,7 @@ struct shaping_profile_hash_node {
int tconsume_ref_cnt;
struct shaper_aqm_blue_para aqm_blue_para;
unsigned char is_invalid;
+ struct timeout timeout_handle;
UT_hash_handle hh;
};
@@ -131,8 +130,8 @@ struct shaping_profile_info {
int id;//profile_id
enum shaping_profile_type type;
int priority;
- int in_deposit_token_bits;
- int out_deposit_token_bits;
+ long long in_deposit_token_bits;
+ long long out_deposit_token_bits;
long long last_failed_get_token_ms;
unsigned long long enqueue_time_us;//to calculate max latency
struct shaping_stat_for_profile stat;
@@ -190,8 +189,6 @@ struct shaping_flow {
unsigned long long processed_pkts;
unsigned long long stat_update_time_us;
time_t check_rule_time;
- struct timeout timeout_handle;
- time_t last_update_timeout_sec;
};
struct shaper_flow_instance {
@@ -234,7 +231,7 @@ struct shaping_packet_wrapper* shaper_first_pkt_get(struct shaping_flow *sf);
void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx);
int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_ins[], int priority, int max_sf_num);
-void shaper_profile_hash_node_update(struct shaping_profile_info *profile);
+void shaper_profile_hash_node_update(struct shaping_thread_ctx *ctx, struct shaping_profile_info *profile);
int shaper_global_conf_init(struct shaping_system_conf *conf);
diff --git a/shaping/include/shaper_global_stat.h b/shaping/include/shaper_global_stat.h
index 940745e..4c100a7 100644
--- a/shaping/include/shaper_global_stat.h
+++ b/shaping/include/shaper_global_stat.h
@@ -38,6 +38,7 @@ enum shaping_global_stat_column_index {
HIT_POLICY_RX_BYTES_IDX,
HIT_POLICY_TX_PKTS_IDX,
HIT_POLICY_TX_BYTES_IDX,
+ HIT_POLICY_TX_SYN_ACK_PKTS_IDX,
HIT_POLICY_DROP_PKTS_IDX,
HIT_POLICY_DROP_BYTES_IDX,
@@ -49,6 +50,7 @@ struct shaping_global_stat_traffic_data {
long long rx_bytes;
long long tx_pkts;
long long tx_bytes;
+ long long tx_syn_ack_pkts;
long long drop_pkts;
long long drop_bytes;
};
@@ -127,6 +129,7 @@ void shaper_global_stat_throughput_tx_inc(struct shaping_global_stat_data *threa
void shaper_global_stat_hit_policy_throughput_rx_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len);
void shaper_global_stat_hit_policy_throughput_tx_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len);
+void shaper_global_stat_hit_policy_throughput_tx_syn_ack_inc(struct shaping_global_stat_data *thread_global_stat);
void shaper_global_stat_hit_policy_drop_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len);
void shaper_global_stat_refresh(struct shaping_ctx *ctx); \ No newline at end of file
diff --git a/shaping/include/shaper_stat.h b/shaping/include/shaper_stat.h
index d9ca979..132049c 100644
--- a/shaping/include/shaper_stat.h
+++ b/shaping/include/shaper_stat.h
@@ -57,4 +57,5 @@ void shaper_stat_forward_all_rule_inc(struct shaping_stat *stat, struct shaping_
void shaper_stat_drop_inc(struct shaping_stat_for_profile *profile_stat, unsigned char direction, int thread_id);
void shaper_stat_max_latency_update(struct shaping_stat_for_profile *profile_stat, unsigned char direction, unsigned long long latency, int thread_id);
-void shaper_stat_refresh(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int thread_id, int force); \ No newline at end of file
+void shaper_stat_refresh(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int force);
+void shaper_stat_priority_queue_len_refresh_all(struct shaping_thread_ctx *ctx, struct shaping_profile_hash_node *profile_hash_node); \ No newline at end of file
diff --git a/shaping/include/shaper_swarmkv.h b/shaping/include/shaper_swarmkv.h
index 963fff0..e533802 100644
--- a/shaping/include/shaper_swarmkv.h
+++ b/shaping/include/shaper_swarmkv.h
@@ -2,5 +2,4 @@
struct swarmkv* shaper_swarmkv_init(int caller_thread_num);
void shaper_swarmkv_destroy(struct swarmkv* swarmkv_db);
-void swarmkv_reload_log_level();
-int shaper_swarmkv_pending_queue_aqm_drop(struct shaping_thread_ctx *ctx); \ No newline at end of file
+void swarmkv_reload_log_level(); \ No newline at end of file
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp
index e38af34..f1b963a 100644
--- a/shaping/src/shaper.cpp
+++ b/shaping/src/shaper.cpp
@@ -24,7 +24,7 @@ extern "C" {
#include "shaper_global_stat.h"
#include "shaper_aqm.h"
-#define TOKEN_ENLARGE_TIMES 10
+#define TOKEN_ENLARGE_TIMES 10//TODO
#define TOKEN_GET_FAILED_INTERVAL_MS 1
#define HMGET_REQUEST_INTERVAL_MS 10
#define PRIORITY_BLOCK_MIN_TIME_MS 500
@@ -138,8 +138,6 @@ struct shaping_flow* shaping_flow_new(struct shaping_thread_ctx *ctx)
TAILQ_INIT(&s_node->shaping_flow.packet_queue);
s_node->shaping_flow.priority = SHAPING_PRIORITY_NUM_MAX - 1;
- timeout_init(&s_node->shaping_flow.timeout_handle, TIMEOUT_ABS);
- timeouts_add(ctx->expires, &s_node->shaping_flow.timeout_handle, time(NULL) + SHAPING_STAT_REFRESH_INTERVAL_SEC);
s_node->shaping_flow.ref_cnt = 1;
@@ -159,8 +157,7 @@ void shaping_flow_free(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
struct shaping_node *s_node = (struct shaping_node*)sf;
- timeouts_del(ctx->expires, &sf->timeout_handle);
- shaper_stat_refresh(ctx, sf, ctx->thread_index, 1);
+ shaper_stat_refresh(ctx, sf, 1);
shaping_node_free(s_node);
return;
@@ -182,7 +179,7 @@ static int shaper_packet_enqueue(struct shaping_thread_ctx *ctx, struct shaping_
struct shaping_packet_wrapper *s_pkt = NULL;
struct timespec curr_time;
- if (sf->queue_len == ctx->conf.session_queue_len_max) {
+ if (sf->queue_len == ctx->conf.session_queue_len_max) {//TODO: profile queue_len???
return -1;
}
@@ -386,7 +383,7 @@ int shaper_flow_in_order_get(struct shaper *sp, struct shaper_flow_instance sf_i
static void shaper_deposit_token_add(struct shaping_profile_info *profile, int req_token_bits, unsigned char direction, int priority)
{
- int *deposit_token;
+ long long *deposit_token;
struct shaping_profile_hash_node *pf_hash_node = profile->hash_node;
switch (profile->type) {
@@ -431,7 +428,7 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg
shaper_global_stat_async_callback_inc(&ctx->thread_global_stat);
shaper_global_stat_tconsume_callback_inc(&ctx->thread_global_stat);
- LOG_INFO("Swarmkv reply type =%d, profile_id %d, direction =%d, integer =%llu",reply->type, profile->id, arg->direction, reply->integer);
+ LOG_INFO("Swarmkv reply type =%d, profile_id %d, profile_consume_ref = %d, direction =%d, integer =%llu",reply->type, profile->id, pf_hash_node->tconsume_ref_cnt, arg->direction, reply->integer);
if (reply->type != SWARMKV_REPLY_INTEGER) {
shaper_global_stat_async_tconsume_failed_inc(&ctx->thread_global_stat);
@@ -482,7 +479,7 @@ END:
static void shaper_deposit_token_sub(struct shaping_profile_info *profile, int req_token_bits, unsigned char direction, int priority)
{
- int *deposit_token;
+ long long *deposit_token;
struct shaping_profile_hash_node *pf_hash_node = profile->hash_node;
switch (profile->type) {
@@ -551,23 +548,10 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct
struct shaping_tconsume_cb_arg *arg = NULL;
char key[32] = {0};
- switch (pf_info->type) {
- case PROFILE_TYPE_GENERIC:
- if (pf_info->hash_node->tconsume_ref_cnt > 0) {
- goto END;
- }
- break;
- case PROFILE_TYPE_HOST_FARINESS:
- case PROFILE_TYPE_MAX_MIN_HOST_FAIRNESS:
- case PROFILE_TYPE_SPLIT_BY_LOCAL_HOST:
- if (sf->ref_cnt > 1) {
- goto END;
- }
- break;
- default:
- break;
+ if (pf_info->hash_node->tconsume_ref_cnt > 0) {
+ goto END;
}
-
+
snprintf(key, sizeof(key), "tsg-shaping-%d-%s", pf_info->id, direction == SHAPING_DIR_OUT ? "outgoing" : "incoming");
arg = (struct shaping_tconsume_cb_arg *)calloc(1, sizeof(struct shaping_tconsume_cb_arg));
arg->ctx = ctx;
@@ -580,10 +564,10 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct
sheper_global_stat_tconsume_invoke_inc(&ctx->thread_global_stat);
sf->ref_cnt++;
+ pf_info->hash_node->tconsume_ref_cnt++;
switch (pf_info->type) {
case PROFILE_TYPE_GENERIC:
- pf_info->hash_node->tconsume_ref_cnt++;
swarmkv_tconsume(ctx->swarmkv_db, key, strlen(key), req_token_bits * TOKEN_ENLARGE_TIMES, shaper_token_get_cb, arg);
break;
case PROFILE_TYPE_HOST_FARINESS:
@@ -711,7 +695,7 @@ END:
}
}
-void shaper_profile_hash_node_update(struct shaping_profile_info *profile)
+void shaper_profile_hash_node_update(struct shaping_thread_ctx *ctx, struct shaping_profile_info *profile)
{
if (profile->hash_node == NULL) {
struct shaping_profile_hash_node *hash_node = NULL;
@@ -722,6 +706,8 @@ void shaper_profile_hash_node_update(struct shaping_profile_info *profile)
profile->hash_node = (struct shaping_profile_hash_node*)calloc(1, sizeof(struct shaping_profile_hash_node));
profile->hash_node->id = profile->id;
HASH_ADD_INT(thread_sp_hashtbl, id, profile->hash_node);
+ timeout_init(&profile->hash_node->timeout_handle, TIMEOUT_ABS);
+ timeouts_add(ctx->expires, &profile->hash_node->timeout_handle, time(NULL) + SHAPING_STAT_REFRESH_INTERVAL_SEC);
}
}
@@ -787,11 +773,6 @@ static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_f
return SHAPER_TOKEN_GET_FAILED;
}
- if (shaper_swarmkv_pending_queue_aqm_drop(ctx) == 1) {
- profile->hash_node->last_failed_get_token_ms = curr_time_ms;
- return SHAPER_TOKEN_GET_FAILED;
- }
-
if (shaper_profile_is_priority_blocked(ctx, sf, profile, &curr_timespec, curr_time_ms)) {
return SHAPER_TOKEN_GET_FAILED;
} else {
@@ -1018,7 +999,7 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_
break;
}
- shaper_stat_refresh(ctx, sf, ctx->thread_index, 0);
+ shaper_stat_refresh(ctx, sf, 0);
if (shaper_queue_empty(sf)) {
if (sf->flag & SESSION_CLOSE) {
@@ -1068,6 +1049,7 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
marsio_send_burst(marsio_info->mr_path, ctx->thread_index, &rx_buff, 1);
shaper_global_stat_throughput_tx_inc(&ctx->thread_global_stat, meta->raw_len);
shaper_global_stat_hit_policy_throughput_tx_inc(&ctx->thread_global_stat, meta->raw_len);
+ shaper_global_stat_hit_policy_throughput_tx_syn_ack_inc(&ctx->thread_global_stat);
shaper_stat_forward_all_rule_inc(stat, sf, meta->dir, meta->raw_len, ctx->thread_index);
goto END;//for tcp pure control pkt, transmit it directly
}
@@ -1109,12 +1091,7 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
}
END:
- shaper_stat_refresh(ctx, sf, ctx->thread_index, 0);
- time_t curr_time = time(NULL);
- if (curr_time > sf->last_update_timeout_sec) {
- timeouts_add(ctx->expires, &sf->timeout_handle, curr_time + SHAPING_STAT_REFRESH_INTERVAL_SEC);//timeouts_add will delete if sf exist, then add
- sf->last_update_timeout_sec = curr_time;
- }
+ shaper_stat_refresh(ctx, sf, 0);
if(sf->flag & SESSION_CLOSE) {
if (shaper_queue_empty(sf)) {
@@ -1137,11 +1114,11 @@ void polling_entry(struct shaper *sp, struct shaping_stat *stat, struct shaping_
{
swarmkv_caller_loop(ctx->swarmkv_db, SWARMKV_LOOP_NONBLOCK, NULL);
- struct shaping_flow *sf = NULL;
+ struct shaping_profile_hash_node *hash_node = NULL;
time_t curr_time = time(NULL);
int cnt = 0;
- /*if (curr_time > ctx->last_update_timeout_sec) {
+ if (curr_time > ctx->last_update_timeout_sec) {
timeouts_update(ctx->expires, curr_time);
ctx->last_update_timeout_sec = curr_time;
}
@@ -1153,11 +1130,11 @@ void polling_entry(struct shaper *sp, struct shaping_stat *stat, struct shaping_
break;
}
- sf = container_of(t, struct shaping_flow, timeout_handle);
- shaper_stat_refresh(ctx, sf, ctx->thread_index, 0);
- timeouts_add(ctx->expires, &sf->timeout_handle, time(NULL) + SHAPING_STAT_REFRESH_INTERVAL_SEC);//timeouts_get will delete sf from queue, add it back
+ hash_node = container_of(t, struct shaping_profile_hash_node, timeout_handle);
+ shaper_stat_priority_queue_len_refresh_all(ctx, hash_node);
+ timeouts_add(ctx->expires, &hash_node->timeout_handle, time(NULL) + SHAPING_STAT_REFRESH_INTERVAL_SEC);//timeouts_get will delete item from queue, add it back
cnt++;
- }*/
+ }
if (shaper_global_stat_queueing_pkts_get(&ctx->thread_global_stat) == 0) {
return;
@@ -1488,4 +1465,4 @@ struct shaping_ctx *shaping_engine_init()
ERROR:
shaping_engine_destroy(ctx);
return NULL;
-} \ No newline at end of file
+}
diff --git a/shaping/src/shaper_global_stat.cpp b/shaping/src/shaper_global_stat.cpp
index 7144658..d59b290 100644
--- a/shaping/src/shaper_global_stat.cpp
+++ b/shaping/src/shaper_global_stat.cpp
@@ -63,6 +63,7 @@ static void shaper_global_stat_fieldstat_reg(struct shaping_global_stat *stat)
stat->column_ids[HIT_POLICY_RX_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "shape_rx_bytes", NULL, 0);
stat->column_ids[HIT_POLICY_TX_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "shape_tx_pkts", NULL, 0);
stat->column_ids[HIT_POLICY_TX_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "shape_tx_bytes", NULL, 0);
+ stat->column_ids[HIT_POLICY_TX_SYN_ACK_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "shape_tx_syn_ack_pkts", NULL, 0);
stat->column_ids[HIT_POLICY_DROP_PKTS_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "shape_drop_pkts", NULL, 0);
stat->column_ids[HIT_POLICY_DROP_BYTES_IDX] = fieldstat_register(stat->instance, FIELD_TYPE_COUNTER, "shape_drop_bytes", NULL, 0);
@@ -344,6 +345,12 @@ void shaper_global_stat_hit_policy_throughput_tx_inc(struct shaping_global_stat_
data->tx_bytes += pkt_len;
}
+void shaper_global_stat_hit_policy_throughput_tx_syn_ack_inc(struct shaping_global_stat_data *thread_global_stat)
+{
+ struct shaping_global_stat_traffic_data *data = &thread_global_stat->hit_policy_traffic;
+ data->tx_syn_ack_pkts++;
+}
+
void shaper_global_stat_hit_policy_drop_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len)
{
struct shaping_global_stat_traffic_data *data = &thread_global_stat->hit_policy_traffic;
@@ -402,6 +409,7 @@ void shaper_global_stat_refresh(struct shaping_ctx *ctx)
sum.hit_policy_traffic.rx_bytes += stat_data[i].hit_policy_traffic.rx_bytes;
sum.hit_policy_traffic.tx_pkts += stat_data[i].hit_policy_traffic.tx_pkts;
sum.hit_policy_traffic.tx_bytes += stat_data[i].hit_policy_traffic.tx_bytes;
+ sum.hit_policy_traffic.tx_syn_ack_pkts += stat_data[i].hit_policy_traffic.tx_syn_ack_pkts;
sum.hit_policy_traffic.drop_pkts += stat_data[i].hit_policy_traffic.drop_pkts;
sum.hit_policy_traffic.drop_bytes += stat_data[i].hit_policy_traffic.drop_bytes;
}
@@ -446,6 +454,7 @@ void shaper_global_stat_refresh(struct shaping_ctx *ctx)
fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_RX_BYTES_IDX], hit_policy_traffic_data->rx_bytes);
fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_TX_PKTS_IDX], hit_policy_traffic_data->tx_pkts);
fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_TX_BYTES_IDX], hit_policy_traffic_data->tx_bytes);
+ fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_TX_SYN_ACK_PKTS_IDX], hit_policy_traffic_data->tx_syn_ack_pkts);
fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_DROP_PKTS_IDX], hit_policy_traffic_data->drop_pkts);
fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_DROP_BYTES_IDX], hit_policy_traffic_data->drop_bytes);
diff --git a/shaping/src/shaper_maat.cpp b/shaping/src/shaper_maat.cpp
index 4e07439..ec7b59a 100644
--- a/shaping/src/shaper_maat.cpp
+++ b/shaping/src/shaper_maat.cpp
@@ -253,16 +253,16 @@ void shaper_profile_ex_free(int table_id, void **ad, long argl, void *argp)
return;
}
-void shaper_profile_update(struct shaping_profile_info *s_pf_info, struct shaping_profile *s_pf_ex)
+void shaper_profile_update(struct shaping_thread_ctx *ctx, struct shaping_profile_info *s_pf_info, struct shaping_profile *s_pf_ex)
{
s_pf_info->id = s_pf_ex->id;
s_pf_info->type = s_pf_ex->type;
- shaper_profile_hash_node_update(s_pf_info);
+ shaper_profile_hash_node_update(ctx, s_pf_info);
return;
}
-static void shaper_rule_update(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_rule_info *s_rule_info, long long rule_compile_id, int *priority_changed)
+static int shaper_rule_update(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, struct shaping_rule_info *s_rule_info, long long rule_compile_id, int *priority_changed)
{
struct shaping_rule *s_rule = NULL;
struct shaping_profile *s_pf = NULL;
@@ -271,7 +271,7 @@ static void shaper_rule_update(struct shaping_thread_ctx *ctx, struct shaping_fl
s_rule = (struct shaping_rule*)maat_plugin_table_get_ex_data(g_maat_instance, ctx->maat_info->rule_table_id, (char *)&rule_compile_id, sizeof(rule_compile_id));
if (!s_rule) {
LOG_ERROR("%s maat_plugin_table_get_ex_data get rule failed for compile id %lld", LOG_TAG_MAAT, rule_compile_id);
- goto END;
+ return -1;
}
s_rule_info->id = s_rule->id;
s_rule_info->fair_factor = s_rule->fair_factor;
@@ -283,9 +283,9 @@ static void shaper_rule_update(struct shaping_thread_ctx *ctx, struct shaping_fl
s_pf = (struct shaping_profile *)maat_plugin_table_get_ex_data(g_maat_instance, ctx->maat_info->profile_table_id, pf_id_key, strlen(pf_id_key));
if (!s_pf) {
LOG_ERROR("%s maat_plugin_table_get_ex_data get profile failed for key %s", LOG_TAG_MAAT, pf_id_key);
- goto END;
+ return -1;
}
- shaper_profile_update(&s_rule_info->primary, s_pf);
+ shaper_profile_update(ctx, &s_rule_info->primary, s_pf);
if (sf->processed_pkts <= CONFIRM_PRIORITY_PKTS) {
if (sf->priority > s_rule->priority) {
@@ -295,7 +295,7 @@ static void shaper_rule_update(struct shaping_thread_ctx *ctx, struct shaping_fl
}
if (s_rule->borrow_pf_num == 0) {
- goto END;
+ return 0;
}
for (int i = 0; i < s_rule->borrow_pf_num; i++) {
@@ -304,15 +304,14 @@ static void shaper_rule_update(struct shaping_thread_ctx *ctx, struct shaping_fl
s_pf = (struct shaping_profile *)maat_plugin_table_get_ex_data(g_maat_instance, ctx->maat_info->profile_table_id, pf_id_key, strlen(pf_id_key));
if (!s_pf) {
LOG_ERROR("%s maat_plugin_table_get_ex_data get profile failed for key %s", LOG_TAG_MAAT, pf_id_key);
- goto END;
+ return -1;
}
- shaper_profile_update(&s_rule_info->borrowing[i], s_pf);
+ shaper_profile_update(ctx, &s_rule_info->borrowing[i], s_pf);
s_rule_info->borrowing_num++;
}
-END:
- return;
+ return 0;
}
static void shaper_profiles_priority_update(struct shaping_flow *sf)
@@ -361,10 +360,10 @@ static int shaper_rules_dup_remove(struct shaping_flow *sf, long long *rule_comp
void shaper_rules_update(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, long long *rule_compile_ids, int rule_num)
{
- int i, j;
int priority_changed = 0;
long long rule_ids_remove_dup[SHAPING_RULE_NUM_MAX] = {0};
int rule_num_remove_dup = 0;
+ int old_rule_num = sf->rule_num;
if (rule_num > SHAPING_RULE_NUM_MAX) {
char *addr_str = addr_tuple4_to_str(&sf->tuple4);
@@ -389,15 +388,16 @@ void shaper_rules_update(struct shaping_thread_ctx *ctx, struct shaping_flow *sf
return;
}
- for (i = sf->rule_num, j = 0; i < sf->rule_num + rule_num_remove_dup; i++, j++) {
- shaper_rule_update(ctx, sf, &sf->matched_rule_infos[i], rule_ids_remove_dup[j], &priority_changed);
+ for (int i = 0; rule_num_remove_dup; i++) {
+ if (shaper_rule_update(ctx, sf, &sf->matched_rule_infos[sf->rule_num], rule_ids_remove_dup[i], &priority_changed) == 0) {
+ sf->rule_num++;
+ }
}
- if (sf->rule_num > 0 && priority_changed) {
- shaper_stat_refresh(ctx, sf, ctx->thread_index, 1);
+ if (old_rule_num > 0 && priority_changed) {
+ shaper_stat_refresh(ctx, sf, 1);
}
- sf->rule_num += rule_num_remove_dup;
shaper_profiles_priority_update(sf);
return;
diff --git a/shaping/src/shaper_stat.cpp b/shaping/src/shaper_stat.cpp
index 75db668..1e36a35 100644
--- a/shaping/src/shaper_stat.cpp
+++ b/shaping/src/shaper_stat.cpp
@@ -165,9 +165,13 @@ static void shaper_stat_swarmkv_hincrby_cb(const struct swarmkv_reply *reply, vo
return;
}
-static void shaper_stat_priority_queue_len_refresh(struct shaping_thread_ctx *ctx, struct shaping_profile_info *profile, int priority, long long curr_time_us)
+static void shaper_stat_priority_queue_len_refresh(struct shaping_thread_ctx *ctx, struct shaping_profile_hash_node *profile_hash_node, int priority, long long curr_time_us)
{
- if (curr_time_us - profile->hash_node->local_queue_len_update_time_us[priority] < SHAPER_STAT_REFRESH_TIME_US) {
+ if (profile_hash_node->queue_len[priority] == 0) {
+ return;
+ }
+
+ if (curr_time_us - profile_hash_node->local_queue_len_update_time_us[priority] < SHAPER_STAT_REFRESH_TIME_US) {
return;
}
@@ -175,24 +179,40 @@ static void shaper_stat_priority_queue_len_refresh(struct shaping_thread_ctx *ct
arg->ctx = ctx;
arg->start_time_us = curr_time_us;
- arg->profile_id = profile->id;
+ arg->profile_id = profile_hash_node->id;
arg->priority = priority;
- arg->queue_len = profile->hash_node->queue_len[priority];
+ arg->queue_len = profile_hash_node->queue_len[priority];
shaper_global_stat_async_invoke_inc(&ctx->thread_global_stat);
shaper_global_stat_hincrby_invoke_inc(&ctx->thread_global_stat);
swarmkv_async_command(ctx->swarmkv_db, shaper_stat_swarmkv_hincrby_cb, arg, "HINCRBY tsg-shaping-%d priority-%d %lld", arg->profile_id, arg->priority, arg->queue_len);
- profile->hash_node->local_queue_len_update_time_us[priority] = curr_time_us;
- profile->hash_node->queue_len[priority] = 0;
+ profile_hash_node->local_queue_len_update_time_us[priority] = curr_time_us;
+ profile_hash_node->queue_len[priority] = 0;
+
+ return;
+}
+
+void shaper_stat_priority_queue_len_refresh_all(struct shaping_thread_ctx *ctx, struct shaping_profile_hash_node *profile_hash_node)
+{
+ struct timespec curr_time;
+ long long curr_time_us;
+
+ clock_gettime(CLOCK_MONOTONIC, &curr_time);
+ curr_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
+
+ for (int i = 0; i < SHAPING_PRIORITY_NUM_MAX; i++) {
+ shaper_stat_priority_queue_len_refresh(ctx, profile_hash_node, i, curr_time_us);
+ }
return;
}
-static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, struct shaping_rule_info *rule, int thread_id, struct shaping_profile_info *profile, int profile_type, int need_update_guage, long long curr_time_us)
+static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, struct shaping_rule_info *rule, struct shaping_profile_info *profile, int profile_type, int need_update_guage, long long curr_time_us)
{
struct shaping_stat_for_profile *profile_stat = &profile->stat;
struct shaping_stat *stat = ctx->stat;
int priority = profile->priority;
+ int thread_id = ctx->thread_index;
unsigned long long old_latency;
shaper_stat_tags_build(rule->vsys_id, rule->id, profile->id, priority, profile_type);
@@ -221,7 +241,7 @@ static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, s
}
profile->hash_node->queue_len[priority] += profile_stat->in.queue_len + profile_stat->out.queue_len;
- shaper_stat_priority_queue_len_refresh(ctx, profile, priority, curr_time_us);
+ shaper_stat_priority_queue_len_refresh(ctx, profile->hash_node, priority, curr_time_us);
memset(profile_stat, 0, sizeof(struct shaping_stat_for_profile));
} else {
@@ -239,7 +259,7 @@ static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, s
return;
}
-void shaper_stat_refresh(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int thread_id, int force)
+void shaper_stat_refresh(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int force)
{
struct shaping_rule_info *rule;
struct timespec curr_time;
@@ -258,7 +278,7 @@ void shaper_stat_refresh(struct shaping_thread_ctx *ctx, struct shaping_flow *sf
}
}
- if (!need_refresh) {
+ if (!need_refresh) {//TODO: add queue_len to profile???
return;
}
@@ -266,10 +286,10 @@ void shaper_stat_refresh(struct shaping_thread_ctx *ctx, struct shaping_flow *sf
for (int i = 0; i < sf->rule_num; i++) {
rule = &sf->matched_rule_infos[i];
- shaper_stat_profile_metirc_refresh(ctx, rule, thread_id, &rule->primary, PROFILE_IN_RULE_TYPE_PRIMARY, need_update_guage, curr_time_us);
+ shaper_stat_profile_metirc_refresh(ctx, rule, &rule->primary, PROFILE_IN_RULE_TYPE_PRIMARY, need_update_guage, curr_time_us);
for (int j = 0; j < rule->borrowing_num; j++) {
- shaper_stat_profile_metirc_refresh(ctx, rule, thread_id, &rule->borrowing[j], PROFILE_IN_RULE_TYPE_BORROW, need_update_guage, curr_time_us);
+ shaper_stat_profile_metirc_refresh(ctx, rule, &rule->borrowing[j], PROFILE_IN_RULE_TYPE_BORROW, need_update_guage, curr_time_us);
}
}
diff --git a/shaping/src/shaper_swarmkv.cpp b/shaping/src/shaper_swarmkv.cpp
index 6d5ddc0..6ac1db5 100644
--- a/shaping/src/shaper_swarmkv.cpp
+++ b/shaping/src/shaper_swarmkv.cpp
@@ -106,36 +106,6 @@ void swarmkv_reload_log_level()
return;
}
-int shaper_swarmkv_pending_queue_aqm_drop(struct shaping_thread_ctx *ctx)
-{
- long long pending_queue_len = swarmkv_caller_get_pending_commands(ctx->swarmkv_db);
- time_t now = time(NULL);
-
- if (now - ctx->swarmkv_aqm_update_time < FREEZE_TIME) {
- goto END;
- }
-
- if (pending_queue_len > PENDING_QUEUE_LEN_MAX) {
- if (ctx->swarmkv_aqm_prob < PROBABILITY_MAX) {
- ctx->swarmkv_aqm_prob += INCREMENT;
- }
- LOG_DEBUG("%s: shaping pending queue len %lld, aqm prob %d", LOG_TAG_SWARMKV, pending_queue_len, ctx->swarmkv_aqm_prob);
- } else {
- if (ctx->swarmkv_aqm_prob >= DECREMENT) {
- ctx->swarmkv_aqm_prob -= DECREMENT;
- }
- LOG_DEBUG("%s: shaping pending queue len %lld, aqm prob %d", LOG_TAG_SWARMKV, pending_queue_len, ctx->swarmkv_aqm_prob);
- }
- ctx->swarmkv_aqm_update_time = now;
-
-END:
- if (rand() % PROBABILITY_MAX < ctx->swarmkv_aqm_prob) {
- return 1;
- }
-
- return 0;
-}
-
struct swarmkv* shaper_swarmkv_init(int caller_thread_num)
{
struct swarmkv_options *swarmkv_opts = NULL;
diff --git a/shaping/test/gtest_shaper.cpp b/shaping/test/gtest_shaper.cpp
index f262412..8d792f3 100644
--- a/shaping/test/gtest_shaper.cpp
+++ b/shaping/test/gtest_shaper.cpp
@@ -419,7 +419,7 @@ TEST(single_session, tcp_tx_in_order)
/***********send stat data here********************/
- shaper_stat_refresh(&ctx->thread_ctx[0], sf, ctx->thread_ctx[0].thread_index, 1);
+ shaper_stat_refresh(&ctx->thread_ctx[0], sf, 1);
fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
sleep(2);//wait telegraf generate metric
@@ -1199,7 +1199,7 @@ TEST(two_session_diff_priority_same_profile, udp_borrow_in_order)
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue2, actual_tx_queue, 10));
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
}
- shaper_stat_refresh(&ctx->thread_ctx[0], sf2, 0, 1);//refresh stat, to ensure priority queue_len in swarmkv is correct
+ shaper_stat_refresh(&ctx->thread_ctx[0], sf2, 1);//refresh stat, to ensure priority queue_len in swarmkv is correct
stub_curr_time_s_inc(1);//inc time to refresh hmget interval
while (!TAILQ_EMPTY(&expec_tx_queue1)) {//last 90 delay packets
@@ -1373,7 +1373,7 @@ TEST(two_session_diff_priority_same_profile, two_thread_udp_tx_in_order)
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 10));
ASSERT_TRUE(TAILQ_EMPTY(actual_tx_queue));
- shaper_stat_refresh(&ctx->thread_ctx[0], sf1, 0, 1);//刷新线程0中的优先级队列长度到swarmkv中
+ shaper_stat_refresh(&ctx->thread_ctx[0], sf1, 1);//刷新线程0中的优先级队列长度到swarmkv中
stub_curr_time_s_inc(1);//inc time to refresh hmget interval
for (int i = 0; i < 10; i++) {//线程1中的session优先级为2,被线程0中优先级为1的session阻断
stub_refresh_token_bucket(0);
@@ -1388,7 +1388,7 @@ TEST(two_session_diff_priority_same_profile, two_thread_udp_tx_in_order)
ASSERT_EQ(0, judge_packet_eq(&expec_tx_queue1, actual_tx_queue, 1));//sf1 priority 1
}
- shaper_stat_refresh(&ctx->thread_ctx[0], sf1, 0, 1);//刷新线程0中的优先级队列长度到swarmkv中
+ shaper_stat_refresh(&ctx->thread_ctx[0], sf1, 1);//刷新线程0中的优先级队列长度到swarmkv中
stub_curr_time_s_inc(1);//inc time to refresh hmget interval
while (!TAILQ_EMPTY(&expec_tx_queue2)) {
stub_refresh_token_bucket(0);
@@ -1837,7 +1837,7 @@ TEST(statistics, udp_queueing_pkt)
/***********send stat data here********************/
- shaper_stat_refresh(&ctx->thread_ctx[0], sf, ctx->thread_ctx[0].thread_index, 1);
+ shaper_stat_refresh(&ctx->thread_ctx[0], sf, 1);
fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
shaper_global_stat_refresh(ctx);
sleep(2);//wait telegraf generate metric