summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorroot <[email protected]>2023-12-15 10:50:52 +0000
committerroot <[email protected]>2023-12-15 10:50:52 +0000
commit55c8ad6b4c439b29f681a8c5e604646f81b8a768 (patch)
tree3f04d8590b26c41bf735eb931865979a1c87f8f6
parent908b0f3a8e6a492b6ba185cda47beae3a5aef39d (diff)
global stat 由原子操作改为每个线程自己计数,每次输出时合并
-rw-r--r--shaping/include/shaper.h3
-rw-r--r--shaping/include/shaper_global_stat.h84
-rw-r--r--shaping/src/main.cpp5
-rw-r--r--shaping/src/shaper.cpp83
-rw-r--r--shaping/src/shaper_global_stat.cpp337
-rw-r--r--shaping/src/shaper_session.cpp10
-rw-r--r--shaping/src/shaper_stat.cpp13
-rw-r--r--shaping/test/gtest_shaper.cpp10
8 files changed, 276 insertions, 269 deletions
diff --git a/shaping/include/shaper.h b/shaping/include/shaper.h
index 9ae3846..ea6e749 100644
--- a/shaping/include/shaper.h
+++ b/shaping/include/shaper.h
@@ -6,6 +6,7 @@
#include "session_table.h"
#include "utils.h"
#include "shaper_stat.h"
+#include "shaper_global_stat.h"
extern "C" {
#include "timeout.h"
}
@@ -52,10 +53,10 @@ struct shaping_system_conf {
struct shaping_thread_ctx {
pthread_t tid;
int thread_index;
+ struct shaping_global_stat_data thread_global_stat;
struct shaping_ctx *ref_ctx;
struct shaper *sp;
struct shaping_stat *stat;
- struct shaping_global_stat *global_stat;
struct shaping_marsio_info *marsio_info;
struct swarmkv *swarmkv_db;//handle of swarmkv
int swarmkv_aqm_prob;
diff --git a/shaping/include/shaper_global_stat.h b/shaping/include/shaper_global_stat.h
index 78ef41f..940745e 100644
--- a/shaping/include/shaper_global_stat.h
+++ b/shaping/include/shaper_global_stat.h
@@ -1,9 +1,6 @@
-#include <fieldstat.h>
+#pragma once
-enum shaping_global_stat_dir {
- SHAPING_GLOBAL_STAT_RX = 0,
- SHAPING_GLOBAL_STAT_TX
-};
+#include <fieldstat.h>
enum shaping_global_stat_column_index {
CURR_SESSION_NUM_IDX = 0,
@@ -86,47 +83,50 @@ struct shaping_global_stat {
struct fieldstat_instance *instance;
int column_ids[GLOBAL_STAT_COLUNM_IDX_MAX];
int swarmkv_latency_summary_id;
- struct shaping_global_stat_data local_stat_data;
+ struct shaping_global_stat_data *stat_data;
int output_interval_s;
};
-struct shaping_global_stat* shaper_global_stat_init();
+struct shaping_global_stat* shaper_global_stat_init(int work_thread_num);
void shaper_global_stat_destroy(struct shaping_global_stat *stat);
void shaper_global_stat_swarmkv_latency_update(struct shaping_global_stat *stat, long long latency_us);
-void shaper_global_stat_curr_session_inc(struct shaping_global_stat *stat);
-void shaper_global_stat_curr_session_dec(struct shaping_global_stat *stat);
-void shaper_global_stat_queueing_inc(struct shaping_global_stat *stat, int pkt_len);
-void shaper_global_stat_queueing_dec(struct shaping_global_stat *stat, int pkt_len);
-long long shaper_global_stat_queueing_pkts_get(struct shaping_global_stat *stat);
-
-void shaper_global_stat_ctrlpkt_err_inc(struct shaping_global_stat *stat);
-void shaper_global_stat_ctrlpkt_opening_inc(struct shaping_global_stat *stat);
-void shaper_global_stat_ctrlpkt_active_inc(struct shaping_global_stat *stat);
-void shaper_global_stat_ctrlpkt_close_inc(struct shaping_global_stat *stat);
-void shaper_global_stat_ctrlpkt_active_close_inc(struct shaping_global_stat *stat);
-void shaper_global_stat_ctrlpkt_resetall_inc(struct shaping_global_stat *stat);
-void shaper_global_stat_session_log_send_num_inc(struct shaping_global_stat *stat);
-
-void shaper_global_stat_async_invoke_inc(struct shaping_global_stat *stat);
-void shaper_global_stat_async_callback_inc(struct shaping_global_stat *stat);
-
-void sheper_global_stat_tconsume_invoke_inc(struct shaping_global_stat *stat);
-void shaper_global_stat_tconsume_callback_inc(struct shaping_global_stat *stat);
-void shaper_global_stat_hincrby_invoke_inc(struct shaping_global_stat *stat);
-void shaper_global_stat_hincrby_callback_inc(struct shaping_global_stat *stat);
-void shaper_global_stat_hmget_invoke_inc(struct shaping_global_stat *stat);
-void shaper_global_stat_hmget_callback_inc(struct shaping_global_stat *stat);
-
-void shaper_global_stat_async_tconsume_failed_inc(struct shaping_global_stat *stat);
-void shaper_global_stat_async_hincrby_failed_inc(struct shaping_global_stat *stat);
-void shaper_global_stat_async_hmget_failed_inc(struct shaping_global_stat *stat);
-
-void shaper_global_stat_throughput_inc(struct shaping_global_stat *stat, enum shaping_global_stat_dir dir, int pkt_len);
-void shaper_global_stat_drop_inc(struct shaping_global_stat *stat, int pkt_len);
-
-void shaper_global_stat_hit_policy_throughput_inc(struct shaping_global_stat *stat, enum shaping_global_stat_dir dir, int pkt_len);
-void shaper_global_stat_hit_policy_drop_inc(struct shaping_global_stat *stat, int pkt_len);
-
-void shaper_global_stat_refresh(struct shaping_global_stat *stat); \ No newline at end of file
+void shaper_global_stat_curr_session_inc(struct shaping_global_stat_data *thread_global_stat);
+void shaper_global_stat_curr_session_dec(struct shaping_global_stat_data *thread_global_stat);
+void shaper_global_stat_queueing_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len);
+void shaper_global_stat_queueing_dec(struct shaping_global_stat_data *thread_global_stat, int pkt_len);
+long long shaper_global_stat_queueing_pkts_get(struct shaping_global_stat_data *thread_global_stat);
+
+void shaper_global_stat_ctrlpkt_err_inc(struct shaping_global_stat_data *thread_global_stat);
+void shaper_global_stat_ctrlpkt_opening_inc(struct shaping_global_stat_data *thread_global_stat);
+void shaper_global_stat_ctrlpkt_active_inc(struct shaping_global_stat_data *thread_global_stat);
+void shaper_global_stat_ctrlpkt_close_inc(struct shaping_global_stat_data *thread_global_stat);
+void shaper_global_stat_ctrlpkt_active_close_inc(struct shaping_global_stat_data *thread_global_stat);
+void shaper_global_stat_ctrlpkt_resetall_inc(struct shaping_global_stat_data *thread_global_stat);
+void shaper_global_stat_session_log_send_num_inc(struct shaping_global_stat_data *thread_global_stat);
+
+void shaper_global_stat_async_invoke_inc(struct shaping_global_stat_data *thread_global_stat);
+void shaper_global_stat_async_callback_inc(struct shaping_global_stat_data *thread_global_stat);
+
+void sheper_global_stat_tconsume_invoke_inc(struct shaping_global_stat_data *thread_global_stat);
+void shaper_global_stat_tconsume_callback_inc(struct shaping_global_stat_data *thread_global_stat);
+void shaper_global_stat_hincrby_invoke_inc(struct shaping_global_stat_data *thread_global_stat);
+void shaper_global_stat_hincrby_callback_inc(struct shaping_global_stat_data *thread_global_stat);
+void shaper_global_stat_hmget_invoke_inc(struct shaping_global_stat_data *thread_global_stat);
+void shaper_global_stat_hmget_callback_inc(struct shaping_global_stat_data *thread_global_stat);
+
+void shaper_global_stat_async_tconsume_failed_inc(struct shaping_global_stat_data *thread_global_stat);
+void shaper_global_stat_async_hincrby_failed_inc(struct shaping_global_stat_data *thread_global_stat);
+void shaper_global_stat_async_hmget_failed_inc(struct shaping_global_stat_data *thread_global_stat);
+
+void shaper_global_stat_drop_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len);
+
+void shaper_global_stat_throughput_rx_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len);
+void shaper_global_stat_throughput_tx_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len);
+
+void shaper_global_stat_hit_policy_throughput_rx_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len);
+void shaper_global_stat_hit_policy_throughput_tx_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len);
+void shaper_global_stat_hit_policy_drop_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len);
+
+void shaper_global_stat_refresh(struct shaping_ctx *ctx); \ No newline at end of file
diff --git a/shaping/src/main.cpp b/shaping/src/main.cpp
index 89cb176..d31f768 100644
--- a/shaping/src/main.cpp
+++ b/shaping/src/main.cpp
@@ -22,6 +22,7 @@ static void *shaper_thread_loop(void *data)
{
char thread_name[16] = {0};
struct shaping_thread_ctx *ctx = (struct shaping_thread_ctx *)data;
+ int output_interval_s = ctx->ref_ctx->global_stat->output_interval_s;
snprintf(thread_name, sizeof(thread_name), "shape-work-%d", ctx->thread_index);
prctl(PR_SET_NAME, (unsigned long long)thread_name, NULL, NULL, NULL);
@@ -40,7 +41,7 @@ static void *shaper_thread_loop(void *data)
session_table_reset_with_callback(ctx->session_table, shaper_session_data_free_cb, ctx);
__atomic_fetch_and(&ctx->session_need_reset, 0, __ATOMIC_SEQ_CST);
}
- marsio_poll_wait(ctx->marsio_info->instance, &ctx->marsio_info->mr_dev, 1, ctx->thread_index, ctx->global_stat->output_interval_s * 1000);
+ marsio_poll_wait(ctx->marsio_info->instance, &ctx->marsio_info->mr_dev, 1, ctx->thread_index, output_interval_s * 1000);
}
shaper_thread_resource_clear();
@@ -111,7 +112,7 @@ int main(int argc, char **argv)
while(!quit) {
time_t curr_time = time(NULL);
if (curr_time - last_update_time >= ctx->global_stat->output_interval_s) {
- shaper_global_stat_refresh(ctx->global_stat);
+ shaper_global_stat_refresh(ctx);
last_update_time = curr_time;
}
sleep(ctx->global_stat->output_interval_s);
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp
index 6ae35f0..de31504 100644
--- a/shaping/src/shaper.cpp
+++ b/shaping/src/shaper.cpp
@@ -259,9 +259,9 @@ void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx)
shaper_stat_queueing_pkt_dec(&rule->primary.stat, pkt_wrapper->direction, ctx->thread_index);
shaper_stat_drop_inc(&rule->primary.stat, pkt_wrapper->length, ctx->thread_index);
- shaper_global_stat_queueing_dec(ctx->global_stat, pkt_wrapper->length);
- shaper_global_stat_drop_inc(ctx->global_stat, pkt_wrapper->length);
- shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, pkt_wrapper->length);
+ shaper_global_stat_queueing_dec(&ctx->thread_global_stat, pkt_wrapper->length);
+ shaper_global_stat_drop_inc(&ctx->thread_global_stat, pkt_wrapper->length);
+ shaper_global_stat_hit_policy_drop_inc(&ctx->thread_global_stat, pkt_wrapper->length);
marsio_buff_free(ctx->marsio_info->instance, &pkt_wrapper->pkt_buff, 1, 0, ctx->thread_index);
shaper_packet_dequeue(sf);
@@ -429,15 +429,15 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg
clock_gettime(CLOCK_MONOTONIC, &curr_time);
curr_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
- shaper_global_stat_swarmkv_latency_update(ctx->global_stat, curr_time_us - arg->start_time_us);
+ shaper_global_stat_swarmkv_latency_update(ctx->ref_ctx->global_stat, curr_time_us - arg->start_time_us);
- shaper_global_stat_async_callback_inc(ctx->global_stat);
- shaper_global_stat_tconsume_callback_inc(ctx->global_stat);
+ shaper_global_stat_async_callback_inc(&ctx->thread_global_stat);
+ shaper_global_stat_tconsume_callback_inc(&ctx->thread_global_stat);
LOG_INFO("Swarmkv reply type =%d, profile_id %d, direction =%d, integer =%llu",reply->type, profile->id, arg->direction, reply->integer);
if (reply->type != SWARMKV_REPLY_INTEGER) {
- shaper_global_stat_async_tconsume_failed_inc(ctx->global_stat);
+ shaper_global_stat_async_tconsume_failed_inc(&ctx->thread_global_stat);
goto END;
}
@@ -558,8 +558,8 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct
arg->direction = direction;
arg->start_time_us = curr_timespec->tv_sec * MICRO_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
- shaper_global_stat_async_invoke_inc(ctx->global_stat);
- sheper_global_stat_tconsume_invoke_inc(ctx->global_stat);
+ shaper_global_stat_async_invoke_inc(&ctx->thread_global_stat);
+ sheper_global_stat_tconsume_invoke_inc(&ctx->thread_global_stat);
sf->ref_cnt++;
@@ -610,15 +610,15 @@ static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb
clock_gettime(CLOCK_MONOTONIC, &curr_time);
curr_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
- shaper_global_stat_swarmkv_latency_update(ctx->global_stat, curr_time_us - arg->start_time_us);
+ shaper_global_stat_swarmkv_latency_update(ctx->ref_ctx->global_stat, curr_time_us - arg->start_time_us);
- shaper_global_stat_async_callback_inc(ctx->global_stat);
- shaper_global_stat_hmget_callback_inc(ctx->global_stat);
+ shaper_global_stat_async_callback_inc(&ctx->thread_global_stat);
+ shaper_global_stat_hmget_callback_inc(&ctx->thread_global_stat);
pf_hash_node->is_priority_blocked[priority] = 0;
if (!reply || (reply->type != SWARMKV_REPLY_NIL && reply->type != SWARMKV_REPLY_ARRAY)) {
- shaper_global_stat_async_hmget_failed_inc(ctx->global_stat);
+ shaper_global_stat_async_hmget_failed_inc(&ctx->thread_global_stat);
goto END;
}
@@ -667,8 +667,8 @@ static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, st
arg->priority = priority;
arg->start_time_us = curr_timespec->tv_sec * MICRO_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
- shaper_global_stat_async_invoke_inc(ctx->global_stat);
- shaper_global_stat_hmget_invoke_inc(ctx->global_stat);
+ shaper_global_stat_async_invoke_inc(&ctx->thread_global_stat);
+ shaper_global_stat_hmget_invoke_inc(&ctx->thread_global_stat);
swarmkv_async_command(ctx->swarmkv_db, shaper_queue_len_get_cb, arg, swarmkv_queue_len_get_cmd[priority], profile->id);
END:
@@ -959,17 +959,17 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_
}
break;
case SHAPING_DROP:
- shaper_global_stat_queueing_dec(ctx->global_stat, pkt_wrapper->length);
- shaper_global_stat_drop_inc(ctx->global_stat, pkt_wrapper->length);
- shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, pkt_wrapper->length);
+ shaper_global_stat_queueing_dec(&ctx->thread_global_stat, pkt_wrapper->length);
+ shaper_global_stat_drop_inc(&ctx->thread_global_stat, pkt_wrapper->length);
+ shaper_global_stat_hit_policy_drop_inc(&ctx->thread_global_stat, pkt_wrapper->length);
marsio_buff_free(ctx->marsio_info->instance, &pkt_wrapper->pkt_buff, 1, 0, ctx->thread_index);
shaper_packet_dequeue(sf);
break;
case SHAPING_FORWARD:
- shaper_global_stat_queueing_dec(ctx->global_stat, pkt_wrapper->length);
- shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, pkt_wrapper->length);
- shaper_global_stat_hit_policy_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, pkt_wrapper->length);
+ shaper_global_stat_queueing_dec(&ctx->thread_global_stat, pkt_wrapper->length);
+ shaper_global_stat_throughput_tx_inc(&ctx->thread_global_stat, pkt_wrapper->length);
+ shaper_global_stat_hit_policy_throughput_tx_inc(&ctx->thread_global_stat, pkt_wrapper->length);
marsio_send_burst(ctx->marsio_info->mr_path, ctx->thread_index, &pkt_wrapper->pkt_buff, 1);
shaper_packet_dequeue(sf);
@@ -1030,8 +1030,8 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
if (meta->is_tcp_pure_ctrl) {
shaper_token_consume_force(sf, meta);
marsio_send_burst(marsio_info->mr_path, ctx->thread_index, &rx_buff, 1);
- shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len);
- shaper_global_stat_hit_policy_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len);
+ shaper_global_stat_throughput_tx_inc(&ctx->thread_global_stat, meta->raw_len);
+ shaper_global_stat_hit_policy_throughput_tx_inc(&ctx->thread_global_stat, meta->raw_len);
shaper_stat_forward_all_rule_inc(stat, sf, meta->dir, meta->raw_len, ctx->thread_index);
goto END;//for tcp pure control pkt, transmit it directly
}
@@ -1040,11 +1040,11 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
s_rule = &sf->matched_rule_infos[0];
if (0 == shaper_packet_enqueue(ctx, sf, rx_buff, meta)) {
shaper_stat_queueing_pkt_inc(&s_rule->primary.stat, meta->dir, ctx->thread_index);
- shaper_global_stat_queueing_inc(ctx->global_stat, meta->raw_len);
+ shaper_global_stat_queueing_inc(&ctx->thread_global_stat, meta->raw_len);
} else {
shaper_stat_drop_inc(&s_rule->primary.stat, meta->dir, ctx->thread_index);
- shaper_global_stat_drop_inc(ctx->global_stat, meta->raw_len);
- shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, meta->raw_len);
+ shaper_global_stat_drop_inc(&ctx->thread_global_stat, meta->raw_len);
+ shaper_global_stat_hit_policy_drop_inc(&ctx->thread_global_stat, meta->raw_len);
marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index);
}
@@ -1054,17 +1054,17 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
shaping_ret = shaper_pkt_action_decide_no_queue(ctx, sf, meta, &sf->matched_rule_infos[sf->anchor].primary, rx_buff);
switch (shaping_ret) {
case SHAPING_QUEUED:
- shaper_global_stat_queueing_inc(ctx->global_stat, meta->raw_len);
+ shaper_global_stat_queueing_inc(&ctx->thread_global_stat, meta->raw_len);
break;
case SHAPING_DROP:
marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index);
- shaper_global_stat_drop_inc(ctx->global_stat, meta->raw_len);
- shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, meta->raw_len);
+ shaper_global_stat_drop_inc(&ctx->thread_global_stat, meta->raw_len);
+ shaper_global_stat_hit_policy_drop_inc(&ctx->thread_global_stat, meta->raw_len);
break;
case SHAPING_FORWARD:
marsio_send_burst(marsio_info->mr_path, ctx->thread_index, &rx_buff, 1);
- shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len);
- shaper_global_stat_hit_policy_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len);
+ shaper_global_stat_throughput_tx_inc(&ctx->thread_global_stat, meta->raw_len);
+ shaper_global_stat_hit_policy_throughput_tx_inc(&ctx->thread_global_stat, meta->raw_len);
break;
default:
assert(0);
@@ -1120,7 +1120,7 @@ void polling_entry(struct shaper *sp, struct shaping_stat *stat, struct shaping_
cnt++;
}
- if (shaper_global_stat_queueing_pkts_get(ctx->global_stat) == 0) {
+ if (shaper_global_stat_queueing_pkts_get(&ctx->thread_global_stat) == 0) {
return;
}
@@ -1169,23 +1169,23 @@ static struct shaping_flow* shaper_ctrl_pkt_session_handle(struct shaping_thread
switch (ctrl_data.state) {
case SESSION_STATE_OPENING:
- shaper_global_stat_ctrlpkt_opening_inc(ctx->global_stat);
+ shaper_global_stat_ctrlpkt_opening_inc(&ctx->thread_global_stat);
//sf = shaper_session_opening(ctx, meta, &ctrl_data, &raw_parser);
break;
case SESSION_STATE_ACTIVE:
- shaper_global_stat_ctrlpkt_active_inc(ctx->global_stat);
+ shaper_global_stat_ctrlpkt_active_inc(&ctx->thread_global_stat);
sf = shaper_session_active(ctx, meta, &ctrl_data, &raw_parser);
break;
case SESSION_STATE_CLOSING:
- shaper_global_stat_ctrlpkt_close_inc(ctx->global_stat);
+ shaper_global_stat_ctrlpkt_close_inc(&ctx->thread_global_stat);
sf = shaper_session_close(ctx, meta);
break;
case SESSION_STATE_RESETALL:
- shaper_global_stat_ctrlpkt_resetall_inc(ctx->global_stat);
+ shaper_global_stat_ctrlpkt_resetall_inc(&ctx->thread_global_stat);
sf = shaper_session_reset_all(ctx, meta);
break;
default:
- shaper_global_stat_ctrlpkt_err_inc(ctx->global_stat);
+ shaper_global_stat_ctrlpkt_err_inc(&ctx->thread_global_stat);
assert(0);
}
@@ -1207,7 +1207,7 @@ static struct shaping_flow *shaper_raw_pkt_session_handle(struct shaping_thread_
session_node = session_table_search_by_id(ctx->session_table, meta->session_id);
if (session_node) {
sf = (struct shaping_flow *)session_node->val_data;
- shaper_global_stat_hit_policy_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_RX, meta->raw_len);
+ shaper_global_stat_hit_policy_throughput_rx_inc(&ctx->thread_global_stat, meta->raw_len);
}
return sf;
@@ -1233,11 +1233,11 @@ void shaper_packet_recv_and_process(struct shaping_thread_ctx *ctx)
} else {
sf = shaper_raw_pkt_session_handle(ctx, rx_buff[i], &meta);
}
- shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_RX, meta.raw_len);
+ shaper_global_stat_throughput_rx_inc(&ctx->thread_global_stat, meta.raw_len);
if (meta.is_ctrl_pkt || !sf || sf->rule_num == 0) {//ctrl pkt need send directly
marsio_send_burst(ctx->marsio_info->mr_path, ctx->thread_index, &rx_buff[i], 1);
- shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta.raw_len);
+ shaper_global_stat_throughput_tx_inc(&ctx->thread_global_stat, meta.raw_len);
} else {
shaping_packet_process(ctx, rx_buff[i], &meta, sf);
}
@@ -1421,7 +1421,7 @@ struct shaping_ctx *shaping_engine_init()
goto ERROR;
}
- ctx->global_stat = shaper_global_stat_init();
+ ctx->global_stat = shaper_global_stat_init(conf.work_thread_num);
if (ctx->global_stat == NULL) {
goto ERROR;
}
@@ -1432,7 +1432,6 @@ struct shaping_ctx *shaping_engine_init()
ctx->thread_ctx[i].thread_index = i;
ctx->thread_ctx[i].sp = shaper_new(conf.priority_queue_len_max);
ctx->thread_ctx[i].stat = ctx->stat;
- ctx->thread_ctx[i].global_stat = ctx->global_stat;
ctx->thread_ctx[i].session_table = session_table_create();
ctx->thread_ctx[i].maat_info = ctx->maat_info;
ctx->thread_ctx[i].marsio_info = ctx->marsio_info;
diff --git a/shaping/src/shaper_global_stat.cpp b/shaping/src/shaper_global_stat.cpp
index 748becb..771438a 100644
--- a/shaping/src/shaper_global_stat.cpp
+++ b/shaping/src/shaper_global_stat.cpp
@@ -1,3 +1,4 @@
+#include <cstring>
#include <stdlib.h>
#include <MESA/MESA_prof_load.h>
@@ -68,12 +69,13 @@ static void shaper_global_stat_fieldstat_reg(struct shaping_global_stat *stat)
return;
}
-struct shaping_global_stat* shaper_global_stat_init()
+struct shaping_global_stat* shaper_global_stat_init(int work_thread_num)
{
struct shaping_global_stat *stat = NULL;
struct shaping_global_stat_conf conf;
stat = (struct shaping_global_stat*)calloc(1, sizeof(struct shaping_global_stat));
+ stat->stat_data = (struct shaping_global_stat_data*)calloc(work_thread_num, sizeof(struct shaping_global_stat_data));
if (shaper_global_stat_conf_load(stat, &conf) != 0) {
LOG_ERROR("%s: shaping init metric conf failed", LOG_TAG_STAT);
@@ -140,305 +142,308 @@ void shaper_global_stat_swarmkv_latency_update(struct shaping_global_stat *stat,
return;
}
-void shaper_global_stat_curr_session_inc(struct shaping_global_stat *stat)
+void shaper_global_stat_curr_session_inc(struct shaping_global_stat_data *thread_global_stat)
{
- struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
-
- __atomic_add_fetch(&local_stat_data->curr_session_num, 1, __ATOMIC_RELAXED);
+ thread_global_stat->curr_session_num++;
return;
}
-void shaper_global_stat_curr_session_dec(struct shaping_global_stat *stat)
+void shaper_global_stat_curr_session_dec(struct shaping_global_stat_data *thread_global_stat)
{
- struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
-
- __atomic_sub_fetch(&local_stat_data->curr_session_num, 1, __ATOMIC_RELAXED);
+ thread_global_stat->curr_session_num--;
return;
}
-void shaper_global_stat_queueing_inc(struct shaping_global_stat *stat, int pkt_len)
+void shaper_global_stat_queueing_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len)
{
- struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
-
- __atomic_add_fetch(&local_stat_data->queueing_pkts, 1, __ATOMIC_RELAXED);
- __atomic_add_fetch(&local_stat_data->queueing_bytes, pkt_len, __ATOMIC_RELAXED);
+ thread_global_stat->queueing_pkts++;
+ thread_global_stat->queueing_bytes += pkt_len;
return;
}
-void shaper_global_stat_queueing_dec(struct shaping_global_stat *stat, int pkt_len)
+void shaper_global_stat_queueing_dec(struct shaping_global_stat_data *thread_global_stat, int pkt_len)
{
- struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
-
- __atomic_sub_fetch(&local_stat_data->queueing_pkts, 1, __ATOMIC_RELAXED);
- __atomic_sub_fetch(&local_stat_data->queueing_bytes, pkt_len, __ATOMIC_RELAXED);
+ thread_global_stat->queueing_pkts--;
+ thread_global_stat->queueing_bytes -= pkt_len;
return;
}
-long long shaper_global_stat_queueing_pkts_get(struct shaping_global_stat *stat)
+long long shaper_global_stat_queueing_pkts_get(struct shaping_global_stat_data *thread_global_stat)
{
- return stat->local_stat_data.queueing_pkts;
+ return thread_global_stat->queueing_pkts;
}
-void shaper_global_stat_ctrlpkt_err_inc(struct shaping_global_stat *stat)
+void shaper_global_stat_ctrlpkt_err_inc(struct shaping_global_stat_data *thread_global_stat)
{
- struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
-
- __atomic_add_fetch(&local_stat_data->ctrl_error, 1, __ATOMIC_RELAXED);
+ thread_global_stat->ctrl_error++;
return;
}
-void shaper_global_stat_ctrlpkt_opening_inc(struct shaping_global_stat *stat)
+void shaper_global_stat_ctrlpkt_opening_inc(struct shaping_global_stat_data *thread_global_stat)
{
- struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
-
- __atomic_add_fetch(&local_stat_data->ctrl_opening, 1, __ATOMIC_RELAXED);
+ thread_global_stat->ctrl_opening++;
return;
}
-void shaper_global_stat_ctrlpkt_active_inc(struct shaping_global_stat *stat)
+void shaper_global_stat_ctrlpkt_active_inc(struct shaping_global_stat_data *thread_global_stat)
{
- struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
-
- __atomic_add_fetch(&local_stat_data->ctrl_active, 1, __ATOMIC_RELAXED);
+ thread_global_stat->ctrl_active++;
return;
}
-void shaper_global_stat_ctrlpkt_close_inc(struct shaping_global_stat *stat)
+void shaper_global_stat_ctrlpkt_close_inc(struct shaping_global_stat_data *thread_global_stat)
{
- struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
-
- __atomic_add_fetch(&local_stat_data->ctrl_close, 1, __ATOMIC_RELAXED);
+ thread_global_stat->ctrl_close++;
return;
}
-void shaper_global_stat_ctrlpkt_active_close_inc(struct shaping_global_stat *stat)
+void shaper_global_stat_ctrlpkt_active_close_inc(struct shaping_global_stat_data *thread_global_stat)
{
- struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
-
- __atomic_add_fetch(&local_stat_data->ctrl_active_close, 1, __ATOMIC_RELAXED);
+ thread_global_stat->ctrl_active_close++;
return;
}
-void shaper_global_stat_ctrlpkt_resetall_inc(struct shaping_global_stat *stat)
+void shaper_global_stat_ctrlpkt_resetall_inc(struct shaping_global_stat_data *thread_global_stat)
{
- struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
-
- __atomic_add_fetch(&local_stat_data->ctrl_resetall, 1, __ATOMIC_RELAXED);
+ thread_global_stat->ctrl_resetall++;
return;
}
-void shaper_global_stat_session_log_send_num_inc(struct shaping_global_stat *stat)
+void shaper_global_stat_session_log_send_num_inc(struct shaping_global_stat_data *thread_global_stat)
{
- struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
-
- __atomic_add_fetch(&local_stat_data->session_log_send, 1, __ATOMIC_RELAXED);
+ thread_global_stat->session_log_send++;
return;
}
-void shaper_global_stat_async_invoke_inc(struct shaping_global_stat *stat)
+void shaper_global_stat_async_invoke_inc(struct shaping_global_stat_data *thread_global_stat)
{
- struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
-
- __atomic_add_fetch(&local_stat_data->async_invoke, 1, __ATOMIC_RELAXED);
+ thread_global_stat->async_invoke++;
return;
}
-void shaper_global_stat_async_callback_inc(struct shaping_global_stat *stat)
+void shaper_global_stat_async_callback_inc(struct shaping_global_stat_data *thread_global_stat)
{
- struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
-
- __atomic_add_fetch(&local_stat_data->async_callback, 1, __ATOMIC_RELAXED);
+ thread_global_stat->async_callback++;
return;
}
-void sheper_global_stat_tconsume_invoke_inc(struct shaping_global_stat *stat)
+void sheper_global_stat_tconsume_invoke_inc(struct shaping_global_stat_data *thread_global_stat)
{
- struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
-
- __atomic_add_fetch(&local_stat_data->async_tconsume_invoke, 1, __ATOMIC_RELAXED);
+ thread_global_stat->async_tconsume_invoke++;
return;
}
-void shaper_global_stat_tconsume_callback_inc(struct shaping_global_stat *stat)
+void shaper_global_stat_tconsume_callback_inc(struct shaping_global_stat_data *thread_global_stat)
{
- struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
-
- __atomic_add_fetch(&local_stat_data->async_tconsume_callback, 1, __ATOMIC_RELAXED);
+ thread_global_stat->async_tconsume_callback++;
return;
}
-void shaper_global_stat_hincrby_invoke_inc(struct shaping_global_stat *stat)
+void shaper_global_stat_hincrby_invoke_inc(struct shaping_global_stat_data *thread_global_stat)
{
- struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
-
- __atomic_add_fetch(&local_stat_data->async_hincrby_invoke, 1, __ATOMIC_RELAXED);
+ thread_global_stat->async_hmget_invoke++;
return;
}
-void shaper_global_stat_hincrby_callback_inc(struct shaping_global_stat *stat)
+void shaper_global_stat_hincrby_callback_inc(struct shaping_global_stat_data *thread_global_stat)
{
- struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
-
- __atomic_add_fetch(&local_stat_data->async_hincrby_callback, 1, __ATOMIC_RELAXED);
+ thread_global_stat->async_hincrby_callback++;
return;
}
-void shaper_global_stat_hmget_invoke_inc(struct shaping_global_stat *stat)
+void shaper_global_stat_hmget_invoke_inc(struct shaping_global_stat_data *thread_global_stat)
{
- struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
-
- __atomic_add_fetch(&local_stat_data->async_hmget_invoke, 1, __ATOMIC_RELAXED);
+ thread_global_stat->async_hmget_invoke++;
return;
}
-void shaper_global_stat_hmget_callback_inc(struct shaping_global_stat *stat)
+void shaper_global_stat_hmget_callback_inc(struct shaping_global_stat_data *thread_global_stat)
{
- struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
-
- __atomic_add_fetch(&local_stat_data->async_hmget_callback, 1, __ATOMIC_RELAXED);
+ thread_global_stat->async_hmget_callback++;
return;
}
-void shaper_global_stat_async_tconsume_failed_inc(struct shaping_global_stat *stat)
+void shaper_global_stat_async_tconsume_failed_inc(struct shaping_global_stat_data *thread_global_stat)
{
- struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
-
- __atomic_add_fetch(&local_stat_data->async_tconsume_failed, 1, __ATOMIC_RELAXED);
+ thread_global_stat->async_tconsume_failed++;
return;
}
-void shaper_global_stat_async_hincrby_failed_inc(struct shaping_global_stat *stat)
+void shaper_global_stat_async_hincrby_failed_inc(struct shaping_global_stat_data *thread_global_stat)
{
- struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
-
- __atomic_add_fetch(&local_stat_data->async_hincrby_failed, 1, __ATOMIC_RELAXED);
+ thread_global_stat->async_hincrby_failed++;
return;
}
-void shaper_global_stat_async_hmget_failed_inc(struct shaping_global_stat *stat)
+void shaper_global_stat_async_hmget_failed_inc(struct shaping_global_stat_data *thread_global_stat)
{
- struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
-
- __atomic_add_fetch(&local_stat_data->async_hmget_failed, 1, __ATOMIC_RELAXED);
+ thread_global_stat->async_hmget_failed++;
return;
}
-void shaper_global_stat_throughput_inc(struct shaping_global_stat *stat, enum shaping_global_stat_dir dir, int pkt_len)
+void shaper_global_stat_throughput_rx_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len)
{
- struct shaping_global_stat_traffic_data *data = &stat->local_stat_data.all_traffic;
- if (dir == SHAPING_GLOBAL_STAT_RX) {
- __atomic_add_fetch(&data->rx_pkts, 1, __ATOMIC_RELAXED);
- __atomic_add_fetch(&data->rx_bytes, pkt_len, __ATOMIC_RELAXED);
- } else {
- __atomic_add_fetch(&data->tx_pkts, 1, __ATOMIC_RELAXED);
- __atomic_add_fetch(&data->tx_bytes, pkt_len, __ATOMIC_RELAXED);
- }
-
- return;
+ struct shaping_global_stat_traffic_data *data = &thread_global_stat->all_traffic;
+ data->rx_pkts++;
+ data->rx_bytes += pkt_len;
}
-void shaper_global_stat_drop_inc(struct shaping_global_stat *stat, int pkt_len)
+void shaper_global_stat_throughput_tx_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len)
{
- struct shaping_global_stat_traffic_data *data = &stat->local_stat_data.all_traffic;
+ struct shaping_global_stat_traffic_data *data = &thread_global_stat->all_traffic;
+ data->tx_pkts++;
+ data->tx_bytes += pkt_len;
+}
- __atomic_add_fetch(&data->drop_pkts, 1, __ATOMIC_RELAXED);
- __atomic_add_fetch(&data->drop_bytes, pkt_len, __ATOMIC_RELAXED);
+void shaper_global_stat_drop_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len)
+{
+ struct shaping_global_stat_traffic_data *data = &thread_global_stat->all_traffic;
+ data->drop_pkts++;
+ data->drop_bytes += pkt_len;
return;
}
-void shaper_global_stat_hit_policy_throughput_inc(struct shaping_global_stat *stat, enum shaping_global_stat_dir dir, int pkt_len)
+void shaper_global_stat_hit_policy_throughput_rx_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len)
{
- struct shaping_global_stat_traffic_data *data = &stat->local_stat_data.hit_policy_traffic;
-
- if (dir == SHAPING_GLOBAL_STAT_RX) {
- __atomic_add_fetch(&data->rx_pkts, 1, __ATOMIC_RELAXED);
- __atomic_add_fetch(&data->rx_bytes, pkt_len, __ATOMIC_RELAXED);
- } else {
- __atomic_add_fetch(&data->tx_pkts, 1, __ATOMIC_RELAXED);
- __atomic_add_fetch(&data->tx_bytes, pkt_len, __ATOMIC_RELAXED);
- }
-
- return;
+ struct shaping_global_stat_traffic_data *data = &thread_global_stat->hit_policy_traffic;
+ data->rx_pkts++;
+ data->rx_bytes += pkt_len;
}
-void shaper_global_stat_hit_policy_drop_inc(struct shaping_global_stat *stat, int pkt_len)
+void shaper_global_stat_hit_policy_throughput_tx_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len)
{
- struct shaping_global_stat_traffic_data *data = &stat->local_stat_data.hit_policy_traffic;
+ struct shaping_global_stat_traffic_data *data = &thread_global_stat->hit_policy_traffic;
+ data->tx_pkts++;
+ data->tx_bytes += pkt_len;
+}
- __atomic_add_fetch(&data->drop_pkts, 1, __ATOMIC_RELAXED);
- __atomic_add_fetch(&data->drop_bytes, pkt_len, __ATOMIC_RELAXED);
+void shaper_global_stat_hit_policy_drop_inc(struct shaping_global_stat_data *thread_global_stat, int pkt_len)
+{
+ struct shaping_global_stat_traffic_data *data = &thread_global_stat->hit_policy_traffic;
+ data->drop_pkts++;
+ data->drop_bytes += pkt_len;
return;
}
-void shaper_global_stat_refresh(struct shaping_global_stat *stat)
+void shaper_global_stat_refresh(struct shaping_ctx *ctx)
{
- struct shaping_global_stat_data *local_stat_data = &stat->local_stat_data;
- struct shaping_global_stat_traffic_data *all_traffic_data = &stat->local_stat_data.all_traffic;
- struct shaping_global_stat_traffic_data *hit_policy_traffic_data = &stat->local_stat_data.hit_policy_traffic;
-
- fieldstat_value_set(stat->instance, stat->column_ids[CURR_SESSION_NUM_IDX], local_stat_data->curr_session_num);
- fieldstat_value_set(stat->instance, stat->column_ids[QUEUEING_PKTS_IDX], local_stat_data->queueing_pkts);
- fieldstat_value_set(stat->instance, stat->column_ids[QUEUEING_BYTES_IDX], local_stat_data->queueing_bytes);
+ static struct shaping_global_stat_data sum;
+ struct shaping_global_stat *global_stat = ctx->global_stat;
+ struct shaping_global_stat_data *stat_data = global_stat->stat_data;
- fieldstat_value_set(stat->instance, stat->column_ids[CTRL_ERR_IDX], local_stat_data->ctrl_error);
- fieldstat_value_set(stat->instance, stat->column_ids[CTRL_OPENING_IDX], local_stat_data->ctrl_opening);
- fieldstat_value_set(stat->instance, stat->column_ids[CTRL_ACTIVE_IDX], local_stat_data->ctrl_active);
- fieldstat_value_set(stat->instance, stat->column_ids[CTRL_CLOSE_IDX], local_stat_data->ctrl_close);
- fieldstat_value_set(stat->instance, stat->column_ids[CTRL_ACTIVE_CLOSE_IDX], local_stat_data->ctrl_active_close);
- fieldstat_value_set(stat->instance, stat->column_ids[CTRL_RESETALL_IDX], local_stat_data->ctrl_resetall);
- fieldstat_value_set(stat->instance, stat->column_ids[SESSION_LOG_SEND_IDX], local_stat_data->session_log_send);
+ for (int i = 0; i < ctx->thread_num; i++) {
+ memcpy(&stat_data[i], &ctx->thread_ctx[i].thread_global_stat, sizeof(struct shaping_global_stat_data));
+ }
- fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_INVOKE_IDX], local_stat_data->async_invoke);
- fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_CALLBACK_IDX], local_stat_data->async_callback);
+ memset(&sum, 0, sizeof(struct shaping_global_stat_data));
+ for (int i = 0; i < ctx->thread_num; i++) {
+ sum.curr_session_num += stat_data[i].curr_session_num;
+ sum.queueing_pkts += stat_data[i].queueing_pkts;
+ sum.queueing_bytes += stat_data[i].queueing_bytes;
+
+ sum.ctrl_error += stat_data[i].ctrl_error;
+ sum.ctrl_opening += stat_data[i].ctrl_opening;
+ sum.ctrl_active += stat_data[i].ctrl_active;
+ sum.ctrl_close += stat_data[i].ctrl_close;
+ sum.ctrl_active_close += stat_data[i].ctrl_active_close;
+ sum.ctrl_resetall += stat_data[i].ctrl_resetall;
+ sum.session_log_send += stat_data[i].session_log_send;
+
+ sum.async_invoke += stat_data[i].async_invoke;
+ sum.async_callback += stat_data[i].async_callback;
+
+ sum.async_tconsume_invoke += stat_data[i].async_tconsume_invoke;
+ sum.async_tconsume_callback += stat_data[i].async_tconsume_callback;
+ sum.async_hincrby_invoke += stat_data[i].async_hincrby_invoke;
+ sum.async_hincrby_callback += stat_data[i].async_hincrby_callback;
+ sum.async_hmget_invoke += stat_data[i].async_hmget_invoke;
+ sum.async_hmget_callback += stat_data[i].async_hmget_callback;
+
+ sum.async_tconsume_failed += stat_data[i].async_tconsume_failed;
+ sum.async_hincrby_failed += stat_data[i].async_hincrby_failed;
+ sum.async_hmget_failed += stat_data[i].async_hmget_failed;
+
+ sum.all_traffic.rx_pkts += stat_data[i].all_traffic.rx_pkts;
+ sum.all_traffic.rx_bytes += stat_data[i].all_traffic.rx_bytes;
+ sum.all_traffic.tx_pkts += stat_data[i].all_traffic.tx_pkts;
+ sum.all_traffic.tx_bytes += stat_data[i].all_traffic.tx_bytes;
+ sum.all_traffic.drop_pkts += stat_data[i].all_traffic.drop_pkts;
+ sum.all_traffic.drop_bytes += stat_data[i].all_traffic.drop_bytes;
+
+ sum.hit_policy_traffic.rx_pkts += stat_data[i].hit_policy_traffic.rx_pkts;
+ sum.hit_policy_traffic.rx_bytes += stat_data[i].hit_policy_traffic.rx_bytes;
+ sum.hit_policy_traffic.tx_pkts += stat_data[i].hit_policy_traffic.tx_pkts;
+ sum.hit_policy_traffic.tx_bytes += stat_data[i].hit_policy_traffic.tx_bytes;
+ sum.hit_policy_traffic.drop_pkts += stat_data[i].hit_policy_traffic.drop_pkts;
+ sum.hit_policy_traffic.drop_bytes += stat_data[i].hit_policy_traffic.drop_bytes;
+ }
- fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_TCONSUME_INVOKE_IDX], local_stat_data->async_tconsume_invoke);
- fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_TCONSUME_CALLBACK_IDX], local_stat_data->async_tconsume_callback);
- fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_HINCRBY_INVOKE_IDX], local_stat_data->async_hincrby_invoke);
- fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_HINCRBY_CALLBACK_IDX], local_stat_data->async_hincrby_callback);
- fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_HMGET_INVOKE_IDX], local_stat_data->async_hmget_invoke);
- fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_HMGET_CALLBACK_IDX], local_stat_data->async_hmget_callback);
+ struct shaping_global_stat_traffic_data *all_traffic_data = &sum.all_traffic;
+ struct shaping_global_stat_traffic_data *hit_policy_traffic_data = &sum.hit_policy_traffic;
+
+ fieldstat_value_set(global_stat->instance, global_stat->column_ids[CURR_SESSION_NUM_IDX], sum.curr_session_num);
+ fieldstat_value_set(global_stat->instance, global_stat->column_ids[QUEUEING_PKTS_IDX], sum.queueing_pkts);
+ fieldstat_value_set(global_stat->instance, global_stat->column_ids[QUEUEING_BYTES_IDX], sum.queueing_bytes);
+
+ fieldstat_value_set(global_stat->instance, global_stat->column_ids[CTRL_ERR_IDX], sum.ctrl_error);
+ fieldstat_value_set(global_stat->instance, global_stat->column_ids[CTRL_OPENING_IDX], sum.ctrl_opening);
+ fieldstat_value_set(global_stat->instance, global_stat->column_ids[CTRL_ACTIVE_IDX], sum.ctrl_active);
+ fieldstat_value_set(global_stat->instance, global_stat->column_ids[CTRL_CLOSE_IDX], sum.ctrl_close);
+ fieldstat_value_set(global_stat->instance, global_stat->column_ids[CTRL_ACTIVE_CLOSE_IDX], sum.ctrl_active_close);
+ fieldstat_value_set(global_stat->instance, global_stat->column_ids[CTRL_RESETALL_IDX], sum.ctrl_resetall);
+ fieldstat_value_set(global_stat->instance, global_stat->column_ids[SESSION_LOG_SEND_IDX], sum.session_log_send);
+
+ fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_INVOKE_IDX], sum.async_invoke);
+ fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_CALLBACK_IDX], sum.async_callback);
+
+ fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_TCONSUME_INVOKE_IDX], sum.async_tconsume_invoke);
+ fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_TCONSUME_CALLBACK_IDX], sum.async_tconsume_callback);
+ fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_HINCRBY_INVOKE_IDX], sum.async_hincrby_invoke);
+ fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_HINCRBY_CALLBACK_IDX], sum.async_hincrby_callback);
+ fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_HMGET_INVOKE_IDX], sum.async_hmget_invoke);
+ fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_HMGET_CALLBACK_IDX], sum.async_hmget_callback);
- fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_TCONSUME_FAILED], local_stat_data->async_tconsume_failed);
- fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_HINCRBY_FAILED], local_stat_data->async_hincrby_failed);
- fieldstat_value_set(stat->instance, stat->column_ids[ASYNC_HMGET_FAILED], local_stat_data->async_hmget_failed);
-
- fieldstat_value_set(stat->instance, stat->column_ids[RX_PKTS_IDX], all_traffic_data->rx_pkts);
- fieldstat_value_set(stat->instance, stat->column_ids[RX_BYTES_IDX], all_traffic_data->rx_bytes);
- fieldstat_value_set(stat->instance, stat->column_ids[TX_PKTS_IDX], all_traffic_data->tx_pkts);
- fieldstat_value_set(stat->instance, stat->column_ids[TX_BYTES_IDX], all_traffic_data->tx_bytes);
- fieldstat_value_set(stat->instance, stat->column_ids[DROP_PKTS_IDX], all_traffic_data->drop_pkts);
- fieldstat_value_set(stat->instance, stat->column_ids[DROP_BYTES_IDX], all_traffic_data->drop_bytes);
-
- fieldstat_value_set(stat->instance, stat->column_ids[HIT_POLICY_RX_PKTS_IDX], hit_policy_traffic_data->rx_pkts);
- fieldstat_value_set(stat->instance, stat->column_ids[HIT_POLICY_RX_BYTES_IDX], hit_policy_traffic_data->rx_bytes);
- fieldstat_value_set(stat->instance, stat->column_ids[HIT_POLICY_TX_PKTS_IDX], hit_policy_traffic_data->tx_pkts);
- fieldstat_value_set(stat->instance, stat->column_ids[HIT_POLICY_TX_BYTES_IDX], hit_policy_traffic_data->tx_bytes);
- fieldstat_value_set(stat->instance, stat->column_ids[HIT_POLICY_DROP_PKTS_IDX], hit_policy_traffic_data->drop_pkts);
- fieldstat_value_set(stat->instance, stat->column_ids[HIT_POLICY_DROP_BYTES_IDX], hit_policy_traffic_data->drop_bytes);
-
- fieldstat_passive_output(stat->instance);
+ fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_TCONSUME_FAILED], sum.async_tconsume_failed);
+ fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_HINCRBY_FAILED], sum.async_hincrby_failed);
+ fieldstat_value_set(global_stat->instance, global_stat->column_ids[ASYNC_HMGET_FAILED], sum.async_hmget_failed);
+
+ fieldstat_value_set(global_stat->instance, global_stat->column_ids[RX_PKTS_IDX], all_traffic_data->rx_pkts);
+ fieldstat_value_set(global_stat->instance, global_stat->column_ids[RX_BYTES_IDX], all_traffic_data->rx_bytes);
+ fieldstat_value_set(global_stat->instance, global_stat->column_ids[TX_PKTS_IDX], all_traffic_data->tx_pkts);
+ fieldstat_value_set(global_stat->instance, global_stat->column_ids[TX_BYTES_IDX], all_traffic_data->tx_bytes);
+ fieldstat_value_set(global_stat->instance, global_stat->column_ids[DROP_PKTS_IDX], all_traffic_data->drop_pkts);
+ fieldstat_value_set(global_stat->instance, global_stat->column_ids[DROP_BYTES_IDX], all_traffic_data->drop_bytes);
+
+ fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_RX_PKTS_IDX], hit_policy_traffic_data->rx_pkts);
+ fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_RX_BYTES_IDX], hit_policy_traffic_data->rx_bytes);
+ fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_TX_PKTS_IDX], hit_policy_traffic_data->tx_pkts);
+ fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_TX_BYTES_IDX], hit_policy_traffic_data->tx_bytes);
+ fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_DROP_PKTS_IDX], hit_policy_traffic_data->drop_pkts);
+ fieldstat_value_set(global_stat->instance, global_stat->column_ids[HIT_POLICY_DROP_BYTES_IDX], hit_policy_traffic_data->drop_bytes);
+
+ fieldstat_passive_output(global_stat->instance);
} \ No newline at end of file
diff --git a/shaping/src/shaper_session.cpp b/shaping/src/shaper_session.cpp
index af4a7ee..4a42391 100644
--- a/shaping/src/shaper_session.cpp
+++ b/shaping/src/shaper_session.cpp
@@ -40,7 +40,7 @@ struct shaping_flow* shaper_session_opening(struct shaping_thread_ctx *ctx, stru
session_table_insert(ctx->session_table, meta->session_id, &sf->tuple4, sf, NULL);
- shaper_global_stat_curr_session_inc(ctx->global_stat);
+ shaper_global_stat_curr_session_inc(&ctx->thread_global_stat);
return sf;
}
@@ -153,7 +153,7 @@ static void shaper_session_log_send(struct shaping_thread_ctx *ctx, struct shapi
}
marsio_send_burst(ctx->marsio_info->mr_path, ctx->thread_index, &tx_buff, 1);
- shaper_global_stat_session_log_send_num_inc(ctx->global_stat);
+ shaper_global_stat_session_log_send_num_inc(&ctx->thread_global_stat);
END:
if (addr_str) {
@@ -184,10 +184,10 @@ struct shaping_flow* shaper_session_close(struct shaping_thread_ctx *ctx, struct
sf->flag |= SESSION_CLOSE;
}
- shaper_global_stat_ctrlpkt_active_close_inc(ctx->global_stat);
+ shaper_global_stat_ctrlpkt_active_close_inc(&ctx->thread_global_stat);
session_table_delete_by_id(ctx->session_table, meta->session_id);
- shaper_global_stat_curr_session_dec(ctx->global_stat);
+ shaper_global_stat_curr_session_dec(&ctx->thread_global_stat);
return sf;
}
@@ -232,7 +232,7 @@ void shaper_session_data_free_cb(void *session_data, void *data)
shaping_flow_free(ctx, sf);
}
- shaper_global_stat_curr_session_dec(ctx->global_stat);
+ shaper_global_stat_curr_session_dec(&ctx->thread_global_stat);
return;
} \ No newline at end of file
diff --git a/shaping/src/shaper_stat.cpp b/shaping/src/shaper_stat.cpp
index e9cd3e6..d8895fa 100644
--- a/shaping/src/shaper_stat.cpp
+++ b/shaping/src/shaper_stat.cpp
@@ -136,18 +136,19 @@ static void shaper_stat_swarmkv_hincrby_cb(const struct swarmkv_reply *reply, vo
{
struct shaping_hincrby_cb_arg *arg = (struct shaping_hincrby_cb_arg *)cb_arg;
struct shaping_thread_ctx *ctx = arg->ctx;
+ struct shaping_global_stat *global_stat = ctx->ref_ctx->global_stat;
struct timespec curr_time;
long long curr_time_us;
clock_gettime(CLOCK_MONOTONIC, &curr_time);
curr_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
- shaper_global_stat_swarmkv_latency_update(ctx->global_stat, curr_time_us - arg->start_time_us);
+ shaper_global_stat_swarmkv_latency_update(global_stat, curr_time_us - arg->start_time_us);
- shaper_global_stat_async_callback_inc(ctx->global_stat);
- shaper_global_stat_hincrby_callback_inc(ctx->global_stat);
+ shaper_global_stat_async_callback_inc(&ctx->thread_global_stat);
+ shaper_global_stat_hincrby_callback_inc(&ctx->thread_global_stat);
if (reply->type != SWARMKV_REPLY_INTEGER) {
- shaper_global_stat_async_hincrby_failed_inc(ctx->global_stat);
+ shaper_global_stat_async_hincrby_failed_inc(&ctx->thread_global_stat);
}
free(cb_arg);
@@ -190,8 +191,8 @@ static void shaper_stat_profile_metirc_refresh(struct shaping_thread_ctx *ctx, i
clock_gettime(CLOCK_MONOTONIC, &curr_time);
arg->ctx = ctx;
arg->start_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
- shaper_global_stat_async_invoke_inc(ctx->global_stat);
- shaper_global_stat_hincrby_invoke_inc(ctx->global_stat);
+ shaper_global_stat_async_invoke_inc(&ctx->thread_global_stat);
+ shaper_global_stat_hincrby_invoke_inc(&ctx->thread_global_stat);
swarmkv_async_command(ctx->swarmkv_db, shaper_stat_swarmkv_hincrby_cb, arg, "HINCRBY tsg-shaping-%d priority-%d %lld", profile->id, profile->priority, profile_stat->in.queue_len + profile_stat->out.queue_len);
memset(profile_stat, 0, sizeof(struct shaping_stat_for_profile));
diff --git a/shaping/test/gtest_shaper.cpp b/shaping/test/gtest_shaper.cpp
index 8c120d9..e27a49f 100644
--- a/shaping/test/gtest_shaper.cpp
+++ b/shaping/test/gtest_shaper.cpp
@@ -272,7 +272,7 @@ TEST(single_session, udp_tx_in_order)
/***********send stat data here********************/
fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
- shaper_global_stat_refresh(ctx->global_stat);
+ shaper_global_stat_refresh(ctx);
shaper_thread_resource_clear();
shaping_engine_destroy(ctx);
@@ -351,7 +351,7 @@ TEST(max_min_host_fairness_profile, udp_tx_in_order)
/***********send stat data here********************/
fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
- shaper_global_stat_refresh(ctx->global_stat);
+ shaper_global_stat_refresh(ctx);
shaper_thread_resource_clear();
shaping_engine_destroy(ctx);
@@ -1552,7 +1552,7 @@ TEST(statistics, udp_drop_pkt)
/***********send stat data here********************/
fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
- shaper_global_stat_refresh(ctx->global_stat);
+ shaper_global_stat_refresh(ctx);
shaper_thread_resource_clear();
shaping_engine_destroy(ctx);
@@ -1614,7 +1614,7 @@ TEST(statistics, udp_queueing_pkt)
/***********send stat data here********************/
shaper_stat_refresh(&ctx->thread_ctx[0], sf, ctx->thread_ctx[0].thread_index, 1);
fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
- shaper_global_stat_refresh(ctx->global_stat);
+ shaper_global_stat_refresh(ctx);
sleep(2);//wait telegraf generate metric
/*******judge global metric********/
@@ -1640,7 +1640,7 @@ TEST(statistics, udp_queueing_pkt)
/***********send stat data here********************/
fieldstat_dynamic_passive_output(ctx->stat->instance);//send metric manualy
- shaper_global_stat_refresh(ctx->global_stat);
+ shaper_global_stat_refresh(ctx);
shaper_thread_resource_clear();
shaping_engine_destroy(ctx);