summaryrefslogtreecommitdiff
path: root/src/frag_reassembly_in.h
blob: 89ae0e61719da4987e50209181d214f263724cd7 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
#ifndef _FRAG_REASSEMBLY_IN_H
#define _FRAG_REASSEMBLY_IN_H

#include </usr/include/stdint.h>
#include <sys/queue.h>

#include 	"stream_fuzzy_hash.h"
#include 	"MESA_list_queue.h"
#include 	"MESA_timer.h"

#include 	"interval_index.h"
#include 	"hiredis.h"
#include 	"hircluster.h"
#include 	"soqav_dedup.h"

#include 	"main.h"
#include 	"sifter.h"
#include 	"frag_reassembly.h"
//#include 	"frag_voip.h"

#define		LAY_ADDR_CNT						8
#define		PID_MAX_LEN							64
#define  	SIFTER_MAX_NUM						8 	//for sifter

#define 	TD_LEN								64
#define 	REDIS_CMMD_MAXLEN					2048 /*redis�������󳤶�*/

/*��Ŀ����ǰ�˻���Ϣ��������*/
#define 	QD_MAXNUM							8	

/*log type*/
typedef enum
{
	MEDIA_NEW=0,
	MEDIA_RENEW,	
	MEDIA_RENEW_EXPIRE,
	MEDIA_EXPIRE,	
	MEDIA_MID_CHANGE,
	MEDIA_OFFSET_ZERO,	
	MEDIA_FROM_CAPIP,		
	MEDIA_FROM_CPZIP,	
	MEDIA_FROM_QDIP,	
}MEDIA_LOG_TYPE;

/*response from frag_index*/
#define CONVG_MAXNUM  	16
typedef struct frag_cnvg_s
{	
	text_t* 				opt[CONVG_MAXNUM];					 		
	int 					opt_num;	
 }frag_cnvg_t;

/*opt_unit_t is for trans*/
typedef struct opt_in_s
{
	uint32_t 				opt_len;    //the len of opt_value in CPZ
	uint8_t					opt_type; 
	char*					opt_value; 
}opt_in_t;

#define FRAG_FLAG_SEND_META		0x01	//��Ŀ�ĵ�һ�����ݰ�����Ҫ����Ԫ��Ϣ
#define FRAG_FLAG_MULTISRC		0x02    //��Դ��־�����͵�������ƿװ
#define FRAG_FLAG_WINS			0x04    //��ϵͳ��־������wins
typedef struct frag_in_s
{	
	uint64_t				mid;
	uint64_t				new_mid;    //�Ƕ�Դ�����:mid=new_mid; ��Դ�����:new_midΪ��Դ��۵�ID
	uint64_t				pid; 
	uint64_t				offset_in; 
	uint16_t				seq;
	uint64_t				offset:48;	
	char*					data;			//not copy
	uint32_t				datalen;
	uint32_t				multisrc_bizmanip;	  //��Դ�ۺϵ�IP	
//    int64_t 				create_time;	
	uint32_t				src_ip;	
	int 					thread_seq;	//record thread_seq when converge is not complete
	int 					frag_flag;	//�����ݰ��ı�־			
//	uint8_t					media_type;		
//	uint8_t					proto;		
}frag_in_t;

/*netdisk and other frag*/
#define	FRAG_UNIT_INFO_NUM	6	
typedef enum
{
	FRAG_UNIT_ID=0,				//from frag_sifter
	FRAG_UNIT_ABOFFSET=1,		//maybe from index info or frag_sifter
	FRAG_UNIT_REOFFSET=2, 		//maybe from index info or frag_sifter
	MEDIA_ID=3,					//maybe from index info or frag_sifter
	MEDIA_SIZE=4,				//maybe from index info or frag_sifter
	MEDIA_NAME=5,				//maybe from index info or frag_sifter		
}FRAG_UNIT_INFO;

/*frag_stat ���ε�״̬*/
#define 	STAT_INIT								0x00
#define 	STAT_CNVG_QUERY							0x01  //query but not result
#define 	STAT_CNVG_OK							0x02
#define 	STAT_INDEX_QUERY						0x03
#define 	STAT_OK									0x04

