summaryrefslogtreecommitdiff
path: root/shaping/src/shaper.cpp
diff options
context:
space:
mode:
authorroot <[email protected]>2023-09-28 02:22:51 +0000
committerroot <[email protected]>2023-09-28 02:22:51 +0000
commit38e67120f364de179945cc677d8d3333cb397c21 (patch)
tree51b5e1841f027a64c7a5b01377b32ecd26b7e2a4 /shaping/src/shaper.cpp
parente510917ce787526063bd72dee7ffe661db914142 (diff)
add timer for each session, to refresh stat when no packet coming
Diffstat (limited to 'shaping/src/shaper.cpp')
-rw-r--r--shaping/src/shaper.cpp34
1 files changed, 32 insertions, 2 deletions
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp
index 5a3c58d..0f119c9 100644
--- a/shaping/src/shaper.cpp
+++ b/shaping/src/shaper.cpp
@@ -143,7 +143,7 @@ static void shaping_node_free(struct shaping_node *s_node)
return;
}
-struct shaping_flow* shaping_flow_new()
+struct shaping_flow* shaping_flow_new(struct shaping_thread_ctx *ctx)
{
struct shaping_node *s_node = NULL;
int i;
@@ -163,6 +163,8 @@ struct shaping_flow* shaping_flow_new()
TAILQ_INIT(&s_node->shaping_flow.packet_queue);
s_node->shaping_flow.ref_count = 1;
s_node->shaping_flow.priority = SHAPING_PRIORITY_NUM_MAX - 1;
+ timeout_init(&s_node->shaping_flow.timeout_handle, TIMEOUT_ABS);
+ timeouts_add(ctx->expires, &s_node->shaping_flow.timeout_handle, time(NULL) + SHAPING_STAT_REFRESH_INTERVAL_SEC);
return &s_node->shaping_flow;
@@ -176,6 +178,7 @@ void shaping_flow_free(struct shaping_thread_ctx *ctx, struct shaping_flow *sf)
struct shaping_node *s_node = (struct shaping_node*)sf;
if (__atomic_sub_fetch(&sf->ref_count, 1, __ATOMIC_SEQ_CST) == 0) {
+ timeouts_del(ctx->expires, &sf->timeout_handle);
shaper_stat_refresh(ctx, sf, ctx->thread_index, 1);
shaping_node_free(s_node);
}
@@ -894,6 +897,12 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu
END:
shaper_stat_refresh(ctx, sf, ctx->thread_index, 0);
+ time_t curr_time = time(NULL);
+ if (curr_time > sf->last_update_timeout_sec) {
+ timeouts_add(ctx->expires, &sf->timeout_handle, curr_time + SHAPING_STAT_REFRESH_INTERVAL_SEC);
+ sf->last_update_timeout_sec = curr_time;
+ }
+
if(sf->flag & SESSION_CLOSE) {
if (shaper_queue_empty(sf)) {
char *addr_str = addr_tuple4_to_str(&sf->tuple4);
@@ -915,6 +924,25 @@ void polling_entry(struct shaper *sp, struct shaping_stat *stat, struct shaping_
{
swarmkv_caller_loop(ctx->swarmkv_db, SWARMKV_LOOP_NONBLOCK, NULL);
+ struct timeout *t = NULL;
+ struct shaping_flow *sf = NULL;
+ time_t curr_time = time(NULL);
+ int cnt = 0;
+
+ if (curr_time > ctx->last_update_timeout_sec) {
+ timeouts_update(ctx->expires, curr_time);
+ ctx->last_update_timeout_sec = curr_time;
+ }
+
+ t = timeouts_get(ctx->expires);
+ while (t && cnt < SHAPING_STAT_REFRESH_MAX_PER_POLLING) {
+ sf = container_of(t, struct shaping_flow, timeout_handle);
+ shaper_stat_refresh(ctx, sf, ctx->thread_index, 0);
+ timeouts_add(ctx->expires, &sf->timeout_handle, time(NULL) + SHAPING_STAT_REFRESH_INTERVAL_SEC);
+ t = timeouts_get(ctx->expires);
+ cnt++;
+ }
+
if (shaper_global_stat_queueing_pkts_get(ctx->global_stat) == 0) {
return;
}
@@ -1166,6 +1194,7 @@ void shaping_engine_destroy(struct shaping_ctx *ctx)
for (int i = 0; i < ctx->thread_num; i++) {
shaper_free(ctx->thread_ctx[i].sp);
session_table_destory(ctx->thread_ctx[i].session_table);
+ timeouts_close(ctx->thread_ctx[i].expires);
}
free(ctx->thread_ctx);
}
@@ -1180,7 +1209,7 @@ struct shaping_ctx *shaping_engine_init()
{
struct shaping_system_conf conf;
struct shaping_ctx *ctx = NULL;
- int ret;
+ int ret, error;
memset(&conf, 0, sizeof(conf));
ctx = (struct shaping_ctx *)calloc(1, sizeof(struct shaping_ctx));
@@ -1230,6 +1259,7 @@ struct shaping_ctx *shaping_engine_init()
ctx->thread_ctx[i].maat_info = ctx->maat_info;
ctx->thread_ctx[i].marsio_info = ctx->marsio_info;
ctx->thread_ctx[i].swarmkv_db = ctx->swarmkv_db;
+ ctx->thread_ctx[i].expires = timeouts_open(0, &error);
ctx->thread_ctx[i].ref_ctx = ctx;
memcpy(&ctx->thread_ctx[i].conf, &conf, sizeof(conf));
}