diff options
| author | root <[email protected]> | 2023-12-15 10:50:52 +0000 |
|---|---|---|
| committer | root <[email protected]> | 2023-12-15 10:50:52 +0000 |
| commit | 55c8ad6b4c439b29f681a8c5e604646f81b8a768 (patch) | |
| tree | 3f04d8590b26c41bf735eb931865979a1c87f8f6 | |
| parent | 908b0f3a8e6a492b6ba185cda47beae3a5aef39d (diff) | |
global stat 由原子操作改为每个线程自己计数,每次输出时合并
| -rw-r--r-- | shaping/include/shaper.h | 3 | ||||
| -rw-r--r-- | shaping/include/shaper_global_stat.h | 84 | ||||
| -rw-r--r-- | shaping/src/main.cpp | 5 | ||||
| -rw-r--r-- | shaping/src/shaper.cpp | 83 | ||||
| -rw-r--r-- | shaping/src/shaper_global_stat.cpp | 337 | ||||
| -rw-r--r-- | shaping/src/shaper_session.cpp | 10 | ||||
| -rw-r--r-- | shaping/src/shaper_stat.cpp | 13 | ||||
| -rw-r--r-- | shaping/test/gtest_shaper.cpp | 10 |
8 files changed, 276 insertions, 269 deletions
diff --git a/shaping/include/shaper.h b/shaping/include/shaper.h index 9ae3846..ea6e749 100644 --- a/shaping/include/shaper.h +++ b/shaping/include/shaper.h @@ -6,6 +6,7 @@ #include "session_table.h" #include "utils.h" #include "shaper_stat.h" +#include "shaper_global_stat.h" extern "C" { #include "timeout.h" } @@ -52,10 +53,10 @@ struct shaping_system_conf { struct shaping_thread_ctx { pthread_t tid; int thread_index; + struct shaping_global_stat_data thread_global_stat; struct shaping_ctx *ref_ctx; struct shaper *sp; struct shaping_stat *stat; - struct shaping_global_stat *global_stat; struct shaping_marsio_info *marsio_info; struct swarmkv *swarmkv_db;//handle of swarmkv int swarmkv_aqm_prob; diff --git a/shaping/include/shaper_global_stat.h b/shaping/include/shaper_global_stat.h index 78ef41f..940745e 100644 --- a/shaping/include/shaper_global_stat.h +++ b/shaping/include/shaper_global_stat.h @@ -1,9 +1,6 @@ -#include <fieldstat.h> +#pragma once -enum shaping_global_stat_dir { - SHAPING_GLOBAL_STAT_RX = 0, - SHAPING_GLOBAL_STAT_TX -}; +#include <fieldstat.h> enum shaping_global_stat_column_index { CURR_SESSION_NUM_IDX = 0, @@ -86,47 +83,50 @@ struct shaping_global_stat { struct fieldstat_instance *instance; int column_ids[GLOBAL_STAT_COLUNM_IDX_MAX]; int swarmkv_latency_summary_id; - struct shaping_global_stat_data local_stat_data; + struct shaping_global_stat_data *stat_data; int output_interval_s; }; -struct shaping_global_stat* shaper_global_stat_init(); +struct shaping_global_stat* shaper_global_stat_init(int work_thread_num); void shaper_global_stat_destroy(struct shaping_global_stat *stat); void shaper_global_stat_swarmkv_latency_update(struct shaping_global_stat *stat, long long latency_us); -void shaper_global_stat_curr_session_inc(struct shaping_global_stat *stat); -void shaper_global_stat_curr_session_dec(struct shaping_global_stat *stat); -void shaper_global_stat_queueing_inc(struct shaping_global_stat *stat, int pkt_len); -void shaper_global_stat_queueing_dec(struct shaping_global_stat *stat, int pkt_len); -long long shaper_global_stat_queueing_pkts_get(struct shaping_global_stat *stat); - -void shaper_global_stat_ctrlpkt_err_inc(struct shaping_global_stat *stat); -void shaper_global_stat_ctrlpkt_opening_inc(struct shaping_global_stat *stat); -void shaper_global_stat_ctrlpkt_active_inc(struct shaping_global_stat *stat); -void shaper_global_stat_ctrlpkt_close_inc(struct shaping_global_stat *stat); -void shaper_global_stat_ctrlpkt_active_close_inc(struct shaping_global_stat *stat); -void shaper_global_stat_ctrlpkt_resetall_inc(struct shaping_global_stat *stat); -void shaper_global_stat_session_log_send_num_inc(struct shaping_global_stat *stat); - -void shaper_global_stat_async_invoke_inc(struct shaping_global_stat *stat); -void shaper_global_stat_async_callback_inc(struct shaping_global_stat *stat); - -void sheper_global_stat_tconsume_invoke_inc(struct shaping_global_stat *stat); -void shaper_global_stat_tconsume_callback_inc(struct shaping_global_stat *stat); -void shaper_global_stat_hincrby_invoke_inc(struct shaping_global_stat *stat); -void shaper_global_stat_hincrby_callback_inc(struct shaping_global_stat *stat); -void shaper_global_stat_hmget_invoke_inc(struct shaping_global_stat *stat); -void shaper_global_stat_hmget_callback_inc(struct shaping_global_stat *stat); - -void shaper_global_stat_async_tconsume_failed_inc(struct shaping_global_stat *stat); -void shaper_global_stat_async_hincrby_failed_inc(struct shaping_global_stat *stat); -void shaper_global_stat_async_hmget_failed_inc(struct shaping_global_stat *stat); - -void shaper_global_stat_throughput_inc(struct shaping_global_stat *stat, enum shaping_global_stat_dir dir, int pkt_len); -void shaper_global_stat_drop_inc(struct shaping_global_stat *stat, int pkt_len); - -void shaper_global_stat_hit_policy_throughput_inc(struct shaping_global_stat *stat, enum shaping_global_stat_dir dir, int pkt_len); -void shaper_global_stat_hit_policy_drop_inc(struct shaping_global_stat *stat, int pkt_len); - -void shaper_global_stat_refresh(struct shaping_global_stat *stat);
\ No newline at end of file +void shaper_global_stat_curr_session_inc(struct shaping_global_stat_data *thread_global_stat); +void shaper_global_stat_curr_session_dec(struct shaping_global_stat_data *thread_global_stat); +void shaper_global_stat_queueing_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len); +void shaper_global_stat_queueing_dec(struct shaping_global_stat_data *thread_global_stat, int pkt_len); +long long shaper_global_stat_queueing_pkts_get(struct shaping_global_stat_data *thread_global_stat); + +void shaper_global_stat_ctrlpkt_err_inc(struct shaping_global_stat_data *thread_global_stat); +void shaper_global_stat_ctrlpkt_opening_inc(struct shaping_global_stat_data *thread_global_stat); +void shaper_global_stat_ctrlpkt_active_inc(struct shaping_global_stat_data *thread_global_stat); +void shaper_global_stat_ctrlpkt_close_inc(struct shaping_global_stat_data *thread_global_stat); +void shaper_global_stat_ctrlpkt_active_close_inc(struct shaping_global_stat_data *thread_global_stat); +void shaper_global_stat_ctrlpkt_resetall_inc(struct shaping_global_stat_data *thread_global_stat); +void shaper_global_stat_session_log_send_num_inc(struct shaping_global_stat_data *thread_global_stat); + +void shaper_global_stat_async_invoke_inc(struct shaping_global_stat_data *thread_global_stat); +void shaper_global_stat_async_callback_inc(struct shaping_global_stat_data *thread_global_stat); + +void sheper_global_stat_tconsume_invoke_inc(struct shaping_global_stat_data *thread_global_stat); +void shaper_global_stat_tconsume_callback_inc(struct shaping_global_stat_data *thread_global_stat); +void shaper_global_stat_hincrby_invoke_inc(struct shaping_global_stat_data *thread_global_stat); +void shaper_global_stat_hincrby_callback_inc(struct shaping_global_stat_data *thread_global_stat); +void shaper_global_stat_hmget_invoke_inc(struct shaping_global_stat_data *thread_global_stat); +void shaper_global_stat_hmget_callback_inc(struct shaping_global_stat_data *thread_global_stat); + +void shaper_global_stat_async_tconsume_failed_inc(struct shaping_global_stat_data *thread_global_stat); +void shaper_global_stat_async_hincrby_failed_inc(struct shaping_global_stat_data *thread_global_stat); +void shaper_global_stat_async_hmget_failed_inc(struct shaping_global_stat_data *thread_global_stat); + +void shaper_global_stat_drop_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len); + +void shaper_global_stat_throughput_rx_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len); +void shaper_global_stat_throughput_tx_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len); + +void shaper_global_stat_hit_policy_throughput_rx_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len); +void shaper_global_stat_hit_policy_throughput_tx_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len); +void shaper_global_stat_hit_policy_drop_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len); + +void shaper_global_stat_refresh(struct shaping_ctx *ctx);
\ No newline at end of file diff --git a/shaping/src/main.cpp b/shaping/src/main.cpp index 89cb176..d31f768 100644 --- a/shaping/src/main.cpp +++ b/shaping/src/main.cpp @@ -22,6 +22,7 @@ static void *shaper_thread_loop(void *data) { char thread_name[16] = {0}; struct shaping_thread_ctx *ctx = (struct shaping_thread_ctx *)data; + int output_interval_s = ctx->ref_ctx->global_stat->output_interval_s; snprintf(thread_name, sizeof(thread_name), "shape-work-%d", ctx->thread_index); prctl(PR_SET_NAME, (unsigned long long)thread_name, NULL, NULL, NULL); @@ -40,7 +41,7 @@ static void *shaper_thread_loop(void *data) session_table_reset_with_callback(ctx->session_table, shaper_session_data_free_cb, ctx); __atomic_fetch_and(&ctx->session_need_reset, 0, __ATOMIC_SEQ_CST); } - marsio_poll_wait(ctx->marsio_info->instance, &ctx->marsio_info->mr_dev, 1, ctx->thread_index, ctx->global_stat->output_interval_s * 1000); + marsio_poll_wait(ctx->marsio_info->instance, &ctx->marsio_info->mr_dev, 1, ctx->thread_index, output_interval_s * 1000); } shaper_thread_resource_clear(); @@ -111,7 +112,7 @@ int main(int argc, char **argv) while(!quit) { time_t curr_time = time(NULL); if (curr_time - last_update_time >= ctx->global_stat->output_interval_s) { - shaper_global_stat_refresh(ctx->global_stat); + shaper_global_stat_refresh(ctx); last_update_time = curr_time; } sleep(ctx->global_stat->output_interval_s); diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp index 6ae35f0..de31504 100644 --- a/shaping/src/shaper.cpp +++ b/shaping/src/shaper.cpp @@ -259,9 +259,9 @@ void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx) shaper_stat_queueing_pkt_dec(&rule->primary.stat, pkt_wrapper->direction, ctx->thread_index); shaper_stat_drop_inc(&rule->primary.stat, pkt_wrapper->length, ctx->thread_index); - shaper_global_stat_queueing_dec(ctx->global_stat, pkt_wrapper->length); - shaper_global_stat_drop_inc(ctx->global_stat, pkt_wrapper->length); - shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, pkt_wrapper->length); + shaper_global_stat_queueing_dec(&ctx->thread_global_stat, pkt_wrapper->length); + shaper_global_stat_drop_inc(&ctx->thread_global_stat, pkt_wrapper->length); + shaper_global_stat_hit_policy_drop_inc(&ctx->thread_global_stat, pkt_wrapper->length); marsio_buff_free(ctx->marsio_info->instance, &pkt_wrapper->pkt_buff, 1, 0, ctx->thread_index); shaper_packet_dequeue(sf); @@ -429,15 +429,15 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg clock_gettime(CLOCK_MONOTONIC, &curr_time); curr_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC; - shaper_global_stat_swarmkv_latency_update(ctx->global_stat, curr_time_us - arg->start_time_us); + shaper_global_stat_swarmkv_latency_update(ctx->ref_ctx->global_stat, curr_time_us - arg->start_time_us); - shaper_global_stat_async_callback_inc(ctx->global_stat); - shaper_global_stat_tconsume_callback_inc(ctx->global_stat); + shaper_global_stat_async_callback_inc(&ctx->thread_global_stat); + shaper_global_stat_tconsume_callback_inc(&ctx->thread_global_stat); LOG_INFO("Swarmkv reply type =%d, profile_id %d, direction =%d, integer =%llu",reply->type, profile->id, arg->direction, reply->integer); if (reply->type != SWARMKV_REPLY_INTEGER) { - shaper_global_stat_async_tconsume_failed_inc(ctx->global_stat); + shaper_global_stat_async_tconsume_failed_inc(&ctx->thread_global_stat); goto END; } @@ -558,8 +558,8 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct arg->direction = direction; arg->start_time_us = curr_timespec->tv_sec * MICRO_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MICRO_SEC; - shaper_global_stat_async_invoke_inc(ctx->global_stat); - sheper_global_stat_tconsume_invoke_inc(ctx->global_stat); + shaper_global_stat_async_invoke_inc(&ctx->thread_global_stat); + sheper_global_stat_tconsume_invoke_inc(&ctx->thread_global_stat); sf->ref_cnt++; @@ -610,15 +610,15 @@ static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb clock_gettime(CLOCK_MONOTONIC, &curr_time); curr_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC; - shaper_global_stat_swarmkv_latency_update(ctx->global_stat, curr_time_us - arg->start_time_us); + shaper_global_stat_swarmkv_latency_update(ctx->ref_ctx->global_stat, curr_time_us - arg->start_time_us); - shaper_global_stat_async_callback_inc(ctx->global_stat); - shaper_global_stat_hmget_callback_inc(ctx->global_stat); + shaper_global_stat_async_callback_inc(&ctx->thread_global_stat); + shaper_global_stat_hmget_callback_inc(&ctx->thread_global_stat); pf_hash_node->is_priority_blocked[priority] = 0; if (!reply || (reply->type != SWARMKV_REPLY_NIL && reply->type != SWARMKV_REPLY_ARRAY)) { - shaper_global_stat_async_hmget_failed_inc(ctx->global_stat); + shaper_global_stat_async_hmget_failed_inc(&ctx->thread_global_stat); goto END; } @@ -667,8 +667,8 @@ static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, st arg->priority = priority; arg->start_time_us = curr_timespec->tv_sec * MICRO_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MICRO_SEC; - shaper_global_stat_async_invoke_inc(ctx->global_stat); - shaper_global_stat_hmget_invoke_inc(ctx->global_stat); + shaper_global_stat_async_invoke_inc(&ctx->thread_global_stat); + shaper_global_stat_hmget_invoke_inc(&ctx->thread_global_stat); swarmkv_async_command(ctx->swarmkv_db, shaper_queue_len_get_cb, arg, swarmkv_queue_len_get_cmd[priority], profile->id); END: @@ -959,17 +959,17 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_ } break; case SHAPING_DROP: - shaper_global_stat_queueing_dec(ctx->global_stat, pkt_wrapper->length); - shaper_global_stat_drop_inc(ctx->global_stat, pkt_wrapper->length); - shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, pkt_wrapper->length); + shaper_global_stat_queueing_dec(&ctx->thread_global_stat, pkt_wrapper->length); + shaper_global_stat_drop_inc(&ctx->thread_global_stat, pkt_wrapper->length); + shaper_global_stat_hit_policy_drop_inc(&ctx->thread_global_stat, pkt_wrapper->length); marsio_buff_free(ctx->marsio_info->instance, &pkt_wrapper->pkt_buff, 1, 0, ctx->thread_index); shaper_packet_dequeue(sf); break; case SHAPING_FORWARD: - shaper_global_stat_queueing_dec(ctx->global_stat, pkt_wrapper->length); - shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, pkt_wrapper->length); - shaper_global_stat_hit_policy_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, pkt_wrapper->length); + shaper_global_stat_queueing_dec(&ctx->thread_global_stat, pkt_wrapper->length); + shaper_global_stat_throughput_tx_inc(&ctx->thread_global_stat, pkt_wrapper->length); + shaper_global_stat_hit_policy_throughput_tx_inc(&ctx->thread_global_stat, pkt_wrapper->length); marsio_send_burst(ctx->marsio_info->mr_path, ctx->thread_index, &pkt_wrapper->pkt_buff, 1); shaper_packet_dequeue(sf); @@ -1030,8 +1030,8 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu if (meta->is_tcp_pure_ctrl) { shaper_token_consume_force(sf, meta); marsio_send_burst(marsio_info->mr_path, ctx->thread_index, &rx_buff, 1); - shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len); - shaper_global_stat_hit_policy_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len); + shaper_global_stat_throughput_tx_inc(&ctx->thread_global_stat, meta->raw_len); + shaper_global_stat_hit_policy_throughput_tx_inc(&ctx->thread_global_stat, meta->raw_len); shaper_stat_forward_all_rule_inc(stat, sf, meta->dir, meta->raw_len, ctx->thread_index); goto END;//for tcp pure control pkt, transmit it directly } @@ -1040,11 +1040,11 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu s_rule = &sf->matched_rule_infos[0]; if (0 == shaper_packet_enqueue(ctx, sf, rx_buff, meta)) { shaper_stat_queueing_pkt_inc(&s_rule->primary.stat, meta->dir, ctx->thread_index); - shaper_global_stat_queueing_inc(ctx->global_stat, meta->raw_len); + shaper_global_stat_queueing_inc(&ctx->thread_global_stat, meta->raw_len); } else { shaper_stat_drop_inc(&s_rule->primary.stat, meta->dir, ctx->thread_index); - shaper_global_stat_drop_inc(ctx->global_stat, meta->raw_len); - shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, meta->raw_len); + shaper_global_stat_drop_inc(&ctx->thread_global_stat, meta->raw_len); + shaper_global_stat_hit_policy_drop_inc(&ctx->thread_global_stat, meta->raw_len); marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index); } @@ -1054,17 +1054,17 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu shaping_ret = shaper_pkt_action_decide_no_queue(ctx, sf, meta, &sf->matched_rule_infos[sf->anchor].primary, rx_buff); switch (shaping_ret) { case SHAPING_QUEUED: - shaper_global_stat_queueing_inc(ctx->global_stat, meta->raw_len); + shaper_global_stat_queueing_inc(&ctx->thread_global_stat, meta->raw_len); break; case SHAPING_DROP: marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index); - shaper_global_stat_drop_inc(ctx->global_stat, meta->raw_len); - shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, meta->raw_len); + shaper_global_stat_drop_inc(&ctx->thread_global_stat, meta->raw_len); + shaper_global_stat_hit_policy_drop_inc(&ctx->thread_global_stat, meta->raw_len); break; case SHAPING_FORWARD: marsio_send_burst(marsio_info->mr_path, ctx->thread_index, &rx_buff, 1); - shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len); - shaper_global_stat_hit_policy_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len); + shaper_global_stat_throughput_tx_inc(&ctx->thread_global_stat, meta->raw_len); + shaper_global_stat_hit_policy_throughput_tx_inc(&ctx->thread_global_stat, meta->raw_len); break; default: assert(0); @@ -1120,7 +1120,7 @@ void polling_entry(struct shaper *sp, struct shaping_stat *stat, struct shaping_ cnt++; } - if (shaper_global_stat_queueing_pkts_get(ctx->global_stat) == 0) { + if (shaper_global_stat_queueing_pkts_get(&ctx->thread_global_stat) == 0) { return; } @@ -1169,23 +1169,23 @@ static struct shaping_flow* shaper_ctrl_pkt_session_handle(struct shaping_thread switch (ctrl_data.state) { case SESSION_STATE_OPENING: - shaper_global_stat_ctrlpkt_opening_inc(ctx->global_stat); + shaper_global_stat_ctrlpkt_opening_inc(&ctx->thread_global_stat); //sf = shaper_session_opening(ctx, meta, &ctrl_data, &raw_parser); break; case SESSION_STATE_ACTIVE: - shaper_global_stat_ctrlpkt_active_inc(ctx->global_stat); + shaper_global_stat_ctrlpkt_active_inc(&ctx->thread_global_stat); sf = shaper_session_active(ctx, meta, &ctrl_data, &raw_parser); break; case SESSION_STATE_CLOSING: - shaper_global_stat_ctrlpkt_close_inc(ctx->global_stat); + shaper_global_stat_ctrlpkt_close_inc(&ctx->thread_global_stat); sf = shaper_session_close(ctx, meta); break; case SESSION_STATE_RESETALL: - shaper_global_stat_ctrlpkt_resetall_inc(ctx->global_stat); + shaper_global_stat_ctrlpkt_resetall_inc(&ctx->thread_global_stat); sf = shaper_session_reset_all(ctx, meta); break; default: - shaper_global_stat_ctrlpkt_err_inc(ctx->global_stat); + shaper_global_stat_ctrlpkt_err_inc(&ctx->thread_global_stat); assert(0); } @@ -1207,7 +1207,7 @@ static struct shaping_flow *shaper_raw_pkt_session_handle(struct shaping_thread_ session_node = session_table_search_by_id(ctx->session_table, meta->session_id); if (session_node) { sf = (struct shaping_flow *)session_node->val_data; - shaper_global_stat_hit_policy_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_RX, meta->raw_len); + shaper_global_stat_hit_policy_throughput_rx_inc(&ctx->thread_global_stat, meta->raw_len); } return sf; @@ -1233,11 +1233,11 @@ void shaper_packet_recv_and_process(struct shaping_thread_ctx *ctx) } else { sf = shaper_raw_pkt_session_handle(ctx, rx_buff[i], &meta); } - shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_RX, meta.raw_len); + shaper_global_stat_throughput_rx_inc(&ctx->thread_global_stat, meta.raw_len); if (meta.is_ctrl_pkt || !sf || sf->rule_num == 0) {//ctrl pkt need send directly marsio_send_burst(ctx->marsio_info->mr_path, ctx->thread_index, &rx_buff[i], 1); - shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta.raw_len); + shaper_global_stat_throughput_tx_inc(&ctx->thread_global_stat, meta.raw_len); } else { shaping_packet_process(ctx, rx_buff[i], &meta, sf); } @@ -1421,7 +1421,7 @@ struct shaping_ctx *shaping_engine_init() goto ERROR; } - ctx->global_stat = shaper_global_stat_init(); + ctx->global_stat = shaper_global_stat_init(conf.work_thread_num); if (ctx->global_stat == NULL) { goto ERROR; } @@ -1432,7 +1432,6 @@ struct shaping_ctx *shaping_engine_init() ctx->thread_ctx[i].thread_index = i; ctx->thread_ctx[i].sp = shaper_new(conf.priority_queue_len_max); ctx->thread_ctx[i].stat = ctx->stat; - ctx->thread_ctx[i].global_stat = ctx->global_stat; ctx->thread_ctx[i].session_table = session_table_create(); ctx->thread_ctx[i].maat_info = ctx->maat_info; ctx->thread_ctx[i].marsio_info = ctx->marsio_info; diff --git a/shaping/src/shaper_global_stat.cpp b/shaping/src/shaper_global_stat.cpp index 748becb..771438a 100644 --- a/shaping/src/shaper_global_stat.cpp +++ b/shaping/src/shaper_global_stat.cpp @@ -1,3 +1,4 @@ +#include <cstring> #include <stdlib.h> #include <MESA/MESA_prof_load.h> @@ -68,12 +69,13 @@ static void shaper_global_stat_fieldstat_reg(struct shaping_global_stat *stat) return; } -struct shaping_global_stat* shaper_global_stat_init() +struct shaping_global_stat* shaper_global_stat_init(int work_thread_num) { struct shaping_global_stat *stat = NULL; struct shaping_global_stat_conf conf; stat = (struct shaping_global_stat*)calloc(1, sizeof(struct shaping_global_stat)); + stat->stat_data = (struct shaping_global_stat_data*)calloc(work_thread_num, sizeof(struct shaping_global_stat_data)); if (shaper_global_stat_conf_load(stat, &conf) != 0) { LOG_ERROR("%s: shaping init metric conf failed", LOG_TAG_STAT); @@ -140,305 +142,308 @@ void shaper_global_stat_swarmkv_latency_update(struct shaping_global_stat *stat, return; } -void shaper_global_stat_curr_session_inc(struct shaping_global_stat *stat) +void shaper_global_stat_curr_session_inc(struct shaping_global_stat_data *thread_global_stat) { - struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data; - - __atomic_add_fetch(&local_stat_data->curr_session_num, 1, __ATOMIC_RELAXED); + thread_global_stat->curr_session_num++; return; } -void shaper_global_stat_curr_session_dec(struct shaping_global_stat *stat) +void shaper_global_stat_curr_session_dec(struct shaping_global_stat_data *thread_global_stat) { - struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data; - - __atomic_sub_fetch(&local_stat_data->curr_session_num, 1, __ATOMIC_RELAXED); + thread_global_stat->curr_session_num--; return; } -void shaper_global_stat_queueing_inc(struct shaping_global_stat *stat, int pkt_len) +void shaper_global_stat_queueing_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len) { - struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data; - - __atomic_add_fetch(&local_stat_data->queueing_pkts, 1, __ATOMIC_RELAXED); - __atomic_add_fetch(&local_stat_data->queueing_bytes, pkt_len, __ATOMIC_RELAXED); + thread_global_stat->queueing_pkts++; + thread_global_stat->queueing_bytes += pkt_len; return; } -void shaper_global_stat_queueing_dec(struct shaping_global_stat *stat, int pkt_len) +void shaper_global_stat_queueing_dec(struct shaping_global_stat_data *thread_global_stat, int pkt_len) { - struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data; - - __atomic_sub_fetch(&local_stat_data->queueing_pkts, 1, __ATOMIC_RELAXED); - __atomic_sub_fetch(&local_stat_data->queueing_bytes, pkt_len, __ATOMIC_RELAXED); + thread_global_stat->queueing_pkts--; + thread_global_stat->queueing_bytes -= pkt_len; return; } -long long shaper_global_stat_queueing_pkts_get(struct shaping_global_stat *stat) +long long shaper_global_stat_queueing_pkts_get(struct shaping_global_stat_data *thread_global_stat) { - return stat->local_stat_data.queueing_pkts; + return thread_global_stat->queueing_pkts; } -void shaper_global_stat_ctrlpkt_err_inc(struct shaping_global_stat *stat) +void shaper_global_stat_ctrlpkt_err_inc(struct shaping_global_stat_data *thread_global_stat) { - struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data; - - __atomic_add_fetch(&local_stat_data->ctrl_error, 1, __ATOMIC_RELAXED); + thread_global_stat->ctrl_error++; return; } -void shaper_global_stat_ctrlpkt_opening_inc(struct shaping_global_stat *stat) +void shaper_global_stat_ctrlpkt_opening_inc(struct shaping_global_stat_data *thread_global_stat) { - struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data; - - __atomic_add_fetch(&local_stat_data->ctrl_opening, 1, __ATOMIC_RELAXED); + thread_global_stat->ctrl_opening++; return; } -void shaper_global_stat_ctrlpkt_active_inc(struct shaping_global_stat *stat) +void shaper_global_stat_ctrlpkt_active_inc(struct shaping_global_stat_data *thread_global_stat) { - struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data; - - __atomic_add_fetch(&local_stat_data->ctrl_active, 1, __ATOMIC_RELAXED); + thread_global_stat->ctrl_active++; return; } -void shaper_global_stat_ctrlpkt_close_inc(struct shaping_global_stat *stat) +void shaper_global_stat_ctrlpkt_close_inc(struct shaping_global_stat_data *thread_global_stat) { - struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data; - - __atomic_add_fetch(&local_stat_data->ctrl_close, 1, __ATOMIC_RELAXED); + thread_global_stat->ctrl_close++; return; } -void shaper_global_stat_ctrlpkt_active_close_inc(struct shaping_global_stat *stat) +void shaper_global_stat_ctrlpkt_active_close_inc(struct shaping_global_stat_data *thread_global_stat) { - struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data; - - __atomic_add_fetch(&local_stat_data->ctrl_active_close, 1, __ATOMIC_RELAXED); + thread_global_stat->ctrl_active_close++; return; } -void shaper_global_stat_ctrlpkt_resetall_inc(struct shaping_global_stat *stat) +void shaper_global_stat_ctrlpkt_resetall_inc(struct shaping_global_stat_data *thread_global_stat) { - struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data; - - __atomic_add_fetch(&local_stat_data->ctrl_resetall, 1, __ATOMIC_RELAXED); + thread_global_stat->ctrl_resetall++; return; } -void shaper_global_stat_session_log_send_num_inc(struct shaping_global_stat *stat) +void shaper_global_stat_session_log_send_num_inc(struct shaping_global_stat_data *thread_global_stat) { - struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data; - - __atomic_add_fetch(&local_stat_data->session_log_send, 1, __ATOMIC_RELAXED); + thread_global_stat->session_log_send++; return; } -void shaper_global_stat_async_invoke_inc(struct shaping_global_stat *stat) +void shaper_global_stat_async_invoke_inc(struct shaping_global_stat_data *thread_global_stat) { - struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data; - - __atomic_add_fetch(&local_stat_data->async_invoke, 1, __ATOMIC_RELAXED); + thread_global_stat->async_invoke++; return; } -void shaper_global_stat_async_callback_inc(struct shaping_global_stat *stat) +void shaper_global_stat_async_callback_inc(struct shaping_global_stat_data *thread_global_stat) { - struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data; - - __atomic_add_fetch(&local_stat_data->async_callback, 1, __ATOMIC_RELAXED); + thread_global_stat->async_callback++; return; } -void sheper_global_stat_tconsume_invoke_inc(struct shaping_global_stat *stat) +void sheper_global_stat_tconsume_invoke_inc(struct shaping_global_stat_data *thread_global_stat) { - struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data; - - __atomic_add_fetch(&local_stat_data->async_tconsume_invoke, 1, __ATOMIC_RELAXED); + thread_global_stat->async_tconsume_invoke++; return; } -void shaper_global_stat_tconsume_callback_inc(struct shaping_global_stat *stat) +void shaper_global_stat_tconsume_callback_inc(struct shaping_global_stat_data *thread_global_stat) { - struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data; - - __atomic_add_fetch(&local_stat_data->async_tconsume_callback, 1, __ATOMIC_RELAXED); + thread_global_stat->async_tconsume_callback++; return; } -void shaper_global_stat_hincrby_invoke_inc(struct shaping_global_stat *stat) +void shaper_global_stat_hincrby_invoke_inc(struct shaping_global_stat_data *thread_global_stat) { - struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data; - - __atomic_add_fetch(&local_stat_data->async_hincrby_invoke, 1, __ATOMIC_RELAXED); + thread_global_stat->async_hmget_invoke++; return; } -void shaper_global_stat_hincrby_callback_inc(struct shaping_global_stat *stat) +void shaper_global_stat_hincrby_callback_inc(struct shaping_global_stat_data *thread_global_stat) { - struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data; - - __atomic_add_fetch(&local_stat_data->async_hincrby_callback, 1, __ATOMIC_RELAXED); + thread_global_stat->async_hincrby_callback++; return; } -void shaper_global_stat_hmget_invoke_inc(struct shaping_global_stat *stat) +void shaper_global_stat_hmget_invoke_inc(struct shaping_global_stat_data *thread_global_stat) { - struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data; - - __atomic_add_fetch(&local_stat_data->async_hmget_invoke, 1, __ATOMIC_RELAXED); + thread_global_stat->async_hmget_invoke++; return; } -void shaper_global_stat_hmget_callback_inc(struct shaping_global_stat *stat) +void shaper_global_stat_hmget_callback_inc(struct shaping_global_stat_data *thread_global_stat) { - struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data; - - __atomic_add_fetch(&local_stat_data->async_hmget_callback, 1, __ATOMIC_RELAXED); + thread_global_stat->async_hmget_callback++; return; } -void shaper_global_stat_async_tconsume_failed_inc(struct shaping_global_stat *stat) +void shaper_global_stat_async_tconsume_failed_inc(struct shaping_global_stat_data *thread_global_stat) { - struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data; - - __atomic_add_fetch(&local_stat_data->async_tconsume_failed, 1, __ATOMIC_RELAXED); + thread_global_stat->async_tconsume_failed++; return; } -void shaper_global_stat_async_hincrby_failed_inc(struct shaping_global_stat *stat) +void shaper_global_stat_async_hincrby_failed_inc(struct shaping_global_stat_data *thread_global_stat) { - struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data; - - __atomic_add_fetch(&local_stat_data->async_hincrby_failed, 1, __ATOMIC_RELAXED); + thread_global_stat->async_hincrby_failed++; return; } -void shaper_global_stat_async_hmget_failed_inc(struct shaping_global_stat *stat) +void shaper_global_stat_async_hmget_failed_inc(struct shaping_global_stat_data *thread_global_stat) { - struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data; - - __atomic_add_fetch(&local_stat_data->async_hmget_failed, 1, __ATOMIC_RELAXED); + thread_global_stat->async_hmget_failed++; return; } -void shaper_global_stat_throughput_inc(struct shaping_global_stat *stat, enum shaping_global_stat_dir dir, int pkt_len) +void shaper_global_stat_throughput_rx_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len) { - struct shaping_global_stat_traffic_data *data = &stat->local_stat_data.all_traffic; - if (dir == SHAPING_GLOBAL_STAT_RX) { - __atomic_add_fetch(&data->rx_pkts, 1, __ATOMIC_RELAXED); - __atomic_add_fetch(&data->rx_bytes, pkt_len, __ATOMIC_RELAXED); - } else { - __atomic_add_fetch(&data->tx_pkts, 1, __ATOMIC_RELAXED); - __atomic_add_fetch(&data->tx_bytes, pkt_len, __ATOMIC_RELAXED); - } - - return; + struct shaping_global_stat_traffic_data *data = &thread_global_stat->all_traffic; + data->rx_pkts++; + data->rx_bytes += pkt_len; } -void shaper_global_stat_drop_inc(struct shaping_global_stat *stat, int pkt_len) +void shaper_global_stat_throughput_tx_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len) { - struct shaping_global_stat_traffic_data *data = &stat->local_stat_data.all_traffic; + struct shaping_global_stat_traffic_data *data = &thread_global_stat->all_traffic; + data->tx_pkts++; + data->tx_bytes += pkt_len; +} - __atomic_add_fetch(&data->drop_pkts, 1, __ATOMIC_RELAXED); - __atomic_add_fetch(&data->drop_bytes, pkt_len, __ATOMIC_RELAXED); +void shaper_global_stat_drop_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len) +{ + struct shaping_global_stat_traffic_data *data = &thread_global_stat->all_traffic; + data->drop_pkts++; + data->drop_bytes += pkt_len; return; } -void shaper_global_stat_hit_policy_throughput_inc(struct shaping_global_stat *stat, enum shaping_global_stat_dir dir, int pkt_len) +void shaper_global_stat_hit_policy_throughput_rx_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len) { - struct shaping_global_stat_traffic_data *data = &stat->local_stat_data.hit_policy_traffic; - - if (dir == SHAPING_GLOBAL_STAT_RX) { - __atomic_add_fetch(&data->rx_pkts, 1, __ATOMIC_RELAXED); - __atomic_add_fetch(&data->rx_bytes, pkt_len, __ATOMIC_RELAXED); - } else { - __atomic_add_fetch(&data->tx_pkts, 1, __ATOMIC_RELAXED); - __atomic_add_fetch(&data->tx_bytes, pkt_len, __ATOMIC_RELAXED); - } - - return; + struct shaping_global_stat_traffic_data *data = &thread_global_stat->hit_policy_traffic; + data->rx_pkts++; + data->rx_bytes += pkt_len; } -void shaper_global_stat_hit_policy_drop_inc(struct shaping_global_stat *stat, int pkt_len) +void shaper_global_stat_hit_policy_throughput_tx_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len) { - struct shaping_global_stat_traffic_data *data = &stat->local_stat_data.hit_policy_traffic; + struct shaping_global_stat_traffic_data *data = &thread_global_stat->hit_policy_traffic; + data->tx_pkts++; + data->tx_bytes += pkt_len; +} - __atomic_add_fetch(&data->drop_pkts, 1, __ATOMIC_RELAXED); - __atomic_add_fetch(&data->drop_bytes, pkt_len, __ATOMIC_RELAXED); +void shaper_global_stat_hit_policy_drop_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len) +{ + struct shaping_global_stat_traffic_data *data = &thread_global_stat->hit_policy_traffic; + data->drop_pkts++; + data->drop_bytes += pkt_len; return; } -void shaper_global_stat_refresh(struct shaping_global_stat *stat) +void shaper_global_stat_refresh(struct shaping_ctx *ctx) { - struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data; - struct shaping_global_stat_traffic_data *all_traffic_data = &stat->local_stat_data.all_traffic; - struct shaping_global_stat_traffic_data *hit_policy_traffic_data = &stat->local_stat_data.hit_policy_traffic; - - fieldstat_value_set(stat->instance, stat->column_ids[CURR_SESSION_NUM_IDX], local_stat_data->curr_session_num); - fieldstat_value_set(stat->instance, stat->column_ids[QUEUEING_PKTS_IDX], local_stat_data->queueing_pkts); - fieldstat_value_set(stat->instance, stat->column_ids[QUEUEING_BYTES_IDX], local_stat_data->queueing_bytes); + static struct shaping_global_stat_data sum; + struct shaping_global_stat *global_stat = ctx->global_stat; + struct shaping_global_stat_data *stat_data = global_stat->stat_data; - fieldstat_value_set(stat->instance, stat->column_ids[CTRL_ERR_IDX], local_stat_data->ctrl_error); - fieldstat_value_set(stat->instance, stat->column_ids[CTRL_OPENING_IDX], local_stat_data->ctrl_opening); - fieldstat_value_set(stat->instance, stat->column_ids[CTRL_ACTIVE_IDX], local_stat_data->ctrl_active); - fieldstat_value_set(stat->instance, stat->column_ids[CTRL_CLOSE_IDX], local_stat_data->ctrl_close); - fieldstat_value_set(stat->instance, stat->column_ids[CTRL_ACTIVE_CLOSE_IDX], local_stat_data->ctrl_active_close); - fieldstat_value_set(stat->instance, stat->column_ids[CTRL_RESETALL_IDX], local_stat_data->ctrl_resetall); - fieldstat_value_set(stat->instance, stat->column_ids[SESSION_LOG_SEND_IDX], local_stat_data->session_log_send); + for (int i = 0; i < ctx->thread_num; i++) { + memcpy(&stat_data[i], &ctx->thread_ctx[i].thread_global_stat, sizeof(struct shaping_global_stat_data)); + } - fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_INVOKE_IDX], local_stat_data->async_invoke); - fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_CALLBACK_IDX], local_stat_data->async_callback); + memset(&sum, 0, sizeof(struct shaping_global_stat_data)); + for (int i = 0; i < ctx->thread_num; i++) { + sum.curr_session_num += stat_data[i].curr_session_num; + sum.queueing_pkts += stat_data[i].queueing_pkts; + sum.queueing_bytes += stat_data[i].queueing_bytes; + + sum.ctrl_error += stat_data[i].ctrl_error; + sum.ctrl_opening += stat_data[i].ctrl_opening; + sum.ctrl_active += stat_data[i].ctrl_active; + sum.ctrl_close += stat_data[i].ctrl_close; + sum.ctrl_active_close += stat_data[i].ctrl_active_close; + sum.ctrl_resetall += stat_data[i].ctrl_resetall; + sum.session_log_send += stat_data[i].session_log_send; + + sum.async_invoke += stat_data[i].async_invoke; + sum.async_callback += stat_data[i].async_callback; + + sum.async_tconsume_invoke += stat_data[i].async_tconsume_invoke; + sum.async_tconsume_callback += stat_data[i].async_tconsume_callback; + sum.async_hincrby_invoke += stat_data[i].async_hincrby_invoke; + sum.async_hincrby_callback += stat_data[i].async_hincrby_callback; + sum.async_hmget_invoke += stat_data[i].async_hmget_invoke; + sum.async_hmget_callback += stat_data[i].async_hmget_callback; + + sum.async_tconsume_failed += stat_data[i].async_tconsume_failed; + sum.async_hincrby_failed += stat_data[i].async_hincrby_failed; + sum.async_hmget_failed += stat_data[i].async_hmget_failed; + + sum.all_traffic.rx_pkts += stat_data[i].all_traffic.rx_pkts; + sum.all_traffic.rx_bytes += stat_data[i].all_traffic.rx_bytes; + sum.all_traffic.tx_pkts += stat_data[i].all_traffic.tx_pkts; + sum.all_traffic.tx_bytes += stat_data[i].all_traffic.tx_bytes; + sum.all_traffic.drop_pkts += stat_data[i].all_traffic.drop_pkts; + sum.all_traffic.drop_bytes += stat_data[i].all_traffic.drop_bytes; + + sum.hit_policy_traffic.rx_pkts += stat_data[i].hit_policy_traffic.rx_pkts; + sum.hit_policy_traffic.rx_bytes += stat_data[i].hit_policy_traffic.rx_bytes; + sum.hit_policy_traffic.tx_pkts += stat_data[i].hit_policy_traffic.tx_pkts; + sum.hit_policy_traffic.tx_bytes += stat_data[i].hit_policy_traffic.tx_bytes; + sum.hit_policy_traffic.drop_pkts += stat_data[i].hit_policy_traffic.drop_pkts; + sum.hit_policy_traffic.drop_bytes += stat_data[i].hit_policy_traffic.drop_bytes; + } - fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_TCONSUME_INVOKE_IDX], local_stat_data->async_tconsume_invoke); - fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_TCONSUME_CALLBACK_IDX], local_stat_data->async_tconsume_callback); - fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_HINCRBY_INVOKE_IDX], local_stat_data->async_hincrby_invoke); - fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_HINCRBY_CALLBACK_IDX], local_stat_data->async_hincrby_callback); - fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_HMGET_INVOKE_IDX], local_stat_data->async_hmget_invoke); - fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_HMGET_CALLBACK_IDX], local_stat_data->async_hmget_callback); + struct shaping_global_stat_traffic_data *all_traffic_data = &sum.all_traffic; + struct shaping_global_stat_traffic_data *hit_policy_traffic_data = &sum.hit_policy_traffic; + + fieldstat_value_set(global_stat->instance, global_stat->column_ids[CURR_SESSION_NUM_IDX], sum.curr_session_num); + fieldstat_value_set(global_stat->instance, global_stat->column_ids[QUEUEING_PKTS_IDX], sum.queueing_pkts); + fieldstat_value_set(global_stat->instance, global_stat->column_ids[QUEUEING_BYTES_IDX], sum.queueing_bytes); + + fieldstat_value_set(global_stat->instance, global_stat->column_ids[CTRL_ERR_IDX], sum.ctrl_error); + fieldstat_value_set(global_stat->instance, global_stat->column_ids[CTRL_OPENING_IDX], sum.ctrl_opening); + fieldstat_value_set(global_stat->instance, global_stat->column_ids[CTRL_ACTIVE_IDX], sum.ctrl_active); + fieldstat_value_set(global_stat->instance, global_stat->column_ids[CTRL_CLOSE_IDX], sum.ctrl_close); + fieldstat_value_set(global_stat->instance, global_stat->column_ids[CTRL_ACTIVE_CLOSE_IDX], sum.ctrl_active_close); + fieldstat_value_set(global_stat->instance, global_stat->column_ids[CTRL_RESETALL_IDX], sum.ctrl_resetall); + fieldstat_value_set(global_stat->instance, global_stat->column_ids[SESSION_LOG_SEND_IDX], sum.session_log_send); + + fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_INVOKE_IDX], sum.async_invoke); + fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_CALLBACK_IDX], sum.async_callback); + + fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_TCONSUME_INVOKE_IDX], sum.async_tconsume_invoke); + fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_TCONSUME_CALLBACK_IDX], sum.async_tconsume_callback); + fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_HINCRBY_INVOKE_IDX], sum.async_hincrby_invoke); + fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_HINCRBY_CALLBACK_IDX], sum.async_hincrby_callback); + fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_HMGET_INVOKE_IDX], sum.async_hmget_invoke); + fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_HMGET_CALLBACK_IDX], sum.async_hmget_callback); - fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_TCONSUME_FAILED], local_stat_data->async_tconsume_failed); - fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_HINCRBY_FAILED], local_stat_data->async_hincrby_failed); - fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_HMGET_FAILED], local_stat_data->async_hmget_failed); - - fieldstat_value_set(stat->instance, stat->column_ids[RX_PKTS_IDX], all_traffic_data->rx_pkts); - fieldstat_value_set(stat->instance, stat->column_ids[RX_BYTES_IDX], all_traffic_data->rx_bytes); - fieldstat_value_set(stat->instance, stat->column_ids[TX_PKTS_IDX], all_traffic_data->tx_pkts); - fieldstat_value_set(stat->instance, stat->column_ids[TX_BYTES_IDX], all_traffic_data->tx_bytes); - fieldstat_value_set(stat->instance, stat->column_ids[DROP_PKTS_IDX], all_traffic_data->drop_pkts); - fieldstat_value_set(stat->instance, stat->column_ids[DROP_BYTES_IDX], all_traffic_data->drop_bytes); - - fieldstat_value_set(stat->instance, stat->column_ids[HIT_POLICY_RX_PKTS_IDX], hit_policy_traffic_data->rx_pkts); - fieldstat_value_set(stat->instance, stat->column_ids[HIT_POLICY_RX_BYTES_IDX], hit_policy_traffic_data->rx_bytes); - fieldstat_value_set(stat->instance, stat->column_ids[HIT_POLICY_TX_PKTS_IDX], hit_policy_traffic_data->tx_pkts); - fieldstat_value_set(stat->instance, stat->column_ids[HIT_POLICY_TX_BYTES_IDX], hit_policy_traffic_data->tx_bytes); - fieldstat_value_set(stat->instance, stat->column_ids[HIT_POLICY_DROP_PKTS_IDX], hit_policy_traffic_data->drop_pkts); - fieldstat_value_set(stat->instance, stat->column_ids[HIT_POLICY_DROP_BYTES_IDX], hit_policy_traffic_data->drop_bytes); - - fieldstat_passive_output(stat->instance); + fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_TCONSUME_FAILED], sum.async_tconsume_failed); + fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_HINCRBY_FAILED], sum.async_hincrby_failed); + fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_HMGET_FAILED], sum.async_hmget_failed); + + fieldstat_value_set(global_stat->instance, global_stat->column_ids[RX_PKTS_IDX], all_traffic_data->rx_pkts); + fieldstat_value_set(global_stat->instance, global_stat->column_ids[RX_BYTES_IDX], all_traffic_data->rx_bytes); + fieldstat_value_set(global_stat->instance, global_stat->column_ids[TX_PKTS_IDX], all_traffic_data->tx_pkts); + fieldstat_value_set(global_stat->instance, global_stat->column_ids[TX_BYTES_IDX], all_traffic_data->tx_bytes); + fieldstat_value_set(global_stat->instance, global_stat->column_ids[DROP_PKTS_IDX], all_traffic_data->drop_pkts); + fieldstat_value_set(global_stat->instance, global_stat->column_ids[DROP_BYTES_IDX], all_traffic_data->drop_bytes); + + fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_RX_PKTS_IDX], hit_policy_traffic_data->rx_pkts); + fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_RX_BYTES_IDX], hit_policy_traffic_data->rx_bytes); + fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_TX_PKTS_IDX], hit_policy_traffic_data->tx_pkts); + fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_TX_BYTES_IDX], hit_policy_traffic_data->tx_bytes); + fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_DROP_PKTS_IDX], hit_policy_traffic_data->drop_pkts); + fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_DROP_BYTES_IDX], hit_policy_traffic_data->drop_bytes); + + fieldstat_passive_output(global_stat->instance); }
\ No newline at end of file diff --git a/shaping/src/shaper_session.cpp b/shaping/src/shaper_session.cpp index af4a7ee..4a42391 100644 --- a/shaping/src/shaper_session.cpp +++ b/shaping/src/shaper_session.cpp @@ -40,7 +40,7 @@ struct shaping_flow* shaper_session_opening(struct shaping_thread_ctx *ctx, stru session_table_insert(ctx->session_table, meta->session_id, &sf->tuple4, sf, NULL); - shaper_global_stat_curr_session_inc(ctx->global_stat); + shaper_global_stat_curr_session_inc(&ctx->thread_global_stat); return sf; } @@ -153,7 +153,7 @@ static void shaper_session_log_send(struct shaping_thread_ctx *ctx, struct shapi } marsio_send_burst(ctx->marsio_info->mr_path, ctx->thread_index, &tx_buff, 1); - shaper_global_stat_session_log_send_num_inc(ctx->global_stat); + shaper_global_stat_session_log_send_num_inc(&ctx->thread_global_stat); END: if (addr_str) { @@ -184,10 +184,10 @@ struct shaping_flow* shaper_session_close(struct shaping_thread_ctx *ctx, struct sf->flag |= SESSION_CLOSE; } - shaper_global_stat_ctrlpkt_active_close_inc(ctx->global_stat); + shaper_global_stat_ctrlpkt_active_close_inc(&ctx->thread_global_stat); session_table_delete_by_id(ctx->session_table, meta->session_id); - shaper_global_stat_curr_session_dec(ctx->global_stat); + shaper_global_stat_curr_session_dec(&ctx->thread_global_stat); return sf; } @@ -232,7 +232,7 @@ void shaper_session_data_free_cb(void *session_data, void *data) shaping_flow_free(ctx, sf); } - shaper_global_stat_curr_session_dec(ctx->global_stat); + shaper_global_stat_curr_session_dec(&ctx->thread_global_stat); return; }
\ No newline at end of file diff --git a/shaping/src/shaper_stat.cpp b/shaping/src/shaper_stat.cpp index e9cd3e6..d8895fa 100644 --- a/shaping/src/shaper_stat.cpp +++ b/shaping/src/shaper_stat.cpp @@ -136,18 +136,19 @@ static void shaper_stat_swarmkv_hincrby_cb(const struct swarmkv_reply *reply, vo { struct shaping_hincrby_cb_arg *arg = (struct shaping_hincrby_cb_arg *)cb_arg; struct shaping_thread_ctx *ctx = arg->ctx; + struct shaping_global_stat *global_stat = ctx->ref_ctx->global_stat; struct timespec curr_time; long long curr_time_us; clock_gettime(CLOCK_MONOTONIC, &curr_time); curr_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC; - shaper_global_stat_swarmkv_latency_update(ctx->global_stat, curr_time_us - arg->start_time_us); + shaper_global_stat_swarmkv_latency_update(global_stat, curr_time_us - arg->start_time_us); - shaper_global_stat_async_callback_inc(ctx->global_stat); - shaper_global_stat_hincrby_callback_inc(ctx->global_stat); + shaper_global_stat_async_callback_inc(&ctx->thread_global_stat); + shaper_global_stat_hincrby_callback_inc(&ctx->thread_global_stat); if (reply->type != SWARMKV_REPLY_INTEGER) { - shaper_global_stat_async_hincrby_failed_inc(ctx->global_stat); + shaper_global_stat_async_hincrby_failed_inc(&ctx->thread_global_stat); } free(cb_arg); @@ -190,8 +191,8 @@ static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, i clock_gettime(CLOCK_MONOTONIC, &curr_time); arg->ctx = ctx; arg->start_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC; - shaper_global_stat_async_invoke_inc(ctx->global_stat); - shaper_global_stat_hincrby_invoke_inc(ctx->global_stat); + shaper_global_stat_async_invoke_inc(&ctx->thread_global_stat); + shaper_global_stat_hincrby_invoke_inc(&ctx->thread_global_stat); swarmkv_async_command(ctx->swarmkv_db, shaper_stat_swarmkv_hincrby_cb, arg, "HINCRBY tsg-shaping-%d priority-%d %lld", profile->id, profile->priority, profile_stat->in.queue_len + profile_stat->out.queue_len); memset(profile_stat, 0, sizeof(struct shaping_stat_for_profile)); diff --git a/shaping/test/gtest_shaper.cpp b/shaping/test/gtest_shaper.cpp index 8c120d9..e27a49f 100644 --- a/shaping/test/gtest_shaper.cpp +++ b/shaping/test/gtest_shaper.cpp @@ -272,7 +272,7 @@ TEST(single_session, udp_tx_in_order) /***********send stat data here********************/ fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy - shaper_global_stat_refresh(ctx->global_stat); + shaper_global_stat_refresh(ctx); shaper_thread_resource_clear(); shaping_engine_destroy(ctx); @@ -351,7 +351,7 @@ TEST(max_min_host_fairness_profile, udp_tx_in_order) /***********send stat data here********************/ fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy - shaper_global_stat_refresh(ctx->global_stat); + shaper_global_stat_refresh(ctx); shaper_thread_resource_clear(); shaping_engine_destroy(ctx); @@ -1552,7 +1552,7 @@ TEST(statistics, udp_drop_pkt) /***********send stat data here********************/ fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy - shaper_global_stat_refresh(ctx->global_stat); + shaper_global_stat_refresh(ctx); shaper_thread_resource_clear(); shaping_engine_destroy(ctx); @@ -1614,7 +1614,7 @@ TEST(statistics, udp_queueing_pkt) /***********send stat data here********************/ shaper_stat_refresh(&ctx->thread_ctx[0], sf, ctx->thread_ctx[0].thread_index, 1); fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy - shaper_global_stat_refresh(ctx->global_stat); + shaper_global_stat_refresh(ctx); sleep(2);//wait telegraf generate metric /*******judge global metric********/ @@ -1640,7 +1640,7 @@ TEST(statistics, udp_queueing_pkt) /***********send stat data here********************/ fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy - shaper_global_stat_refresh(ctx->global_stat); + shaper_global_stat_refresh(ctx); shaper_thread_resource_clear(); shaping_engine_destroy(ctx); |
