summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author杨威 <[email protected]>2022-06-10 14:22:05 +0800
committer杨威 <[email protected]>2022-06-10 14:22:05 +0800
commiteab63fb130f81b56f7a0425ae7c52b3ebfb45bdd (patch)
treec72383c3341d9fa110c480aa65857c80deff43e6
parentb831e1c5b00d916aef08ae289fd5665861936ebd (diff)
✨ feat(MSO_STREAM_LASTUPDATE_TIMESTAMP_MS): 新增流选项,支持获取当前流最近更新(活跃)时间v4.2.86CT_inline_version
-rw-r--r--include/private/stream_internal.h1
-rw-r--r--include/public/stream_inc/stream_control.h1
-rw-r--r--src/dealpkt/deal_tcp.c9
-rw-r--r--src/dealpkt/deal_udp.c5
-rw-r--r--src/dealpkt/plug_support.c12
-rw-r--r--src/dealpkt/stream_manage.c109
6 files changed, 24 insertions, 113 deletions
diff --git a/include/private/stream_internal.h b/include/private/stream_internal.h
index 02705af..2db5353 100644
--- a/include/private/stream_internal.h
+++ b/include/private/stream_internal.h
@@ -189,6 +189,7 @@ struct streaminfo_private
*/
unsigned long long global_stream_id;
unsigned long long stream_create_timestamp_ms;
+ unsigned long long stream_lastupdate_timestamp_ms;
/* ===8 bytes=== */
unsigned char layer_index:4; /* ��ʾ��ǰ��ļ���, ����ײ㿪ʼ,һ����ethernet��ʼ, ��0��ʼ����, 4bit���֧��15��Ƕ��, Ҳ�ܷ�ֹ����Ƕ�׹���(Ŀǰ��û��������) */
unsigned char create_dir_by_well_known_port:1; /* UDP��TCP��data������, ����well_known_port����, �����ǰ���˿��ǿͻ���ԭ�򴴽��� */
diff --git a/include/public/stream_inc/stream_control.h b/include/public/stream_inc/stream_control.h
index 76c7b4a..c601e7a 100644
--- a/include/public/stream_inc/stream_control.h
+++ b/include/public/stream_inc/stream_control.h
@@ -45,6 +45,7 @@ enum MESA_stream_opt{
MSO_STREAM_PLUG_PME, /* opt_val type must be struct mso_plug_pme, this is a value-result argument, the caller should set plug_name and plug_entry_type, only support: TCP, TCP_ALL, UDP */
MSO_DROP_CURRENT_PKT, /* opt_val type must be int, value only be [0,1], notice the difference between MSO_DROP_CURRENT_PKT and MSO_DROP_STREAM, MSO_DROP_CURRENT_PKT only discard current packet, but MSO_DROP_STREAM discard all subsequent packets of stream */
MSO_HAVE_DUP_PKT, /* opt_val type must be int, value only be [0, 1, -2], if the current stream found duplicate packets ? 0:no; 1:yes; -2: not sure */
+ MSO_STREAM_LASTUPDATE_TIMESTAMP_MS,/* latest pkt arrive timestamp of this stream, opt_val type must be unsigned long long */
__MSO_MAX,
};
diff --git a/src/dealpkt/deal_tcp.c b/src/dealpkt/deal_tcp.c
index 82853c1..794294f 100644
--- a/src/dealpkt/deal_tcp.c
+++ b/src/dealpkt/deal_tcp.c
@@ -734,6 +734,7 @@ static struct streamindex *tcp_add_new_stream_bysyn(struct streamindex *pindex,
pdetail_pr->C2S_first_ack_seq = ntohl (this_tcphdr->th_ack);
}
pdetail->lastmtime=g_CurrentTime;
+ pstream_pr->stream_lastupdate_timestamp_ms=sapp_global_val->individual_volatile->current_time_ms;
pdetail_pr->link_state=STREAM_LINK_JUST_EST;
pstream->stream_state=TCP_SYN_STATE;
@@ -954,8 +955,9 @@ static struct streamindex *tcp_add_new_stream_bydata(struct streamindex *pindex,
pstream->pktstate=OP_STATE_PENDING;
}
pdetail->createtime=g_CurrentTime;
- pstream_pr->stream_create_timestamp_ms=sapp_global_val->individual_volatile->current_time_ms;
pdetail->lastmtime=g_CurrentTime;
+ pstream_pr->stream_create_timestamp_ms=sapp_global_val->individual_volatile->current_time_ms;
+ pstream_pr->stream_lastupdate_timestamp_ms=sapp_global_val->individual_volatile->current_time_ms;
pdetail_pr->link_state=STREAM_LINK_DATA;
if(pstream_pr->under_ddos_bypass){
@@ -2310,6 +2312,8 @@ static int tcp_deal_data_stream(struct streamindex *pindex,const void *this_iphd
pdetail->lastmtime=(long)g_CurrentTime;
+ pstream_pr->stream_lastupdate_timestamp_ms=sapp_global_val->individual_volatile->current_time_ms;
+
tcp_deal_ack(pstream,this_tcphdr);
@@ -2397,8 +2401,6 @@ static int tcp_deal_data_stream(struct streamindex *pindex,const void *this_iphd
��������ͳ��ֵ, ��Ԫ�������������ӵ�ʱ����һ���ڷ��Ӽ�.
*/
if(tcp_tuple4_reuse(pstream, pdetail_pr, this_tcphdr) == 0){
-// if(pdetail->lastmtime + 3 > (UINT64)g_CurrentTime){
- //printf("###### recv syn, but not tcp reuse, close return PASS for test!!\n");
return PASS;
}
}
@@ -2895,6 +2897,7 @@ static int deal_tcp_stream(struct streamindex *pindex, const void *this_iphdr, s
call_tcpall_after_reset = 1;
}
pdetail->lastmtime=g_CurrentTime;
+ pstream_pr->stream_lastupdate_timestamp_ms=sapp_global_val->individual_volatile->current_time_ms;
pstream->addr.pkttype = PKT_TYPE_NORMAL;//add by lqy 20151222, init pkttype
pdetail_pr->tcpoverlen=0;//add by lqy 20150325
diff --git a/src/dealpkt/deal_udp.c b/src/dealpkt/deal_udp.c
index 28a2e2b..b077b91 100644
--- a/src/dealpkt/deal_udp.c
+++ b/src/dealpkt/deal_udp.c
@@ -37,6 +37,7 @@ static void udp_change_stream_state(struct streamindex *pindex, struct mesa_udp_
ulen = ntohs (udph->uh_ulen);
datalen =ulen - sizeof (struct mesa_udp_hdr);
pdetail->lastmtime=(long)g_CurrentTime;
+ a_udp_pr->stream_lastupdate_timestamp_ms=sapp_global_val->individual_volatile->current_time_ms;
if(a_udp->curdir==DIR_C2S)
{
@@ -200,6 +201,7 @@ static struct streamindex *udp_add_new_stream(struct streamindex *pindex,
pstream_udp_pr->global_stream_id = get_global_stream_id(threadnum);
pstream_udp_pr->stream_create_timestamp_ms=sapp_global_val->individual_volatile->current_time_ms;
pdetail->lastmtime=g_CurrentTime;
+ pstream_udp_pr->stream_lastupdate_timestamp_ms=sapp_global_val->individual_volatile->current_time_ms;
pstream_udp_pr->timeout=udp_reset_time;
if(pstream_udp_pr->under_ddos_bypass){
@@ -470,7 +472,8 @@ static int dealipv4udppkt_dup_check(int tid, struct streaminfo_private *pstream_
is_dup_pkt = sapp_dup_pkt_identify_udp_v4(tid, pstream_pr, this_iphdr, udph, need_add_bloom_filter);
}
- return is_dup_pkt;
+
+return is_dup_pkt;
}
int dealipv4udppkt(struct streamindex *pindex, const struct mesa_ip4_hdr * this_iphdr,
diff --git a/src/dealpkt/plug_support.c b/src/dealpkt/plug_support.c
index 45f53e5..37813fc 100644
--- a/src/dealpkt/plug_support.c
+++ b/src/dealpkt/plug_support.c
@@ -1410,6 +1410,18 @@ int MESA_get_stream_opt(const struct streaminfo *pstream, enum MESA_stream_opt o
*timestamp_ms = ((struct streaminfo_private *)pstream)->stream_create_timestamp_ms;
}
break;
+ case MSO_STREAM_LASTUPDATE_TIMESTAMP_MS:
+ {
+ if ((STREAM_TYPE_TCP != pstream->type) && (STREAM_TYPE_UDP != pstream->type))
+ {
+ sapp_runtime_log(RLOG_LV_INFO, "%s,MESA_get_stream_opt() MSO_STREAM_LASTUPDATE_TIMESTAMP_MS error: stream type is not tcp or udp!\n",printaddr(&pstream->addr, pstream->threadnum));
+ ret = -1;
+ break;
+ }
+ unsigned long long *timestamp_ms = (unsigned long long *)opt_val;
+ *timestamp_ms = ((struct streaminfo_private *)pstream)->stream_lastupdate_timestamp_ms;
+ }
+ break;
case MSO_STREAM_PLUG_PME:
ret = MESA_get_stream_plug_pme(pstream, opt_val, opt_val_len);
diff --git a/src/dealpkt/stream_manage.c b/src/dealpkt/stream_manage.c
index 91ea577..ecf2d47 100644
--- a/src/dealpkt/stream_manage.c
+++ b/src/dealpkt/stream_manage.c
@@ -351,53 +351,6 @@ int stream_set_single_stream_timeout(const struct streaminfo *pstream,unsigned s
/* �������������ó�ʱʱ��, ��ǰ����һ�����ó�ʱʱ���¼���� */
sapp_global_mthread[pstream->threadnum].udp_stream_special_timeout_num++;
}
-#if 0
- lru_list_root=&(G_MESA_GLOBAL_STREAM[pstream->threadnum]->udpList[pstream->stream_state]);
- if(lru_list_root->head == lru_list_root->tail){ /* ��ǰ��ʣ���һ����, �����ƶ� */
- ret = 0;
- goto done;
- }
-
- /* before-insert : head->tmp_index->tail */
-
- assert(lru_list_root->head);
- assert(lru_list_root->tail);
- __do_streamleavlist(pindex, lru_list_root); /* ��ԭλ��ժ�� */
- assert(lru_list_root->head);
- assert(lru_list_root->tail);
- /*
- �����ǰ�����õ��³�ʱʱ��, ��LRUĩβ����ʣ��ʱ��(�ް�����ʱ��)����,
- ���仰˵, ��������̭������ʱ��Ҳ��LRUĩβ����Ҫ�ͺ�,
- ������Ҫ��tail(ʱ����µ�)�����ƶ� .
- */
- /* after-insert : head->pindex->tmp_index->tail, ����tmp_index֮ǰ */
-long udp_times = 0;
- tmp_index = lru_list_root->tail; /* �����timeoutֵ��, �Ӳ��ױ���̭��, ��tail�ڵ㿪ʼ�Ƚ� */
- while(tmp_index
- && (tmp_index->stream.timeout != 0) /* timeoutĬ��Ϊ0����, ��û�賬ʱʱ��, ����Ϊ�����޴� */
- && (timeout < (g_CurrentTime - tmp_index->stream.stream_public.pudpdetail->lastmtime))){
-printf("##### serach udp timeout place, %ld.....\n", udp_times++);
- tmp_index = tmp_index->prev;
- }
-
- /* ������� */
- if(NULL == tmp_index){ /* һֱ�ҵ����һ���ڵ�, pindexӦ����Ϊ�µ�head */
- lru_list_root->head->prev = pindex;
- pindex->next = lru_list_root->head;
- lru_list_root->head = pindex;
- pindex->prev = NULL;
- }else{
- if(tmp_index->prev){
- tmp_index->prev->next = pindex;
- }else{
- lru_list_root->head = pindex; /* ��Ϊhead�ڵ� */
- }
- pindex->prev = tmp_index->prev;
- tmp_index->prev = pindex;
- pindex->next = tmp_index;
- }
- lru_list_root->cnt++; /* ��__do_streamleavlist()�м�1, �˴��ָ����� */
-#endif
}
else if (STREAM_TYPE_TCP==pstream->type)
{
@@ -415,50 +368,6 @@ printf("##### serach udp timeout place, %ld.....\n", udp_times++);
/* �������������ó�ʱʱ��, ��ǰ����һ�����ó�ʱʱ���¼���� */
sapp_global_mthread[pstream->threadnum].tcp_stream_special_timeout_num++;
}
-#if 0
- lru_list_root=&(G_MESA_GLOBAL_STREAM[pstream->threadnum]->tcpList[pstream->stream_state]);
- if(lru_list_root->head == lru_list_root->tail){ /* ��ǰ��ʣ���һ����, �����ƶ� */
- ret = 0;
- goto done;
- }
-
- assert(lru_list_root->head);
- assert(lru_list_root->tail);
- __do_streamleavlist(pindex, lru_list_root); /* ��ԭλ��ժ��, �˴�����ֱ��ʹ��streamleavlist, ���ò��ִ��� */
- assert(lru_list_root->head);
- assert(lru_list_root->tail);
- /*
- �����ǰ�����õ��³�ʱʱ��, ��LRUĩβ����ʣ��ʱ��(�ް�����ʱ��)����,
- ���仰˵, ��������̭������ʱ��Ҳ��LRUĩβ����Ҫ�ͺ�,
- ������Ҫ��tail(ʱ����µ�)�����ƶ� .
- */
-long tcp_times = 0;
- tmp_index = lru_list_root->tail;
- while(tmp_index
- && (tmp_index->stream.timeout != 0) /* timeoutĬ��Ϊ0����, ��û�賬ʱʱ��, ����Ϊ�����޴� */
- && (timeout < (g_CurrentTime - tmp_index->stream.stream_public.ptcpdetail->lastmtime))){
-printf("##### serach tcp timeout place, %ld.....\n", tcp_times);
- tmp_index = tmp_index->prev;
- }
-
- /* ������� */
- if(NULL == tmp_index){ /* һֱ�ҵ����һ���ڵ�, pindexӦ����Ϊ�µ�head */
- lru_list_root->head->prev = pindex;
- pindex->next = lru_list_root->head;
- lru_list_root->head = pindex;
- pindex->prev = NULL;
- }else{
- if(tmp_index->prev){
- tmp_index->prev->next = pindex;
- }else{
- lru_list_root->head = pindex; /* ��Ϊhead�ڵ� */
- }
- pindex->prev = tmp_index->prev;
- tmp_index->prev = pindex;
- pindex->next = tmp_index;
- }
- lru_list_root->cnt++; /* ��__do_streamleavlist()�м�1, �˴��ָ����� */
-#endif
}else{
goto err; /* ��֧��TCP��UDP�� */
}
@@ -477,24 +386,6 @@ printf("##### serach tcp timeout place, %ld.....\n", tcp_times);
err:
return -1;
}
-#if 0
-void udp_set_stream_resettime(unsigned short timeout)
-{
- udp_reset_time=timeout;
-}
-#endif
-
-#if 0
-void udp_set_stream_timeout(unsigned short timeout)//2014-12-25 lijia add, ���ݾ�ƽ̨, xj_fd
-{
- udp_reset_time=(unsigned short)timeout;
-}
-
-void udp_set_stream_resettime(unsigned short timeout)
-{
- udp_reset_time=(unsigned short)timeout;
-}#endif
-#endif
int udp_set_stream_num(int udp1,int udp2,int udp3)
{