Diffstat (limited to 'shaping/src')
-rw-r--r--   shaping/src/main.cpp            |   2
-rw-r--r--   shaping/src/shaper.cpp          | 140
-rw-r--r--   shaping/src/shaper_maat.cpp     |   2
-rw-r--r--   shaping/src/shaper_session.cpp  |   2
-rw-r--r--   shaping/src/shaper_stat.cpp     |  27
-rw-r--r--   shaping/src/shaper_swarmkv.cpp  |   6

6 files changed, 108 insertions(+), 71 deletions(-)
diff --git a/shaping/src/main.cpp b/shaping/src/main.cpp
index 91e1202..f9a38b2 100644
--- a/shaping/src/main.cpp
+++ b/shaping/src/main.cpp
@@ -25,6 +25,8 @@ static void *shaper_thread_loop(void *data)
         return NULL;
     }
 
+    swarmkv_register_thread(ctx->swarmkv_db);
+
     //loop to process pkts
     while(!quit) {
         shaper_packet_recv_and_process(ctx);
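Note: each shaping worker now registers itself with swarmkv before entering its packet loop, pairing with the caller-thread count passed to shaper_swarmkv_init() further down. A minimal sketch of that register-then-poll shape, assuming only the swarmkv_register_thread() call visible in this hunk (the worker ctx struct here is a hypothetical stand-in for shaping_thread_ctx):

    struct swarmkv;                                      // opaque handle, as in the tree
    extern "C" void swarmkv_register_thread(struct swarmkv *db); // signature assumed from this hunk

    struct worker_ctx {                                  // hypothetical stand-in
        struct swarmkv *swarmkv_db;
        volatile int *quit;
    };

    static void *shaper_thread_loop_sketch(void *data)
    {
        struct worker_ctx *ctx = (struct worker_ctx *)data;

        // Register before the first async command is issued from this thread;
        // presumably each registered caller thread claims one of the caller
        // slots configured via swarmkv_options_set_caller_thread_number().
        swarmkv_register_thread(ctx->swarmkv_db);

        while (!*ctx->quit) {
            /* shaper_packet_recv_and_process(ctx); */
        }
        return NULL;
    }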
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp
index 09a0c24..02d7f33 100644
--- a/shaping/src/shaper.cpp
+++ b/shaping/src/shaper.cpp
@@ -27,8 +27,14 @@ extern "C" {
 #define MICRO_SECONDS_PER_SEC 1000000
 #define NANO_SECONDS_PER_SEC 1000000000
+#define NANO_SECONDS_PER_MILLI_SEC 1000000
+#define MILLI_SECONDS_PER_SEC 1000
+
 #define SHAPING_LATENCY_THRESHOLD 2000000 //2s
 
+#define TOKEN_ENLARGE_TIMES 10
+#define TOKEN_GET_FAILED_INTERVAL_MS 1
+
 #define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1 "HMGET tsg-shaping-%d priority-0"
 #define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_1 " priority-1"
 #define SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_3 SWARMKV_QUEUE_LEN_GET_CMD_PRIORITY_2 " priority-2"
@@ -137,7 +143,7 @@ static void shaping_node_free(struct shaping_node *s_node)
     return;
 }
 
-struct shaping_flow* shaping_flow_new()
+struct shaping_flow* shaping_flow_new(struct shaping_thread_ctx *ctx)
 {
     struct shaping_node *s_node = NULL;
     int i;
@@ -157,6 +163,8 @@ struct shaping_flow* shaping_flow_new()
     TAILQ_INIT(&s_node->shaping_flow.packet_queue);
     s_node->shaping_flow.ref_count = 1;
     s_node->shaping_flow.priority = SHAPING_PRIORITY_NUM_MAX - 1;
+    timeout_init(&s_node->shaping_flow.timeout_handle, TIMEOUT_ABS);
+    timeouts_add(ctx->expires, &s_node->shaping_flow.timeout_handle, time(NULL) + SHAPING_STAT_REFRESH_INTERVAL_SEC);
 
     return &s_node->shaping_flow;
@@ -170,7 +178,8 @@ void shaping_flow_free(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
     struct shaping_node *s_node = (struct shaping_node*)sf;
 
     if (__atomic_sub_fetch(&sf->ref_count, 1, __ATOMIC_SEQ_CST) == 0) {
-        shaper_stat_refresh(ctx->stat, sf, ctx->thread_index, 1);
+        timeouts_del(ctx->expires, &sf->timeout_handle);
+        shaper_stat_refresh(ctx, sf, ctx->thread_index, 1);
         shaping_node_free(s_node);
     }
@@ -250,19 +259,6 @@ void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx)
     return;
 }
 
-static void swarmkv_reply_cb_do_nothing(const struct swarmkv_reply *reply, void * cb_arg)
-{
-    struct shaping_global_stat *global_stat = (struct shaping_global_stat *)cb_arg;
-
-    shaper_global_stat_async_callback_inc(global_stat);
-
-    if (reply->type != SWARMKV_REPLY_INTEGER) {
-        shaper_global_stat_async_hincrby_failed_inc(global_stat);
-    }
-
-    return;
-}
-
 //return success(0) while any avl tree insert success
 int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, unsigned long long enqueue_time)
 {
@@ -277,20 +273,10 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un
     pkt_wrapper = shaper_first_pkt_get(sf);
     assert(pkt_wrapper != NULL);
 
-    if ((sf->flag & SESSION_UPDATE_PF_PRIO_LEN) == 0) {
-        if (sf->processed_pkts > CONFIRM_PRIORITY_PKTS) {
-            sf->flag |= SESSION_UPDATE_PF_PRIO_LEN;
-        }
-    }
-
     priority = s_rule_info->primary.priority;
     avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns);
     if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) {
         ret = 0;
-        if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) {
-            shaper_global_stat_async_invoke_inc(ctx->global_stat);
-            swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->primary.id, priority);
-        }
     }
 
     if (s_rule_info->borrowing_num == 0) {// no borrow profile
@@ -302,10 +288,8 @@ int shaper_flow_push(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, un
         avl_tree_node_key_set(s_node->avl_node[priority], pkt_wrapper->income_time_ns);
         if (0 == avl_tree_node_insert(sp->priority_trees[priority], s_node->avl_node[priority])) {
             ret = 0;
-            if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) {
-                shaper_global_stat_async_invoke_inc(ctx->global_stat);
-                swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d 1", s_rule_info->borrowing[i].id, priority);
-            }
+            //TODO: calculate queue_len for borrow profile and add judge when refresh stat????
+            //shaper_stat_queueing_pkt_inc(&s_rule_info->borrowing[i].stat, pkt_wrapper->direction, ctx->thread_index);
         }
     }
@@ -345,10 +329,6 @@ void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
     priority = s_rule_info->primary.priority;
     if (avl_node_in_tree(s_node->avl_node[priority])) {
         avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]);
-        if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) {
-            shaper_global_stat_async_invoke_inc(ctx->global_stat);
-            swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->primary.id, priority);
-        }
     }
 
     if (s_rule_info->borrowing_num == 0) {
@@ -359,10 +339,7 @@ void shaper_flow_pop(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
         priority = s_rule_info->borrowing[i].priority;
         if (avl_node_in_tree(s_node->avl_node[priority])) {
             avl_tree_node_remove(sp->priority_trees[priority], s_node->avl_node[priority]);
-            if (sf->flag & SESSION_UPDATE_PF_PRIO_LEN) {
-                shaper_global_stat_async_invoke_inc(ctx->global_stat);
-                swarmkv_async_command(ctx->swarmkv_db, swarmkv_reply_cb_do_nothing, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d -1", s_rule_info->borrowing[i].id, priority);
-            }
+            //TODO: calculate queue_len for borrow profile and add judge when refresh stat????
         }
     }
@@ -416,6 +393,8 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg
 
     shaper_global_stat_async_callback_inc(arg->ctx->global_stat);
 
+    LOG_INFO("Swarmkv reply type =%d, integer =%llu",reply->type, reply->integer);
+
     if (reply->type != SWARMKV_REPLY_INTEGER) {
         shaper_global_stat_async_tconsume_failed_inc(arg->ctx->global_stat);
         goto END;
@@ -428,10 +407,16 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg
         s_pf_info->is_invalid = 0;
     }
 
+    if (reply->integer == 0) {//no token
+        struct timespec curr_time;
+        clock_gettime(CLOCK_MONOTONIC, &curr_time);
+        s_pf_info->last_failed_get_token_ms = curr_time.tv_sec * MILLI_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MILLI_SEC;
+        goto END;
+    }
+
     shaper_deposit_token_add(s_pf_info, reply->integer, arg->direction);//deposit tokens to profile
 
 END:
-    __atomic_sub_fetch(&s_pf_info->async_token_ref_count, 1, __ATOMIC_SEQ_CST);
     shaping_flow_free(arg->ctx, sf);//sub ref count and decide if need to free
     free(cb_arg);
     cb_arg = NULL;
@@ -470,7 +455,6 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct
     struct shaping_async_cb_arg *arg = NULL;
     char key[32] = {0};
 
-    __atomic_add_fetch(&pf_info->async_token_ref_count, 1, __ATOMIC_SEQ_CST);
     __atomic_add_fetch(&sf->ref_count, 1, __ATOMIC_SEQ_CST);
 
     snprintf(key, sizeof(key), "tsg-shaping-%d-%s", pf_info->id, direction == SHAPING_DIR_OUT ? "outgoing" : "incoming");
@@ -483,14 +467,14 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct
     shaper_global_stat_async_invoke_inc(ctx->global_stat);
     switch (pf_info->type) {
     case PROFILE_TYPE_GENERIC:
-        swarmkv_tconsume(ctx->swarmkv_db, key, strlen(key), req_token_bits, shaper_token_get_cb, arg);
+        swarmkv_tconsume(ctx->swarmkv_db, key, strlen(key), req_token_bits * TOKEN_ENLARGE_TIMES, shaper_token_get_cb, arg);
         break;
     case PROFILE_TYPE_HOST_FARINESS:
     case PROFILE_TYPE_MAX_MIN_HOST_FAIRNESS:
-        swarmkv_ftconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, sf->matched_rule_infos[sf->anchor].fair_factor, req_token_bits, shaper_token_get_cb, arg);
+        swarmkv_ftconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, sf->matched_rule_infos[sf->anchor].fair_factor, req_token_bits * TOKEN_ENLARGE_TIMES, shaper_token_get_cb, arg);
         break;
     case PROFILE_TYPE_SPLIT_BY_LOCAL_HOST:
-        swarmkv_btconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, req_token_bits, shaper_token_get_cb, arg);
+        swarmkv_btconsume(ctx->swarmkv_db, key, strlen(key), sf->src_ip_str, sf->src_ip_str_len, req_token_bits * TOKEN_ENLARGE_TIMES, shaper_token_get_cb, arg);
         break;
     default:
         if (arg) {
@@ -499,11 +483,6 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct
         break;
     }
 
-    if (__atomic_load_n(&pf_info->async_token_ref_count, __ATOMIC_SEQ_CST) != 0) {//has async operation not completed
-        shaper_deposit_token_sub(pf_info, req_token_bits, direction);
-        return SHAPER_TOKEN_GET_SUCCESS;
-    }
-
     if (pf_info->is_invalid) {
         if (profile_type == PROFILE_IN_RULE_TYPE_PRIMARY) {//for primary, means this rule don't need get token
             return SHAPER_TOKEN_GET_SUCCESS;
@@ -551,7 +530,6 @@ static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb
     }
 
 END:
-    __atomic_sub_fetch(&s_pf_info->async_queue_len_ref_count, 1, __ATOMIC_SEQ_CST);
     shaping_flow_free(arg->ctx, sf);//sub ref count and decide if need to free
     free(cb_arg);
     cb_arg = NULL;
@@ -572,20 +550,15 @@ static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, st
     arg->sf = sf;
     arg->priority = priority;
 
-    __atomic_add_fetch(&profile->async_queue_len_ref_count, 1, __ATOMIC_SEQ_CST);
     __atomic_add_fetch(&sf->ref_count, 1, __ATOMIC_SEQ_CST);
 
     shaper_global_stat_async_invoke_inc(ctx->global_stat);
     swarmkv_async_command(ctx->swarmkv_db, shaper_queue_len_get_cb, arg, swarmkv_queue_len_get_cmd[priority], profile->id);
 
-    if (__atomic_load_n(&profile->async_queue_len_ref_count, __ATOMIC_SEQ_CST) != 0) {
-        return 0;
+    if (profile->is_priority_blocked) {
+        return 1;
     } else {
-        if (profile->is_priority_blocked) {
-            return 1;
-        } else {
-            return 0;
-        }
+        return 0;
     }
 }
@@ -609,6 +582,18 @@ static int shaper_token_consume(struct shaping_thread_ctx *ctx, struct shaping_f
         return SHAPER_TOKEN_GET_PASS;//rule is disabled, don't need to get token and forward packet
     }
 
+    if (shaper_deposit_token_is_enough(profile, req_token_bytes * 8, direction)) {
+        shaper_deposit_token_sub(profile, req_token_bytes * 8, direction);
+        return SHAPER_TOKEN_GET_SUCCESS;
+    }
+
+    struct timespec curr_timespec;
+    clock_gettime(CLOCK_MONOTONIC, &curr_timespec);
+    unsigned long long curr_time_ms = curr_timespec.tv_sec * MILLI_SECONDS_PER_SEC + curr_timespec.tv_nsec / NANO_SECONDS_PER_MILLI_SEC;
+    if (curr_time_ms - profile->last_failed_get_token_ms < TOKEN_GET_FAILED_INTERVAL_MS) {//if failed to get token in last 1ms, return failed; for swarmkv can't reproduce token in 1ms
+        return SHAPER_TOKEN_GET_FAILED;
+    }
+
     if (shaper_profile_is_priority_blocked(ctx, sf, profile)) {
         return SHAPER_TOKEN_GET_FAILED;
     } else {
@@ -831,12 +816,12 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_
         break;
     }
 
-    shaper_stat_refresh(ctx->stat, sf, ctx->thread_index, 0);
+    shaper_stat_refresh(ctx, sf, ctx->thread_index, 0);
 
     if (shaper_queue_empty(sf)) {
         if (sf->flag & SESSION_CLOSE) {
-            shaping_flow_free(ctx, sf);
             sf->flag &= (~SESSION_CLOSE);
+            shaping_flow_free(ctx, sf);
         }
         return 0;
     } else {
@@ -849,8 +834,8 @@
         } else {
             shaper_queue_clear(sf, ctx);//first packet fail, then every packet will fail
             if (sf->flag & SESSION_CLOSE) {
-                shaping_flow_free(ctx, sf);
                 sf->flag &= (~SESSION_CLOSE);
+                shaping_flow_free(ctx, sf);
             }
         }
         return 0;
@@ -911,14 +896,20 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
     }
 
 END:
-    shaper_stat_refresh(ctx->stat, sf, ctx->thread_index, 0);
+    shaper_stat_refresh(ctx, sf, ctx->thread_index, 0);
+    time_t curr_time = time(NULL);
+    if (curr_time > sf->last_update_timeout_sec) {
+        timeouts_add(ctx->expires, &sf->timeout_handle, curr_time + SHAPING_STAT_REFRESH_INTERVAL_SEC);
+        sf->last_update_timeout_sec = curr_time;
+    }
+
     if(sf->flag & SESSION_CLOSE) {
         if (shaper_queue_empty(sf)) {
             char *addr_str = addr_tuple4_to_str(&sf->tuple4);
             LOG_DEBUG("%s: shaping free a shaping_flow for session: %s", LOG_TAG_SHAPING, addr_str);
-            shaping_flow_free(ctx, sf);
             sf->flag &= (~SESSION_CLOSE);
+            shaping_flow_free(ctx, sf);
 
             if (addr_str) {
                 free(addr_str);
@@ -931,6 +922,27 @@ END:
 
 void polling_entry(struct shaper *sp, struct shaping_stat *stat, struct shaping_thread_ctx *ctx)
 {
+    swarmkv_caller_loop(ctx->swarmkv_db, SWARMKV_LOOP_NONBLOCK, NULL);
+
+    struct timeout *t = NULL;
+    struct shaping_flow *sf = NULL;
+    time_t curr_time = time(NULL);
+    int cnt = 0;
+
+    if (curr_time > ctx->last_update_timeout_sec) {
+        timeouts_update(ctx->expires, curr_time);
+        ctx->last_update_timeout_sec = curr_time;
+    }
+
+    t = timeouts_get(ctx->expires);
+    while (t && cnt < SHAPING_STAT_REFRESH_MAX_PER_POLLING) {
+        sf = container_of(t, struct shaping_flow, timeout_handle);
+        shaper_stat_refresh(ctx, sf, ctx->thread_index, 0);
+        timeouts_add(ctx->expires, &sf->timeout_handle, time(NULL) + SHAPING_STAT_REFRESH_INTERVAL_SEC);
+        t = timeouts_get(ctx->expires);
+        cnt++;
+    }
+
     if (shaper_global_stat_queueing_pkts_get(ctx->global_stat) == 0) {
         return;
     }
@@ -1182,6 +1194,7 @@ void shaping_engine_destroy(struct shaping_ctx *ctx)
         for (int i = 0; i < ctx->thread_num; i++) {
             shaper_free(ctx->thread_ctx[i].sp);
             session_table_destory(ctx->thread_ctx[i].session_table);
+            timeouts_close(ctx->thread_ctx[i].expires);
         }
         free(ctx->thread_ctx);
     }
@@ -1196,7 +1209,7 @@ struct shaping_ctx *shaping_engine_init()
 {
     struct shaping_system_conf conf;
     struct shaping_ctx *ctx = NULL;
-    int ret;
+    int ret, error;
 
     memset(&conf, 0, sizeof(conf));
     ctx = (struct shaping_ctx *)calloc(1, sizeof(struct shaping_ctx));
@@ -1208,7 +1221,7 @@ struct shaping_ctx *shaping_engine_init()
     }
 
     /*init swarmkv*/
-    ctx->swarmkv_db = shaper_swarmkv_init();
+    ctx->swarmkv_db = shaper_swarmkv_init(conf.work_thread_num);
     if (ctx->swarmkv_db == NULL) {
         goto ERROR;
     }
@@ -1246,6 +1259,7 @@ struct shaping_ctx *shaping_engine_init()
         ctx->thread_ctx[i].maat_info = ctx->maat_info;
         ctx->thread_ctx[i].marsio_info = ctx->marsio_info;
         ctx->thread_ctx[i].swarmkv_db = ctx->swarmkv_db;
+        ctx->thread_ctx[i].expires = timeouts_open(0, &error);
         ctx->thread_ctx[i].ref_ctx = ctx;
         memcpy(&ctx->thread_ctx[i].conf, &conf, sizeof(conf));
     }
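Note on the token changes above: shaper_token_get_from_profile() now over-requests by TOKEN_ENLARGE_TIMES and banks the surplus via shaper_deposit_token_add(), so shaper_token_consume() can usually pass packets from the local deposit without another swarmkv round-trip; an empty reply stamps last_failed_get_token_ms and suppresses further tconsume calls for TOKEN_GET_FAILED_INTERVAL_MS. A self-contained sketch of that fast path, assuming only the fields and constants visible in the hunks (the profile struct and helper below are stand-ins, not the tree's types):

    #include <time.h>

    #define MILLI_SECONDS_PER_SEC 1000
    #define NANO_SECONDS_PER_MILLI_SEC 1000000
    #define TOKEN_GET_FAILED_INTERVAL_MS 1

    struct profile_tokens {                      // stand-in for shaping_profile_info
        unsigned long long deposit_bits;         // surplus prefetched via TOKEN_ENLARGE_TIMES
        unsigned long long last_failed_get_token_ms;
    };

    static unsigned long long now_ms(void)
    {
        struct timespec ts;
        clock_gettime(CLOCK_MONOTONIC, &ts);
        return (unsigned long long)ts.tv_sec * MILLI_SECONDS_PER_SEC
             + ts.tv_nsec / NANO_SECONDS_PER_MILLI_SEC;
    }

    // Returns 1 if the packet may pass on deposited tokens alone, -1 if we are
    // still inside the 1 ms failure window, 0 to fall through to an async tconsume.
    static int token_fast_path(struct profile_tokens *p, unsigned long long req_bits)
    {
        if (p->deposit_bits >= req_bits) {       // deposit covers the request
            p->deposit_bits -= req_bits;
            return 1;
        }
        if (now_ms() - p->last_failed_get_token_ms < TOKEN_GET_FAILED_INTERVAL_MS)
            return -1;                           // bucket was empty less than 1 ms ago
        return 0;
    }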
diff --git a/shaping/src/shaper_maat.cpp b/shaping/src/shaper_maat.cpp
index 4b4f21f..64db3e6 100644
--- a/shaping/src/shaper_maat.cpp
+++ b/shaping/src/shaper_maat.cpp
@@ -381,7 +381,7 @@ void shaper_rules_update(struct shaping_thread_ctx *ctx, struct shaping_flow *sf
     }
 
     if (sf->rule_num > 0 && priority_changed) {
-        shaper_stat_refresh(ctx->stat, sf, ctx->thread_index, 1);
+        shaper_stat_refresh(ctx, sf, ctx->thread_index, 1);
     }
 
     sf->rule_num += rule_num;
diff --git a/shaping/src/shaper_session.cpp b/shaping/src/shaper_session.cpp
index f43f76f..af4a7ee 100644
--- a/shaping/src/shaper_session.cpp
+++ b/shaping/src/shaper_session.cpp
@@ -30,7 +30,7 @@ struct shaping_flow* shaper_session_opening(struct shaping_thread_ctx *ctx, stru
         return NULL;
     }
 
-    sf = shaping_flow_new();
+    sf = shaping_flow_new(ctx);
     raw_packet_parser_get_most_inner_tuple4(raw_parser, &sf->tuple4);
     sf->src_ip_str = addr_src_ip_to_str(&sf->tuple4);
     sf->src_ip_str_len = strlen(sf->src_ip_str);
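Note: shaping_flow_new() now takes the thread ctx because every new flow is armed on a per-thread timing wheel and refreshed in bounded batches from polling_entry(). The calls used (timeout_init/timeouts_add/timeouts_get/timeouts_update) match a timeout.c-style hashed timing wheel; a sketch of the arm-and-drain pattern, with the wheel header assumed and placeholder interval values (the real SHAPING_STAT_REFRESH_* constants live elsewhere in the tree):

    #include <stddef.h>
    #include <time.h>
    #include <timeout.h>   // assumed: timeout.c-style timing wheel behind ctx->expires

    #ifndef container_of
    #define container_of(ptr, type, member) \
        ((type *)((char *)(ptr) - offsetof(type, member)))
    #endif

    #define REFRESH_INTERVAL_SEC 5     // placeholder for SHAPING_STAT_REFRESH_INTERVAL_SEC
    #define REFRESH_MAX_PER_POLL 64    // placeholder for SHAPING_STAT_REFRESH_MAX_PER_POLLING

    struct flow_sketch {               // stand-in for shaping_flow
        struct timeout timeout_handle;
    };

    // Arm a new flow for periodic stat refresh (cf. shaping_flow_new()).
    static void flow_register(struct timeouts *expires, struct flow_sketch *sf)
    {
        timeout_init(&sf->timeout_handle, TIMEOUT_ABS);   // absolute deadlines
        timeouts_add(expires, &sf->timeout_handle, time(NULL) + REFRESH_INTERVAL_SEC);
    }

    // Drain a bounded batch of expired flows per poll (cf. polling_entry()).
    static void flow_poll(struct timeouts *expires)
    {
        struct timeout *t;
        int cnt = 0;
        timeouts_update(expires, time(NULL));             // advance the wheel
        while ((t = timeouts_get(expires)) != NULL && cnt++ < REFRESH_MAX_PER_POLL) {
            struct flow_sketch *sf = container_of(t, struct flow_sketch, timeout_handle);
            /* shaper_stat_refresh(ctx, sf, ...); */
            timeouts_add(expires, &sf->timeout_handle,    // re-arm for the next interval
                         time(NULL) + REFRESH_INTERVAL_SEC);
        }
    }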
diff --git a/shaping/src/shaper_stat.cpp b/shaping/src/shaper_stat.cpp
index 6bd2cfb..6ede233 100644
--- a/shaping/src/shaper_stat.cpp
+++ b/shaping/src/shaper_stat.cpp
@@ -4,12 +4,14 @@
 #include <sys/socket.h>
 #include <arpa/inet.h>
 #include <MESA/MESA_prof_load.h>
+#include <MESA/swarmkv.h>
 #include <fieldstat.h>
 
 #include "log.h"
 #include "utils.h"
 #include "shaper.h"
 #include "shaper_stat.h"
+#include "shaper_global_stat.h"
 
 #define SHAPER_STAT_ROW_NAME "traffic_shaping_rule_hits"
@@ -131,9 +133,23 @@ static void shaper_stat_tags_build(int vsys_id, int rule_id, int profile_id, int
     return;
 }
 
-static void shaper_stat_profile_metirc_refresh(struct shaping_stat *stat, int vsys_id, int thread_id, int rule_id, struct shaping_profile_info *profile, int profile_type, int need_update_guage)
+static void shaper_stat_swarmkv_hincrby_cb(const struct swarmkv_reply *reply, void * cb_arg)
+{
+    struct shaping_global_stat *global_stat = (struct shaping_global_stat *)cb_arg;
+
+    shaper_global_stat_async_callback_inc(global_stat);
+
+    if (reply->type != SWARMKV_REPLY_INTEGER) {
+        shaper_global_stat_async_hincrby_failed_inc(global_stat);
+    }
+
+    return;
+}
+
+static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, int vsys_id, int thread_id, int rule_id, struct shaping_profile_info *profile, int profile_type, int need_update_guage)
 {
     struct shaping_stat_for_profile *profile_stat = &profile->stat;
+    struct shaping_stat *stat = ctx->stat;
     unsigned long long old_latency;
 
     shaper_stat_tags_build(vsys_id, rule_id, profile->id, profile->priority, profile_type);
@@ -158,6 +174,9 @@ static void shaper_stat_profile_metirc_refresh(struct shaping_stat *stat, int vs
     if (need_update_guage) {
         fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[IN_QUEUE_LEN_IDX], SHAPER_STAT_ROW_NAME, profile_stat->in.queue_len, tags, TAG_IDX_MAX, thread_id);
         fieldstat_dynamic_table_metric_value_incrby(stat->instance, stat->table_id, stat->column_ids[OUT_QUEUE_LEN_IDX], SHAPER_STAT_ROW_NAME, profile_stat->out.queue_len, tags, TAG_IDX_MAX, thread_id);
+
+        shaper_global_stat_async_invoke_inc(ctx->global_stat);
+        swarmkv_async_command(ctx->swarmkv_db, shaper_stat_swarmkv_hincrby_cb, ctx->global_stat, "HINCRBY tsg-shaping-%d priority-%d %lld", profile->id, profile->priority, profile_stat->in.queue_len + profile_stat->out.queue_len);
         memset(profile_stat, 0, sizeof(struct shaping_stat_for_profile));
     } else {
         profile_stat->in.pkts = 0;
@@ -174,7 +193,7 @@ static void shaper_stat_profile_metirc_refresh(struct shaping_stat *stat, int vs
     return;
 }
 
-void shaper_stat_refresh(struct shaping_stat *stat, struct shaping_flow *sf, int thread_id, int force)
+void shaper_stat_refresh(struct shaping_thread_ctx *ctx, struct shaping_flow *sf, int thread_id, int force)
 {
     struct shaping_rule_info *rule;
     struct timespec curr_time;
@@ -199,10 +218,10 @@ void shaper_stat_refresh(struct shaping_stat *stat, struct shaping_flow *sf, int
 
     for (int i = 0; i < sf->rule_num; i++) {
         rule = &sf->matched_rule_infos[i];
-        shaper_stat_profile_metirc_refresh(stat, rule->vsys_id, thread_id, rule->id, &rule->primary, PROFILE_IN_RULE_TYPE_PRIMARY, need_update_guage);
+        shaper_stat_profile_metirc_refresh(ctx, rule->vsys_id, thread_id, rule->id, &rule->primary, PROFILE_IN_RULE_TYPE_PRIMARY, need_update_guage);
 
         for (int j = 0; j < rule->borrowing_num; j++) {
-            shaper_stat_profile_metirc_refresh(stat, rule->vsys_id, thread_id, rule->id, &rule->borrowing[j], PROFILE_IN_RULE_TYPE_BORROW, need_update_guage);
+            shaper_stat_profile_metirc_refresh(ctx, rule->vsys_id, thread_id, rule->id, &rule->borrowing[j], PROFILE_IN_RULE_TYPE_BORROW, need_update_guage);
         }
     }
diff --git a/shaping/src/shaper_swarmkv.cpp b/shaping/src/shaper_swarmkv.cpp
index 6d2e32f..05ccecc 100644
--- a/shaping/src/shaper_swarmkv.cpp
+++ b/shaping/src/shaper_swarmkv.cpp
@@ -1,10 +1,10 @@
-#include <MESA/swarmkv.h>
 #include <MESA/MESA_handle_logger.h>
 #include <MESA/MESA_prof_load.h>
 
 #include "log.h"
 #include "shaper.h"
 #include "utils.h"
+#include "shaper_swarmkv.h"
 
 struct shaper_swarmkv_conf
 {
@@ -97,7 +97,7 @@ void swarmkv_reload_log_level()
     return;
 }
 
-struct swarmkv* shaper_swarmkv_init()
+struct swarmkv* shaper_swarmkv_init(int caller_thread_num)
 {
     struct swarmkv_options *swarmkv_opts = NULL;
     struct swarmkv *swarmkv_db = NULL;
@@ -120,6 +120,8 @@ struct swarmkv* shaper_swarmkv_init()
     swarmkv_options_set_health_check_announce_port(swarmkv_opts, conf.swarmkv_health_check_announce_port);
     swarmkv_options_set_log_path(swarmkv_opts, "log");
     swarmkv_options_set_log_level(swarmkv_opts, conf.swarmkv_log_level);
+    swarmkv_options_set_caller_thread_number(swarmkv_opts, caller_thread_num);
+    swarmkv_options_set_worker_thread_number(swarmkv_opts, 1);
 
     swarmkv_db = swarmkv_open(swarmkv_opts, conf.swarmkv_cluster_name, &err);
     if (err) {
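Note on the stat change above: the per-packet HINCRBY bookkeeping removed from shaper_flow_push()/shaper_flow_pop() is replaced by one batched HINCRBY per profile per gauge refresh, pushing the net queue-length delta accumulated in shaping_stat_for_profile. A sketch of just the flush step, formatting the same command the refresh hands to swarmkv_async_command() (the local struct is a stand-in; the swarmkv call itself is internal and omitted):

    #include <stdio.h>

    struct queue_len_stat {        // stand-in for the in/out halves of shaping_stat_for_profile
        long long in_queue_len;    // net enqueued minus dequeued, incoming
        long long out_queue_len;   // net enqueued minus dequeued, outgoing
    };

    // Build the single batched command that replaces the per-packet HINCRBYs;
    // the refresh then zeroes the local counters, as the memset in the hunk does.
    static int queue_len_flush_cmd(char *buf, size_t len, int profile_id,
                                   int priority, struct queue_len_stat *st)
    {
        long long delta = st->in_queue_len + st->out_queue_len;
        st->in_queue_len = 0;
        st->out_queue_len = 0;
        return snprintf(buf, len, "HINCRBY tsg-shaping-%d priority-%d %lld",
                        profile_id, priority, delta);
    }

    int main(void)
    {
        char cmd[128];
        struct queue_len_stat st;
        st.in_queue_len = 3;
        st.out_queue_len = -1;
        queue_len_flush_cmd(cmd, sizeof(cmd), 7, 2, &st);
        puts(cmd);                 // prints: HINCRBY tsg-shaping-7 priority-2 2
        return 0;
    }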
