From eab63fb130f81b56f7a0425ae7c52b3ebfb45bdd Mon Sep 17 00:00:00 2001 From: 杨威 Date: Fri, 10 Jun 2022 14:22:05 +0800 Subject: ✨ feat(MSO_STREAM_LASTUPDATE_TIMESTAMP_MS): 新增流选项,支持获取当前流最近更新(活跃)时间 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- include/private/stream_internal.h | 1 + include/public/stream_inc/stream_control.h | 1 + src/dealpkt/deal_tcp.c | 9 ++- src/dealpkt/deal_udp.c | 5 +- src/dealpkt/plug_support.c | 12 ++++ src/dealpkt/stream_manage.c | 109 ----------------------------- 6 files changed, 24 insertions(+), 113 deletions(-) diff --git a/include/private/stream_internal.h b/include/private/stream_internal.h index 02705af..2db5353 100644 --- a/include/private/stream_internal.h +++ b/include/private/stream_internal.h @@ -189,6 +189,7 @@ struct streaminfo_private */ unsigned long long global_stream_id; unsigned long long stream_create_timestamp_ms; + unsigned long long stream_lastupdate_timestamp_ms; /* ===8 bytes=== */ unsigned char layer_index:4; /* ʾǰļ, ײ㿪ʼ,һethernetʼ, 0ʼ, 4bit֧15Ƕ, ҲֹܷǶ׹(Ŀǰû) */ unsigned char create_dir_by_well_known_port:1; /* UDPTCPdata, well_known_port, ǰ˿ǿͻԭ򴴽 */ diff --git a/include/public/stream_inc/stream_control.h b/include/public/stream_inc/stream_control.h index 76c7b4a..c601e7a 100644 --- a/include/public/stream_inc/stream_control.h +++ b/include/public/stream_inc/stream_control.h @@ -45,6 +45,7 @@ enum MESA_stream_opt{ MSO_STREAM_PLUG_PME, /* opt_val type must be struct mso_plug_pme, this is a value-result argument, the caller should set plug_name and plug_entry_type, only support: TCP, TCP_ALL, UDP */ MSO_DROP_CURRENT_PKT, /* opt_val type must be int, value only be [0,1], notice the difference between MSO_DROP_CURRENT_PKT and MSO_DROP_STREAM, MSO_DROP_CURRENT_PKT only discard current packet, but MSO_DROP_STREAM discard all subsequent packets of stream */ MSO_HAVE_DUP_PKT, /* opt_val type must be int, value only be [0, 1, -2], if the current stream found duplicate packets ? 0:no; 1:yes; -2: not sure */ + MSO_STREAM_LASTUPDATE_TIMESTAMP_MS,/* latest pkt arrive timestamp of this stream, opt_val type must be unsigned long long */ __MSO_MAX, }; diff --git a/src/dealpkt/deal_tcp.c b/src/dealpkt/deal_tcp.c index 82853c1..794294f 100644 --- a/src/dealpkt/deal_tcp.c +++ b/src/dealpkt/deal_tcp.c @@ -734,6 +734,7 @@ static struct streamindex *tcp_add_new_stream_bysyn(struct streamindex *pindex, pdetail_pr->C2S_first_ack_seq = ntohl (this_tcphdr->th_ack); } pdetail->lastmtime=g_CurrentTime; + pstream_pr->stream_lastupdate_timestamp_ms=sapp_global_val->individual_volatile->current_time_ms; pdetail_pr->link_state=STREAM_LINK_JUST_EST; pstream->stream_state=TCP_SYN_STATE; @@ -954,8 +955,9 @@ static struct streamindex *tcp_add_new_stream_bydata(struct streamindex *pindex, pstream->pktstate=OP_STATE_PENDING; } pdetail->createtime=g_CurrentTime; - pstream_pr->stream_create_timestamp_ms=sapp_global_val->individual_volatile->current_time_ms; pdetail->lastmtime=g_CurrentTime; + pstream_pr->stream_create_timestamp_ms=sapp_global_val->individual_volatile->current_time_ms; + pstream_pr->stream_lastupdate_timestamp_ms=sapp_global_val->individual_volatile->current_time_ms; pdetail_pr->link_state=STREAM_LINK_DATA; if(pstream_pr->under_ddos_bypass){ @@ -2310,6 +2312,8 @@ static int tcp_deal_data_stream(struct streamindex *pindex,const void *this_iphd pdetail->lastmtime=(long)g_CurrentTime; + pstream_pr->stream_lastupdate_timestamp_ms=sapp_global_val->individual_volatile->current_time_ms; + tcp_deal_ack(pstream,this_tcphdr); @@ -2397,8 +2401,6 @@ static int tcp_deal_data_stream(struct streamindex *pindex,const void *this_iphd ͳֵ, ԪӵʱһڷӼ. */ if(tcp_tuple4_reuse(pstream, pdetail_pr, this_tcphdr) == 0){ -// if(pdetail->lastmtime + 3 > (UINT64)g_CurrentTime){ - //printf("###### recv syn, but not tcp reuse, close return PASS for test!!\n"); return PASS; } } @@ -2895,6 +2897,7 @@ static int deal_tcp_stream(struct streamindex *pindex, const void *this_iphdr, s call_tcpall_after_reset = 1; } pdetail->lastmtime=g_CurrentTime; + pstream_pr->stream_lastupdate_timestamp_ms=sapp_global_val->individual_volatile->current_time_ms; pstream->addr.pkttype = PKT_TYPE_NORMAL;//add by lqy 20151222, init pkttype pdetail_pr->tcpoverlen=0;//add by lqy 20150325 diff --git a/src/dealpkt/deal_udp.c b/src/dealpkt/deal_udp.c index 28a2e2b..b077b91 100644 --- a/src/dealpkt/deal_udp.c +++ b/src/dealpkt/deal_udp.c @@ -37,6 +37,7 @@ static void udp_change_stream_state(struct streamindex *pindex, struct mesa_udp_ ulen = ntohs (udph->uh_ulen); datalen =ulen - sizeof (struct mesa_udp_hdr); pdetail->lastmtime=(long)g_CurrentTime; + a_udp_pr->stream_lastupdate_timestamp_ms=sapp_global_val->individual_volatile->current_time_ms; if(a_udp->curdir==DIR_C2S) { @@ -200,6 +201,7 @@ static struct streamindex *udp_add_new_stream(struct streamindex *pindex, pstream_udp_pr->global_stream_id = get_global_stream_id(threadnum); pstream_udp_pr->stream_create_timestamp_ms=sapp_global_val->individual_volatile->current_time_ms; pdetail->lastmtime=g_CurrentTime; + pstream_udp_pr->stream_lastupdate_timestamp_ms=sapp_global_val->individual_volatile->current_time_ms; pstream_udp_pr->timeout=udp_reset_time; if(pstream_udp_pr->under_ddos_bypass){ @@ -470,7 +472,8 @@ static int dealipv4udppkt_dup_check(int tid, struct streaminfo_private *pstream_ is_dup_pkt = sapp_dup_pkt_identify_udp_v4(tid, pstream_pr, this_iphdr, udph, need_add_bloom_filter); } - return is_dup_pkt; + +return is_dup_pkt; } int dealipv4udppkt(struct streamindex *pindex, const struct mesa_ip4_hdr * this_iphdr, diff --git a/src/dealpkt/plug_support.c b/src/dealpkt/plug_support.c index 45f53e5..37813fc 100644 --- a/src/dealpkt/plug_support.c +++ b/src/dealpkt/plug_support.c @@ -1410,6 +1410,18 @@ int MESA_get_stream_opt(const struct streaminfo *pstream, enum MESA_stream_opt o *timestamp_ms = ((struct streaminfo_private *)pstream)->stream_create_timestamp_ms; } break; + case MSO_STREAM_LASTUPDATE_TIMESTAMP_MS: + { + if ((STREAM_TYPE_TCP != pstream->type) && (STREAM_TYPE_UDP != pstream->type)) + { + sapp_runtime_log(RLOG_LV_INFO, "%s,MESA_get_stream_opt() MSO_STREAM_LASTUPDATE_TIMESTAMP_MS error: stream type is not tcp or udp!\n",printaddr(&pstream->addr, pstream->threadnum)); + ret = -1; + break; + } + unsigned long long *timestamp_ms = (unsigned long long *)opt_val; + *timestamp_ms = ((struct streaminfo_private *)pstream)->stream_lastupdate_timestamp_ms; + } + break; case MSO_STREAM_PLUG_PME: ret = MESA_get_stream_plug_pme(pstream, opt_val, opt_val_len); diff --git a/src/dealpkt/stream_manage.c b/src/dealpkt/stream_manage.c index 91ea577..ecf2d47 100644 --- a/src/dealpkt/stream_manage.c +++ b/src/dealpkt/stream_manage.c @@ -351,53 +351,6 @@ int stream_set_single_stream_timeout(const struct streaminfo *pstream,unsigned s /* óʱʱ, ǰһóʱʱ¼ */ sapp_global_mthread[pstream->threadnum].udp_stream_special_timeout_num++; } -#if 0 - lru_list_root=&(G_MESA_GLOBAL_STREAM[pstream->threadnum]->udpList[pstream->stream_state]); - if(lru_list_root->head == lru_list_root->tail){ /* ǰʣһ, ƶ */ - ret = 0; - goto done; - } - - /* before-insert : head->tmp_index->tail */ - - assert(lru_list_root->head); - assert(lru_list_root->tail); - __do_streamleavlist(pindex, lru_list_root); /* ԭλժ */ - assert(lru_list_root->head); - assert(lru_list_root->tail); - /* - ǰõ³ʱʱ, LRUĩβʣʱ(ްʱ), - 仰˵, ̭ʱҲLRUĩβҪͺ, - Ҫtail(ʱµ)ƶ . - */ - /* after-insert : head->pindex->tmp_index->tail, tmp_index֮ǰ */ -long udp_times = 0; - tmp_index = lru_list_root->tail; /* timeoutֵ, Ӳױ̭, tailڵ㿪ʼȽ */ - while(tmp_index - && (tmp_index->stream.timeout != 0) /* timeoutĬΪ0, û賬ʱʱ, Ϊ޴ */ - && (timeout < (g_CurrentTime - tmp_index->stream.stream_public.pudpdetail->lastmtime))){ -printf("##### serach udp timeout place, %ld.....\n", udp_times++); - tmp_index = tmp_index->prev; - } - - /* λ */ - if(NULL == tmp_index){ /* һֱҵһڵ, pindexӦΪµhead */ - lru_list_root->head->prev = pindex; - pindex->next = lru_list_root->head; - lru_list_root->head = pindex; - pindex->prev = NULL; - }else{ - if(tmp_index->prev){ - tmp_index->prev->next = pindex; - }else{ - lru_list_root->head = pindex; /* Ϊheadڵ */ - } - pindex->prev = tmp_index->prev; - tmp_index->prev = pindex; - pindex->next = tmp_index; - } - lru_list_root->cnt++; /* __do_streamleavlist()м1, ˴ָ */ -#endif } else if (STREAM_TYPE_TCP==pstream->type) { @@ -415,50 +368,6 @@ printf("##### serach udp timeout place, %ld.....\n", udp_times++); /* óʱʱ, ǰһóʱʱ¼ */ sapp_global_mthread[pstream->threadnum].tcp_stream_special_timeout_num++; } -#if 0 - lru_list_root=&(G_MESA_GLOBAL_STREAM[pstream->threadnum]->tcpList[pstream->stream_state]); - if(lru_list_root->head == lru_list_root->tail){ /* ǰʣһ, ƶ */ - ret = 0; - goto done; - } - - assert(lru_list_root->head); - assert(lru_list_root->tail); - __do_streamleavlist(pindex, lru_list_root); /* ԭλժ, ˴ֱʹstreamleavlist, òִ */ - assert(lru_list_root->head); - assert(lru_list_root->tail); - /* - ǰõ³ʱʱ, LRUĩβʣʱ(ްʱ), - 仰˵, ̭ʱҲLRUĩβҪͺ, - Ҫtail(ʱµ)ƶ . - */ -long tcp_times = 0; - tmp_index = lru_list_root->tail; - while(tmp_index - && (tmp_index->stream.timeout != 0) /* timeoutĬΪ0, û賬ʱʱ, Ϊ޴ */ - && (timeout < (g_CurrentTime - tmp_index->stream.stream_public.ptcpdetail->lastmtime))){ -printf("##### serach tcp timeout place, %ld.....\n", tcp_times); - tmp_index = tmp_index->prev; - } - - /* λ */ - if(NULL == tmp_index){ /* һֱҵһڵ, pindexӦΪµhead */ - lru_list_root->head->prev = pindex; - pindex->next = lru_list_root->head; - lru_list_root->head = pindex; - pindex->prev = NULL; - }else{ - if(tmp_index->prev){ - tmp_index->prev->next = pindex; - }else{ - lru_list_root->head = pindex; /* Ϊheadڵ */ - } - pindex->prev = tmp_index->prev; - tmp_index->prev = pindex; - pindex->next = tmp_index; - } - lru_list_root->cnt++; /* __do_streamleavlist()м1, ˴ָ */ -#endif }else{ goto err; /* ֧TCPUDP */ } @@ -477,24 +386,6 @@ printf("##### serach tcp timeout place, %ld.....\n", tcp_times); err: return -1; } -#if 0 -void udp_set_stream_resettime(unsigned short timeout) -{ - udp_reset_time=timeout; -} -#endif - -#if 0 -void udp_set_stream_timeout(unsigned short timeout)//2014-12-25 lijia add, ݾƽ̨, xj_fd -{ - udp_reset_time=(unsigned short)timeout; -} - -void udp_set_stream_resettime(unsigned short timeout) -{ - udp_reset_time=(unsigned short)timeout; -}#endif -#endif int udp_set_stream_num(int udp1,int udp2,int udp3) { -- cgit v1.2.3