diff options
| author | root <[email protected]> | 2023-09-28 02:22:51 +0000 |
|---|---|---|
| committer | root <[email protected]> | 2023-09-28 02:22:51 +0000 |
| commit | 38e67120f364de179945cc677d8d3333cb397c21 (patch) | |
| tree | 51b5e1841f027a64c7a5b01377b32ecd26b7e2a4 /shaping/src/shaper.cpp | |
| parent | e510917ce787526063bd72dee7ffe661db914142 (diff) | |
add timer for each session, to refresh stat when no packet coming
Diffstat (limited to 'shaping/src/shaper.cpp')
| -rw-r--r-- | shaping/src/shaper.cpp | 34 |
1 file changed, 32 insertions, 2 deletions
diff --git a/shaping/src/shaper.cpp b/shaping/src/shaper.cpp index 5a3c58d..0f119c9 100644 --- a/shaping/src/shaper.cpp +++ b/shaping/src/shaper.cpp @@ -143,7 +143,7 @@ static void shaping_node_free(struct shaping_node *s_node) return; } -struct shaping_flow* shaping_flow_new() +struct shaping_flow* shaping_flow_new(struct shaping_thread_ctx *ctx) { struct shaping_node *s_node = NULL; int i; @@ -163,6 +163,8 @@ struct shaping_flow* shaping_flow_new() TAILQ_INIT(&s_node->shaping_flow.packet_queue); s_node->shaping_flow.ref_count = 1; s_node->shaping_flow.priority = SHAPING_PRIORITY_NUM_MAX - 1; + timeout_init(&s_node->shaping_flow.timeout_handle, TIMEOUT_ABS); + timeouts_add(ctx->expires, &s_node->shaping_flow.timeout_handle, time(NULL) + SHAPING_STAT_REFRESH_INTERVAL_SEC); return &s_node->shaping_flow; @@ -176,6 +178,7 @@ void shaping_flow_free(struct shaping_thread_ctx *ctx, struct shaping_flow *sf) struct shaping_node *s_node = (struct shaping_node*)sf; if (__atomic_sub_fetch(&sf->ref_count, 1, __ATOMIC_SEQ_CST) == 0) { + timeouts_del(ctx->expires, &sf->timeout_handle); shaper_stat_refresh(ctx, sf, ctx->thread_index, 1); shaping_node_free(s_node); } @@ -894,6 +897,12 @@ void shaping_packet_process(struct shaping_thread_ctx *ctx, marsio_buff_t *rx_bu END: shaper_stat_refresh(ctx, sf, ctx->thread_index, 0); + time_t curr_time = time(NULL); + if (curr_time > sf->last_update_timeout_sec) { + timeouts_add(ctx->expires, &sf->timeout_handle, curr_time + SHAPING_STAT_REFRESH_INTERVAL_SEC); + sf->last_update_timeout_sec = curr_time; + } + if(sf->flag & SESSION_CLOSE) { if (shaper_queue_empty(sf)) { char *addr_str = addr_tuple4_to_str(&sf->tuple4); @@ -915,6 +924,25 @@ void polling_entry(struct shaper *sp, struct shaping_stat *stat, struct shaping_ { swarmkv_caller_loop(ctx->swarmkv_db, SWARMKV_LOOP_NONBLOCK, NULL); + struct timeout *t = NULL; + struct shaping_flow *sf = NULL; + time_t curr_time = time(NULL); + int cnt = 0; + + if (curr_time > ctx->last_update_timeout_sec) 
{ + timeouts_update(ctx->expires, curr_time); + ctx->last_update_timeout_sec = curr_time; + } + + t = timeouts_get(ctx->expires); + while (t && cnt < SHAPING_STAT_REFRESH_MAX_PER_POLLING) { + sf = container_of(t, struct shaping_flow, timeout_handle); + shaper_stat_refresh(ctx, sf, ctx->thread_index, 0); + timeouts_add(ctx->expires, &sf->timeout_handle, time(NULL) + SHAPING_STAT_REFRESH_INTERVAL_SEC); + t = timeouts_get(ctx->expires); + cnt++; + } + if (shaper_global_stat_queueing_pkts_get(ctx->global_stat) == 0) { return; } @@ -1166,6 +1194,7 @@ void shaping_engine_destroy(struct shaping_ctx *ctx) for (int i = 0; i < ctx->thread_num; i++) { shaper_free(ctx->thread_ctx[i].sp); session_table_destory(ctx->thread_ctx[i].session_table); + timeouts_close(ctx->thread_ctx[i].expires); } free(ctx->thread_ctx); } @@ -1180,7 +1209,7 @@ struct shaping_ctx *shaping_engine_init() { struct shaping_system_conf conf; struct shaping_ctx *ctx = NULL; - int ret; + int ret, error; memset(&conf, 0, sizeof(conf)); ctx = (struct shaping_ctx *)calloc(1, sizeof(struct shaping_ctx)); @@ -1230,6 +1259,7 @@ struct shaping_ctx *shaping_engine_init() ctx->thread_ctx[i].maat_info = ctx->maat_info; ctx->thread_ctx[i].marsio_info = ctx->marsio_info; ctx->thread_ctx[i].swarmkv_db = ctx->swarmkv_db; + ctx->thread_ctx[i].expires = timeouts_open(0, &error); ctx->thread_ctx[i].ref_ctx = ctx; memcpy(&ctx->thread_ctx[i].conf, &conf, sizeof(conf)); } |
