diff options
| author | yangwei <[email protected]> | 2024-09-14 17:51:10 +0800 |
|---|---|---|
| committer | yangwei <[email protected]> | 2024-09-14 17:51:10 +0800 |
| commit | 9cf9019635700d744285fd26654b593e4baddf0c (patch) | |
| tree | 6d5454f4b711ac9235b91a56db0cc08745b3bdd2 | |
| parent | 07feab9f472f1cf987fd05024a61b127a7b403af (diff) | |
🐞 fix(del_stream_by_time): 移除在包处理流程中判断超时的逻辑,完全依赖polling
| -rw-r--r-- | src/dealpkt/deal_tcp.c | 17 | ||||
| -rw-r--r-- | src/dealpkt/stream_manage.c | 93 |
2 files changed, 41 insertions, 69 deletions
diff --git a/src/dealpkt/deal_tcp.c b/src/dealpkt/deal_tcp.c index 0ba46ce..30b0d87 100644 --- a/src/dealpkt/deal_tcp.c +++ b/src/dealpkt/deal_tcp.c @@ -18,7 +18,6 @@ static inline void update_stream_list_raw_pkt_pointer(struct streaminfo_private void addr_reverse_memcpy(const struct streaminfo_private *stack_pr,void *daddr, const void *saddr, int addrtype, int addrlen); int copy_ipport_union_addr(struct streaminfo *pstream_heap, struct streaminfo *pstream_stack, int reverse); void iterate_stream_list(const struct streaminfo *stream); -int del_stream_by_time(struct stream_list *plist, const struct streamindex *current_drive_index, int thread_id, enum stream_type_t type); int G_TCP_FLOW_STAT_PROJECT_ID = -1; /* 2016-07-14 lijia copy from sapp */ @@ -1196,16 +1195,6 @@ static void tcp_free_half(struct half_tcpstream *phalf,int threadnum) //phalf = NULL; } -static int inline stream_in_timeout_list(const struct streamindex *pindex, const struct stream_list *plist) -{ - if(timeout_pending((struct timeout *)&(pindex->timeout))) - { - return 1; - } - return 0; - -} - static void tcp_change_stream_tonouse(struct streamindex *pindex) { struct stream_list *plist; @@ -1217,10 +1206,6 @@ static void tcp_change_stream_tonouse(struct streamindex *pindex) plist=&(G_MESA_GLOBAL_STREAM[threadnum]->tcpList[pstream->stream_state]); - if(stream_in_timeout_list(pindex, plist) == 0){ - return; - } - streamleavlist(pindex,plist); plist=&(G_MESA_GLOBAL_STREAM[threadnum]->tcpList[TCP_NOUSE_STATE]); @@ -1239,8 +1224,6 @@ static void tcp_change_stream_tonouse(struct streamindex *pindex) tcp_free_half(pdetail_pr->pserver,threadnum); pdetail_pr->pserver = NULL; } - - del_stream_by_time(plist, pindex, threadnum, STREAM_TYPE_TCP); } diff --git a/src/dealpkt/stream_manage.c b/src/dealpkt/stream_manage.c index 0bbcd6b..e03eec5 100644 --- a/src/dealpkt/stream_manage.c +++ b/src/dealpkt/stream_manage.c @@ -451,9 +451,9 @@ enum del_stream_by_time_returen_code DEL_STREAM_BY_TIME_RET_NO_TOKEN = 3, }; -int del_stream_by_time(struct stream_list *plist, const struct streamindex *current_drive_index, int tid, enum stream_type_t type) +int del_stream_by_time(struct stream_list *plist, int tid, enum stream_type_t type) { - int ret = DEL_STREAM_BY_TIME_RET_NORMAL; + int ret = 0; struct timeout *t=NULL; struct global_stream *g_stream = G_MESA_GLOBAL_STREAM[tid]; sapp_gval_mthread_sys_stat_t *local_sys_stat = &sapp_global_val->mthread_volatile[tid]->sys_stat; @@ -474,51 +474,41 @@ int del_stream_by_time(struct stream_list *plist, const struct streamindex *curr } if (plist->interval_to_next_timeout_ms == 0) { - t = timeouts_get(plist->streamindex_timer); - if (t != NULL) + if (token_bucket_consume(plist->timeout_ratelimiter, 1, g_current_time_ms) > 0) { - if(type==STREAM_TYPE_UDP)local_sys_stat->count[SAPP_STAT_UDP_TRY_TIMEOUTS]++; - else if(type==STREAM_TYPE_TCP)local_sys_stat->count[SAPP_STAT_TCP_TRY_TIMEOUTS]++; - - struct streamindex *pindex = (struct streamindex *)sapp_get_struct_header(t, struct streamindex, timeout); - struct streaminfo_private *pstream_pr = &(pindex->stream); - struct streaminfo *pstream = &(pstream_pr->stream_public); - - if (pindex == current_drive_index) + t = timeouts_get(plist->streamindex_timer); + if (t != NULL) { + if (type == STREAM_TYPE_UDP) + local_sys_stat->count[SAPP_STAT_UDP_TRY_TIMEOUTS]++; + else if (type == STREAM_TYPE_TCP) + local_sys_stat->count[SAPP_STAT_TCP_TRY_TIMEOUTS]++; + + struct streamindex *pindex = + (struct streamindex *)sapp_get_struct_header(t, struct streamindex, timeout); + struct streaminfo_private *pstream_pr = &(pindex->stream); + struct streaminfo *pstream = &(pstream_pr->stream_public); if (STREAM_TYPE_TCP == pstream->type) { ((struct tcpdetail_private *)(pstream->pdetail))->link_state = STREAM_LINK_TIMEOUT; - } - ret = DEL_STREAM_BY_TIME_RET_DEL_SELF; - } - else - { - if (token_bucket_consume(plist->timeout_ratelimiter, 1, g_current_time_ms) > 0) - { - if (STREAM_TYPE_TCP == pstream->type) - { - ((struct tcpdetail_private *)(pstream->pdetail))->link_state = STREAM_LINK_TIMEOUT; - tcp_free_stream(pindex, NULL, NULL, NULL); - ret = DEL_STREAM_BY_TIME_RET_DEL_OTHER; - } - else - { - pstream_pr->stream_close_reason = STREAM_CLOSE_REASON_TIMEOUT; - udp_free_stream(pindex); - ret = DEL_STREAM_BY_TIME_RET_DEL_OTHER; - } + tcp_free_stream(pindex, NULL, NULL, NULL); + ret = DEL_STREAM_BY_TIME_RET_DEL_OTHER; } else { - timeouts_add(plist->streamindex_timer, t, g_CurrentTime_ms); - if (type == STREAM_TYPE_UDP) - local_sys_stat->count[SAPP_STAT_UDP_TIMEOUTS_OVERSPEED]++; - else if (type == STREAM_TYPE_TCP) - local_sys_stat->count[SAPP_STAT_TCP_TIMEOUTS_OVERSPEED]++; + pstream_pr->stream_close_reason = STREAM_CLOSE_REASON_TIMEOUT; + udp_free_stream(pindex); + ret = DEL_STREAM_BY_TIME_RET_DEL_OTHER; } } } + else + { + if (type == STREAM_TYPE_UDP) + local_sys_stat->count[SAPP_STAT_UDP_TIMEOUTS_OVERSPEED]++; + else if (type == STREAM_TYPE_TCP) + local_sys_stat->count[SAPP_STAT_TCP_TIMEOUTS_OVERSPEED]++; + } } if(g_stream->user_define_timer_cnt > 0 && g_stream->interval_to_next_timeout_ms == 0) { @@ -550,20 +540,19 @@ int del_stream_by_time(struct stream_list *plist, const struct streamindex *curr int lrustream(struct streamindex *pindex) { - struct stream_list *plist=NULL; - struct streaminfo_private *pstream_pr=&(pindex->stream); - struct streaminfo *pstream = &pstream_pr->stream_public; - - if(STREAM_TYPE_UDP==pstream->type) - { - plist=&(G_MESA_GLOBAL_STREAM[pstream->threadnum]->udpList[pstream->stream_state]); + struct stream_list *plist = NULL; + struct streaminfo_private *pstream_pr = &(pindex->stream); + struct streaminfo *pstream = &pstream_pr->stream_public; - } - else if (STREAM_TYPE_TCP==pstream->type) - { - plist=&(G_MESA_GLOBAL_STREAM[pstream->threadnum]->tcpList[pstream->stream_state]); + if (STREAM_TYPE_UDP == pstream->type) + { + plist = &(G_MESA_GLOBAL_STREAM[pstream->threadnum]->udpList[pstream->stream_state]); + } + else if (STREAM_TYPE_TCP == pstream->type) + { + plist = &(G_MESA_GLOBAL_STREAM[pstream->threadnum]->tcpList[pstream->stream_state]); + } - } if (pstream->stream_state == TCP_SYN_STATE && pstream->type == STREAM_TYPE_TCP) { timeouts_add(plist->streamindex_timer, &pindex->timeout, tcp_opening_timeout * 1000 + g_CurrentTime_ms); @@ -585,9 +574,9 @@ int lrustream(struct streamindex *pindex) long set_timeout = pindex->stream.timeout; if (set_timeout == 0) set_timeout = MAX_DEFALUT_TIMEOUT_S; - timeouts_add(plist->streamindex_timer, &pindex->timeout, set_timeout * 1000 + g_CurrentTime_ms); + timeouts_add(plist->streamindex_timer, &pindex->timeout, set_timeout * 1000 + g_current_time_ms); } - return del_stream_by_time(plist, pindex, pstream->threadnum, pstream->type); + return 0; } static int cmpaddr_mpls(const struct layer_addr_mpls *addr_heap, const struct layer_addr_mpls *addr_stack) @@ -2542,7 +2531,7 @@ int polling_stream_timeout(int tid) plist = &(G_MESA_GLOBAL_STREAM[tid]->udpList[i]); if (plist->cnt > 0) { - ret = del_stream_by_time(plist, NULL, tid, STREAM_TYPE_UDP); + ret = del_stream_by_time(plist,tid, STREAM_TYPE_UDP); if (ret > 0) { has_work = POLLING_STATE_WORK; @@ -2556,7 +2545,7 @@ int polling_stream_timeout(int tid) plist = &(G_MESA_GLOBAL_STREAM[tid]->tcpList[i]); if (plist->cnt > 0) { - ret = del_stream_by_time(plist, NULL, tid, STREAM_TYPE_TCP); + ret = del_stream_by_time(plist,tid, STREAM_TYPE_TCP); if (ret > 0) { has_work = POLLING_STATE_WORK; |
