summaryrefslogtreecommitdiff
path: root/shaping/src
diff options
context:
space:
mode:
Diffstat (limited to 'shaping/src')
-rw-r--r--shaping/src/main.cpp2
-rw-r--r--shaping/src/shaper.cpp140
-rw-r--r--shaping/src/shaper_maat.cpp2
-rw-r--r--shaping/src/shaper_session.cpp2
-rw-r--r--shaping/src/shaper_stat.cpp27
-rw-r--r--shaping/src/shaper_swarmkv.cpp6
6 files changed, 108 insertions, 71 deletions
diff --git a/shaping/src/main.cpp b/shaping/src/main.cpp
index 91e1202..f9a38b2 100644
--- a/shaping/src/main.cpp
+++ b/shaping/src/main.cpp
@@ -25,6 +25,8 @@ static void *shaper_thread_loop(void *data)
return NULL;
}
+ swarmkv_register_thread(ctx->swarmkv_db);
+
//loop to process pkts
while(!quit) {
shaper_packet_recv_and_process(ctx);
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp
index 09a0c24..02d7f33 100644
--- a/shaping/src/shaper.cpp
+++ b/shaping/src/shaper.cpp
@@ -27,8 +27,14 @@ extern "C" {
#define MICRO_SECONDS_PER_SEC 1000000
#define NANO_SECONDS_PER_SEC 1000000000
+#define NANO_SECONDS_PER_MILLI_SEC 1000000
+#define MILLI_SECONDS_PER_SEC 1000
+
#define SHAPING_LATENCY_THRESHOLD 2000000 //2s
+#define TOKEN_ENLARGE_TIMES 10
+#define TOKEN_GET_FAILED_INTERVAL_MS 1
+
#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1 "HMGET tsg-shaping-%d priority-0"
#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1 " priority-1"
#define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_3 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2 " priority-2"
@@ -137,7 +143,7 @@ static void shaping_node_free(struct shaping_node *s_node)
return;
}
-struct shaping_flow* shaping_flow_new()
+struct shaping_flow* shaping_flow_new(struct shaping_thread_ctx *ctx)
{
struct shaping_node *s_node = NULL;
int i;
@@ -157,6 +163,8 @@ struct shaping_flow* shaping_flow_new()
TAILQ_INIT(&s_node->shaping_flow.packet_queue);
s_node->shaping_flow.ref_count = 1;
s_node->shaping_flow.priority = SHAPING_PRIORITY_NUM_MAX - 1;
+ timeout_init(&s_node->shaping_flow.timeout_handle, TIMEOUT_ABS);
+ timeouts_add(ctx->expires, &s_node->shaping_flow.timeout_handle, time(NULL) + SHAPING_STAT_REFRESH_INTERVAL_SEC);
return &s_node->shaping_flow;
@@ -170,7 +178,8 @@ void shaping_flow_free(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
struct shaping_node *s_node = (struct shaping_node*)sf;
if (__atomic_sub_fetch(&sf->ref_count, 1, __ATOMIC_SEQ_CST) == 0) {
- shaper_stat_refresh(ctx->stat, sf, ctx->thread_index, 1);
+ timeouts_del(ctx->expires, &sf->timeout_handle);
+ shaper_stat_refresh(ctx, sf, ctx->thread_index, 1);
shaping_node_free(s_node);
}
@@ -250,19 +259,6 @@ void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx)
return;
}
-static void swarmkv_reply_cb_do_nothing(const struct swarmkv_reply *reply, void * cb_arg)
-{
- struct shaping_global_stat *global_stat = (struct shaping_global_stat *)cb_arg;
-
- shaper_global_stat_async_callback_inc(global_stat);
-
- if (reply->type != SWARMKV_REPLY_INTEGER) {
- shaper_global_stat_async_hincrby_failed_inc(global_stat);
- }
-
- return;
-}
-
//return success(0) while any avl tree insert success
int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time)
{
@@ -277,20 +273,10 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un
pkt_wrapper = shaper_first_pkt_get(sf);
assert(pkt_wrapper != NULL);
- if ((sf->flag & SESSION_UPDATE_PF_PRIO_LEN) == 0) {
- if (sf->processed_pkts > CONFIRM_PRIORITY_PKTS) {
- sf->flag |= SESSION_UPDATE_PF_PRIO_LEN;
- }
- }
-
priority = s_rule_info->primary.priority;
avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns);
if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) {
ret = 0;
- if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) {
- shaper_global_stat_async_invoke_inc(ctx->global_stat);
- swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->primary.id, priority);
- }
}
if (s_rule_info->borrowing_num == 0) {// no borrow profile
@@ -302,10 +288,8 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un
avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns);
if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) {
ret = 0;
- if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) {
- shaper_global_stat_async_invoke_inc(ctx->global_stat);
- swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->borrowing[i].id, priority);
- }
+ //TODO: calculate queue_len for borrow profile and add judge when refresh stat????
+ //shaper_stat_queueing_pkt_inc(&s_rule_info->borrowing[i].stat, pkt_wrapper->direction, ctx->thread_index);
}
}
@@ -345,10 +329,6 @@ void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
priority = s_rule_info->primary.priority;
if (avl_node_in_tree(s_node->avl_node[priority])) {
avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]);
- if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) {
- shaper_global_stat_async_invoke_inc(ctx->global_stat);
- swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->primary.id, priority);
- }
}
if (s_rule_info->borrowing_num == 0) {
@@ -359,10 +339,7 @@ void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
priority = s_rule_info->borrowing[i].priority;
if (avl_node_in_tree(s_node->avl_node[priority])) {
avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]);
- if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) {
- shaper_global_stat_async_invoke_inc(ctx->global_stat);
- swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->borrowing[i].id, priority);
- }
+ //TODO: calculate queue_len for borrow profile and add judge when refresh stat????
}
}
@@ -416,6 +393,8 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg
shaper_global_stat_async_callback_inc(arg->ctx->global_stat);
+ LOG_INFO("Swarmkv reply type =%d, integer =%llu",reply->type, reply->integer);
+
if (reply->type != SWARMKV_REPLY_INTEGER) {
shaper_global_stat_async_tconsume_failed_inc(arg->ctx->global_stat);
goto END;
@@ -428,10 +407,16 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg
s_pf_info->is_invalid = 0;
}
+ if (reply->integer == 0) {//no token
+ struct timespec curr_time;
+ clock_gettime(CLOCK_MONOTONIC, &curr_time);
+ s_pf_info->last_failed_get_token_ms = curr_time.tv_sec * MILLI_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MILLI_SEC;
+ goto END;
+ }
+
shaper_deposit_token_add(s_pf_info, reply->integer, arg->direction);//deposit tokens to profile
END:
- __atomic_sub_fetch(&s_pf_info->async_token_ref_count, 1, __ATOMIC_SEQ_CST);
shaping_flow_free(arg->ctx, sf);//sub ref count and decide if need to free
free(cb_arg);
cb_arg = NULL;
@@ -470,7 +455,6 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct
struct shaping_async_cb_arg *arg = NULL;
char key[32] = {0};
- __atomic_add_fetch(&pf_info->async_token_ref_count, 1, __ATOMIC_SEQ_CST);
__atomic_add_fetch(&sf->ref_count, 1, __ATOMIC_SEQ_CST);
snprintf(key, sizeof(key), "tsg-shaping-%d-%s", pf_info->id, direction == SHAPING_DIR_OUT ? "outgoing" : "incoming");
@@ -483,14 +467,14 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct
shaper_global_stat_async_invoke_inc(ctx->global_stat);
switch (pf_info->type) {
case PROFILE_TYPE_GENERIC:
- swarmkv_tconsume(ctx->swarmkv_db, key, strlen(key), req_token_bits, shaper_token_get_cb, arg);
+ swarmkv_tconsume(ctx->swarmkv_db, key, strlen(key), req_token_bits * TOKEN_ENLARGE_TIMES, shaper_token_get_cb, arg);
break;
case PROFILE_TYPE_HOST_FARINESS:
case PROFILE_TYPE_MAX_MIN_HOST_FAIRNESS:
- swarmkv_ftconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, sf->matched_rule_infos[sf->anchor].fair_factor, req_token_bits, shaper_token_get_cb, arg);
+ swarmkv_ftconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, sf->matched_rule_infos[sf->anchor].fair_factor, req_token_bits * TOKEN_ENLARGE_TIMES, shaper_token_get_cb, arg);
break;
case PROFILE_TYPE_SPLIT_BY_LOCAL_HOST:
- swarmkv_btconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, req_token_bits, shaper_token_get_cb, arg);
+ swarmkv_btconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, req_token_bits * TOKEN_ENLARGE_TIMES, shaper_token_get_cb, arg);
break;
default:
if (arg) {
@@ -499,11 +483,6 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct
break;
}
- if (__atomic_load_n(&pf_info->async_token_ref_count, __ATOMIC_SEQ_CST) != 0) {//has async operation not completed
- shaper_deposit_token_sub(pf_info, req_token_bits, direction);
- return SHAPER_TOKEN_GET_SUCCESS;
- }
-
if (pf_info->is_invalid) {
if (profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) {//for primary, means this rule don't need get token
return SHAPER_TOKEN_GET_SUCCESS;
@@ -551,7 +530,6 @@ static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb
}
END:
- __atomic_sub_fetch(&s_pf_info->async_queue_len_ref_count, 1, __ATOMIC_SEQ_CST);
shaping_flow_free(arg->ctx, sf);//sub ref count and decide if need to free
free(cb_arg);
cb_arg = NULL;
@@ -572,20 +550,15 @@ static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, st
arg->sf = sf;
arg->priority = priority;
- __atomic_add_fetch(&profile->async_queue_len_ref_count, 1, __ATOMIC_SEQ_CST);
__atomic_add_fetch(&sf->ref_count, 1, __ATOMIC_SEQ_CST);
shaper_global_stat_async_invoke_inc(ctx->global_stat);
swarmkv_async_command(ctx->swarmkv_db, shaper_queue_len_get_cb, arg, swarmkv_queue_len_get_cmd[priority], profile->id);
- if (__atomic_load_n(&profile->async_queue_len_ref_count, __ATOMIC_SEQ_CST) != 0) {
- return 0;
+ if (profile->is_priority_blocked) {
+ return 1;
} else {
- if (profile->is_priority_blocked) {
- return 1;
- } else {
- return 0;
- }
+ return 0;
}
}
@@ -609,6 +582,18 @@ static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_f
return SHAPER_TOKEN_GET_PASS;//rule is disabled, don't need to get token and forward packet
}
+ if (shaper_deposit_token_is_enough(profile, req_token_bytes * 8, direction)) {
+ shaper_deposit_token_sub(profile, req_token_bytes * 8, direction);
+ return SHAPER_TOKEN_GET_SUCCESS;
+ }
+
+ struct timespec curr_timespec;
+ clock_gettime(CLOCK_MONOTONIC, &curr_timespec);
+ unsigned long long curr_time_ms = curr_timespec.tv_sec * MILLI_SECONDS_PER_SEC + curr_timespec.tv_nsec / NANO_SECONDS_PER_MILLI_SEC;
+ if (curr_time_ms - profile->last_failed_get_token_ms < TOKEN_GET_FAILED_INTERVAL_MS) {//if failed to get token in last 1ms, return failed; for swarmkv can't reproduce token in 1ms
+ return SHAPER_TOKEN_GET_FAILED;
+ }
+
if (shaper_profile_is_priority_blocked(ctx, sf, profile)) {
return SHAPER_TOKEN_GET_FAILED;
} else {
@@ -831,12 +816,12 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_
break;
}
- shaper_stat_refresh(ctx->stat, sf, ctx->thread_index, 0);
+ shaper_stat_refresh(ctx, sf, ctx->thread_index, 0);
if (shaper_queue_empty(sf)) {
if (sf->flag & SESSION_CLOSE) {
- shaping_flow_free(ctx, sf);
sf->flag &= (~SESSION_CLOSE);
+ shaping_flow_free(ctx, sf);
}
return 0;
} else {
@@ -849,8 +834,8 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_
} else {
shaper_queue_clear(sf, ctx);//first packet fail, then every packet will fail
if (sf->flag & SESSION_CLOSE) {
- shaping_flow_free(ctx, sf);
sf->flag &= (~SESSION_CLOSE);
+ shaping_flow_free(ctx, sf);
}
}
return 0;
@@ -911,14 +896,20 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
}
END:
- shaper_stat_refresh(ctx->stat, sf, ctx->thread_index, 0);
+ shaper_stat_refresh(ctx, sf, ctx->thread_index, 0);
+ time_t curr_time = time(NULL);
+ if (curr_time > sf->last_update_timeout_sec) {
+ timeouts_add(ctx->expires, &sf->timeout_handle, curr_time + SHAPING_STAT_REFRESH_INTERVAL_SEC);
+ sf->last_update_timeout_sec = curr_time;
+ }
+
if(sf->flag & SESSION_CLOSE) {
if (shaper_queue_empty(sf)) {
char *addr_str = addr_tuple4_to_str(&sf->tuple4);
LOG_DEBUG("%s: shaping free a shaping_flow for session: %s", LOG_TAG_SHAPING, addr_str);
- shaping_flow_free(ctx, sf);
sf->flag &= (~SESSION_CLOSE);
+ shaping_flow_free(ctx, sf);
if (addr_str) {
free(addr_str);
@@ -931,6 +922,27 @@ END:
void polling_entry(struct shaper *sp, struct shaping_stat *stat, struct shaping_thread_ctx *ctx)
{
+ swarmkv_caller_loop(ctx->swarmkv_db, SWARMKV_LOOP_NONBLOCK, NULL);
+
+ struct timeout *t = NULL;
+ struct shaping_flow *sf = NULL;
+ time_t curr_time = time(NULL);
+ int cnt = 0;
+
+ if (curr_time > ctx->last_update_timeout_sec) {
+ timeouts_update(ctx->expires, curr_time);
+ ctx->last_update_timeout_sec = curr_time;
+ }
+
+ t = timeouts_get(ctx->expires);
+ while (t && cnt < SHAPING_STAT_REFRESH_MAX_PER_POLLING) {
+ sf = container_of(t, struct shaping_flow, timeout_handle);
+ shaper_stat_refresh(ctx, sf, ctx->thread_index, 0);
+ timeouts_add(ctx->expires, &sf->timeout_handle, time(NULL) + SHAPING_STAT_REFRESH_INTERVAL_SEC);
+ t = timeouts_get(ctx->expires);
+ cnt++;
+ }
+
if (shaper_global_stat_queueing_pkts_get(ctx->global_stat) == 0) {
return;
}
@@ -1182,6 +1194,7 @@ void shaping_engine_destroy(struct shaping_ctx *ctx)
for (int i = 0; i < ctx->thread_num; i++) {
shaper_free(ctx->thread_ctx[i].sp);
session_table_destory(ctx->thread_ctx[i].session_table);
+ timeouts_close(ctx->thread_ctx[i].expires);
}
free(ctx->thread_ctx);
}
@@ -1196,7 +1209,7 @@ struct shaping_ctx *shaping_engine_init()
{
struct shaping_system_conf conf;
struct shaping_ctx *ctx = NULL;
- int ret;
+ int ret, error;
memset(&conf, 0, sizeof(conf));
ctx = (struct shaping_ctx *)calloc(1, sizeof(struct shaping_ctx));
@@ -1208,7 +1221,7 @@ struct shaping_ctx *shaping_engine_init()
}
/*init swarmkv*/
- ctx->swarmkv_db = shaper_swarmkv_init();
+ ctx->swarmkv_db = shaper_swarmkv_init(conf.work_thread_num);
if (ctx->swarmkv_db == NULL) {
goto ERROR;
}
@@ -1246,6 +1259,7 @@ struct shaping_ctx *shaping_engine_init()
ctx->thread_ctx[i].maat_info = ctx->maat_info;
ctx->thread_ctx[i].marsio_info = ctx->marsio_info;
ctx->thread_ctx[i].swarmkv_db = ctx->swarmkv_db;
+ ctx->thread_ctx[i].expires = timeouts_open(0, &error);
ctx->thread_ctx[i].ref_ctx = ctx;
memcpy(&ctx->thread_ctx[i].conf, &conf, sizeof(conf));
}
diff --git a/shaping/src/shaper_maat.cpp b/shaping/src/shaper_maat.cpp
index 4b4f21f..64db3e6 100644
--- a/shaping/src/shaper_maat.cpp
+++ b/shaping/src/shaper_maat.cpp
@@ -381,7 +381,7 @@ void shaper_rules_update(struct shaping_thread_ctx *ctx, struct shaping_flow *sf
}
if (sf->rule_num > 0 && priority_changed) {
- shaper_stat_refresh(ctx->stat, sf, ctx->thread_index, 1);
+ shaper_stat_refresh(ctx, sf, ctx->thread_index, 1);
}
sf->rule_num += rule_num;
diff --git a/shaping/src/shaper_session.cpp b/shaping/src/shaper_session.cpp
index f43f76f..af4a7ee 100644
--- a/shaping/src/shaper_session.cpp
+++ b/shaping/src/shaper_session.cpp
@@ -30,7 +30,7 @@ struct shaping_flow* shaper_session_opening(struct shaping_thread_ctx *ctx, stru
return NULL;
}
- sf = shaping_flow_new();
+ sf = shaping_flow_new(ctx);
raw_packet_parser_get_most_inner_tuple4(raw_parser, &sf->tuple4);
sf->src_ip_str = addr_src_ip_to_str(&sf->tuple4);
sf->src_ip_str_len = strlen(sf->src_ip_str);
diff --git a/shaping/src/shaper_stat.cpp b/shaping/src/shaper_stat.cpp
index 6bd2cfb..6ede233 100644
--- a/shaping/src/shaper_stat.cpp
+++ b/shaping/src/shaper_stat.cpp
@@ -4,12 +4,14 @@
#include <sys/socket.h>
#include <arpa/inet.h>
#include <MESA/MESA_prof_load.h>
+#include <MESA/swarmkv.h>
#include <fieldstat.h>
#include "log.h"
#include "utils.h"
#include "shaper.h"
#include "shaper_stat.h"
+#include "shaper_global_stat.h"
#define SHAPER_STAT_ROW_NAME "traffic_shaping_rule_hits"
@@ -131,9 +133,23 @@ static void shaper_stat_tags_build(int vsys_id, int rule_id, int profile_id, int
return;
}
-static void shaper_stat_profile_metirc_refresh(struct shaping_stat *stat, int vsys_id, int thread_id, int rule_id, struct shaping_profile_info *profile, int profile_type, int need_update_guage)
+static void shaper_stat_swarmkv_hincrby_cb(const struct swarmkv_reply *reply, void * cb_arg)
+{
+ struct shaping_global_stat *global_stat = (struct shaping_global_stat *)cb_arg;
+
+ shaper_global_stat_async_callback_inc(global_stat);
+
+ if (reply->type != SWARMKV_REPLY_INTEGER) {
+ shaper_global_stat_async_hincrby_failed_inc(global_stat);
+ }
+
+ return;
+}
+
+static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, int vsys_id, int thread_id, int rule_id, struct shaping_profile_info *profile, int profile_type, int need_update_guage)
{
struct shaping_stat_for_profile *profile_stat = &profile->stat;
+ struct shaping_stat *stat = ctx->stat;
unsigned long long old_latency;
shaper_stat_tags_build(vsys_id, rule_id, profile->id, profile->priority, profile_type);
@@ -158,6 +174,9 @@ static void shaper_stat_profile_metirc_refresh(struct shaping_stat *stat, int vs
if (need_update_guage) {
fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_QUEUE_LEN_IDX], SHAPER_STAT_ROW_NAME, profile_stat->in.queue_len, tags, TAG_IDX_MAX, thread_id);
fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_QUEUE_LEN_IDX], SHAPER_STAT_ROW_NAME, profile_stat->out.queue_len, tags, TAG_IDX_MAX, thread_id);
+
+ shaper_global_stat_async_invoke_inc(ctx->global_stat);
+ swarmkv_async_command(ctx->swarmkv_db, shaper_stat_swarmkv_hincrby_cb, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d %lld", profile->id, profile->priority, profile_stat->in.queue_len + profile_stat->out.queue_len);
memset(profile_stat, 0, sizeof(struct shaping_stat_for_profile));
} else {
profile_stat->in.pkts = 0;
@@ -174,7 +193,7 @@ static void shaper_stat_profile_metirc_refresh(struct shaping_stat *stat, int vs
return;
}
-void shaper_stat_refresh(struct shaping_stat *stat, struct shaping_flow *sf, int thread_id, int force)
+void shaper_stat_refresh(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int thread_id, int force)
{
struct shaping_rule_info *rule;
struct timespec curr_time;
@@ -199,10 +218,10 @@ void shaper_stat_refresh(struct shaping_stat *stat, struct shaping_flow *sf, int
for (int i = 0; i < sf->rule_num; i++) {
rule = &sf->matched_rule_infos[i];
- shaper_stat_profile_metirc_refresh(stat, rule->vsys_id, thread_id, rule->id, &rule->primary, PROFILE_IN_RULE_TYPE_PRIMARY, need_update_guage);
+ shaper_stat_profile_metirc_refresh(ctx, rule->vsys_id, thread_id, rule->id, &rule->primary, PROFILE_IN_RULE_TYPE_PRIMARY, need_update_guage);
for (int j = 0; j < rule->borrowing_num; j++) {
- shaper_stat_profile_metirc_refresh(stat, rule->vsys_id, thread_id, rule->id, &rule->borrowing[j], PROFILE_IN_RULE_TYPE_BORROW, need_update_guage);
+ shaper_stat_profile_metirc_refresh(ctx, rule->vsys_id, thread_id, rule->id, &rule->borrowing[j], PROFILE_IN_RULE_TYPE_BORROW, need_update_guage);
}
}
diff --git a/shaping/src/shaper_swarmkv.cpp b/shaping/src/shaper_swarmkv.cpp
index 6d2e32f..05ccecc 100644
--- a/shaping/src/shaper_swarmkv.cpp
+++ b/shaping/src/shaper_swarmkv.cpp
@@ -1,10 +1,10 @@
-#include <MESA/swarmkv.h>
#include <MESA/MESA_handle_logger.h>
#include <MESA/MESA_prof_load.h>
#include "log.h"
#include "shaper.h"
#include "utils.h"
+#include "shaper_swarmkv.h"
struct shaper_swarmkv_conf
{
@@ -97,7 +97,7 @@ void swarmkv_reload_log_level()
return;
}
-struct swarmkv* shaper_swarmkv_init()
+struct swarmkv* shaper_swarmkv_init(int caller_thread_num)
{
struct swarmkv_options *swarmkv_opts = NULL;
struct swarmkv *swarmkv_db = NULL;
@@ -120,6 +120,8 @@ struct swarmkv* shaper_swarmkv_init()
swarmkv_options_set_health_check_announce_port(swarmkv_opts, conf.swarmkv_health_check_announce_port);
swarmkv_options_set_log_path(swarmkv_opts, "log");
swarmkv_options_set_log_level(swarmkv_opts, conf.swarmkv_log_level);
+ swarmkv_options_set_caller_thread_number(swarmkv_opts, caller_thread_num);
+ swarmkv_options_set_worker_thread_number(swarmkv_opts, 1);
swarmkv_db = swarmkv_open(swarmkv_opts, conf.swarmkv_cluster_name, &err);
if (err) {