summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author杨威 <[email protected]>2023-01-07 16:51:01 +0800
committer杨威 <[email protected]>2023-01-09 15:05:04 +0800
commit0a4057a7df79352de0270b782cfdcf9ac9443870 (patch)
treee13f6c89dc9e093549e797bb9bc1fc65e0402a10
parentc8070f2aa7c563a229d79b69870601af6780ec01 (diff)
🧪 test(test_app.so): 增加timer相关的示例
-rw-r--r--include/private/stream_manage.h8
-rw-r--r--src/dealpkt/callapp.c14
-rw-r--r--src/dealpkt/deal_tcp.c4
-rw-r--r--src/dealpkt/plug_support.c13
-rw-r--r--src/dealpkt/stream_manage.c40
-rw-r--r--test/test_app_sapp.c34
6 files changed, 60 insertions, 53 deletions
diff --git a/include/private/stream_manage.h b/include/private/stream_manage.h
index 3fb3d69..9819f31 100644
--- a/include/private/stream_manage.h
+++ b/include/private/stream_manage.h
@@ -87,8 +87,6 @@ struct stream_list
int cnt;
int max_cnt;
struct timeouts *streamindex_timer;
- struct timeouts *user_define_timer;
- long user_define_timer_cnt;
long last_update_timer_s;
timeout_error_t timer_error;
};
@@ -106,15 +104,19 @@ struct global_stream
struct streamindex **tcp_stream_table;
unsigned short *tcp_stream_talbe_hash_count; /* lijia comment: һ��HASH_Slot�³�ͻ����Ԫ�ظ��� */
struct stream_list tcpList[MAX_TCP_STATE];
+
struct streamindex **udp_stream_table;
unsigned short *udp_stream_talbe_hash_count;/* lijia comment: һ��HASH_Slot�³�ͻ����Ԫ�ظ��� */
-
struct stream_list udpList[MAX_UDP_STATE];
struct STREAM_INDEX_LIST freeList;
struct stream_index_list_item *__freeList_real_head;
int freeList_cnt;
int freeList_max_cnt;
+
+ struct timeouts *user_define_timer;
+ long user_define_timer_cnt;
+ timeout_error_t timer_error;
};
typedef struct _stStreamFunInfo
diff --git a/src/dealpkt/callapp.c b/src/dealpkt/callapp.c
index 5137d91..a9aec00 100644
--- a/src/dealpkt/callapp.c
+++ b/src/dealpkt/callapp.c
@@ -739,20 +739,10 @@ int stream_process(struct streaminfo *a_stream,const void *this_iphdr, const voi
{
call_streamentry(a_stream, this_iphdr, transport_hdr, raw_pkt, pFunInfo);
pNext=pFunInfo->next;
- struct stream_list *lru_list_root;
- if(STREAM_TYPE_TCP != a_stream->type)
- {
- lru_list_root=&(G_MESA_GLOBAL_STREAM[a_stream->threadnum]->tcpList[a_stream->stream_state]);
- }
- else
- {
- lru_list_root=&(G_MESA_GLOBAL_STREAM[a_stream->threadnum]->udpList[a_stream->stream_state]);
- }
-
if(timeout_pending(&pFunInfo->timeout))
{
- timeouts_del(lru_list_root->user_define_timer ,&pFunInfo->timeout);
- lru_list_root->user_define_timer_cnt-=1;
+ timeouts_del(G_MESA_GLOBAL_STREAM[a_stream->threadnum]->user_define_timer ,&pFunInfo->timeout);
+ G_MESA_GLOBAL_STREAM[a_stream->threadnum]->user_define_timer_cnt-=1;
}
sapp_mem_free(SAPP_MEM_DYN_PLUG_CTRL,a_stream->threadnum,pFunInfo);
diff --git a/src/dealpkt/deal_tcp.c b/src/dealpkt/deal_tcp.c
index 5b6b660..5dd68c7 100644
--- a/src/dealpkt/deal_tcp.c
+++ b/src/dealpkt/deal_tcp.c
@@ -53,7 +53,7 @@ static inline void update_stream_list_raw_pkt_pointer(struct streaminfo_private
void addr_reverse_memcpy(const struct streaminfo_private *stack_pr,void *daddr, const void *saddr, int addrtype, int addrlen);
int copy_ipport_union_addr(struct streaminfo *pstream_heap, struct streaminfo *pstream_stack, int reverse);
void iterate_stream_list(const struct streaminfo *stream);
-int del_stream_by_time(struct stream_list *plist, const struct streamindex *current_drive_index);
+int del_stream_by_time(struct stream_list *plist, const struct streamindex *current_drive_index, int thread_id);
//int g_kill_tcp_remedy_sw = 0; /* 2016-06-21 lijia add, �Ƿ����FD���� */
@@ -1264,7 +1264,7 @@ static void tcp_change_stream_tonouse(struct streamindex *pindex)
�޷���TCP_NOUSE_STATE״̬�µ�����̭��ȥ,
���Դ˴���ɨ��һ��TCP_NOUSE_STATE�ij�ʱ��.
*/
- del_stream_by_time(plist, pindex);
+ del_stream_by_time(plist, pindex, threadnum);
}
diff --git a/src/dealpkt/plug_support.c b/src/dealpkt/plug_support.c
index fdb8af9..6b00bd9 100644
--- a/src/dealpkt/plug_support.c
+++ b/src/dealpkt/plug_support.c
@@ -799,20 +799,13 @@ int MESA_set_stream_opt(const struct streaminfo *pstream, enum MESA_stream_opt o
StreamFunInfo *funinfo = (StreamFunInfo *)pstream_pr->cur_plugin_cb_func;
funinfo->pstream = pstream;
funinfo->set_timer_s = set_timer_s;
- if(STREAM_TYPE_TCP != pstream->type)
- {
- lru_list_root=&(G_MESA_GLOBAL_STREAM[pstream->threadnum]->tcpList[pstream->stream_state]);
- }
- else
- {
- lru_list_root=&(G_MESA_GLOBAL_STREAM[pstream->threadnum]->udpList[pstream->stream_state]);
- }
if(false == timeout_pending(&funinfo->timeout))
{
timeout_init(&funinfo->timeout, TIMEOUT_ABS);
}
- timeouts_add(lru_list_root->user_define_timer, &funinfo->timeout, set_timer_s+g_CurrentTime);
- lru_list_root->user_define_timer_cnt+=1;
+ timeouts_add(G_MESA_GLOBAL_STREAM[pstream->threadnum]->user_define_timer, &funinfo->timeout, set_timer_s+g_CurrentTime);
+ G_MESA_GLOBAL_STREAM[pstream->threadnum]->user_define_timer_cnt+=1;
+ ret = 0;
}
break;
diff --git a/src/dealpkt/stream_manage.c b/src/dealpkt/stream_manage.c
index 0abcaeb..1247bd9 100644
--- a/src/dealpkt/stream_manage.c
+++ b/src/dealpkt/stream_manage.c
@@ -25,6 +25,7 @@
*/
#include "sapp_api.h"
#include "sapp_private_api.h"
+#include "timeout.h"
#ifdef __cplusplus
extern "C" {
@@ -81,19 +82,18 @@ static int init_stream_detail(struct global_stream *g_stream)
{
int i = 0;
memset(g_stream, 0, sizeof(struct global_stream));
+ g_stream->user_define_timer = timeouts_open(0, &g_stream->timer_error);
for (i = 0; i < MAX_TCP_STATE; i++)
{
g_stream->tcpList[i].max_cnt = tcpstate_num[i];
g_stream->tcpList[i].cnt = 0;
g_stream->tcpList[i].streamindex_timer = timeouts_open(0, &g_stream->tcpList[i].timer_error);
- g_stream->tcpList[i].user_define_timer = timeouts_open(0, &g_stream->tcpList[i].timer_error);
}
for (i = 0; i < MAX_UDP_STATE; i++)
{
g_stream->udpList[i].max_cnt = udpstate_num[i];
g_stream->udpList[i].cnt = 0;
g_stream->udpList[i].streamindex_timer = timeouts_open(0, &g_stream->udpList[i].timer_error);
- g_stream->udpList[i].user_define_timer = timeouts_open(0, &g_stream->udpList[i].timer_error);
}
@@ -437,15 +437,16 @@ void free_streamindex(int threadnum,struct streamindex *pindex)
modify by lqy 20150612
��ʱɾ��������ͷ����ʼ��һ��ɾ��
*/
-int del_stream_by_time(struct stream_list *plist, const struct streamindex *current_drive_index)
+int del_stream_by_time(struct stream_list *plist, const struct streamindex *current_drive_index, int thread_id)
{
-
+ int ret = 0;
+ struct global_stream *g_stream = G_MESA_GLOBAL_STREAM[thread_id];
if(plist->last_update_timer_s < g_CurrentTime)
{
timeouts_update(plist->streamindex_timer, g_CurrentTime);
- if(true == timeouts_pending(plist->streamindex_timer))
+ if(g_stream->user_define_timer_cnt > 0)
{
- timeouts_update(plist->user_define_timer, g_CurrentTime);
+ timeouts_update(g_stream->user_define_timer, g_CurrentTime);
}
plist->last_update_timer_s = g_CurrentTime;
}
@@ -463,25 +464,25 @@ int del_stream_by_time(struct stream_list *plist, const struct streamindex *curr
/* 2016-12-15 lijia add, ��ʱreset֮ǰ, ��Ҫ����link_state */
((struct tcpdetail_private *)(pstream->pdetail))->link_state = STREAM_LINK_TIMEOUT;
}
- return 1;
+ ret = 1;
}
if (STREAM_TYPE_TCP == pstream->type)
{
((struct tcpdetail_private *)(pstream->pdetail))->link_state = STREAM_LINK_TIMEOUT;
tcp_free_stream(pindex, NULL, NULL, NULL);
- return 2;
+ ret = 2;
}
else
{
pstream_pr->stream_close_reason = STREAM_CLOSE_REASON_TIMEOUT;
udp_free_stream(pindex);
- return 2;
+ ret = 2;
}
}
- if(timeouts_pending(plist->streamindex_timer))
+ if(g_stream->user_define_timer_cnt > 0)
{
- t = timeouts_get(plist->user_define_timer);
+ t = timeouts_get(g_stream->user_define_timer);
if (t != NULL)
{
StreamFunInfo *funinfo = (StreamFunInfo *)sapp_get_struct_header(t, StreamFunInfo, timeout);
@@ -496,15 +497,15 @@ int del_stream_by_time(struct stream_list *plist, const struct streamindex *curr
if(app_ret == APP_STATE_GIVEME)
{
- timeouts_add(plist->user_define_timer, t, funinfo->set_timer_s+g_CurrentTime);
+ timeouts_add(g_stream->user_define_timer, t, funinfo->set_timer_s+g_CurrentTime);
}
else
{
- plist->user_define_timer_cnt -= 1;
+ g_stream->user_define_timer_cnt -= 1;
}
}
}
- return 0;
+ return ret;
}
/*
@@ -531,7 +532,7 @@ int lrustream(struct streamindex *pindex)
}
//if(g_CurrentTime > plist->last_check_timer_s)
{
- int ret = del_stream_by_time(plist, pindex);
+ int ret = del_stream_by_time(plist, pindex, pstream->threadnum);
if (1 == ret)
{
return 1;
@@ -1969,6 +1970,7 @@ void free_thread_stream(int thread_seq)
struct timeouts_it it_all;
g_stream=G_MESA_GLOBAL_STREAM[thread_seq];
+ timeouts_close(g_stream->user_define_timer);
for(j=0;j<MAX_TCP_STATE;j++)
{
plist=&(g_stream->tcpList[j]);
@@ -1989,8 +1991,6 @@ void free_thread_stream(int thread_seq)
}
}
timeouts_close(plist->streamindex_timer);
- assert(!timeouts_pending(plist->user_define_timer));
- timeouts_close(plist->user_define_timer);
}
for(j=0;j<MAX_UDP_STATE;j++)
{
@@ -2012,8 +2012,6 @@ void free_thread_stream(int thread_seq)
}
}
timeouts_close(plist->streamindex_timer);
- assert(!timeouts_pending(plist->user_define_timer));
- timeouts_close(plist->user_define_timer);
}
}
@@ -2578,7 +2576,7 @@ int polling_stream_timeout(int tid)
for(i = UDP_ONE_STATE; i < MAX_UDP_STATE; i++){
plist=&(G_MESA_GLOBAL_STREAM[tid]->udpList[i]);
if( plist->cnt > 0){
- ret = del_stream_by_time(plist, NULL);
+ ret = del_stream_by_time(plist, NULL, tid);
if(ret > 0){
has_work = POLLING_STATE_WORK;
}
@@ -2588,7 +2586,7 @@ int polling_stream_timeout(int tid)
for(i = TCP_SYN_STATE; i < MAX_TCP_STATE; i++){
plist=&(G_MESA_GLOBAL_STREAM[tid]->tcpList[i]);
if(plist->cnt > 0){ /* polling�׶ο��ܻ�û����, ����û�д�״̬����, plist����ΪNULL */
- ret = del_stream_by_time(plist, NULL);
+ ret = del_stream_by_time(plist, NULL, tid);
if(ret > 0){
has_work = POLLING_STATE_WORK;
}
diff --git a/test/test_app_sapp.c b/test/test_app_sapp.c
index 8a7885e..57b199c 100644
--- a/test/test_app_sapp.c
+++ b/test/test_app_sapp.c
@@ -130,7 +130,7 @@ static int update_hierarchical_layer_stat(const struct hierarchical_layer *strea
{
int i;
- if(search_g_hierarchical_layer_stat(stream_hierarchical_layer_stat) == 0){ //不存�? 是新的流结构
+ if(search_g_hierarchical_layer_stat(stream_hierarchical_layer_stat) == 0){ //不存�? �?新的流结�?
memcpy(&g_hierarchical_layer_stat[hierarchical_layer_stat_num], stream_hierarchical_layer_stat, sizeof(struct hierarchical_layer) * MAX_LAYER_DEPTH);
hierarchical_layer_stat_num++;
show_hierarchical_layer_stat();
@@ -486,6 +486,30 @@ char test_set_stream_timeout(struct streaminfo *pstream, void **pme, int thread
return APP_STATE_GIVEME;
}
+char test_set_stream_timer(struct streaminfo *pstream, void **pme, int thread_seq,void *a_packet)
+{
+ int ret;
+ int tout_val;
+ int opt_len;
+
+ if(pstream->pktstate== OP_STATE_PENDING){
+ tout_val = 1;
+ ret = MESA_set_stream_opt(pstream, MSO_STREAM_TIMER, &tout_val, sizeof(int));
+ if(ret < 0){
+ DPRINT("stream:%p, MESA_set_stream_opt error:\n", pstream);
+ return APP_STATE_DROPME;
+ }
+ DPRINT("stream: %p %s set timer at %llu!\n", pstream, printaddr(&pstream->addr, thread_seq), time(NULL));
+ }
+ if(pstream->pktstate == OP_STATE_TIMER){
+ DPRINT("stream timeup: %p %s at %llu!\n", pstream, printaddr(&pstream->addr, thread_seq), time(NULL));
+ }
+ if(pstream->pktstate == OP_STATE_CLOSE){
+ DPRINT("stream close: %p %s at %llu!\n", pstream, printaddr(&pstream->addr, thread_seq), time(NULL));
+ }
+ return APP_STATE_GIVEME;
+}
+
char test_sapp_get_platform_opt(struct streaminfo *pstream, void **pme, int thread_seq,void *a_packet)
{
unsigned long long totpkt, totbyte, rand_num;
@@ -1017,7 +1041,7 @@ test_set_stream_timeout(pstream, pme, thread_seq, a_packet);
}
-/* 关于网络相关字段, 均为网络�? network order */
+/* 关于网络相关字�??, 均为网络�? network order */
struct __test_inline_vxlan_hdr{
unsigned char flags;
@@ -1025,12 +1049,12 @@ struct __test_inline_vxlan_hdr{
#if 0
unsigned char reserved[3];
#else
- unsigned char nat_type; /* 复用�?个保留字�? 表示NAT类型 */
+ unsigned char nat_type; /* 复用�?�?保留字�?? 表示NAT类型 */
unsigned char reserved[2];
#endif
/*--------int delim -------*/
unsigned char vlan_id_half_high;
- unsigned char link_layer_type : 4; /* 二层报文封装格式 */
+ unsigned char link_layer_type : 4; /* 二层报文封�?�格�? */
unsigned char vlan_id_half_low : 4;
unsigned int dir : 1;
unsigned int link_id : 6;
@@ -1822,7 +1846,7 @@ static void test_inject_tcp_pkt_with_this_hdr(struct streaminfo *stream,void **p
raw_thdr = (struct mesa_tcp_hdr *)((char *)raw_ihdr + raw_ihdr->ip_hl*4);
raw_tcp_payload_len = ntohs(raw_ihdr->ip_len) - raw_ihdr->ip_hl*4 - raw_thdr->th_off * 4;
- /* 当前包是C2S方向的GET, �?要回复一个虚假的S2C方向的RESPONSE */
+ /* 当前包是C2S方向的GET, �?要回复一�?虚假的S2C方向的RESPONSE */
send_ihdr = (struct mesa_ip4_hdr *)pkt_header_payload;
send_thdr = (struct mesa_tcp_hdr *)((char *)send_ihdr + sizeof(struct mesa_ip4_hdr));