diff options
Diffstat (limited to 'shaping/src/shaper.cpp')
| -rw-r--r-- | shaping/src/shaper.cpp | 83 |
1 files changed, 41 insertions, 42 deletions
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp index 6ae35f0..de31504 100644 --- a/shaping/src/shaper.cpp +++ b/shaping/src/shaper.cpp @@ -259,9 +259,9 @@ void shaper_queue_clear(struct shaping_flow *sf, struct shaping_thread_ctx *ctx) shaper_stat_queueing_pkt_dec(&rule->primary.stat, pkt_wrapper->direction, ctx->thread_index); shaper_stat_drop_inc(&rule->primary.stat, pkt_wrapper->length, ctx->thread_index); - shaper_global_stat_queueing_dec(ctx->global_stat, pkt_wrapper->length); - shaper_global_stat_drop_inc(ctx->global_stat, pkt_wrapper->length); - shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, pkt_wrapper->length); + shaper_global_stat_queueing_dec(&ctx->thread_global_stat, pkt_wrapper->length); + shaper_global_stat_drop_inc(&ctx->thread_global_stat, pkt_wrapper->length); + shaper_global_stat_hit_policy_drop_inc(&ctx->thread_global_stat, pkt_wrapper->length); marsio_buff_free(ctx->marsio_info->instance, &pkt_wrapper->pkt_buff, 1, 0, ctx->thread_index); shaper_packet_dequeue(sf); @@ -429,15 +429,15 @@ static void shaper_token_get_cb(const struct swarmkv_reply *reply, void * cb_arg clock_gettime(CLOCK_MONOTONIC, &curr_time); curr_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC; - shaper_global_stat_swarmkv_latency_update(ctx->global_stat, curr_time_us - arg->start_time_us); + shaper_global_stat_swarmkv_latency_update(ctx->ref_ctx->global_stat, curr_time_us - arg->start_time_us); - shaper_global_stat_async_callback_inc(ctx->global_stat); - shaper_global_stat_tconsume_callback_inc(ctx->global_stat); + shaper_global_stat_async_callback_inc(&ctx->thread_global_stat); + shaper_global_stat_tconsume_callback_inc(&ctx->thread_global_stat); LOG_INFO("Swarmkv reply type =%d, profile_id %d, direction =%d, integer =%llu",reply->type, profile->id, arg->direction, reply->integer); if (reply->type != SWARMKV_REPLY_INTEGER) { - shaper_global_stat_async_tconsume_failed_inc(ctx->global_stat); + shaper_global_stat_async_tconsume_failed_inc(&ctx->thread_global_stat); goto END; } @@ -558,8 +558,8 @@ static int shaper_token_get_from_profile(struct shaping_thread_ctx *ctx, struct arg->direction = direction; arg->start_time_us = curr_timespec->tv_sec * MICRO_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MICRO_SEC; - shaper_global_stat_async_invoke_inc(ctx->global_stat); - sheper_global_stat_tconsume_invoke_inc(ctx->global_stat); + shaper_global_stat_async_invoke_inc(&ctx->thread_global_stat); + sheper_global_stat_tconsume_invoke_inc(&ctx->thread_global_stat); sf->ref_cnt++; @@ -610,15 +610,15 @@ static void shaper_queue_len_get_cb(const struct swarmkv_reply *reply, void * cb clock_gettime(CLOCK_MONOTONIC, &curr_time); curr_time_us = curr_time.tv_sec * MICRO_SECONDS_PER_SEC + curr_time.tv_nsec / NANO_SECONDS_PER_MICRO_SEC; - shaper_global_stat_swarmkv_latency_update(ctx->global_stat, curr_time_us - arg->start_time_us); + shaper_global_stat_swarmkv_latency_update(ctx->ref_ctx->global_stat, curr_time_us - arg->start_time_us); - shaper_global_stat_async_callback_inc(ctx->global_stat); - shaper_global_stat_hmget_callback_inc(ctx->global_stat); + shaper_global_stat_async_callback_inc(&ctx->thread_global_stat); + shaper_global_stat_hmget_callback_inc(&ctx->thread_global_stat); pf_hash_node->is_priority_blocked[priority] = 0; if (!reply || (reply->type != SWARMKV_REPLY_NIL && reply->type != SWARMKV_REPLY_ARRAY)) { - shaper_global_stat_async_hmget_failed_inc(ctx->global_stat); + shaper_global_stat_async_hmget_failed_inc(&ctx->thread_global_stat); goto END; } @@ -667,8 +667,8 @@ static int shaper_profile_is_priority_blocked(struct shaping_thread_ctx *ctx, st arg->priority = priority; arg->start_time_us = curr_timespec->tv_sec * MICRO_SECONDS_PER_SEC + curr_timespec->tv_nsec / NANO_SECONDS_PER_MICRO_SEC; - shaper_global_stat_async_invoke_inc(ctx->global_stat); - shaper_global_stat_hmget_invoke_inc(ctx->global_stat); + shaper_global_stat_async_invoke_inc(&ctx->thread_global_stat); + shaper_global_stat_hmget_invoke_inc(&ctx->thread_global_stat); swarmkv_async_command(ctx->swarmkv_db, shaper_queue_len_get_cb, arg, swarmkv_queue_len_get_cmd[priority], profile->id); END: @@ -959,17 +959,17 @@ static int shaper_polling_first_pkt_token_get(struct shaper *sp, struct shaping_ } break; case SHAPING_DROP: - shaper_global_stat_queueing_dec(ctx->global_stat, pkt_wrapper->length); - shaper_global_stat_drop_inc(ctx->global_stat, pkt_wrapper->length); - shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, pkt_wrapper->length); + shaper_global_stat_queueing_dec(&ctx->thread_global_stat, pkt_wrapper->length); + shaper_global_stat_drop_inc(&ctx->thread_global_stat, pkt_wrapper->length); + shaper_global_stat_hit_policy_drop_inc(&ctx->thread_global_stat, pkt_wrapper->length); marsio_buff_free(ctx->marsio_info->instance, &pkt_wrapper->pkt_buff, 1, 0, ctx->thread_index); shaper_packet_dequeue(sf); break; case SHAPING_FORWARD: - shaper_global_stat_queueing_dec(ctx->global_stat, pkt_wrapper->length); - shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, pkt_wrapper->length); - shaper_global_stat_hit_policy_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, pkt_wrapper->length); + shaper_global_stat_queueing_dec(&ctx->thread_global_stat, pkt_wrapper->length); + shaper_global_stat_throughput_tx_inc(&ctx->thread_global_stat, pkt_wrapper->length); + shaper_global_stat_hit_policy_throughput_tx_inc(&ctx->thread_global_stat, pkt_wrapper->length); marsio_send_burst(ctx->marsio_info->mr_path, ctx->thread_index, &pkt_wrapper->pkt_buff, 1); shaper_packet_dequeue(sf); @@ -1030,8 +1030,8 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu if (meta->is_tcp_pure_ctrl) { shaper_token_consume_force(sf, meta); marsio_send_burst(marsio_info->mr_path, ctx->thread_index, &rx_buff, 1); - shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len); - shaper_global_stat_hit_policy_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len); + shaper_global_stat_throughput_tx_inc(&ctx->thread_global_stat, meta->raw_len); + shaper_global_stat_hit_policy_throughput_tx_inc(&ctx->thread_global_stat, meta->raw_len); shaper_stat_forward_all_rule_inc(stat, sf, meta->dir, meta->raw_len, ctx->thread_index); goto END;//for tcp pure control pkt, transmit it directly } @@ -1040,11 +1040,11 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu s_rule = &sf->matched_rule_infos[0]; if (0 == shaper_packet_enqueue(ctx, sf, rx_buff, meta)) { shaper_stat_queueing_pkt_inc(&s_rule->primary.stat, meta->dir, ctx->thread_index); - shaper_global_stat_queueing_inc(ctx->global_stat, meta->raw_len); + shaper_global_stat_queueing_inc(&ctx->thread_global_stat, meta->raw_len); } else { shaper_stat_drop_inc(&s_rule->primary.stat, meta->dir, ctx->thread_index); - shaper_global_stat_drop_inc(ctx->global_stat, meta->raw_len); - shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, meta->raw_len); + shaper_global_stat_drop_inc(&ctx->thread_global_stat, meta->raw_len); + shaper_global_stat_hit_policy_drop_inc(&ctx->thread_global_stat, meta->raw_len); marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index); } @@ -1054,17 +1054,17 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu shaping_ret = shaper_pkt_action_decide_no_queue(ctx, sf, meta, &sf->matched_rule_infos[sf->anchor].primary, rx_buff); switch (shaping_ret) { case SHAPING_QUEUED: - shaper_global_stat_queueing_inc(ctx->global_stat, meta->raw_len); + shaper_global_stat_queueing_inc(&ctx->thread_global_stat, meta->raw_len); break; case SHAPING_DROP: marsio_buff_free(marsio_info->instance, &rx_buff, 1, 0, ctx->thread_index); - shaper_global_stat_drop_inc(ctx->global_stat, meta->raw_len); - shaper_global_stat_hit_policy_drop_inc(ctx->global_stat, meta->raw_len); + shaper_global_stat_drop_inc(&ctx->thread_global_stat, meta->raw_len); + shaper_global_stat_hit_policy_drop_inc(&ctx->thread_global_stat, meta->raw_len); break; case SHAPING_FORWARD: marsio_send_burst(marsio_info->mr_path, ctx->thread_index, &rx_buff, 1); - shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len); - shaper_global_stat_hit_policy_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta->raw_len); + shaper_global_stat_throughput_tx_inc(&ctx->thread_global_stat, meta->raw_len); + shaper_global_stat_hit_policy_throughput_tx_inc(&ctx->thread_global_stat, meta->raw_len); break; default: assert(0); @@ -1120,7 +1120,7 @@ void polling_entry(struct shaper *sp, struct shaping_stat *stat, struct shaping_ cnt++; } - if (shaper_global_stat_queueing_pkts_get(ctx->global_stat) == 0) { + if (shaper_global_stat_queueing_pkts_get(&ctx->thread_global_stat) == 0) { return; } @@ -1169,23 +1169,23 @@ static struct shaping_flow* shaper_ctrl_pkt_session_handle(struct shaping_thread switch (ctrl_data.state) { case SESSION_STATE_OPENING: - shaper_global_stat_ctrlpkt_opening_inc(ctx->global_stat); + shaper_global_stat_ctrlpkt_opening_inc(&ctx->thread_global_stat); //sf = shaper_session_opening(ctx, meta, &ctrl_data, &raw_parser); break; case SESSION_STATE_ACTIVE: - shaper_global_stat_ctrlpkt_active_inc(ctx->global_stat); + shaper_global_stat_ctrlpkt_active_inc(&ctx->thread_global_stat); sf = shaper_session_active(ctx, meta, &ctrl_data, &raw_parser); break; case SESSION_STATE_CLOSING: - shaper_global_stat_ctrlpkt_close_inc(ctx->global_stat); + shaper_global_stat_ctrlpkt_close_inc(&ctx->thread_global_stat); sf = shaper_session_close(ctx, meta); break; case SESSION_STATE_RESETALL: - shaper_global_stat_ctrlpkt_resetall_inc(ctx->global_stat); + shaper_global_stat_ctrlpkt_resetall_inc(&ctx->thread_global_stat); sf = shaper_session_reset_all(ctx, meta); break; default: - shaper_global_stat_ctrlpkt_err_inc(ctx->global_stat); + shaper_global_stat_ctrlpkt_err_inc(&ctx->thread_global_stat); assert(0); } @@ -1207,7 +1207,7 @@ static struct shaping_flow *shaper_raw_pkt_session_handle(struct shaping_thread_ session_node = session_table_search_by_id(ctx->session_table, meta->session_id); if (session_node) { sf = (struct shaping_flow *)session_node->val_data; - shaper_global_stat_hit_policy_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_RX, meta->raw_len); + shaper_global_stat_hit_policy_throughput_rx_inc(&ctx->thread_global_stat, meta->raw_len); } return sf; @@ -1233,11 +1233,11 @@ void shaper_packet_recv_and_process(struct shaping_thread_ctx *ctx) } else { sf = shaper_raw_pkt_session_handle(ctx, rx_buff[i], &meta); } - shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_RX, meta.raw_len); + shaper_global_stat_throughput_rx_inc(&ctx->thread_global_stat, meta.raw_len); if (meta.is_ctrl_pkt || !sf || sf->rule_num == 0) {//ctrl pkt need send directly marsio_send_burst(ctx->marsio_info->mr_path, ctx->thread_index, &rx_buff[i], 1); - shaper_global_stat_throughput_inc(ctx->global_stat, SHAPING_GLOBAL_STAT_TX, meta.raw_len); + shaper_global_stat_throughput_tx_inc(&ctx->thread_global_stat, meta.raw_len); } else { shaping_packet_process(ctx, rx_buff[i], &meta, sf); } @@ -1421,7 +1421,7 @@ struct shaping_ctx *shaping_engine_init() goto ERROR; } - ctx->global_stat = shaper_global_stat_init(); + ctx->global_stat = shaper_global_stat_init(conf.work_thread_num); if (ctx->global_stat == NULL) { goto ERROR; } @@ -1432,7 +1432,6 @@ struct shaping_ctx *shaping_engine_init() ctx->thread_ctx[i].thread_index = i; ctx->thread_ctx[i].sp = shaper_new(conf.priority_queue_len_max); ctx->thread_ctx[i].stat = ctx->stat; - ctx->thread_ctx[i].global_stat = ctx->global_stat; ctx->thread_ctx[i].session_table = session_table_create(); ctx->thread_ctx[i].maat_info = ctx->maat_info; ctx->thread_ctx[i].marsio_info = ctx->marsio_info; |
