summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryangwei <[email protected]>2024-09-14 17:51:10 +0800
committeryangwei <[email protected]>2024-09-14 17:51:10 +0800
commit9cf9019635700d744285fd26654b593e4baddf0c (patch)
tree6d5454f4b711ac9235b91a56db0cc08745b3bdd2
parent07feab9f472f1cf987fd05024a61b127a7b403af (diff)
🐞 fix(del_stream_by_time): 移除在包处理流程中判断超时的逻辑,完全依赖polling
-rw-r--r--src/dealpkt/deal_tcp.c17
-rw-r--r--src/dealpkt/stream_manage.c93
2 files changed, 41 insertions, 69 deletions
diff --git a/src/dealpkt/deal_tcp.c b/src/dealpkt/deal_tcp.c
index 0ba46ce..30b0d87 100644
--- a/src/dealpkt/deal_tcp.c
+++ b/src/dealpkt/deal_tcp.c
@@ -18,7 +18,6 @@ static inline void update_stream_list_raw_pkt_pointer(struct streaminfo_private
void addr_reverse_memcpy(const struct streaminfo_private *stack_pr,void *daddr, const void *saddr, int addrtype, int addrlen);
int copy_ipport_union_addr(struct streaminfo *pstream_heap, struct streaminfo *pstream_stack, int reverse);
void iterate_stream_list(const struct streaminfo *stream);
-int del_stream_by_time(struct stream_list *plist, const struct streamindex *current_drive_index, int thread_id, enum stream_type_t type);
int G_TCP_FLOW_STAT_PROJECT_ID = -1; /* 2016-07-14 lijia copy from sapp */
@@ -1196,16 +1195,6 @@ static void tcp_free_half(struct half_tcpstream *phalf,int threadnum)
//phalf = NULL;
}
-static int inline stream_in_timeout_list(const struct streamindex *pindex, const struct stream_list *plist)
-{
- if(timeout_pending((struct timeout *)&(pindex->timeout)))
- {
- return 1;
- }
- return 0;
-
-}
-
static void tcp_change_stream_tonouse(struct streamindex *pindex)
{
struct stream_list *plist;
@@ -1217,10 +1206,6 @@ static void tcp_change_stream_tonouse(struct streamindex *pindex)
plist=&(G_MESA_GLOBAL_STREAM[threadnum]->tcpList[pstream->stream_state]);
- if(stream_in_timeout_list(pindex, plist) == 0){
- return;
- }
-
streamleavlist(pindex,plist);
plist=&(G_MESA_GLOBAL_STREAM[threadnum]->tcpList[TCP_NOUSE_STATE]);
@@ -1239,8 +1224,6 @@ static void tcp_change_stream_tonouse(struct streamindex *pindex)
tcp_free_half(pdetail_pr->pserver,threadnum);
pdetail_pr->pserver = NULL;
}
-
- del_stream_by_time(plist, pindex, threadnum, STREAM_TYPE_TCP);
}
diff --git a/src/dealpkt/stream_manage.c b/src/dealpkt/stream_manage.c
index 0bbcd6b..e03eec5 100644
--- a/src/dealpkt/stream_manage.c
+++ b/src/dealpkt/stream_manage.c
@@ -451,9 +451,9 @@ enum del_stream_by_time_returen_code
DEL_STREAM_BY_TIME_RET_NO_TOKEN = 3,
};
-int del_stream_by_time(struct stream_list *plist, const struct streamindex *current_drive_index, int tid, enum stream_type_t type)
+int del_stream_by_time(struct stream_list *plist, int tid, enum stream_type_t type)
{
- int ret = DEL_STREAM_BY_TIME_RET_NORMAL;
+ int ret = 0;
struct timeout *t=NULL;
struct global_stream *g_stream = G_MESA_GLOBAL_STREAM[tid];
sapp_gval_mthread_sys_stat_t *local_sys_stat = &sapp_global_val->mthread_volatile[tid]->sys_stat;
@@ -474,51 +474,41 @@ int del_stream_by_time(struct stream_list *plist, const struct streamindex *curr
}
if (plist->interval_to_next_timeout_ms == 0)
{
- t = timeouts_get(plist->streamindex_timer);
- if (t != NULL)
+ if (token_bucket_consume(plist->timeout_ratelimiter, 1, g_current_time_ms) > 0)
{
- if(type==STREAM_TYPE_UDP)local_sys_stat->count[SAPP_STAT_UDP_TRY_TIMEOUTS]++;
- else if(type==STREAM_TYPE_TCP)local_sys_stat->count[SAPP_STAT_TCP_TRY_TIMEOUTS]++;
-
- struct streamindex *pindex = (struct streamindex *)sapp_get_struct_header(t, struct streamindex, timeout);
- struct streaminfo_private *pstream_pr = &(pindex->stream);
- struct streaminfo *pstream = &(pstream_pr->stream_public);
-
- if (pindex == current_drive_index)
+ t = timeouts_get(plist->streamindex_timer);
+ if (t != NULL)
{
+ if (type == STREAM_TYPE_UDP)
+ local_sys_stat->count[SAPP_STAT_UDP_TRY_TIMEOUTS]++;
+ else if (type == STREAM_TYPE_TCP)
+ local_sys_stat->count[SAPP_STAT_TCP_TRY_TIMEOUTS]++;
+
+ struct streamindex *pindex =
+ (struct streamindex *)sapp_get_struct_header(t, struct streamindex, timeout);
+ struct streaminfo_private *pstream_pr = &(pindex->stream);
+ struct streaminfo *pstream = &(pstream_pr->stream_public);
if (STREAM_TYPE_TCP == pstream->type)
{
((struct tcpdetail_private *)(pstream->pdetail))->link_state = STREAM_LINK_TIMEOUT;
- }
- ret = DEL_STREAM_BY_TIME_RET_DEL_SELF;
- }
- else
- {
- if (token_bucket_consume(plist->timeout_ratelimiter, 1, g_current_time_ms) > 0)
- {
- if (STREAM_TYPE_TCP == pstream->type)
- {
- ((struct tcpdetail_private *)(pstream->pdetail))->link_state = STREAM_LINK_TIMEOUT;
- tcp_free_stream(pindex, NULL, NULL, NULL);
- ret = DEL_STREAM_BY_TIME_RET_DEL_OTHER;
- }
- else
- {
- pstream_pr->stream_close_reason = STREAM_CLOSE_REASON_TIMEOUT;
- udp_free_stream(pindex);
- ret = DEL_STREAM_BY_TIME_RET_DEL_OTHER;
- }
+ tcp_free_stream(pindex, NULL, NULL, NULL);
+ ret = DEL_STREAM_BY_TIME_RET_DEL_OTHER;
}
else
{
- timeouts_add(plist->streamindex_timer, t, g_CurrentTime_ms);
- if (type == STREAM_TYPE_UDP)
- local_sys_stat->count[SAPP_STAT_UDP_TIMEOUTS_OVERSPEED]++;
- else if (type == STREAM_TYPE_TCP)
- local_sys_stat->count[SAPP_STAT_TCP_TIMEOUTS_OVERSPEED]++;
+ pstream_pr->stream_close_reason = STREAM_CLOSE_REASON_TIMEOUT;
+ udp_free_stream(pindex);
+ ret = DEL_STREAM_BY_TIME_RET_DEL_OTHER;
}
}
}
+ else
+ {
+ if (type == STREAM_TYPE_UDP)
+ local_sys_stat->count[SAPP_STAT_UDP_TIMEOUTS_OVERSPEED]++;
+ else if (type == STREAM_TYPE_TCP)
+ local_sys_stat->count[SAPP_STAT_TCP_TIMEOUTS_OVERSPEED]++;
+ }
}
if(g_stream->user_define_timer_cnt > 0 && g_stream->interval_to_next_timeout_ms == 0)
{
@@ -550,20 +540,19 @@ int del_stream_by_time(struct stream_list *plist, const struct streamindex *curr
int lrustream(struct streamindex *pindex)
{
- struct stream_list *plist=NULL;
- struct streaminfo_private *pstream_pr=&(pindex->stream);
- struct streaminfo *pstream = &pstream_pr->stream_public;
-
- if(STREAM_TYPE_UDP==pstream->type)
- {
- plist=&(G_MESA_GLOBAL_STREAM[pstream->threadnum]->udpList[pstream->stream_state]);
+ struct stream_list *plist = NULL;
+ struct streaminfo_private *pstream_pr = &(pindex->stream);
+ struct streaminfo *pstream = &pstream_pr->stream_public;
- }
- else if (STREAM_TYPE_TCP==pstream->type)
- {
- plist=&(G_MESA_GLOBAL_STREAM[pstream->threadnum]->tcpList[pstream->stream_state]);
+ if (STREAM_TYPE_UDP == pstream->type)
+ {
+ plist = &(G_MESA_GLOBAL_STREAM[pstream->threadnum]->udpList[pstream->stream_state]);
+ }
+ else if (STREAM_TYPE_TCP == pstream->type)
+ {
+ plist = &(G_MESA_GLOBAL_STREAM[pstream->threadnum]->tcpList[pstream->stream_state]);
+ }
- }
if (pstream->stream_state == TCP_SYN_STATE && pstream->type == STREAM_TYPE_TCP)
{
timeouts_add(plist->streamindex_timer, &pindex->timeout, tcp_opening_timeout * 1000 + g_CurrentTime_ms);
@@ -585,9 +574,9 @@ int lrustream(struct streamindex *pindex)
long set_timeout = pindex->stream.timeout;
if (set_timeout == 0)
set_timeout = MAX_DEFALUT_TIMEOUT_S;
- timeouts_add(plist->streamindex_timer, &pindex->timeout, set_timeout * 1000 + g_CurrentTime_ms);
+ timeouts_add(plist->streamindex_timer, &pindex->timeout, set_timeout * 1000 + g_current_time_ms);
}
- return del_stream_by_time(plist, pindex, pstream->threadnum, pstream->type);
+ return 0;
}
static int cmpaddr_mpls(const struct layer_addr_mpls *addr_heap, const struct layer_addr_mpls *addr_stack)
@@ -2542,7 +2531,7 @@ int polling_stream_timeout(int tid)
plist = &(G_MESA_GLOBAL_STREAM[tid]->udpList[i]);
if (plist->cnt > 0)
{
- ret = del_stream_by_time(plist, NULL, tid, STREAM_TYPE_UDP);
+ ret = del_stream_by_time(plist,tid, STREAM_TYPE_UDP);
if (ret > 0)
{
has_work = POLLING_STATE_WORK;
@@ -2556,7 +2545,7 @@ int polling_stream_timeout(int tid)
plist = &(G_MESA_GLOBAL_STREAM[tid]->tcpList[i]);
if (plist->cnt > 0)
{
- ret = del_stream_by_time(plist, NULL, tid, STREAM_TYPE_TCP);
+ ret = del_stream_by_time(plist,tid, STREAM_TYPE_TCP);
if (ret > 0)
{
has_work = POLLING_STATE_WORK;