summaryrefslogtreecommitdiff
path: root/shaping/src/shaper.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'shaping/src/shaper.cpp')
-rw-r--r--shaping/src/shaper.cpp83
1 files changed, 41 insertions, 42 deletions
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp
index 6ae35f0..de31504 100644
--- a/shaping/src/shaper.cpp
+++ b/shaping/src/shaper.cpp
@@ -259,9 +259,9 @@ void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx)
shaper_stat_queueing_pkt_dec(&rule->primary.stat, pkt_wrapper->direction, ctx->thread_index);
shaper_stat_drop_inc(&rule->primary.stat, pkt_wrapper->length, ctx->thread_index);
- shaper_global_stat_queueing_dec(ctx->global_stat, pkt_wrapper->length);
- shaper_global_stat_drop_inc(ctx->global_stat, pkt_wrapper->length);
- shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, pkt_wrapper->length);
+ shaper_global_stat_queueing_dec(&ctx->thread_global_stat, pkt_wrapper->length);
+ shaper_global_stat_drop_inc(&ctx->thread_global_stat, pkt_wrapper->length);
+ shaper_global_stat_hit_policy_drop_inc(&ctx->thread_global_stat, pkt_wrapper->length);
marsio_buff_free(ctx->marsio_info->instance, &pkt_wrapper->pkt_buff, 1, 0, ctx->thread_index);
shaper_packet_dequeue(sf);
@@ -429,15 +429,15 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg
clock_gettime(CLOCK_MONOTONIC, &curr_time);
curr_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
- shaper_global_stat_swarmkv_latency_update(ctx->global_stat, curr_time_us - arg->start_time_us);
+ shaper_global_stat_swarmkv_latency_update(ctx->ref_ctx->global_stat, curr_time_us - arg->start_time_us);
- shaper_global_stat_async_callback_inc(ctx->global_stat);
- shaper_global_stat_tconsume_callback_inc(ctx->global_stat);
+ shaper_global_stat_async_callback_inc(&ctx->thread_global_stat);
+ shaper_global_stat_tconsume_callback_inc(&ctx->thread_global_stat);
LOG_INFO("Swarmkv reply type =%d, profile_id %d, direction =%d, integer =%llu",reply->type, profile->id, arg->direction, reply->integer);
if (reply->type != SWARMKV_REPLY_INTEGER) {
- shaper_global_stat_async_tconsume_failed_inc(ctx->global_stat);
+ shaper_global_stat_async_tconsume_failed_inc(&ctx->thread_global_stat);
goto END;
}
@@ -558,8 +558,8 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct
arg->direction = direction;
arg->start_time_us = curr_timespec->tv_sec * MICRO_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
- shaper_global_stat_async_invoke_inc(ctx->global_stat);
- sheper_global_stat_tconsume_invoke_inc(ctx->global_stat);
+ shaper_global_stat_async_invoke_inc(&ctx->thread_global_stat);
+ sheper_global_stat_tconsume_invoke_inc(&ctx->thread_global_stat);
sf->ref_cnt++;
@@ -610,15 +610,15 @@ static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb
clock_gettime(CLOCK_MONOTONIC, &curr_time);
curr_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
- shaper_global_stat_swarmkv_latency_update(ctx->global_stat, curr_time_us - arg->start_time_us);
+ shaper_global_stat_swarmkv_latency_update(ctx->ref_ctx->global_stat, curr_time_us - arg->start_time_us);
- shaper_global_stat_async_callback_inc(ctx->global_stat);
- shaper_global_stat_hmget_callback_inc(ctx->global_stat);
+ shaper_global_stat_async_callback_inc(&ctx->thread_global_stat);
+ shaper_global_stat_hmget_callback_inc(&ctx->thread_global_stat);
pf_hash_node->is_priority_blocked[priority] = 0;
if (!reply || (reply->type != SWARMKV_REPLY_NIL && reply->type != SWARMKV_REPLY_ARRAY)) {
- shaper_global_stat_async_hmget_failed_inc(ctx->global_stat);
+ shaper_global_stat_async_hmget_failed_inc(&ctx->thread_global_stat);
goto END;
}
@@ -667,8 +667,8 @@ static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, st
arg->priority = priority;
arg->start_time_us = curr_timespec->tv_sec * MICRO_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MICRO_SEC;
- shaper_global_stat_async_invoke_inc(ctx->global_stat);
- shaper_global_stat_hmget_invoke_inc(ctx->global_stat);
+ shaper_global_stat_async_invoke_inc(&ctx->thread_global_stat);
+ shaper_global_stat_hmget_invoke_inc(&ctx->thread_global_stat);
swarmkv_async_command(ctx->swarmkv_db, shaper_queue_len_get_cb, arg, swarmkv_queue_len_get_cmd[priority], profile->id);
END:
@@ -959,17 +959,17 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_
}
break;
case SHAPING_DROP:
- shaper_global_stat_queueing_dec(ctx->global_stat, pkt_wrapper->length);
- shaper_global_stat_drop_inc(ctx->global_stat, pkt_wrapper->length);
- shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, pkt_wrapper->length);
+ shaper_global_stat_queueing_dec(&ctx->thread_global_stat, pkt_wrapper->length);
+ shaper_global_stat_drop_inc(&ctx->thread_global_stat, pkt_wrapper->length);
+ shaper_global_stat_hit_policy_drop_inc(&ctx->thread_global_stat, pkt_wrapper->length);
marsio_buff_free(ctx->marsio_info->instance, &pkt_wrapper->pkt_buff, 1, 0, ctx->thread_index);
shaper_packet_dequeue(sf);
break;
case SHAPING_FORWARD:
- shaper_global_stat_queueing_dec(ctx->global_stat, pkt_wrapper->length);
- shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, pkt_wrapper->length);
- shaper_global_stat_hit_policy_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, pkt_wrapper->length);
+ shaper_global_stat_queueing_dec(&ctx->thread_global_stat, pkt_wrapper->length);
+ shaper_global_stat_throughput_tx_inc(&ctx->thread_global_stat, pkt_wrapper->length);
+ shaper_global_stat_hit_policy_throughput_tx_inc(&ctx->thread_global_stat, pkt_wrapper->length);
marsio_send_burst(ctx->marsio_info->mr_path, ctx->thread_index, &pkt_wrapper->pkt_buff, 1);
shaper_packet_dequeue(sf);
@@ -1030,8 +1030,8 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
if (meta->is_tcp_pure_ctrl) {
shaper_token_consume_force(sf, meta);
marsio_send_burst(marsio_info->mr_path, ctx->thread_index, &rx_buff, 1);
- shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len);
- shaper_global_stat_hit_policy_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len);
+ shaper_global_stat_throughput_tx_inc(&ctx->thread_global_stat, meta->raw_len);
+ shaper_global_stat_hit_policy_throughput_tx_inc(&ctx->thread_global_stat, meta->raw_len);
shaper_stat_forward_all_rule_inc(stat, sf, meta->dir, meta->raw_len, ctx->thread_index);
goto END;//for tcp pure control pkt, transmit it directly
}
@@ -1040,11 +1040,11 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
s_rule = &sf->matched_rule_infos[0];
if (0 == shaper_packet_enqueue(ctx, sf, rx_buff, meta)) {
shaper_stat_queueing_pkt_inc(&s_rule->primary.stat, meta->dir, ctx->thread_index);
- shaper_global_stat_queueing_inc(ctx->global_stat, meta->raw_len);
+ shaper_global_stat_queueing_inc(&ctx->thread_global_stat, meta->raw_len);
} else {
shaper_stat_drop_inc(&s_rule->primary.stat, meta->dir, ctx->thread_index);
- shaper_global_stat_drop_inc(ctx->global_stat, meta->raw_len);
- shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, meta->raw_len);
+ shaper_global_stat_drop_inc(&ctx->thread_global_stat, meta->raw_len);
+ shaper_global_stat_hit_policy_drop_inc(&ctx->thread_global_stat, meta->raw_len);
marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index);
}
@@ -1054,17 +1054,17 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
shaping_ret = shaper_pkt_action_decide_no_queue(ctx, sf, meta, &sf->matched_rule_infos[sf->anchor].primary, rx_buff);
switch (shaping_ret) {
case SHAPING_QUEUED:
- shaper_global_stat_queueing_inc(ctx->global_stat, meta->raw_len);
+ shaper_global_stat_queueing_inc(&ctx->thread_global_stat, meta->raw_len);
break;
case SHAPING_DROP:
marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index);
- shaper_global_stat_drop_inc(ctx->global_stat, meta->raw_len);
- shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, meta->raw_len);
+ shaper_global_stat_drop_inc(&ctx->thread_global_stat, meta->raw_len);
+ shaper_global_stat_hit_policy_drop_inc(&ctx->thread_global_stat, meta->raw_len);
break;
case SHAPING_FORWARD:
marsio_send_burst(marsio_info->mr_path, ctx->thread_index, &rx_buff, 1);
- shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len);
- shaper_global_stat_hit_policy_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len);
+ shaper_global_stat_throughput_tx_inc(&ctx->thread_global_stat, meta->raw_len);
+ shaper_global_stat_hit_policy_throughput_tx_inc(&ctx->thread_global_stat, meta->raw_len);
break;
default:
assert(0);
@@ -1120,7 +1120,7 @@ void polling_entry(struct shaper *sp, struct shaping_stat *stat, struct shaping_
cnt++;
}
- if (shaper_global_stat_queueing_pkts_get(ctx->global_stat) == 0) {
+ if (shaper_global_stat_queueing_pkts_get(&ctx->thread_global_stat) == 0) {
return;
}
@@ -1169,23 +1169,23 @@ static struct shaping_flow* shaper_ctrl_pkt_session_handle(struct shaping_thread
switch (ctrl_data.state) {
case SESSION_STATE_OPENING:
- shaper_global_stat_ctrlpkt_opening_inc(ctx->global_stat);
+ shaper_global_stat_ctrlpkt_opening_inc(&ctx->thread_global_stat);
//sf = shaper_session_opening(ctx, meta, &ctrl_data, &raw_parser);
break;
case SESSION_STATE_ACTIVE:
- shaper_global_stat_ctrlpkt_active_inc(ctx->global_stat);
+ shaper_global_stat_ctrlpkt_active_inc(&ctx->thread_global_stat);
sf = shaper_session_active(ctx, meta, &ctrl_data, &raw_parser);
break;
case SESSION_STATE_CLOSING:
- shaper_global_stat_ctrlpkt_close_inc(ctx->global_stat);
+ shaper_global_stat_ctrlpkt_close_inc(&ctx->thread_global_stat);
sf = shaper_session_close(ctx, meta);
break;
case SESSION_STATE_RESETALL:
- shaper_global_stat_ctrlpkt_resetall_inc(ctx->global_stat);
+ shaper_global_stat_ctrlpkt_resetall_inc(&ctx->thread_global_stat);
sf = shaper_session_reset_all(ctx, meta);
break;
default:
- shaper_global_stat_ctrlpkt_err_inc(ctx->global_stat);
+ shaper_global_stat_ctrlpkt_err_inc(&ctx->thread_global_stat);
assert(0);
}
@@ -1207,7 +1207,7 @@ static struct shaping_flow *shaper_raw_pkt_session_handle(struct shaping_thread_
session_node = session_table_search_by_id(ctx->session_table, meta->session_id);
if (session_node) {
sf = (struct shaping_flow *)session_node->val_data;
- shaper_global_stat_hit_policy_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_RX, meta->raw_len);
+ shaper_global_stat_hit_policy_throughput_rx_inc(&ctx->thread_global_stat, meta->raw_len);
}
return sf;
@@ -1233,11 +1233,11 @@ void shaper_packet_recv_and_process(struct shaping_thread_ctx *ctx)
} else {
sf = shaper_raw_pkt_session_handle(ctx, rx_buff[i], &meta);
}
- shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_RX, meta.raw_len);
+ shaper_global_stat_throughput_rx_inc(&ctx->thread_global_stat, meta.raw_len);
if (meta.is_ctrl_pkt || !sf || sf->rule_num == 0) {//ctrl pkt need send directly
marsio_send_burst(ctx->marsio_info->mr_path, ctx->thread_index, &rx_buff[i], 1);
- shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta.raw_len);
+ shaper_global_stat_throughput_tx_inc(&ctx->thread_global_stat, meta.raw_len);
} else {
shaping_packet_process(ctx, rx_buff[i], &meta, sf);
}
@@ -1421,7 +1421,7 @@ struct shaping_ctx *shaping_engine_init()
goto ERROR;
}
- ctx->global_stat = shaper_global_stat_init();
+ ctx->global_stat = shaper_global_stat_init(conf.work_thread_num);
if (ctx->global_stat == NULL) {
goto ERROR;
}
@@ -1432,7 +1432,6 @@ struct shaping_ctx *shaping_engine_init()
ctx->thread_ctx[i].thread_index = i;
ctx->thread_ctx[i].sp = shaper_new(conf.priority_queue_len_max);
ctx->thread_ctx[i].stat = ctx->stat;
- ctx->thread_ctx[i].global_stat = ctx->global_stat;
ctx->thread_ctx[i].session_table = session_table_create();
ctx->thread_ctx[i].maat_info = ctx->maat_info;
ctx->thread_ctx[i].marsio_info = ctx->marsio_info;