/* frag_unit_t  flag*/
#define 	FRAG_UNIT_ADDMEDIA						0x01
#define 	FRAG_UNIT_MONITOR			 			0x02	
#define 	FRAG_UNIT_OFFSET						0x04	
#define 	FRAG_UNIT_RECORD						0x08	
#define 	FRAG_UNIT_MULTI							0x10	

#define		WEBMAIL_CONT_INFO_NUM					10

/*SIPѡ�����*/
typedef enum
{
        SIP_HMGET_INDEX,//NOT SAVE OPT
        SIP_KEY_INDEX,//NOT SAVE OPT
        SIP_DATA_FLAG_INDEX,//NOT SAVE OPT
        SIP_RTP_4TUPLE_OPT_INDEX,//NOT SAVE OPT
        SIP_URI_OPT_INDEX,

        SIP_FROM_OPT_INDEX,
        SIP_TO_OPT_INDEX,
        SIP_SGATEWAY_OPT_INDEX,//NOT NEED SENDBACK
        SIP_CGATEWAY_OPT_INDEX,//NOT NEED SENDBACK
        SIP_DURATION_OPT_INDEX,

        SIP_S_CODING_OPT_INDEX,
        SIP_C_CODING_OPT_INDEX,
        SIP_FROM_TAGS_OPT_INDEX,
        SIP_TO_TAGS_OPT_INDEX,
        SIP_CALL_ID_OPT_INDEX,

        SIP_CSEQ_OPT_INDEX,
        SIP_C_CONTACT_OPT_INDEX,
        SIP_S_CONTACT_OPT_INDEX,
        SIP_USERAGENT_OPT_INDEX,
        SIP_SERVER_OPT_INDEX,//NOT NEED SENDBACK

        SIP_C_CRYPTO_OPT_INDEX,//NOT NEED SENDBACK
        SIP_C_INLINE_OPT_INDEX,//NOT NEED SENDBACK
        SIP_S_CRYPTO_OPT_INDEX,//NOT NEED SENDBACK
        SIP_S_INLINE_OPT_INDEX,//NOT NEED SENDBACK
        SIP_RINGING_OPT_INDEX,//NOT NEED SENDBACK

        SIP_REASON_OPT_INDEX,
        SIP_SIP_4TUPLE_OPT_INDEX,
        SIP_S_VIA_OPT_INDEX,
        SIP_S_RECORD_ROUTES_OPT_INDEX,
        SIP_S_ROUTE_OPT_INDEX,

        SIP_C_VIA_OPT_INDEX,
        SIP_C_RECORD_ROUTES_OPT_INDEX,
        SIP_C_ROUTE_OPT_INDEX,
        SIP_RESCODE_OPT_INDEX,
        SIP_CAPIP_OPT_INDEX,

        SIP_OPT_NUM,
}sip_opt_index;
//#define 	SIP_OPT_NUM								33
/*SIP��ѯredis���������*/
//#define 	SIP_REDIS_CMMD_NUM						35
/*SIP��ѡ��*/
typedef struct sip_opt_s
{
	char	opt_name[32];
	int		opt_type;     /*��ȫ����־�����ѡ�����ͣ�����JC��־�����ѡ������*/
}sip_opt_t;

typedef struct qd_info_s
{
	uint64_t	mid;	
	uint32_t	cap_ip;
}qd_info_t;

typedef enum
{
	MEDIA_OPT_URL=0,		
	MEDIA_OPT_PID,
	MEDIA_OPT_ADDR,	
	MEDIA_OPT_SINGLE_KEY,
	MEDIA_OPT_UA,
	MEDIA_OPT_REFERER,
	MEDIA_OPT_ETAG,
	MEDIA_OPT_LAST_MODIFY,
	MEDIA_OPT_SERVER,
	MEDIA_OPT_TD_META,	
	MEDIA_OPT_OFFSET,
	MEDIA_OPT_C2S_CONT_TYPE,	
	MEDIA_OPT_S2C_CONT_TYPE,
	MEDIA_OPT_CAP_IP,
	MEDIA_OPT_PROTOCOL,
	
	MEDIA_OPT_INDEX_URL,
	MEDIA_OPT_INDEX_REFERER,
	MEDIA_OPT_INDEX_UA,	
	MEDIA_OPT_FD_SUBSTR,	
	
	MEDIA_OPT_S_IP,
	MEDIA_OPT_S_PORT,
	MEDIA_OPT_C_IP,
	MEDIA_OPT_C_PORT,
	
	MEDIA_OPT_MAXNUN,
}MEDIA_OPT;

typedef struct frag_unit_s
{	
	MESA_lqueue_head		frg_cnvg_lq;	
	MESA_lqueue_head		frg_index_lq;
	text_t** 				frg_info; 					 //useful info after sifter

	text_t* 				text;						 //from media_info opt to sifter
	int 					text_num;	
	int 					thread_seq;	

	uint64_t				pid;					 	 //from media_info
	uint64_t				media_len;					 //from media_info		
	       		
	uint32_t 				capIP;						//��ƿװ�������ݵ�ǰ�˻���IP
	uint32_t 				src_ip;	

#if K_PROJECT
	int						hitservice;   				 //from media_info	
#else
	uint8_t 				hitservice; 				 //from media_info	
	uint8_t					pad[3];
#endif
    uint8_t   				data_flag;					 //from media_info	
    int8_t   				flag;						 //from media_info	
	uint8_t					media_type;			     	 //from media_info		
	uint8_t					proto;						 //from data	
	
	qd_info_t 				qd_info_from_cpz[QD_MAXNUM];			  //ǰ�˻�������Ϣ
	opt_in_t* 				opt[MEDIA_OPT_MAXNUN];				  		   
	uint64_t				content_length;				  //from media_info opt 
	
	uint64_t				mid;	
	uint64_t				re_offset;		
	uint64_t				ab_offset;
	uint64_t				ab_offset_for_in;
	uint64_t				ab_offset_for_mime;			  //mime extract data, update offset

	int 					service_id;		
    uint32_t   				mediainfo_cnt;			
	
	uint8_t					repeat_not_proc;			  //such hls, osmf , same reoffset		
	uint8_t					frag_state; 	
	uint8_t					multi_flag;  	
    uint8_t 				qd_info_from_cpz_idx_last;

	opt_in_t*				sip_diadata_ID; 			//�洢META_OPT_SIP_DIADATA_ID 
	opt_in_t*				sip_data_dir; 				//�洢META_OPT_SIP_DATA_DIR
	opt_in_t*				sip_rate_info; 				//�洢META_OPT_SIP_DATA_DIR	
	opt_in_t*				sip_opt[SIP_OPT_NUM]; 	
}frag_unit_t;

#define INFO_MEDIA_NUM			1	
#define PID_MAXNUM				256	
#define	KEEP_REOFFSET_MAXNUM	2048

/*��Դ��ѯ���ؽ��ǰ�Ļ������*/
struct queue_item
{
	void* 						node;
	int							thread_seq;
	TAILQ_ENTRY(queue_item) 	entries;
};

TAILQ_HEAD(qw_queue, queue_item);

#define	AUDIO_WINS_DISABLE		0x01
#define	VEDIO_WINS_DISABLE		0x02
	
#define	SIP_SURVEY_TYPE_FD		0x01
#define	SIP_SURVEY_TYPE_JC		0x02
#define	SIP_SURVEY_TYPE_FD_JC	0x03

#define	MEDIA_SERVICE_TYPE_AV	0x00
#define	MEDIA_SERVICE_TYPE_FRAG	0x01
#define	MEDIA_SERVICE_TYPE_SIP	0x02
#define	MEDIA_SERVICE_TYPE_APP	0x03
#define	MEDIA_SERVICE_TYPE_PIC	0x04

/*ÿ����Ŀ�������߳�*/
#define OPT_MAXNUN 				256
typedef struct media_s
{		
	IVI_t*					ivi;							//complete frag removal
	IVI_t*                  save_ivi;
	sfh_instance_t*         fuzzy;	
	uint64_t				fuzzy_acc_len;             		//SFH_feed acc len	
	
	MESA_lqueue_head		app_frg_lq;						//app���ݻ���

	text_t* 				media_info[INFO_MEDIA_NUM]; 	//MEDIA_INFO_TYPE		
	
	uint64_t				mid_after_multisrc;			    //��Դ֮��mid���ܱ��޸�
	uint64_t				mid;							//ǰ�˷��͵Ľ�ĿID
	uint64_t*				pid;							//PID_MAXNUM, need when media_json 
    int64_t 				create_time;	
    int64_t 				renew_time;	
    int64_t 				lastpkt_time;	
    int64_t 				dedup_query_time;	

	char					td[TD_LEN];						
	char*					td_data;	
	uint32_t				td_datalen;		
	uint32_t				addrlist_len;					//for app
	char*					addrlist;
	
	opt_in_t* 				opt[MEDIA_OPT_MAXNUN][OPT_MAXNUN];	
	int						opt_index;
	int						url_opt_index;					//����url�����ڵ�����λ��

	struct qw_queue 		query_wait_lq;				 	//�ȴ����ض�Դ��ѯ�Ľ��
	MESA_timer_index_t* 	timer_idx;
	MESA_timer_index_t* 	index_query_timer_idx;

	uint64_t				byte_in;	
    uint64_t				byte_proc;		
    uint32_t				pkt_in;		
    uint32_t				pkt_proc;
	
	uint64_t				maxoffset;			 			
	uint64_t				media_len;					 	//from more frag_unit 	

#if K_PROJECT
    int	 					hit_service;					//monitor flag   
#else
	uint8_t 				hit_service;					//monitor flag	 
	uint8_t 				pad[3];					//monitor flag	 
#endif
	uint8_t					proto;							 //from data, from frag_unit
	uint8_t					media_type;			     	 	//from media_info, from frag_unit	
    uint8_t 				data_flag;						//monitor flag       
    int8_t 					meta_flag;						

	int8_t 					flag;							// PROG_FLAG_EXCP  PROG_FLAG_DUMP  
	uint8_t 				qdinfo_idx_last;
    uint8_t 				pid_idx_last;		
    uint8_t 				repeat_reoffset_idx_last;
	
	uint64_t				acc_offset;                 	//HLS acc offset	
	int64_t					repeat_reoffset[KEEP_REOFFSET_MAXNUM];  //HLS reoffset that have proc,��ౣ��KEEP_REOFFSET_MAXNUM��Ƭ���ظ��Ľ���ȥ��

	qd_info_t 				qd_info[QD_MAXNUM];	   			//����ǰ�˻�����mid��capIP��Ӧ��ϵ
	qd_info_t 				qd_info_from_cpz[QD_MAXNUM];	//��Դ�����£�������ƴװ��ǰ����Ϣ

	uint32_t				frag_unit_cnt;					//for pid
	uint32_t				configID;
	uint32_t				multisrc_bizmanip;
	int 					thread_seq;	                	//the first frag_unit thread_seq		
	
    char 					wins_dest_disabled_bit;			//stop send to subsystem,equal to MAX_EXCP_PORT
	uint8_t					media_service_type;			  	//HLS or OSMF...  SIP AV
	int8_t					td_query;			  			//0:init 
	int8_t					cache_flag;			  			
	uint8_t					td_complete;			  		//0:init 	
    uint8_t 				qdinfo_from_cpz_idx_last;	

    uint8_t 				sip_survey_type;				//SIP_SURVEY_TYPE_FD SIP_SURVEY_TYPE_JC SIP_SURVEY_TYPE_FD_JC
   //uint8_t 				sip_sendlog_flag;				
	opt_in_t*				sip_opt[SIP_OPT_NUM];			//opttype is fulllog type
	opt_in_t*				sip_rate_info; 				
	opt_in_t*				sip_diadata_ID; 				
	uint64_t				re_offset;						
	char*					fd_buf;								
	char*					jc_buf;								
	uint32_t				fd_buflen;								
	uint32_t				jc_buflen;	

	/*������*/
    int64_t 				survey_time;	
	uint32_t				cfg_id;					//config ID
	uint8_t					service;	
	char					level;					//the level of check result 	
	char					log_url[512];
	char* 	 				monitor_path;
	//char					frag_bitmap[KEEP_REOFFSET_MAXNUM/8+1];			//bitmap for frag removal
}media_t;

/*recv : add_media_info for cnvg_hash*/
typedef struct rssb_media_info_s
{	
	uint64_t 				pid;
	uint64_t 				media_len;
	char* 					opt; 
	
	int 					opt_num;
	int 					thread_seq;	
	
	uint32_t				cap_IP;
	uint32_t				src_ip;	
	
#if K_PROJECT
	int						hitservice;
#else
	uint8_t 				hitservice;	
	uint8_t 				pad[3];
#endif
	uint8_t					media_type;
	uint8_t					protocol;
	uint8_t					data_flag;
	int8_t					flag;
}rssb_media_info_t;

typedef struct frag_reassembly_s
{	
	void*					sifter;
	void*					logger;
	void*					media_logger;

	MESA_htable_handle      converge_hash;  /*frag ��Ƭ�ỰHASH*/	
	
	MESA_lqueue_head*  		wait_lq; 	
	
	uint16_t 				lq_num;
	uint16_t				logger_level;		
	
	uint32_t				cnvg_hash_thread_safe;
	uint32_t				cnvg_hash_size;
	uint32_t				cnvg_hash_max_elem_num; 
	uint32_t				cnvg_hash_expire_time;	

	/*stat**/
	uint32_t				sysinfo_interval;	
	uint32_t				stat_interval;	

	void*					sysinfo_handle;
	void*					stat_handle;
	
	/*sysinfo*/
	uint64_t                stat_info[RSSB_LOG_TYPE_MAXNUM];    
	uint64_t				data_info[RSSB_DATALOG_TYPE_MAXNUM][LOG_STAT_MAXNUM];	
	uint64_t				sysinfo_stat[RSSB_SYSLOG_TYPE_MAXNUM][SYSLOG_STAT_MAXNUM];	

	int						log_field_id[RSSB_LOG_TYPE_MAXNUM];	
	int						datalog_column_id[LOG_STAT_MAXNUM];	
	int						datalog_line_id[RSSB_DATALOG_TYPE_MAXNUM];
	int						syslog_column_id[SYSLOG_STAT_MAXNUM];	
	int						syslog_line_id[RSSB_SYSLOG_TYPE_MAXNUM];	

	/*redis*/
	struct timeval 			redis_tv;
	char					redis_addr[20480];
	redisClusterContext* 	redis_cluster_ctx[MAX_THREAD_NUM];
	redisContext* 			redis_ctx[MAX_THREAD_NUM];	
	int						redis_cluster_switch;	
	char					redis_ip[32];
	int						redis_port;
	
	uint32_t				wait_lq_num;
}frag_reassembly_t;

/*printaddr ��ʽ, used in app*/
typedef struct __touple4_type
{
	char 					sip[64];
	char 					dip[64];
	unsigned short 			sport;
	unsigned short 			dport;
	int 					addr_type;
}touple4_type_t;

#define FRAG_CONTAIN_MAXNUM		8	                    //ȥ��:��ȫ���ǵ�frag��������ȥ��
/*for create_media  because IVI */
typedef struct frag_ivi_info_s
{	
	frag_unit_t*			frg_unit;
	frag_in_t*				frg;
	//query_detail_t*		query_detail;
	frag_in_t* 				frg_array[FRAG_CONTAIN_MAXNUM];//frag������Ϊȥ�ر����Ϊ���frag	
	//char					td_result[TD_LEN];
	uint64_t				mid;
	int						frg_array_cnt;
	uint8_t					td_query;					//TD_QUERY_TYPE_MULTISRC TD_QUERY_TYPE_DEDUP
	uint8_t					thread_seq;					//for av dedup
}frag_ivi_info_t;


#ifdef __cplusplus
extern "C" {
#endif
int 		frag_add_wait_lq(frag_ivi_info_t* frag_ivi_info, uint32_t frag_stat, int thread_seq);
void 		init_frag_unit(frag_unit_t* frg_unit, uchar protocol);
int 		media_create(frag_unit_t* frg_unit);
int 		frag_service(frag_ivi_info_t* frag_ivi_info, uint32_t src_ip, int thread_seq);
void 		free_opt(opt_in_t** data);
void 		free_text(text_t** pp, int n_p);
int 		free_frag_in(void *data, long data_len, void *arg);
uint64_t 	make_mid(char * key, unsigned int size, unsigned char type);
void 		free_frag_unit(void* data);
void 		free_media(void* data);
int 		expire_media_hash_node(void *data, int eliminate_type);

#ifdef __cplusplus
}
#endif


#endif